2016-11-09 76 views
0

我有一個作業配置,在加載一組文件後並行加載一組文件我也想並行加載另一組文件,但僅限於第一組完全加載後。第二組對第一組具有參考字段。我以爲我可以使用第二次拆分,但從來沒有得到它的工作,在xsd中,似乎你可以定義多個拆分,顯然流程不能幫助我滿足我的要求。 那麼如何定義2組順序流向每個流?定義兩個拆分並行運行一組步驟

<job> 
    <split> 
    <flow> 
     <step next="step2"/> 
     <step id="step2"/> 
    </flow> 
    <flow> 
     <step ...> 
    </flow> 
    </split> 
<split ../> 

Asoub是正確的,這簡直是可行的,我做了一個簡單的配置和它的工作。所以看起來我得到的原始問題有一些其他問題,當定義2分裂時會導致問題。

簡單的配置我用:

<batch:job id="batchJob" restartable="true"> 
    <batch:split id="x" next="y"> 
     <batch:flow> 
      <batch:step id="a"> 
       <batch:tasklet allow-start-if-complete="true"> 
        <batch:chunk reader="itemReader" writer="itemWriter" commit-interval="2"/> 
       </batch:tasklet> 
      </batch:step> 
     </batch:flow> 
     <batch:flow> 
      <batch:step id="b"> 
       <batch:tasklet allow-start-if-complete="true"> 
        <batch:chunk reader="itemReader" writer="itemWriter" commit-interval="2"/> 
       </batch:tasklet> 
      </batch:step> 
     </batch:flow> 
    </batch:split> 
    <batch:split id="y" next="e"> 
     <batch:flow> 
      <batch:step id="c"> 
       <batch:tasklet allow-start-if-complete="true"> 
        <batch:chunk reader="itemReader" writer="itemWriter" commit-interval="2"/> 
       </batch:tasklet> 
      </batch:step> 
     </batch:flow> 
     <batch:flow> 
      <batch:step id="d"> 
       <batch:tasklet allow-start-if-complete="true"> 
        <batch:chunk reader="itemReader" writer="itemWriter" commit-interval="2"/> 
       </batch:tasklet> 
      </batch:step> 
     </batch:flow> 
    </batch:split> 
    <batch:step id="e"> 
     <batch:tasklet allow-start-if-complete="true"> 
      <batch:chunk reader="itemReader" writer="itemWriter" commit-interval="2"/> 
     </batch:tasklet> 
    </batch:step> 
</batch:job> 


INFO: Job: [FlowJob: [name=batchJob]] launched with the following parameters: [{random=994444}] 
Nov 23, 2016 11:33:24 PM org.springframework.batch.core.job.SimpleStepHandler handleStep 
INFO: Executing step: [a] 
Nov 23, 2016 11:33:24 PM org.springframework.batch.core.job.SimpleStepHandler handleStep 
INFO: Executing step: [b] 
Nov 23, 2016 11:33:24 PM org.springframework.batch.core.job.SimpleStepHandler handleStep 
INFO: Executing step: [c] 
Nov 23, 2016 11:33:24 PM org.springframework.batch.core.job.SimpleStepHandler handleStep 
INFO: Executing step: [d] 
Nov 23, 2016 11:33:24 PM org.springframework.batch.core.job.SimpleStepHandler handleStep 
INFO: Executing step: [e] 
Nov 23, 2016 11:33:25 PM org.springframework.batch.core.launch.support.SimpleJobLauncher run 
INFO: Job: [FlowJob: [name=batchJob]] completed with the following parameters: [{random=994444}] and the following status: [COMPLETED] 
+0

「那麼,如何定義2套平行流的這按順序依次運行「這對我來說似乎是一個矛盾,你無法按順序並行地加載文件2,對嗎?我想我明白你在做什麼,但是我需要更多的細節來說明你的意思是」加載「:整個step1只加載file1嗎?或者你認爲加載只是先讀取整個file1?或者可能首先處理整個file1? – Asoub

回答

1

正如我在評論中說,「所以我怎麼定義2套,其按順序平行流每?」本身沒有意義,你無法平行並順序地開始兩步。

我仍然認爲你想要「當step1中的file1加載完成時,開始在step2中加載file2」。這意味着加載文件發生在一個步驟的中間。我看到了解決這個問題的兩種方法。

比方說,這是你的配置:

<job id="job1"> 
    <split id="split1" task-executor="taskExecutor" next="step3"> 
     <flow> 
      <step id="step1" parent="s1"/> 
     </flow> 
     <flow> 
      <step id="step2" parent="s2"/> 
     </flow> 
    </split> 
    <step id="step3" parent="s4"/> <!-- not important here --> 
</job> 

<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/> 

但是,這將立刻啓動這兩個在parrallel的步驟。您需要阻止第2步的開始。因此,您需要在第2步的reader中使用Delegate,它將立即停止加載file2,並等待信號開始讀取。在步驟1的代碼中的某處,您可以考慮加載完成,然後向步驟2的代理閱讀器啓動加載file2的信號。

