Creating a Spring Batch Project with Kotlin

Overview

This article walks through building a simple Spring Batch project in Kotlin.

Create the Project

Create a Spring Boot starter project with the following curl command.

curl https://start.spring.io/starter.tgz \
-d bootVersion=2.5.5 \
-d dependencies=batch,h2 \
-d baseDir=spring-batch \
-d groupId=com.devkuma \
-d artifactId=spring-batch \
-d packageName=com.devkuma.batch \
-d applicationName=BatchApplication \
-d packaging=jar \
-d language=kotlin \
-d javaVersion=11 \
-d type=gradle-project | tar -xzvf -

The command above generates a Kotlin Gradle project with the Spring Batch and H2 Database dependencies and extracts it into the spring-batch directory.

The generated project has the following file structure.

.
├── HELP.md
├── build.gradle.kts
├── gradle
│   └── wrapper
│       ├── gradle-wrapper.jar
│       └── gradle-wrapper.properties
├── gradlew
├── gradlew.bat
├── settings.gradle.kts
└── src
    ├── main
    │   ├── kotlin
    │   │   └── com
    │   │       └── devkuma
    │   │           └── batch
    │   │               └── BatchApplication.kt
    │   └── resources
    │       └── application.properties
    └── test
        └── kotlin
            └── com
                └── devkuma
                    └── batch
                        └── BatchApplicationTests.kt

Spring Batch Configuration

/src/main/kotlin/com/devkuma/batch/BatchApplication.kt

package com.devkuma.batch

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication

@EnableBatchProcessing
@SpringBootApplication
class BatchApplication

fun main(args: Array<String>) {
    runApplication<BatchApplication>(*args)
}

Add the @EnableBatchProcessing annotation to enable Spring Batch features. It registers the batch infrastructure beans, including the JobBuilderFactory and StepBuilderFactory that the job configurations in this article inject.

Add a Logging Library

/build.gradle.kts

// ... omitted ...

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-batch")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    runtimeOnly("com.h2database:h2")
    testImplementation("org.springframework.boot:spring-boot-starter-test")
    testImplementation("org.springframework.batch:spring-batch-test")

    // https://mvnrepository.com/artifact/io.github.microutils/kotlin-logging
    implementation("io.github.microutils:kotlin-logging:2.0.11")
}

// ... omitted ...

The dependencies block shows that Spring Batch (spring-boot-starter-batch), H2 Database (com.h2database:h2), and the Kotlin libraries were already added by the project generator.

The only manual addition is Kotlin Logging (kotlin-logging).

Tasklet Processing Method

Implement a Single Step Configuration

Add the configuration for a single-step job.

/src/main/kotlin/com/devkuma/batch/config/SingleStepJobConfig.kt

package com.devkuma.batch.config

import mu.KotlinLogging
import org.springframework.batch.core.Job
import org.springframework.batch.core.Step
import org.springframework.batch.core.StepContribution
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory
import org.springframework.batch.core.scope.context.ChunkContext
import org.springframework.batch.repeat.RepeatStatus
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

private val log = KotlinLogging.logger {}

@Configuration
class SingleStepJobConfig(
    private val jobBuilderFactory: JobBuilderFactory,
    private val stepBuilderFactory: StepBuilderFactory
) {

    @Bean
    fun singleStepJob(): Job {
        return jobBuilderFactory["singleStepJob"]
            .start(singleStep())
            .build()
    }

    @Bean
    fun singleStep(): Step {
        return stepBuilderFactory["singleStep"]
            .tasklet { _: StepContribution, _: ChunkContext ->
                log.info { "Single Step!!" }
                RepeatStatus.FINISHED
            }
            .build()
    }
}
  • jobBuilderFactory["singleStepJob"]
    • Creates a batch job named singleStepJob.
    • The job name is not set separately; it is passed to the builder as shown.
  • stepBuilderFactory["singleStep"]
    • Creates a batch step named singleStep.
    • As with the job, the name is passed to the builder.
  • .tasklet { _: StepContribution, _: ChunkContext -> ... }
    • Specifies the work to be performed in the step.
    • A tasklet declares custom logic for a step. Spring Batch calls it repeatedly until it returns RepeatStatus.FINISHED, so this one runs exactly once.
    • Here, the log.info { "Single Step!!" } statement prints a log line when the batch runs.
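
The tasklet above is a lambda, but the same logic can live in a class that implements the Tasklet interface. The following is a minimal sketch, not part of the original project: CountingTasklet and its times parameter are hypothetical names chosen for illustration. It also shows what happens when a tasklet returns RepeatStatus.CONTINUABLE instead of FINISHED.

```kotlin
package com.devkuma.batch.config

import mu.KotlinLogging
import org.springframework.batch.core.StepContribution
import org.springframework.batch.core.scope.context.ChunkContext
import org.springframework.batch.core.step.tasklet.Tasklet
import org.springframework.batch.repeat.RepeatStatus

private val log = KotlinLogging.logger {}

// Hypothetical tasklet (not in the original project).
// It asks Spring Batch to call it again until it has run `times` times.
class CountingTasklet(private val times: Int) : Tasklet {

    // Mutable state: the counter is kept for the lifetime of this instance.
    private var count = 0

    override fun execute(contribution: StepContribution, chunkContext: ChunkContext): RepeatStatus {
        count++
        log.info { "Tasklet call $count of $times" }
        // CONTINUABLE requests another call; FINISHED ends the step.
        return if (count < times) RepeatStatus.CONTINUABLE else RepeatStatus.FINISHED
    }
}
```

Assuming the same StepBuilderFactory as above, a step could use it with stepBuilderFactory["countingStep"].tasklet(CountingTasklet(3)).build(), where countingStep is again a made-up name. Because the counter lives in the instance, a fresh CountingTasklet is needed for each run of the step.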

The execution result is as follows.

// ... omitted ...

2021-10-02 01:51:51.746  INFO 72551 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=singleStepJob]] launched with the following parameters: [{}]
2021-10-02 01:51:51.769  INFO 72551 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [singleStep]
2021-10-02 01:51:51.778  INFO 72551 --- [           main] c.d.batch.config.SingleStepJobConfig     : Single Step!!
2021-10-02 01:51:51.782  INFO 72551 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [singleStep] executed in 13ms
2021-10-02 01:51:51.786  INFO 72551 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=singleStepJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 28ms

// ... omitted ...

If you check the log above, you can confirm that Single Step!! was printed.

Implement a Multi-Step Configuration

Add the configuration for a multi-step job.

/src/main/kotlin/com/devkuma/batch/config/MultipleStepJobConfig.kt

package com.devkuma.batch.config

import mu.KotlinLogging
import org.springframework.batch.core.Job
import org.springframework.batch.core.Step
import org.springframework.batch.core.StepContribution
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory
import org.springframework.batch.core.scope.context.ChunkContext
import org.springframework.batch.repeat.RepeatStatus
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

private val log = KotlinLogging.logger {}

@Configuration
class MultipleStepJobConfig(
    private val jobBuilderFactory: JobBuilderFactory,
    private val stepBuilderFactory: StepBuilderFactory
) {

    @Bean
    fun multipleStepJob(): Job {
        return jobBuilderFactory["multipleStepJob"]
            .start(startStep())
            .next(nextStep())
            .next(lastStep())
            .build()
    }

    @Bean
    fun startStep(): Step {
        return stepBuilderFactory["startStep"]
            .tasklet { _: StepContribution, _: ChunkContext ->
                log.info { "Start Step!!" }
                RepeatStatus.FINISHED
            }
            .build()
    }

    @Bean
    fun nextStep(): Step {
        return stepBuilderFactory["nextStep"]
            .tasklet { _: StepContribution, _: ChunkContext ->
                log.info { "Next Step!!" }
                RepeatStatus.FINISHED
            }
            .build()
    }

    @Bean
    fun lastStep(): Step {
        return stepBuilderFactory["lastStep"]
            .tasklet { _: StepContribution, _: ChunkContext ->
                log.info { "Last Step!!" }
                RepeatStatus.FINISHED
            }
            .build()
    }
}
  • jobBuilderFactory["multipleStepJob"]
    • Creates a batch job with the name multipleStepJob.
  • stepBuilderFactory["startStep"], stepBuilderFactory["nextStep"], stepBuilderFactory["lastStep"]
    • Creates batch steps with the names startStep, nextStep, and lastStep, respectively.
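
The three step beans differ only in their name and log message, so the builder calls could be factored into a private helper. The sketch below is one possible arrangement under that assumption. HelperStepJobConfig, loggingStep, and the helper-prefixed bean names are hypothetical; they are chosen so they do not clash with the beans defined above.

```kotlin
package com.devkuma.batch.config

import mu.KotlinLogging
import org.springframework.batch.core.Job
import org.springframework.batch.core.Step
import org.springframework.batch.core.StepContribution
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory
import org.springframework.batch.core.scope.context.ChunkContext
import org.springframework.batch.repeat.RepeatStatus
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

private val log = KotlinLogging.logger {}

// Hypothetical variant of MultipleStepJobConfig; all names are illustrative.
@Configuration
class HelperStepJobConfig(
    private val jobBuilderFactory: JobBuilderFactory,
    private val stepBuilderFactory: StepBuilderFactory
) {

    @Bean
    fun helperStepJob(): Job {
        return jobBuilderFactory["helperStepJob"]
            .start(helperStartStep())
            .next(helperNextStep())
            .next(helperLastStep())
            .build()
    }

    @Bean
    fun helperStartStep(): Step = loggingStep("helperStartStep", "Start Step!!")

    @Bean
    fun helperNextStep(): Step = loggingStep("helperNextStep", "Next Step!!")

    @Bean
    fun helperLastStep(): Step = loggingStep("helperLastStep", "Last Step!!")

    // Builds a step whose tasklet logs one message and then finishes.
    private fun loggingStep(name: String, message: String): Step {
        return stepBuilderFactory[name]
            .tasklet { _: StepContribution, _: ChunkContext ->
                log.info { message }
                RepeatStatus.FINISHED
            }
            .build()
    }
}
```

Each step is still exposed as its own @Bean, as in the original configuration; only the repeated builder chain moves into the helper.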

The execution result is as follows.

// ... omitted ...

2021-10-02 01:54:56.544  INFO 72707 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=multipleStepJob]] launched with the following parameters: [{}]
2021-10-02 01:54:56.573  INFO 72707 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [startStep]
2021-10-02 01:54:56.585  INFO 72707 --- [           main] c.d.batch.config.MultipleStepJobConfig   : Start Step!!
2021-10-02 01:54:56.593  INFO 72707 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [startStep] executed in 20ms
2021-10-02 01:54:56.603  INFO 72707 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [nextStep]
2021-10-02 01:54:56.607  INFO 72707 --- [           main] c.d.batch.config.MultipleStepJobConfig   : Next Step!!
2021-10-02 01:54:56.613  INFO 72707 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [nextStep] executed in 10ms
2021-10-02 01:54:56.619  INFO 72707 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [lastStep]
2021-10-02 01:54:56.622  INFO 72707 --- [           main] c.d.batch.config.MultipleStepJobConfig   : Last Step!!
2021-10-02 01:54:56.627  INFO 72707 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [lastStep] executed in 8ms

// ... omitted ...

If you check the log above, you can confirm that Start Step!!, Next Step!!, and Last Step!! are printed for each step.

Implement Step Configuration Through Flow

Add the configuration for a job that branches between steps through a flow.

/src/main/kotlin/com/devkuma/batch/config/FlowStepJobConfig.kt

package com.devkuma.batch.config

import mu.KotlinLogging
import org.springframework.batch.core.ExitStatus
import org.springframework.batch.core.Job
import org.springframework.batch.core.Step
import org.springframework.batch.core.StepContribution
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory
import org.springframework.batch.core.scope.context.ChunkContext
import org.springframework.batch.repeat.RepeatStatus
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

private val log = KotlinLogging.logger {}

@Configuration
class FlowStepJobConfig(
    private val jobBuilderFactory: JobBuilderFactory,
    private val stepBuilderFactory: StepBuilderFactory
) {

    @Bean
    fun flowStepJob(): Job {
        return jobBuilderFactory["flowStepJob"]

            .start(flowStartStep())
            .on(ExitStatus.COMPLETED.exitCode) // if the ExitStatus of flowStartStep is COMPLETED
            .to(flowProcessStep()) // run flowProcessStep
            .on("*") // regardless of the result of flowProcessStep
            .to(flowWriteStep()) // run flowWriteStep
            .on("*") // regardless of the result of flowWriteStep
            .end() // end the flow

            .from(flowStartStep())
            .on(ExitStatus.FAILED.exitCode) // if the ExitStatus of flowStartStep is FAILED
            .to(flowFailOverStep()) // run flowFailOverStep
            .on("*") // regardless of the result of flowFailOverStep
            .to(flowWriteStep()) // run flowWriteStep
            .on("*") // regardless of the result of flowWriteStep
            .end() // end the flow

            .from(flowStartStep())
            .on("*") // for any ExitStatus of flowStartStep other than COMPLETED or FAILED
            .to(flowWriteStep()) // run flowWriteStep
            .on("*") // regardless of the result of flowWriteStep
            .end() // end the flow

            .end()
            .build()
    }

    @Bean
    fun flowStartStep(): Step {
        return stepBuilderFactory["flowStartStep"]
            .tasklet { contribution: StepContribution, _: ChunkContext ->
                log.info("Flow Start Step!")

                val result = "COMPLETED"
                // val result = "FAIL"
                // val result = "UNKNOWN"

                // In a flow, on() matches against the ExitStatus, not the RepeatStatus.
                if (result == "COMPLETED") {
                    contribution.exitStatus = ExitStatus.COMPLETED
                } else if (result == "FAIL") {
                    contribution.exitStatus = ExitStatus.FAILED
                } else if (result == "UNKNOWN") {
                    contribution.exitStatus = ExitStatus.UNKNOWN
                }
                RepeatStatus.FINISHED
            }
            .build()
    }

    @Bean
    fun flowProcessStep(): Step {
        return stepBuilderFactory["flowProcessStep"]
            .tasklet { _: StepContribution?, _: ChunkContext? ->
                log.info("Flow Process Step!")
                RepeatStatus.FINISHED
            }
            .build()
    }

    @Bean
    fun flowFailOverStep(): Step {
        return stepBuilderFactory.get("flowFailOverStep")
            .tasklet { _: StepContribution, _: ChunkContext ->
                log.info { "Flow FailOver Step!!" }
                RepeatStatus.FINISHED
            }
            .build()
    }

    @Bean
    fun flowWriteStep(): Step {
        return stepBuilderFactory["flowWriteStep"]
            .tasklet { _: StepContribution?, _: ChunkContext? ->
                log.info("Flow Write Step!")
                RepeatStatus.FINISHED
            }
            .build()
    }
}
  • jobBuilderFactory["flowStepJob"]
    • As described in the comments, behavior changes depending on the ExitStatus value of flowStartStep.
      • If ExitStatus is COMPLETED, flowProcessStep and flowWriteStep are executed.
      • If ExitStatus is FAILED, flowFailOverStep and flowWriteStep are executed.
      • If ExitStatus is neither FAILED nor COMPLETED, flowWriteStep is executed.
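
The three branches can be summarized as a small lookup. The following is a simplified plain-Kotlin model of the transitions listed above, not Spring Batch code; stepsAfterStart is a hypothetical function written only to make the branching explicit.

```kotlin
// Simplified model of the flowStepJob transitions (not Spring Batch code).
// Given the exit code of flowStartStep, returns the steps that run after it.
fun stepsAfterStart(exitCode: String): List<String> = when (exitCode) {
    "COMPLETED" -> listOf("flowProcessStep", "flowWriteStep")
    "FAILED" -> listOf("flowFailOverStep", "flowWriteStep")
    // Corresponds to the on("*") branch: any other exit code.
    else -> listOf("flowWriteStep")
}

fun main() {
    for (code in listOf("COMPLETED", "FAILED", "UNKNOWN")) {
        println("$code -> ${stepsAfterStart(code)}")
    }
}
```

In the real job the specific patterns COMPLETED and FAILED take precedence over the "*" wildcard, which the when expression mirrors by keeping the wildcard case as the else branch.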

The execution result is as follows.

// ... omitted ...

:19:50.268  INFO 73719 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=flowStepJob]] launched with the following parameters: [{}]
2021-10-02 02:19:50.295  INFO 73719 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [flowStartStep]
2021-10-02 02:19:50.303  INFO 73719 --- [           main] c.d.batch.config.FlowStepJobConfig       : Flow Start Step!
2021-10-02 02:19:50.309  INFO 73719 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [flowStartStep] executed in 14ms
2021-10-02 02:19:50.314  INFO 73719 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [flowProcessStep]
2021-10-02 02:19:50.315  INFO 73719 --- [           main] c.d.batch.config.FlowStepJobConfig       : Flow Process Step!
2021-10-02 02:19:50.317  INFO 73719 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [flowProcessStep] executed in 3ms
2021-10-02 02:19:50.320  INFO 73719 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [flowWriteStep]
2021-10-02 02:19:50.322  INFO 73719 --- [           main] c.d.batch.config.FlowStepJobConfig       : Flow Write Step!
2021-10-02 02:19:50.324  INFO 73719 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [flowWriteStep] executed in 3ms
2021-10-02 02:19:50.327  INFO 73719 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=flowStepJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 47ms
2021-10-02 02:19:50.330  INFO 73719 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=multipleStepJob]] launched with the following parameters: [{}]

// ... omitted ...

The log above shows that, because flowStartStep sets contribution.exitStatus = ExitStatus.COMPLETED, flowProcessStep runs next and prints Flow Process Step!, followed by flowWriteStep.

Try changing the result value in flowStartStep, which determines contribution.exitStatus, and check the logs again.

References