spring

[Spring] AWS SNS 토픽 발행 - 블로킹 VS 논블로킹

e4g3r 2025. 5. 1. 21:35
 

이벤트 재발행 처리하기 - Transactional Outbox Pattern

이벤트 발행으로 결제 후처리 하기점점 많아지는 결제 후 처리 과정진행중인 사이드 프로젝트는 수능 문제를 구매하는 플랫폼으로 유저는 포인트를 지불해서 수능 문제를 구매할 수 있습니다.p

e4g3r.tistory.com


이전 포스팅에서는 Transaction Outbox Pattern을 적용하면서 발행되지 않은 이벤트를 재발행 처리하는 과정을 코루틴을 통해
처리하였습니다.

override suspend fun republish(events: List<ReviewEvent>) {
    supervisorScope {
        events.forEach { event ->
            launch(Dispatchers.IO) {
                snsClient.publish(event.toRequest())
            }
        }
    }
}

 

코루틴을 통해 1000개 단위로 이벤트를 발행 처리하며 코루틴 내부 로직은 snsClient(HTTP) 요청으로 구성되어있습니다.

그런데 코루틴을 공부하면서 동기 / 비동기, 블로킹 / 논블로킹 개념을 접하다보니 의문이 들게 되었습니다.

 

위 코드는 코루틴을 사용해서 여러 개의 Sns Client(HTTP)을 동시에 처리하긴 하지만 snsClien 작업 자체는 I/O 작업으로 인해

블로킹이 발생하게 됩니다.

 

코루틴은 논블로킹인 경우 굉장히 효율적이기 때문에 내부 로직도 논블로킹으로 변경하면 더 좋은 성능이 나오지 않을까 궁금했습니다.

블로킹 / 논블로킹 시간 측정

@Test
fun useSnsClient() {
    val time = measureTimeMillis {
        runBlocking {
            for (i in 1..10) {
                coroutineScope {
                    for (j in 1..1000) {
                        launch(Dispatchers.IO) {
                            snsClient.publish(TestTopic("abc-$j").toRequest())
                        }
                    }
                }
            }
        }
    }
    println(time)
}

// 4511
// 5001
// 4711

 

기존 블로킹 방식 snsClient를 사용하는 경우 10000개의 SNS 토픽을 1000개 단위로 끊어서 발행하는 경우 약 4.7 초 정도의 시간이
소요 되었습니다.

@Test
fun useSnsClient() {
    val time = measureTimeMillis {
        runBlocking {
            for (i in 1..10) {
                coroutineScope {
                    for (j in 1..1000) {
                        launch(Dispatchers.IO) {
                            snsAsyncClient.publish(TestTopic("abc-$j").toRequest()).await()
                        }
                    }
                }
            }
        }
    }
    println(time)
}

// 4649
// 4744
// 4692

 

논블로킹 방식으로 SNS 토픽을 발행하는 snsAsyncClient의 경우 또한 4.7초 정도로 블로킹 방식과 별 차이가 없었습니다.

블로킹과 논블로킹 처리 방식

@Test
fun useSnsClient() {
    val time = measureTimeMillis {
        runBlocking {
            for (i in 1..10) {
                coroutineScope {
                    for (j in 1..1000) {
                        launch(Dispatchers.IO) {
                            snsClient.publish(TestTopic("abc-$j").toRequest())
                            snsAsyncClient.publish(TestTopic("abc-$j").toRequest()).await()
                        }
                    }
                }
            }
        }
    }
    println(time)
}

 

기존 블로킹 방식의 SNS 토픽 발행은 HTTP I/O 작업이 발생할 때 Dispatcher에 의해 할당된 쓰레드가 블로킹 됩니다.

따라서 논블로킹 방식으로 변경해준다면 Dispatcher의 쓰레드 자원을 더 효율적으로 사용할 수 있을 것이라고 생각했습니다.

 

 

 

만약 블로킹 방식이였다면 1000개의 토픽을 동시에 코루틴을 통해 발행하려고 할 때 Dispatcher.IO의 Thread Pool의 크기만큼 동시에 

처리 될 겁니다. (Dispatcher.IO의 Default Thread Pool의 Thread 개수는 64개입니다.)

블로킹 방식이기에 먼저 64개의 요청이 수행되고 처리될 때 까지 모든 Pool을 점유할 것입니다.

그리고 요청이 수행되고 Thread Pool에 빈자리가 생기게 되면  Dispatcher에 의해 대기중인 publish coroutine이 처리될 것입니다.

 

예를 들어 publish-coroutine-1이 끝나고 나면 대기 중이던 publish-coroutine-7 작업이 시작된다고 가정하겠습니다.

publish-coroutine-1이 제일 처음 시작 된 작업이고 publish coroutine의 처리 시간이 10초 소요 된다면 publish-coroutine-7의
요청 시작까지 10초가 소요됩니다.

따라서 10초가 지난 이후에 publish-coroutine-7이 시작 되고 10초가 또 지나야 완료되기에 총 20초가 소요됩니다.

 

1. 프로그램 시작

2. publish-coroutine-1 처리 시작 (프로그램 시작 0초 경과)

