2017-10-06 138 views
0

我運行一個阿卡流卡夫卡應用程序,我想結合的流消費者的監督策略,例如,如果經紀人下降,流消費者停止超時後死亡,主管可以重新啓動消費者。阿卡卡夫卡流監理策略不工作

這裏是我的完整代碼:

UserEventStream

import akka.actor.{Actor, PoisonPill, Props} 
import akka.kafka.{ConsumerSettings, Subscriptions} 
import akka.kafka.scaladsl.Consumer 
import akka.stream.scaladsl.Sink 
import akka.util.Timeout 
import org.apache.kafka.clients.consumer.ConsumerConfig 
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer} 

import scala.concurrent.duration._ 
import scala.concurrent.ExecutionContext.Implicits.global 
import scala.util.{Failure, Success} 
import akka.pattern.ask 
import akka.stream.ActorMaterializer 

class UserEventStream extends Actor { 

    val settings = Settings(context.system).KafkaConsumers 
    implicit val timeout: Timeout = Timeout(10 seconds) 
    implicit val materializer = ActorMaterializer() 

    override def preStart(): Unit = { 
    super.preStart() 
    println("Starting UserEventStream....s") 
    } 
    override def receive = { 
    case "start" => 
     val consumerConfig = settings.KafkaConsumerInfo 
     println(s"ConsumerConfig with $consumerConfig") 
     startStreamConsumer(consumerConfig("UserEventMessage" + ".c" + 1)) 
    } 

    def startStreamConsumer(config: Map[String, String]) = { 
    println(s"startStreamConsumer with config $config") 

    val consumerSource = createConsumerSource(config) 
    val consumerSink = createConsumerSink() 
    val messageProcessor = context.actorOf(Props[MessageProcessor], "messageprocessor") 

    println("START: The UserEventStream processing") 
    val future = 
     consumerSource 
     .mapAsync(parallelism = 50) { message => 
      val m = s"${message.record.value()}" 
      messageProcessor ? m 
     } 
     .runWith(consumerSink) 
    future.onComplete { 
     case Failure(ex) => 
     println("FAILURE : The UserEventStream processing, stopping the actor.") 
     self ! PoisonPill 
     case Success(ex) => 
    } 
    } 

    def createConsumerSource(config: Map[String, String]) = { 
    val kafkaMBAddress = config("bootstrap-servers") 
    val groupID = config("groupId") 
    val topicSubscription = config("subscription-topic").split(',').toList 
    println(s"Subscriptiontopics $topicSubscription") 

    val consumerSettings = ConsumerSettings(context.system, new ByteArrayDeserializer, new StringDeserializer) 
     .withBootstrapServers(kafkaMBAddress) 
     .withGroupId(groupID) 
     .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") 
     .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") 

    Consumer.committableSource(consumerSettings, Subscriptions.topics(topicSubscription: _*)) 
    } 

    def createConsumerSink() = { 
    Sink.foreach(println) 
    } 
} 

StreamProcessorSupervisor(這是主管類UserEventStream類):

import akka.actor.{Actor, Props} 
import akka.pattern.{Backoff, BackoffSupervisor} 
import akka.stream.ActorMaterializer 
import stream.StreamProcessorSupervisor.StartClient 
import scala.concurrent.duration._ 

object StreamProcessorSupervisor { 
    final case object StartSimulator 
    final case class StartClient(id: String) 
    def props(implicit materializer: ActorMaterializer) = 
    Props(classOf[StreamProcessorSupervisor], materializer) 
} 

class StreamProcessorSupervisor(implicit materializer: ActorMaterializer) extends Actor { 
    override def preStart(): Unit = { 
    self ! StartClient(self.path.name) 
    } 

    def receive: Receive = { 
    case StartClient(id) => 
     println(s"startCLient with id $id") 
     val childProps = Props(classOf[UserEventStream]) 
     val supervisor = BackoffSupervisor.props(
     Backoff.onFailure(
      childProps, 
      childName = "usereventstream", 
      minBackoff = 1.second, 
      maxBackoff = 1.minutes, 
      randomFactor = 0.2 
     ) 
    ) 
     context.actorOf(supervisor, name = s"$id-backoff-supervisor") 
     val userEventStrean = context.actorOf(Props(classOf[UserEventStream]),"usereventstream") 
     userEventStrean ! "start" 
    } 
} 

App(主應用程序類) :

import akka.actor.{ActorSystem, Props} 
import akka.stream.ActorMaterializer 

object App extends App { 

    implicit val system = ActorSystem("stream-test") 
    implicit val materializer = ActorMaterializer() 

    system.actorOf(StreamProcessorSupervisor.props,"StreamProcessorSupervisor") 
} 

application.conf

kafka { 

    consumer { 

    num-consumers = "1" 
    c1 { 
     bootstrap-servers = "localhost:9092" 
     bootstrap-servers = ${?KAFKA_CONSUMER_ENDPOINT1} 
     groupId = "localakkagroup1" 
     subscription-topic = "test" 
     subscription-topic = ${?SUBSCRIPTION_TOPIC1} 
     message-type = "UserEventMessage" 
     poll-interval = 50ms 
     poll-timeout = 50ms 
     stop-timeout = 30s 
     close-timeout = 20s 
     commit-timeout = 15s 
     wakeup-timeout = 10s 
     max-wakeups = 10 
     use-dispatcher = "akka.kafka.default-dispatcher" 
     kafka-clients { 
     enable.auto.commit = true 
     } 
    } 
    } 
} 

運行的應用程序後,我故意殺死了卡夫卡的經紀人,然後發現,30秒後,演員是通過發送一個毒丸計劃停止本身。但奇怪的是,它不會像BackoffSupervisor策略中提到的那樣重新啓動。

這裏有什麼問題?

回答

0

還有的UserEventStream兩個實例在你的代碼:一個是兒童演員的BackoffSupervisor內部與您傳遞給它的Props創建,另一個是val userEventStrean那就是StreamProcessorSupervisor一個孩子。你應該發送"start"消息給後者,當你應該發送消息給前者時。

您不需要val userEventStrean,因爲BackoffSupervisor創建了子actor。發送到BackoffSupervisor消息被轉發到孩子,所以發送"start"消息,孩子,將其發送到BackoffSupervisor

class StreamProcessorSupervisor(implicit materializer: ActorMaterializer) extends Actor { 
    override def preStart(): Unit = { 
    self ! StartClient(self.path.name) 
    } 

    def receive: Receive = { 
    case StartClient(id) => 
     println(s"startCLient with id $id") 
     val childProps = Props[UserEventStream] 
     val supervisorProps = BackoffSupervisor.props(...) 
     val supervisor = context.actorOf(supervisorProps, name = s"$id-backoff-supervisor") 
     supervisor ! "start" 
    } 
} 

的另一個問題是,當一個演員收到PoisonPill,這是不一樣的作爲該演員投擲異常的事情。因此,當UserEventStream自己發送PoisonPill時,Backoff.onFailure將不會被觸發。一個PoisonPill停止演員,所以使用Backoff.onStop代替:

val supervisorProps = BackoffSupervisor.props(
    Backoff.onStop(// <--- use onStop 
    childProps, 
    ... 
) 
) 
val supervisor = context.actorOf(supervisorProps, name = s"$id-backoff-supervisor") 
supervisor ! "start" 
+0

甚至做出上述改變孩子停止後主管不發送「開始」消息後的獨生子女。雖然孩子再次被創造。主管如何將消息發送給孩子 – Deepakkumar