SpringBatchを実案件で活用するための10のプラクティス

この記事は最終更新日から1年以上が経過しています。

今までSpringFrameworkは多く触ってたものの本格的にSpringBatchに触れる機会がなかったのですが、ちゃんと触ったので現時点での自分なりのベストプラクティスをまとめておきます

#実際に商用のコードを書く際には気をつけなければいけない部分だと思いますが、調べてもあまりノウハウがない(terasolunaさんには助けられましたが)ようでしたがみなさんどうしているんでしょう・・。

作るもの「csvをDBにロードするバッチ」

あるあるですね。Taskletでは何でもできちゃうので、Chunk前提です。

  • csvファイル
    • 区切り文字:カンマ
    • 囲み文字:ダブルクォート/なし。カラム内の改行も想定。ダブルクォートはダブルクォート2つ""にしてエスケープ。
    • レコードは改行(CRLF/LF)で区切る
    • 文字コードはUTF-8/Shift-JISを想定
    • ヘッダはあり/なし両方を想定。
  • DBはMySQL想定。ORMapperはMyBatis
  • 文字数・型・有効な日付かなど最低限のバリデーションを実施
  • エラーレコードはキー項目とエラー内容のみファイル出力
  • 全入替、追加更新、Dry-Runの3パタンを想定
  • ユニットテストも実施したい

前提

SpringBatchの基本構成などは説明しません。「簡単なサンプルを書いてみた」系の記事はたくさんあるのでまずはそちらを読まれることを推奨します。

1. Job間、Step間を跨いだパラメータの引き渡し

基本的にはExecutionContextPromotionListenerを使う形が推奨されています。

先行Stepでcontextにパラメータをputして、Listenerに設定したExecutionContextPromotionListenerで引き継ぐパラメータのkeySetを渡すことで次のStepで取得するといった方式です。

FirstTasklet.java
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        ExecutionContext context = chunkContext.getStepContext().getStepExecution().getExecutionContext();
        context.put("param1", "foo");
        context.put("param2", "bar");

        // 処理完了とする
        return RepeatStatus.FINISHED;
    }

    @Bean
    public ExecutionContextPromotionListener promotionListener(){
        String[] keySet = {"param1", "param2"};
        ExecutionContextPromotionListener promotionListener = new ExecutionContextPromotionListener();
        promotionListener.setKeys(keySet);
        return promotionListener;
    }
JobConfig.java
    @Bean
    @UploadCsvJob
    public Step firstStep(){
        return stepBuilderFactory.get("firststep")
                .tasklet(firstTasklet)
                .listener(promotionListener)
                .build();
    }

次のStepで取得する場合はStepScopeにして@Valueを使います。Beanはアプリ起動時にSingletonで生成されているのでJobごと、Stepごとのパラメータの受け渡しを行う際は、BeanをJobScope、StepScopeにしておく必要があります。

SecondChunk.java
    @Bean
    @StepScope
    public FlatFileItemReader<SampleRecord> reader(
            @Value("#{jobExecutionContext['param1']}") String param1,
            @Value("#{jobExecutionContext['param2']}") String param2
    ) {
        // ...
    }

補足

一旦はこれにしたものの、Step間の結合度が高くなったため、必要なパラメータをJob起動前に生成してJobParameterとして渡す形で対応しました。ただし、必ずしもJobParameterが正解というわけではありませんので設計方針にあったものを選ぶと良いかと思います。

(注意) JobParameterなどSpringBatchの連携パラメータは、実行時の値をDBに残す構成を取っており、文字数に制限があります。制約を理解して適度に活用するとよいと思います。

参考:
https://terasoluna-batch.github.io/guideline/5.0.0.RELEASE/ja/Ch08_FlowControll.html#Ch08_FCh08_FlowControll_HowToUse_PassingDataToFutureSteps

Job起動時にJobParameterとして設定したパラメータは、JobScopeで同様にデータが取れます。

JobLauncherService.java
        JobParametersBuilder builder = new JobParametersBuilder()
                .addString("param1", "foo")
                .addString("param2", "bar");
        JobParameters jobParameters = builder.toJobParameters();

        jobLauncher.run(sampleJob, jobParameters);
