2017-10-08 242 views
0

我使用Spark 2.1和Cassandra(3.9)作爲數據源。 C *有一個有50列的大表,對我的用例來說這不是一個好的數據模型。所以我爲每個傳感器創建了拆分表以及分區鍵和集羣鍵列。Spark從一個數據幀創建多個數據幀

All sensor table 
----------------------------------------------------- 
| Device | Time  | Sensor1 | Sensor2 | Sensor3 | 
| dev1 | 1507436000 | 50.3 | 1 | 1 | 
| dev2 | 1507436100 | 90.2 | 0 | 1 | 
| dev1 | 1507436100 | 28.1 | 1 | 1 | 
----------------------------------------------------- 
Sensor1 table 
------------------------------- 
| Device | Time  | value | 
| dev1 | 1507436000 | 50.3 | 
| dev2 | 1507436100 | 90.2 | 
| dev1 | 1507436100 | 28.1 | 
------------------------------- 

現在我正在使用spark將舊錶中的數據複製到新表中。

df = spark.read\ 
    .format("org.apache.spark.sql.cassandra")\ 
    .options(table="allsensortables", keyspace="dataks")\ 
    .load().cache() 
df.createOrReplaceTempView("data") 
query = ('''select device,time,sensor1 as value from data ''') 
vgDF = spark.sql(query) 
vgDF.write\ 
    .format("org.apache.spark.sql.cassandra")\ 
    .mode('append')\ 
    .options(table="sensor1", keyspace="dataks")\ 
    .save() 

逐個複製數據對於單個表格需要很多時間(2.1)小時。有什麼辦法可以select *併爲每個傳感器創建多個df並立即保存? (或者甚至是依次)。在代碼

+0

你如何使用spark-shell命令運行你的代碼? –

+0

@Vijay_Shinde使用spark-submit – Junaid

+0

Ok @Junaid,嘗試增加驅動程序內存和執行程序內存。它會幫助你。 –

回答

0

的一個問題是緩存

df = spark.read\ 
.format("org.apache.spark.sql.cassandra")\ 
.options(table="allsensortables", keyspace="dataks")\ 
.load().cache() 

在這裏,我怎麼看不到DF多次使用從保存分開。所以這裏的緩存是反生產的。您正在讀取數據,將其過濾並保存到單獨的cassandra表中。現在在數據框上發生的唯一行爲就是保存,沒有別的。

因此,在這裏緩存數據沒有任何好處。刪除緩存會給你一些加速。

按順序創建多個表。我建議使用partitionBy並將數據首先寫入HDFS作爲分區數據w.r.t傳感器,然後將其寫回cassandra。

+0

謝謝,將刪除緩存並測試。讓我試試hdfs和partitionBy。另外在連接器中有c *端過濾功能,但我不確定它如何在python中使用。如果選擇添加它說對象沒有屬性'選擇'。 https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md – Junaid

+0

是的,cassandra允許謂詞下推。不確定在python中是否存在lib –