0

我使用SparkSQL 2.2.0從Cassandra加載數據並將其索引到Elasticsearch。我擁有的數據由客戶(第一張表people)和訂單(第二張表orders)組成。
表格訂單具有指向相應客戶的列person_id
我的需求是查詢(並在Elasticsearch稍後索引)people表和orders,因此我可以爲每個客戶購買她購買的訂單數量。
我想出的最簡單的方法是在person_id列中將兩個表讀入org.apache.spark.sql.Dataset<Row> s並在加入。然後我groupBy(person_id)
這給了我一個數據集有兩列:person_idcount,我不得不加入people表,所以我可以計數與其他人的數據。SparkSQL加入父/子數據集

Dataset<Row> peopleWithOrders = people.join(orders, people.col("id").equalTo(orders.col("person_id")), "left_outer"); 

Dataset<Row> peopleOrdersCounts = peopleWithOrders.groupBy("id").count().withColumnRenamed("id", "personId"); 

Dataset<Row> peopleWithOrderCounts = people.join(personsOrdersCounts, people.col("id").equalTo(peopleOrdersCounts.col("personId")), "left_outer") 
      .withColumnRenamed("count", "nbrOfOrders") 
      .select("id", "name", "birthDate", "nbrOfOrders"); 

people表具有1_000_000行和orders一個2_500_000。每個客戶有2或3個訂單。
我正在使用MAC Book專業版,配備2,2 GHz Intel Core i7處理器和16 GB 1600 MHz DDR3內存。所有Cassandra,Spark 2.2 master和(single)worker都在同一臺機器上。
這3個連接需要15到20秒。
我的問題是:是否有性能提升的空間。做窗口聚合函數有益處,因爲我在日誌中看到ShuffleMapTask。

在此先感謝

回答

0

我認爲第一步是不必要的。你可以這樣做:

Dataset<Row> peopleOrdersCounts = orders.groupBy("person_id").count(); 

Dataset<Row> peopleWithOrderCounts = people.join(personsOrdersCounts, people.col("id").equalTo(peopleOrdersCounts.col("personId")), "left_outer") 
      .withColumnRenamed("count", "nbrOfOrders") 
      .select("id", "name", "birthDate", "nbrOfOrders"); 

我希望這會有所幫助。

+0

是的,的確如此。我的錯。但這仍然「相對緩慢」(ab 16s)。我想知道「窗口聚合函數」是否會有幫助,或者這是正常的方法 –

+0

據我所知,這是實現它的方法。特別是在「group by」的情況下。您可以查看用戶定義的聚合函數(UDAF),但即使這些函數也適用於特定情況。有沒有其他的操作可能會減慢速度? – Nikhil