이동식 저장소

[Kotlin] Coroutines - Shared Mutable State and concurrency 본문

Primary/Kotlin

[Kotlin] Coroutines - Shared Mutable State and concurrency

해스끼 2021. 1. 26. 11:24

목차

Dispatchers.Default 등의 멀티 스레드 dispatcher를 사용하면 여러 개의 코루틴을 동시에 실행할 수 있다. 이 과정에서 여러 동시성 문제가 발생할 수 있다. 가장 대표적인 문제로 shared mutable state가 있다. 운영체제 과목을 수강한 적이 있다면 잘 알고 있을 것이다. 요약하자면 수정 가능한 값에 여러 스레드가 동시에 접근하려고 할 때 어떻게 해야 하는지에 대한 문제이다. 값을 읽기만 한다면 모를까, 동시에 수정하려고 하면 매우 큰 문제가 발생할 수 있다.

코루틴에서 이 문제를 해결하는 방법을 알아보자.

문제 정의

위에서 설명한 문제를 코드로 작성해 보자. Synchronization(이하 동기화) 없이 동시에 변수를 수정할 때 어떤 일이 일어나는지 볼 것이다.

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // 코루틴을 실행할 횟수
    val k = 1000 // 각 코루틴마다 action을 수행할 횟수
    val time = measureTimeMillis {
        coroutineScope { // 코루틴!
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")    
}

$n$개의 코루틴을 실행하여 각 코루틴마다 action을 $k$번 실행하는 시간을 측정하는 함수이다. 이제 shared mutable에 동기화 없이 접근해 보자.

var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}
Completed 100000 actions in 46 ms
Counter = 94066

counter를 $100,000$번 실행했지만 결과는 $94,066$이다. 여러 스레드가 동기화 없이 마구잡이로 접근하니까 제대로 실행되지 않는 것이다.

Volatile?

변수를 volatile로 선언해도 소용 없다. volatile은 변수를 메인 메모리에 저장하겠다고 명시하는 것이다. 변수에 접근할 때 캐시가 아닌 항상 메인 메모리에서 읽어 오라는 뜻이다.

하지만 여러 개의 스레드가 여전히 변수를 동시에 읽어올 수 있으므로 문제를 해결할 수 없다.

@Volatile // 코틀린에서는 volatile을 annotation으로 선언한다.
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}
Completed 100000 actions in 41 ms
Counter = 96258

여러 번 실행해 봐도 "Counter = 100000"가 나오지 않는다.

Thread-safe가 필요하다

일반적인 해결법 중 하나로 thread-safe한 자료구조를 사용하는 방법이 있다. Thread-safe란, 동시에 최대 하나의 스레드만 변수에 접근할 수 있도록 자체적으로 제어하는 변수를 의미한다. 따라서 thread-safe한 변수를 사용하면 여러 개의 스레드가 동시에 접근해도 변수를 동기화할 수 있다.

예를 들어 코틀린에는 Int의 thread-safe 타입 AtomicInteger가 존재한다. incrementAndGet 연산을 사용하면 AtomicInteger를 thread-safe하게 증가시킬 수 있다.

val counter = AtomicInteger()

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter.incrementAndGet()
        }
    }
    println("Counter = $counter")
}
Completed 100000 actions in 41 ms
Counter = 100000

문제를 해결했다. AtomicInteger 외에도 AtomicReference<T>에 변수를 넣어 사용할 수 있다.

val counter = AtomicReference(0)

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  
    val k = 1000 
    val time = measureTimeMillis {
        coroutineScope { 
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter.getAndUpdate { it + 1 }
        }
    }
    println("Counter = $counter")
}

하지만 Atomic...으로는 접근 상태나 적용할 연산이 복잡한 경우를 처리할 수 없다. 코드를 근본적으로 thread-safe하게 만들어야 한다.

Thread confinement: fine-grained

Thread confinement하나의 스레드를 통해서만 변수에 접근할 수 있도록 하는 해결법을 말한다. 예를 들어 안드로이드에서는 메인 스레드에서만 UI를 갱신할 수 있다.

위의 코드에 적용하려면 단일 스레드 문맥에서만 변수에 접근하도록 하면 된다.

val counterContext = newSingleThreadContext("CounterContext") // 단일 스레드 문맥
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            // 이 부분만 문맥을 counterContext로 바꿔서 실행
            withContext(counterContext) {
                counter++
            }
        }
    }
    println("Counter = $counter")
}
Completed 100000 actions in 2339 ms
Counter = 100000

