KotlinでSpring Batchを作る

概要

Kotlin言語を利用して、簡単なSpring Batchプロジェクトを作成します。

プロジェクト作成

次のようにcurlコマンドを使用して、Spring Bootの初期プロジェクトを作成します。

curl https://start.spring.io/starter.tgz \
-d bootVersion=2.5.5 \
-d dependencies=batch,h2 \
-d baseDir=spring-batch \
-d groupId=com.devkuma \
-d artifactId=spring-batch \
-d packageName=com.devkuma.batch \
-d applicationName=BatchApplication \
-d packaging=jar \
-d language=kotlin \
-d javaVersion=11 \
-d type=gradle-project | tar -xzvf -

上記のコマンドを実行すると、Spring BatchとH2 Databaseが追加されます。

生成されたプロジェクトのファイル構造は次のようになります。

.
├── HELP.md
├── build.gradle.kts
├── gradle
│   └── wrapper
│       ├── gradle-wrapper.jar
│       └── gradle-wrapper.properties
├── gradlew
├── gradlew.bat
├── settings.gradle.kts
└── src
    ├── main
    │   ├── kotlin
    │   │   └── com
    │   │       └── devkuma
    │   │           └── batch
    │   │               └── BatchApplication.kt
    │   └── resources
    │       └── application.properties
    └── test
        └── kotlin
            └── com
                └── devkuma
                    └── batch
                        └── BatchApplicationTests.kt

Spring Batch設定

/src/main/kotlin/com/devkuma/batch/BatchApplication.kt

package com.devkuma.batch

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication

@EnableBatchProcessing
@SpringBootApplication
class BatchApplication

fun main(args: Array<String>) {
    runApplication<BatchApplication>(*args)
}

@EnableBatchProcessingアノテーションを追加して、バッチ機能を有効にします。このアノテーションを追加することで、Spring Batchの機能を使用できるようになります。

ロギングライブラリの追加

/build.gradle.kts

// ... 省略 ...

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-batch")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    runtimeOnly("com.h2database:h2")
    testImplementation("org.springframework.boot:spring-boot-starter-test")
    testImplementation("org.springframework.batch:spring-batch-test")

    // https://mvnrepository.com/artifact/io.github.microutils/kotlin-logging
    implementation("io.github.microutils:kotlin-logging:2.0.11")
}

// ... 省略 ...

ファイル内容の依存ライブラリを確認すると、Spring Batch(spring-boot-starter-batch)、H2 Database(com.h2database:h2)、Kotlin関連ライブラリが追加されていることがわかります。

そして、新しくKotlin Logging(kotlin-logging)を追加しました。

Tasklet処理方式

単一Step構成の実装

単一Stepジョブの設定を追加します。

/src/main/kotlin/com/devkuma/batch/config/SingleStepJobConfig.kt

package com.devkuma.batch.config

import mu.KotlinLogging
import org.springframework.batch.core.Job
import org.springframework.batch.core.Step
import org.springframework.batch.core.StepContribution
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory
import org.springframework.batch.core.scope.context.ChunkContext
import org.springframework.batch.repeat.RepeatStatus
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

private val log = KotlinLogging.logger {}

@Configuration
class SingleStepJobConfig(
    private val jobBuilderFactory: JobBuilderFactory,
    private val stepBuilderFactory: StepBuilderFactory
) {

    @Bean
    fun singleStepJob(): Job {
        return jobBuilderFactory["singleStepJob"]
            .start(singleStep())
            .build()
    }

    @Bean
    fun singleStep(): Step {
        return stepBuilderFactory["singleStep"]
            .tasklet { _: StepContribution, _: ChunkContext ->
                log.info { "Single Step!!" }
                RepeatStatus.FINISHED
            }
            .build()
    }
}
  • jobBuilderFactory["singleStepJob"]
    • simpleJobという名前でBatch Jobを生成します。
    • Job名は別途指定せず、このようにBuilderを通じて指定します。
  • stepBuilderFactory["singleStep"]
    • simpleStep1という名前でBatch Stepを生成します。
    • 上記のJob名と同じように、Builderを通じて名前を指定します。
  • .tasklet { _: StepContribution, _: ChunkContext
    • Stepで実行される機能を明示します。
    • Taskletは、Stepで単独実行されるカスタム機能を宣言するときに使用します。
    • ここではBatchが実行されると、log.info { "Single Step!!" }文によってログが出力されます。

実行結果は次のとおりです。

// ... 省略 ...

2021-10-02 01:51:51.746  INFO 72551 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=singleStepJob]] launched with the following parameters: [{}]
2021-10-02 01:51:51.769  INFO 72551 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [singleStep]
2021-10-02 01:51:51.778  INFO 72551 --- [           main] c.d.batch.config.SingleStepJobConfig     : Single Step!!
2021-10-02 01:51:51.782  INFO 72551 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [singleStep] executed in 13ms
2021-10-02 01:51:51.786  INFO 72551 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=singleStepJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 28ms

