2016-11-08 98 views
5

我有一個火花招聘以下輸入數據(在木地板):星火據幀與範圍內緩慢加入

Person (millions of rows) 
+---------+----------+---------------+---------------+ 
| name | location |  start  |  end  | 
+---------+----------+---------------+---------------+ 
| Person1 |  1230 | 1478630000001 | 1478630000010 | 
| Person2 |  1230 | 1478630000002 | 1478630000012 | 
| Person2 |  1230 | 1478630000013 | 1478630000020 | 
| Person3 |  3450 | 1478630000001 | 1478630000015 | 
+---------+----------+---------------+---------------+ 


Event (millions of rows) 
+----------+----------+---------------+ 
| event | location | start_time | 
+----------+----------+---------------+ 
| Biking |  1230 | 1478630000005 | 
| Skating |  1230 | 1478630000014 | 
| Baseball |  3450 | 1478630000015 | 
+----------+----------+---------------+ 

,我需要把它改造成以下預期結果:

[{ 
    "name" : "Biking", 
    "persons" : ["Person1", "Person2"] 
}, 
{ 
    "name" : "Skating", 
    "persons" : ["Person2"] 
}, 
{ 
    "name" : "Baseball", 
    "persons" : ["Person3"] 
}] 

換句話說:結果是每個事件的列表,每個事件都有一個參與這個事件的人員的列表。

一個人算作如果

Person.start < Event.start_time 
&& Person.end > Event.start_time 
&& Person.location == Event.location 

我曾嘗試不同的方法參加,但實際上似乎工作的唯一一個是 加入兩個dataframes,然後組/按事件聚集他們。 但是,連接速度非常慢,並且不能很好地分佈在多個CPU內核中。

當前的加入代碼:

final DataFrame fullFrame = persons.as("persons") 
    .join(events.as("events"), col("persons.location").equalTo(col("events.location")) 
       .and(col("events.start_time").geq(col("persons.start"))) 
       .and(col("events.start_time").leq(col("persons.end"))), "inner"); 

//count to have an action 
fullFrame.count(); 

我使用星火獨立和Java,如果有差別。

有沒有人有更好的主意如何解決與Spark 1.6.2這個問題?

回答

1

範圍連接是作爲交叉產品執行的,具有後續的過濾步驟。一個潛在的更好的解決方案可能是,廣播表可能更小events表,然後映射persons表:在地圖內部,檢查連接條件併產生相應的結果。

+0

實際上,使用「廣播連接」改進了很多,我不得不將事件表分解成多個適合內存的小塊,並逐個加入它們。 –