2

我想從Kafka讀取數據並通過Spark RDD存儲到Cassandra表中。值分裂不是(String,String)的成員

獲取錯誤,而編譯代碼:

/root/cassandra-count/src/main/scala/KafkaSparkCassandra.scala:69: value split is not a member of (String, String) 

[error]  val lines = messages.flatMap(line => line.split(',')).map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble)) 
[error]            ^
[error] one error found 

[error] (compile:compileIncremental) Compilation failed 

下面的代碼:當我通過互動spark-shell手動運行該代碼,它工作正常,但而​​錯誤編譯代碼來。

// Create direct kafka stream with brokers and topics 
val topicsSet = Set[String] (kafka_topic) 
val kafkaParams = Map[String, String]("metadata.broker.list" -> kafka_broker) 
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) 

// Create the processing logic 
// Get the lines, split 
val lines = messages.map(line => line.split(',')).map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble)) 
lines.saveToCassandra("stream_poc", "US_city", SomeColumns("city_name", "jan_temp", "lat", "long")) 
+0

@RameshMaharjan:請不要格式化專有名詞作爲代碼。卡夫卡和卡桑德拉只需要一個初始資金,就是這樣 - 他們本身並不是代碼。然而,像'spark-shell'這樣的東西都可以,因爲代碼格式適合於控制檯I/O(假定'spark-shell'是一個鍵入的命令)。 – halfer

回答

1

KafkaUtils.createDirectStream返回的鍵和值的元組(因爲在卡夫卡消息被任選鍵控)。在你的情況下,它的類型是(String, String)。如果您要拆分的,你必須首先把它拿出來:

val lines = 
    messages 
    .map(line => line._2.split(',')) 
    .map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble)) 

或者使用部分函數的語法:

val lines = 
    messages 
    .map { case (_, value) => value.split(',') } 
    .map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble)) 
2

在卡夫卡的所有消息是有方向性的。原始的Kafka流,在這種情況下爲messages,是一個元組流(key,value)

而且由於編譯錯誤指出,元組上沒有split方法。

我們要在這裏做的是:

messages.map{ case (key, value) => value.split(','))} ... 
相關問題