SecondChunk.java
    @Bean
    @JobScope
    public FlatFileItemReader<SampleRecord> reader(
            @Value("#{jobParameters['param1']}") String param1,
            @Value("#{jobParameters['param2']}") String param2
    ) {
        // ...
    }

2. 区切り文字や改行を含むcsvを読み込む

SpringBatchでファイルを読むと言えばまず選択肢に上がるのはFlatFileReaderかと思います。
DelimitedLineTokenizerを使ってこのクラスでcsvを読み込むサンプルもググるといくつか出て来ますが、実案件では改行や区切り文字を含む文字列にも対応しなければいけないこともあります。

そういった場合はcsv読み込みするカスタムReaderを作りましょう。csv読み込み自体はunivocity-parserというライブラリを使っていますが、使い慣れたもので構いません。

build.gradle
    compile("com.univocity:univocity-parsers:2.7.1")

文字コード、改行コード、ヘッダの有無などが設定可能にしています

CsvFileItemReader.java
/**
 * SpringBatchでは対応できてないcsv形式に対応したReader(改行含む文字列など)
 *
 * @param <T>
 */
@Slf4j
public class CsvFileItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> implements ResourceAwareItemReaderItemStream<T>, InitializingBean {

    // default encoding for input files
    public static final Charset DEFAULT_CHARSET = Charset.defaultCharset();

    private Resource resource;

    private boolean noInput = false;

    private int lineCount = 0;

    private Charset charset = DEFAULT_CHARSET;

    private int linesToSkip = 0;

    private boolean strict = true;

    private String lineSeparator = "\r\n";

    private char delimiter = ',';

    private char quote = '"';

    private String[] headers;

    private CsvParser csvParser;

    private FieldSetMapper<T> fieldSetMapper;

    private int numberOfKeyValue = 1;

    public CsvFileItemReader() {
        setName(ClassUtils.getShortName(CsvFileItemReader.class));
    }

    /**
     * 読み込み対象のエンコーディングを設定します。デフォルトは {@link #DEFAULT_CHARSET}.
     *
     * @param charset 文字コード
     */
    public void setCharset(Charset charset) {
        this.charset = charset;
    }

    /**
     * 最初に読み込みをスキップする行数を設定します
     *
     * @param linesToSkip the number of lines to skip
     */
    public void setLinesToSkip(int linesToSkip) {
        this.linesToSkip = linesToSkip;
    }

    /**
     * strictModeを設定します
     *
     * @param strict <code>true</code> by default
     */
    public void setStrict(boolean strict) {
        this.strict = strict;
    }

    /**
     * 1行の区切りとなる文字をセットします
     *
     * @param lineSeparator 区切り文字(CRLFの場合は\r\n, LFの場合は\n)
     */
    public void setLineSeparator(String lineSeparator){
        this.lineSeparator = lineSeparator;
    }

    /**
     * カラムの区切り文字をセットします
     *
     * @param delimiter 区切り文字
     */
    public void setDelimiter(char delimiter){
        this.delimiter = delimiter;
    }

    /**
     * カラムの囲み文字をセットします
     *
     * @param quote 囲み文字
     */
    public void setQuote(char quote){
        this.quote = quote;
    }

    /**
     * フィールドのヘッダ情報(Beanのフィールド名で表記)をセットします
     *
     * @param headers フィールドのヘッダ情報(Beanのフィールド名で表記)
     */
    public void setHeaders(String[] headers){
        this.headers = headers;
    }

    /**
     * フィールドへのMapperをセットします
     *
     * @param fieldSetMapper フィールドへ設定するためのMapper
     */
    public void setFieldSetMapper(FieldSetMapper<T> fieldSetMapper){
        this.fieldSetMapper = fieldSetMapper;
    }

