dstream

    0熱度

    1回答

    列表的DSTREAM我有串的名單,但我不能找到一個辦法列表改爲火花流的DSTREAM。 我嘗試這樣做: val tmpList = List("hi", "hello") val rdd = sqlContext.sparkContext.parallelize(Seq(tmpList)) val rowRdd = rdd.map(v => Row(v: _*)) 但日食說sparkCo

    2熱度

    3回答

    我已經通過this stackoverflow的問題,根據答案它創建一個DStream與批間隔只有一個RDD。 例如: 我的批次間隔是1分鐘和Spark流作業從卡夫卡主題消耗數據。 我的問題是,DStream中可用的RDD是否在最後一分鐘內提取/包含整個數據?我們需要設置什麼標準或選項來提取最後一分鐘創建的所有數據? 如果我有一個帶有3個分區的卡夫卡主題,並且所有3個分區都包含最後一分鐘的數據,那

    1熱度

    1回答

    我有一個從Kafka消耗的流式作業(使用createDstream)。 它的「身份證」 [id1,id2,id3 ..] 流我有一個接受的id的數組,並做了一些外部呼叫並接收回一些信息說「T」每個ID [id:t1,id2:t2,id3:t3...] 的工具或API 我希望在調用實用程序保留Dstream時保留DStream。我不能在Dstream rdd上使用地圖轉換,因爲它會調用每個i

    0熱度

    1回答

    我有一個問題,從spark流(pyspark)索引數據到elasticserach。數據類型爲dstream。下面它的外觀 (u'01B', 0) (u'1A5', 1) .... 下面是我使用的彈力指數:指數=的CLU和類型=數據 GET /clus/_mapping/data { "clus": { "mappings": { "data": {

    1熱度

    1回答

    我是Spark Streaming上的新手,我陷入困境,試圖找出如何處理這個問題,因爲我發現了很多單個(K,V)對的例子,但還有更多。爲了找到使用Spark對Java進行轉換的最佳方法,我將不勝感激。 讓我簡要介紹一下情況, 的目標是獲得一個集中的時間窗口內的元素的錯誤率。 考慮下面的輸入, (A, Error) (B, Success) (B, Error) (B, Success) (

    4熱度

    1回答

    在以下代碼中,似乎函數fn1 & fn2以順序方式應用於inRDD,正如我在Spark Web UI的階段部分中看到的。 DstreamRDD1.foreachRDD(new VoidFunction<JavaRDD<String>>() { public void call(JavaRDD<String> inRDD) { inRDD.foreach(fn1

    2熱度

    1回答

    我有通過DStream從Kafka到達的數據。我想要執行特徵提取以獲得一些關鍵字。 我不想等待所有數據的到來(因爲它打算是連續的流,可能永遠不會結束),所以我希望能夠以大塊的方式執行提取 - 如果精度將會忍受一點。 到目前爲止,我放在一起類似的東西: def extractKeywords(stream: DStream[Data]): Unit = { val spark: Spar

    0熱度

    1回答

    以下兩個相同嗎? val dstream = stream.window(Seconds(60), Seconds(1)) val x = dstream.map(x => ...) 和 val dstream = stream.window(Seconds(60), Seconds(1)) val x = dstream.transform(rdd => rdd.map(x => ...

    0熱度

    1回答

    我正在使用Dstream(Spark Streaming)的Transform API對數據進行排序。 我正在使用netcat從TCP套接字讀取數據。 繼使用的代碼行: myDStream.transform(rdd => rdd.sortByKey()) 無法找到函數sortByKey。任何人都可以請幫助這一步中的問題是什麼?

    1熱度

    1回答

    我正在設置Apache Spark長時間運行的流式作業,以使用InputDStream執行(非並行化)流式傳輸。 我想要實現的是當隊列上的批處理時間過長(基於用戶定義的超時時間)時,我希望能夠跳過批處理並完全放棄它 - 並繼續其餘部分執行。 我無法在Spark API或在線上找到解決這個問題的方法 - 我使用StreamingContext awaitTerminationOrTimeout進行了