2016-04-03 51 views
3

這是一個非常簡單的問題:在main()方法中,Spark Streaming/Spark是否像while循環一樣工作?

我使用Spark在下面的方式流:

private static int count=0; 

public static void main(String[] args) throws Exception { 
    if (args.length < 2) { 
     System.err.println("Usage: sparkstreaminggetjson <hostname> <port>"); 
     System.exit(1); 
    } 

SparkConf sparkConf = new 
SparkConf().setMaster("local[2]").setAppName("sparkstreaminggetjson"); 

JavaSparkContext sc=new JavaSparkContext(sparkConf); 

JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, 
Durations.seconds(1)); 

JavaReceiverInputDStream<String> lines = ssc.socketTextStream(

args[0],Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER); 

count=count+1; 

lines.print; 

ssc.start(); 

ssc.awaitTermination(); 
} 

每當一個新的「批時間」開始時,可​​變線從插座獲取新的價值並打印,並通過1

我的問題的變量數的增加是:現在我在main()之外聲明計數,因爲如果我不喜歡這樣

public static void main(String[] args) throws Exception { 
    if (args.length < 2) { 
     System.err.println("Usage: sparkstreaminggetjson <hostname> <port>"); 
     System.exit(1); 
    } 

//declare count here 
int count=0; 
SparkConf sparkConf = new 
SparkConf().setMaster("local[2]").setAppName("sparkstreaminggetjson"); 

JavaSparkContext sc=new JavaSparkContext(sparkConf); 

JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, 
Durations.seconds(1)); 

JavaReceiverInputDStream<String> lines = ssc.socketTextStream(

args[0],Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER); 

count=count+1; 

lines.print; 

ssc.start(); 

ssc.awaitTermination(); 
} 

每當新的批處理時間開始時,變量計數就會被重置爲0.所以,它基本上像while循環一樣工作。

有沒有什麼辦法讓我在main()方法中聲明count,以便Spark Streaming循環不會將它重置爲0?如果是這樣,我該在哪裏申報?

基本上我想知道當我們創建一個Spark Context時,我們使整個main()方法的行爲像一個while循環,或者如果有一個特定的地方,在這個循環開始。

非常感謝你,我希望我不會感到困惑。

+1

你是如何看待的情況下(1)櫃檯增加? – maasg

+0

此外,在案例#1中,'lines'變量在每個流式傳輸間隔中都沒有獲得新值。它只有一個值:對創建時實例化的'socketTextStream'的引用。每個間隔的變化是底層RDD的內容,只能通過DStream上的操作訪問。 – maasg

回答

3

編號Spark Streaming不會在「main」上執行「while循環」。 Spark Streaming使用調度程序以提供的批處理間隔觸發註冊的output operators

那些輸出操作員將在每個批處理間隔觸發基礎RDD的實現。在這個實現過程中,RDD操作將在Spark集羣中執行。

使用與羣集中序列化的任何代碼交互的全局靜態變量將導致意外的行爲。

Spark是一個分佈式計算框架,它的操作是面向這個目標的。諸如「map」,「filter」等經典轉換將在羣集中的節點間分佈的數據上執行。

Spark中最接近的一個「全局變量」是broadcast variables,但那些不能在閉包中更新。

從提供的代碼中,我瞭解到那些是瞭解Spark Streaming模型的初始實驗。花點時間研究related material,充分了解它的工作原理。在這種特殊情況下,增加一個靜態變量只能在本地模式下工作,因爲所有代碼都在同一個JVM上執行,但這不是Spark Streaming的目標。

+0

@manuelmourato另請參閱:https://www.youtube.com/watch?v=mgvYg-0OXkU – maasg

+0

非常感謝。你是對的,這只是我想要解釋我的問題的一個非常簡單的例子。實際上,我想要做的是將來自'lines' DStream的每個RDD的值存儲到一個.txt文件中,每行一個值。 使用'savetextasfiles。DSTREAM()'我能夠每RDD保存在批次間隔,但另一批間隔開始時,接下來的RDD被存儲在不同的文件中,都具有相同的HDFS目錄。 我想過使用FileWriter的在給定的時間間隔保存來自每個RDD的值成一個唯一的文件。 –

+0

但是在每個批處理間隔中,FileWriter filew = new FileWriter(),一次又一次地重複,並且我一直在相同的文件中存儲我想要的值。結果是隻有一個值保存在我的.txt文件中。當我嘗試將這些值存儲在SQL數據庫中時,情況也是如此。 如果我被迷惑我很抱歉,但感謝你的解釋,這是非常必要的。 –