
Task not serializable in the following code snippet, when trying to write records from a Spark DataFrame to DynamoDB via the Java SDK:

val client = AmazonDynamoDBClientBuilder.standard.withRegion(Regions.the_region)
  .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("access_key", "secret_key")))
  .build()
val dynamoDB = new DynamoDB(client)
val table = dynamoDB.getTable("tbl_name")

def putItem(email: String, name: String): Unit = {
    val item = new Item().withPrimaryKey("email", email)
      .withNumber("ts", System.currentTimeMillis).withString("name", name)
    table.putItem(item)
}

spark.sql(""" 
select 
    email, 
    name 
from db.hive_table_name 
""").rdd.repartition(40).map(row => putItem(row.getString(0), row.getString(1))).collect() 

I intend to write each record to the DynamoDB table through the Java SDK provided by AWS, but it fails with the following error:

org.apache.spark.SparkException: Task not serializable 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) 
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) 
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) 
at org.apache.spark.SparkContext.clean(SparkContext.scala:2094) 
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370) 
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
at org.apache.spark.rdd.RDD.map(RDD.scala:369) 

How can I adjust the code so that the DynamoDB Table object is created per partition, in order to take advantage of Spark's parallelism? Thanks!


'DynamoDB' is not serializable... –

Answer


Instead of map and collect I would use foreachPartition:

spark.sql(query).rdd.repartition(40).foreachPartition(iter => {
    // Build the client, DynamoDB facade and Table handle once per partition,
    // on the executor, so nothing non-serializable is captured in the closure.
    val client = AmazonDynamoDBClientBuilder.standard.withRegion(Regions.the_region)
      .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("access_key", "secret_key")))
      .build()
    val dynamoDB = new DynamoDB(client)
    val table = dynamoDB.getTable("tbl_name")

    // Define putItem here as well, so it uses the partition-local table
    // rather than the one created on the driver.
    def putItem(email: String, name: String): Unit = {
      val item = new Item().withPrimaryKey("email", email)
        .withNumber("ts", System.currentTimeMillis).withString("name", name)
      table.putItem(item)
    }

    iter.foreach(row => putItem(row.getString(0), row.getString(1)))
})
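
This works because the client, the DynamoDB facade and the Table handle are now constructed on each executor inside the partition, so nothing non-serializable has to be shipped from the driver; only the rows themselves pass through the closure.

If single-item puts become the bottleneck, the same per-partition setup can also feed the Document API's BatchWriteItem, which accepts at most 25 items per request. A rough sketch, not part of the original answer: it replaces the iter.foreach line above and reuses the placeholder names (tbl_name, email, ts, name) and the partition-local dynamoDB handle:

import com.amazonaws.services.dynamodbv2.document.TableWriteItems

// Optional batching sketch: group the partition's rows into chunks of 25,
// the maximum BatchWriteItem accepts per request.
iter.grouped(25).foreach { batch =>
    val items = batch.map(row => new Item().withPrimaryKey("email", row.getString(0))
      .withNumber("ts", System.currentTimeMillis).withString("name", row.getString(1)))
    var outcome = dynamoDB.batchWriteItem(new TableWriteItems("tbl_name").withItemsToPut(items: _*))
    // Re-send anything DynamoDB leaves unprocessed (e.g. when throttled).
    while (!outcome.getUnprocessedItems.isEmpty)
      outcome = dynamoDB.batchWriteItemUnprocessed(outcome.getUnprocessedItems)
}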