2017-03-18 73 views
-1

我是新的Akka,我試圖觸發或派遣事件消息到Akka演員,我有3個事件消息一個再見一個我觸發但爲什麼只有一個第一個事件正在被觸發。Akka事件消息不派遣或觸發另一個事件。總是被觸發的第一個事件其他事件沒有得到調度

這可能是因爲:receive(receiveEvent);此方法調用我的EventProcessActor構造函數。

但之後我們也打電話給其他事件,但有些我在這裏失蹤爲什麼它不派遣到其他比賽事件。

我總是得到以下輸出控制檯:

[INFO] [03/18/2017 13:35:53.446]... We received the Events need to process it 

我的預期成果是:

[INFO] [03/18/2017 13:35:53.446] ... We received the Events need to process it 

[INFO] [03/18/2017 13:35:53.447]... We are processing Events 

[INFO] [03/18/2017 13:35:53.446]... Completed Events processing 

上面控制檯輸出我已刪除[default-akka.actor.default-dispatcher-4] [akka://default/user/EventProcessing]...

我作爲觸發事件如下所示:

procsssEvents.tell(new EventProcessActor.EventActivity(Events.STSRT, Paths.get("/")), procsssEvents); 
procsssEvents.tell(new EventProcessActor.EventActivity(Events.READING_LINE, Paths.get("/")), procsssEvents); 
procsssEvents.tell(new ventProcessActor.EventActivity(Events.END_OR,Paths.get("/")), procsssEvents); 

下面是我的Acotr類和消息類和pom.xml文件。

AkkaActor:

package com.ebc.biz.akka.event.trigger; 

import java.io.IOException; 
import java.nio.file.Path; 
import java.nio.file.Paths; 

import scala.PartialFunction; 
import scala.runtime.BoxedUnit; 

import akka.actor.AbstractLoggingActor; 
import akka.actor.ActorRef; 
import akka.actor.ActorSystem; 
import akka.actor.ActorSystemImpl; 
import akka.actor.Props; 
import akka.japi.pf.ReceiveBuilder; 

import static com.ebc.biz.akka.event.trigger.EventMessage.Events; 

public class EventProcessActor extends AbstractLoggingActor { 

    public static class EventActivity { 
     final EventMessage startOfEventMessage; 

     public EventMessage getStartOfEventMessage() { 
      return startOfEventMessage; 
     } 

     public EventActivity(Events events, Path eventPath) { 
      startOfEventMessage = new EventMessage(events, eventPath); 
     } 

    } 

    public static class EventReadingActivity { 

     final EventMessage startOfReadingMessage; 

     public EventMessage getStartOfReadingMessage() { 
      return startOfReadingMessage; 
     } 

     public EventReadingActivity(Events events, Path eventPath) { 
      startOfReadingMessage = new EventMessage(events, eventPath); 

     } 

    } 

    public static class EndOfEventActivity { 

     final EventMessage endOfEventMessage; 

     public EventMessage getEndOfEventMessage() { 
      return endOfEventMessage; 
     } 

     public EndOfEventActivity(Events events, Path eventPath) { 
      endOfEventMessage = new EventMessage(Events.END_OR, eventPath); 

     } 
    } 

    private final PartialFunction<Object, BoxedUnit> receiveEvent; 

    private final PartialFunction<Object, BoxedUnit> startEventsProcessing; 

    private final PartialFunction<Object, BoxedUnit> completeEventProcessing; 

    public EventProcessActor() { 

     receiveEvent = ReceiveBuilder 
       .match(EventActivity.class, this::onStartEventReceive) 
       .match(EventReadingActivity.class, this::readEventLine).build(); 

     startEventsProcessing = ReceiveBuilder 
       .match(EventReadingActivity.class, this::readEventLine) 
       .match(EndOfEventActivity.class, this::onEndOfEventProcessing) 
       .build(); 

     completeEventProcessing = ReceiveBuilder.match(
       EndOfEventActivity.class, this::onEndOfEventProcessing).build(); 

     receive(receiveEvent); 
    } 

    public static Props props() { 

     return Props.create(EventProcessActor.class); 
    } 

    public void onStartEventReceive(EventActivity fileActivity) { 
     log().info("We received the Events need to process it"); 
     getContext().become(startEventsProcessing); 
    } 

    public void readEventLine(EventReadingActivity fileActivity) { 
     log().info("We are processing Events"); 
     getContext().become(completeEventProcessing); 

    } 

    public void onEndOfEventProcessing(EndOfEventActivity fileActivity) { 
     log().info("Completed Events processing"); 

    } 

    public static void main(String args[]) throws IOException { 

     ActorSystem syste = ActorSystemImpl.create(); 
     final ActorRef procsssEvents = syste.actorOf(EventProcessActor.props(), 
       "Event" + "Processing"); 

     procsssEvents.tell(new EventProcessActor.EventActivity(Events.STSRT, 
       Paths.get("/")), procsssEvents); 
     procsssEvents.tell(new EventProcessActor.EventActivity(
       Events.READING_LINE, Paths.get("/")), procsssEvents); 
     procsssEvents.tell(new EventProcessActor.EventActivity(Events.END_OR, 
       Paths.get("/")), procsssEvents); 

     System.out.println("Enter to terminate"); 
     System.in.read(); 

    } 

} 

事件消息

package com.ebc.biz.akka.event.trigger; 

import java.nio.file.Path; 

public class EventMessage { 

    public static enum Events { 

     STSRT, READING_LINE, END_OR; 

    } 

    private final Events readEvents; 
    private final Path pathOfEvents; 

    public Path getPathOfEvents() { 
     return pathOfEvents; 
    } 

    public Events getReadEvents() { 
     return readEvents; 
    } 

    public EventMessage(Events readEvents, Path pathOfFile) { 
     this.readEvents = readEvents; 
     this.pathOfEvents = pathOfFile; 
    } 

} 

的pom.xml

<groupId>com.ebc.biz</groupId> 
    <artifactId>akka.event.trigger</artifactId> 
    <version>0.0.1-SNAPSHOT</version> 
    <properties> 
     <akka.version>2.4.9</akka.version> 
     <maven-dependency-plugin.version>3.0.0</maven-dependency-plugin.version> 
     <maven.compiler.plugin>3.6.1</maven.compiler.plugin> 
     <java.compiler.target>1.8</java.compiler.target> 
     <java.compiler.source>1.8</java.compiler.source> 
    </properties> 
    <dependencies> 
     <dependency> 
      <groupId>com.typesafe.akka</groupId> 
      <artifactId>akka-actor_2.11</artifactId> 
      <version>${akka.version}</version> 
     </dependency> 
     <dependency> 
      <groupId>com.typesafe.akka</groupId> 
      <artifactId>akka-http-core_2.11</artifactId> 
      <version>${akka.version}</version> 
     </dependency> 
     <dependency> 
      <groupId>com.typesafe.akka</groupId> 
      <artifactId>akka-http-experimental_2.11</artifactId> 
      <version>${akka.version}</version> 
     </dependency> 
     <dependency> 
      <groupId>com.typesafe.akka</groupId> 
      <artifactId>akka-http-jackson-experimental_2.11</artifactId> 
      <version>${akka.version}</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.maven.plugins</groupId> 
      <artifactId>maven-dependency-plugin</artifactId> 
      <version>${maven-dependency-plugin.version}</version> 
     </dependency> 
    </dependencies> 

    <build> 
     <plugins> 
      <plugin> 
       <!-- This will download source so easy to see API and java doc. --> 
       <artifactId>maven-source-plugin</artifactId> 
       <executions> 
        <execution> 
         <id>attach-sources</id> 
         <phase>verify</phase> 
         <goals> 
          <goal>jar</goal> 
         </goals> 
        </execution> 
       </executions> 
      </plugin> 
      <!-- Java 8 compiler plugin --> 
      <plugin> 
       <groupId>org.apache.maven.plugins</groupId> 
       <artifactId>maven-compiler-plugin</artifactId> 
       <version>${maven.compiler.plugin}</version> 
       <configuration> 
        <source>${java.compiler.source}</source> 
        <target>${java.compiler.target}</target> 
       </configuration> 
      </plugin> 

     </plugins> 
    </build> 
</project> 

爲什麼我的消息是不是G分派一個到另一個。我想我錯過了一些東西。

在此先感謝您提供任何信息和幫助。

回答

1

編輯:問題是 - 你發送EventActivity而男主角希望每個設計另一種類型,所以你應該更新main

procsssEvents.tell(new EventProcessActor.EventActivity(Events.STSRT, Paths.get("/")), procsssEvents); 
    procsssEvents.tell(new EventProcessActor.EventReadingActivity(Events.READING_LINE, Paths.get("/")), procsssEvents); 
    procsssEvents.tell(new EventProcessActor.EndOfEventActivity(Events.END_OR, Paths.get("/")), procsssEvents); 
+0

如果我不是要求接收(receiveEvent);在EventProcessActor構造函數中,我得到了異常,並且比從主方法發送的所有消息都要「遇到死信」。 並且只有第一條消息纔會被解僱,因爲我稱之爲「receive(receiveEvent);」那是第一件事。 我試圖在調用「告訴調用」的每一行之前放置線程睡眠stil同樣的問題只有第一個事件正在被記錄或觸發。 我曾嘗試將Thread.sleep(2000)也放在每個調用之前。 –

+1

對,這個問題在別的地方,我已經更新了答案。 – MirMasej