0

我使用DSE 5.1(火花2.0.2.6和卡桑德拉3.10.0.1652)火花的作業(斯卡拉)寫類型日期卡桑德拉

我卡桑德拉表:

CREATE TABLE ks.tbl (
    dk int, 
    date date, 
    ck int, 
    val int, 
PRIMARY KEY (dk, date, ck) 
) WITH CLUSTERING ORDER BY (date DESC, ck ASC); 

數據如下:

dk | date  | ck | val 
----+------------+----+----- 
    1 | 2017-01-01 | 1 | 100 
    1 | 2017-01-01 | 2 | 200 

我的代碼必須讀取這些數據,並寫同樣的事情,但與昨天的日期(它編譯成功):

package com.datastax.spark.example 

import com.datastax.spark.connector._ 
import com.datastax.spark.connector.cql.CassandraConnector 
import org.apache.spark.sql.hive.HiveContext 
import org.apache.spark.{SparkConf, SparkContext} 
import com.github.nscala_time.time._ 
import com.github.nscala_time.time.Imports._ 

object test extends App { 

    val conf = new SparkConf().setAppName("DSE calculus app TEST") 
    val sc = new SparkContext(conf) 

    val yesterday= (DateTime.now - 1.days).toString(StaticDateTimeFormat.forPattern("yyyy-MM-dd")) 

    val tbl = sc.cassandraTable("ks","tbl").select("dk","date","ck","val").where("dk=1") 

    tbl.map(row => (row.getInt("dk"),yesterday,row.getInt("ck"),row.getInt("val"))).saveToCassandra("ks","tbl") 

    sc.stop() 
    sys.exit(0) 
} 

當我運行這個程序:

dse spark-submit --class com.datastax.spark.example.test test-assembly-0.1.jar 

它不能正確寫入卡桑德拉。看來日期變量沒有正確插入地圖中。 我得到的錯誤是:

Error: 
WARN 2017-05-08 22:23:16,472 org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, <IP of one of my nodes>): java.io.IOException: Failed to write statements to ks.tbl. 
     at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:207) 
     at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:175) 
     at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112) 
     at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111) 
     at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145) 
     at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111) 
     at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:175) 
     at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:162) 
     at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:149) 
     at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36) 
     at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
     at org.apache.spark.scheduler.Task.run(Task.scala:86) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:748) 

然而,當我直接在地圖語句插入日期(字符串)如下代碼沒有正確地插入數據:

tbl.map(row => (row.getInt("dk"),"2017-02-02",row.getInt("ck"),row.getInt("val"))).saveToCassandra("ks","tbl") 

還插入數據正確,如果我昨天設置爲一個整數(自紀元以來的天數)。這將是最佳的,但不能得到'昨天'表現這種方式

編輯:這不會正確插入數據,實際上。無論將「昨天」設置爲1還是100,000,000,它始終插入紀元('1970-01-01)

失敗的代碼表現正確,並且正如我在DSE Spark控制檯中所期望的那樣。

我只是無法弄清楚我做錯了什麼。歡迎任何幫助。

編輯2:excecutor 0 stderr log確實表明它試圖在列日期插入一個空值,這顯然不可能,因爲它是一個聚類列。

+0

您需要發佈執行日誌。由於驅動程序只能看到「寫入語句失敗」,因此它不會導致造成它的個別嘗試和失敗。 – RussS

+0

你的意思是來自Spark主UI中應用程序的所有執行者的標準輸出stderr? – Mematematica

+0

至少你得到了例外:) :) – RussS

回答

1

在爲Spark作業編寫代碼時,瞭解何時設置了特定變量以及何時序列化是很重要的。讓我們來看看一個音符從App特質文檔

注意事項

應當指出的是,這種特性是使用 DelayedInit功能,這意味着該對象的字段將 不會一直執行在主方法執行之前初始化。

這意味着在實際運行代碼時,可能不會在Executors上初始化對App正文中使用的變量的引用。

我的猜測是,你寫的lambda包含一個val的引用,它是在App類的Delayed init部分初始化的。這意味着執行程序上未運行Main方法的代碼的序列化版本會獲取該值的未初始​​化版本(空值)。

將常量切換到lazy val(或將其移動到單獨的對象或類中)可以通過確保該值遠程初始化(lazy val)或簡單地序列化初始化(單獨的類/對象)來解決此問題。

0

我想我知道你的問題是什麼。
您可能會看到完整的日誌文件。您只需附加其中的一部分...
今天有類似的錯誤,當創建keyspace與replication_factor:3時,我只有一個cassandra實例。

所以我改變它,問題不復存在。

ALTER KEYSPACE "some_keyspace_name" WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }; 

Here is my error.log file

和日誌的重要組成部分:

Logging.scala[logError]:72) - Failed to execute: [email protected] 
com.datastax.driver.core.exceptions.UnavailableException: Not enough replicas available for query at consistency LOCAL_QUORUM (2 required but only 1 alive)