我正面臨一個奇怪的問題。據我瞭解,Spark中的DAG操作只在執行操作時執行。但是,我可以看到reduceByKey()操作(作爲轉換)開始執行DAG。Apache Spark:爲什麼reduceByKey轉換執行DAG?
重現步驟。嘗試以下代碼段
SparkConf conf =new SparkConf().setMaster("local").setAppName("Test");
JavaSparkContext context=new JavaSparkContext(conf);
JavaRDD<String> textFile = context.textFile("any non-existing path"); // This path should not exist
JavaRDD<String> flatMap = textFile.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
JavaPairRDD<String, Integer> mapToPair = flatMap.mapToPair(x -> new Tuple2<String, Integer>((String) x, 1));
注意:文件的路徑不應該是任何現有的路徑。換句話說,文件不應該存在。
如果執行此代碼,沒有什麼如果你添加下面一行到程序和執行情況作爲expected.However
mapToPair.reduceByKey((x, y) -> x + y);
它提供了以下異常:
Exception in thread "main" org.apache.hadoop.mapred.InvalidInputException: Input path does not exist:
這意味着它已經開始執行DAG。由於reduceByKey()是一種轉換,因此在執行諸如collect()或take()之類的操作之前,情況不應該如此。
Spark版本:2.0.0。請提供您的建議。
什麼ü通過這條線意味着JavaRDD TEXTFILE = context.textFile(「任何非現有路徑」 ); //此路徑不應該存在 –
context.textFile()理想地將hdfs或本地內容加載到RDD。如果路徑不存在,RDD將如何形成 –
@AviralKumar問題是關於爲什麼代碼被執行,因爲轉換是懶惰地評估的。在調用reduceByKey之後,文件不存在並拋出異常,證明在轉換後正在執行* something *。 – ImDarrenG