我們使用操作鏈Java8流處理事件的流。作爲處理的一部分,我們希望跟蹤事件的數量及其狀態以進行測試和監控。以下是我們的用例的簡化示例,可以打印給定日期流的星期幾。如何處理Java8並行流中的狀態變量?
public class StreamStateHandling {
private static enum Status {RECEIVED, SUCCESS, ERROR};
private Map<Status,Integer> results = new EnumMap<>(Status.class);
private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("MM/dd/yyyy");
private static Optional<LocalDate> parseDate(String dateString){
LocalDate localDate = null;
try {
localDate = LocalDate.from(formatter.parse(dateString));
}catch (DateTimeParseException e){
return Optional.empty();
}
return Optional.of(localDate);
}
private void doWork(){
Stream.of("12/31/2014",
"01-01-2015",
"12/31/2015",
"not a date",
"01/01/2016")
//.parallel()
.peek(v -> addResult(Status.RECEIVED))
.map(StreamStateHandling::parseDate)
.peek(v -> {if (!v.isPresent()) addResult(Status.ERROR);})
.filter(Optional::isPresent)
.map(Optional::get)
.map(DayOfWeek::from)
.peek(v -> addResult(Status.SUCCESS))
.forEach(System.out::println);
System.out.println(results);
}
public static void main(String args[]) {
new StreamStateHandling().doWork();
}
private void addResult(Status status){
int current = results.getOrDefault(status, 0);
results.put(status, current + 1);
}
}
基本上我們正在跟蹤地圖中的狀態計數。這在單線程處理中工作正常,但在並行流中產生非確定性輸出。
在現實世界中,我們有幾個狀態和操作的鏈條。一般來說,測試流程和跟蹤進度的最佳方式是什麼?喜歡香草Java8的實現,但確定如果使用開源庫更容易。
會感謝你的幫助。
'.peek.foreach'似乎是多餘的。你可以將'forEach'變成一個滿足兩個角色的函數。 – the8472