Spark writing and appending files to S3 - cost issue

2017-08-31

So I have an Apache Spark stream that writes Parquet files to S3 every 20 minutes, partitioned by day and hour.
It appears that before writing, each batch performs an "ls" and a "head" on all the folders under the table's root folder.

Since we have multiple days × 24 hours × several different tables, this adds up to a relatively high overall S3 cost.

Note that our schema changes dynamically.

So my questions are:

  1. Is it correct that every write recursively reads the headers of all the existing Parquet files?

  2. Why doesn't the stream cache this information, and can it be cached?

  3. Can you suggest best practices? (A couple of commonly suggested settings are sketched after this list.)
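
Regarding questions 1 and 2, part of the footer reading seems tied to Parquet schema merging and summary files. Below is a minimal sketch of two settings that are commonly suggested to reduce it (spark here is the SparkSession); whether they fully cover this case is an assumption, not something verified against this workload:

// Commonly suggested Parquet-on-S3 settings -- sketched as an assumption,
// not verified against this exact workload.
// Skip writing _metadata / _common_metadata summary files on save:
spark.sparkContext().hadoopConfiguration()
     .set("parquet.enable.summary-metadata", "false");
// Skip merging schemas from every file footer when reading back:
spark.conf().set("spark.sql.parquet.mergeSchema", "false");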

// The write code:

withPartition.write() 
       .format(format) 
       .mode(SaveMode.Append) 
       .partitionBy("day","hour") 
       .save(path); 

It seems the problem is related to:

https://issues.apache.org/jira/browse/SPARK-20049

Spark partitionBy much slower than without it

Answer


I found that Spark's partitionBy was the cause of the problem:

Spark partitionBy much slower than without it

So I implemented the write as follows; it solved the problem, and it improved performance as well:

withPartition = withPartition.persist(StorageLevel.MEMORY_AND_DISK());

// Collect the distinct (day, hour) pairs to the driver so that each
// partition can be written directly to its own leaf path, bypassing
// partitionBy() and the expensive S3 listing it triggers.
Dataset<DayAndHour> daysAndHours = withPartition
        .map(mapToDayHour(), Encoders.bean(DayAndHour.class))
        .distinct();

DayAndHour[] collect = (DayAndHour[]) daysAndHours.collect();
Arrays.sort(collect);
logger.info("found " + collect.length + " different days and hours: "
        + Arrays.stream(collect).map(DayAndHour::toString).collect(Collectors.joining(",")));

long time = System.currentTimeMillis();
long totalCount = 0;
for (DayAndHour dayAndHour : collect) {
    int day = dayAndHour.getDay();
    int hour = dayAndHour.getHour();
    logger.info("Start filter on " + dayAndHour);

    // Keep only this partition's rows, and drop the partition columns --
    // they are encoded in the target path instead.
    Dataset<Row> filtered = withPartition
            .filter(filterDayAndHour(day, hour))
            .drop("day", "hour");

    String newPath = path + "/day=" + day + "/hour=" + hour;

    long specificPathCount = filtered.count();
    totalCount += specificPathCount;
    long timeStart = System.currentTimeMillis();
    logger.info("writing " + specificPathCount + " events to " + newPath);

    filtered.write()
            .format(format)
            .mode(SaveMode.Append)
            .save(newPath);

    logger.info("Finish writing partition of " + dayAndHour + " to " + newPath
            + ". Wrote [" + specificPathCount + "] events in "
            + TimeUtils.tookMinuteSecondsAndMillis(timeStart, System.currentTimeMillis()));
}
logger.info("Finish writing " + path + ". Wrote [" + totalCount + "] events in "
        + TimeUtils.tookMinuteSecondsAndMillis(time, System.currentTimeMillis()));
withPartition.unpersist();

private static MapFunction<Row, DayAndHour> mapToDayHour() {
    return new MapFunction<Row, DayAndHour>() {
        @Override
        public DayAndHour call(Row value) throws Exception {
            int day = value.getAs("day");
            int hour = value.getAs("hour");
            DayAndHour dayAndHour = new DayAndHour();
            dayAndHour.setDay(day);
            dayAndHour.setHour(hour);
            return dayAndHour;
        }
    };
}

private static FilterFunction<Row> filterDayAndHour(int day, int hour) {
    return new FilterFunction<Row>() {
        @Override
        public boolean call(Row value) throws Exception {
            int cDay = value.getAs("day");
            int cHour = value.getAs("hour");
            return day == cDay && hour == cHour;
        }
    };
}
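
The TimeUtils helper used in the log lines is not included above; a minimal sketch of what it might look like (the exact formatting in the original is unknown):

// Hypothetical sketch of the TimeUtils helper referenced in the logging.
// It only needs to format the elapsed interval between two timestamps.
public final class TimeUtils {

    private TimeUtils() { }

    public static String tookMinuteSecondsAndMillis(long startMillis, long endMillis) {
        long elapsed = endMillis - startMillis;
        long minutes = elapsed / 60_000;
        long seconds = (elapsed % 60_000) / 1_000;
        long millis = elapsed % 1_000;
        return minutes + "m " + seconds + "s " + millis + "ms";
    }
}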

// And the POJO:

public class DayAndHour implements Serializable , Comparable<DayAndHour>{ 

    private int day; 
    private int hour; 

    public int getDay() { 
     return day; 
    } 

    public void setDay(int day) { 
     this.day = day; 
    } 

    public int getHour() { 
     return hour; 
    } 

    public void setHour(int hour) { 
     this.hour = hour; 
    } 

    @Override 
    public boolean equals(Object o) { 
     if (this == o) return true; 
     if (o == null || getClass() != o.getClass()) return false; 

     DayAndHour that = (DayAndHour) o; 

     if (day != that.day) return false; 
     return hour == that.hour; 
    } 

    @Override 
    public int hashCode() { 
     int result = day; 
     result = 31 * result + hour; 
     return result; 
    } 

    @Override 
    public String toString() { 
     return "(" + 
       "day=" + day + 
       ", hour=" + hour + 
       ')'; 
    } 

    @Override 
    public int compareTo(DayAndHour dayAndHour) { 
     return Integer.compare((day * 100) + hour, (dayAndHour.day * 100) + dayAndHour.hour); 
    } 
}
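
Since each batch now writes directly to the leaf paths and the day/hour columns are dropped before saving, readers recover them through Spark's partition discovery. A minimal read-side sketch, where spark is the SparkSession and the day=1/hour=0 path is just an example (the basePath option matters mainly when loading a single partition directory):

// Reading the directly written data back. Spark derives the day and hour
// columns from the day=<d>/hour=<h> directory names; setting "basePath"
// keeps that discovery working even when only one partition is loaded.
Dataset<Row> events = spark.read()
        .option("basePath", path)
        .parquet(path + "/day=1/hour=0");
events.printSchema(); // the schema includes day and hour again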