2016-04-25 91 views
0

我正在使用自定義Iteamreader bean從MongoDB讀取數據。我的閱讀器按照reader中定義的pageSize(50)返回數據。但是處理器只能從50開始獲得31行數據。我嘗試了各種塊大小,但是一些處理器只獲得了前31行。spring批處理:itemprocessor未獲取讀取器讀取的所有數據

請幫我找到了這個錯誤......我想聽衆,但沒能找到問題..

---- XML配置----

<?xml version="1.0" encoding="UTF-8"?> 

    <context:property-placeholder location="classpath:application.properties" /> 

    <context:component-scan base-package="com.xxx.yyy.batch.kernel" /> 
    <context:component-scan base-package="com.xxx.yyy.batch.dao" /> 

    <context:annotation-config /> 

    <!-- Enable Annotation based Declarative Transaction Management --> 
    <tx:annotation-driven proxy-target-class="true" 
     transaction-manager="transactionManager" /> 

    <!-- Creating TransactionManager Bean, since JDBC we are creating of type 
     DataSourceTransactionManager --> 
    <bean id="transactionManager" 
     class="org.springframework.jdbc.datasource.DataSourceTransactionManager"> 
     <property name="dataSource" ref="dataSource" /> 
    </bean> 

    <batch:job id="txnLogJob" job-repository="jobRepository" 
     restartable="true"> 
     <batch:step id="txnload"> 
      <tasklet allow-start-if-complete="true"> 
       <chunk reader="txnLogItemReader" writer="txnLogItemWriter" 
        processor="txnLogProcessor" commit-interval="20" />    
      </tasklet> 
     </batch:step> 

     <batch:listeners> 
      <batch:listener ref="completionListener" /> 
     </batch:listeners> 
    </batch:job> 

    <bean id="completionListener" 
     class="com.xxx.yyy.batch.listeners.JobCompletionNotificationListener" /> 


    <bean id="jobParametersDAOImpl" class="com.xxx.yyy.batch.dao.JobParametersDAOImpl" /> 

    <bean id="batchLoader" class="com.xxx.yyy.batch.kernel.BatchLoader" /> 

    <bean id="batchjobParameter" class="com.xxx.yyy.batch.dao.Batch_Job_Parameters" /> 



    <bean id="txnLogItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter" 
     scope="step"> 
     <property name="shouldDeleteIfExists" value="true" /> 
     <property name="resource" value="file:target/test-outputs/output.txt" /> 
     <property name="lineAggregator"> 
      <bean 
       class="org.springframework.batch.item.file.transform.PassThroughLineAggregator" /> 
     </property> 
    </bean> 


    <bean id="txnLogProcessor" 
     class="com.xxx.yyy.batch.processor.MessageContextItemProcessor" /> 

    <bean id="jobLauncher" 
     class="org.springframework.batch.core.launch.support.SimpleJobLauncher"> 
     <property name="jobRepository" ref="jobRepository" /> 
    </bean> 

    <bean id="jobRepository" 
     class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean"> 
     <property name="databaseType" value="MYSQL" /> 
     <property name="dataSource" ref="dataSource" /> 
     <property name="transactionManager" ref="transactionManager" /> 
    </bean> 

    <bean id="dataSource" class="com.xxx.yyy.common.DataSource" 
     destroy-method="close"> 
     <property name="driverClassName" value="${jdbc.driverClassName}" /> 
     <property name="url" value="${jdbc.url}" /> 
     <property name="username" value="${jdbc.username}" /> 
     <property name="password" value="${jdbc.password}" /> 
     <property name="connectionProperties" value="${jdbc.connectionProperties}" /> 
     <property name="initialSize" value="${jdbc.initialSize}" /> 
     <property name="maxTotal" value="${jdbc.maxTotal}" /> 
     <property name="maxIdle" value="${jdbc.maxIdle}" /> 
     <property name="minIdle" value="${jdbc.minIdle}" /> 
     <property name="maxWaitMillis" value="${jdbc.maxWaitMillis}" /> 
     <property name="testOnBorrow" value="${jdbc.testOnBorrow}" /> 
     <property name="testWhileIdle" value="${jdbc.testWhileIdle}" /> 
     <property name="testOnReturn" value="${jdbc.testOnReturn}" /> 
     <property name="validationQuery" value="${jdbc.validationQuery}" /> 
    </bean> 

</beans> 

定製讀者豆:

@Bean 
    public MongoItemReader<MessageContext> txnLogItemReader() { 
     MongoItemReader<MessageContext> reader = new MongoItemReader<MessageContext>(); 
     reader.setPageSize(50); 
     reader.setCollection("txnlog"); 
     reader.setTemplate(mongoTemplate); 

     String query = null ; 
     query = "{ \"audit_info.created_on\": { $gt: { \"$date\" : ?0 }, $lte: { \"$date\" : ?1 } }, " 
         + "$and: [ { \"processing_status\": { $in: [?2] } } ] }" ; 




     reader.setQuery(query);   

     //Timestamp to_date_timestamp = jobParametersDAOImpl.getCurrentTimeStamp() ;   

     Batch_Job_Parameters job_param = jobParametersDAOImpl.getBatchJobParameters() ; 
     String from_date = job_param.getFrom_date().toString() ; 
     String [] splitstr = from_date.split(" ") ; 
     from_date = splitstr[0]+"T"+splitstr[1]+"00Z" ; 

     String to_date = job_param.getTo_date().toString() ; 
     splitstr = to_date.split(" ") ; 
     to_date = splitstr[0]+"T"+splitstr[1]+"00Z" ; 

     List<Object> parameterValues = new ArrayList<Object>() ; 
     parameterValues.add(from_date) ; 
     parameterValues.add(to_date) ;  
     parameterValues.add(job_param.getTxnlog_status_list()) ;  

     reader.setParameterValues(parameterValues); 

     reader.setTargetType(com....MessageContext.class); 
     Map<String,Direction> sorts = new HashMap<String,Direction>() ; 
     sorts.put("audit_info.created_on", org.springframework.data.domain.Sort.Direction.ASC) ; 
     reader.setSort(sorts); 

     return reader; 
    } 

回答

0

更新的答案,因爲這個問題將焦點轉移

我會讓MessageContextReadConverter不返回null,而是在Processor中進行驗證。如果Processor返回null,它只是增加過濾器計數,而不是混淆Step,因爲認爲沒有更多的行要處理。

+0

MongoDB包含超過500行,正如我前面提到的,我可以看到所有正在讀取的50行。 –

+0

MongoDB包含超過500行,我可以看到正在讀取的所有50行。我將偵聽程序(步,塊,讀)放入配置中。聽者的 輸出是: 「步驟之前」,「前塊」 「之前讀」 50行數據 「讀出之後」 19次: 「之前讀」 「之後讀出」 20行數據的「前塊」 10倍處理器和寫入器處理 「塊後」: 「之前讀」 「讀取後」,「塊之後」 「之前讀」 10行由處理器和作家 處理的數據的「步驟後」 –

0

我已經實現了MessageContextReadConverter implements Converter,並且在返回null的情況下轉換沒有完成。因此,如果爲null,read()方法不會將元素進一步傳遞給Processor/Writer。問題是Converter does not允許拋出異常。展望如何解決這部分。

相關問題