2017-04-13 50 views
1

我們使用操作鏈Java8流處理事件的流。作爲處理的一部分,我們希望跟蹤事件的數量及其狀態以進行測試和監控。以下是我們的用例的簡化示例,可以打印給定日期流的星期幾。如何處理Java8並行流中的狀態變量?

public class StreamStateHandling { 

    private static enum Status {RECEIVED, SUCCESS, ERROR}; 

    private Map<Status,Integer> results = new EnumMap<>(Status.class); 

    private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("MM/dd/yyyy"); 


    private static Optional<LocalDate> parseDate(String dateString){ 
    LocalDate localDate = null; 
    try { 
     localDate = LocalDate.from(formatter.parse(dateString)); 
    }catch (DateTimeParseException e){ 
     return Optional.empty(); 
    } 
    return Optional.of(localDate); 
    } 

    private void doWork(){ 
    Stream.of("12/31/2014", 
      "01-01-2015", 
      "12/31/2015", 
      "not a date", 
      "01/01/2016") 
      //.parallel() 
      .peek(v -> addResult(Status.RECEIVED)) 
      .map(StreamStateHandling::parseDate) 
      .peek(v -> {if (!v.isPresent()) addResult(Status.ERROR);}) 
      .filter(Optional::isPresent) 
      .map(Optional::get) 
      .map(DayOfWeek::from) 
      .peek(v -> addResult(Status.SUCCESS)) 
      .forEach(System.out::println); 

     System.out.println(results); 
    } 
    public static void main(String args[]) { 
    new StreamStateHandling().doWork(); 
    } 

    private void addResult(Status status){ 

    int current = results.getOrDefault(status, 0); 
    results.put(status, current + 1); 
    } 
} 

基本上我們正在跟蹤地圖中的狀態計數。這在單線程處理中工作正常,但在並行流中產生非確定性輸出。

在現實世界中,我們有幾個狀態和操作的鏈條。一般來說,測試流程和跟蹤進度的最佳方式是什麼?喜歡香草Java8的實現,但確定如果使用開源庫更容易。

會感謝你的幫助。

+0

'.peek.foreach'似乎是多餘的。你可以將'forEach'變成一個滿足兩個角色的函數。 – the8472

回答

3

EnumMap是不是線程安全的,也不是在addResult()讀 - 修改 - 寫邏輯。嘗試使用原子ConcurrentHashMap.merge()來增加計數:

private Map<Status, Integer> results = new ConcurrentHashMap<>(); 

private void addResult(Status status) { 
    results.merge(status, 1, Integer::sum); 
} 
+0

不錯!我不知道'merge()'函數。在真正的實現中,我必須通過兩個維度來跟蹤計數,並且我正在使用不支持同步實現的谷歌guava表。一般來說,這是一個好方法嗎?我們試圖通過使用併發處理來加快速度,但同步跟蹤狀態可能會使其變慢。 – Raja

+3

@Raja我可以考慮幾個選項。 a)使用具有複合鍵而不是「表」的'ConcurrentHashMap'。 b)用'AtomicInteger'預填充你的表,它可以安全地增加而不鎖定。 c)圍繞'addResult()'進行同步。正如您所提到的,根據您的流的其餘部分發生多少處理,這可能是一個嚴重的並行化瓶頸。 – shmosel

+1

更一般地,我認爲這是值得懷疑您的方案是否是一個很好的使用情況一般流。流應該通常具有有限的副作用併產生單一的總體結果。您正嘗試從單個流生成多個結果值。 – shmosel