我一直在嘗試使用Java編寫自定義源代碼。具體來說,我寫了一個來自BlockingQueue的元素。我知道Source.queue,但是我不知道如何獲得物化值,如果我將其中幾個連接到合併階段。總之,這裏的實施:Akka Streams-合併階段有時僅在所有上游源推送到下游時纔會向下遊推進
public class TestingSource extends GraphStage<SourceShape<String>> {
private static final ExecutorService executor = Executors.newCachedThreadPool();
public final Outlet<String> out = Outlet.create("TestingSource.out");
private final SourceShape<String> shape = SourceShape.of(out);
private final BlockingQueue<String> queue;
private final String identifier;
public TestingSource(BlockingQueue<String> queue, String identifier) {
this.queue = queue;
this.identifier = identifier;
}
@Override
public SourceShape<String> shape() {
return shape;
}
@Override
public GraphStageLogic createLogic(Attributes inheritedAttributes) {
return new GraphStageLogic(shape()) {
private AsyncCallback<BlockingQueue<String>> callBack;
{
setHandler(out, new AbstractOutHandler() {
@Override
public void onPull() throws Exception {
String string = queue.poll();
if (string == null) {
System.out.println("TestingSource " + identifier + " no records in queue, invoking callback");
executor.submit(() -> callBack.invoke(queue)); // necessary, otherwise blocks upstream
} else {
System.out.println("TestingSource " + identifier + " found record during pull, pushing");
push(out, string);
}
}
});
}
@Override
public void preStart() {
callBack = createAsyncCallback(queue -> {
String string = null;
while (string == null) {
Thread.sleep(100);
string = queue.poll();
}
push(out, string);
System.out.println("TestingSource " + identifier + " found record during callback, pushed");
});
}
};
}
}
這個示例工作,如此看來,我的來源是否正常工作:
private static void simpleStream() throws InterruptedException {
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
Source.fromGraph(new TestingSource(queue, "source"))
.to(Sink.foreach(record -> System.out.println(record)))
.run(materializer);
Thread.sleep(2500);
queue.add("first");
Thread.sleep(2500);
queue.add("second");
}
我還寫道,連接兩個來源,以一個合併階段的一個例子:
private static void simpleMerge() throws InterruptedException {
BlockingQueue<String> queue1 = new LinkedBlockingQueue<>();
BlockingQueue<String> queue2 = new LinkedBlockingQueue<>();
final RunnableGraph<?> result = RunnableGraph.fromGraph(GraphDSL.create(
Sink.foreach(record -> System.out.println(record)),
(builder, out) -> {
final UniformFanInShape<String, String> merge =
builder.add(Merge.create(2));
builder.from(builder.add(new TestingSource(queue1, "queue1")))
.toInlet(merge.in(0));
builder.from(builder.add(new TestingSource(queue2, "queue2")))
.toInlet(merge.in(1));
builder.from(merge.out())
.to(out);
return ClosedShape.getInstance();
}));
result.run(materializer);
Thread.sleep(2500);
System.out.println("seeding first queue");
queue1.add("first");
Thread.sleep(2500);
System.out.println("seeding second queue");
queue2.add("second");
}
有時這例如可以作爲我expect-它另外5秒鐘後打印「第一」 5秒,然後打印「第二」後。
然而,有時(約5分之1運行)它在10秒後打印「秒」,然後立即打印「第一」。換句話說,合併階段只有在兩個源都推送了某些東西時纔會將字符串推向下游。 完整的輸出如下所示:
TestingSource queue1 no records in queue, invoking callback
TestingSource queue2 no records in queue, invoking callback
seeding first queue
seeding second queue
TestingSource queue2 found record during callback, pushed
second
TestingSource queue2 no records in queue, invoking callback
TestingSource queue1 found record during callback, pushed
first
TestingSource queue1 no records in queue, invoking callback
這種現象發生更頻繁地MergePreferred和MergePrioritized。
我的問題是 - 這是合併的正確行爲?如果不是,我做錯了什麼?
謝謝你的回答。如果我想要使用任意數量的源(例如,我有一個'List
要合併任意數量的源,請查看MergeHub。這裏的文檔http://doc.akka.io/docs/akka/2.5/scala/stream/stream-dynamic.html#using-the-mergehub。至於代碼中的阻塞位,你得到一個'Thread.sleep'作爲你的'prestart'函數的一部分,再加上''onPull'回調函數中的'queue.poll'。這些都是阻塞的,不應在圖表階段內調用,除非您在專用調度器上運行它們。閱讀此信息獲取更多信息http://doc.akka.io/docs/akka/2.5/scala/dispatchers.html#blocking-needs-careful-management –
謝謝,我完全錯過了'MergeHub'。最後一個問題 - 我想要一個類似於「MergePrioritized」的功能,其中每個「Source」具有不同的優先級。用'MergeHub'完成這件事的正確方法是什麼?該文件似乎沒有涵蓋它。 – akir94