2017-08-02 84 views
1

我想用這樣的代碼(斯卡拉)登錄一個RDD每個mapPartition操作的執行時間:阿帕奇星火mapPartition奇怪的行爲(懶惰的評價是什麼?)

rdd.mapPartitions{partition => 
    val startTime = Calendar.getInstance().getTimeInMillis 
    result = partition.map{element => 
     [...] 
    } 
    val endTime = Calendar.getInstance().getTimeInMillis 
    logger.info("Partition time "+(startTime-endTime)+ "ms") 
    result 
} 

的問題是,它在開始執行映射操作之前立即記錄「分區時間」,所以我總是獲得2毫秒的時間。

我注意到了Spark Web UI,在日誌文件中,有關執行時間的行在任務開始後立即出現,而不是如預期的那樣結束。

有人能解釋我爲什麼?在mapPartitions中,代碼應該線性執行,否則我錯了?

由於

問候 盧卡

+0

轉換被懶惰地評估。 – philantrovert

+0

好的,謝謝! 我解決了在結束時間之前放置「result.size」。 我認爲默認情況下,mapPartitions中的地圖是一個Scala操作,不是懶惰的。 – Gaglia88

+0

@philantrovert不,這不是原因,map裏面的mapPartitions不是Spark轉換,這是純粹的scala相關 –

回答

3

partitionsmapPartitions內部是一個Iterator[Row]Iterator是Scala懶惰地評估(即,當迭代器被消耗)。這與Spark的懶惰評論無關!

調用partitions.size將觸發評估您的映射,但會消耗迭代器(因爲它只能迭代一次)。一個例子

val it = Iterator(1,2,3) 
it.size // 3 
it.isEmpty // true 

你能做什麼是迭代器轉換爲無延遲的集合類型:

rdd.mapPartitions{partition => 
    val startTime = Calendar.getInstance().getTimeInMillis 
    result = partition.map{element => 
     [...] 
    }.toVector // now the statements are evaluated 
    val endTime = Calendar.getInstance().getTimeInMillis 
    logger.info("Partition time "+(startTime-endTime)+ "ms") 
    result.toIterator 
} 

編輯:請注意,您可以使用System.currentTimeMillis()(甚至System.nanoTime())而不是使用Calendar