0

我使用Spark Structured Streaming進行實時機器學習,並且希望將預測存儲在我的Cassandra集羣中。Spark(SQL /結構化流式傳輸)Cassandra - PreparedStatement

由於我處於流式上下文中,每秒執行多次相同的請求,所以一個強制優化是使用PreparedStatement。

在卡桑德拉火花驅動程序(https://github.com/datastax/spark-cassandra-connector)有沒有辦法在使用PreparedStatement(Scala中或Python,我不考慮Java作爲一個選項)

我應該用一個斯卡拉(https://github.com/outworkers/phantom)/蟒蛇( https://github.com/datastax/python-driver)cassandra驅動程序? 它是如何工作的,那麼我的連接對象需要被序列化以傳遞給工人?

如果有人能幫助我!

謝謝:)

回答

1

爲了在卡桑德拉做一個準備好的聲明,然後註冊數據和結構化的火花流,同時還處理流,你需要:

  • 進口com.datastax.driver.core。會議
  • 進口com.datastax.spark.connector.cql.CassandraConnector

然後,建立你的連接器:

val connector = CassandraConnector.apply(sparkSession.sparkContext.getConf) 

有兩個會議連接器,你現在就可以給你打電話聲明斯卡拉類

connector.withSessionDo { session => 
Statements.PreparedStatement() 

}

你終於可以寫的準備好的聲明功能用Cassandra將數據寫入以下函數完成,cql是結合變量到準備好的聲明,並執行它的功能:

private def processRow(value: Commons.UserEvent) = { 
    connector.withSessionDo { session => 
    session.execute(Statements.cql(value.device_id, value.category, value.window_time, value.m1_sum_downstream, value.m2_sum_downstream)) 
} 

}

當然,你必須在foreach作家調用這個函數(processRow

 // This Foreach sink writer writes the output to cassandra. 
import org.apache.spark.sql.ForeachWriter 
val writer = new ForeachWriter[Commons.UserEvent] { 
    override def open(partitionId: Long, version: Long) = true 
    override def process(value: Commons.UserEvent) = { 
    processRow(value) 
    } 
    override def close(errorOrNull: Throwable) = {} 
} 

val query = 
    ds.writeStream.queryName("aggregateStructuredStream").outputMode("complete").foreach(writer).start 
相關問題