    /**
     * キー項目となる項目番号の個数をセットします(先頭からここで指定された個数の項目をキーとして結果ファイルに出力します)
     *
     * @param numberOfKeyValue キー項目の個数
     */
    public void setNumberOfKeyValue(int numberOfKeyValue){
        this.numberOfKeyValue = numberOfKeyValue;
    }


    @Override
    public void setResource(Resource resource) {
        this.resource = resource;
    }

    @Override
    protected T doRead() throws Exception {
        if(noInput){
            return null;
        }

        String[] line = readLine();

        if(line == null){
            return null;
        }

        // カラム数不一致
        if(line.length != headers.length){
            // たとえ項目が足りなかったとしてもキー項目のカンマ数は一致させる
            String keyValue = IntStream.range(0, numberOfKeyValue)
                    .mapToObj(i -> (line.length > i) ? line[i] : "")
                    .collect(Collectors.joining(","));
            throw new BatchValidationException( keyValue, "項目数不一致(" + String.valueOf(line.length) + ")");
        }

        FieldSet fieldSet = new DefaultFieldSet(line, headers);
        return fieldSetMapper.mapFieldSet(fieldSet);
    }

    @Override
    protected void doOpen() throws Exception {
        Assert.notNull(resource, "Input resource must be set");

        noInput = true;
        if(!resource.exists()){
            if(strict) {
                throw new IllegalStateException("Input resource must exist (reader is in 'strict' mode): " + resource);
            }
            log.warn("Input resource does not exist " + resource.getDescription());
            return;
        }

        if(!resource.isReadable()){
            if(strict){
                throw new IllegalStateException("Input resource must be readable (reader is in 'strict' mode): " + resource);
            }
            log.warn("Input resource is not readable " + resource.getDescription());
        }

        csvParser = new CsvParser(settings());
        csvParser.beginParsing(new InputStreamReader(resource.getInputStream(), charset));
        for (int i = 0; i < linesToSkip; i++) {
            readLine();
        }

        noInput = false;
    }

    private CsvParserSettings settings(){
        CsvParserSettings settings = new CsvParserSettings();
        settings.getFormat().setLineSeparator(lineSeparator);
        settings.getFormat().setDelimiter(delimiter);
        settings.getFormat().setQuote(quote);
        settings.setEmptyValue("");
        return settings;
    }

    private String[] readLine(){
        if(csvParser == null){
            throw new ReaderNotOpenException("Parser must be open before it can be read");
        }

        String[] line = csvParser.parseNext();
        if( line == null ){
            return null;
        }
        lineCount++;

        return line;
    }


    @Override
    protected void doClose() throws Exception {
        lineCount = 0;
        if( csvParser != null){
            csvParser.stopParsing();
        }
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.notNull(headers, "headers is required");
        Assert.notNull(fieldSetMapper, "FieldSetMapper is required");
    }
}

JobConfigはこのような記述となります。

  • 読み込むResourceはInputに応じて変わることが多いため、StepScopeで実行
  • ユニットテストでローカルファイルに差し替えやすいように外出し
  • csvファイルの中身をSampleRecordというオブジェクトにそのままマッピング
SampleJobConfig.java
    @Bean
    @StepScope
    public Resource sampleCsvResource(@Value("#{jobParameters['filePath']}") String filePath) {
        return new FileSystemResource(filePath);
    }


    @Bean
    public CsvFileItemReader<SampleRecord> sampleReader() {
        CsvFileItemReader<SampleRecord> reader = new CsvFileItemReader<>();
        reader.setCharset(StandardCharsets.UTF_8);
        reader.setLineSeparator(Constant.LineSeparator.LF);
        reader.setStrict(true);
        reader.setResource(sampleCsvResource);
        reader.setLinesToSkip(1);
        reader.setHeaders(SampleRecord.getFields());
        reader.setFieldSetMapper(new BeanWrapperFieldSetMapper<SampleRecord>() {{
            setTargetType(SampleRecord.class);
        }});
        reader.setNumberOfKeyValue(1);
        return reader;
    }

