這是一個非常簡單的問題:在main()方法中,Spark Streaming/Spark是否像while循環一樣工作?
我使用Spark在下面的方式流:
private static int count=0;
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: sparkstreaminggetjson <hostname> <port>");
System.exit(1);
}
SparkConf sparkConf = new
SparkConf().setMaster("local[2]").setAppName("sparkstreaminggetjson");
JavaSparkContext sc=new JavaSparkContext(sparkConf);
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
Durations.seconds(1));
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
args[0],Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
count=count+1;
lines.print;
ssc.start();
ssc.awaitTermination();
}
每當一個新的「批時間」開始時,可變線從插座獲取新的價值並打印,並通過1
我的問題的變量數的增加是:現在我在main()之外聲明計數,因爲如果我不喜歡這樣
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: sparkstreaminggetjson <hostname> <port>");
System.exit(1);
}
//declare count here
int count=0;
SparkConf sparkConf = new
SparkConf().setMaster("local[2]").setAppName("sparkstreaminggetjson");
JavaSparkContext sc=new JavaSparkContext(sparkConf);
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
Durations.seconds(1));
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
args[0],Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
count=count+1;
lines.print;
ssc.start();
ssc.awaitTermination();
}
每當新的批處理時間開始時,變量計數就會被重置爲0.所以,它基本上像while循環一樣工作。
有沒有什麼辦法讓我在main()方法中聲明count,以便Spark Streaming循環不會將它重置爲0?如果是這樣,我該在哪裏申報?
基本上我想知道當我們創建一個Spark Context時,我們使整個main()方法的行爲像一個while循環,或者如果有一個特定的地方,在這個循環開始。
非常感謝你,我希望我不會感到困惑。
你是如何看待的情況下(1)櫃檯增加? – maasg
此外,在案例#1中,'lines'變量在每個流式傳輸間隔中都沒有獲得新值。它只有一個值:對創建時實例化的'socketTextStream'的引用。每個間隔的變化是底層RDD的內容,只能通過DStream上的操作訪問。 – maasg