3. publish-coroutine-1 작업 완료 (프로그램 시작 10초 경과)
4. publish-coroutine-7 처리 시작 (프로그램 시작 10초 경과)
5. publish-coroutine-7 처리 완료 (프로그램 시작 20초 경과)


만약 논블로킹 방식으로 처리한다면 publish 요청을 보내고 바로 suspend 상태가 되면서 Thread를 블로킹하지 않고 반납합니다.

따라서 대기 중이던 publish-coroutine 작업들은 바로 Thread 자원을 사용할 수 있고 즉시 publish 요청을 보냅니다.

앞의 publish 작업 완료를 기다리지 않기 때문에 이론상 1000개의 publish 요청을 동시에 보낼 수 있습니다.

 

위 내용만 보면 논블로킹 방식이 더 빠를 것 같지만 거의 비슷한 처리 속도였습니다. 왜 그럴까 더 원인을 찾아보았습니다.

I/O 작업의 처리 속도

먼저 여러가지 테스트를 해보던 중 I/O 작업의 처리 속도를 기준으로 생각해보았습니다.
왜냐하면 블로킹, 논블로킹에서 처리 속도를 유의미하게 차이를 낼 조건은 I/O 작업이 얼마나 소요되냐가 핵심일 것 같았기 때문입니다.

 

위에서 작성된 시나리오처럼 블로킹의 경우 다음 I/O 작업이 처리되기 위해선 쓰레드를 점유하고 있는 I/O 작업이 끝나야 시작되는 것인데 I/O 작업이 오래걸릴수록 다음 I/O 작업의 시작이 지연되기 때문입니다.

suspend fun main() {
    val blockingTime = measureTimeMillis {
        coroutineScope {
            for (i in 1..1000) {
                launch(Dispatchers.IO) {
                    blocking()
                }
            }
        }
    }

    val nonBlockingTime = measureTimeMillis {
        coroutineScope {
            for (i in 1..1000) {
                launch(Dispatchers.IO) {
                    nonBlocking()
                }
            }
        }
    }

    println("Blocking Time: $blockingTime")
    println("Non-Blocking Time: $nonBlockingTime")
}


fun blocking() {
    Thread.sleep(1000)
}

suspend fun nonBlocking() {
    delay(1000)
}

 

따라서 blocking 메서드에서는 Thread.sleep을 통해 1초의 블로킹을 가정하였고 nonBlocking 메서드에서는 delay를 통해 1초의
논블로킹을 가정하였습니다. 그리고 각 1000개의 코루틴을 생성해서 처리 소요시간을 비교해보았습니다.

 

결과는 매우 유의미하였습니다. 블로킹 방식의 경우 약 16초가 소요되었고 논블로킹 방식의 경우 1초가 소요되었습니다.

약 16배가 차이가 나게 되었습니다.

 

이전에 언급되었듯이 Dispatcher.IO의 기본 쓰레드 풀의 쓰레드 개수는 64개입니다.

따라서 1000개의 코루틴 중 먼저 64개의 코루틴이 Dispatcher.IO의 쓰레드 풀을 사용하게 됩니다.

 

하지만 블로킹 방식이기 때문에 64개의 코루틴 작업은 1초동안 쓰레드 풀을 점유하게 됩니다.

1초가 지난 후 기존 64개의 코루틴들은 종료가 되고 다음 64개의 코루틴 작업들이 쓰레드 풀을 사용하게 됩니다.

즉 1초마다 64개의 블로킹 코루틴 작업들이 완료되기에 1000 / 64 = 15.625이므로 약 15초가 소요됩니다.

 

반면에 Non-Blocking의 경우 블로킹 작업이 아니므로 거의 동시에 1000개의 코루틴 작업들이 시작되고 1초 뒤에 종료됩니다.

 

blocking 및 nonBlocking의 수치를 점점 줄이면 줄일수록 blocking 방식의 경우 쓰레드를 점유하는 시간이 줄어들어 논블로킹
방식과 차이가 줄어들게 됩니다.

 

 

지연 시간을 10ms로 주었을 경우 1000개의 코루틴을 처리하는데 소요되는 시간은 블로킹 방식의 경우 205ms,

논블로킹 방식의 경우 37ms가 소요되었습니다.

 

 

지연 시간을 1ms로 주었을 경우에는 블로킹 방식의 경우 54ms, 논블로킹 방식의 경우 21ms가 소요되었습니다.

 

그런데 지연시간이 1ms임에도 블로킹 방식과 논블로킹 방식의 경우 2배가 차이가 나는데 왜 SNS 발행의 경우 이런 유의미한

차이가 나지 않았을까요.

 

SNS 토픽 발행의 경우 내부적으로 AWS SDK를 통해 HTTP 요청을 보내기에 적어도 50ms이상은 소요 된다는 것은 확신할 수 있습니다.

AWS SDK 논블로킹 동작 방식


AWS SDK가 제공하는 논블로킹 방식의 SnsAsyncClient는 내부적으로 SdkAsyncHttpClient를 사용합니다.

 

그리고 SdkAsyncHttpClient 구현체 NettyNioAsyncHttpClient를 사용하게 됩니다.