// ... 省略 ...

上記のログを確認すると、Single Step!!が出力されたことを確認できます。

複数Step構成の実装

複数Stepジョブの設定を追加します。

/src/main/kotlin/com/devkuma/batch/config/SingleStepJobConfig.kt

package com.devkuma.batch.config

import mu.KotlinLogging
import org.springframework.batch.core.Job
import org.springframework.batch.core.Step
import org.springframework.batch.core.StepContribution
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory
import org.springframework.batch.core.scope.context.ChunkContext
import org.springframework.batch.repeat.RepeatStatus
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

private val log = KotlinLogging.logger {}

@Configuration
class MultipleStepJobConfig(
    private val jobBuilderFactory: JobBuilderFactory,
    private val stepBuilderFactory: StepBuilderFactory
) {

    @Bean
    fun multipleStepJob(): Job {
        return jobBuilderFactory["multipleStepJob"]
            .start(startStep())
            .next(nextStep())
            .next(lastStep())
            .build()
    }

    @Bean
    fun startStep(): Step {
        return stepBuilderFactory["startStep"]
            .tasklet { _: StepContribution, _: ChunkContext ->
                log.info { "Start Step!!" }
                RepeatStatus.FINISHED
            }
            .build()
    }

    @Bean
    fun nextStep(): Step {
        return stepBuilderFactory["nextStep"]
            .tasklet { _: StepContribution, _: ChunkContext ->
                log.info { "Next Step!!" }
                RepeatStatus.FINISHED
            }
            .build()
    }

    @Bean
    fun lastStep(): Step {
        return stepBuilderFactory["lastStep"]
            .tasklet { _: StepContribution, _: ChunkContext ->
                log.info { "Last Step!!" }
                RepeatStatus.FINISHED
            }
            .build()
    }
}
  • jobBuilderFactory["multipleStepJob"]
    • multipleStepJobという名前でBatch Jobを生成します。
  • stepBuilderFactory["startStep"]stepBuilderFactory["nextStep"]stepBuilderFactory["lastStep"]
    • それぞれstartStepnextSteplastStepという名前でBatch Stepを生成します。

実行結果は次のとおりです。

// ... 省略 ...

2021-10-02 01:54:56.544  INFO 72707 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=multipleStepJob]] launched with the following parameters: [{}]
2021-10-02 01:54:56.573  INFO 72707 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [startStep]
2021-10-02 01:54:56.585  INFO 72707 --- [           main] c.d.batch.config.MultipleStepJobConfig   : Start Step!!
2021-10-02 01:54:56.593  INFO 72707 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [startStep] executed in 20ms
2021-10-02 01:54:56.603  INFO 72707 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [nextStep]
2021-10-02 01:54:56.607  INFO 72707 --- [           main] c.d.batch.config.MultipleStepJobConfig   : Next Step!!
2021-10-02 01:54:56.613  INFO 72707 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [nextStep] executed in 10ms
2021-10-02 01:54:56.619  INFO 72707 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [lastStep]
2021-10-02 01:54:56.622  INFO 72707 --- [           main] c.d.batch.config.MultipleStepJobConfig   : Last Step!!
2021-10-02 01:54:56.627  INFO 72707 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [lastStep] executed in 8ms

// ... 省略 ...

上記のログを確認すると、StepごとにStart Step!!Next Step!!Last Step!!が出力されたことを確認できます。

FlowによるStep構成の実装

package com.devkuma.batch.config

import mu.KotlinLogging
import org.springframework.batch.core.ExitStatus
import org.springframework.batch.core.Job
import org.springframework.batch.core.Step
import org.springframework.batch.core.StepContribution
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory
import org.springframework.batch.core.scope.context.ChunkContext
import org.springframework.batch.repeat.RepeatStatus
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

private val log = KotlinLogging.logger {}

