2017-08-08 46 views
1

所以據我所知,Apache Spark沒有模仿更新 SQL命令的功能。就像,我可以在一定條件下更改列中的單個值。唯一的方法就是使用我被指示使用的以下命令(這裏是在Stackoverflow中):withColumn(columnName, where('condition', value));更新數據集中的值

但是,條件應該是column類型,這意味着我必須使用內置的列過濾函數apache has (equalTo,isin,lt,gt等)。有沒有一種方法,我可以改爲使用SQL語句,而不是那些內置函數?

問題是我給出的SQL語句的文本文件,像WHERE ID > 5WHERE AGE != 50,等等。然後我根據這些條件,標籤值,我以爲繼withColumn()的做法,但我不能插 - 在該函數中的SQL語句中。任何想法如何我可以解決這個問題?

回答

1

我找到了一種方法去解決這個:

你想你的數據集,分爲兩組:要更新的價值和價值觀,你不想更新

Dataset<Row> valueToUpdate = dataset.filter('conditionToFilterValues'); 
Dataset<Row> valuesNotToUpdate = dataset.except(valuesToUpdate); 

valueToUpdate = valueToUpdate.withColumn('updatedColumn', lit('updateValue')); 

Dataset<Row> updatedDataset = valuesNotToUpdate.union(valueToUpdate); 

然而,這不保存記錄的原始數據集的順序相同,因此,如果順序是重要的給你,這是不夠的你的需求。

0

如果您正在使用數據框,您可以用df.registerTempTable(「事件」)

然後你就可以像查詢,註冊該數據幀作爲臨時表, sqlContext.sql(「SELECT * FROM事件」 +)

+0

但我仍然無法更新數據集,因爲火花不接受**更新**聲明 –

+0

一旦你得到一個基於輸入條件過濾數據框,您可以創建具有更新值的新列在數據框中。 – magic

0

when條款轉化爲case clause,你可以涉及到SQL case子句。

scala> val condition_1 = when(col("col_1").isNull,"NA").otherwise("AVAILABLE") 
condition_1: org.apache.spark.sql.Column = CASE WHEN (col_1 IS NULL) THEN NA ELSE AVAILABLE END 

,或者你可以鏈when條款以及

scala> val condition_2 = when(col("col_1") === col("col_2"),"EQUAL").when(col("col_1") > col("col_2"),"GREATER"). 
    | otherwise("LESS") 
condition_2: org.apache.spark.sql.Column = CASE WHEN (col_1 = col_2) THEN EQUAL WHEN (col_1 > col_2) THEN GREATER ELSE LESS END 

scala> val new_df = df.withColumn("condition_1",condition_1).withColumn("condition_2",condition_2) 

不過,如果你想使用的表,那麼你就可以在你的數據幀/數據集作爲temperory表登記並執行sql查詢

df.createOrReplaceTempView("tempTable")//spark 2.1 + 
df.registerTempTable("tempTable")//spark 1.6 
現在

,你可以進行SQL查詢

spark.sql("your queries goes here with case clause and where condition!!!")//spark 2.1 
sqlContest.sql("your queries goes here with case clause and where condition!!!")//spark 1.6