2017-04-10 62 views
0

我已經編寫了一個Akka應用程序,該應用程序從Kafka獲取輸入,然後使用分片演員處理數據並輸出到Kafka。當發送給分片演員時,對ReactiveKafka的背壓

但在某些場合分片區域不能處理負載,我也得到:

你或許應該實行流量控制,以避免水浸 遠程連接。

如何在此鏈/流中實施背壓?

卡夫卡消費 - >共享演員 - >卡夫卡生產者

從代碼一些片斷:

ReactiveKafka kafka = new ReactiveKafka(); 

Subscriber subscriber = kafka.publish(pp, system); 

ActorRef kafkaWriterActor = (ActorRef) Source.actorRef(10000, OverflowStrategy.dropHead()) 
       .map(ix -> KeyValueProducerMessage.apply(Integer.toString(ix.hashCode()), ix)) 
       .to(Sink.fromSubscriber(subscriber)) 
       .run(materializer); 

ConsumerProperties cp = new PropertiesBuilder.Consumer(brokerList, intopic, consumergroup, new ByteArrayDeserializer(), new NgMsgDecoder()) 
         .build().consumerTimeoutMs(5000).commitInterval(Duration.create(60, TimeUnit.SECONDS)).readFromEndOfStream(); 

Publisher<ConsumerRecord<byte[], StreamEvent>> publisher = kafka.consume(cp,system); 

ActorRef streamActor = ClusterSharding.get(system).start("StreamActor", 
       Props.create(StreamActor.class, synctime), ClusterShardingSettings.create(system), messageExtractor); 

shardRegionTypenames.add("StreamActor"); 


Source.fromPublisher(publisher)     
       .runWith(Sink.foreach(msg -> {      
        streamActor.tell(msg.value(),ActorRef.noSender()); 
       }), materializer); 
+0

我是Akka框架的新手。但是您可以嘗試使用Akka Streams來照顧背壓技術。 – Advika

回答

1

也許你可以考慮並行的主題爲分區(如果適用),並與per-消費者打造通過調整ConsumerWithPerPartitionBackpressure中的this example與您的演員使用mapAsync and ask進行整合來分區背壓。

相關問題