實現此目的的一種方法是通過Partitioning。如果您想跟蹤哪個customersIds
已完成,這種方法似乎更好,因爲每個客戶ID都有一個從屬步驟。
步驟
1.首先通過實現org.springframework.batch.core.partition.support.Partitioner
接口創建分區類和填充Map<String, ExecutionContext>
爲每一個客戶ID。
因爲,您是通過客戶ID分區,方法參數gridSize
將不會用於您的情況。
代碼看起來像這樣,其中allCustomers
是您從數據庫準備的列表。
類 - 在主一步,從步驟條款CustomerPartitioner
Map<String, ExecutionContext> result = new HashMap<>();
int partitionNumber = 0;
for (String customer: allCustomers) {
ExecutionContext value = new ExecutionContext();
value.putString("customerId", customer);
result.put("Customer Id [" + customer+ "] : THREAD "
+ partitionNumber, value);
partitionNumber++;
}
2.Modify你的步驟定義。請參閱在線教程。
示例代碼與此類似。
@Bean
public Step customerPartitionerStep() throws Exception {
return step.get("customerPartitionerStep")
.partitioner(shipmentFactsStep())
.partitioner("shipmentFactsStep", customerPartitioner())
.gridSize(partitionerGridSize).taskExecutor(taskExecutor())
.build();
}
@Bean
public Step shipmentFactsStep() {
return stepBuilderFactory.get("shipmentFactsStep")
.<Shipmentfacts, Shipmentfacts>chunk(10000)
.reader(shipmentfactsItemReader())
.processor(shipmentFactProcessor())
.writer(shipmentFactsWriter())
.build();
}
@Bean
public Partitioner customerPartitioner() {
return new CustomerPartitioner();
}
@Bean
public TaskExecutor taskExecutor() {
SimpleAsyncTaskExecutor simpleTaskExecutor = new SimpleAsyncTaskExecutor();
simpleTaskExecutor.setConcurrencyLimit(concurrencyLimit);
return simpleTaskExecutor;
}
您可以將partitionerGridSize
設置爲任何值,因爲它未在分區器實現中使用。您可以稍後使用它根據數量總記錄進行分區,而不是僅使用客戶ID。
3.在上面步驟#2中的代碼中,設置concurrencyLimit=1
..這非常重要,這樣一次只能有一個客戶運行,並且它將爲您在第1步中輸入的所有客戶運行。您可以通過設置此值並行運行儘可能多的客戶。
4. customerId
從在步驟分割器在步驟#1可在讀取器,處理器等通過執行
@Bean
@StepScope
public ItemReader<ReadBean> shipmentfactsItemReader(
@Value("#{stepExecutionContext[customerId]}" String customerId){
..
}
注意註解,@StepScope
..that是強制性的這個值的結合來訪問。此外,在你的讀者定義,你需要通過null
這樣的 - .reader(shipmentfactsItemReader(null))
在你的Spring Batch的元數據,你將有儘可能多的步驟,一個主一步的客戶數量加。當所有從屬步驟完成時,主步驟將結束。
此處的優勢在於,如果需要,您可以並行處理許多客戶,並且客戶的每個從屬步驟都將在其自己的單獨線程中運行。
希望它有幫助!