2013-02-15 46 views
1

我遇到了Camel的FTP2組件問題,消費者演員居住在Akka系統中。Akka + Camel + FTP2 + localWorkingDirectory無法可靠運行

基本的想法是監視一個FTP目錄的文件,然後產生一個小孩演員分別處理每個文件。 Akka正在用於管理併發性和可靠性。父消費者使用noop = true輪詢目錄,所以它不做任何事情,那麼兒童消費者應該下載文件,並用'include'駱駝選項進行過濾。下載是併發的,重要的是文件不要加載到內存中(因此使用localWorkDirectory)。

我寫了一個簡單的攝製:

package camelrepro; 

import java.io.InputStream; 

import org.mockftpserver.core.command.Command; 
import org.mockftpserver.core.command.ReplyCodes; 
import org.mockftpserver.core.session.Session; 
import org.mockftpserver.core.session.SessionKeys; 
import org.mockftpserver.fake.FakeFtpServer; 
import org.mockftpserver.fake.UserAccount; 
import org.mockftpserver.fake.command.AbstractFakeCommandHandler; 
import org.mockftpserver.fake.filesystem.FileEntry; 
import org.mockftpserver.fake.filesystem.UnixFakeFileSystem; 

import akka.actor.ActorSystem; 
import akka.actor.Props; 
import akka.camel.CamelMessage; 
import akka.camel.javaapi.UntypedConsumerActor; 
import akka.testkit.JavaTestKit; 

public class Main { 

    public static class ParentActor extends UntypedConsumerActor { 

     public ParentActor() { 
      System.out.println("Parent started"); 
     } 
     @Override 
     public String getEndpointUri() { 
      return "ftp://[email protected]:8021?password=password&readLock=changed&initialDelay=0&delay=200&noop=true"; 
     } 

     @Override 
     public void onReceive(Object msg) throws Exception { 
      if (msg instanceof CamelMessage) { 
       getContext().actorOf(new Props(ChildActor.class), "0"); 
      } else { 
       unhandled(msg); 
      } 
     } 
    } 

    public static class ChildActor extends UntypedConsumerActor { 

     public ChildActor() { 
      System.out.println("Child started"); 
     } 

     @Override 
     public String getEndpointUri() { 
      return "ftp://[email protected]:8021?password=password&readLock=changed&initialDelay=0&delay=200&include=test.txt&localWorkDirectory=/tmp"; 
     } 

     @Override 
     public void onReceive(Object msg) throws Exception { 
      if (msg instanceof CamelMessage) { 
       System.out.println("Child got message"); 
       CamelMessage camelMsg = (CamelMessage) msg; 

       InputStream source = camelMsg.getBodyAs(InputStream.class, getCamelContext()); 
       System.out.println(source.getClass().getName()); 
       System.exit(0); 
      } else { 
       unhandled(msg); 
      } 
     } 
    } 

    public static void main(String[] args) { 

     ActorSystem system = ActorSystem.create("default"); 

     FakeFtpServer ftpServer = new FakeFtpServer(); 
     UnixFakeFileSystem ftpFileSystem = new UnixFakeFileSystem(); 
     ftpServer.setFileSystem(ftpFileSystem); 
     ftpServer.addUserAccount(new UserAccount("anonymous", "password", "/")); 
     ftpServer.setServerControlPort(8021); 

     // fix bug in PWD handling (either Apache FTP client or mock server depending on opinion) 
     ftpServer.setCommandHandler("PWD", new AbstractFakeCommandHandler() { 
      @Override 
      protected void handle(Command command, Session session) { 
       String currentDirectory = (String) session.getAttribute(SessionKeys.CURRENT_DIRECTORY); 
       this.replyCodeForFileSystemException = ReplyCodes.READ_FILE_ERROR; 
       verifyFileSystemCondition(notNullOrEmpty(currentDirectory), currentDirectory, "filesystem.currentDirectoryNotSet"); 
       int replyCode = ReplyCodes.PWD_OK; 
       String replyText = String.format("\"%s\" OK", currentDirectory.replaceAll("\"", "\"\"")); 
       session.sendReply(replyCode, replyText); 
      } 
     }); 
     ftpFileSystem.add(new FileEntry("/test.txt", "hello world")); 
     ftpServer.start(); 

     new JavaTestKit(system) {{ 
      getSystem().actorOf(new Props(ParentActor.class)); 
     }}; 
    } 
} 

Maven依賴顯示的版本:

