2016-10-02 69 views
2

我有一個非常複雜的查詢,需要連接9個或更多的表,有一些'group by'表達式。這些表中的大多數具有幾乎相同的行數。這些表格也有一些列可以用作分區表的「關鍵」。如何讓Spark處理更大的數據集?

此前,該應用程序運行良好,但現在的數據集有3〜4倍的數據。我的測試表明,如果每個表的行數少於4,000,000,應用程序仍然可以很好地運行。但是,如果計數不止於此,應用程序將寫入數百TB的洗牌和應用程序停頓(無論我如何調整內存,分區,執行程序等)。實際數據可能只有幾十Gs。

我會認爲,如果分區工作正常,Spark不應該洗牌這麼多,並應該在每個節點上完成連接。令人費解的是,爲什麼Spark不那麼聰明呢?

我可以將數據集(使用上面提到的「關鍵字」)分成許多數據集,這些數據集可以獨立處理。但是這個負擔將會在我自己身上......它折扣了使用Spark的原因。還有哪些其他方法可以幫助?

我使用Spark 2.0 over Hadoop YARN。

+0

你是如何調整分區的? –

回答

0

我的測試結果是如果每個表的行數小於4,000,000,應用程序仍然可以很好地運行。然而,如果計數比更大時,應用程序寫入數百改組TB的

當加入的數據集,如果一邊的尺寸小於一定配置的大小,火花廣播整個表給每個執行器,以便該連接可以在本地執行。您的上述觀察與此一致。您也可以直接向火花提供廣播提示,如df1.join(broadcast(df2))

除此之外,您能否提供關於您的問題的更多細節?

[前段時間,我也在爲加入和洗牌問題而苦苦掙扎,因爲我們需要處理幾個TB的工作。我們正在使用RDD(而不是數據集API)。我寫了關於我的發現[這裏] 1。這些可能是一些使用到你嘗試的原因有關基礎數據交叉]

更新:根據documentation - spark.sql.autoBroadcastJoinThreshold是可配置的屬性鍵。 10 MB是其默認值。它執行以下操作:

配置在執行連接時將廣播到所有工作節點的表的最大大小(以字節爲單位)。通過將此值設置爲-1,可以禁用廣播。請注意,當前統計信息僅支持運行命令ANALYZE TABLE COMPUTE STATISTICS noscan的Hive Metastore表。

顯然,這僅支持Hive表。

+0

感謝您的幫助。我看着DAG並確認了你的理解。我目前的實現是使用sql,將所有DataFrames(Spark 2.0中的數據集)註冊爲表格,但我不知道如何廣播表格。我曾經試驗過首先廣播DataFrames,並將它們註冊爲表格。有時看起來它播放,有時卻不播放。但即使是「廣播」,該節目仍然會在某個時刻停滯不前。 –

+1

您提到「如果一邊的尺寸小於某個可配置的尺寸」。什麼是參數?謝謝! –

+0

我更新了我的答案,以包含來自Spark Sql文檔的相關位。 (我相信我的原始答案沒有說明問題,可能還有更多需要複製的內容。)您能否提供更多關於您擁有什麼樣的數據,以及從哪裏加載數據的詳細信息。我希望能夠重現它。 –