SampleRecord自体はこのような構成になっています。

  • csvのカラム順に変数を定義
  • Writerで処理することを考慮しDBに登録する項目はすべてフィールド変数で持つように
    • ただし、getFieldsでcsvに含まれる項目のみにしぼって返却できるように工夫しています。
  • 日付などの項目も一旦Stringに入れて、日付の型変換を行う
    • MyBatisを使っており、#{date}で参照できるようにするためにgetDateでLocalDateを返すようにしている
  • バリデーション用のアノテーションが付いていますが、別途説明
  • keyHeaderやkeyValueはエラーファイルの出力用。別途説明
SampleRecord.java
/**
 * csvインポート用のサンプルレコード
 */
@Data
@NoArgsConstructor
public class SampleRecord {

    @NotBlank
    @Length(max = 10)
    private String key;

    @NotBlank
    @Length(max = 20)
    private String name;

    @NotBlank
    @EnableDate // カスタムバリデーション
    private String dateString; // YYYYMMddから日付変換

    private String fixed;  // csvには含まれないがDBに格納する固定パラメータ

    public static String[] getFields() {
        // csvフォーマット用のフィールドリストを取得する
        return Arrays.stream(SampleRecord.class.getDeclaredFields())
                .map(f -> f.getName())
                .filter(s -> !Lists.newArrayList("fixed").contains(s))
                .toArray(i -> new String[i]);
    }

    public static int getFieldIndex(String value) {
        return Arrays.asList(getFields()).indexOf(value) + 1;
    }

    public static String getKeyHeader() {
        return "キー項目";
    }

    public String getKeyValue() {
        return this.key;
    }

    public LocalDate getDate() {
                if (Strings.isNullOrEmpty(dateString)) {
            return null;
        }
        return LocalDate.parse(dateString, DateTimeFormatter.ofPattern("uuuuMMdd"));
    }
}

3. 入替・追記などの更新方法を切り替える

UploadModeというパラメータで入れ替え・追記を指定する形とし、WriterをJobScopeにしてJobParameterによって挙動を切り替えるようにしています。
ORMapperでMyBatisを使っている場合はMapperのInterfaceを指定するだけで使えるWriterが用意されていますので、insert用とupsert用の切り替えを実施します(今回は省略していますが、「入れ替え」の場合はこの前処理Stepでデータを消しておいてください。)

SampleJobConfig.java
    @Bean
    @JobScope
    public ItemWriter<SampleRecord> sampleWriter(@Value("#{jobParameters['uploadModeString']}") String uploadModeString) {

        UploadMode uploadMode = UploadMode.convert(uploadModeString);

        switch (uploadMode) {
            case 入れ替え:
                MyBatisBatchItemWriter<SampleRecord> insertWriter = new MyBatisBatchItemWriter<>();
                insertWriter.setStatementId("com.nyasba.repository.mapper.SampleMapper.batchInsert");
                insertWriter.setSqlSessionFactory(sqlSessionFactory);
                return insertWriter;
            case 更新:
                MyBatisBatchItemWriter<SampleRecord> upsertWriter = new MyBatisBatchItemWriter<>();
                upsertWriter.setStatementId("com.nyasba.repository.mapper.SampleMapper.batchUpsert");
                upsertWriter.setSqlSessionFactory(sqlSessionFactory);
                return upsertWriter;
            default:
                return new DryRunItemWriter<>();
        }
    }

入れ替え・更新のどちらにも当てはまらない場合はDryRunWriterが実行される(=何もしない)ようにしています。
次のプラクティスで「バリデーション」を紹介しますが、DryRunWriterはバリデーションだけ実施して実際に登録しない(エラーが発生しないか試してみる)、という現実の業務でもよくある使い方に応用が効いてくるものとなっています

DryRunItemWriter.java
public class DryRunItemWriter<T> implements ItemWriter<T> {

    @Override
    public void write(List<? extends T> items) throws Exception {
        // do nothing
    }
}

4. バリデーションを実装する

これはProcessorの中でチェックします。SampleRecordの各フィールドに設定されたAnnotationを元にValidationチェックして、エラーがあればBatchValidationExceptionを発生させて処理をSkipする構成にしています。

