2016-02-27 104 views
1

我有一個spring的xd模塊,用於從s3中拉取文件並逐行分割並在處理後將其刪除(ExpressionAdvice)。我的文件中有大約100萬條消息(行) s3。文件被下載到xd容器框中,並且我檢查了md5sum和它的相同並且具有相同的行。我看到只有260k個奇怪的消息來到輸出通道,這是處理器。我丟失了大約740條消息。有時它隨機一次,我看到所有消息,如100萬在我的輸出通道,有時只有250k。我正在測量這個使用計數器爲我的stream.File下載,但我覺得它被刪除之前處理​​所有記錄/行在10秒內,我的文件大小是700Mb左右。請讓我知道表達式建議是否在處理前刪除。文件在處理之前被刪除

module.aws-s3-source.count=1 and module.aws-s3-source.concurrency=70 
    stream1 as-s3-source |processor|sink 

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans" 
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
     xmlns:int="http://www.springframework.org/schema/integration" 
     xmlns:context="http://www.springframework.org/schema/context" 
     xmlns:int-aws="http://www.springframework.org/schema/integration/aws" 
     xmlns:int-file="http://www.springframework.org/schema/integration/file" 
     xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd 
     http://www.springframework.org/schema/integration/file 
     http://www.springframework.org/schema/integration/file/spring-integration-file.xsd 
      http://www.springframework.org/schema/context 
      http://www.springframework.org/schema/context/spring-context.xsd 
     http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd 
     http://www.springframework.org/schema/integration/aws http://www.springframework.org/schema/integration/aws/spring-integration-aws-1.0.xsd"> 


    <context:property-placeholder location="classpath*:test-${region}.properties" /> 

    <int:poller fixed-delay="${fixedDelay}" default="true"> 
     <int:advice-chain> 
      <ref bean="pollAdvise"/> 

     </int:advice-chain> 
    </int:poller> 


    <bean id="pollAdvise" class="org.springframework.integration.scheduling.PollSkipAdvice"> 
     <constructor-arg ref="healthCheckStrategy"/> 

    </bean> 



    <bean id="healthCheckStrategy" class="test.ServiceHealthCheckPollSkipStrategy"> 
     <property name="url" value="${url}"/> 
     <property name="doHealthCheck" value="${doHealthCheck}"/> 
     <property name="restTemplate" ref="restTemplate"/> 

    </bean> 

    <bean id="restTemplate" 
      class="org.springframework.web.client.RestTemplate"> 
     <constructor-arg ref="requestFactory"/> 

    </bean> 


    <bean id="requestFactory" 
      class="test.BatchClientHttpRequestFactory"> 
     <constructor-arg ref="verifier"/> 

    </bean> 

    <bean id="verifier" 
      class="test.NullHostnameVerifier"> 

    </bean> 


    <bean id="encryptedDatum" class="test.EncryptedSecuredDatum"/> 




    <bean id="clientConfiguration" class="com.amazonaws.ClientConfiguration"> 
     <property name="proxyHost" value="${proxyHost}"/> 
     <property name="proxyPort" value="${proxyPort}"/> 
     <property name="preemptiveBasicProxyAuth" value="false"/> 
    </bean> 

    <bean id="s3Operations" class="test.CustomC1AmazonS3Operations"> 

     <constructor-arg index="0" ref="clientConfiguration"/> 
     <property name="awsEndpoint" value="s3.amazonaws.com"/> 
     <property name="temporaryDirectory" value="${temporaryDirectory}"/> 
     <property name="awsSecurityKey" value=""/> 
    </bean> 


    <bean id="credentials" class="org.springframework.integration.aws.core.BasicAWSCredentials"> 

    </bean> 

    <int-aws:s3-inbound-channel-adapter aws-endpoint="s3.amazonaws.com" 
             bucket="${bucket}" 
             s3-operations="s3Operations" 
             credentials-ref="credentials" 
             file-name-wildcard="${fileNameWildcard}" 
             remote-directory="${prefix}" 
             channel="splitChannel" 
             local-directory="${localDirectory}" 
             accept-sub-folders="false" 
             delete-source-files="true" 
             archive-bucket="${archiveBucket}" 
             archive-directory="${archiveDirectory}"> 
    </int-aws:s3-inbound-channel-adapter> 

    <int-file:splitter input-channel="splitChannel" output-channel="output" markers="false" charset="UTF-8"> 

     <int-file:request-handler-advice-chain> 
      <bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice"> 
       <property name="onSuccessExpression" value="payload.delete()"/> 
      </bean> 
     </int-file:request-handler-advice-chain> 

    </int-file:splitter> 

    <int:channel-interceptor pattern="*" order="3"> 
     <bean class="org.springframework.integration.channel.interceptor.WireTap"> 
      <constructor-arg ref="loggingChannel" /> 
     </bean> 
    </int:channel-interceptor> 
    <int:logging-channel-adapter id="loggingChannel" log-full-message="true" level="INFO"/> 

    <int:channel id="output"/> 

</beans> 

更新2:

我的流是象下面 AWS-S3-源|處理器| HTTP客戶端|處理器>隊列:testQueue

1)現在我分裂流象下面這樣:

aws-s3-source> queue:s3Queue 

我能夠非常快的讀取所有我的1萬條消息。 2)現在,我增加了一個流像下面我看到的問題又是S3停止拉文件和郵件都將丟失每次

queue:s3Queue>processor|http-client| processor> queue:testQueue 

3)觀察是當我添加HTTP客戶端這一問題再次發生,即。來自輸入源的一些消息丟失。

4)現在我將文件分割成125個MB的5個文件,而不是660mb一個文件.IE 200點多K的記錄5 files.I沒有看到這個問題讓我的所有消息

我也看到很多消息在http-client之前堵塞在隊列中。 我在想這是關於xd內存或線程的事情嗎?

回答

2

請讓我知道表達式建議是否在處理前刪除。

否;該建議是圍繞消息處理程序的一個around建議;它不能執行(評估表達式),直到分離器發出所有行。

是否有可能在文件被完全寫入之前從s3中拉出文件?

爲了調試這個問題,我建議改變建議將文件發送到另一個子流,並在刪除前做一些分析/記錄。

+0

不,我正在監視我的本地文件夾文件只能在完全上傳後拉動,並且拉出的文件和s3中的文件具有相同的md5hash和大小以及行數。我觀察到的一個觀察結果是文件在8 -10秒它被刪除了。我覺得它不可能在10秒內處理100萬條記錄 – constantlearner

+1

那麼,在分離器發送所有行之前,建議也不可能刪除文件。您需要添加調試。 –

+0

讓我檢查添加調試器 – constantlearner