2017-07-20 252 views
0

我有一個包含500k記錄的輸入文件。我需要批量處理這些記錄,應用轉換並寫入輸出文件。我試圖用下面的流程對它進行一點實驗。 batch.block大小設置爲1000.輸出文件只包含1000條記錄。其餘的490k記錄丟失。Mule批處理 - 在批處理塊中處理記錄並彙總到文件

按照我的理解,批量爲每個塊大小啓動一個新實例,在這種情況下,每1000條記錄將由一個新線程處理。這些線程是否互相覆蓋?如何將所有轉換後的記錄收集到輸出文件中?

<flow name="poll-inbound-file"> 
     <file:inbound-endpoint path="${file.inbound.location}" 
      pollingFrequency="${file.polling.frequency}" responseTimeout="10000" 
      doc:name="File" metadata:id="abce53af-7d82-411a-a75a-5cd8ae8e55ae" 
      fileAge="${file.fileage}" moveToDirectory="${file.outbound.location}"/> 
     <custom-interceptor 
      class="com.example.TimerInterceptor" doc:name="Timer" /> 

     <dw:transform-message doc:name="Transform Message" 
      metadata:id="dcf84872-5aca-404f-9169-d448c9e4cd76"> 
      <dw:input-payload mimeType="application/csv" /> 
      <dw:set-payload><![CDATA[%dw 1.0 
%output application/java 
--- 
payload as :iterator]]></dw:set-payload> 
     </dw:transform-message> 
     <batch:job name="process-batchBatch" block-size="${batch.blocksize}"> 

     <batch:process-records> 
      <batch:step name="Batch_Step1"> 
       <logger level="TRACE" doc:name="Logger" message="#[payload]" /> 
      </batch:step> 
      <batch:step name="Batch_Step2"> 
       <logger level="TRACE" doc:name="Logger" message="#[payload]" /> 
      </batch:step> 
      <batch:step name="Batch_Step3"> 

       <batch:commit doc:name="Batch Commit" size="1000"> 
       <expression-component doc:name="Expression"><![CDATA[StringBuilder sb=new StringBuilder(); 
for(String s: payload) 
{ 
    sb.append(s); 
    sb.append(System.lineSeparator()); 
} 
payload= sb.toString();]]></expression-component> 
        <file:outbound-endpoint path="${file.outbound.location}" 
         responseTimeout="10000" doc:name="File" /> 
       </batch:commit> 
      </batch:step> 
     </batch:process-records> 
     <batch:on-complete> 
      <logger 
       message="******************************************** Batch Report **************************************" 
       level="INFO" doc:name="Logger" /> 
     </batch:on-complete> 
    </batch:job> 

    </flow> 

回答

0

從多個線程同時寫入文件通常是不安全的。而是將結果寫入諸如ActiveMQ之類的隊列中,並將另一個流從隊列中讀取並寫入文件。您可以決定是否要在處理文件之前或之後開始從隊列中進行處理。