[Kotlin] Coroutines - Shared Mutable State and concurrency
목차
- 문제 정의
- Volatile?
- Thread-safe가 필요하다
- Thread confinement: fine-grained
- Thread confinement: coarse-grained
- Mutual exclusion
- Actors
- 참고 문헌
Dispatchers.Default
등의 멀티 스레드 dispatcher를 사용하면 여러 개의 코루틴을 동시에 실행할 수 있다. 이 과정에서 여러 동시성 문제가 발생할 수 있다. 가장 대표적인 문제로 shared mutable state가 있다. 운영체제 과목을 수강한 적이 있다면 잘 알고 있을 것이다. 요약하자면 수정 가능한 값에 여러 스레드가 동시에 접근하려고 할 때 어떻게 해야 하는지에 대한 문제이다. 값을 읽기만 한다면 모를까, 동시에 수정하려고 하면 매우 큰 문제가 발생할 수 있다.
코루틴에서 이 문제를 해결하는 방법을 알아보자.
문제 정의
위에서 설명한 문제를 코드로 작성해 보자. Synchronization(이하 동기화) 없이 동시에 변수를 수정할 때 어떤 일이 일어나는지 볼 것이다.
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 코루틴을 실행할 횟수
val k = 1000 // 각 코루틴마다 action을 수행할 횟수
val time = measureTimeMillis {
coroutineScope { // 코루틴!
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
$n$개의 코루틴을 실행하여 각 코루틴마다 action
을 $k$번 실행하는 시간을 측정하는 함수이다. 이제 shared mutable에 동기화 없이 접근해 보자.
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
Completed 100000 actions in 46 ms
Counter = 94066
counter
를 $100,000$번 실행했지만 결과는 $94,066$이다. 여러 스레드가 동기화 없이 마구잡이로 접근하니까 제대로 실행되지 않는 것이다.
Volatile?
변수를 volatile
로 선언해도 소용 없다. volatile
은 변수를 메인 메모리에 저장하겠다고 명시하는 것이다. 변수에 접근할 때 캐시가 아닌 항상 메인 메모리에서 읽어 오라는 뜻이다.
하지만 여러 개의 스레드가 여전히 변수를 동시에 읽어올 수 있으므로 문제를 해결할 수 없다.
@Volatile // 코틀린에서는 volatile을 annotation으로 선언한다.
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
Completed 100000 actions in 41 ms
Counter = 96258
여러 번 실행해 봐도 "Counter = 100000"가 나오지 않는다.
Thread-safe가 필요하다
일반적인 해결법 중 하나로 thread-safe한 자료구조를 사용하는 방법이 있다. Thread-safe란, 동시에 최대 하나의 스레드만 변수에 접근할 수 있도록 자체적으로 제어하는 변수를 의미한다. 따라서 thread-safe한 변수를 사용하면 여러 개의 스레드가 동시에 접근해도 변수를 동기화할 수 있다.
예를 들어 코틀린에는 Int
의 thread-safe 타입 AtomicInteger
가 존재한다. incrementAndGet
연산을 사용하면 AtomicInteger
를 thread-safe하게 증가시킬 수 있다.
val counter = AtomicInteger()
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter.incrementAndGet()
}
}
println("Counter = $counter")
}
Completed 100000 actions in 41 ms
Counter = 100000
문제를 해결했다. AtomicInteger
외에도 AtomicReference<T>
에 변수를 넣어 사용할 수 있다.
val counter = AtomicReference(0)
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100
val k = 1000
val time = measureTimeMillis {
coroutineScope {
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter.getAndUpdate { it + 1 }
}
}
println("Counter = $counter")
}
하지만 Atomic...
으로는 접근 상태나 적용할 연산이 복잡한 경우를 처리할 수 없다. 코드를 근본적으로 thread-safe하게 만들어야 한다.
Thread confinement: fine-grained
Thread confinement 는 하나의 스레드를 통해서만 변수에 접근할 수 있도록 하는 해결법을 말한다. 예를 들어 안드로이드에서는 메인 스레드에서만 UI를 갱신할 수 있다.
위의 코드에 적용하려면 단일 스레드 문맥에서만 변수에 접근하도록 하면 된다.
val counterContext = newSingleThreadContext("CounterContext") // 단일 스레드 문맥
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// 이 부분만 문맥을 counterContext로 바꿔서 실행
withContext(counterContext) {
counter++
}
}
}
println("Counter = $counter")
}
Completed 100000 actions in 2339 ms
Counter = 100000
문제는 해결했지만 실행 시간이 매우 느리다. 값을 증가시킬 때마다 Dispatchers.Default
문맥을 counterContext
로 바꿔서 실행하기 때문이다. 문맥을 10만 번이나 바꾸기 때문에 느릴 수밖에.
Thread confinement: coarse-grained
사실상 Dispatchers.Default
은 사용되지 않는 문맥이다. 애초에 실행 자체를 counterContext
에서 하면 불필요한 문맥 교환을 막을 수 있을 것이다.
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main() = runBlocking {
// 실행 자체를 단일 스레드 문맥에서 하면 된다.
withContext(counterContext) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
Completed 100000 actions in 37 ms
Counter = 100000
실행 시간이 매우 빨라졌다.
Mutual exclusion
Mutual exclusion이란 공유 변수에 접근하는 코드를 critical section 으로 보호하여 critical section 이 동시에 실행되지 않도록 막는 방법이다. 일반적으로는 synchronized
나 ReentrantLock
을 사용하지만, 코루틴에서는 Mutex
를 사용한다. lock
과 unlock
함수를 사용하여 critical section 을 표현할 수 있다. Mutex.lock()
등은 모두 suspending 함수이기 때문에 스레드를 block하지 않는다.
Mutex.withLock
함수를 사용하면 자동으로 lock을 걸고 풀어준다.
val mutex = Mutex()
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// critical section
mutex.withLock {
counter++
}
}
}
println("Counter = $counter")
}
Completed 100000 actions in 1396 ms
Counter = 100000
실행 시간이 느린 이유는 코루틴을 fine-grained 방식으로 실행했기 때문이다. 하지만 특정 스레드에 종속되지 않은 변수를 주기적으로 갱신할 필요가 있는 경우 등, 특정한 상황에서는 사용해 볼 만 하다.
Actors
Actor는 코루틴의 묶음이다. 정확히는 1. private 변수와 2. 다른 코루틴과 의사소통할 수 있는 channel로 이루어진 객체이다. 변수가 여러 개 있다면 actor를 클래스로 정의하는 것이 좋지만, 지금은 변수가 하나뿐이므로 함수를 사용해서 작성할 수 있다.
actor
코루틴 빌더를 사용하면 메시지를 수신할 mailbox channel과 결과를 돌려줄 send channel을 간편하게 정의할 수 있다. 따라서 actor
가 반환한 객체만을 가지고도 actor를 조작할 수 있다.
Actor를 사용하려면 먼저 actor가 처리할 메시지를 정의해야 한다. 코틀린의 sealed class
를 사용하면 좋다. 여러 개의 메시지 타입을 sealed class
를 통해 하나로 묶고, when
문을 이용하여 메시지를 타입에 따라 처리할 수 있다. 말로 하면 어렵지만 코드를 보면 이해할 수 있을 것이다.
sealed class CounterMsg // 모든 메시지 타입의 부모 클래스
object IncCounter : CounterMsg() // 변수를 1 증가시키라는 메시지
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // 변수의 값을 돌려달라는 메시지
CompletableDeferred
를 사용하면 값 하나를 언젠가 돌려주겠다는 약속을 표현할 수 있다. Actor도 코루틴이므로 본질적으로 비동기적이며, 값을 즉시 반환하지 못할 경우가 있을 수 있기 때문이다.
이제 actor를 정의하자.
// 위에서 정의한 메시지를 처리하는 actor 정의
fun CoroutineScope.counterActor() = actor<CounterMsg> {
var counter = 0 // 변수 (state)
for (msg in channel) { // 들어오는 메시지를 처리한다.
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
이제 main 함수에서 actor를 실행해 보자.
fun main() = runBlocking<Unit> {
val counter = counterActor() // actor 생성
withContext(Dispatchers.Default) {
massiveRun {
counter.send(IncCounter)
}
}
// actor로부터 값을 받는다.
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response))
println("Counter = ${response.await()}")
counter.close() // actor 종료
}
Completed 100000 actions in 988 ms
Counter = 100000
Actor는 메시지를 순차적으로 처리하기 때문에 자연스럽게 변수에는 한 번에 하나의 작업만 수행된다. 또, actor는 오직 메시지를 통해서만 서로 의사소통할 수 있다.
이제 더 이상 스레드 문맥을 고려할 필요가 없다. 문맥이 바뀌지 않기 때문에 실행 시간도 더 빨라졌다.