SampleJobConfig.java
    @Bean
    public ItemProcessor<SampleRecord, SampleRecord> sampleProcessor() {
        return new ItemProcessor<SampleRecord, SampleRecord>() {

            ValidatorFactory validatorFactory = Validation.buildDefaultValidatorFactory();
            Validator validator = validatorFactory.getValidator();

            @Override
            public SampleRecord process(SampleRecord record) {

                Set<ConstraintViolation<SampleRecord>> constraintViolationSet = validator.validate(record);

                if (constraintViolationSet.isEmpty()) {
                    record.setFixed('固定パラメータ');
                    return record;
                }

                // Validationエラー時
                String errorMessage = constraintViolationSet.stream()
                    .sorted(Comparator.comparingInt(v -> SampleRecord.getFieldIndex(v.getPropertyPath().toString())))
                    .map(v -> SampleRecord.getFieldIndex(v.getPropertyPath().toString()) + ":" + v.getMessage())
                    .collect(Collectors.joining("|"));
                throw new BatchValidationException(record.getKeyValue(), errorMessage);
            }
        };
    }

BatchValidationExceptionの例外は何件起きても処理は中断しないようにするためにPolicyも作っています。
このあたりは要件に応じてカスタマイズすると良いかと思います。

CsvJobSkipPolicy.java
public class CsvJobSkipPolicy implements SkipPolicy {

    @Override
    public boolean shouldSkip(Throwable t, int skipCount) throws SkipLimitExceededException {

        // ValidationエラーをSkip (カラム数の不一致も含む)
        return t instanceof BatchValidationException;
    }
}

上記PolicyはStepを組み立てる際にFaultTolerantStepBuilder()を使って、異常時の挙動を設定する形で有効化されます。簡単。

SampleJobConfig.java
    /**
     * [STEP] ファイル取り込み
     */
    private Step csvImportStep() {
        return stepBuilderFactory.get(STEP_LOAD)
            .<T, T>chunk(100)
            .faultTolerant()   // ←★★★
            .skipPolicy(new CsvJobSkipPolicy())
            .listener(getCsvRecordSkipListener())
            .reader(getCsvFileItemReader())
            .processor(getItemProcessor())
            .writer(getItemWriter())
            .listener(stepExecutionListener)
            .build();
    }

5. csvのカラム数不正

Processorで検出できないバリデーションの代表格が「カラム数不正」です。
これはReaderでオブジェクトにマッピングする際に問題が発生しますので、先ほど紹介したCsvFileItemReaderの中でヘッダーのカラム数とレコードのカラム数のチェックを実施し、BatchValidationExceptionを発生させています。

CsvFileItemReader.java
    @Override
    protected T doRead() throws Exception {
        if(noInput){
            return null;
        }

        String[] line = readLine();

        if(line == null){
            return null;
        }

        // カラム数不一致
        if(line.length != headers.length){
            // たとえ項目が足りなかったとしてもキー項目のカンマ数は一致させる
            String keyValue = IntStream.range(0, numberOfKeyValue)
                    .mapToObj(i -> (line.length > i) ? line[i] : "")
                    .collect(Collectors.joining(","));
            throw new BatchValidationException( keyValue, "項目数不一致(" + String.valueOf(line.length) + ")");
        }

        FieldSet fieldSet = new DefaultFieldSet(line, headers);
        return fieldSetMapper.mapFieldSet(fieldSet);
    }

カラム数不正は検知すること自体はそこまで難しくはないのですが、オブジェクトへのマッピングが実施できていないため、エラーレコードの出力が複雑になり、どうすればいいのか悩みました。その結果を次のプラクティスで紹介します。

6. エラーレコードをファイル出力する

エラーレコードはキー項目とエラー理由(複数あればパイプでつなぐ)としています。
行番号にすることも考えましたが、INPUTファイルとの突き合わせをしなければいけなくなるため、キー項目を出力することにしました。

まず、エラーレコードはファイルで出力するためWriterを準備します。
シンプルな仕様ですので、FlatFileItemWriterを使い、エクセルで読み込めるようなフォーマットにしています。