https://sdk.amazonaws.com/java/api/2.1.3/software/amazon/awssdk/http/SdkHttpConfigurationOption.html#MAX_CONNECTIONS

 

그리고 내부적으로 MAX_CONNECTIONS 라는 설정이 존재합니다. 해당 값은 커넥션 풀만큼이라고 명시되어있습니다.

디버깅을 확인해본 결과 MAX_CONNECTIONS의 값은 50임을 볼 수 있었습니다.

 

따라서 SNSAsyncClient를 사용하여 SNS 토픽을 논블로킹으로 발행하더라도 Netty의 Connection Pool은 50개이기에

1000개의 코루틴이 논블로킹으로 실행되더라도 실제 HTTP 요청은 50개씩 처리됩니다.

(이미 Connection Pool 50개를 사용중이면 대기 상태가 되기 때문입니다.)

 

논블로킹 방식도 결국 제한된 Connection Pool 자원을 사용하기에 1000개의 요청을 50개씩 끊어서 처리한다고 볼 수 있습니다.

(쓰레드 블로킹은 되지 않음)


그럼 Netty의 Connection Pool을 늘릴수록 한번에 요청할 수 있는 HTTP 요청이 늘어날테니 더 빠르게 처리할 수 있을 것 같아

테스트 해보았습니다.

@Bean
fun snsAsyncClient(): SnsAsyncClient {
    return SnsAsyncClient.builder()
        .httpClientBuilder(
            NettyNioAsyncHttpClient.builder()
                .maxConcurrency(300)
        )
        .region(Region.of(awsRegion))
        .credentialsProvider(
            StaticCredentialsProvider.create(
                AwsBasicCredentials.create(
                    awsAccessKey,
                    awsSecretKey
                )
            )
        )
        .build()
}

 

SnsAsyncClinet를 Bean으로 등록할 때 httpClientBuilder를 직접 설정해서 NettyNioAsyncHttpClient 관련 설정을 할 수 있습니다.

이 때 maxConcurrency (최대 연결 수)를 300으로 지정하여 최대 300개의 HTTP 요청을 가능하도록 하였습니다.

 

테스트 결과 2243ms가 소요되었습니다. 기존 maxConcurrency가 50이였을때에는 평균 4.7초 정도가 소요되었지만

maxConcurrency를 300으로 수정한 후에는 약 2배 이상 빠른 2.2초가 소요되었습니다.

 

이론 상 6배 가까이 시간이 줄어들어야 하지만 외부 API인만큼 외부 API 처리 속도 및 API가 허용하는 최대 커넥션 수를 고려하면

최대 2배 정도 단축할 수 있는 것 같습니다.

무엇을 써야할까

위 내용들을 종합해보면 처리 속도를 결정 했던 것은 Connection Pool 및 I/O 작업 소요 시간입니다.

 

SDK 내부를 확인해보면 블로킹 방식에서는 Apache Connection Pool을 사용하며, 논블로킹 방식에서는 Netty Connection Pool을 사용합니다. 블로킹 / 논블로킹 방식 모두 Connection Pool 설정에 따라 동시에 가능한 요청 수가 정해집니다.

따라서 블로킹 방식에서도 Apache Connection Pool을 늘린다면 유의미한 효과를 볼 수 있습니다.

 

하지만 코루틴의 Dispatcher.IO의 기본 Thread Pool 개수는 64이므로 블로킹 방식에서는 64개가 넘는 Connection Pool은

의미가 없습니다.

 

토픽 발행 코루틴에서 쓰레드가 블로킹 되지 않으면 좋은 영향을 받는 곳은 또 다른 코루틴 영역입니다.

왜냐하면 코루틴들을 처리하는 Thread는 Spring MVC와 관련된 Thread가 아닌 Coroutine Dispatcher 영역의 Thread입니다.

따라서 Coroutine Dispatcher 내부의 Thread가 블로킹 되더라도 Spring MVC 요청을 처리하는 Thread에는 큰 영향을 주지 않습니다.

즉 코루틴 내부에서의 쓰레드 블로킹은 또 다른 코루틴 영역에 큰 영향을 줍니다.

 

프로젝트에서 코루틴을 사용하고 있는 곳은 토픽 발행에서만 사용되고 있습니다.

하지만 애플리케이션에서 발행되는 이벤트 종류는 2개이기 때문에 코루틴 영역은 2개입니다.

결론적으로는 일단 논블로킹 방식을 사용하기로 결정했습니다. 최소한의 블로킹 작업을 가져가고 싶었기 때문입니다.

마무리

Transaction Outbox Pattern은 원래 별도의 배치 서버를 두고 처리하는 게 효율적인 것 같은데 별도의 추가 서버를 구성하는 게

프리티어 환경에서는 불가능 할 것 같아 애플리케이션에서 스케줄러를 통해 코루틴을 동작하게 하고 있습니다.

 

이벤트 발행을 스케줄러 + 코루틴으로 처리하는 게 맞는지 의문이 들긴해서 조만간 이벤트 재발행 배치 서버를 구성하는 것도
고려해봐야겠습니다.