第二種解決方案是:您創建自己的SimpleAsyncTaskExecutor,它將啓動step1並等待來自step1的信號啓動step2。這基本上是第一種解決方案,但您需要等待自定義Executor中的信號而不是Delegate閱讀器中的信號。 (你可以從SimpleAsyncTaskExecutor複製源代碼以得到一個想法)

這是有代價的,如果step1永遠不會到達它指示step2開始加載的部分,那麼您的批處理將永遠掛起。也許加載異常可能會導致這種情況。至於信號機制,Java有很多方法可以做到這一點(wait()notifiy(),鎖,信號量,也許是非標準庫)。

我不認爲在春季批次中有一些平行階躍觸發器的國王(但如果有的話,有人發佈它)。


我已經回答了一小會兒問你的問題,你需要2個分裂:第一個將裝入文件,二,一組文件B.

<job id="job1"> 
    <split id="splitForSet_A" task-executor="taskExecutor" next="splitForSet_B"> 
     <flow><step id="step1" parent="s1"/></flow> 
     <flow><step id="step2" parent="s2"/></flow> 
     <flow><step id="step3" parent="s3"/></flow> 
    </split> 
    <split id="splitForSet_B" task-executor="taskExecutor" next="stepWhatever"> 
     <flow><step id="step4" parent="s4"/></flow> 
     <flow><step id="step5" parent="s5"/></flow> 
     <flow><step id="step6" parent="s6"/></flow> 
    </split> 
    <step id="stepWhatever" parent="sx"/> 
</job> 

步驟1,2和3將在parrallel(和負載文件集A)上運行,然後,一旦他們全部結束,第二分割(splitForSet_B)將啓動並運行在步驟4,5和6 parrallel。分割基本上是一個步驟,其中包含平行運行的步驟。

你只需要在每個步驟來指定要使用什麼文件(所以這將是對在第二分割步驟先拆步驟不同。

+0

當然這是有道理的,我有一個由3個步驟組成的集合A,它們並行運行,我得到了一個集合B,它應該並行運行,但集合A已經被處理。等待在另一個完成之後執行2個分割。 – 1174

+0

@ 1174你是什麼意思「由3個步驟組成的集合A」?你的意思是這3個步驟平行運行(假設'split1')爲多個文件(「集合A」),然後一旦集合A中的所有文件都被處理完畢(3個步驟結束),就可以做同樣的事情「set B」(其他文件)的東西(相同的3個步驟)? – Asoub

+0

是的,這就是我的意思。我遵循Dean提供的第一個建議來分割大文件,但是最初的問題是並行加載第一個集合,然後第二個集合並行運行,仍然沒有看到如何在XML中描述它,或者如果它是可能的。對不起,不知道如何描述它。 – 1174

1

我會使用兩個分隔的步驟。每個分區將負責確定其相應的一套文件的併發兒童步驟來處理

<job> 
    <step name="loadFirstSet"> 
    <partition partitioner="firstSetPartitioner"> 
     <handler task-executor="asyncTaskExecutor" /> 
     <step name="loadFileFromSetOne> 
     <tasklet> 
      <chunk reader="someReader" writer="someWriter" commit-interval="#{jobParameters['commit.interval']}" /> 
     </tasklet> 
     </step> 
    </partition> 
    </step> 
    <step name="loadSecondSet"> 
    <partition partitioner="secondSetPartitioner"> 
     <handler task-executor="asyncTaskExecutor" /> 
     <step name="loadFileFromSecondSet> 
     <tasklet> 
      <chunk reader="someOtherReader" writer="someOtherWriter" commit-interval="#{jobParameters['another.commit.interval']}" /> 
     </tasklet> 
     </step> 
    </partition> 
    </step> 
</job> 
+0

我現在不想分割,我想運行一組並行的文件,一旦所有文件都被讀取,我想在第一組讀完所有文件後才並行加載另一組文件,爲了參照完整性,寫入數據庫。 – 1174

+0

您可以使用'AsyncTaskExecutor'並行運行分區子步驟。第一個分區步驟將用於並行加載所有的「Set 1」文件。一旦完成所有這些,第二個分區步驟可以並行加載所有的「Set 2」文件。這也使您可以通過修改發送到每個「分區程序」的文件集來輕鬆更改每個集中的文件數量。 –

+0

感謝您的第一個建議,我現在已經劃分了這個大文件,並且幫助了很多。對於你最後的評論,我也使用了異步,例如我將文件分割成n個文件並且並行讀取它們。我仍然不明白我能如何整理。假設您有在商店購買的顧客,我需要首先加載商店以具有參照完整性,還有一些其他參考數據,比如產品。那些我想平行加載的。所有加載後,我加載商店和訂單數據。除了分區之外,在第一組加載之後,我看不到如何並行運行它們。 – 1174