@Configuration
class FlowStepJobConfig(
    private val jobBuilderFactory: JobBuilderFactory,
    private val stepBuilderFactory: StepBuilderFactory
) {

    @Bean
    fun flowStepJob(): Job {
        return jobBuilderFactory["flowStepJob"]

            .start(flowStartStep())
            .on(ExitStatus.COMPLETED.exitCode) // "flowStartStep"의 "ExitStatus"가 "COMPLETED"인 경우
            .to(flowProcessStep()) // "flowProcessStep"을 실행 시킨다
            .on("*") // "flowProcessStep"의 결과와 상관없이
            .to(flowWriteStep()) // "writeStep"을 실행 시킨다.
            .on("*") // "flowWriteStep"의 결과와 상관없이
            .end() // "Flow"를 종료 시킨다.

            .from(flowStartStep())
            .on(ExitStatus.FAILED.exitCode) // "flowStartStep"의 "ExitStatus"가 "FAILED"일 경우
            .to(flowFailOverStep()) // "flowFailOverStep"을 실행 시킨다.
            .on("*") // "flowFailOverStep"의 결과와 상관없이
            .to(flowWriteStep()) //  "flowWriteStep"을 실행 시킨다.
            .on("*") //  // "flowWriteStep"의 결과와 상관없이
            .end() // "Flow"를 종료시킨다.

            .from(flowStartStep())
            .on("*") // "flowStartStep"의 "ExitStatus"가 "FAILED", "COMPLETED"가 아닌 모든 경우
            .to(flowWriteStep()) // "flowWriteStep"을 실행시킨다.
            .on("*") // "flowWriteStep"의 결과와 상관없이
            .end() // "Flow"를 종료시킨다.

            .end()
            .build()
    }

    @Bean
    fun flowStartStep(): Step {
        return stepBuilderFactory["flowStartStep"]
            .tasklet { contribution: StepContribution, _: ChunkContext ->
                log.info("Flow Start Step!")

                val result = "COMPLETED"
                // val result = "FAIL";
                // val result = "UNKNOWN";

                // "Flow"에서 "on"은 "RepeatStatus"가 아닌 "ExitStatus"를 바라본다.
                if (result == "COMPLETED") {
                    contribution.exitStatus = ExitStatus.COMPLETED
                } else if (result == "FAIL") {
                    contribution.exitStatus = ExitStatus.FAILED
                } else if (result == "UNKNOWN") {
                    contribution.exitStatus = ExitStatus.UNKNOWN
                }
                RepeatStatus.FINISHED
            }
            .build()
    }

    @Bean
    fun flowProcessStep(): Step {
        return stepBuilderFactory["flowProcessStep"]
            .tasklet { _: StepContribution?, _: ChunkContext? ->
                log.info("Flow Process Step!")
                RepeatStatus.FINISHED
            }
            .build()
    }

    @Bean
    fun flowFailOverStep(): Step {
        return stepBuilderFactory.get("flowFailOverStep")
            .tasklet { _: StepContribution, _: ChunkContext ->
                log.info { "Flow FailOver Step!!" }
                RepeatStatus.FINISHED
            }
            .build()
    }

    @Bean
    fun flowWriteStep(): Step {
        return stepBuilderFactory["flowWriteStep"]
            .tasklet { _: StepContribution?, _: ChunkContext? ->
                log.info("Flow Write Step!")
                RepeatStatus.FINISHED
            }
            .build()
    }
}
  • jobBuilderFactory["flowStepJob"]
    • コメントの内容に従い、flowStartStepExitStatus値によって動作が変わります。
      • ExitStatusCOMPLETEDの場合は、flowProcessStepflowWriteStepを実行します。
      • ExitStatusFAILEDの場合は、flowFailOverStepflowWriteStepを実行します。
      • ExitStatusFAILEDでもCOMPLETEDでもない場合は、flowWriteStepを実行します。

実行結果は次のとおりです。

// ... 省略 ...

:19:50.268  INFO 73719 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=flowStepJob]] launched with the following parameters: [{}]
2021-10-02 02:19:50.295  INFO 73719 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [flowStartStep]
2021-10-02 02:19:50.303  INFO 73719 --- [           main] c.d.batch.config.FlowStepJobConfig       : Flow Start Step!
2021-10-02 02:19:50.309  INFO 73719 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [flowStartStep] executed in 14ms
2021-10-02 02:19:50.314  INFO 73719 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [flowProcessStep]
2021-10-02 02:19:50.315  INFO 73719 --- [           main] c.d.batch.config.FlowStepJobConfig       : Flow Process Step!
2021-10-02 02:19:50.317  INFO 73719 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [flowProcessStep] executed in 3ms
2021-10-02 02:19:50.320  INFO 73719 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [flowWriteStep]
2021-10-02 02:19:50.322  INFO 73719 --- [           main] c.d.batch.config.FlowStepJobConfig       : Flow Write Step!
2021-10-02 02:19:50.324  INFO 73719 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [flowWriteStep] executed in 3ms
2021-10-02 02:19:50.327  INFO 73719 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=flowStepJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 47ms
2021-10-02 02:19:50.330  INFO 73719 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=multipleStepJob]] launched with the following parameters: [{}]

// ... 省略 ...

上記のログを確認すると、flowStartStepcontribution.exitStatuscontribution.exitStatus = ExitStatus.COMPLETEDであるため、flowProcessStepFlow Process Step!が出力されることを確認できます。

flowStartStepcontribution.exitStatusの値を変更しながら、ログを確認してみましょう。

参考