2014-10-11 98 views
1

我試圖從數據庫讀取客戶端數據並將處理後的數據寫入平面文件。 但是在寫入數據之前我需要處理整個ItemReader的結果。Spring批處理如何在寫入數據之前處理數據列表

例如,我從數據庫中讀取行客戶端:

public class Client { 
    private String id; 
    private String subscriptionCode; 
    private Boolean activated; 
} 

但我想算,寫多少用戶被激活,通過subscriptionCode分組:

public class Subscription { 
    private String subscriptionCode; 
    private Integer activatedUserCount; 
} 

我不知道如何執行使用ItemReader/ItemProcessor/ItemWriter,你能幫助我嗎?

BatchConfiguration:

@CommonsLog 
@Configuration 
@EnableBatchProcessing 
@EnableAutoConfiguration 
public class BatchConfiguration { 

    @Autowired 
    private JobBuilderFactory jobBuilderFactory; 

    @Autowired 
    private StepBuilderFactory stepBuilderFactory; 

    @Bean 
    public Step step1() { 
     return stepBuilderFactory.get("step1") 
       .<Client, Client> chunk(1000) 
       .reader(new ListItemReader<Client>(new ArrayList<Client>() { // Just for test 
        { 
         add(Client.builder().id("1").subscriptionCode("AA").activated(true).build()); 
         add(Client.builder().id("2").subscriptionCode("BB").activated(true).build()); 
         add(Client.builder().id("3").subscriptionCode("AA").activated(false).build()); 
         add(Client.builder().id("4").subscriptionCode("AA").activated(true).build()); 
        } 
       })) 
       .processor(new ItemProcessor<Client, Client>() { 
        public Client process(Client item) throws Exception { 
         log.info(item); 
         return item; 
        } 
       }) 
       .writer(new ItemWriter<Client>() { 
        public void write(List<? extends Client> items) throws Exception { 
         // Only here I can use List of Client 
         // How can I process this list before to fill Subscription objects ? 
        } 
       }) 
       .build(); 
    } 

    @Bean 
    public Job job1(Step step1) throws Exception { 
     return jobBuilderFactory.get("job1").incrementer(new RunIdIncrementer()).start(step1).build(); 
    } 
} 

主要應用:

public class App { 
    public static void main(String[] args) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException { 
     System.exit(SpringApplication.exit(SpringApplication.run(BatchConfiguration.class, args))); 
    } 
} 
+0

不知道我是否完全理解了這個問題,你確實有'ItemProcessor'的權利..你可以在你的個人客戶端實例上做任何處理 - 除了你打算執行的處理以外? – 2014-10-11 15:12:15

+0

我想統計有多少用戶已將'activated'標誌設置爲'true',按subscriptionCode進行分組。所以我需要一個'客戶'列表來確定我的'訂閱'列表。但是使用'chunk',我只能逐行處理......而不是一個組。這個例子中的結果嘗試應該是一個List of 2 subscription:'Subscription(subscriptionCode = AA,activatedUserCount = 2)'和'Subscription(subscriptionCode = BB,activatedUserCount = 1)' – Aure77 2014-10-11 15:32:11

回答

0

我發現基於ItemProcessor一個解決方案:

@Bean 
public Step step1() { 
    return stepBuilderFactory.get("step1") 
     .<Client, Subscription> chunk(1000) 
     .reader(new ListItemReader<Client>(new ArrayList<Client>() { 
     { 
      add(Client.builder().id("1").subscriptionCode("AA").activated(true).build()); 
      add(Client.builder().id("2").subscriptionCode("BB").activated(true).build()); 
      add(Client.builder().id("3").subscriptionCode("AA").activated(false).build()); 
      add(Client.builder().id("4").subscriptionCode("AA").activated(true).build()); 
     } 
     })) 
     .processor(new ItemProcessor<Client, Subscription>() { 
     private List<Subscription> subscriptions; 

     public Subscription process(Client item) throws Exception { 
      for (Subscription s : subscriptions) { // try to retrieve existing element 
      if (s.getSubscriptionCode().equals(item.getSubscriptionCode())) { // element found 
       if(item.getActivated()) { 
       s.getActivatedUserCount().incrementAndGet(); // increment user count 
       log.info("Incremented subscription : " + s); 
       }        
       return null; // existing element -> skip 
      } 
      } 
      // Create new Subscription 
      Subscription subscription = Subscription.builder().subscriptionCode(item.getSubscriptionCode()).activatedUserCount(new AtomicInteger(1)).build(); 
      subscriptions.add(subscription); 
      log.info("New subscription : " + subscription); 
      return subscription; 
     } 

     @BeforeStep 
     public void initList() { 
      subscriptions = Collections.synchronizedList(new ArrayList<Subscription>()); 
     } 

     @AfterStep 
     public void clearList() { 
      subscriptions.clear(); 
     } 
     }) 
     .writer(new ItemWriter<Subscription>() {     
     public void write(List<? extends Subscription> items) throws Exception { 
      log.info(items); 
      // do write stuff 
     }     
     }) 
     .build(); 
} 

但我要保持第二Subscription列表爲ItemProcessor(我不知道,如果是線程安全的,高效的?)。你對這個解決方案有什麼看法?

+0

有什麼更好的想法? – Aure77 2014-11-05 20:05:41

+0

什麼都沒有?不知道 ? – Aure77 2014-12-05 14:33:49

0

如果我從您的意見了解你需要做激活帳戶的總結,對不對?
您可以爲您正在處理的每個Client創建一個Subscription,並使用ItemWriterLister.afterWrite將以上創建的Subscription的項目寫入數據庫。

+0

這些只是偵聽對象的定義由大塊。但是我怎麼能通過'List '到我的'ItemWriter'來處理呢? 我無法聲明'.Client,Subscription> chunk(1000)',因爲一個'Client'不能在一個'Subscription'中轉換(塊處理逐項)...... – Aure77 2014-10-11 15:56:46

+0

使用監聽器來寫東西並不常見。此外,ItemWriter寫入方法總是讓我訪問List a Client。但你給了我一個想法:使用'ItemProcessor'(看我的安裝程序) – Aure77 2014-10-11 19:44:41