2016-11-28 102 views
1

我正在使用Zeppelin 0.6.2和Spark 2.0。如何使用更新值更新給定另一個DataFrame的DataFrame?

我想在循環內執行一個查詢,它不是很有效。

我需要爲數據幀的每一行循環大約5000行並執行一個查詢,它將在另一個數據幀中增加一個值。

這裏是我的嘗試吧:

val t2 = time 
t2.registerTempTable("t2") 
u.collect().foreach{ r => 
println(r(0)) 
val c=r(1) 
val start="\""+r(2)+"\"" 
val end="\""+r(3)+"\"" 
sql("INSERT INTO TABLE t2 SELECT time, recordings + "+c+" AS recordings FROM time WHERE time >= " + start + " AND time < " + end) 
} 

我想取兩個dataframes的一小部分,但它仍然很慢。我覺得我沒有這樣做。

任何想法如何快速更新數據幀?

回答

1

我需要爲數據幀的每一行循環約5000行並執行一個查詢,它將在另一個數據幀中增加一個值。

我可以看到u,timet2表。 t2是別名time因此您可以稍後在INSERT查詢中使用它。對?

PROTIP:我很高興有他們的模式。

讓我們假設你有5000行數據幀稱爲df5k

// it's a fake 5k = a mere 5 rows for the sake of simplicity 
// I think `u` is your 5k table (that you unnecessarily `collect` to `foreach`) 
val u = Seq(
    (0, 0, 0, 3), 
    (1, 3, 4, 5), 
    (2, 6, 6, 8), 
    (3, 9, 9, 17)).toDF("id", "c", "start", "end") 

// I think `t2` is an alias for `time` and you want to update `t2` 
val time = Seq(
    (1, 10), 
    (4, 40), 
    (9, 90)).toDF("time", "recordings") 

// this is the calculation of the new records 
val new_t2 = u.join(time) 
    .where('time >= 'start) 
    .where('time < 'end) 
    .withColumn("recordings + c", 'recordings + 'c) 
    .select('time, $"recordings + c" as 'recordings) 

// the following is an equivalent of INSERT INTO using Dataset API 
val solution = time.union(new_t2) 

注意:你沒有更新的數據幀,但建立新的價值觀的新數據幀。

+0

非常感謝!我甚至不確定我的問題對其他人有意義。我使用你的輸入修改了我的代碼,並且我收到了一條消息,說我應該添加spark.conf.set(「spark.sql.crossJoin.enabled」,true)來做到這一點,並在那一刻我意識到,交叉連接是解決方案I需要而不是一個foreach行。 – ieaiaio

+0

有趣。我每天都在使用2.1.0-SNAPSHOT,所以我們確實在使用不同版本的Spark,但我不知道在2.x版本之間可能會有如此重要的變化。你的Spark版本究竟是什麼?使用'spark.version'來找出它。你絕對應該堅持Spark SQL計算「事物」的方式。如果它解決了您的使用案例,請儘早接受我的答案。謝謝! –

+0

順便說一句,能否請您將最新版本的代碼添加到問題中(以便其他人也可以從您的更改中受益)?我和其他SOers會非常感激。我也可以幫助你使用'spark.conf.set(「spark.sql.crossJoin.enabled」,true)''。謝謝! –

相關問題