BatchValidationExceptionというカスタム例外を元に出力するような構成にしています

SampleJobConfig.java
    @Bean
    public FlatFileItemWriter<BatchValidationException> sampleErrorWriter() {
        FlatFileItemWriter<BatchValidationException> errorWriter = new FlatFileItemWriter<>();
        errorWriter.setEncoding(Constant.AppCharset.CP932.name());
        errorWriter.setLineSeparator(Constant.LineSeparator.CRLF);
        errorWriter.setShouldDeleteIfEmpty(true);
        errorWriter.setLineAggregator(e -> e.getKeyValue() + "," + e.getMessage());
        errorWriter.setResource(sampleErrorResource);
        errorWriter.setHeaderCallback(writer -> writer.write(SampleRecord.getKeyHeader() + ",エラー内容"));
        return errorWriter;
    }

例外は主にReader(カラム数不正)、Processor(各種バリデーション)で発生し、BatchValidationExceptionが投げられる構成にしていますので、それをSkipListenerで受けてerrorWriterで書き出すようにしました。

CsvRecordSkipListener.java
/**
 * ChunkのSkipListener
 */
@Slf4j
@Component
public class CsvRecordSkipListener<T> implements SkipListener<T, T> {

    @Autowired
    private FlatFileItemWriter<BatchValidationException> errorWriter;

    @Override
    public void onSkipInRead(Throwable t) {
        if (t instanceof BatchValidationException) {
            outputErrorLogBatchValidationException((BatchValidationException) t);
            return;
        }
        log.error("UploadCsvError: ** Read ** {} {}", t.getClass().toString(), t.getMessage());
    }

    @Override
    public void onSkipInWrite(T item, Throwable t) {
        log.error("UploadCsvError: ** Write ** {} {}", t.getClass().toString(), t.getMessage());
    }

    @Override
    public void onSkipInProcess(T item, Throwable t) {
        if (t instanceof BatchValidationException) {
            outputErrorLogBatchValidationException((BatchValidationException) t);
            return;
        }
        log.error("UploadCsvError: ** Process ** {} {}", t.getClass().toString(), t.getMessage());
    }

    private void outputErrorLogBatchValidationException(BatchValidationException e) {
        log.info("{},{}", e.getKeyValue(), e.getMessage());
        try {
            errorWriter.write(Arrays.asList(e));
        } catch (Exception e1) {
            log.error("UploadCsvError: {} {}", t.getClass().toString(), t.getMessage());
        }
    }
}

補足: GoogleStorageResourceを使う場合

GoogleCloudStorageを使う場合、ResourceにはGoogleStorageResourceを指定することになるのですが、標準のFlatFileItemWriterの中でgetFileメソッドが呼ばれてUnsupportedOperationExceptionで落ちます。クラウドのオブジェクトストレージだといろいろ難しいところがあるのかもしれませんが、これは衝撃でした(ユニットテストも書いてましたが、Resourceを置き換えていたので気づけなかった)

結局個別にWriterクラスを自作して解決する手間に。。

7. 処理件数を取得する

処理件数はStepExecutionの中に入りますので、Stepの処理終了時に実行されるStepListenerを作成して、そこで件数を取得します。使い方は要件に従ってご自由に。

UploadCsvStepExecutionListener.java
@Slf4j
public class UploadCsvStepExecutionListener implements StepExecutionListener {

    @Autowired
    private FlatFileItemWriter<BatchValidationException> errorWriter;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        errorWriter.open(stepExecution.getExecutionContext());
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {

        // 処理件数
        // stepExecution.getReadCount() + stepExecution.getReadSkipCount(),

        // 成功件数(=書き込み件数)
        // stepExecution.getWriteCount(),

        // 失敗件数(=読み取りSkip+変換Skip+書き込みSkip)
        // stepExecution.getReadSkipCount() + stepExecution.getProcessSkipCount() + stepExecution.getWriteSkipCount(),

        errorWriter.close();

        return ExitStatus.COMPLETED;
    }
}

8. insert時のPrimaryKey重複

