2017-02-21 39 views
0

我嘗試使用cyclops-react從隊列中根據大小批量調整元素,但也是按時進行的,所以不會在沒有元素時阻塞使用Cyclops反應在異步隊列流上進行批處理

也許功能不是我所期待的還是我做錯了什麼

完整的代碼(Groovy中)是這樣的,在另一個線程生產者:

  Queue<String> queue = QueueFactories.<String>unboundedQueue().build(); 
    new Thread({ 
     while (true) { 
      sleep(1000) 
      queue.offer("New message " + System.currentTimeMillis()); 
     } 
    }).start(); 

    StreamSource.futureStream(queue, new LazyReact(ThreadPools.queueCopyExecutor)) 
      .groupedBySizeAndTime(10,500,TimeUnit.MILLISECONDS) 
      .forEach({i->println(i + " Batch Time: ${System.currentTimeMillis()}")}) 

輸出是:

[New message 1487673650332, Batch Time: 1487673651356] 
    [New message 1487673651348, New message 1487673652352, Batch Time: 1487673653356] 
    [New message 1487673653355, New message 1487673654357, Batch Time: 1487673655362] 
    [New message 1487673655362, New message 1487673656364, Batch Time: 1487673657365] 

但是我期待在每批一個元件由於提供元件之間的延遲是10秒但配料是每隔半秒鐘

此外,我試圖與一個異步流(Groovy代碼):

Queue<String> queue = QueueFactories.<String>unboundedQueue().build(); 
    StreamSource.futureStream(queue, new LazyReact(ThreadPools.queueCopyExecutor)) 
      .async() 
      .groupedBySizeAndTime(10, 500,TimeUnit.MILLISECONDS) 
      .peek({i->println(i + "Batch Time: ${System.currentTimeMillis()}")}).run(); 

    while (true) { 
     queue.offer("New message " + System.currentTimeMillis()); 
     sleep(1000) 
    } 

再次,它僅批處理每2秒,有時等待每批兩個元件,即使在間歇的超時是半秒:

[New message 1487673877780, Batch Time: 1487673878819] 
    [New message 1487673878811, New message 1487673879812, Batch Time: 1487673880815] 
    [New message 1487673880814, New message 1487673881819, Batch Time: 1487673882823] 
    [New message 1487673882823, New message 1487673883824, Batch Time: 1487673884828] 
    [New message 1487673884828, New message 1487673885831, Batch Time: 1487673886835] 

我做了一個非未來非懶流的第三個實驗,這一次它的工作。

Queue<String> queue = QueueFactories.<String>unboundedQueue().build(); 
    new Thread({ 
     while (true) { 
      sleep(1000) 
      queue.offer("New message " + System.currentTimeMillis()); 
     } 
    }).start(); 

    queue.stream() 
      .groupedBySizeAndTime(10,500,TimeUnit.MILLISECONDS) 
      .forEach({i->println(i + " Batch Time " + System.currentTimeMillis())}) 

結果:

[New message 1487673288017, New message 1487673289027, Batch Time , 1487673289055] 
    [New message 1487673290029, Batch Time , 1487673290029] 
    [New message 1487673291033, Batch Time , 1487673291033] 
    [New message 1487673292037, Batch Time , 1487673292037] 

爲什麼配料的行爲似乎當您使用未來流是錯誤的?

回答

0

差異行爲是由於一個會降低asyn.Queue的FutureStreams分組效率的錯誤(基本上這意味着下一個結果出現在前一個的500ms限制內,並且Stream將向隊列請求另一個值並等到它到達)。這將在未來發布的獨眼巨人反應中得到解決。

有可能解決這個在錯誤報告耶穌·梅嫩德斯提出了幾種方法

  1. 使用一種變通方法

    queue.stream() 
        .groupedBySizeAndTime(batchSize, batchTimeoutMillis, TimeUnit.MILLISECONDS) 
        .futureStream(new LazyReact(ThreadPools.getSequential())) 
        .async() 
        .peek(this::executeBatch) 
        .run(); 
    

這避免了開銷的結果在兩個值被一起批。

  • 我們可以超時之後500毫秒(並且不等到值到達隊列爲配料)通過利用streamBatch操作者

    Queue<String> queue = QueueFactories.<String>unboundedQueue().build(); 
    new Thread(()->{ 
        for(int i=0;i<10;i++){ 
    
         queue.offer("New message " + i); 
         sleep(10000); 
        } 
        queue.close(); 
    }).start(); 
    
    long toRun = TimeUnit.MILLISECONDS.toNanos(500l); 
    
    queue.streamBatch(new Subscription(), source->{ 
    
        return()->{ 
         List<String> result = new ArrayList<>(); 
    
    
          long start = System.nanoTime(); 
    
           while (result.size() < 10 && (System.nanoTime() - start) < toRun) { 
            try { 
             String next = source.apply(1l, TimeUnit.MILLISECONDS); 
             if (next != null) { 
              result.add(next); 
             } 
            }catch(Queue.QueueTimeoutException e){ 
    
            } 
    
    
           } 
    
         start=System.nanoTime(); 
    
         return result; 
        }; 
    }).filter(l->l.size()>0) 
        .futureStream(new LazyReact(ThreadPools.getSequential())) 
         .async() 
         .peek(System.out::println) 
         .run(); 
    
  • 的在這種情況下,我們將總是在500ms後分組,而不是等到我們要求的值到達隊列。