Spring Batch JdbcPagingItemReader not called from slaveStep

I created a POC project in which I use a Spring Batch local partitioning step to move 10 records from an Employee table to a NewEmployee table. I configured 4 threads to run this batch. When I run it, I can see that the pagingItemReader() method is not called by the slave step, and consequently the OraclePagingQueryProvider is never invoked. I also noticed that the number of missed (unmoved) records equals the number of configured threads. I developed this POC using the following link as a guide: https://github.com/mminella/LearningSpringBatch/tree/master/src/localPartitioning
Note that when I replace the master/slave code with a plain read-process-write step with no multithreading involved, the code below works fine.
The BATCH_STEP_EXECUTION table in the DB also shows that only 8 records were moved (again 2 records are missed, which equals the thread count). The DB records are as follows:
STEP_NAME             STATUS     COMMIT_COUNT  READ_COUNT  WRITE_COUNT  EXIT_CODE
slaveStep:partition1  COMPLETED  1             4           4            COMPLETED
slaveStep:partition0  COMPLETED  1             4           4            COMPLETED
masterStep            COMPLETED  2             8           8            COMPLETED
Code snippet of the configuration class:
@Bean
public JobRegistryBeanPostProcessor jobRegistrar() throws Exception {
    JobRegistryBeanPostProcessor registrar = new JobRegistryBeanPostProcessor();
    registrar.setJobRegistry(this.jobRegistry);
    registrar.setBeanFactory(this.applicationContext.getAutowireCapableBeanFactory());
    registrar.afterPropertiesSet();
    return registrar;
}

@Bean
public JobOperator jobOperator() throws Exception {
    SimpleJobOperator simpleJobOperator = new SimpleJobOperator();
    simpleJobOperator.setJobLauncher(this.jobLauncher);
    simpleJobOperator.setJobParametersConverter(new DefaultJobParametersConverter());
    simpleJobOperator.setJobRepository(this.jobRepository);
    simpleJobOperator.setJobExplorer(this.jobExplorer);
    simpleJobOperator.setJobRegistry(this.jobRegistry);
    simpleJobOperator.afterPropertiesSet();
    return simpleJobOperator;
}

@Bean
public ColumnRangePartitioner partitioner() {
    ColumnRangePartitioner partitioner = new ColumnRangePartitioner();
    partitioner.setColumn("id");
    partitioner.setDataSource(this.dataSource);
    partitioner.setTable("Employee");
    LOGGER.info("partitioner---->" + partitioner);
    return partitioner;
}

@Bean
public Step masterStep() {
    return stepBuilderFactory.get("masterStep")
            .partitioner(slaveStep().getName(), partitioner())
            .step(slaveStep())
            .gridSize(gridSize)
            .taskExecutor(taskExecutorConfiguration.taskExecutor())
            .build();
}

@Bean
public Step slaveStep() {
    return stepBuilderFactory.get("slaveStep")
            .<Employee, NewEmployee>chunk(chunkSize)
            .reader(pagingItemReader(null, null))
            .processor(employeeProcessor())
            .writer(employeeWriter.customItemWriter())
            .build();
}

@Bean
public Job job() {
    return jobBuilderFactory.get("FR")
            .start(masterStep())
            .build();
}

@Bean
public ItemProcessor<Employee, NewEmployee> employeeProcessor() {
    return new EmployeeProcessor();
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    this.applicationContext = applicationContext;
}
@Bean
@StepScope
public JdbcPagingItemReader<Employee> pagingItemReader(
        @Value("#{stepExecutionContext['minValue']}") Long minvalue,
        @Value("#{stepExecutionContext['maxValue']}") Long maxvalue) {
    JdbcPagingItemReader<Employee> reader = new JdbcPagingItemReader<>();
    reader.setDataSource(this.dataSource);
    // This should be equal to the chunk size for performance reasons.
    reader.setFetchSize(chunkSize);
    reader.setRowMapper((resultSet, i) -> new Employee(resultSet.getLong("id"),
            resultSet.getString("firstName"),
            resultSet.getString("lastName")));
    OraclePagingQueryProvider provider = new OraclePagingQueryProvider();
    provider.setSelectClause("id, firstName, lastName");
    provider.setFromClause("from Employee");
    LOGGER.info("min-->" + minvalue);
    LOGGER.info("max-->" + maxvalue);
    provider.setWhereClause("where id<=" + minvalue + " and id > " + maxvalue);
    Map<String, Order> sortKeys = new HashMap<>(1);
    sortKeys.put("id", Order.ASCENDING);
    provider.setSortKeys(sortKeys);
    reader.setQueryProvider(provider);
    LOGGER.info("reader--->" + reader);
    return reader;
}
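For reference, here is a standalone sketch (my own illustration, not part of the project) that evaluates the concatenated WHERE clause above against a sample id range. The values minValue=1 and maxValue=5 are assumptions for illustration, not values observed from the step execution context:

```java
// Standalone sketch: check which ids the concatenated predicate
// "where id<=minvalue and id > maxvalue" would match.
// minValue=1 and maxValue=5 are assumed sample partition values.
public class WhereClauseSketch {
    public static void main(String[] args) {
        long minValue = 1, maxValue = 5; // assumed, for illustration only
        String clause = "where id<=" + minValue + " and id > " + maxValue;
        System.out.println(clause);

        // Evaluate the same predicate in Java over ids 1..10.
        int matches = 0;
        for (long id = 1; id <= 10; id++) {
            if (id <= minValue && id > maxValue) {
                matches++;
            }
        }
        System.out.println("ids matched: " + matches); // prints: ids matched: 0
    }
}
```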
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
    int min = jdbcTemplate.queryForObject("SELECT MIN(" + column + ") from " + table, Integer.class);
    int max = jdbcTemplate.queryForObject("SELECT MAX(" + column + ") from " + table, Integer.class);
    int targetSize = (max - min) / gridSize + 1;
    Map<String, ExecutionContext> result = new HashMap<>();
    int number = 0;
    int start = min;
    int end = start + targetSize - 1;
    while (start <= max) {
        ExecutionContext value = new ExecutionContext();
        result.put("partition" + number, value);
        if (end >= max) {
            end = max;
        }
        LOGGER.info("Start-->" + start);
        LOGGER.info("end-->" + end);
        value.putInt("minValue", start);
        value.putInt("maxValue", end);
        start += targetSize;
        end += targetSize;
        number++;
    }
    return result;
}
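To make the range math above concrete, here is a standalone sketch (my own illustration) that replays the partition() loop for an assumed table of ids 1..10 with gridSize = 2; both values are assumptions, since the actual min/max come from the database:

```java
// Standalone sketch: replay the ColumnRangePartitioner range math
// for assumed values min=1, max=10, gridSize=2.
public class PartitionRangeSketch {
    public static void main(String[] args) {
        int min = 1, max = 10, gridSize = 2; // assumed, for illustration only
        int targetSize = (max - min) / gridSize + 1; // 5 ids per partition
        int number = 0;
        int start = min;
        int end = start + targetSize - 1;
        while (start <= max) {
            if (end >= max) {
                end = max;
            }
            // Same minValue/maxValue each partition would receive in its
            // step execution context.
            System.out.println("partition" + number + ": minValue=" + start + ", maxValue=" + end);
            start += targetSize;
            end += targetSize;
            number++;
        }
        // prints:
        // partition0: minValue=1, maxValue=5
        // partition1: minValue=6, maxValue=10
    }
}
```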
The partition(int gridSize) method above is the code snippet from my ColumnRangePartitioner class.
Have you verified that the `Partitioner` is returning the partitions? Are there worker step execution records in the job repository? –
Hi Michael - Thanks for your reply. I updated the question with the BATCH_STEP_EXECUTION table records. Here I can see that only 8 out of 10 records are read by the reader (the number of missed records equals the number of threads). Also, the reader is not reading the second column's data from the source table; it is copying the first column's value into the second column, and that is what gets saved in the target table. Note that none of these issues exist when I remove the partitioning logic. Thanks for your help. – Abhilash