2017-05-26 91 views
0

我有一個數據庫對象,用於從所有Spark執行器插入數據。當我將此對象定義爲static時,它在這些執行程序中具有null值。所以我在驅動程序中聲明它,然後播放它,然後在每個執行者中獲得它的價值。當運行應用程序時,下面的拋出異常:在Spark中如何定義要廣播的對象Java

Exception in thread "main" java.io.NotSerializableException: database.Database 

注:

  • 執行人類是可序列化
  • 我除去廣播對象是在該類別定義爲瞬態瞬態但它沒有工作
+0

數據庫對象的意思是? DTO或其他東西? –

+0

請查看[如何創建最小,完整和可驗證的示例](https://stackoverflow.com/help/mcve)並相應地重寫您的問題。 –

+0

我創建了一個處理連接數據庫和所有數據庫交互的類。 –

回答

1

我以這種方式解釋你的問題:

我想從所有Spark執行程序的RDD中插入數據。我試圖在驅動程序上創建一個數據庫連接,並以某種方式將它作爲廣播傳遞給執行者,但Spark一直在投擲NotSerializableException。我怎樣才能實現我的目標?

簡短的回答是:

您需要單獨創建的每一個執行節點上的一個新的連接。
您不應該將數據庫連接處理程序,文件處理程序等等傳遞給其他進程,尤其是遠程計算機。

這裏的問題是哪兒來創建數據庫連接,因爲有大量的執行者可以很容易地超過DB連接池的大小。

什麼你其實可以做的是使用foreachPartition,喜歡這裏:

// numPartitions == number of simultaneous DB connections you can afford 
    yourRdd.repartition(numPartitions) 
    .foreachPartition { 
    iter => 
     val connection = createConnection() 
     while (iter.hasNext) { 
     connection.execute("INSERT ...") 
     } 
     connection.commit() 
    } 

這裏面.foreachPartition代碼將每個執行者機器上執行,連接對象將不會通過網絡發送,你贏了沒有序列化異常,數據將被插入。

關於使用foreachPartition的相同推理也在this問題的答案中提到。

相關問題