バリデーションエラー、カラム数不正は対処しましたが、データ依存で発生するエラーの中で対処ができていないのがWriterで発生するようなエラーです。代表的なものはinsert時のPrimaryKey重複です。

Writerのエラーということで、PrimaryKey重複の例外をSkipPolicyに設定し、6で紹介したCsvRecordSkipListenerの中のonSkipInWriteでエラー出力すればよいかと考えました。 → 結果、失敗。

Writerはある程度の件数まとめて処理する形をとっており、A,B,C,Dという4件のinsertを実施する際には指定したChunkSize単位で以下のような挙動となることが原因で、エラーレコードが複数登録されてしまう事態に陥りました。

ケース 処理順
正常 A -> B -> C -> D
Cでエラーが発生した場合   A -> B -> C × -> (rollback) -> A -> B    

エラーが発生するまでは気づいていなかったのですが、エラーが発生する箇所まではなんとか終わらせるというすごくありがたい機能になっているようです。

とはいうもののWriterでエラーを拾っていては今までのProcessorのエラー出力もうまくいきません。
そこで、どうしたかというとSkipListener内に登録したKeyを持つSetを作成して、重複しているデータはSkipListenerのエラー出力時に除外する形で対処しました。

他にもエラーはDBに書き込む(重複があれば何もしない)などいろいろ方法はあると思いますが、バッチ単体でなんとかしたかったのでこの方法をとりました。とはいえ、この部分はあまり良い作りではない気はしています・・

9. アプリの実行ユーザ、バッチ開始日時などを管理する

業務でバッチ実装するケースでは、実行ユーザとかバッチの処理開始のシステム日時とかを管理しているものが必要になるのではないでしょうか。そのようなケースでは、JobScopeで実行ユーザIDをもらってBeanを作るという方法がとれます。

AppContextProviderの中でシステム日付を固定化しておけばバッチ全体で同一の日付を管理することも可能です。

SampleJobConfig.java
    @Bean
    @JobScope
    public AppContextProvider appContextProvider(@Value("#{jobParameters['userId']}") String userId) {
        User user = userRepository.findById(userId);
        if (user == null) {
            throw new BatchException(ErrorMessage.E001_実行ユーザが存在しない);
        }
        return new BatchAppContextProvider(user);
    }

10. 複数Jobを実装した時にBeanのInjectionエラーがでないようにする

これは各種「作ってみました」系のサイトでは全く考慮されていない内容で非常に困りました。Aというcsvの読み込みをしようと思ったらBというcsvのReaderが使われるということにはまりました。。
実際にSpringBatchで実装している場合、必ず影響してくると思うのですがみなさんどうされているのでしょう・・

今回はQualifierの仕組みを使ったカスタムアノテーションで乗り切ろうとしました。

