2017-06-21 119 views
1

我們有要求將數據從1個數據庫移動到另一個數據庫並探索相同的彈簧批處理。我們的應用程序的用戶選擇源和目標數據源以及需要移動數據的表的列表。如何在運行時創建並啓動彈簧批處理作業

需要幫助的以下內容:

  1. 必要建立作業的信息都可以從我們的Web應用程序運行時 - 包括數據源的詳細信息和表名的列表。我們希望通過將這些詳細信息發送給作業生成器模塊並使用JobLauncher啓動它來創建新作業。我們如何編寫這個作業生成器模塊?
  2. 我們可能有多個用戶並行提出數據移動請求,因此需要創建多個作業並按適當順序運行它們的方法。

我們使用基於Java的配置來創建作業並從Web容器啓動它。該配置如下

@Bean 
public Job loadDataJob(JobCompletionNotificationListener listener) { 
    RunIdIncrementer inc = new RunIdIncrementer(); 
    inc.setKey(new Date().toString()); 
    JobBuilder builder = jobBuilderFactory.get("loadDataJob") 
      .incrementer(inc) 
      .listener(listener); 
    SimpleJobBuilder simpleBuilder = builder.start(preExecute()); 
    for(String s : getTables()){ 
     simpleBuilder.next(etlTable(s)); 
    } 
    simpleBuilder.next(postExecute()); 
    return simpleBuilder.build(); 
} 

@Bean 
@Scope("prototype") 
public Step etlTable(String tableName) { 
    return stepBuilderFactory.get(tableName) 
      .<Map<String,Object>, Map<String,Object>> chunk(1000) 
      .reader(dbDataReader(tableName)) 
      .processor(processor()) 
      .writer(dbDataWriter(tableName)) 
      .build(); 
} 

目前我們已經硬編碼的源和目標數據源的詳細信息到相應的豆類。 getTables()返回需要移動數據的表(硬編碼)的列表。即啓動該作業

@RestController 
public class MyController { 
    @Autowired 
    JobLauncher jobLauncher; 

    @Autowired 
    Job job; 

    @RequestMapping("/launchjob") 
    public String handle() throws Exception { 
     try { 
      JobParameters jobParameters = new JobParametersBuilder().addLong("time", new Date().getTime()).toJobParameters(); 
      jobLauncher.run(job, jobParameters); 
     } catch (Exception e) { 

     } 

     return "Done"; 
    } 
} 

回答

0

關於你的第一個問題

RestController,你一定要使用JavaConfiguration。此外,如果要使用動態數量的步驟創建作業(例如,您必須複製每個表的步驟),則不應將其步驟定義爲spring bean。

我已經寫了幾個關於如何動態創建作業的問題的答案。看看他們,他們可能會有所幫助

編輯
關於你的第二個問題的一些言論:

首先,您使用的是普通的JobLauncher,我假設您實例化了SimpleJobLauncher。這意味着,您可以使用作業參數提供作業,如您在上面的代碼中所示。但是,提供的「工作」不一定是「SpringBean」實例,因此您不必自動裝入它,因此,您可以按照上述問題的答案中的建議使用create-methodes。其次,如果您爲每個請求動態創建Job實例,則不需要將整個配置作爲jobparameters傳遞,因爲您可以將數據源和表等「配置屬性」作爲參數直接傳遞給您的「createJob」方法。如果您事先不知道所有可能的數據源,您甚至可以「即時」創建DataSource實例。

第三,我將每個請求視爲「單次運行」,不能「重新啓動」。因此,我只是將一些「元信息」放入作業參數中,如用戶,日期/時間,數據源名稱(url)和要複製的表格列表。我將這種信息當作一種記錄/審計來請求發佈的內容,但我不會將作業參數實例用作作業內部的控制參數(同樣,您可以在構建過程中傳遞這些參數的值將作業的時間和步驟傳遞給create-Methods,因此根據參數創建作業結構,因此,在運行時 - 當您可以訪問作業參數時 - 基於作業參數無關) 。最後,如果請求失敗(意味着作業退出時出現錯誤),則只需執行一個新請求即可重試,但此請求將是一個完整的新請求,而不是已執行作業的重新啓動發射(因爲我會將請求時間添加到我的工作參數中,所以每次發射都將是獨特的發射)。

編輯2: 不創建作爲Bean的作業並不意味着不使用自動裝配。這是一個例子,我會構建我的豆子。

@Component 
@EnableBatchProcessing 
@Import() // list with imports as neede 
public class JobCreatorComponent { 

    @Autowire 
    private StepBuilderFactory stepBuilder; 

    @Autowire 
    private JobBuilderFactory jobBuilder; 

    public Job createJob(all the parameters you need) { 
    return jobBuilder.get(). .... 
    } 
} 

@RestController 
@Import(JobCreatorComponent.class) 
public class MyController { 
    @Autowired 
    JobLauncher jobLauncher; 

    @Autowired 
    JobCreatorComponent jobCreator; 

    @RequestMapping("/launchjob") 
    public String handle() throws Exception { 
     try { 
      Job job = jobCreator.createJob(... params ...); 
      JobParameters jobParameters = new JobParametersBuilder().addLong("time", new Date().getTime()).toJobParameters(); 
      jobLauncher.run(job, jobParameters); 
     } catch (Exception e) { 

     } 

     return "Done"; 
    } 
} 
+0

您分享的鏈接中的一些帖子幫助我們解決了其他問題。然而,主要關心的問題仍然存在 - 如何將Spring批處理作爲服務使用,以便在用戶請求數據移動時創建和運行作業?在啓動作業時發送作業參數似乎對發送複雜對象(在我們的例子中爲數據源細節+數據移動請求細節)有限制。 – ksh

+0

我在回答中加了幾條評論。 –

+0

根據你的建議,我們不再像春季豆一樣創造就業機會,它對我們的用例來說效果很好。請注意,如果沒有使用自動裝配和未使用註釋(如@EnableBatchProcessing),則需要爲JobRepository和Job/Step構建器工廠進行少量手動配置。現在只關注如果我們因爲這種變化而失去任何東西。一旦分析完成,將在這裏更新。 – ksh

0

通過使用itemreader @JobScope無需在運行時手工做的事情只需要annoted您與@Jobscope各自的閱讀器,與控制器的每個交互,你會得到新的記錄處理。

這是一種按需工作的類型,您可以像執行數據庫遷移那樣執行目標作業,或者獲取像這樣的特定報告。

相關問題