2017-02-09 149 views

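Java Spark Streaming with Cassandra

I am trying to do Spark Streaming with Cassandra in Java. I have already done the same thing in Scala, but I don't know how to proceed in Java. I could not find any examples of Java Spark Streaming with Cassandra online.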

Could someone please show me how to write the following Scala code in Java:

import com.datastax.spark.connector.streaming._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ConstantInputDStream

// conf is an existing SparkConf
val ssc = new StreamingContext(conf, Seconds(10))

val cassandraRDD = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu")

// re-emit the same Cassandra-backed RDD on every batch interval
val dstream = new ConstantInputDStream(ssc, cassandraRDD)

dstream.foreachRDD { rdd =>
    // any action will trigger the underlying cassandra query, using collect to have a simple output
    println(rdd.collect.mkString("\n"))
}
ssc.start()
ssc.awaitTermination()

Any help is appreciated. Thanks.

Answer


Inside your foreachRDD transformation, you can convert the data into the shape of your Cassandra table and then save it.

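// Convert each incoming tuple into the beans that will be written to Cassandra.
JavaRDD<TestBean> cassandraRDD = testRDD
        .flatMap(new FlatMapFunction<Tuple2<String, List<Map<String, Object>>>, TestBean>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<TestBean> call(Tuple2<String, List<Map<String, Object>>> tuple) throws Exception {
                // Build one TestBean per map in tuple._2(); the field mapping depends on your table.
                List<TestBean> rawData = new ArrayList<>();
                return rawData; // on Spark 2.x, call() returns an Iterator: use rawData.iterator()
            }
        });

// Write the converted beans; TestBean properties are mapped to table columns by name.
javaFunctions(cassandraRDD).writerBuilder(CASSANDRA_KEYSPACE, CASSANDRA_TABLE, mapToRow(TestBean.class)).saveToCassandra();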
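
The answer does not show how the list returned from call() is filled. As a rough illustration only, here is that conversion step in plain Java, with no Spark or Cassandra dependency. It assumes each map is keyed by column name and reuses the fname and lname columns from the question; UserBean and BeanMappingSketch are hypothetical names standing in for TestBean and the body of call().

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical bean: its properties are assumed to match the "fname" and "lname" columns.
class UserBean implements Serializable {
    private static final long serialVersionUID = 1L;

    private String fname;
    private String lname;

    public UserBean() {
        // no-arg constructor, following the usual JavaBean convention
    }

    public UserBean(String fname, String lname) {
        this.fname = fname;
        this.lname = lname;
    }

    public String getFname() { return fname; }
    public void setFname(String fname) { this.fname = fname; }
    public String getLname() { return lname; }
    public void setLname(String lname) { this.lname = lname; }
}

class BeanMappingSketch {

    // Creates one bean per raw record; a missing column becomes a null property.
    static List<UserBean> toBeans(List<Map<String, Object>> records) {
        List<UserBean> beans = new ArrayList<>();
        for (Map<String, Object> record : records) {
            String fname = Objects.toString(record.get("fname"), null);
            String lname = Objects.toString(record.get("lname"), null);
            beans.add(new UserBean(fname, lname));
        }
        return beans;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new HashMap<>();
        record.put("fname", "li");
        record.put("lname", "yu");

        List<Map<String, Object>> records = new ArrayList<>();
        records.add(record);

        for (UserBean bean : toBeans(records)) {
            System.out.println(bean.getFname() + " " + bean.getLname());
        }
    }
}
```

Inside the Spark job, the loop in toBeans would run over tuple._2() in call(), and the resulting list (or its iterator on Spark 2.x) would be the return value.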

These are my import statements: import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions; import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;