Spring Batchで大量データを安全に処理する方法 - Job、Step、Chunk処理の基本


数万件、数百万件といった大量のデータを処理するバッチ処理を実装する場面は、業務システムでよくあります。@Scheduledによる定期実行は知っているけど、大量データを効率的に処理する方法がわからない、そんな悩みを持つ方も多いでしょう。

Spring Batchは、こうした大量データ処理に特化したフレームワークです。この記事では、Spring Batchの基本構成から実際の実装方法まで、実例を交えながら解説します。

Spring Batchとは

Spring Batchは、大量データの読み込み・加工・書き込みに最適化されたフレームワークです。@Scheduledが「いつ実行するか」を制御するのに対し、Spring Batchは「どうやって大量データを処理するか」を提供します。

Spring Batchには次の特徴があります。

  • chunk処理 によるメモリ効率の良いデータ処理
  • chunk単位での トランザクション管理
  • リトライ・スキップ などのエラーハンドリング機能が標準搭載
  • 処理の再実行制御やメタデータ管理

メモリに全データを載せることなく、数百万件のデータを安全に処理できます。

Spring Batchの4つの主要コンポーネント

Spring Batchは以下のコンポーネントで構成されます。

  • Job - バッチ処理全体を表す最上位の概念
  • Step - Jobを構成する処理単位(1つのJobに複数のStepを定義可能)
  • ItemReader - データソースから1件ずつデータを読み込む
  • ItemProcessor - 読み込んだデータを加工・変換する(省略可能)
  • ItemWriter - 処理済みデータをまとめて書き込む

基本的な流れは「ItemReaderで読む → ItemProcessorで加工 → ItemWriterで書く」です。このサイクルをchunk単位で繰り返します。

chunk処理の仕組み

chunk処理は、Spring Batchの中核となる仕組みです。指定したchunkサイズ分のデータを読み込んでから、一括で書き込みます。

例えばchunkサイズを100に設定した場合は次のように動作します。

  1. ItemReaderで100件読み込む
  2. ItemProcessorで100件を加工
  3. ItemWriterで100件を一括書き込み
  4. トランザクションをコミット

1chunk = 1トランザクション という関係になっているため、chunk単位でコミット・ロールバックが行われます。

chunkサイズの目安は100〜1000件程度です。大きすぎるとメモリ不足、小さすぎると性能が落ちるため、データ特性に応じて調整しましょう。

依存関係の追加

まずはSpring Batchの依存関係を追加します。

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-batch'
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    runtimeOnly 'com.h2database:h2'
}

Spring BatchはJobRepositoryという仕組みでバッチ処理のメタデータを管理するため、データソースが必要です。開発時はH2、本番ではPostgreSQLやMySQLなどを使います。

Spring Boot 3.x以降の注意点 として、@EnableBatchProcessingは基本的に 不要 です。Spring Boot 3.x以降は自動設定でBatch機能が有効化されるため、デフォルト設定で問題ない場合は@EnableBatchProcessingを付ける必要はありません。カスタム設定が必要な場合のみ使用します。

なお、Spring Batch 5.0以降、JobBuilderFactory/StepBuilderFactoryは非推奨となり、JobBuilder/StepBuilderを直接使う形に変更されました。この記事では新しい記法を使用します。

CSVファイルからDBへのデータ投入

まずは基本的な例として、CSVファイルを読み込んでDBに投入する処理を実装してみましょう。

対象となるエンティティクラスは次のとおりです。

public class User {
    private Long id;
    private String name;
    private String email;
    // getter/setter省略
}

FlatFileItemReaderでCSVを読み込む

@Bean
public FlatFileItemReader<User> csvReader() {
    return new FlatFileItemReaderBuilder<User>()
        .name("csvReader")
        .resource(new ClassPathResource("users.csv"))
        .delimited()
        .names("id", "name", "email")
        .targetType(User.class)
        .build();
}

FlatFileItemReaderはCSVやTSVを読み込むためのItemReaderです。names()で列名を指定し、targetType()でマッピング先のクラスを指定すると、フィールド名が一致する場合は自動的にオブジェクトに変換されます。

より柔軟なマッピングが必要な場合は、FieldSetMapperを使ってマッピングロジックを自分で定義できます。

JdbcBatchItemWriterでDBに書き込む

@Bean
public JdbcBatchItemWriter<User> dbWriter(DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder<User>()
        .dataSource(dataSource)
        .sql("INSERT INTO users (id, name, email) VALUES (:id, :name, :email)")
        .beanMapped()
        .build();
}

JdbcBatchItemWriterは、JDBCのバッチ更新機能を使って複数件を一括でINSERTします。beanMapped()を指定すると、エンティティのフィールド名とSQL名前付きパラメータが一致する必要があります

JobとStepの定義

@Configuration
public class CsvImportJobConfig {

    @Bean
    public Job csvImportJob(JobRepository jobRepository, Step csvImportStep) {
        return new JobBuilder("csvImportJob", jobRepository)
            .start(csvImportStep)
            .build();
    }

    @Bean
    public Step csvImportStep(JobRepository jobRepository,
                              PlatformTransactionManager transactionManager,
                              FlatFileItemReader<User> csvReader,
                              JdbcBatchItemWriter<User> dbWriter) {
        return new StepBuilder("csvImportStep", jobRepository)
            .<User, User>chunk(100, transactionManager)
            .reader(csvReader)
            .writer(dbWriter)
            .build();
    }
}

JobとStepを組み立てます。Spring Boot 3.x以降、JobRepositoryとPlatformTransactionManagerはSpringが自動設定したBeanを引数として受け取りますchunk(100, transactionManager)で、100件ごとにchunk処理を行い、トランザクション管理を有効化しています。

