2016-08-30
0

Spring Batch Java configuration error using ClassifierCompositeItemWriter

I am using Spring Batch with Java configuration (I am new to this) and I am getting an error when trying to use a ClassifierCompositeItemWriter to produce a separate output file per classification.

The error I get is org.springframework.batch.item.WriterNotOpenException: Writer must be open before it can be written to.

My configuration looks like the following:

package com.infonova.btcompute.batch.geneva.job; 

import com.infonova.btcompute.batch.billruntransfer.BillRunTranStatusFinishedJobAssignment; 
import com.infonova.btcompute.batch.billruntransfer.BillRunTranStatusInprogressJobAssignment; 
import com.infonova.btcompute.batch.billruntransfer.BillRunTransferStatus; 
import com.infonova.btcompute.batch.geneva.camel.GenevaJobLauncher; 
import com.infonova.btcompute.batch.geneva.dto.GenevaDetailsResultsDto; 
import com.infonova.btcompute.batch.geneva.dto.GenveaDetailsTransactionDto; 
import com.infonova.btcompute.batch.geneva.properties.GenevaDetailsExportJobProperties; 
import com.infonova.btcompute.batch.geneva.rowmapper.GenevaDetailsTransactionsRowMapper; 
import com.infonova.btcompute.batch.geneva.steps.*; 
import com.infonova.btcompute.batch.repository.BillrunTransferStatusMapper; 
import com.infonova.btcompute.batch.utils.FileNameGeneration; 
import com.infonova.product.batch.camel.CamelEnabledJob; 
import org.apache.camel.Processor; 
import org.apache.camel.builder.RouteBuilder; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.springframework.batch.core.Job; 
import org.springframework.batch.core.Step; 
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; 
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; 
import org.springframework.batch.core.configuration.annotation.StepScope; 
import org.springframework.batch.core.step.tasklet.Tasklet; 
import org.springframework.batch.item.file.FlatFileItemWriter; 
import org.springframework.batch.item.support.ClassifierCompositeItemWriter; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.beans.factory.annotation.Qualifier; 
import org.springframework.classify.BackToBackPatternClassifier; 
import org.springframework.context.annotation.Bean; 
import org.springframework.core.io.FileSystemResource; 
import org.springframework.jdbc.core.JdbcTemplate; 

import javax.sql.DataSource; 
import java.io.File; 
import java.sql.SQLException; 
import java.util.ArrayList; 
import java.util.HashMap; 
import java.util.List; 


public abstract class AbstractGenevaDetailsExportJob extends CamelEnabledJob { 

    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractGenevaDetailsExportJob.class); 

    @Autowired 
    protected JobBuilderFactory jobBuilders; 

    @Autowired 
    protected StepBuilderFactory stepBuilders; 

    @Autowired 
    protected DataSource datasource; 

    @Autowired 
    private BillrunTransferStatusMapper billrunTransferStatusMapper; 

    @Autowired 
    protected JdbcTemplate jdbcTemplate; 


    public abstract GenevaDetailsExportJobProperties jobProperties(); 

    @Bean 
    public RouteBuilder routeBuilder(final GenevaDetailsExportJobProperties jobProperties, final Job job) { 
     return new RouteBuilder() { 
      @Override 
      public void configure() throws Exception { 
       from(jobProperties.getConsumer()) 
         .transacted("PROPAGATION_REQUIRED") 
         .routeId(jobProperties.getInputRouteName()) 
         .process(genevaJobLauncher(job)); 
         //.to("ftp://[email protected]?password=secret"); 
      } 
     }; 
    } 

    @Bean 
    public Processor genevaJobLauncher(Job job) { 
     return new GenevaJobLauncher(job); 
    } 

    @Bean 
    @StepScope 
    public GenevaDetailsReader reader() { 
     GenevaDetailsReader reader = new GenevaDetailsReader(jobProperties().getMandatorKey(), 
       jobProperties().getInvoiceType(), jobProperties().getSqlResourcePath()); 
     reader.setSql(""); 
     reader.setDataSource(datasource); 
     reader.setRowMapper(new GenevaDetailsTransactionsRowMapper()); 
     reader.setFetchSize(jobProperties().getFetchSize()); 
     return reader; 
    } 

    @Bean 
    @StepScope 
    public GenevaDetailsItemProcessor processor() { 
     return new GenevaDetailsItemProcessor(); 
    } 

    @Bean 
    @StepScope 
    public ClassifierCompositeItemWriter writer() { 

     List<String> serviceCodes = new ArrayList<>();//billrunTransferStatusMapper.getServiceCodes(jobProperties().getMandatorKey()); 
     Long billingTaskId = billrunTransferStatusMapper.getCurrentTaskId(jobProperties().getMandatorKey()); 
     String countryKey = billrunTransferStatusMapper.getCountryKey(billingTaskId); 
     serviceCodes.add("BTCC"); 
     serviceCodes.add("CCMS"); 

     BackToBackPatternClassifier classifier = new BackToBackPatternClassifier(); 
     classifier.setRouterDelegate(new GenveaDetailsRouterClassifier()); 

     HashMap<String, Object> map = new HashMap<>(); 

     for (String serviceCode : serviceCodes) { 
      map.put(serviceCode, genevaDetailsWriter(serviceCode, countryKey)); 
     } 

     classifier.setMatcherMap(map); 
     ClassifierCompositeItemWriter<GenveaDetailsTransactionDto> writer = new ClassifierCompositeItemWriter<>(); 
     writer.setClassifier(classifier); 
     return writer; 

    } 


    @Bean 
    @StepScope 
    public GenevaDetailsFlatFileItemWriter genevaDetailsWriter(String serviceCode, String countryKey) { 
     GenevaDetailsFlatFileItemWriter writer = new GenevaDetailsFlatFileItemWriter(jobProperties().getDelimiter()); 

     FileNameGeneration fileNameGeneration = new FileNameGeneration(); 

     try { 
      FileSystemResource fileSystemResource = new FileSystemResource(new File(jobProperties().getExportDir(), fileNameGeneration.generateFileName(jdbcTemplate, 
        serviceCode, countryKey))); 
      writer.setResource(fileSystemResource); 
     } catch (SQLException e) { 
      LOGGER.error("Error creating FileSystemResource : " + e.getMessage()); 
     } 
     return writer; 
    } 

    @Bean 
    public Job job() { 
     return jobBuilders.get(jobProperties().getJobName()) 
       .start(setBillRunTransferStatusDetailInprogressStep()) 
       .next(processGenevaDetailsStep()) 
       .next(setBillRunTransferStatusProcessedStep()) 
       .build(); 
    } 

    @Bean 
    public Step setBillRunTransferStatusDetailInprogressStep() { 
     return stepBuilders.get("setBillRunTransferStatusDetailInprogressStep") 
       .tasklet(setBillRunTransferStatusDetailInprogress()) 
       .build(); 
    } 

    @Bean 
    public Tasklet setBillRunTransferStatusDetailInprogress() { 
     return new BillRunTranStatusInprogressJobAssignment(BillRunTransferStatus.SUMMARY.toString(), BillRunTransferStatus.DETAILS_INPROGRESS.toString(), 
       jobProperties().getMandatorKey(), jobProperties().getInvoiceTypeNum(), jobProperties().getReportTypeNum()); 
    } 

    @Bean 
    public Step setBillRunTransferStatusProcessedStep() { 
     return stepBuilders.get("setBillRunTransferStatusProcessedStep") 
       .tasklet(setBillRunTransferStatusProcessed()) 
       .build(); 
    } 

    @Bean 
    public Tasklet setBillRunTransferStatusProcessed() { 
     return new BillRunTranStatusFinishedJobAssignment(BillRunTransferStatus.PROCESSED.toString()); 
    } 

    @Bean 
    public Step processGenevaDetailsStep() { 
     return stepBuilders.get("processGenevaDetailsStep") 
       .<GenveaDetailsTransactionDto, GenevaDetailsResultsDto>chunk(jobProperties().getChunkSize()) 
       .reader(reader()) 
       .processor(processor()) 
       .writer(writer()) 
       .build(); 
    } 

} 

And my writer looks like this:

package com.infonova.btcompute.batch.geneva.steps; 

import com.infonova.btcompute.batch.geneva.dto.GenevaDetailsResultsDto; 
import com.infonova.btcompute.batch.repository.BillrunTransferStatusMapper; 
import com.infonova.btcompute.batch.utils.FileNameGeneration; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.springframework.batch.core.StepExecution; 
import org.springframework.batch.core.annotation.BeforeStep; 
import org.springframework.batch.item.*; 
import org.springframework.batch.item.file.FlatFileHeaderCallback; 
import org.springframework.batch.item.file.FlatFileItemWriter; 
import org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor; 
import org.springframework.batch.item.file.transform.DelimitedLineAggregator; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.core.io.FileSystemResource; 
import org.springframework.jdbc.core.JdbcTemplate; 
import org.springframework.stereotype.Component; 

import java.io.File; 
import java.io.IOException; 
import java.io.Writer; 
import java.sql.SQLException; 
import java.util.Iterator; 
import java.util.List; 

@Component 
public class GenevaDetailsFlatFileItemWriter extends FlatFileItemWriter<GenevaDetailsResultsDto> { 

    private static final Logger LOGGER = LoggerFactory.getLogger(GenevaDetailsFlatFileItemWriter.class); 

    @Autowired 
    protected JdbcTemplate jdbcTemplate; 

    @Autowired 
    private BillrunTransferStatusMapper billrunTransferStatusMapper; 


    private String delimiter; 


    public GenevaDetailsFlatFileItemWriter(String delimiter) { 
     this.delimiter = delimiter; 
     this.setLineAggregator(getLineAggregator()); 
     this.setHeaderCallback(getHeaderCallback()); 
    } 

    private DelimitedLineAggregator<GenevaDetailsResultsDto> getLineAggregator() { 
     DelimitedLineAggregator<GenevaDetailsResultsDto> delLineAgg = new DelimitedLineAggregator<>(); 
     delLineAgg.setDelimiter(delimiter); 

     BeanWrapperFieldExtractor<GenevaDetailsResultsDto> fieldExtractor = new BeanWrapperFieldExtractor<>(); 
     fieldExtractor.setNames(getNames()); 
     delLineAgg.setFieldExtractor(fieldExtractor); 

     return delLineAgg; 
    } 

    private String[] getHeaderNames() { 
     return new String[] {"Record ID", "Service Identifier", "Billing Account Reference", "Cost Description", "Event Cost", 
       "Event Date and Time", "Currency Code", "Charge Category", "Order Identifier", "Net Usage", "UOM", 
       "Quantity", "Service Start Date", "Service End Date"}; 
    } 

    private String[] getNames() { 
     return new String[] {"RECORD_ID", "SERVICE_CODE", "BILLING_ACCOUNT_REFERENCE", "COST_DESCRIPTION", "EVENT_COST", 
       "EVENT_DATE_AND_TIME", "CURRENCY_CODE", "CHARGE_CATEGORY", "ORDER_IDENTIFIER", "NET_USAGE", "UOM", 
       "QUANTITY", "SERVICE_START_DATE", "SERVICE_END_DATE"}; 
    } 



    private FlatFileHeaderCallback getHeaderCallback() 
    { 
     return new FlatFileHeaderCallback() { 
      @Override 
      public void writeHeader(Writer writer) throws IOException { 
       writer.write(String.join(delimiter, getHeaderNames())); 
      } 
     }; 
    } 

// @BeforeStep 
// public void beforeStep(StepExecution stepExecution) { 
//  billingTaskId = (Long) stepExecution.getJobExecution().getExecutionContext().get("billingTaskId"); 
//  FileNameGeneration fileNameGeneration = new FileNameGeneration(); 
// 
//  try { 
//   FileSystemResource fileSystemResource = new FileSystemResource(new File(exportDir, fileNameGeneration.generateFileName(jdbcTemplate, 
//     serviceCode, billrunTransferStatusMapper.getCountryKey(billingTaskId)))); 
//   setResource(fileSystemResource); 
//  } catch (SQLException e) { 
//   LOGGER.error("Error creating FileSystemResource : " + e.getMessage()); 
//  } 
// } 
} 

I have searched the web and cannot find a solution to this problem.

Answers

0

ClassifierCompositeItemWriter does not implement the ItemStream interface, so the open method of your FlatFileItemWriters is never called.

The simplest thing to do is to call the open method yourself when you build your classifier map:

for (String serviceCode : serviceCodes) { 
    FlatFileItemWriter writer = genevaDetailsWriter(serviceCode, countryKey); 
    writer.open(new ExecutionContext()); 
    map.put(serviceCode, writer); 
} 
+0

This solution only partially solves the problem, because the writers' execution contexts should also be updated and closed, to allow restartability (update) and proper flushing of the data (update/close).

1

What @Hansjoerg Wingeier wrote about ClassifierCompositeItemWriter is correct, but the right way to solve the problem is to register the delegate writer(s) as streams, using AbstractTaskletStepBuilder.stream(), so that Spring Batch manages the execution-context lifecycle for you.

+0

How would I implement AbstractTaskletStepBuilder.stream()? I tried googling it and could not find any Java example. You are right that the above solution only partially solves the problem, because it does not close the writers correctly.

+0

I think you have to call stepBuilders.stream() in the processGenevaDetailsStep function for each stream; I have never used Java config, so I cannot help you much, but I think you could pass the stepBuilder to the writer() function (here is a link: https://github.com/codecentric/spring-batch-javaconfig/blob/master/src/main/java/de/codecentric/batch/configuration/FlatfileToDbWithParametersJobConfiguration.java).
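
For reference, a minimal sketch of what registering the delegate writers as streams could look like, based on the processGenevaDetailsStep and writer() beans from the question. The "BTCC"/"CCMS" service codes and the countryKey lookup are copied from the writer() bean above; this is an untested illustration, not the accepted code.

@Bean 
public Step processGenevaDetailsStep() { 
    // Look up the same values that writer() uses when it builds its delegates. 
    Long billingTaskId = billrunTransferStatusMapper.getCurrentTaskId(jobProperties().getMandatorKey()); 
    String countryKey = billrunTransferStatusMapper.getCountryKey(billingTaskId); 

    return stepBuilders.get("processGenevaDetailsStep") 
        .<GenveaDetailsTransactionDto, GenevaDetailsResultsDto>chunk(jobProperties().getChunkSize()) 
        .reader(reader()) 
        .processor(processor()) 
        .writer(writer())                              // the ClassifierCompositeItemWriter 
        // Register each FlatFileItemWriter delegate as an ItemStream so the step 
        // opens, updates and closes it as part of the chunk-oriented tasklet. 
        .stream(genevaDetailsWriter("BTCC", countryKey)) 
        .stream(genevaDetailsWriter("CCMS", countryKey)) 
        .build(); 
} 

With the delegates registered this way, the manual writer.open(new ExecutionContext()) call from the first answer should no longer be needed, since the step handles open/update/close for every registered stream.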