Spring Batch 멀티 스레드 프로세싱을 활용하며 겪은 문제 (1)

ds_chanin

·

2023. 1. 30. 01:51


들어가며

보통의 팀들이 사용하듯 통계성, 일회성 작업을 배치로 많이 작성하여 해결하고 있다. 그런데 필자가 기본적인 배치의 구조를 잘못 이해하면서 문제가 발생했다.

스프링 배치의 멀티 스레드 프로세싱을 활용하며 겪은 문제 중 잡 스코프(job scope)를 가지는 빈(bean)을 사용할 경우 겪은 문제를 기록한다.

문제

회사 코드를 가져올 수 없으니 예제 코드를 작성하고 요구사항에서 문제가 발생한 부분에 집중하기 위해 불 필요한 부분은 생략하였다.

코드는 코틀린으로 작성했다.

요구 사항

다른 시스템에서 전달받은 이벤트 중 특정 이벤트의 값이 잘못되어 올바르지 않은 값이 저장되었다.

다른 시스템을 담당하는 팀에서 파일로 전달받은 값으로 특정 데이터들을 수정해야 한다.

작성한 코드

배치 코드

처리 속도를 올리기 위해 멀티 스레드로 처리하도록 step에 taskExecutor와 throttleLimit을 설정해준것을 확인 할 수 있다.

@Configuration
class EventNameMigrationBatchConfiguration(
    private val jobBuilderFactory: JobBuilderFactory,
    private val stepBuilderFactory: StepBuilderFactory,
    private val dataSource: DataSource,
    private val jobParameters: EventNameMigrationJobParameters,
) {
    val log = KotlinLogging.logger(EventNameMigrationBatchConfiguration::class.java.name)

    @Bean(JOB)
    fun job(): Job {
        return jobBuilderFactory[JOB]
            .preventRestart()
            .start(step())
            .build()
    }

    @JobScope
    @Bean(STEP)
    fun step(): Step {
        return stepBuilderFactory[STEP]
            .chunk<UpdateDto, UpdateDto>(jobParameters.chunkSize)
            .reader(reader())
            .processor(processor(null))
            .writer(writer())
            .taskExecutor(taskExecutor())
            .throttleLimit(jobParameters.poolSize)
            .build()
    }

    @StepScope
    @Bean(READER)
    fun reader(): FlatFileItemReader<UpdateDto> {
        val lineMapper: DefaultLineMapper<UpdateDto> = DefaultLineMapper<UpdateDto>()
            .apply {
                setLineTokenizer(
                    DelimitedLineTokenizer(",")
                        .apply { setNames("uuid", "name") }
                )
                setFieldSetMapper(
                    BeanWrapperFieldSetMapper<UpdateDto>()
                        .apply { setTargetType(UpdateDto::class.java) }
                )
            }

        return FlatFileItemReaderBuilder<UpdateDto>()
            .lineMapper(lineMapper)
            .resource(FileSystemResource(File(jobParameters.filePath)))
            .saveState(false)
            .build()
    }

    @StepScope
    @Bean(PROCESSOR)
    fun processor(): ItemProcessor<UpdateDto, UpdateDto> {
        return ItemProcessor {
            log.info { "item: $it" }
            return@ItemProcessor if (jobParameters.updatable) {
                it
            } else {
                null
            }
        }
    }

    @StepScope
    @Bean(WRITER)
    fun writer(): ItemWriter<UpdateDto> {
        return JdbcBatchItemWriterBuilder<UpdateDto>()
            .sql("UPDATE service_event SET name = ? WHERE uuid = ?")
            .dataSource(dataSource)
            .itemPreparedStatementSetter { item, ps ->
                ps.setString(1, item.name)
                ps.setString(2, item.uuid)
            }
            .build()
    }

    @JobScope
    @Bean(TASK_EXECUTOR)
    fun taskExecutor(): ThreadPoolTaskExecutor {
        return ThreadPoolTaskExecutor()
            .apply {
                corePoolSize = jobParameters.poolSize
                maxPoolSize = jobParameters.poolSize
                setThreadNamePrefix("event-name-migration-thread-")
                setAwaitTerminationSeconds(10)
                setQueueCapacity(Integer.MAX_VALUE)
                setWaitForTasksToCompleteOnShutdown(true)
            }
    }

    data class UpdateDto(
        var uuid: String? = null,
        var name: String? = null,
    )

    companion object {
        const val JOB = "EventNameMigrationBatch"
        const val STEP = "${JOB}Step"
        const val READER = "${STEP}Reader"
        const val PROCESSOR = "${STEP}Processor"
        const val WRITER = "${STEP}Writer"
        const val TASK_EXECUTOR = "${JOB}TaskExecutor"
    }
}