문제는 해결했지만 실행 시간이 매우 느리다. 값을 증가시킬 때마다 Dispatchers.Default 문맥을 counterContext로 바꿔서 실행하기 때문이다. 문맥을 10만 번이나 바꾸기 때문에 느릴 수밖에.

Thread confinement: coarse-grained

사실상 Dispatchers.Default은 사용되지 않는 문맥이다. 애초에 실행 자체를 counterContext에서 하면 불필요한 문맥 교환을 막을 수 있을 것이다.

val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main() = runBlocking {
    // 실행 자체를 단일 스레드 문맥에서 하면 된다.
    withContext(counterContext) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}
Completed 100000 actions in 37 ms
Counter = 100000

실행 시간이 매우 빨라졌다.

Mutual exclusion

Mutual exclusion이란 공유 변수에 접근하는 코드를 critical section 으로 보호하여 critical section 이 동시에 실행되지 않도록 막는 방법이다. 일반적으로는 synchronizedReentrantLock을 사용하지만, 코루틴에서는 Mutex를 사용한다. lockunlock 함수를 사용하여 critical section 을 표현할 수 있다. Mutex.lock() 등은 모두 suspending 함수이기 때문에 스레드를 block하지 않는다.

Mutex.withLock 함수를 사용하면 자동으로 lock을 걸고 풀어준다.

val mutex = Mutex()
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            // critical section
            mutex.withLock {
                counter++
            }
        }
    }
    println("Counter = $counter")
}
Completed 100000 actions in 1396 ms
Counter = 100000

실행 시간이 느린 이유는 코루틴을 fine-grained 방식으로 실행했기 때문이다. 하지만 특정 스레드에 종속되지 않은 변수를 주기적으로 갱신할 필요가 있는 경우 등, 특정한 상황에서는 사용해 볼 만 하다.

Actors

Actor는 코루틴의 묶음이다. 정확히는 1. private 변수와 2. 다른 코루틴과 의사소통할 수 있는 channel로 이루어진 객체이다. 변수가 여러 개 있다면 actor를 클래스로 정의하는 것이 좋지만, 지금은 변수가 하나뿐이므로 함수를 사용해서 작성할 수 있다.

actor 코루틴 빌더를 사용하면 메시지를 수신할 mailbox channel과 결과를 돌려줄 send channel을 간편하게 정의할 수 있다. 따라서 actor가 반환한 객체만을 가지고도 actor를 조작할 수 있다.

Actor를 사용하려면 먼저 actor가 처리할 메시지를 정의해야 한다. 코틀린의 sealed class를 사용하면 좋다. 여러 개의 메시지 타입을 sealed class를 통해 하나로 묶고, when 문을 이용하여 메시지를 타입에 따라 처리할 수 있다. 말로 하면 어렵지만 코드를 보면 이해할 수 있을 것이다.

sealed class CounterMsg // 모든 메시지 타입의 부모 클래스
object IncCounter : CounterMsg() // 변수를 1 증가시키라는 메시지
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // 변수의 값을 돌려달라는 메시지

CompletableDeferred를 사용하면 값 하나를 언젠가 돌려주겠다는 약속을 표현할 수 있다. Actor도 코루틴이므로 본질적으로 비동기적이며, 값을 즉시 반환하지 못할 경우가 있을 수 있기 때문이다.

이제 actor를 정의하자.

// 위에서 정의한 메시지를 처리하는 actor 정의
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0        // 변수 (state)
    for (msg in channel) { // 들어오는 메시지를 처리한다.
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}

이제 main 함수에서 actor를 실행해 보자.

fun main() = runBlocking<Unit> {
    val counter = counterActor()        // actor 생성
    withContext(Dispatchers.Default) {
        massiveRun {
            counter.send(IncCounter)
        }
    }
    // actor로부터 값을 받는다.
    val response = CompletableDeferred<Int>()
    counter.send(GetCounter(response))
    println("Counter = ${response.await()}")
    counter.close() // actor 종료
}
Completed 100000 actions in 988 ms
Counter = 100000

Actor는 메시지를 순차적으로 처리하기 때문에 자연스럽게 변수에는 한 번에 하나의 작업만 수행된다. 또, actor는 오직 메시지를 통해서만 서로 의사소통할 수 있다.

이제 더 이상 스레드 문맥을 고려할 필요가 없다. 문맥이 바뀌지 않기 때문에 실행 시간도 더 빨라졌다.

참고 문헌

Shared Mutable State and concurrency - kotlinlang

Comments