2015-11-06 154 views
0

我正在並行執行spring批處理作業,並使用SimpleAsyncTaskExecutor進行並行處理,並將throttle-limit設置爲default(缺省爲4)。 物品閱讀器正在讀取文本文件中的行,然後進行處理。 但是,正在發生的事情是文本文件中的一行正在處理4個不同的線程,使其執行單個塊4次。彈簧批量並行處理多次執行一個步驟

下面是我的batch.xml:

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd 
     http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> 
    <import resource="classpath*:/META-INF/spring/batch/override/**/*.xml" /> 
    <bean id="businessReader" class="com.rbsgbm.rates.eodtasks.batch.reader.BusinessItemReader"/> 
    <bean id="businessProcessor" class="com.rbsgbm.rates.eodtasks.batch.processor.BusinessItemProcessor" /> 
    <bean id="businessWriter" class="com.rbsgbm.rates.eodtasks.batch.writer.BusinessItemWriter" /> 
    <bean id="deskReader" class="com.rbsgbm.rates.eodtasks.batch.reader.DeskItemReader"/> 
    <bean id="deskProcessor" class="com.rbsgbm.rates.eodtasks.batch.processor.DeskItemProcessor" /> 
    <bean id="deskWriter" class="com.rbsgbm.rates.eodtasks.batch.writer.DeskItemWriter" /> 
    <bean class="com.rbsgbm.rates.eodtasks.batch.Tasklet.TradeSnapTasklet" id="tradeSnapTasklet"/> 
    <bean class="com.rbsgbm.rates.eodtasks.batch.Tasklet.FoundryExtractTasklet" id="foundryExtractTasklet"/> 
    <bean id="simpleFireTasklet" 
     class="com.rbsgbm.rates.eodtasks.batch.Tasklet.SimpleFireTasklet" /> 

    <bean id="mdxMarketDataSnapTasklet" 
     class="com.rbsgbm.rates.eodtasks.batch.Tasklet.MdxMarketDataSnapTasklet" /> 

    <bean id="stepListener" class="org.springframework.batch.core.listener.StepExecutionListenerSupport" /> 
    <bean id="restartJobListener" class="com.rbsgbm.rates.eodtasks.batch.listener.RestartListener"/> 
    <bean id="failedStepListener" class="com.rbsgbm.rates.eodtasks.batch.listener.FailedStepStepExecutionListener"/> 
    <bean id="taskExecutor" 
     class="org.springframework.core.task.SimpleAsyncTaskExecutor"> 
    </bean> 

    <job id="simpleDojJob" xmlns="http://www.springframework.org/schema/batch"> 
     <step id="processBusiness" next="simpleFireTask"> 
      <tasklet> 
       <chunk reader="businessReader" processor="businessProcessor" 
        writer="businessWriter" commit-interval="1" /> 
      </tasklet> 

     </step> 

     <step id="simpleFireTask" next="foundryTask"> 
      <tasklet task-executor="taskExecutor"> 
       <chunk reader="deskReader" processor="deskProcessor" 
        writer="deskWriter" commit-interval="1" /> 
      </tasklet> 

     </step> 

     <step id="foundryTask"> 
      <tasklet ref="foundryExtractTasklet"/> 
      <listeners> 
        <listener ref="stepListener"/> 
        <listener ref="restartJobListener"/> 
        <listener ref="failedStepListener"/> 
      </listeners>  
     </step> 
    </job> 
</beans> 

回答

2

如果你想擁有線程安全的讀者和作家,你必須實現他們這種方式。

默認情況下,每個線程都可能在同一時刻訪問您的閱讀器或書寫器的相同實例。如果您的閱讀器和書寫器沒有實現,它將無法正確處理它。

要確保它們是線程安全的,最簡單的方法是將讀寫器分別標記爲同步的寫入方法。

如果你不能改變的讀/寫的代碼,只需要實現一個簡單的包裝,並委託給你的讀/寫:

public class SynchronizedItemReader<T> implements ItemReader<T> 
{ 
    private ItemReader<T> delegate; 
    public void setDelegate(ItemReader<T> delegate) {this.delegate = delegate}; 

    public synchronized T read() { 
     return delegate.read(); 
    } 
} 

但請注意:如果您還實現ItemStream來跟蹤已成功由作家承擔(因此能夠在該位置重新啓動),您還需要管理它,因爲塊可以相互超越。

+1

感謝Hansjoerg Wingeier提供的答案。我可以輕鬆地使讀取和寫入方法同步。 – KayV

相關問題