2016-05-30 27 views
1

這是我的Spark應用程序的一部分。第一部分是我在過去1小時內獲得所有文章的部分,代碼的第二部分抓住了所有這些文章的評論。第三部分將評論添加到文章中。 問題是,articles.map(lambda x:(x.id,x.id)).join(axes)部分太慢,大約需要1分鐘。我想把這個提高到10秒甚至更少,但不知道如何去做?感謝您的回覆。如何通過在spark數據框API中加入來實現服務器端過濾

articles = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="articles", keyspace=source).load() \ 
         .map(lambda x:x).filter(lambda x:x.created_at!=None).filter(lambda x:x.created_at>=datetime.now()-timedelta(hours=1) and x.created_at<=datetime.now()-timedelta(hours=0)).cache() 

axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load().map(lambda x:(x.article,x)) 

speed_rdd = articles.map(lambda x:(x.id,x.id)).join(axes) 

編輯

這是我的新代碼,根據您的建議,我改變了。現在已經是以前的2倍了,所以非常感謝;)。又一個改進我想提出與我在軸部分的代碼,它仍然太慢,需要38秒30萬個數據的最後一部分:

range_expr = col("created_at").between(
          datetime.now()-timedelta(hours=timespan), 
          datetime.now()-timedelta(hours=time_delta(timespan)) 
         ) 
     article_ids = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="article_by_created_at", keyspace=source).load().where(range_expr).select('article','created_at').persist() 


     axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load() 

我想這在這裏(應該替代最後軸的我的代碼部分),這也是我想有解決方案,但它似乎沒有正常工作:

in_expr = col("article").isin(article_ids.collect()) 
     axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load().where(in_expr) 

我總是收到此錯誤信息:

in_expr = col("article").isin(article_ids.collect()) 
Traceback (most recent call last):            
    File "<stdin>", line 1, in <module> 
TypeError: 'Column' object is not callable 

謝謝你的幫助。

回答

2

由於mentioned before如果要實現合理的性能,請不要將您的數據轉換爲RDD。它不僅不僅使像謂詞下推這樣的優化變得不可能,而且還引入了將數據從JVM移動到Python的巨大開銷。

相反,你應該使用使用SQL表達式/ DataFrame API類似這樣的方式:

from pyspark.sql.functions import col, expr, current_timestamp 

range_expr = col("created_at").between(
    current_timestamp() - expr("INTERVAL 1 HOUR"), 
    current_timestamp()) 

articles = (sqlContext.read.format("org.apache.spark.sql.cassandra") 
    .options(...).load() 
    .where(col("created_at").isNotNull()) # This is not really required 
    .where(range_expr)) 

應該也可以使用標準的Python工具來制定謂詞表達你做之前:

import datetime 

range_expr = col("created_at").between(
    datetime.datetime.now() - datetime.timedelta(hours=1), 
    datetime.datetime.now() 
) 

隨後join應在不移動數據出的數據幀,以及執行0

+1

a)'isin'已經在1.5中引入b)我非常確定你想要的是首先壓扁這個'article_ids.collect()'。 – zero323

+0

我試圖壓扁它,但它並沒有真正提高速度。加載時,這部分sqlContext.read。格式有一個默認的分區號爲255,我想使它更小,因爲這部分根據分區從cassandra讀取是緩慢的,但不知道如何。有任何想法嗎?謝謝 – peter

2

1)謂詞下推自動由火花卡桑德拉連接器,只要該過濾是在卡桑德拉可能(使用用於過濾或仲索引主鍵)檢測:https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md#pushing-down-clauses-to-cassandra

2)爲了更有效聯接,你可以調用方法repartitionByCassandraReplica。不幸的是,這種方法可能不適用於PySpark,只適用於Scala/Java API。閱讀文檔:https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md#performing-efficient-joins-with-cassandra-tables-since-12

3)另一個提示是嘗試調試和了解連接器如何創建Spark分區。在文檔中提到了一些示例和注意事項:https://github.com/datastax/spark-cassandra-connector/blob/master/doc/16_partitioning.md

相關問題