数万件、数百万件といった大量のデータを処理するバッチ処理を実装する場面は、業務システムでよくあります。@Scheduledによる定期実行は知っているけど、大量データを効率的に処理する方法がわからない、そんな悩みを持つ方も多いでしょう。
Spring Batchは、こうした大量データ処理に特化したフレームワークです。この記事では、Spring Batchの基本構成から実際の実装方法まで、実例を交えながら解説します。
Spring Batchとは
Spring Batchは、大量データの読み込み・加工・書き込みに最適化されたフレームワークです。@Scheduledが「いつ実行するか」を制御するのに対し、Spring Batchは「どうやって大量データを処理するか」を提供します。
Spring Batchには次の特徴があります。
- chunk処理 によるメモリ効率の良いデータ処理
- chunk単位での トランザクション管理
- リトライ・スキップ などのエラーハンドリング機能が標準搭載
- 処理の再実行制御やメタデータ管理
メモリに全データを載せることなく、数百万件のデータを安全に処理できます。
Spring Batchの4つの主要コンポーネント
Spring Batchは以下のコンポーネントで構成されます。
- Job - バッチ処理全体を表す最上位の概念
- Step - Jobを構成する処理単位(1つのJobに複数のStepを定義可能)
- ItemReader - データソースから1件ずつデータを読み込む
- ItemProcessor - 読み込んだデータを加工・変換する(省略可能)
- ItemWriter - 処理済みデータをまとめて書き込む
基本的な流れは「ItemReaderで読む → ItemProcessorで加工 → ItemWriterで書く」です。このサイクルをchunk単位で繰り返します。
chunk処理の仕組み
chunk処理は、Spring Batchの中核となる仕組みです。指定したchunkサイズ分のデータを読み込んでから、一括で書き込みます。
例えばchunkサイズを100に設定した場合は次のように動作します。
- ItemReaderで100件読み込む
- ItemProcessorで100件を加工
- ItemWriterで100件を一括書き込み
- トランザクションをコミット
1chunk = 1トランザクション という関係になっているため、chunk単位でコミット・ロールバックが行われます。
chunkサイズの目安は100〜1000件程度です。大きすぎるとメモリ不足、小さすぎると性能が落ちるため、データ特性に応じて調整しましょう。
依存関係の追加
まずはSpring Batchの依存関係を追加します。
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-batch'
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
runtimeOnly 'com.h2database:h2'
}
Spring BatchはJobRepositoryという仕組みでバッチ処理のメタデータを管理するため、データソースが必要です。開発時はH2、本番ではPostgreSQLやMySQLなどを使います。
Spring Boot 3.x以降の注意点 として、@EnableBatchProcessingは基本的に 不要 です。Spring Boot 3.x以降は自動設定でBatch機能が有効化されるため、デフォルト設定で問題ない場合は@EnableBatchProcessingを付ける必要はありません。カスタム設定が必要な場合のみ使用します。
なお、Spring Batch 5.0以降、JobBuilderFactory/StepBuilderFactoryは非推奨となり、JobBuilder/StepBuilderを直接使う形に変更されました。この記事では新しい記法を使用します。
CSVファイルからDBへのデータ投入
まずは基本的な例として、CSVファイルを読み込んでDBに投入する処理を実装してみましょう。
対象となるエンティティクラスは次のとおりです。
public class User {
private Long id;
private String name;
private String email;
// getter/setter省略
}
FlatFileItemReaderでCSVを読み込む
@Bean
public FlatFileItemReader<User> csvReader() {
return new FlatFileItemReaderBuilder<User>()
.name("csvReader")
.resource(new ClassPathResource("users.csv"))
.delimited()
.names("id", "name", "email")
.targetType(User.class)
.build();
}
FlatFileItemReaderはCSVやTSVを読み込むためのItemReaderです。names()で列名を指定し、targetType()でマッピング先のクラスを指定すると、フィールド名が一致する場合は自動的にオブジェクトに変換されます。
より柔軟なマッピングが必要な場合は、FieldSetMapperを使ってマッピングロジックを自分で定義できます。
JdbcBatchItemWriterでDBに書き込む
@Bean
public JdbcBatchItemWriter<User> dbWriter(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<User>()
.dataSource(dataSource)
.sql("INSERT INTO users (id, name, email) VALUES (:id, :name, :email)")
.beanMapped()
.build();
}
JdbcBatchItemWriterは、JDBCのバッチ更新機能を使って複数件を一括でINSERTします。beanMapped()を指定すると、エンティティのフィールド名とSQL名前付きパラメータが一致する必要があります。
JobとStepの定義
@Configuration
public class CsvImportJobConfig {
@Bean
public Job csvImportJob(JobRepository jobRepository, Step csvImportStep) {
return new JobBuilder("csvImportJob", jobRepository)
.start(csvImportStep)
.build();
}
@Bean
public Step csvImportStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
FlatFileItemReader<User> csvReader,
JdbcBatchItemWriter<User> dbWriter) {
return new StepBuilder("csvImportStep", jobRepository)
.<User, User>chunk(100, transactionManager)
.reader(csvReader)
.writer(dbWriter)
.build();
}
}
JobとStepを組み立てます。Spring Boot 3.x以降、JobRepositoryとPlatformTransactionManagerはSpringが自動設定したBeanを引数として受け取ります。chunk(100, transactionManager)で、100件ごとにchunk処理を行い、トランザクション管理を有効化しています。
DBからDBへの大量データ変換処理
次は、あるテーブルから読み込んでビジネスロジックで加工し、別のテーブルに書き込む例です。
public class OrderEntity {
private Long id;
private Long customerId;
private BigDecimal amount;
private String status;
// getter/setter省略
}
public class ProcessedOrder {
private Long orderId;
private BigDecimal finalAmount;
private String status;
// getter/setter省略
}
JdbcCursorItemReaderでDBから読み込む
@Bean
public JdbcCursorItemReader<OrderEntity> orderReader(DataSource dataSource) {
return new JdbcCursorItemReaderBuilder<OrderEntity>()
.name("orderReader")
.dataSource(dataSource)
.sql("SELECT id, customer_id, amount, status FROM orders WHERE status = 'PENDING'")
.rowMapper(new BeanPropertyRowMapper<>(OrderEntity.class))
.build();
}
JdbcCursorItemReaderは、SQLのカーソルを使って1件ずつデータを読み込みます。メモリに全件載せることなく、大量データを順次処理できます。
ItemProcessorでデータを加工
@Component
public class OrderProcessor implements ItemProcessor<OrderEntity, ProcessedOrder> {
@Override
public ProcessedOrder process(OrderEntity order) throws Exception {
ProcessedOrder processed = new ProcessedOrder();
processed.setOrderId(order.getId());
processed.setFinalAmount(order.getAmount().multiply(new BigDecimal("0.9")));
processed.setStatus("PROCESSED");
return processed;
}
}
ItemProcessorで1件ずつビジネスロジックを適用します。ここでnullを返すと、その件はItemWriterに渡されずフィルタリングされます。
エラーハンドリング - skipとretry
データ不正などで一部のレコードが処理できない場合でも、全体を止めずに処理を継続したいことがあります。skipを使うと、特定の例外が発生した件をスキップして次に進めます。
@Bean
public Step resilientStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ItemReader<User> reader,
ItemWriter<User> writer) {
return new StepBuilder("resilientStep", jobRepository)
.<User, User>chunk(100, transactionManager)
.reader(reader)
.writer(writer)
.faultTolerant()
.skip(ValidationException.class)
.skipLimit(10)
.retry(TransientDataAccessException.class)
.retryLimit(3)
.build();
}
faultTolerant()でエラーハンドリング機能を有効化します。skip()で無視する例外を指定し、skipLimit(10)で最大10件までスキップを許容します。
retry()はネットワークエラーなど一時的な障害に対するリトライ設定です。retryLimit(3)で最大3回までリトライします。skipとretryは組み合わせ可能です。
JobParametersによる実行制御
JobParametersを使うと、実行時にパラメータを渡して処理を柔軟に制御できます。
@Bean
@StepScope
public FlatFileItemReader<User> parameterizedReader(
@Value("#{jobParameters['inputFile']}") String inputFile) {
return new FlatFileItemReaderBuilder<User>()
.name("parameterizedReader")
.resource(new FileSystemResource(inputFile))
.delimited()
.names("id", "name", "email")
.targetType(User.class)
.build();
}
@StepScopeを付けることで、Step実行時にBeanが生成される遅延評価により、JobParametersの値を受け取れます。これにより、実行ごとに異なるファイルを処理できます。
JobParametersは実行の識別にも使われます。同じJobParametersでは同じJobインスタンスとして扱われるため、正常終了したJobは再実行できません。再実行したい場合は、パラメータを変えるか、RunIdIncrementerを使って自動的にパラメータを変える必要があります。
バッチの実行方法
実装したバッチを実行する方法はいくつかあります。
@Scheduledと組み合わせて定期実行
@Component
public class BatchScheduler {
private final JobLauncher jobLauncher;
private final Job csvImportJob;
public BatchScheduler(JobLauncher jobLauncher, Job csvImportJob) {
this.jobLauncher = jobLauncher;
this.csvImportJob = csvImportJob;
}
@Scheduled(cron = "0 0 2 * * *")
public void runBatch() throws Exception {
JobParameters params = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(csvImportJob, params);
}
}
@ScheduledとJobLauncherを組み合わせると、定期実行が簡単に実現できます。毎回異なるパラメータ(現在時刻など)を渡すことで、再実行可能にしています。
実装時の注意点
実務で使う際の注意点をいくつか挙げておきます。
chunkサイズは100件から試す - データ特性に応じて調整が必要ですが、まずは100件から始めるのが無難です。
大量データ処理ではログ出力量に注意 - 全件ログを出すとログファイルが肥大化します。1000件ごとなど間引いてログ出力しましょう。
本番ではJobRepositoryに永続化DBを使う - H2は開発用と割り切り、本番ではPostgreSQLなどを使いましょう。初回実行時にはSpring Batchのメタデータテーブルが自動作成されます。
まとめ
Spring Batchを使えば、大量データを安全かつ効率的に処理できます。Job/Step/ItemReader/ItemWriterという4つのコンポーネントを理解し、chunk処理の仕組みを押さえれば、実用的なバッチ処理が実装できます。
トランザクション管理やエラーハンドリングも標準で備わっているため、安心して業務システムに組み込めます。まずは小さなバッチから始めて、徐々に複雑な処理に挑戦してみてください。