0
插入物30天的數據我有1個事件,從卡夫卡與此內容:火花,從uniq的卡夫卡要求
(date,
user_id,
app_id,
duration,
session_id,
....)
我用這個代碼來獲取主題:
val kafkaStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, Map(topicSessionDuration -> 2), StorageLevel.MEMORY_AND_DISK_2)
.map(_._2)
.map(RawSessionData(_))
而且我店在卡桑德拉與:
kafkaStream.map(session_duration => (
session_duration.year,
session_duration.month,
session_duration.day,
session_duration.publisher_id,
session_duration.app_id,
session_duration.user_id
)).saveToCassandra(configServer.getString("cassandra.keyspace"), configServer.getString("cassandra.table.daily.user_by_app"))
從這個事件,我存儲在15表已經爲不同的用途。會話的數目,用戶,持續時間的數目....
我需要在另外一個存儲,但是從該事件我需要存儲30條型動物線(日期+0天迄今爲止+30天)。
我試圖做這樣的:
for (a <- 0 to 30) {
val toto = a
kafkaStream.map(x => {
val date = new DateTime(x.date_create).withHourOfDay(0).withMinuteOfHour(0).withSecondOfMinute(0).withMillisOfSecond(0).plusDays(toto)
(
date.getYear,
date.getMonthOfYear,
date.getDayOfMonth,
x.user_id
)
}).saveToCassandra(configServer.getString("cassandra.keyspace"), configServer.getString("cassandra.table.daily.user_30d"))
但它不將不起作用。事實上,它只保存12或15或20行或其他號碼。並非所有線路。
我有這樣的錯誤:
Could not compute split, block input-11-1480932491800 not found
我可能做錯了什麼,但什麼? 你能幫我嗎? :)
該錯誤可能表明超載。你的處理延遲如何看起來像?你在監視Spark UI的Streaming選項卡嗎? – maasg
我的延遲是第一個記錄(3秒) 壞但隨後變好,平均爲409ms,平均232.00的記錄/秒,較無呢? 我有3個服務器(1 8特效32G,1與8個特效64克和1與12 PROC 64B) 我停止當我使用有錯誤:MEMORY_AND_DISK_SER_2代替MEMORY_AND_DISK_2。任務,同時移動循環中,成爲1 - 但隨着你的答案flatmap所以好嘛:) –
不同的是,外環增加了30個額外的(>商店變換)降低延遲。 – maasg