이벤트 재발행 처리하기 - Transactional Outbox Pattern

2025. 4. 8. 23:00·spring
 

이벤트 발행으로 결제 후처리 하기

점점 많아지는 결제 후 처리 과정진행중인 사이드 프로젝트는 수능 문제를 구매하는 플랫폼으로 유저는 포인트를 지불해서 수능 문제를 구매할 수 있습니다.public void payment(Long userId, List questionI

e4g3r.tistory.com

위 포스팅에서는 Transactional Outbox Pattern을 도입하여 이벤트 발행 실패로 발생할 수 있는 데이터 정합성 문제를 방지하였습니다.

해당 포스팅을 작성한 시점에는 OutBox 테이블만 구현하였고 재발행하는 로직을 작성하지 않았습니다.

Transactional Outbox Pattern을 마무리하기 위해 이번에는 이벤트 재발행을 처리해보았습니다.

class QuestionPaymentEvent(
    override val eventId: String,
    val questionPayment: QuestionPayment
) : SQSEvent {
    override fun toRequest(): PublishRequest {
        return PublishRequest.builder()
            .topicArn("arn:aws:********")
            .messageGroupId(questionPayment.order.orderId)
            .messageDeduplicationId(questionPayment.order.orderId)
            .message(SQSEvent.objectMapper.writeValueAsString(this))
            .build()
    }

    companion object {
        fun create(questionPayment: QuestionPayment): QuestionPaymentEvent {
            return QuestionPaymentEvent(questionPayment.order.orderId, questionPayment)
        }
    }
}

문제 결제 후처리를 위해 발행되는 QuestionPaymentEvent입니다. Transactional Outbot Pattern을 도입하였으므로

해당 이벤트를 발행하기 전에는 Outbox table에 저장하는 로직이 추가됩니다.

class QuestionPaymentEventLog(
    val eventId: String,
    val isPublish: Boolean,
    val payload: String,
    val createdAt: LocalDateTime
) {
    companion object {
        fun create(eventId: String, payload: String): QuestionPaymentEventLog {
            return QuestionPaymentEventLog(eventId, false, payload, LocalDateTime.now())
        }
    }
}

OutBox 역할을 하는 QuestionPaymentEventLog 내부에는 QuestionPaymentEvent를 Json으로 직렬화한 데이터 payload,

발행 성공 여부를 의미하는 isPublish가 있습니다. 주기적으로 isPublish가 False인 Log를 찾아서 재발행 처리를 해주면 됩니다.

 

현재 프로젝트에서는 2개의 이벤트(문제 결제, 리뷰 작성)가 존재하고 따라서 2개의 EventProcessor가 존재합니다.

abstract class AbstractEventProcessor<T : SQSEvent> {
    val republishCoroutineScope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob())

    abstract fun saveEventLog(event: T)

    abstract fun publishEvent(event: T)

    @Scheduled(fixedDelay = 10000)
    suspend fun republishScheduled() {
        var hasMoreEvents = true

        while (hasMoreEvents) {
            val events = getUnpublishedEvents()

            if (events.isEmpty()) {
                hasMoreEvents = false
            }

            republish(events)
        }
    }

    abstract fun getUnpublishedEvents(): List<T>

    abstract suspend fun republish(events: List<T>)
}

EventProcessor 추상 클래스를 정의하였습니다.

2개의 EventProcessor 구현체는 동일하게 구현되기에 QuestionPaymentEventProcessor를 살펴보겠습니다.

@Transactional(propagation = Propagation.MANDATORY)
override fun saveEventLog(event: QuestionPaymentEvent) {
    questionPaymentEventLogRepository.save(QuestionPaymentEventLog.create(event.eventId, event.toJson()))
    applicationEventPublisher.publishEvent(event)
}

saveEventLog 메서드는 Outbox 테이블에 이벤트 발행 로그를 남기는 역할입니다.

saveEventLog는 비즈니스 로직의 트랜잭션(부모 트랜잭션)과 함께 묶이기 때문에 데이터 정합성을 보장할 수 있습니다.

이후 Outbox에 저장이 완료되었음을 알리는 내부 이벤트를 발행합니다.

@Async
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
override fun publishEvent(event: QuestionPaymentEvent) {
    snsClient.publish(event.toRequest())
    questionPaymentEventLogRepository.publish(event.questionPayment.order.orderId)
}

