2017-02-07 20 views
4

我正面臨一個奇怪的問題。據我瞭解,Spark中的DAG操作只在執行操作時執行。但是,我可以看到reduceByKey()操作(作爲轉換)開始執行DAG。Apache Spark:爲什麼reduceByKey轉換執行DAG?

重現步驟。嘗試以下代碼段

SparkConf conf =new SparkConf().setMaster("local").setAppName("Test"); 
JavaSparkContext context=new JavaSparkContext(conf); 

JavaRDD<String> textFile = context.textFile("any non-existing path"); // This path should not exist 

JavaRDD<String> flatMap = textFile.flatMap(x -> Arrays.asList(x.split(" ")).iterator()); 
JavaPairRDD<String, Integer> mapToPair = flatMap.mapToPair(x -> new Tuple2<String, Integer>((String) x, 1)); 

注意:文件的路徑不應該是任何現有的路徑。換句話說,文件不應該存在。

如果執行此代碼,沒有什麼如果你添加下面一行到程序和執行情況作爲expected.However

mapToPair.reduceByKey((x, y) -> x + y); 

它提供了以下異常:

Exception in thread "main" org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: 

這意味着它已經開始執行DAG。由於reduceByKey()是一種轉換,因此在執行諸如collect()或take()之類的操作之前,情況不應該如此。

Spark版本:2.0.0。請提供您的建議。

+0

什麼ü通過這條線意味着JavaRDD TEXTFILE = context.textFile(「任何非現有路徑」 ); //此路徑不應該存在 –

+0

context.textFile()理想地將hdfs或本地內容加載到RDD。如果路徑不存在,RDD將如何形成 –

+0

@AviralKumar問題是關於爲什麼代碼被執行,因爲轉換是懶惰地評估的。在調用reduceByKey之後,文件不存在並拋出異常,證明在轉換後正在執行* something *。 – ImDarrenG

回答

2

這是因爲它並不是真正的DAG被執行(例如:它的整個實現)。

發生什麼是reduceByKey 需要分區程序才能工作。如果你不提供一個,Spark會根據約定和默認值創建一個。 「默認partiionner」作爲代碼如下評論:

/** 
* Choose a partitioner to use for a cogroup-like operation between a number of RDDs. 
* 
* If any of the RDDs already has a partitioner, choose that one. 
* 
* Otherwise, we use a default HashPartitioner. For the number of partitions, if 
* spark.default.parallelism is set, then we'll use the value from SparkContext 
* defaultParallelism, otherwise we'll use the max number of upstream partitions. 
* 
* Unless spark.default.parallelism is set, the number of partitions will be the 
* same as the number of partitions in the largest upstream RDD, as this should 
* be least likely to cause out-of-memory errors. 
* 
* We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD. 
*/ 

這一定義意味着,在某些情況下,計算分區的所有上游RDDS數量。在你的情況下,這意味着要求「文件系統」(可以是Hadoop,可以是本地的......)執行任何必要的操作(例如,對Hadoop文件系統的單個調用可以返回多個文件,每個文件也可以被分割根據其InputFormat定義的各種優化,只能通過實際查找它們才能知道)。

這就是在這裏執行的,而不是實際的DAG(例如,你的map/flatMap/aggregate,...)。

您可以通過在此者皆變種減少提供自己partitionner可能避免呢?

reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] 
+0

感謝您的回答。我嘗試提供自定義的Partitoner,現在也不會拋出錯誤。但是,如果沒有執行任何操作,我仍然困惑爲什麼它開始計算部分函數。 下面作爲參考的劃分器代碼:) '分區程序分區=新分區程序({ \t \t \t最終詮釋maxPartitions = 2; \t \t \t @Override \t \t \t公衆詮釋numPartitions(){ \t \t \t \t \t \t \t \t返回maxPartitions; \t \t \t} \t \t \t \t \t \t @Override \t \t \t公衆詮釋getPartition(最終對象OBJ){ \t \t \t \t字符串OBJ1 =(字符串)OBJ; \t \t \t \t返回obj1.hashCode()%maxPartitions; \t \t \t} \t \t}; ' –

+0

由於分區不是在「動作執行」上「即時」創建的,而是在創建需要它們的轉換時創建的。 (這更像是一個權衡,而不是Spark上的一個缺點)。默認的一個會試圖聰明一點(例如,如果它創建了少量的分區,那麼它可能會導致內存不足,如果它創建了太多的,這可能是浪費),但智能意味着獲取上游RDD的描述,你稱之爲「計算分區」。如果這是你真正想要的,那麼設置默認的並行性參數將會將其切換爲「關閉」。 – GPI

+0

你手工Partitionner看起來像一個HashPartitionner,火花已擁有:https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/HashPartitioner.html – GPI

相關問題