2016-11-25 61 views
0

我有一個RDD(test_rdd),如下Pyspark保存RDD卡桑德拉

[ { 'user_lname':u'TEst1' , 'USER_ID':u'2aa8ae30-c0e5-48bb-AB16-a2ed2e78c8c3' , 'user_phone':u'1234567890','user_fname':u'TestingTesting2','amount':1222,'event_timestamp':u'2016-09-29T07:49:50.866 + 00:00'},

{'user_lname':u'TEst2','user_id':u'2aa8ae30-c0e5-48bb-ac16-a2ed2e78c8c3','user_phone':u'1234567891','user_fname':u'TestingTesting','amount':12 ,'event_timestamp':u'2016-10-27T07:49:50.866 + 00:00'},

{'user_lname':u'test3','u ser_id':u'2aa8ae30-c1e5-48bb-ab16-a2ed2e78c8c3','user_phone':u'1234567892','user_fname':u'TestingTesting3','amount':122,'event_timestamp':u'2016-09- 27T07:49:50.866 + 00:00'} ]

我想將上面的RDD保存到cassandra表中。
我得到下面的錯誤,當我使用
test_rdd.saveToCassandra( 「keyspace1」, 「表1」)

回溯(最近通話最後一個):
文件「/var/spark/test/k.py 」,線路179,在
parsed_data.saveToCassandra( 「keyspace1」, 「表1」)
AttributeError的: 'PipelinedRDD' 對象沒有屬性 'saveToCassandra'

回答

0

要麼

  • 遵循官方spark-cassandra-connector
  • 指令轉換爲DataFrametoDF
  • Dataframe

    df.write.format("org.apache.spark.sql.cassandra").options(
        table=table, keyspace=keyspace 
    ).save() 
    
+1

謝謝。我使用第二種方法。 pyspark-cassandra和spark-cassandra-connector之間的區別是什麼。 –