我以這種方式解釋你的問題:
我想從所有Spark執行程序的RDD中插入數據。我試圖在驅動程序上創建一個數據庫連接,並以某種方式將它作爲廣播傳遞給執行者,但Spark一直在投擲NotSerializableException
。我怎樣才能實現我的目標?
簡短的回答是:
您需要單獨創建的每一個執行節點上的一個新的連接。
您不應該將數據庫連接處理程序,文件處理程序等等傳遞給其他進程,尤其是遠程計算機。
這裏的問題是哪兒來創建數據庫連接,因爲有大量的執行者可以很容易地超過DB連接池的大小。
什麼你其實可以做的是使用foreachPartition,喜歡這裏:
// numPartitions == number of simultaneous DB connections you can afford
yourRdd.repartition(numPartitions)
.foreachPartition {
iter =>
val connection = createConnection()
while (iter.hasNext) {
connection.execute("INSERT ...")
}
connection.commit()
}
這裏面.foreachPartition
代碼將每個執行者機器上執行,連接對象將不會通過網絡發送,你贏了沒有序列化異常,數據將被插入。
關於使用foreachPartition
的相同推理也在this問題的答案中提到。
數據庫對象的意思是? DTO或其他東西? –
請查看[如何創建最小,完整和可驗證的示例](https://stackoverflow.com/help/mcve)並相應地重寫您的問題。 –
我創建了一個處理連接數據庫和所有數據庫交互的類。 –