2017-08-17

I created a POC project in which I use a Spring Batch local partitioning step to move 10 records from the Employee table to the NewEmployee table. I configured 4 threads to run this batch. When I run the batch process, I can see that the pagingItemReader() method is not called by the slave step, and consequently the OraclePagingQueryProvider is not invoked. I noticed that the number of missed records (records not moved) equals the number of configured threads. I developed this POC using the following link as a guide: https://github.com/mminella/LearningSpringBatch/tree/master/src/localPartitioning

Note that when I replace the master/slave code with plain read, process, and write logic with no multithreading involved, the code below works fine.

The BATCH_STEP_EXECUTION table in the DB also indicates that only 8 records were moved (here again 2 records are missed, which equals the number of threads). The DB records are as follows:

STEP_NAME             STATUS     COMMIT_COUNT  READ_COUNT  WRITE_COUNT  EXIT_CODE
slaveStep:partition1  COMPLETED  1             4           4            COMPLETED
slaveStep:partition0  COMPLETED  1             4           4            COMPLETED
masterStep            COMPLETED  2             8           8            COMPLETED

Code snippet of the configuration class:

  @Bean 
       public JobRegistryBeanPostProcessor jobRegistrar() throws Exception{ 
        JobRegistryBeanPostProcessor registrar=new JobRegistryBeanPostProcessor(); 
        registrar.setJobRegistry(this.jobRegistry); 
        registrar.setBeanFactory(this.applicationContext.getAutowireCapableBeanFactory()); 
        registrar.afterPropertiesSet(); 
        return registrar; 
       } 

       @Bean 
       public JobOperator jobOperator() throws Exception{ 
        SimpleJobOperator simpleJobOperator=new SimpleJobOperator(); 
        simpleJobOperator.setJobLauncher(this.jobLauncher); 
        simpleJobOperator.setJobParametersConverter(new DefaultJobParametersConverter()); 
        simpleJobOperator.setJobRepository(this.jobRepository); 
        simpleJobOperator.setJobExplorer(this.jobExplorer); 
        simpleJobOperator.setJobRegistry(this.jobRegistry); 

        simpleJobOperator.afterPropertiesSet(); 
        return simpleJobOperator; 

       } 

       @Bean 
       public ColumnRangePartitioner partitioner() { 
        ColumnRangePartitioner partitioner = new ColumnRangePartitioner(); 
        partitioner.setColumn("id"); 
        partitioner.setDataSource(this.dataSource); 
        partitioner.setTable("Employee"); 
        LOGGER.info("partitioner---->"+partitioner); 
        return partitioner; 
       } 

       @Bean 
       public Step masterStep() { 
        return stepBuilderFactory.get("masterStep") 
          .partitioner(slaveStep().getName(), partitioner()) 
          .step(slaveStep()) 
          .gridSize(gridSize) 
          .taskExecutor(taskExecutorConfiguration.taskExecutor()) 
          .build(); 
       } 

       @Bean 
       public Step slaveStep() { 
        return stepBuilderFactory.get("slaveStep") 
          .<Employee, NewEmployee>chunk(chunkSize) 
          .reader(pagingItemReader(null,null)) 
          .processor(employeeProcessor()) 
          .writer(employeeWriter.customItemWriter()) 
          .build(); 
       } 

       @Bean 
       public Job job() { 
        return jobBuilderFactory.get("FR") 
          .start(masterStep()) 
          .build(); 
       } 

       @Bean 
       public ItemProcessor<Employee, NewEmployee> employeeProcessor() { 
        return new EmployeeProcessor(); 
       } 

       @Override 
       public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { 
        this.applicationContext=applicationContext; 
       } 



       @Bean 
       @StepScope 
       public JdbcPagingItemReader<Employee> pagingItemReader(@Value("#{stepExecutionContext['minValue']}") Long minvalue, 
         @Value("#{stepExecutionContext['maxValue']}") Long maxvalue) { 

        JdbcPagingItemReader<Employee> reader = new JdbcPagingItemReader<Employee>(); 
        reader.setDataSource(this.dataSource); 
        // this should be equal to chunk size for the performance reasons. 
        reader.setFetchSize(chunkSize); 
        reader.setRowMapper((resultSet, i) -> { 
         return new Employee(resultSet.getLong("id"), 
           resultSet.getString("firstName"), 
           resultSet.getString("lastName")); 
        }); 

        OraclePagingQueryProvider provider = new OraclePagingQueryProvider(); 
        provider.setSelectClause("id, firstName, lastName"); 
        provider.setFromClause("from Employee"); 
        LOGGER.info("min-->"+minvalue); 
        LOGGER.info("max-->"+maxvalue); 
        provider.setWhereClause("where id<=" + minvalue + " and id > " + maxvalue); 

        Map<String, Order> sortKeys = new HashMap<>(1); 
        sortKeys.put("id", Order.ASCENDING); 
        provider.setSortKeys(sortKeys); 

        reader.setQueryProvider(provider); 
        LOGGER.info("reader--->"+reader); 
        return reader; 
       } 
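As an aside, JdbcPagingItemReader supports named parameters, which avoids concatenating the partition bounds into the SQL string. A minimal sketch of such a reader bean using SqlPagingQueryProviderFactoryBean and setParameterValues (assuming the intended range is an inclusive [minValue, maxValue]; the bean name is illustrative):

```java
@Bean
@StepScope
public JdbcPagingItemReader<Employee> parameterizedPagingItemReader(
        @Value("#{stepExecutionContext['minValue']}") Long minValue,
        @Value("#{stepExecutionContext['maxValue']}") Long maxValue) throws Exception {

    // The factory bean inspects the DataSource and picks the matching
    // (here: Oracle) paging query provider automatically.
    SqlPagingQueryProviderFactoryBean providerFactory = new SqlPagingQueryProviderFactoryBean();
    providerFactory.setDataSource(this.dataSource);
    providerFactory.setSelectClause("id, firstName, lastName");
    providerFactory.setFromClause("from Employee");
    // Named parameters instead of string concatenation.
    providerFactory.setWhereClause("where id >= :minValue and id <= :maxValue");
    providerFactory.setSortKey("id");

    Map<String, Object> parameterValues = new HashMap<>();
    parameterValues.put("minValue", minValue);
    parameterValues.put("maxValue", maxValue);

    JdbcPagingItemReader<Employee> reader = new JdbcPagingItemReader<>();
    reader.setDataSource(this.dataSource);
    reader.setFetchSize(chunkSize);
    reader.setQueryProvider(providerFactory.getObject());
    reader.setParameterValues(parameterValues);
    reader.setRowMapper((rs, i) -> new Employee(
            rs.getLong("id"), rs.getString("firstName"), rs.getString("lastName")));
    return reader;
}
```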

Code snippet of the ColumnRangePartitioner class:

     @Override 
     public Map<String, ExecutionContext> partition(int gridSize) { 
      int min = jdbcTemplate.queryForObject("SELECT MIN(" + column + ") from " + table, Integer.class); 
      int max = jdbcTemplate.queryForObject("SELECT MAX(" + column + ") from " + table, Integer.class); 
      int targetSize = (max - min)/gridSize + 1; 

      Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>(); 
      int number = 0; 
      int start = min; 
      int end = start + targetSize - 1; 

      while (start <= max) { 
       ExecutionContext value = new ExecutionContext(); 
       result.put("partition" + number, value); 

       if (end >= max) { 
        end = max; 
       } 
       LOGGER.info("Start-->" + start); 
       LOGGER.info("end-->" + end); 
       value.putInt("minValue", start); 
       value.putInt("maxValue", end); 
       start += targetSize; 
       end += targetSize; 
       number++; 
      } 

      return result; 
     } 

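The partitioner's boundary arithmetic can be checked outside Spring. A minimal, self-contained sketch (the class name RangeMathDemo and the int[] return type are illustrative, not part of the project) shows that for the 10-record example with gridSize 4 it yields four inclusive ranges:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Standalone sketch of ColumnRangePartitioner's range arithmetic
// (no Spring or DataSource needed; min/max are passed in directly).
public class RangeMathDemo {

    static Map<String, int[]> partition(int min, int max, int gridSize) {
        int targetSize = (max - min) / gridSize + 1;
        Map<String, int[]> result = new LinkedHashMap<>();
        int number = 0;
        int start = min;
        int end = start + targetSize - 1;
        while (start <= max) {
            if (end >= max) {
                end = max; // clamp the last range to the real maximum
            }
            result.put("partition" + number, new int[] { start, end });
            start += targetSize;
            end += targetSize;
            number++;
        }
        return result;
    }

    public static void main(String[] args) {
        // 10 ids (1..10) split with gridSize 4 -> targetSize 3:
        // partition0=[1,3] partition1=[4,6] partition2=[7,9] partition3=[10,10]
        partition(1, 10, 4).forEach((name, range) ->
                System.out.println(name + " -> [" + range[0] + ", " + range[1] + "]"));
    }
}
```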

Have you verified that the `Partitioner` is returning partitions? Are there worker step execution records in the job repository? –


Hi Michael - thanks for the reply. I have updated the question with the BATCH_STEP_EXECUTION table records. Here I can see that only 8 out of 10 records are read by the reader (the number of records missed equals the number of threads). Also, the reader is not reading the second column's data from the source table; it copies the first column's value into the second column, and that is what gets saved in the target table. Note that none of these problems exist when I remove the partitioning logic. Thanks for your help. – Abhilash

Answer

I found the solution to this problem. After the partitioner, we must add a partitionHandler to the masterStep. In the partitionHandler we define the slaveStep and other configuration. Below are the code snippets.

MasterStep: add the partitionHandler here:

 stepBuilderFactory 
      .get("userMasterStep") 
      .partitioner(userSlaveStep().getName(), userPartitioner()) 
      .partitionHandler(userMasterSlaveHandler()) 
      .build(); 

Define another bean named partitionHandler; the slave step is referenced there:

@Bean 
public PartitionHandler userMasterSlaveHandler() throws Exception { 
    TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler(); 
    handler.setGridSize(gridSize); 
    handler.setTaskExecutor(taskExecutorConfiguration.taskExecutor()); 
    handler.setStep(userSlaveStep()); 
    handler.afterPropertiesSet(); 
    return handler; 
}