Kotlin으로 Spring Batch 만들기

개요

Kotlin 언어를 이용하여 간단한 Spring Batch 프로젝트를 만들어 보겠다.

Spring Batch 개요

  • 스프링 배치는 스프링을 만든 Pivotal의 프로젝트 중 하나이다.
  • 이전 스케줄러에서 하던 작업을 대용량 배치로 빼서 만든 프레임워크이다.
  • 기존 스케줄링 작업은 로그확인의 어려움이 있었다.
  • Spring Batch는 일괄적 데이터 처리 가능하다.
  • 스키마(mysql.sql)을 통해 spring batch 메타 정보들 관련 table 생성할 수 있다.

Spring Batch 기본 개념

Job

  • Batch의 가장 큰 작업의 담위이며, 실행의 단위가 된다.
  • Bean을 통해 Job을 등록하고 파라미터 설정으로 Job 실행 가능하다.
  • N개의 Job을 생성할 수 있다.
  • Job은 N개의 Step으로 구성되어 있다. 최소 1개의 Step을 가져야 한다. 엄청나게 복잡한 Job이 아닌 이상 2-10개의 Step을 권장한다.

Step

  • Job안에 속하게 되는 작업이다.
  • Step은 Tasklet 처리 방식과 Chunk 지향 처리 방식(reader, processor, writer)을 지원하고 있다.
  • Step은 읽기 > 가공하기 > 쓰기의 묶음이다. 이 묶음을 Chunk processiong이라고 부르는데 하나의 트랜잭션으로 이해하면 된다. 바로 이 Chunk processing의 재시작의 핵심이다.

Tasklet

  • Step안에 속하는 작업이다.

Chunk

  • Chunk란 처리 되는 커밋 row 수를 의미한다.

  • Batch 처리에서 커밋 되는 row 수라는건 chunk 단위로 Transaction을 수행하기 때문에 실패시 Chunk 단위 만큼 rollback이 되게 된다.

  • Chunk 지향 처리에서는 다음과 같은 3가지 시나리오로 실행된다.

    • 읽기(Read) — Database에서 배치처리를 할 Data를 읽어온다
    • 처리(Processing) — 읽어온 Data를 가공,처리를 한다 (필수사항X)
    • 쓰기(Write) — 가공,처리한 데이터를 Database에 저장한다.

chunk, page 개념

  • page : 처리할 데이터 중 일정 개수만큼 조회
  • chunk : 조회한 데이터를 일정 개수만큼 처리 후 입력
  • page = chunk * n으로 세팅해야 성능적으로 좋다고 한다. 보편적으로는 page = chunk이다.

ItemReader

  • ItemReader는 말 그대로 데이터 읽기를 담당한다.
  • 필수이다.
  • 데이터 조회 타입
  • ItemReader 주요 구현체들
    • CursorItemReader : (stream)방식으로 1건씩 처리한다.
    • PagingItemReader : (page) 사이즈 만큼 조회하여 처리한다.

ItemProcessor

  • ItemProcessor는 ItemReader에게서 Object를 넘겨받아 원하는 방식으로 가공 후에 ItemWriter에 넘겨주는 역할을 하며, 한번에 하나의 아이템을 처리한다.
  • 필수가 아닌다.
  • 조회 데이터 후 가공한다.
  • ItemProcessor 주요 구현체들
    • CompositeItemProcessor : 프로세스를 체이닝 처리하여 순차적 진행한다.

ItemWriter

  • ItemReader 혹은 ItemProcessor가 ItemWriter로 데이터를 넘겨주면 리스트에 차곡차곡 쌓아놓는다.
  • 필수이다.
  • 데이터를 저장한다.
  • ItemWriter 구현체들
    • CompositeItemWriter
    • FlatFileItemWriter
    • HibernateItemWriter
    • JdbcBatchItemWriter
    • JsonFileItemWriter
    • MongoItemWriter

JobLauncher

Job을 실행하는 역활을 한다.

JobRepository

Job, Step 등의 배치 작업에 대한 메타 정보를 처리하는 인터페이스이다.
메타정보는 Spring Batch가 제공하는 핵심 기능 중 하나이다.

Spring Batch Meta-Data Schema

Spring Batch에는 Meta-Data가 Table 6개, Sequence 3개가 존재한다. 여기에 Spring BatchJob이 실행 될 때마다 실행된 Job에 대한 다양한 정보들이 저장되게 된다.

