
I am trying to tune the Spark job below, because I keep running into a Java heap space error.

Looking at the Spark UI, there is a cogroup that behaves in a very strange way. Before that stage everything looks well balanced (at the moment I have hard-coded 48 partitions). Inside the method loadParentMPoint there is the cogroup transformation; basically, when the next count is about to be executed, the cogroup is computed and 48 tasks are scheduled, but 47 of them terminate immediately (apparently they have nothing to process), while the remaining one starts doing shuffle read until it fills up the heap space and an exception is raised.

I have launched the process several times with the same dataset, and the outcome is always the same. Every time only one executor ends up doing the work, whereas before the cogroup the load is balanced.

Why do I get this behaviour? Am I missing something? I tried to repartition the data before the cogroup, because I assumed it was unbalanced, but it did not help; the same happened when I tried partitionBy.
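
A quick way to see where the skew starts is to print how many records each partition actually holds right before the cogroup. This is only a minimal diagnostic sketch; rddNew and rddRegistryFactoryParent are the two inputs of the cogroup in the excerpt below:

    // Count records per partition: a heavily uneven distribution would explain why
    // one task does all the shuffle read while the other 47 finish immediately.
    def partitionSizes[T](rdd: org.apache.spark.rdd.RDD[T]): Array[(Int, Int)] =
      rdd.mapPartitionsWithIndex((idx, it) => Iterator((idx, it.size))).collect()

    partitionSizes(rddNew).foreach { case (idx, n) => log.info(s"rddNew p$idx -> $n records") }
    partitionSizes(rddRegistryFactoryParent).foreach { case (idx, n) => log.info(s"rddRegistryFactoryParent p$idx -> $n records") }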

This is an excerpt of the code:

class BillingOrderGeneratorProcess extends SparkApplicationErrorHandler { 

    implicit val ctx = sc 
    val log = LoggerFactory.getLogger(classOf[BillingOrderGeneratorProcess]) 
    val ipc = new Handler[ConsumptionComputationBigDataIPC] 
    val billingOrderDao = new Handler[BillingOrderDao] 
    val mPointDao = new Handler[MeasurementPointDAO] 
    val billingOrderBDao = new Handler[BillingOrderBDAO] 
    val ccmDiscardBdao = new Handler[CCMDiscardBDAO] 
    val ccmService = new Handler[ConsumptionComputationBillingService] 
    val registry = new Handler[IncrementalRegistryTableData] 
    val podTimeZoneHelper = new Handler[PodDateTimeUtils] 
    val billingPodStatusDao = new Handler[BillingPodStatusBDAO] 
    val config = new Handler[PropertyManager] 
    val paramFacade = new Handler[ConsumptionParameterFacade] 
    val consumptionMethods = new Handler[ConsumptionMethods] 
    val partitions = config.get.defaultPartitions() 
    val appName = sc.appName 
    val appId = sc.applicationId 
    val now = new DateTime 

    val extracted = ctx.accumulator(0l, "Extracted from planning") 
    val generated = ctx.accumulator(0l, "Billing orders generated") 
    val discarded = ctx.accumulator(0l, "Billing orders discarded") 

    // initialize staging 
    val staging = new TxStagingTable(config.get().billingOrderGeneratorStagingArea()) 
    staging.prepareReading 

    val rddExtractedFromPlanning = staging 
     .read[ExtractedPO]() 
     .repartition(48) 
     .setName("rddExtractedFromPlanning") 
     .cache 

    val rddExtracted = rddExtractedFromPlanning
     .filter { x =>
       extracted += 1
       (x.getExtracted == EExtractedType.EXTRACTED ||
        x.getExtracted == EExtractedType.EXTRACTED_BY_USER ||
        x.getExtracted == EExtractedType.EXTRACTED_BY_TDC)
     }
     .map { x =>
       log.info("1:extracted>{}", x)
       val bo = MapperUtil.mapExtractedPOtoBO(x)
       bo
     }

    val podWithExtractedAndLastBillingOrderPO = rddExtracted.map { e => 
     val billOrdr = CCMIDGenerator.newIdentifier(CCMIDGenerator.Context.GENERATOR, e.getPod, e.getCycle(), e.getExtractionDate()) 
     val last = billingOrderDao.get.getLastByPodExcludedActual(e.getPod, billOrdr) 
     log.info("2:last Billing order>{}", last); 
     (e.getPod, e, last) 
    } 
     .setName("podWithExtractedAndLastBillingOrderPO") 
     .cache() 

    val podWithExtractedAndLastBillingOrder = podWithExtractedAndLastBillingOrderPO.map(e => (e._1, (e._2, MapperUtil.mapBillingOrderPOtoBO(e._3)))) 

    val rddRegistryFactoryKeys = podWithExtractedAndLastBillingOrderPO 
     .map(e => (e._1,1)) 
     .reduceByKey(_+_) 
     .keys 

    val rddRegistryFactory = registry.get().createIncrementalRegistryFromPods(rddRegistryFactoryKeys, List()) 

    val rddExtractedWithMPoint = ConsumptionComputationUtil
     .groupPodWithMPoint(podWithExtractedAndLastBillingOrder, rddRegistryFactory)
     .filter { e =>
       val mPoint = e._3
       val condition = mPoint != null
       condition match {
         case false => log.error("MPoint is NULL for POD -> " + e._1)
         case true =>
       }
       condition
     }
     .setName("rddExtractedWithMPoint")
     .cache

    rddExtractedWithMPoint.count 

    val rddExtractedWithMPointWithParent = ConsumptionComputationUtil
     .groupWithParent(rddExtractedWithMPoint)
     .map {
       case (pod, extracted, measurementPoint, billOrder, parentMpointId, factory) =>
         if (!parentMpointId.isEmpty) {
           val mPointParent = mPointDao.get.findByMPoint(parentMpointId.get)
           log.info("2.1:parentMpoin>Mpoint=" + parentMpointId + " parent for pod -> " + pod)
           (pod, extracted, measurementPoint, billOrder, mPointParent.getPod, factory)
         } else {
           log.info("2.1:parentMpoin>Mpoint=null parent for pod -> " + pod)
           (pod, extracted, measurementPoint, billOrder, null, factory)
         }
     }
     .setName("rddExtractedWithMPointWithParent")
     .cache()

    rddExtractedWithMPointWithParent.count 

    val rddRegistryFactoryParentKeys = rddExtractedWithMPointWithParent 
     .filter(e => Option(e._5).isDefined) 
     .map(e => (e._5,1)) 
     .reduceByKey(_+_) 
     .keys 

    rddRegistryFactoryParentKeys.count 

    val rddRegistryFactoryParent = registry.get().createIncrementalRegistryFromPods(rddRegistryFactoryParentKeys, List()) 

    rddRegistryFactoryParent.count 

    val imprb = new Handler[IncrementalMeasurementPointRegistryBuilder] 

    val rddNew = rddExtractedWithMPointWithParent.map({ 
     case (pod, extracted, measurementPoint, billingOrder, parentPod, factory) => 
     (parentPod, (pod, extracted, measurementPoint, billingOrder, factory)) 
    }) 
    rddNew.count 

    val p = rddNew.cogroup(rddRegistryFactoryParent) 
    p.count 

    val rddExtractedWithMPointWithMpointParent = p.filter { case (pod, (inputs, mpFactories)) => inputs.nonEmpty }
     .flatMap { case (pod, (inputs, mpFactories)) =>
       val factory = mpFactories.headOption // possibly one factory, or none
       val results = inputs.map { e =>
         val measurementPointTupla = factory.flatMap { f =>
           Option((imprb.get.buildSparkDecorator(new MeasurementPointFactoryAdapter(f)).getMeasurementPointByDate(e._2.getRequestDate), f))
         }
         val tupla = measurementPointTupla.getOrElse(null)
         val toBeBilled = if (tupla != null && tupla._1 != null) false else true
         val m = if (tupla != null && tupla._1 != null) tupla._1 else null
         val f = if (tupla != null && tupla._2 != null) tupla._2 else null
         (e._1, e._2, e._3, e._4, m, toBeBilled, e._5, f)
       }
       results
     }
     .setName("rddExtractedWithMPointWithMpointParent")
     .cache()

    rddExtractedWithMPointWithMpointParent.foreach({ e => 
     log.info("2.2:parentMpoint>MpointComplete=" + e._5 + " parent for pod -> " + e._1) 
    }) 
} 

These are the stages of the two RDDs involved in the cogroup operation.

rddNew:

[Spark UI screenshot: stage for rddNew]

rddRegistryFactory:

[Spark UI screenshot: stage for rddRegistryFactory]

And this is the stage of the cogroup:

[Spark UI screenshot: cogroup stage]

This is the storage situation:

[Spark UI screenshot: Storage tab]

This is the Executors tab:

[Spark UI screenshot: Executors tab]

Note: I added the count actions only for debugging purposes.

UPDATE:

  • I tried to remove the cache and start the process again; now each executor holds roughly 100 MB of stored data, but the behaviour is the same: the shuffle read happens on exactly one executor.
  • I also tried a join between the same two RDDs just before the cogroup, to find out whether the problem is related only to cogroup or extends to all wide transformations; with the join the behaviour was exactly the same.

It seems like your "cache" is creating memory pressure. Why do you cache here? Have you tried without caching? –


I added two pictures that show the storage and executors situation. Maybe there is a little bit of heap pressure, but the behaviour is strange; could that really be just cache misuse? – Giorgio


There are various factors involved, not just one; please remove the cache and see. –

Answers


I solved it; the problem was related to partitioning. Basically, all the keys of the data in the RDD on which the cogroup was called had the same value, so when the cogroup happens, Spark tries to hash partition both RDDs and ends up putting the keys of both RDDs on the same executor in order to cogroup them.
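
For reference, the effect is easy to reproduce with made-up data (this is only a sketch, not the original job): when every record carries the same key, the hash partitioning done for the cogroup sends everything to a single task, no matter how many partitions are requested.

    import org.apache.spark.{SparkConf, SparkContext}

    object CogroupSkewDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("cogroup-skew"))

        // Every record has the same key, mimicking the degenerate key distribution described above.
        val left  = sc.parallelize(1 to 100000).map(i => ("SAME_KEY", i))
        val right = sc.parallelize(1 to 1000).map(i => ("SAME_KEY", s"factory-$i"))

        val grouped = left.cogroup(right, 48)

        // 47 of the 48 partitions end up empty; one partition holds all the data,
        // so a single task does the whole shuffle read, exactly as in the Spark UI.
        grouped.mapPartitionsWithIndex((idx, it) => Iterator((idx, it.size)))
          .collect()
          .filter(_._2 > 0)
          .foreach(println)

        sc.stop()
      }
    }

Once the keys are actually distinct (or at least spread out), the same cogroup distributes roughly evenly across the 48 tasks.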


So how did you solve it? –


Basically the problem was not the cogroup itself but the fact that, at the moment the cogroup was executed, the data was not partitioned at all; in fact I found a bug that caused all the data to end up in the same partition, so the fix was simply to fix the data going into the cogroup. – Giorgio


@Giorgio, hi, I am facing a memory issue while using 'cogroup'; any suggestions and tips would be great, since I am new to this and do not know how to fix it. Please find the question here [https://stackoverflow.com/questions/47180307/how-to-use-cogroup-for-large-datasets] – Vignesh

  • I strongly believe this Java heap space error is there because of the cached RDDs, which, based on your last screenshot (the Storage tab), do not seem necessary.

[Spark UI screenshot: Storage tab]

Depending on how many times a dataset is accessed, and the amount of work involved in recomputing it, recomputation can be faster than the price paid by the increased memory pressure.

It goes without saying that if you only read a dataset once there is no point in caching it; it will actually make your job slower.

  • For the counts done for debugging purposes, you could use countApprox() instead of count. Once the testing is done, you can remove it for the actual use of your job (a sketch follows below).
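
A minimal sketch of that second point (the RDD name is taken from the code in the question; the 10-second timeout and the 0.95 confidence are arbitrary values):

    // Returns an approximate count after at most 10 seconds instead of waiting for the exact one.
    val approx = rddExtractedWithMPoint.countApprox(10000L, 0.95).initialValue
    log.info("rddExtractedWithMPoint count ~ " + approx.mean + " [" + approx.low + ", " + approx.high + "]")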

I ran it, but the behaviour is the same: the shuffle read is done by a single executor. Now the executors are free of stored data (only about 100 MB each). Any other suggestion? – Giorgio


Are you sure the heap space error shows up again? –


Yes, exactly the same as before. Please look at the post; I have updated it with the next tests I did. – Giorgio