2017-09-14 55 views

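Akka Streams - Merge stage sometimes pushes downstream only once all upstream sources have pushed

I've been experimenting with writing a custom Source in Java. Specifically, I wrote a Source that takes its elements from a BlockingQueue. I'm aware of Source.queue, but I don't know how to get the materialized values if I connect several of those to a Merge stage. Anyway, here's the implementation: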

public class TestingSource extends GraphStage<SourceShape<String>> { 
    private static final ExecutorService executor = Executors.newCachedThreadPool(); 

    public final Outlet<String> out = Outlet.create("TestingSource.out"); 
    private final SourceShape<String> shape = SourceShape.of(out); 

    private final BlockingQueue<String> queue; 
    private final String identifier; 

    public TestingSource(BlockingQueue<String> queue, String identifier) { 
     this.queue = queue; 
     this.identifier = identifier; 
    } 

    @Override 
    public SourceShape<String> shape() { 
     return shape; 
    } 

    @Override 
    public GraphStageLogic createLogic(Attributes inheritedAttributes) { 
     return new GraphStageLogic(shape()) { 
      private AsyncCallback<BlockingQueue<String>> callBack; 

      { 
       setHandler(out, new AbstractOutHandler() { 
        @Override 
        public void onPull() throws Exception { 
         String string = queue.poll(); 
         if (string == null) { 
          System.out.println("TestingSource " + identifier + " no records in queue, invoking callback"); 
          executor.submit(() -> callBack.invoke(queue)); // necessary, otherwise blocks upstream 
         } else { 
          System.out.println("TestingSource " + identifier + " found record during pull, pushing"); 
          push(out, string); 
         } 
        } 
       }); 
      } 

      @Override 
      public void preStart() { 
       callBack = createAsyncCallback(queue -> { 
        String string = null; 
        while (string == null) { 
         Thread.sleep(100); 
         string = queue.poll(); 
        } 
        push(out, string); 
        System.out.println("TestingSource " + identifier + " found record during callback, pushed"); 
       }); 
      } 
     }; 
    } 
} 

This example works, so it seems my Source is working properly:

private static void simpleStream() throws InterruptedException { 
    BlockingQueue<String> queue = new LinkedBlockingQueue<>(); 
    Source.fromGraph(new TestingSource(queue, "source")) 
      .to(Sink.foreach(record -> System.out.println(record))) 
      .run(materializer); 

    Thread.sleep(2500); 
    queue.add("first"); 

    Thread.sleep(2500); 
    queue.add("second"); 
} 

I also wrote an example that connects two of these Sources to a Merge stage:

private static void simpleMerge() throws InterruptedException { 
    BlockingQueue<String> queue1 = new LinkedBlockingQueue<>(); 
    BlockingQueue<String> queue2 = new LinkedBlockingQueue<>(); 

    final RunnableGraph<?> result = RunnableGraph.fromGraph(GraphDSL.create(
      Sink.foreach(record -> System.out.println(record)), 
      (builder, out) -> { 
       final UniformFanInShape<String, String> merge = 
         builder.add(Merge.create(2)); 
       builder.from(builder.add(new TestingSource(queue1, "queue1"))) 
         .toInlet(merge.in(0)); 
       builder.from(builder.add(new TestingSource(queue2, "queue2"))) 
         .toInlet(merge.in(1)); 

       builder.from(merge.out()) 
         .to(out); 
       return ClosedShape.getInstance(); 
      })); 
    result.run(materializer); 

    Thread.sleep(2500); 
    System.out.println("seeding first queue"); 
    queue1.add("first"); 

    Thread.sleep(2500); 
    System.out.println("seeding second queue"); 
    queue2.add("second"); 
} 

Sometimes this example works as I expect: it prints "first" after 5 seconds, and then prints "second" after another 5 seconds.

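However, sometimes (in about 1 out of 5 runs) it prints "second" after 10 seconds and then immediately prints "first". In other words, the Merge stage only pushes the strings downstream once both sources have pushed something. The full output looks like this: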

TestingSource queue1 no records in queue, invoking callback 
TestingSource queue2 no records in queue, invoking callback 
seeding first queue 
seeding second queue 
TestingSource queue2 found record during callback, pushed 
second 
TestingSource queue2 no records in queue, invoking callback 
TestingSource queue1 found record during callback, pushed 
first 
TestingSource queue1 no records in queue, invoking callback 

This happens more frequently with MergePreferred and MergePrioritized.

My question is: is this the correct behavior of Merge? If not, what am I doing wrong?

Answers


At first glance, blocking the thread with a Thread.sleep in the middle of the stage seems to be at least one of the problems.
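To see why a sleeping callback in one source can hold back the other one, here is a rough JDK-only sketch. It is not Akka code: the single-threaded executor merely stands in for the one thread that runs both sources and the Merge (this assumes the graph is executed fused, without async boundaries), and `SharedThreadDemo` and `pollUntilPresent` are made-up names for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

final class SharedThreadDemo {

    // Same sleep-and-poll loop as the async callback in the question.
    static String pollUntilPresent(BlockingQueue<String> queue) throws InterruptedException {
        String element = null;
        while (element == null) {
            Thread.sleep(10);
            element = queue.poll();
        }
        return element;
    }

    // Returns the order of events as seen by the "downstream" log.
    static List<String> run() {
        // Assumption: one thread executes every callback of the graph.
        ExecutorService sharedThread = Executors.newSingleThreadExecutor();
        BlockingQueue<String> queue1 = new LinkedBlockingQueue<>();
        BlockingQueue<String> queue2 = new LinkedBlockingQueue<>();
        List<String> log = new CopyOnWriteArrayList<>();
        try {
            // queue2's callback happens to be scheduled first; queue1's waits behind it.
            Future<?> callback2 = sharedThread.submit(() -> {
                log.add(pollUntilPresent(queue2));
                return null;
            });
            Future<?> callback1 = sharedThread.submit(() -> {
                log.add(pollUntilPresent(queue1));
                return null;
            });

            queue1.add("first");
            Thread.sleep(200); // "first" is available, but the shared thread is busy polling queue2
            log.add("seeding second queue");
            queue2.add("second");

            callback2.get();
            callback1.get();
            return new ArrayList<>(log);
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            sharedThread.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // [seeding second queue, second, first]
    }
}
```

Although "first" is queued well before "second", it is emitted last, which matches the unexpected output in the question: the callback that started first keeps the shared thread occupied until its own queue has an element.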

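Anyway, I think it would be much easier to use Source.queue, as you mentioned at the beginning of your question. If the issue is extracting the materialized value when using the GraphDSL, here's how you do it: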

final Source<String, SourceQueueWithComplete<String>> source =
    Source.queue(100, OverflowStrategy.backpressure());
final Sink<Object, CompletionStage<akka.Done>> sink = Sink.ignore();

final RunnableGraph<Pair<SourceQueueWithComplete<String>, CompletionStage<akka.Done>>> g =
    RunnableGraph.fromGraph(
        GraphDSL.create(
            source,
            sink,
            Keep.both(),
            (b, src, snk) -> {
                b.from(src).to(snk);
                return ClosedShape.getInstance();
            }
        )
    );

// run() returns the Pair of materialized values; its first element is the queue
g.run(materializer);

More information on this can be found in the docs.


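Thanks for your answer. What if I want to use an arbitrary number of sources (for example, I have a 'List' of them) and connect them to a Merge stage? How can I get all of their queues? Also, the 'Thread.sleep' is in the main thread, so why would it affect the stream? – akir94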


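To merge an arbitrary number of sources, take a look at MergeHub. Docs here: http://doc.akka.io/docs/akka/2.5/scala/stream/stream-dynamic.html#using-the-mergehub. As for the blocking bits in your code, you've got a 'Thread.sleep' as part of your 'preStart' function, plus the 'queue.poll' in the 'onPull' callback. These are blocking and should not be called inside a graph stage unless you run them on a dedicated dispatcher. Read this for more info: http://doc.akka.io/docs/akka/2.5/scala/dispatchers.html#blocking-needs-careful-management –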
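The "dedicated dispatcher" advice can be illustrated without Akka. The sketch below is only an analogy under stated assumptions: `blockingPool` plays the role of a dispatcher reserved for blocking calls, `sharedThread` plays the role of the thread running the stream, and `DedicatedPoolDemo` and `takeUnchecked` are hypothetical names. The blocking wait happens on the dedicated pool, and only a short, non-blocking hand-off runs on the shared thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

final class DedicatedPoolDemo {

    // Blocks the calling thread until an element arrives; meant for the dedicated pool only.
    static String takeUnchecked(BlockingQueue<String> queue) {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Returns the order of events as seen by the "downstream" log.
    static List<String> run() {
        ExecutorService sharedThread = Executors.newSingleThreadExecutor(); // stands in for the stream's thread
        ExecutorService blockingPool = Executors.newCachedThreadPool();     // reserved for blocking waits
        BlockingQueue<String> queue1 = new LinkedBlockingQueue<>();
        BlockingQueue<String> queue2 = new LinkedBlockingQueue<>();
        List<String> log = new CopyOnWriteArrayList<>();
        try {
            // Wait on the dedicated pool, then hand the element over with a quick task.
            CompletableFuture<Void> callback2 = CompletableFuture
                    .supplyAsync(() -> takeUnchecked(queue2), blockingPool)
                    .thenAcceptAsync(log::add, sharedThread);
            CompletableFuture<Void> callback1 = CompletableFuture
                    .supplyAsync(() -> takeUnchecked(queue1), blockingPool)
                    .thenAcceptAsync(log::add, sharedThread);

            queue1.add("first");
            callback1.join(); // completes without waiting for queue2
            log.add("seeding second queue");
            queue2.add("second");
            callback2.join();
            return new ArrayList<>(log);
        } finally {
            sharedThread.shutdownNow();
            blockingPool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // [first, seeding second queue, second]
    }
}
```

Here "first" is emitted as soon as it is queued, because the shared thread is never parked in a wait. Translated back to the custom stage in the question, the equivalent idea would presumably be to do the blocking wait inside the task submitted to the executor and keep the async callback down to a plain push.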


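Thanks, I completely missed 'MergeHub'. One last question: I want functionality similar to 'MergePrioritized', where each 'Source' has a different priority. What's the right way to accomplish this with 'MergeHub'? The docs don't seem to cover it. – akir94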