동시성 테스트 도구들
Executor Service
- ExecutorService는 동시성 테스트를 위해 작업을 비동기로 수행해야할 때 주로 사용되는 인터페이스이다.
- 이미 풀링된 여러 스레드 중 하나를 사용해 작업을 수행한다.
스레드 풀
스레드 풀은 미리 생성된 스레드의 집합으로 사용자가 쉽게 여러 스레드를 관리할 수 있다. DB ConnectionPool과 마찬가지로 많은 수의 비동기 작업을 실행할 때 호출 오버헤드가 감소하여 성능이 향상되고, 실행되는 스레드 리소스를 관리할 수 있다.
스레드 풀 자체를 설정할 수 있는 ThreadPoolExecutor 클래스를 사용할 수도 있지만, 더 편한 Executors Factory Method가 제공하는 다음 3가지 유형을 사용하는 것이 권장된다.
- FixedThreadPool
- 고정된 수의 스레드를 가지고 있는 풀이다.
- 모든 스레드가 활성 상태에서 추가 작업이 제출되면 스레드를 사용할 수 있을 때까지 대기한다.
- CachedThreadPool
- FixedThreadPool과 다르게 필요에 따라 새 스레드를 생성한다.
- 스레드가 사용되지 않으면 자동으로 제거되기 때문에 작업이 짧으면서 많은 수의 스레드를 사용해야할 때 유용하다.
- ScheduledThreadPool
- 작업을 딜레이를 주거나 주기적으로 실행할 수 있도록 스케줄링 할 수 있다.
ThreadPoolExecutor
ThreadPoolExecutor는 ExecutorService 인터페이스를 실제 구현한 클래스로 스레드 풀을 생성하고 관리하는 클래스이다.
다음은 Executors 클래스를 사용하여 ThreadPoolExecutor를 생성하는 예제이다.
1
2
3
4
5
6
7
8
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 고정 스레드풀은 아래 생성자 호출을 통해 알 수 있듯이 풀에 돌아가는 기본 스레드 수와 최대 스레드 수가 같은 것을 알 수 있다.
fun newFixedThreadPool(nThreads: Int): ExecutorService {
return ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
LinkedBlockingQueue())
}
// CachedThreadPool은 고정과 다르게 고정된 최대 스레드 수도 없고 60초 동안의 유휴 시간이 지나면 스레드를 제거한다.
fun newCachedThreadPool(): ExecutorService {
return ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
SynchronousQueue())
}
// ScheduledThreadPool도 고정된 스레드 수를 가지고 있지만, 스레드를 사용하지 않을 때 스레드를 제거하지 않는다.
fun newScheduledThreadPool(corePoolSize: Int): ScheduledExecutorService {
return ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE,
0, TimeUnit.NANOSECONDS,
DelayedWorkQueue())
}
기본적인 사용법
- newFixedThreadPool() 메서드를 사용하여 고정된 스레드 수를 가지는 스레드 풀을 생성하고, submit() 메서드를 사용하여 스레드에 작업을 제출한다.
1
2
3
4
5
6
7
8
9
10
val threadCount = 100
val executorService = Executors.newFixedThreadPool(threadCount) // 고정된 스레드 수를 지정한다.
for (i in 1..threadCount) {
executorService.submit { // 작업을 제출한다.
stockService.decreaseStock(1, 1L)
}
}
ScheduledThreadPool을 사용하여 3초 후에 작업을 실행하는 예제
1
2
3
4
5
6
7
8
9
10
val executor = Executors.newScheduledThreadPool(1)
// 3초 후에 작업을 실행
executor.schedule({
println("Hello from thread " + Thread.currentThread().name)
}, 3, TimeUnit.SECONDS)
// 스케줄러를 종료
executor.shutdown()
BlockingQueue
위의 생성자에서 봤듯이 ThreadPoolExecutor는 BlockingQueue를 인자로 받는다. 위의 3가지 유형의 스레드 풀은 각기 다른 BlockingQueue를 사용하는 것을 볼 수 있다.
BlockingQueue란
BlockingQueue는 Java의 java.util.concurrent 패키지에 있는 인터페이스로, 스레드 안전한 큐를 제공한다. BlockingQueue가 기존 큐와 다른 특징은 다음과 같다.
- 원소 추가 : 만약 큐가 가득찬 경우라면 빈 공간이 생길 때까지 스레드를 블록한다.
- 원소 제거 : 만약 큐가 비어있는 경우라면 요소가 추가될 때까지 스레드를 블록한다.
이렇게 BlockingQueue는 큐가 가득차거나 비어있을 때 스레드를 블록시켜 멀티스레드 환경에서 안전하게 요소를 추가하거나 제거할 수 있다.
BlockingQueue 구현체
- LinkedBlockingQueue : BlockingQueue의 구현체 중 하나로, 연결 리스트로 구현되어 있다. 큐의 크기를 지정하지 않으면 Integer.MAX_VALUE로 지정된다.
- SynchronousQueue: 큐의 크기가 0인 큐로, 요소가 추가되면 그 즉시 요소를 소비할 스레드가 나타날 때까지 스레드가 블록된다.
- CachedThreadPool에서는 작업이 할당되면 그 즉시 새로운 스레드를 생성하던가 기존 스레드를 사용해야하기 때문에 SynchronousQueue를 사용한다.
- 만약 SynchronousQueue를 사용하지 않는다면, 새 작업 할당 시 큐에 빈 공간이 생길 때까지 스레드가 블록되어 스레드가 생성되지 않을 수 있다.
- DelayedWorkQueue : Delayed 인터페이스를 추가 구현한 큐이다.
BLockingQueue 테스트
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// BlockingQueue는 공간이 가득찬 상태에서 put을 호출하면 스레드가 블록되고, 공간이 비어있는 상태에서 take를 호출하면 스레드가 블록된다.
val queueSize = 5
val blockingQueue: BlockingQueue<Int> = LinkedBlockingQueue(queueSize)
val producer = Thread {
for (i in 1..10) {
try {
blockingQueue.put(i)
println("Produced: $i")
} catch (e: InterruptedException) {
e.printStackTrace()
}
}
}
val consumer = Thread {
try {
while (true) {
Thread.sleep(2000)
val value = blockingQueue.take()
println("Consumed: $value")
}
} catch (e: InterruptedException) {
e.printStackTrace()
}
}
producer.start()
consumer.start()
// Wait for both threads to finish
producer.join()
consumer.join()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
결과:
Produced: 1
Produced: 2
Produced: 3
Produced: 4
Produced: 5
Produced: 6
Consumed: 1
Consumed: 2
Produced: 7
Consumed: 3
Produced: 8
Consumed: 4
Produced: 9
Consumed: 5
Produced: 10
Consumed: 6
Consumed: 7
Consumed: 8
Consumed: 9
Consumed: 10
5까지는 바로바로 큐에 삽입 되지만 큐가 꽉 찼을 때는 블록 된다.
이후 take()메서드가 호출 되어 큐가 빈 공간이 생기면 바로 대기중인 6을 추가하는 스레드가 실행되어 추가 되는 것을 볼 수 있다.