SampleCsvJob.java
@Target({ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER, ElementType.TYPE, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Qualifier
public @interface SampleCsvJob {

    String value() default "";

}

これをBean登録する方のメソッドに追加。

SampleJobConfig.java
    @Bean
    @SampleCsvJob
    public CsvFileItemReader<SampleRecord> sampleReader() {
        CsvFileItemReader<SampleRecord> reader = new CsvFileItemReader<>();
        reader.setCharset(StandardCharsets.UTF_8);
        reader.setLineSeparator(Constant.LineSeparator.LF);
        reader.setStrict(true);
        reader.setResource(sampleCsvResource);
        reader.setLinesToSkip(1);
        reader.setHeaders(SampleRecord.getFields());
        reader.setFieldSetMapper(new BeanWrapperFieldSetMapper<SampleRecord>() {{
            setTargetType(SampleRecord.class);
        }});
        reader.setNumberOfKeyValue(1);
        return reader;
    }

さらにDIされるほうにも合わせて追加することでBeanを区別できると考えました。

SampleJobConfig.java
    @Autowired
    @SampleCsvJob
    private final CsvFileItemReader<SampleRecord> sampleReader;

ただ、実際はこれでは解決せず。(Qualifierで区別できるはずなんですが理由はわからず・・)
どうやらメソッド名(すべてreaderにしていた)を sampleReaderなど別名にすることでバグが解消したので現在は変数名とアノテーションの組み合わせで区別する構成としています。

慣れるまでに一番ハマったのはここです。あー、つらい。

まとめ

実案件で使えるプラクティスを10個紹介しました。
あまり紹介されていない情報なのかなと感じているのでみなさまのお役に立てれば幸いです。

おまけ:ユニットテストを書く

これらを実践すると、ある程度実案件でも使えるような仕組みになってきたのではないでしょうか。

とはいっても、実装しているものとしてはやはりテストは必要ですよね。
最後にユニットテストのJUnit5で実装したサンプルも紹介しておきます。Chunk(Step)の部分を実行して、DBへの登録結果とエラーファイルのassertを実行しています。

SampleJobTest.java
@ExtendWith(SpringExtension.class)
@MybatisTest
@AutoConfigureTestDatabase(replace = AutoConfigureTestDatabase.Replace.NONE) 
@Transactional(propagation = Propagation.NOT_SUPPORTED)
@DisplayName("Sample csvインポートテスト ")
public class SampleCsvLoadChunkTest {

    static String ERROR_ACTUAL_FILE = "src/test/resources/uploadcsv/app/sample-error.csv";
    static String ERROR_EXPECTED_FILE = "src/test/resources/uploadcsv/app/sample-error-expected.csv";

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Autowired
    private Resource sampleCsvErrorResource;

    @Sql(statements = {
        "delete from SAMPLE",
    })
    @DisplayName("入れ替え")
    @Test
    public void test() throws Exception {

        if (sampleCsvErrorResource.exists() && sampleCsvErrorResource.getFile().delete()) {
            System.out.println("delete old error file");
        }

        // when
        JobParameters jobParameters = new JobParametersBuilder()
            .addString("uploadModeString", "deleteinsert")
            .toJobParameters();

        JobExecution actual = jobLauncherTestUtils.launchStep("sampleLoad", jobParameters);

        // then
        //   正常終了
        Assertions.assertEquals(ExitStatus.COMPLETED, actual.getExitStatus());

        //   DBの中身をチェック
        //    (略

        //   エラーリストの中身
        Assertions.assertEquals(
            FileUtils.readFileToString(new File(ERROR_EXPECTED_FILE), Constant.AppCharset.CP932),
            FileUtils.readFileToString(sampleCsvErrorResource.getFile(), Constant.AppCharset.CP932)
        );
    }

    @Configuration
    @EnableBatchProcessing
    @ComponentScan(basePackageClasses = {SampleCsvJob.class, __BatchUtil.class})
    @MapperScan(basePackageClasses = __AppMapper.class)
    static class LocalTestContext {

        @Bean
        public static BatchAppContextProvider appContextProvider() {
            return new BatchAppContextProvider("test");
        }

        @Bean
        public static JobLauncherTestUtils jobLauncherTestUtils() {
            return new JobLauncherTestUtils() {
                @Override
                @Autowired
                public void setJob(@SampleCsvJob Job sampleCsvJob) {
                    super.setJob(sampleCsvJob);
                }
            };
        }

        @Bean
        public static Resource sampleCsvResource() {
            return new ClassPathResource("/uploadcsv/app/sample.csv");
        }

        @Bean
        public static WritableResource sampleCsvErrorResource() {
            return new FileSystemResource(ERROR_ACTUAL_FILE);
        }
    }
}
nyasba
IT Architect / VP of Engineering @GxP
ユーザー登録して、Qiitaをもっと便利に使ってみませんか。
  1. あなたにマッチした記事をお届けします
    ユーザーやタグをフォローすることで、あなたが興味を持つ技術分野の情報をまとめてキャッチアップできます
  2. 便利な情報をあとで効率的に読み返せます
    気に入った記事を「ストック」することで、あとからすぐに検索できます
コメント
この記事にコメントはありません。
あなたもコメントしてみませんか :)
すでにアカウントを持っている方は
ユーザーは見つかりませんでした