2016-12-05 115 views
0

插入物30天的數據我有1個事件,從卡夫卡與此內容:火花,從uniq的卡夫卡要求

(date, 
user_id, 
app_id, 
duration, 
session_id, 
....) 

我用這個代碼來獲取主題:

val kafkaStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
     ssc, kafkaParams, Map(topicSessionDuration -> 2), StorageLevel.MEMORY_AND_DISK_2) 
     .map(_._2) 
     .map(RawSessionData(_)) 

而且我店在卡桑德拉與:

kafkaStream.map(session_duration => (
     session_duration.year, 
     session_duration.month, 
     session_duration.day, 
     session_duration.publisher_id, 
     session_duration.app_id, 
     session_duration.user_id 
     )).saveToCassandra(configServer.getString("cassandra.keyspace"), configServer.getString("cassandra.table.daily.user_by_app")) 

從這個事件,我存儲在15表已經爲不同的用途。會話的數目,用戶,持續時間的數目....

我需要在另外一個存儲,但是從該事件我需要存儲30條型動物線(日期+0天迄今爲止+30天)。

我試圖做這樣的:

for (a <- 0 to 30) { 
     val toto = a 
     kafkaStream.map(x => { 
      val date = new DateTime(x.date_create).withHourOfDay(0).withMinuteOfHour(0).withSecondOfMinute(0).withMillisOfSecond(0).plusDays(toto)    
      (
       date.getYear, 
       date.getMonthOfYear, 
       date.getDayOfMonth, 
       x.user_id 
       ) 
     }).saveToCassandra(configServer.getString("cassandra.keyspace"), configServer.getString("cassandra.table.daily.user_30d")) 

但它不將不起作用。事實上,它只保存12或15或20行或其他號碼。並非所有線路。

我有這樣的錯誤:

Could not compute split, block input-11-1480932491800 not found 

我可能做錯了什麼,但什麼? 你能幫我嗎? :)

+0

該錯誤可能表明超載。你的處理延遲如何看起來像?你在監視Spark UI的Streaming選項卡嗎? – maasg

+0

我的延遲是第一個記錄(3秒) 壞但隨後變好,平均爲409ms,平均232.00的記錄/秒,較無呢? 我有3個服務器(1 8特效32G,1與8個特效64克和1與12 PROC 64B) 我停止當我使用有錯誤:MEMORY_AND_DISK_SER_2代替MEMORY_AND_DISK_2。任務,同時移動循環中,成爲1 - 但隨着你的答案flatmap所以好嘛:) –

+0

不同的是,外環增加了30個額外的(>商店變換)降低延遲。 – maasg

回答

2

這將是更好地代表轉型爲:

kafkaStream.flatMap{x => 
    (0 to 30).map{day => 
     val date = new DateTime(x.date_create).withHourOfDay(0).withMinuteOfHour(0).withSecondOfMinute(0).withMillisOfSecond(0).plusDays(day)    
     (
      date.getYear, 
      date.getMonthOfYear, 
      date.getDayOfMonth, 
      x.user_id 
     ) 
    }}.saveToCassandra(configServer.getString("cassandra.keyspace"), configServer.getString("cassandra.table.daily.user_30d"))