2014-09-02 454 views
-2
package com.spse.pricing.client.main; 

import java.util.stream.IntStream; 

public class NestedParalleStream { 

    int total = 0; 

    public static void main(String[] args) { 

     NestedParalleStream nestedParalleStream = new NestedParalleStream(); 
     nestedParalleStream.test(); 
    } 


    void test(){ 

     try{ 

      IntStream stream1 = IntStream.range(0, 2); 
      stream1.parallel().forEach(a ->{ 
       IntStream stream2 = IntStream.range(0, 2); 
       stream2.parallel().forEach(b ->{ 
        IntStream stream3 = IntStream.range(0, 2); 
        stream3.parallel().forEach(c ->{ 
         //2 * 2 * 2 = 8; 
         total ++; 

        }); 
       }); 
      }); 

      //It should display 8 
      System.out.println(total); 

     }catch(Exception e){ 

      e.printStackTrace(); 
     } 
    } 
} 

請幫助我們如何自定義parallestream以確保我們能夠獲得一致性結果。Java 8嵌套ParallelStream不能正常工作

+0

它實際顯示的是什麼? – 2014-09-02 14:58:11

+1

'它應該顯示8'好,它顯示什麼呢?您是否嘗試過使用'total' ['volatile'](http://docs.oracle.com/javase/tutorial/essential/concurrency/atomic.html)? – kajacx 2014-09-02 14:58:12

+0

@kajacx volatile不會幫助,因爲'a ++'是'a = a + 1'的快捷鍵,這不是原子的 – talex 2014-09-02 15:01:02

回答

3

聲明中的問題total ++;它在多個線程中同時被調用。

你應該​​保護或使用AtomicInteger

4

由於多個線程遞增total 您必須聲明它 volatile避免競爭條件


編輯:揮發性品牌的讀/寫操作是原子操作,但total++需要超過一個操作。在多個線程變異值

AtomicInteger total = new AtomicInteger(); 
... 
total.incrementAndGet(); 
3

LongAdderLongAccumulator優於AtomicLongAtomicInteger和它打算在年底要讀取相對較少的時間,如一次:因爲這個原因,你應該使用的AtomicInteger計算。加法器/累加器對象避免了原子對象可能發生的爭用問題。 (對於double值有相應的加法器/累加器對象。)

通常有一種方法可以使用reduce()collect()重寫累加。這些往往是可取的,特別是如果積累(或收集)的價值不是longdouble

2

關於解決方法的可變性存在一個主要問題。以您想要的方式解決問題的更好方法如下:

int total = IntStream.range(0,2) 
      .parallel() 
      .map(i -> { 
       return IntStream.range(0,2) 
         .map(j -> { 
          return IntStream.range(0,2) 
            .map(k -> i * j * k) 
            .reduce(0,(acc, val) -> acc + 1); 
         }).sum(); 
      }).sum();