publishEvent 메서드는 Outbox에 로그 저장 이후 snsClient를 통해 AWS SNS로 이벤트 토픽을 발행하는 과정입니다.

override fun getUnpublishedEvents(): List<QuestionPaymentEvent> {
    return questionPaymentEventLogRepository.getUnPublishedEvent()
        .stream()
        .map { log -> SQSEvent.objectMapper.readValue(log.payload, QuestionPaymentEvent::class.java) }
        .toList()
}

getUnPublishedEvents는 발행되지 않은 이벤트를 DB로부터 조회하는 메서드입니다.
발행되지 않은 이벤트가 많다면 한번에 조회하는 경우 메모리 사용량이 매우 높아질 수 있기에 repository를 통해 outbox 테이블로부터
데이터를 1000개씩 조회합니다. 

payload는 Event 객체를 JSON으로 직렬화 된 형태라고 했기에 objectMapper를 통해 다시 원본 객체로 역직렬화하여 반환합니다.

@Scheduled(fixedDelay = 10000)
suspend fun republishScheduled() {
    var hasMoreEvents = true

    while (hasMoreEvents) {
        val events = getUnpublishedEvents()

        if (events.isEmpty()) {
            hasMoreEvents = false
        }

        republish(events)
    }
}

republishScheduled 메서드는 추상 클래스에서 유일하게 구현되어 있는 메서드로 @Scheduled를 통해 10초마다 수행됩니다.

while문을 통해 발행되지 않은 이벤트가 없을 때까지 이벤트를 발행합니다. republish 메서드를 호출하여 재발행 처리를 합니다.

override suspend fun republish(events: List<QuestionPaymentEvent>) {
    republishCoroutineScope.launch {
        events.map { event ->
            launch {
                snsClient.publish(event.toRequest())
                questionPaymentEventLogRepository.publish(event.eventId)
            }
        }
    }.join()
}

republish 메서드는 events를 전달받아 AWS SNS API를 통해 이벤트를 발행하는 메서드입니다.

코루틴을 사용하여 동시에 처리되도록 하였습니다.

여러개의 이벤트를 발행하는 것을 동기적으로 처리를 하는 것은 비효율적이기 때문이라고 생각했기 때문입니다.

 

만약 1000개의 미발행 이벤트를 재발행 처리해야 한다면 AWS SNS API를 1000번 호출해야합니다.

단순하게 순회를 하면서 API를 호출하게 되는 경우 처리 시간이 길어질 수 있습니다.

예를 들어 한번 API를 호출하는 데 소요 시간이 0.1초라면 1000번 호출하는 경우 100초가 소요됩니다.

하지만 1000개를 동시에 요청할 수 있다면 이론상 0.1초가 소요됩니다.

 

원래는 ExecutorService을 이용한 병렬처리를 하려 했으나 코루틴이 논리 쓰레드를 통해 유사 병렬 처리가 가능하다는 것을 알게 되었고

코틀린으로 전환한 겸 코루틴을 통해 처리하였습니다. 무엇보다도 가볍고 사용하기 편리했습니다.

 

아직 간단한 코루틴 사용법을 알고 작성하였기에 조만간 코루틴을 공부하며 다시 정리해보는 시간을 가져봐야겠습니다.

'spring' 카테고리의 다른 글

Spring Logging 구축하기 - (메서드 로그)  (0) 2025.05.17
[Spring] AWS SNS 토픽 발행 - 블로킹 VS 논블로킹  (0) 2025.05.01
Spring Security 들여다보기 (2)  (0) 2025.04.05
Spring Security 들여다보기 (1)  (0) 2025.04.02
Spring / Spring Security filter chain 들여다보기  (0) 2025.03.29
'spring' 카테고리의 다른 글
  • Spring Logging 구축하기 - (메서드 로그)
  • [Spring] AWS SNS 토픽 발행 - 블로킹 VS 논블로킹
  • Spring Security 들여다보기 (2)
  • Spring Security 들여다보기 (1)
e4g3r
e4g3r
e4g3r 님의 블로그 입니다.
  • e4g3r
    e4g3r 님의 블로그
    e4g3r
  • 전체
    오늘
    어제
    • 분류 전체보기 (39)
      • spring (22)
      • kotlin (6)
      • java (3)
      • database (3)
      • cs 공부 기록용 (5)
  • 인기 글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.3
e4g3r
이벤트 재발행 처리하기 - Transactional Outbox Pattern
상단으로

티스토리툴바