잡 파라미터 코드

잡 파라미터를 사용하는 여러가지 방법 중 하나인 JobScope를 가진 Bean으로 선언하여 사용하였다.

@JobScope
@Component
class EventNameMigrationJobParameters(
    @Autowired
    @Value("#{jobParameters['chunkSize']}")
    private var _chunkSize: Int? = null,

    @Autowired
    @Value("#{jobParameters['poolSize']}")
    private var _poolSize: Int? = null,

    @Autowired
    @Value("#{jobParameters['filePath']}")
    private var _filePath: String? = null,

    @Autowired
    @Value("#{jobParameters['updatable']}")
    private var _updatable: Boolean? = null,
) {
    val chunkSize: Int
        get() = _chunkSize!!

    val poolSize: Int
        get() = _poolSize!!

    val filePath: String
        get() = _filePath!!

    val updatable: Boolean
        get() = _updatable == true
}

문제가 발생한 부분

테스트를 실행하면 아래 부분에서 에러가 발생한다.

    @StepScope
    @Bean(PROCESSOR)
    fun processor(): ItemProcessor<UpdateDto, UpdateDto> {
        return ItemProcessor {
            log.info { "item: $it" }
            return@ItemProcessor if (jobParameters.updatable) { // 문제 발생
                it
            } else {
                null
            }
        }
    }
Caused by: java.lang.IllegalStateException: No context holder available for job scope
    at org.springframework.batch.core.scope.JobScope.getContext(JobScope.java:159)
    at org.springframework.batch.core.scope.JobScope.get(JobScope.java:92)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:360)
    ... 31 more

컨텍스트에 있어야 하는 잡 파라미터를 프로세서가 찾지 못하면서 문제가 발생했다는 에러이다.

문제가 발생한 이유

스프링 배치를 사용할 때 잡 파라미터(job parameter)를 클래스로 선언해서 많이 사용하고 있다.

잡 파라미터를 클래스로 선언하면서 아래와 같이 JobScope로 설정해서 사용하고 있는데 JobExecution이 ThreadLocal에 저장되어 하위 Worker Thread에 전달이 되지 않아 발생한다.

아래에 적어둔 것 처럼 공식문서에 기재되어 있었다.

[공식문서]
There are some practical limitations of using job-scoped beans in multi-threaded or partitioned steps. Spring Batch does not control the threads spawned in these use cases, so it is not possible to set them up correctly to use such beans. Hence, we do not recommend using job-scoped beans in multi-threaded or partitioned steps.

문제 해결 방법

1. @Value 를 이용하여 직접 주입

JobScope Bean을 사용하지 않고 직접 @Value를 이용하여 잡 파라미터를 주입해주면 해결된다.

    @StepScope
    @Bean(PROCESSOR)
    fun processor(@Value("#{jobParameters['updatable']}") updatable: Boolean?): ItemProcessor<UpdateDto, UpdateDto> {
        return ItemProcessor {
            log.info { "item: $it" }
            return@ItemProcessor if (updatable == true) {
                it
            } else {
                null
            }
        }
    }

2. (WARN) TaskExecutor에서 JobExecution을 전달하도록 수정

TaskDecorator를 이용하여 JobExecution을 worker thread에 전달하는 방식이다.

그러나 아래 방법이 안전한지 모른다. 스프링 배치에서 권장하지 않는 기능을 강제로 가능하게 만든 것일 뿐이라 확신하지 못하겠다.

    @JobScope
    @Bean(TASK_EXECUTOR)
    fun taskExecutor(): TaskExecutor {
        val jobExecution = JobSynchronizationManager.getContext()!!.jobExecution
        return ThreadPoolTaskExecutor()
            .apply {
                corePoolSize = jobParameters.poolSize
                maxPoolSize = jobParameters.poolSize
                setThreadNamePrefix("event-name-migration-thread-")
                setAwaitTerminationSeconds(10)
                setQueueCapacity(Integer.MAX_VALUE)
                setWaitForTasksToCompleteOnShutdown(true)
                // Decorator를 이용하여 JobExecution을 worker thread에 전달
                setTaskDecorator {
                    Runnable {
                        JobSynchronizationManager.register(jobExecution)
                        try {
                            it.run()
                        } catch (e: Exception) {
                            log.error { e }
                        } finally {
                            JobSynchronizationManager.close()
                        }
                    }
                }
            }
    }