DBからDBへの大量データ変換処理

次は、あるテーブルから読み込んでビジネスロジックで加工し、別のテーブルに書き込む例です。

public class OrderEntity {
    private Long id;
    private Long customerId;
    private BigDecimal amount;
    private String status;
    // getter/setter省略
}

public class ProcessedOrder {
    private Long orderId;
    private BigDecimal finalAmount;
    private String status;
    // getter/setter省略
}

JdbcCursorItemReaderでDBから読み込む

@Bean
public JdbcCursorItemReader<OrderEntity> orderReader(DataSource dataSource) {
    return new JdbcCursorItemReaderBuilder<OrderEntity>()
        .name("orderReader")
        .dataSource(dataSource)
        .sql("SELECT id, customer_id, amount, status FROM orders WHERE status = 'PENDING'")
        .rowMapper(new BeanPropertyRowMapper<>(OrderEntity.class))
        .build();
}

JdbcCursorItemReaderは、SQLのカーソルを使って1件ずつデータを読み込みます。メモリに全件載せることなく、大量データを順次処理できます。

ItemProcessorでデータを加工

@Component
public class OrderProcessor implements ItemProcessor<OrderEntity, ProcessedOrder> {

    @Override
    public ProcessedOrder process(OrderEntity order) throws Exception {
        ProcessedOrder processed = new ProcessedOrder();
        processed.setOrderId(order.getId());
        processed.setFinalAmount(order.getAmount().multiply(new BigDecimal("0.9")));
        processed.setStatus("PROCESSED");
        return processed;
    }
}

ItemProcessorで1件ずつビジネスロジックを適用します。ここでnullを返すと、その件はItemWriterに渡されずフィルタリングされます。

エラーハンドリング - skipとretry

データ不正などで一部のレコードが処理できない場合でも、全体を止めずに処理を継続したいことがあります。skipを使うと、特定の例外が発生した件をスキップして次に進めます。

@Bean
public Step resilientStep(JobRepository jobRepository,
                          PlatformTransactionManager transactionManager,
                          ItemReader<User> reader,
                          ItemWriter<User> writer) {
    return new StepBuilder("resilientStep", jobRepository)
        .<User, User>chunk(100, transactionManager)
        .reader(reader)
        .writer(writer)
        .faultTolerant()
        .skip(ValidationException.class)
        .skipLimit(10)
        .retry(TransientDataAccessException.class)
        .retryLimit(3)
        .build();
}

faultTolerant()でエラーハンドリング機能を有効化します。skip()で無視する例外を指定し、skipLimit(10)で最大10件までスキップを許容します。

retry()はネットワークエラーなど一時的な障害に対するリトライ設定です。retryLimit(3)で最大3回までリトライします。skipとretryは組み合わせ可能です。

JobParametersによる実行制御

JobParametersを使うと、実行時にパラメータを渡して処理を柔軟に制御できます。

@Bean
@StepScope
public FlatFileItemReader<User> parameterizedReader(
        @Value("#{jobParameters['inputFile']}") String inputFile) {
    return new FlatFileItemReaderBuilder<User>()
        .name("parameterizedReader")
        .resource(new FileSystemResource(inputFile))
        .delimited()
        .names("id", "name", "email")
        .targetType(User.class)
        .build();
}

@StepScopeを付けることで、Step実行時にBeanが生成される遅延評価により、JobParametersの値を受け取れます。これにより、実行ごとに異なるファイルを処理できます。

JobParametersは実行の識別にも使われます。同じJobParametersでは同じJobインスタンスとして扱われるため、正常終了したJobは再実行できません。再実行したい場合は、パラメータを変えるか、RunIdIncrementerを使って自動的にパラメータを変える必要があります。

バッチの実行方法

実装したバッチを実行する方法はいくつかあります。

@Scheduledと組み合わせて定期実行

@Component
public class BatchScheduler {

    private final JobLauncher jobLauncher;
    private final Job csvImportJob;

    public BatchScheduler(JobLauncher jobLauncher, Job csvImportJob) {
        this.jobLauncher = jobLauncher;
        this.csvImportJob = csvImportJob;
    }

    @Scheduled(cron = "0 0 2 * * *")
    public void runBatch() throws Exception {
        JobParameters params = new JobParametersBuilder()
            .addLong("time", System.currentTimeMillis())
            .toJobParameters();
        
        jobLauncher.run(csvImportJob, params);
    }
}

@ScheduledJobLauncherを組み合わせると、定期実行が簡単に実現できます。毎回異なるパラメータ(現在時刻など)を渡すことで、再実行可能にしています。

実装時の注意点

実務で使う際の注意点をいくつか挙げておきます。

chunkサイズは100件から試す - データ特性に応じて調整が必要ですが、まずは100件から始めるのが無難です。

大量データ処理ではログ出力量に注意 - 全件ログを出すとログファイルが肥大化します。1000件ごとなど間引いてログ出力しましょう。

本番ではJobRepositoryに永続化DBを使う - H2は開発用と割り切り、本番ではPostgreSQLなどを使いましょう。初回実行時にはSpring Batchのメタデータテーブルが自動作成されます。

まとめ

Spring Batchを使えば、大量データを安全かつ効率的に処理できます。Job/Step/ItemReader/ItemWriterという4つのコンポーネントを理解し、chunk処理の仕組みを押さえれば、実用的なバッチ処理が実装できます。

トランザクション管理やエラーハンドリングも標準で備わっているため、安心して業務システムに組み込めます。まずは小さなバッチから始めて、徐々に複雑な処理に挑戦してみてください。