2013-05-01 113 views
2

TL; DR:應該如何使用Spring批處理作業創建Spring批處理作業? 交易界限似乎是問題。這似乎是一個 經典問題,但在這裏再次:春季批量使用或如何在作業中啓動作業

我有以下用例:我需要輪詢FTP服務器和存儲找到 XML文件作爲blob在數據庫中。 XML有0 ... N條感興趣的我 需要發送到外部Web服務並存儲 響應。響應可能不可重試或可重試,我需要 存儲每個請求及其審覈目的的響應。

域/ JPA模型如下:批處理(包含XML blob)包含 0-N BatchRow對象。 BatchRow包含要發送到Web 服務的數據,它還包含1 N N BatchRowHistory對象,其中包含有關Web服務調用的狀態 的信息。

我被要求使用Spring批處理(Spring集成 本來可能是其他可能性,因爲這種情況下的集成)。現在 我一直在努力與不同的方法,我發現這項任務更復雜,因此它很難,因爲它恕我直言應該。

我已經分裂任務以下工作:

作業1

  • 第11步:取文件,並存儲到數據庫作爲一個blob。

  • 第12步:將XML拆分爲條目並將這些條目存儲到數據庫。

  • 步驟13:創建Job2併爲存儲在 中的每個條目啓動Step12。 Mark Job2在域模型 數據庫中爲條目創建了標誌。

作業2

  • 步驟21:調用Web服務的每個條目,結果存儲到數據庫中。重試和 跳過邏輯居住在這裏。 Job2類型可能需要手動重啓等。

該結構背後的邏輯是Job1定期運行(每分鐘一次左右) 。每當有 這些作業時,都會運行Job2,並且它們已經成功或者它們的重試限制已達到 ,並且它們已失敗。域模型基本上只存儲結果,並且彈簧批處理負責運行展示。手動重新啓動 等可以通過Spring批處理管理(至少我希望如此)。另外 Job2在JobParameters地圖中有BatchRow的ID,所以它可以在Spring Batch Admin中查看 。

問題1這個工作結構有意義嗎? I.e.創建新的 春季批處理作業在db中的每一行,它似乎擊敗了 的目的,並重新發明輪子在某種程度上?

問題2如何在Step13中創建這些Job2條目?

我得到了與交易和JobRepository第一問題,但成功 推出具備以下設置一些工作:

<batch:step id="Step13" parent="stepParent"> 
<batch:tasklet> 
    <batch:transaction-attributes propagation="NEVER"/> 
    <batch:chunk reader="rowsWithoutJobReader" processor="batchJobCreator" writer="itemWriter" 
       commit-interval="10" /> 
</batch:tasklet> 
</batch:step> 

<bean id="stepParent" class="org.springframework.batch.core.step.item.FaultTolerantStepFactoryBean" abstract="true"/> 

請注意,提交間隔=「10」是指本可以創建多達10個 工作目前就是這樣......因爲batchJobCreator調用 JobLauncher.run方法,並且它順其自然地運行,但itemWriter不能 將BatchRows寫回數據庫並更新信息(布爾型 jobCreated標誌已打開)。明顯的原因是傳播屬性中的propagation.NEVER,但沒有它我不能用jobLauncher創建作業。

因爲更新不傳遞到數據庫中,我再次得到相同的BatchRows和 他們雜亂的日誌,:

org.springframework.batch.retry.RetryException: Non-skippable exception in recoverer while processing; nested exception is org.springframework.batch.core.repository.JobExecutionAlreadyRunningException: A job execution for this job is already running: JobInstance: id=1, version=0, JobParameters=[{batchRowId=71}], Job=[foo.bar] 
     at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor$2.recover(FaultTolerantChunkProcessor.java:278) 
     at org.springframework.batch.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:420) 
     at org.springframework.batch.retry.support.RetryTemplate.doExecute(RetryTemplate.java:289) 
     at org.springframework.batch.retry.support.RetryTemplate.execute(RetryTemplate.java:187) 
     at org.springframework.batch.core.step.item.BatchRetryTemplate.execute(BatchRetryTemplate.java:215) 
     at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor.transform(FaultTolerantChunkProcessor.java:287) 
     at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:190) 
     at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:74) 
     at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:386) 
     at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:130) 
     at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:264) 
     at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:76) 
     at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:367) 
     at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:214) 
     at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:143) 
     at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:250) 
     at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:195) 
     at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:135) 
     at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:61) 
     at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:60) 
     at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:144) 
     at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:124) 
     at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:135) 
     at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:293) 
     at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:120) 
     at java.lang.Thread.run(Thread.java:680) 

