這是我的Spark應用程序的一部分。第一部分是我在過去1小時內獲得所有文章的部分,代碼的第二部分抓住了所有這些文章的評論。第三部分將評論添加到文章中。 問題是,articles.map(lambda x:(x.id,x.id)).join(axes)
部分太慢,大約需要1分鐘。我想把這個提高到10秒甚至更少,但不知道如何去做?感謝您的回覆。如何通過在spark數據框API中加入來實現服務器端過濾
articles = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="articles", keyspace=source).load() \
.map(lambda x:x).filter(lambda x:x.created_at!=None).filter(lambda x:x.created_at>=datetime.now()-timedelta(hours=1) and x.created_at<=datetime.now()-timedelta(hours=0)).cache()
axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load().map(lambda x:(x.article,x))
speed_rdd = articles.map(lambda x:(x.id,x.id)).join(axes)
編輯
這是我的新代碼,根據您的建議,我改變了。現在已經是以前的2倍了,所以非常感謝;)。又一個改進我想提出與我在軸部分的代碼,它仍然太慢,需要38秒30萬個數據的最後一部分:
range_expr = col("created_at").between(
datetime.now()-timedelta(hours=timespan),
datetime.now()-timedelta(hours=time_delta(timespan))
)
article_ids = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="article_by_created_at", keyspace=source).load().where(range_expr).select('article','created_at').persist()
axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load()
我想這在這裏(應該替代最後軸的我的代碼部分),這也是我想有解決方案,但它似乎沒有正常工作:
in_expr = col("article").isin(article_ids.collect())
axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load().where(in_expr)
我總是收到此錯誤信息:
in_expr = col("article").isin(article_ids.collect())
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: 'Column' object is not callable
謝謝你的幫助。
a)'isin'已經在1.5中引入b)我非常確定你想要的是首先壓扁這個'article_ids.collect()'。 – zero323
我試圖壓扁它,但它並沒有真正提高速度。加載時,這部分sqlContext.read。格式有一個默認的分區號爲255,我想使它更小,因爲這部分根據分區從cassandra讀取是緩慢的,但不知道如何。有任何想法嗎?謝謝 – peter