1

我的火花流應用程序有以下幾行:火花Datastax卡桑德拉連接失敗大聲不可思議

我試圖寫在卡桑德拉表一系列對象(以及到一個文本文件)。我有以下代碼:

val rmqReceiver = new RMQReceiver(queueIp, "vehicle-data") 
val statusMessageStream = myStreamingContext.receiverStream[String](rmqReceiver) 
val vsStream = customReceiverStream.map(jsonToVehicleStatus) 

customReceiverStream.foreachRDD((vs: RDD[String])=> vs.saveAsTextFile("/var/log")) 
vsStream.foreachRDD((vs: RDD[Vehicle_Status])=> vs.saveToCassandra("vehicle_data","vehicles",AllColumns)) 
vsStream.foreachRDD((vs: RDD[Vehicle_Status])=> vs.saveToCassandra("vehicle_data","vehicle_locations",AllColumns)) 

我已經嘗試了一堆的變化,但在這裏會發生什麼:

文本文件被寫入(有時)

第一個電話「saveToCassandra」成功保存記錄 第二次調用拋出以下列出的異常?

我覺得我錯過了一些明顯的東西,我只是沒有看到它是什麼。

產生java.io.IOException:無法準備的語句INSERT INTO 「vehicle_data」 「vehicle_locations」( 「時間戳」, 「vehicle_id」, 「LON」, 「geobin」, 「LAT」)VALUES(:。」時間戳「,:」vehicle_id「,:」lon「,:」geobin「,:」lat「):所有主機嘗試查詢失敗(未嘗試主機) at com.datastax.spark.connector.writer .TableWriter.com $ datastax $ spark $ connector $ writer $ TableWriter $$ prepareStatement(TableWriter.scala:96) at com.datastax.spark.connector.writer.TableWriter $$ anonfun $ write $ 1.apply(TableWriter.scala: 122) at com.datastax.spark.connector.writer.TableWriter $$ anonfun $ write $ 1.apply(TableWriter.scala:120) at com.datastax.spark.connector.cql.CassandraConnector $$ anonfun $ withSessionDo $ 1。申請(CA在com.datastax.spark.connector.cql.CassandraConnector $$ anonfun $ withSessionDo $ 1.apply(CassandraConnector.scala:99) at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse( CassandraConnector.scala:151) 在com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99) 在com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:120) at com.datastax.spark.connector.RDDFunctions $$ anonfun $ saveToCassandra $ 1.apply(RDDFunctions.scala:36) at com.datastax.spark.connector.RDDFunctions $$ anonfun $ saveToCassandra $ 1.apply(RDDFunctions.scala: 36) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.r un(Task.scala:64) at org.apache.spark.executor.Executor $ TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 引起:com.datastax.driver.core.exceptions.NoHostAvailableException:所有主機嘗試查詢失敗(未嘗試主機) at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:84) at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException (DefaultResultSetFuture.java:289) at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:91) 在sun.reflect.GeneratedMethodAccessor57.invoke(未知來源) 在sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在java.lang.reflect.Method.invoke(Method.java:497) 在玉米.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33) at com.sun.proxy。$ Proxy11.prepare(Unknown Source) at com.datastax.spark.connector.cql.PreparedStatementCache $。 prepareStatement(PreparedStatementCache。scala:45) at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:28) at com.sun.proxy。$ Proxy11.prepare(Unknown Source) at com.datastax.spark。 connector.writer.TableWriter.com $ datastax $ spark $ connector $ writer $ TableWriter $$ prepareStatement(TableWriter.scala:92) ... 15更多 引起:com.datastax.driver.core.exceptions.NoHostAvailableException:All主機(s)嘗試查詢失敗(未嘗試主機) at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:107) at com.datastax.driver.core.SessionManager.execute(SessionManager。 java:538) at com.datastax.driver.core.SessionManager.prepareAsync(SessionManager.java:124) at com.datastax.driver.core.AbstractSession.prepa reAsync(AbstractSession.java:103) 在com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:89) ...... 24多個

另外,此異常:

錯誤QueryExecutor:無法執行:[email protected] com.datastax.driver.core.exceptions.NoHostAvailableException:嘗試查詢的所有主機失敗(嘗試:/ 52。{ MYIP}:9042(com.datastax.driver.core.TransportException:[/52.{MYIP}:9042] Connection has been closed)) at com.datastax.driver.core.RequestHa在java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在java.util.concurrent.ThreadPoolExecutor中的$ Worker.run(ThreadPoolExecutor.java:617) 在java.lang.Thread.run(Thread.java:745)

我已經迷上這部長達一個很好錯誤的羣集,我得到這些錯誤試圖超過6寫/秒(每個表3)

+0

這可能是因爲你的所有cassandra主機被標記。您運行的是什麼版本的spark-cassandra連接器,並且您可以看到有關主機被標記或查詢超時的日誌中的更多內容? –

+0

啊是的,我一直在想這個額外的例外。 我已經更新的問題 – lostinplace

+0

我快: libraryDependencies + = 「com.datastax.spark」 % 「火花卡桑德拉 - connector_2.10」 % 「1.2.0-RC3」 – lostinplace

回答

0

如果你在你的本地機器上使用它,然後檢查你的keyspace的複製因子,使其爲1和再試一次。這對我解決