동시성 제어 - 메시징 큐
이번에는 메시징 큐를 통해 동시성 문제를 해결하는 방법을 알아보자.
메시징 큐
메시징 큐(Message Queue)는 분산 시스템에서 다양한 컴퓨넌트 간에 비동기적으로 메시지를 전송하고 처리하기 위한 중간 매개체이다.
메시징 큐는 주로 다음과 같은 목적으로 사용된다.
비동기 통신: 메시지 큐를 사용하면 발신자(Producer)와 수신자(Consumer) 간의 통신이 비동기적으로 이루어진다. 메시지를 생성한 발신자는 메시지 전송만 하면 되기 때문이다.
시스템 간 결합: 메시징 큐를 사용하면 다른 시스템 간에 데이터를 교환할 수 있습니다. 시스템 간의 결합을 최소화하고 유연성을 높일 수 있습니다.
탄력성 및 확장성: 메시징 큐는 대량의 메시지를 안정적으로 처리하고, 시스템 부하에 따라 탄력적으로 확장할 수 있습니다. 이를 통해 시스템의 성능과 신뢰성을 향상시킬 수 있습니다.
주요한 메시징 큐 시스템으로는 Apache Kafka, RabbitMQ, Amazon SQS 등이 있다.
RabbitMQ로 동시성 문제 해결하기
RabbitMQ는 AMQP(Advanced Message Queuing Protocol)를 구현한 오픈소스 메시징 시스템이다.
동시성 문제는 여러 스레드가 동시에 공유 자원에 접근할 때 발생한다.
메시징 큐를 사용하면, 많은 공유 자원에 대한 접근 수정 요청을 순차적으로 처리할 수 있기 때문에 동시성 문제를 해결할 수 있다.
간단히 살펴보기 위해, 최대한 간단히 구현해보자.
먼저 docker를 통해 RabbitMQ를 실행한다. 나는 docker-compose를 사용했다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
version: '3'
services:
rabbitmq:
image: rabbitmq:management
container_name: rabbitmq
ports:
- "5672:5672"
- "15672:15672"
volumes:
- rabbitmq_data:/var/lib/rabbitmq
environment:
RABBITMQ_DEFAULT_USER: user
RABBITMQ_DEFAULT_PASS: password
volumes:
rabbitmq_data:
이 후 http://localhost:15672/에서 직접 큐와 exchange를 만들어도 되지만 RabbitMQ 설정을 통해서도 가능하다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Configuration
class RabbitConfig {
@Bean
fun queue(): Queue {
return Queue("testQueue",true)
}
@Bean
fun exchange(): TopicExchange {
return TopicExchange("testExchange")
}
@Bean
fun binding(queue: Queue, exchange: TopicExchange): Binding {
return BindingBuilder.bind(queue).to(exchange).with("routing.key")
}
}
이 후 메시지를 보내는 Producer와 메시지를 받는 Consumer를 구현하면 된다.
먼저 Producer는 다음과 같이 queue의 이름을 지정해주고 메시지를 보낸다.
1
2
3
4
5
6
7
@Service
class MessageSender(private val rabbitTemplate: RabbitTemplate) {
fun send(productId: String) {
rabbitTemplate.convertAndSend("testQueue", productId)
}
}
Consumer는 다음과 같이 @RabbitListener를 사용하여 메시지를 받는다. queue의 이름을 지정해주면 해당 queue에 있는 메시지를 받아온다.
이 때 concurrency를 1로 설정해 주면 한번에 하나의 메시지만 처리하게 된다.
1
2
3
4
5
6
7
8
@Component
class MessageReceiver(private val stockService: StockService) {
@RabbitListener(queues = ["testQueue"], concurrency = "1")
fun receive(productId: String) {
stockService.decreaseStock(productId.toLong(), 1)
}
}
실제로 동시성 문제가 해결되었는지 확인하기 위해 기존 테스트 코드를 messageSender를 사용하도록 변경해보자. 테스트가 잘 통과되는 것을 확인할 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Test
fun 동시에_100개의_요청하기() {
val threadCount = 100
val executorService = Executors.newFixedThreadPool(threadCount)
val countDownLatch = CountDownLatch(threadCount)
for (i in 1..threadCount) {
executorService.submit {
try {
messageSender.send("1")
}finally {
countDownLatch.countDown()
}
}
}
countDownLatch.await()
Thread.sleep(3000)
val stock = stockRepository.findById(1).orElseThrow()
assertEquals(0L, stock.quantity)
}