5
我有一個火花招聘以下輸入數據(在木地板):星火據幀與範圍內緩慢加入
Person (millions of rows)
+---------+----------+---------------+---------------+
| name | location | start | end |
+---------+----------+---------------+---------------+
| Person1 | 1230 | 1478630000001 | 1478630000010 |
| Person2 | 1230 | 1478630000002 | 1478630000012 |
| Person2 | 1230 | 1478630000013 | 1478630000020 |
| Person3 | 3450 | 1478630000001 | 1478630000015 |
+---------+----------+---------------+---------------+
Event (millions of rows)
+----------+----------+---------------+
| event | location | start_time |
+----------+----------+---------------+
| Biking | 1230 | 1478630000005 |
| Skating | 1230 | 1478630000014 |
| Baseball | 3450 | 1478630000015 |
+----------+----------+---------------+
,我需要把它改造成以下預期結果:
[{
"name" : "Biking",
"persons" : ["Person1", "Person2"]
},
{
"name" : "Skating",
"persons" : ["Person2"]
},
{
"name" : "Baseball",
"persons" : ["Person3"]
}]
換句話說:結果是每個事件的列表,每個事件都有一個參與這個事件的人員的列表。
一個人算作如果
Person.start < Event.start_time
&& Person.end > Event.start_time
&& Person.location == Event.location
我曾嘗試不同的方法參加,但實際上似乎工作的唯一一個是 加入兩個dataframes,然後組/按事件聚集他們。 但是,連接速度非常慢,並且不能很好地分佈在多個CPU內核中。
當前的加入代碼:
final DataFrame fullFrame = persons.as("persons")
.join(events.as("events"), col("persons.location").equalTo(col("events.location"))
.and(col("events.start_time").geq(col("persons.start")))
.and(col("events.start_time").leq(col("persons.end"))), "inner");
//count to have an action
fullFrame.count();
我使用星火獨立和Java,如果有差別。
有沒有人有更好的主意如何解決與Spark 1.6.2這個問題?
實際上,使用「廣播連接」改進了很多,我不得不將事件表分解成多個適合內存的小塊,並逐個加入它們。 –