這意味着,工作已經在Spring Batch的創建,它 嘗試在稍後執行Step13時再次創建這些文件。我可以在 Job2/Step21中繞過這個設置jobCreated標誌爲true的情況,但是它感覺很差,對我來說也是錯誤的。

問題3:我有更多的域對象驅動方法;我有Spring 批處理作業使用相當複雜的JPQL查詢 和JPAItemReaders掃描域表。這種方法的問題在於, 不使用Spring Batch的更精細的功能。歷史和重試邏輯是 的問題。我需要將重試邏輯直接編碼到JPQL查詢 (例如,如果BatchRow具有超過3個BatchRowHistory 元素,它已失敗並需要手動重新檢查)。 我應該 咬緊牙關,繼續採用這種方法,而不是嘗試 爲每個Web服務調用創建單獨的Spring Batch Job?

需要的軟件信息:Spring Batch 2.1.9,Hibernate 4.1.2,Spring 3.1.2,Java 6。

預先感謝您在百忙的很長的故事,蒂莫

編輯1: 爲什麼我認爲我需要生成新的就業機會的原因是這樣的:

  • 循環一邊讀卡返回空或拋出異常

  • 交易開始

  • 讀者 - 處理器 - 寫入器環路爲整個N行

  • 事務端批量大小N

每個失敗的條目的問題;我希望手動重新啓動 執行(作業是唯一可以在批處理管理器中彈出的對象,對嗎?),以便我可以使用 Spring Batch Admin查看失敗的作業(其作業參數爲 其中包含來自域數據庫的行ID)並重新啓動這些等。我如何在不產生作業並將 歷史存儲到域db的情況下完成此類行爲?

回答

0

好吧,我討厭回答問題......但我需要知道些什麼? 1)如果你的輸入文件是XML,你爲什麼不在他們上使用StaxEventItemReader,並且只是在步驟1中保存你的條目?

2)從一步開始第二份工作!我甚至不知道它是否可以工作......但是IMO ......它聞起來;-)

爲什麼不定義另一個步驟,使用JdbcCursorItemReader讀取條目並在ItemProcessor中調用Web服務,然後將結果寫入數據庫?

也許我不明白你的要求,爲每個調用Web服務創建不同的工作!

我沒有類似的東西給你的使用情況,並使用此方案來完成:

工作1: 第1步:讀取XML,工藝pojo->域OBJ,在DB

寫域OBJ工作2: 第1步:從數據庫中,工藝=呼叫WS閱讀OBJ,在DB

寫響應

這是簡單,效果很不錯(包括重新啓動,並跳過功能)

希望這將有助於

關於

+0

感謝您的回覆。 1:我需要將原始文件存儲到數據庫以進行審計 的目的。 StaxReader無法直接從db afaik讀取該文件。 2:是的,這似乎是不是很容易完成的問題;-) 關於您的結構:除了作業 產卵部分,Job1看起來與我的一樣。 – tfriman 2013-05-02 20:22:03

+0

作業2大致相同,當快樂路徑走時,意味着當 沒有IO問題。所以當WS調用成功時,它會很好地向db寫入 響應。當有問題時,因爲事務被回滾,所以沒有任何東西會被寫入數據庫中 。 如果您能夠將WS調用 結果寫入數據庫,您如何處理重試?跳過元素也需要登錄到 數據庫,你是否設法做到這一點?我知道 RetryListener和SkipListener接口,但它們是否允許將 (JPA)條目寫回數據庫? – tfriman 2013-05-02 20:22:36