일반적으로는 Meta-Data Table이 없이는 Spring Batch를 실행시킬 수 없으나, 이는 필요에 따라 커스터마이징을 통해 Meta-Data Table이 없이도 실행되게 만들 수 있다. 하지만, 시스템 운영을 하면서 배치가 실행 및 실패의 이력 등의 정보를 확인해야 하기에 필요 반듯이 필요할 것이다.

spring-batch-core에 Meta-Dat Table들의 스키마를 DBMS별 schema-{DBMS}.sql 파일들이 포함되어 있다.
(IDE에서 schema-로 시작하는 파일들을 찾아 보면 나올 것이다.)

여기에서는 MySQL의 기준으로 Table Create문을 확익해 보도록 하겠다. MySql 스키마 파일은 schema-mysql.sql이다.
(참고로, MySql에서는 sequence가 존재하지 않기에 Sequense 역할을 하기 위한 Table도 만들어야 한다.)

BATCH_JOB_INSTANCE

BATCH_JOB_INSTANCE 테이블에는 JobInstance에 관련된 모든 정보가 포함되어 있다. 또한 해당 Table은 전체 계층 구조의 최상위 역할을 한다.

CREATE TABLE BATCH_JOB_INSTANCE  (
	JOB_INSTANCE_ID BIGINT  NOT NULL PRIMARY KEY ,
	VERSION BIGINT ,
	JOB_NAME VARCHAR(100) NOT NULL,
	JOB_KEY VARCHAR(32) NOT NULL,
	constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ENGINE=InnoDB;

BATCH_JOB_INSTANCE의 Primary Key는 BATCH_JOB_SEQ에 의해 생성된다.

CREATE TABLE BATCH_JOB_SEQ (
	ID BIGINT NOT NULL,
	UNIQUE_KEY CHAR(1) NOT NULL,
	constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;

INSERT INTO BATCH_JOB_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_SEQ);

BATCH_JOB_EXECUTION

BATCH_JOB_EXECUTION테이블에는 JobExcution에 관련된 모든 정보를 저장하고 있다. JobExcution은 JobInstance가 실행 될 때마다 시작시간, 종료시간, 종료코드 등 다양한 정보를 가지고 있다

CREATE TABLE BATCH_JOB_EXECUTION  (
	JOB_EXECUTION_ID BIGINT  NOT NULL PRIMARY KEY ,
	VERSION BIGINT  ,
	JOB_INSTANCE_ID BIGINT NOT NULL,
	CREATE_TIME DATETIME(6) NOT NULL,
	START_TIME DATETIME(6) DEFAULT NULL ,
	END_TIME DATETIME(6) DEFAULT NULL ,
	STATUS VARCHAR(10) ,
	EXIT_CODE VARCHAR(2500) ,
	EXIT_MESSAGE VARCHAR(2500) ,
	LAST_UPDATED DATETIME(6),
	JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,
	constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
	references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ENGINE=InnoDB;

BATCH_JOB_EXECUTION의 Primary Key는 BATCH_JOB_EXECUTION_SEQ에 의해 생성된다.

CREATE TABLE BATCH_JOB_EXECUTION_SEQ (
	ID BIGINT NOT NULL,
	UNIQUE_KEY CHAR(1) NOT NULL,
	constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;

INSERT INTO BATCH_JOB_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_EXECUTION_SEQ);

BATCH_JOB_EXECUTION_PARAMS

BATCH_JOB_EXECUTION_PARAMS 테이블에는 Job을 실행 시킬 때 사용했던 JobParameters에 대한 정보를 저장하고 있다.

CREATE TABLE BATCH_JOB_EXECUTION_PARAMS  (
	JOB_EXECUTION_ID BIGINT NOT NULL ,
	TYPE_CD VARCHAR(6) NOT NULL ,
	KEY_NAME VARCHAR(100) NOT NULL ,
	STRING_VAL VARCHAR(250) ,
	DATE_VAL DATETIME(6) DEFAULT NULL ,
	LONG_VAL BIGINT ,
	DOUBLE_VAL DOUBLE PRECISION ,
	IDENTIFYING CHAR(1) NOT NULL ,
	constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
	references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;

BATCH_STEP_EXECUTION

BATCH_JOB_EXECUTION테이블에는 StepExecution에 대한 정보를 저장하고 있다. BATCH_JOB_EXECUTION 테이블과 여러 면에서 유사하며 STEP을 EXECUTION 정보인 읽은 수, 커밋 수, 스킵 수 등 다양한 정보를 추가로 담고 있다.

CREATE TABLE BATCH_STEP_EXECUTION  (
	STEP_EXECUTION_ID BIGINT  NOT NULL PRIMARY KEY ,
	VERSION BIGINT NOT NULL,
	STEP_NAME VARCHAR(100) NOT NULL,
	JOB_EXECUTION_ID BIGINT NOT NULL,
	START_TIME DATETIME(6) NOT NULL ,
	END_TIME DATETIME(6) DEFAULT NULL ,
	STATUS VARCHAR(10) ,
	COMMIT_COUNT BIGINT ,
	READ_COUNT BIGINT ,
	FILTER_COUNT BIGINT ,
	WRITE_COUNT BIGINT ,
	READ_SKIP_COUNT BIGINT ,
	WRITE_SKIP_COUNT BIGINT ,
	PROCESS_SKIP_COUNT BIGINT ,
	ROLLBACK_COUNT BIGINT ,
	EXIT_CODE VARCHAR(2500) ,
	EXIT_MESSAGE VARCHAR(2500) ,
	LAST_UPDATED DATETIME(6),
	constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
	references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;

BATCH_STEP_EXECUTION의 Primary Key는 BATCH_STEP_EXECUTION_SEQ에 의해 생성된다.

CREATE TABLE BATCH_STEP_EXECUTION_SEQ (
	ID BIGINT NOT NULL,
	UNIQUE_KEY CHAR(1) NOT NULL,
	constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;

INSERT INTO BATCH_STEP_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_STEP_EXECUTION_SEQ);

BATCH_JOB_EXECUTION_CONTEXT

BATCH_JOB_EXECUTION_CONTEXT테이블에는 JobExecution의ExecutionContext 정보가 들어있다.이 ExecutionContext 데이터는 일반적으로 JobInstance가 실패 시 중단된 위치에서 다시 시작할 수 있는 정보를 저장하고 있다.

CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT  (
	STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
	SHORT_CONTEXT VARCHAR(2500) NOT NULL,
	SERIALIZED_CONTEXT TEXT ,
	constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
	references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ENGINE=InnoDB;

BATCH_STEP_EXECUTION_CONTEXT

BATCH_STEP_EXECUTION_CONTEXT테이블에는 StepExecution의 ExecutionContext 정보가 들어있다. 이 ExecutionContext 데이터는 일반적으로 JobInstance가 실패 시 중단된 위치에서 다시 시작할 수 있는 정보를 저장하고 있다.

CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT  (
	JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
	SHORT_CONTEXT VARCHAR(2500) NOT NULL,
	SERIALIZED_CONTEXT TEXT ,
	constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
	references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;

프로젝트 생성

아래와 같이 curl 명령어을 사용하여 Spring Boot 초기 프로젝트를 생성한다.

curl https://start.spring.io/starter.tgz \
-d bootVersion=2.5.5 \
-d dependencies=batch,h2 \
-d baseDir=kotlin-spring-batch \
-d groupId=com.devkuma \
-d artifactId=kotlin-spring-batch \
-d packageName=com.devkuma.batch \
-d applicationName=BatchApplication \
-d packaging=jar \
-d language=kotlin \
-d jvmVersion=11 \
-d type=gradle-project | tar -xzvf -

위 명령어를 실행하게 되면 Spring Batch, H2 Database를 추가하였다.

생성된 프로젝트의 파일 구조는 아래와 같이 구성된다.

.
├── HELP.md
├── build.gradle.kts
├── gradle
│   └── wrapper
│       ├── gradle-wrapper.jar
│       └── gradle-wrapper.properties
├── gradlew
├── gradlew.bat
├── settings.gradle.kts
└── src
    ├── main
    │   ├── kotlin
    │   │   └── com
    │   │       └── devkuma
    │   │           └── batch
    │   │               └── BatchApplication.kt
    │   └── resources
    │       └── application.properties
    └── test
        └── kotlin
            └── com
                └── devkuma
                    └── batch
                        └── BatchApplicationTests.kt

Spring Batch 설정

/src/main/kotlin/com/devkuma/elasticsearch/classElasticsearchApplication.kt

package com.devkuma.batch

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication

@EnableBatchProcessing
@SpringBootApplication
class BatchApplication

fun main(args: Array<String>) {
	runApplication<BatchApplication>(*args)
}

@EnableBatchProcessing 어노테이션을 추가하여 배치 기능을 활성화 한다. 이 어노테이션을 추가 하므로써 Spring Batch 기능들을 사용할 수 있게 된다.

로깅 라이브러리 추가

/build.gradle.kts

// ... 생략 ...

dependencies {
	implementation("org.springframework.boot:spring-boot-starter-batch")
	implementation("org.jetbrains.kotlin:kotlin-reflect")
	implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
	runtimeOnly("com.h2database:h2")
	testImplementation("org.springframework.boot:spring-boot-starter-test")
	testImplementation("org.springframework.batch:spring-batch-test")

	// https://mvnrepository.com/artifact/io.github.microutils/kotlin-logging
	implementation("io.github.microutils:kotlin-logging:2.0.11")
}

// ... 생략 ...

파일 내용 중에 의존성 라이브러리를 확인해 보면 Spring 배치(spring-boot-starter-batch), 코틀린용 H2 Database(com.h2database:h2)가 추가되어 있는 것을 볼수 있다.

그리고, 신규로 코틀린 로깅(kotlin-logging)을 추가 하였다.

Tasklet 처리 방식

단일 스탭 구성 구현

단일 스탭 일감의 설정을 추가한다.

/src/main/kotlin/com/devkuma/batch/config/SingleStepJobConfig.kt

package com.devkuma.batch.config

import mu.KotlinLogging
import org.springframework.batch.core.Job
import org.springframework.batch.core.Step
import org.springframework.batch.core.StepContribution
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory
import org.springframework.batch.core.scope.context.ChunkContext
import org.springframework.batch.repeat.RepeatStatus
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

private val log = KotlinLogging.logger {}

@Configuration
class SingleStepJobConfig(
    private val jobBuilderFactory: JobBuilderFactory,
    private val stepBuilderFactory: StepBuilderFactory
) {

    @Bean
    fun singleStepJob(): Job {
        return jobBuilderFactory["singleStepJob"]
            .start(singleStep())
            .build()
    }

    @Bean
    fun singleStep(): Step {
        return stepBuilderFactory["singleStep"]
            .tasklet { _: StepContribution, _: ChunkContext ->
                log.info { "Single Step!!" }
                RepeatStatus.FINISHED
            }
            .build()
    }
}
  • jobBuilderFactory["singleStepJob"]
    • simpleJob이란 이름으로 Batch Job을 생성한다.
    • Job 이름은 별도로 지정하지 않고, 이렇게 Builder를 통해 지정한다.
  • stepBuilderFactory["singleStep"]
    • simpleStep1이란 이름으로 Batch Step을 생성한다.
    • 위에 Job 이름과 동일하게 Builder를 통해 이름을 지정한다.
  • .tasklet { _: StepContribution, _: ChunkContext
    • Step에서 수행될 기능들을 명시한다.
    • Tasklet은 Step에서 단일로 수행될 커스텀한 기능들을 선언할때 사용한다.
    • 여기서는 Batch가 수행되면 log.info { "Single Step!!" } 구문으로 의해 로그가 출력된다.

실행 결과는 아래와 같다.

// ... 생략 ...

2021-10-02 01:51:51.746  INFO 72551 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=singleStepJob]] launched with the following parameters: [{}]
2021-10-02 01:51:51.769  INFO 72551 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [singleStep]
2021-10-02 01:51:51.778  INFO 72551 --- [           main] c.d.batch.config.SingleStepJobConfig     : Single Step!!
2021-10-02 01:51:51.782  INFO 72551 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [singleStep] executed in 13ms
2021-10-02 01:51:51.786  INFO 72551 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=singleStepJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 28ms

// ... 생략 ...

위에 로그를 확인해 보면 “Single Step!!“이 출력된 것을 확인 할 수 있을 것이다.

다중 Step 구성 구현

다중 스탭 일감의 설정을 추가한다.

/src/main/kotlin/com/devkuma/batch/config/SingleStepJobConfig.kt

package com.devkuma.batch.config

import mu.KotlinLogging
import org.springframework.batch.core.Job
import org.springframework.batch.core.Step
import org.springframework.batch.core.StepContribution
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory
import org.springframework.batch.core.scope.context.ChunkContext
import org.springframework.batch.repeat.RepeatStatus
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

private val log = KotlinLogging.logger {}

@Configuration
class MultipleStepJobConfig(
    private val jobBuilderFactory: JobBuilderFactory,
    private val stepBuilderFactory: StepBuilderFactory
) {

    @Bean
    fun multipleStepJob(): Job {
        return jobBuilderFactory["multipleStepJob"]
            .start(startStep())
            .next(nextStep())
            .next(lastStep())
            .build()
    }

    @Bean
    fun startStep(): Step {
        return stepBuilderFactory["startStep"]
            .tasklet { _: StepContribution, _: ChunkContext ->
                log.info { "Start Step!!" }
                RepeatStatus.FINISHED
            }
            .build()
    }

    @Bean
    fun nextStep(): Step {
        return stepBuilderFactory["nextStep"]
            .tasklet { _: StepContribution, _: ChunkContext ->
                log.info { "Next Step!!" }
                RepeatStatus.FINISHED
            }
            .build()
    }

    @Bean
    fun lastStep(): Step {
        return stepBuilderFactory["lastStep"]
            .tasklet { _: StepContribution, _: ChunkContext ->
                log.info { "Last Step!!" }
                RepeatStatus.FINISHED
            }
            .build()
    }
}
  • jobBuilderFactory["multipleStepJob"]
    • multipleStepJob이란 이름으로 Batch Job을 생성한다.
  • stepBuilderFactory["startStep"], stepBuilderFactory["nextStep"], stepBuilderFactory["lastStep"]
    • 각각 startStep, nextStep, lastStep 이름으로 Batch Step을 각각 생성한다.

실행 결과는 아래와 같다.

// ... 생략 ...

2021-10-02 01:54:56.544  INFO 72707 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=multipleStepJob]] launched with the following parameters: [{}]
2021-10-02 01:54:56.573  INFO 72707 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [startStep]
2021-10-02 01:54:56.585  INFO 72707 --- [           main] c.d.batch.config.MultipleStepJobConfig   : Start Step!!
2021-10-02 01:54:56.593  INFO 72707 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [startStep] executed in 20ms
2021-10-02 01:54:56.603  INFO 72707 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [nextStep]
2021-10-02 01:54:56.607  INFO 72707 --- [           main] c.d.batch.config.MultipleStepJobConfig   : Next Step!!
2021-10-02 01:54:56.613  INFO 72707 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [nextStep] executed in 10ms
2021-10-02 01:54:56.619  INFO 72707 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [lastStep]
2021-10-02 01:54:56.622  INFO 72707 --- [           main] c.d.batch.config.MultipleStepJobConfig   : Last Step!!
2021-10-02 01:54:56.627  INFO 72707 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [lastStep] executed in 8ms

// ... 생략 ...

위에 로그를 확인해 보면 스탭별로 “Start Step!!”, “Next Step!!”, “Last Step!!“이 출력된 것을 확인 할 수 있을 것이다.

Flow를 통한 Step 구성 구현

package com.devkuma.batch.config

import mu.KotlinLogging
import org.springframework.batch.core.ExitStatus
import org.springframework.batch.core.Job
import org.springframework.batch.core.Step
import org.springframework.batch.core.StepContribution
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory
import org.springframework.batch.core.scope.context.ChunkContext
import org.springframework.batch.repeat.RepeatStatus
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

private val log = KotlinLogging.logger {}

@Configuration
class FlowStepJobConfig(
    private val jobBuilderFactory: JobBuilderFactory,
    private val stepBuilderFactory: StepBuilderFactory
) {

    @Bean
    fun flowStepJob(): Job {
        return jobBuilderFactory["flowStepJob"]

            .start(flowStartStep())
            .on(ExitStatus.COMPLETED.exitCode) // "flowStartStep"의 "ExitStatus"가 "COMPLETED"인 경우
            .to(flowProcessStep()) // "flowProcessStep"을 실행 시킨다
            .on("*") // "flowProcessStep"의 결과와 상관없이
            .to(flowWriteStep()) // "writeStep"을 실행 시킨다.
            .on("*") // "flowWriteStep"의 결과와 상관없이
            .end() // "Flow"를 종료 시킨다.

            .from(flowStartStep())
            .on(ExitStatus.FAILED.exitCode) // "flowStartStep"의 "ExitStatus"가 "FAILED"일 경우
            .to(flowFailOverStep()) // "flowFailOverStep"을 실행 시킨다.
            .on("*") // "flowFailOverStep"의 결과와 상관없이
            .to(flowWriteStep()) //  "flowWriteStep"을 실행 시킨다.
            .on("*") //  // "flowWriteStep"의 결과와 상관없이
            .end() // "Flow"를 종료시킨다.

            .from(flowStartStep())
            .on("*") // "flowStartStep"의 "ExitStatus"가 "FAILED", "COMPLETED"가 아닌 모든 경우
            .to(flowWriteStep()) // "flowWriteStep"을 실행시킨다.
            .on("*") // "flowWriteStep"의 결과와 상관없이
            .end() // "Flow"를 종료시킨다.

            .end()
            .build()
    }

    @Bean
    fun flowStartStep(): Step {
        return stepBuilderFactory["flowStartStep"]
            .tasklet { contribution: StepContribution, _: ChunkContext ->
                log.info("Flow Start Step!")

                val result = "COMPLETED"
                // val result = "FAIL";
                // val result = "UNKNOWN";

                // "Flow"에서 "on"은 "RepeatStatus"가 아닌 "ExitStatus"를 바라본다.
                if (result == "COMPLETED") {
                    contribution.exitStatus = ExitStatus.COMPLETED
                } else if (result == "FAIL") {
                    contribution.exitStatus = ExitStatus.FAILED
                } else if (result == "UNKNOWN") {
                    contribution.exitStatus = ExitStatus.UNKNOWN
                }
                RepeatStatus.FINISHED
            }
            .build()
    }

    @Bean
    fun flowProcessStep(): Step {
        return stepBuilderFactory["flowProcessStep"]
            .tasklet { _: StepContribution?, _: ChunkContext? ->
                log.info("Flow Process Step!")
                RepeatStatus.FINISHED
            }
            .build()
    }

    @Bean
    fun flowFailOverStep(): Step {
        return stepBuilderFactory.get("flowFailOverStep")
            .tasklet { _: StepContribution, _: ChunkContext ->
                log.info { "Flow FailOver Step!!" }
                RepeatStatus.FINISHED
            }
            .build()
    }

    @Bean
    fun flowWriteStep(): Step {
        return stepBuilderFactory["flowWriteStep"]
            .tasklet { _: StepContribution?, _: ChunkContext? ->
                log.info("Flow Write Step!")
                RepeatStatus.FINISHED
            }
            .build()
    }
}
  • jobBuilderFactory["flowStepJob"]
    • 주석에 내용을 따라 flowStartStepExitStatus 값에 따라 동작을 다르게 하게 된다.
      • ExitStatusCOMPLETED인 경우는 flowProcessStep, flowWriteStep를 실행 시킨다.
      • ExitStatusFAILED인 경우는 flowFailOverStep, flowWriteStep를 실행 시킨다.
      • ExitStatusFAILED, “COMPLETED"가 둘다 아닌 경우는 flowWriteStep를 실행 시킨다.

실행 결과는 아래와 같다.

// ... 생략 ...

:19:50.268  INFO 73719 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=flowStepJob]] launched with the following parameters: [{}]
2021-10-02 02:19:50.295  INFO 73719 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [flowStartStep]
2021-10-02 02:19:50.303  INFO 73719 --- [           main] c.d.batch.config.FlowStepJobConfig       : Flow Start Step!
2021-10-02 02:19:50.309  INFO 73719 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [flowStartStep] executed in 14ms
2021-10-02 02:19:50.314  INFO 73719 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [flowProcessStep]
2021-10-02 02:19:50.315  INFO 73719 --- [           main] c.d.batch.config.FlowStepJobConfig       : Flow Process Step!
2021-10-02 02:19:50.317  INFO 73719 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [flowProcessStep] executed in 3ms
2021-10-02 02:19:50.320  INFO 73719 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [flowWriteStep]
2021-10-02 02:19:50.322  INFO 73719 --- [           main] c.d.batch.config.FlowStepJobConfig       : Flow Write Step!
2021-10-02 02:19:50.324  INFO 73719 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [flowWriteStep] executed in 3ms
2021-10-02 02:19:50.327  INFO 73719 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=flowStepJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 47ms
2021-10-02 02:19:50.330  INFO 73719 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=multipleStepJob]] launched with the following parameters: [{}]

// ... 생략 ...

위에 로그를 확인해 보면 flowStartStepcontribution.exitStatuscontribution.exitStatus = ExitStatus.COMPLETED 이어서 flowProcessStepFlow Process Step!가 출력되는 것을 확인 할 수 있다.

flowStartStepcontribution.exitStatus 값을 변경해 보면서 로그는 확인해 보도록 하자.

참고