2017-02-15 83 views
1

我正在使用自定義閱讀器和編寫器的春天批處理。如何運行具有不同參數的Spring批處理作業?

我有一個控制表customerId的

我需要多次運行相同的步驟,一次爲我的控制表中的每個客戶。

customerId應該能夠作爲參數傳遞,因爲我需要它在讀者以及作者中。

這怎麼能最好的實現?

@Bean 
public Step shipmentFactsStep() { 
    return stepBuilderFactory.get("shipmentFactsStep") 
      .<Shipmentfacts, Shipmentfacts>chunk(10000) 
      .reader(shipmentfactsItemReader()) 
      .processor(shipmentFactProcessor()) 
      .writer(shipmentFactsWriter()) 
      .build(); 
} 

回答

0

實現此目的的一種方法是通過Partitioning。如果您想跟蹤哪個customersIds已完成,這種方法似乎更好,因爲每個客戶ID都有一個從屬步驟。

步驟

1.首先通過實現org.springframework.batch.core.partition.support.Partitioner接口創建分區類和填充Map<String, ExecutionContext>爲每一個客戶ID。

因爲,您是通過客戶ID分區,方法參數gridSize將不會用於您的情況。

代碼看起來像這樣,其中allCustomers是您從數據庫準備的列表。

類 - 在主一步,從步驟條款CustomerPartitioner

Map<String, ExecutionContext> result = new HashMap<>(); 
int partitionNumber = 0; 
for (String customer: allCustomers) { 
     ExecutionContext value = new ExecutionContext(); 
     value.putString("customerId", customer); 
     result.put("Customer Id [" + customer+ "] : THREAD " 
      + partitionNumber, value); 
     partitionNumber++; 
     } 

2.Modify你的步驟定義。請參閱在線教程。

示例代碼與此類似。

@Bean 
    public Step customerPartitionerStep() throws Exception { 
    return step.get("customerPartitionerStep") 
     .partitioner(shipmentFactsStep()) 
     .partitioner("shipmentFactsStep", customerPartitioner()) 
     .gridSize(partitionerGridSize).taskExecutor(taskExecutor()) 
     .build(); 
    } 

    @Bean 
public Step shipmentFactsStep() { 
    return stepBuilderFactory.get("shipmentFactsStep") 
      .<Shipmentfacts, Shipmentfacts>chunk(10000) 
      .reader(shipmentfactsItemReader()) 
      .processor(shipmentFactProcessor()) 
      .writer(shipmentFactsWriter()) 
      .build(); 
} 

@Bean 
    public Partitioner customerPartitioner() { 
    return new CustomerPartitioner(); 
    } 

@Bean 
    public TaskExecutor taskExecutor() { 
    SimpleAsyncTaskExecutor simpleTaskExecutor = new SimpleAsyncTaskExecutor(); 
    simpleTaskExecutor.setConcurrencyLimit(concurrencyLimit); 
    return simpleTaskExecutor; 
    } 

您可以將partitionerGridSize設置爲任何值,因爲它未在分區器實現中使用。您可以稍後使用它根據數量總記錄進行分區,而不是僅使用客戶ID。

3.在上面步驟#2中的代碼中,設置concurrencyLimit=1 ..這非常重要,這樣一次只能有一個客戶運行,並且它將爲您在第1步中輸入的所有客戶運行。您可以通過設置此值並行運行儘可能多的客戶。

4. customerId從在步驟分割器在步驟#1可在讀取器,處理器等通過執行

@Bean 
@StepScope 
public ItemReader<ReadBean> shipmentfactsItemReader(
     @Value("#{stepExecutionContext[customerId]}" String customerId){ 
.. 
} 

注意註解,@StepScope ..that是強制性的這個值的結合來訪問。此外,在你的讀者定義,你需要通過null這樣的 - .reader(shipmentfactsItemReader(null))

在你的Spring Batch的元數據,你將有儘可能多的步驟,一個主一步的客戶數量加。當所有從屬步驟完成時,主步驟將結束。

此處的優勢在於,如果需要,您可以並行處理許多客戶,並且客戶的每個從屬步驟都將在其自己的單獨線程中運行。

希望它有幫助!

相關問題