<dependencies> 
     <dependency> 
      <groupId>com.typesafe.akka</groupId> 
      <artifactId>akka-actor_2.10</artifactId> 
      <version>2.1.0</version> 
     </dependency> 
     <dependency> 
      <groupId>com.typesafe.akka</groupId> 
      <artifactId>akka-remote_2.10</artifactId> 
      <version>2.1.0</version> 
     </dependency> 
     <dependency> 
      <groupId>com.typesafe.akka</groupId> 
      <artifactId>akka-camel_2.10</artifactId> 
      <version>2.1.0</version> 
      <scope>test</scope> 
     </dependency> 
     <dependency> 
      <groupId>com.typesafe.akka</groupId> 
      <artifactId>akka-testkit_2.10</artifactId> 
      <version>2.1.0</version> 
      <scope>test</scope> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.camel</groupId> 
      <artifactId>camel-ftp</artifactId> 
      <version>2.10.3</version> 
     </dependency> 
     <dependency> 
      <groupId>org.mockftpserver</groupId> 
      <artifactId>MockFtpServer</artifactId> 
      <version>2.4</version> 
      <scope>test</scope> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.commons</groupId> 
      <artifactId>commons-io</artifactId> 
      <version>1.3.2</version> 
     </dependency> 
     <dependency> 
      <groupId>commons-net</groupId> 
      <artifactId>commons-net</artifactId> 
      <version>3.2</version> 
     </dependency> 
     <dependency> 
      <groupId>org.slf4j</groupId> 
      <artifactId>slf4j-simple</artifactId> 
      <version>1.7.2</version> 
     </dependency> 
    </dependencies> 

我希望看到的BufferedInputStream寫到標準輸出 - 並檢查ByteArrayInputStream的不是。

但是,相反,我看到的文件沒有發現異常:

[ERROR] [02/15/2013 10:53:32.951] [default-akka.actor.default-dispatcher-7] [akka://default/user/$a/0] Error during type conversion from type: org.apache.camel.component.file.remote.RemoteFile to the required type: java.io.InputStream with value GenericFile[test.txt] due java.io.FileNotFoundException: /tmp/test.txt (No such file or directory) 
org.apache.camel.TypeConversionException: Error during type conversion from type: org.apache.camel.component.file.remote.RemoteFile to the required type: java.io.InputStream with value GenericFile[test.txt] due java.io.FileNotFoundException: /tmp/test.txt (No such file or directory) 
    at org.apache.camel.impl.converter.BaseTypeConverterRegistry.mandatoryConvertTo(BaseTypeConverterRegistry.java:162) 

有好幾次,它的工作,領導讓我懷疑這可能是一個種族的地方。但它幾乎總是失敗。

任何線索,想法,建議?

FWIW:

uname -a: Linux 3.2.0-37-generiC#58-Ubuntu SMP Thu Jan 24 15:28:10 UTC 2013 x86_64 x86_64 x86_64 GNU/Linux 
java: 1.7.0_11-b21 

回答

2

我找到了解決上述問題的方法。

這是一個事實,兒童消費者autoAck()返回true(它默認情況下)。在這種情況下,akka-camel會發送CamelMessage即時消息,並繼續進行清理。與此同時,兒童演員實際上並沒有打開InputStream,直到由getBodyAs()調用的其中一個類型轉換器將其打開。因此,在通過getBodyAs()打開文件的子actor和異步發送消息後刪除文件的Camel清理之間存在競爭。

因此,修復方法是覆蓋autoAck()以返回false,並在子消息處理程序的結尾處發送Ack.getInstance()(或new Status.Failure(<cause>),如果您願意)。

1

使用駱駝2.10.2,因爲在2.10.3與FTP成分的問題

+0

好主意,但不幸的是,行爲與2.10.2相同。 – 2013-02-15 17:16:26

0

當使用localWorkDirectory =/TMP則該目錄是用於存儲文件的臨時在路由期間。當駱駝交易所完成時,文件被刪除。我不確定這是如何與Akka這是異步事件。所以在Camel交換完成後,Akka onReceive可能被稱爲異步,因此臨時文件被刪除。而不是:

在駱駝你會路線的文件,以更permament自然

from("ftp:...") 
    .to("file:inbox") 

的filke位置,然後你可以從阿卡(「收件箱文件」)消耗。

+0

在這種情況下轉移是否同時發生,並且在例如情況下是可靠的。 ftp服務器偶爾會死亡?我曾經這樣設置過,但是我的老闆不喜歡直接駱駝路線的方法,想要使用akka進行容錯:) – 2013-02-16 11:26:14