Primary/Kotlin

[Kotlin] Coroutines - Channels

해스끼 2021. 1. 23. 12:08

목차

Deferred를 사용하여 코루틴 간에 하나의 값을 전달할 수 있다. 여러 개의 값을 전달할 때는 channel을 사용할 수 있다.

Channel

Channel은 개념상 BlockingQueue와 매우 비슷하다. 차이점이 있다면 BlockingQueue는 값을 전달하는 과정에서 스레드를 block하지만, Channel은 스레드를 block하지 않고 suspending 한다.

val channel = Channel<Int>()
launch {
    for (x in 1..5) channel.send(x * x)  // 값을 보냄
}
repeat(5) { println(channel.receive()) } // 값을 받음
println("Done!")

send를 이용하여 값을 보낼 수 있고, receive를 이용하여 값을 받을 수 있다. 이 모든 과정이 non-blocking 으로 이루어진다.

1
4
9
16
25
Done!

Channel 종료

더이상 보낼 값이 더 없을 때 channel을 닫을 수 있다. 값을 수신하는 코드에서는 값이 몇 개나 올 지 모르기 때문에 위에서처럼 횟수를 정하는 대신 range-based for문을 이용하여 값을 받는 것이 좋다.

val channel = Channel<Int>()
launch {
    for (x in 1..5) channel.send(x * x)
    channel.close() // 더 보낼 값이 없음, close
}
// range-based for
for (y in channel) println(y)
println("Done!")
1
4
9
16
25
Done!

Producer-Consumer 패턴

운영체제 등의 과목에서 Producer-Consumer 패턴을 들어본 일이 있을 것이다. produce 코루틴 빌더와 consumeEach 함수를 사용하여 이 패턴을 정확히 구현할 수 있다. 두 함수를 사용하면 Channel 객체를 명시적으로 만들지 않아도 된다.

fun CoroutineScope.produceSquares() = produce {
    for (x in 1..5) send(x * x)           // 값을 하나씩 produce
}

fun main() = runBlocking {
    val squares = produceSquares()
    squares.consumeEach { println(it) }   // 값을 하나씩 consume
    println("Done!")
}
1
4
9
16
25
Done!

파이프라인

파이프라인이란 유한 또는 무한 개의 값을 만드는 코루틴을 의미한다.

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // 1부터 시작하여 모든 자연수를 produce
}

파이프라인이 만드는 값을 다른 코루틴에서 사용할 수도 있다. 예를 들어 이렇게.

fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x) // 주어진 수의 제곱을 produce
}

이제 main 함수에서 두 개의 파이프라인을 연결해 보자.

val numbers = produceNumbers()   
val squares = square(numbers)    
repeat(5) {
    println(squares.receive())    // 처음 5개만 출력
}
println("Done!") 
coroutineContext.cancelChildren() // 파이프라인 중단

produceNumberssquare은 무한히 실행되기 때문에, 두 코루틴을 명시적으로 종료할 필요가 있다. 물론 SequenceIterator로도 위의 코드를 그대로 구현할 수 있으나, 내부 계산에 비동기적인 부분이 있는 경우 channel을 사용하는 것이 좋다. SequenceIterator는 suspension을 허용하지 않기 때문이다.

분산처리

Channel에서 생성되는 값을 여러 개의 코루틴이 나누어 처리할 수도 있다. 다음의 코드는 자연수를 100ms당 하나씩 생산한다.

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1 
    while (true) {
        send(x++)  // 생산
        delay(100) // 100ms 기다린 후 다시 생산
    }
}

produceNumber가 생산하는 값을 여러 코루틴에 분산해 보자. 여기서는 일단 테스트를 위해 받은 값을 그대로 출력해 보았다.

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }    
}

이제 코루틴 5개를 만들어서 무슨 일이 일어나는지 살펴보자.

val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // 모든 코루틴을 중단하기 위해 channel을 중단
Processor #0 received 1
Processor #0 received 2
Processor #1 received 3
Processor #2 received 4
Processor #3 received 5
Processor #4 received 6
Processor #0 received 7
Processor #1 received 8
Processor #2 received 9
Processor #3 received 10

생산되는 값이 여러 코루틴에 분배됨을 알 수 있다. 위의 예시에서는 consumeEach 대신 for문을 사용했는데, 하나의 channel에서 생산하는 값을 여러 개의 코루틴에서 받을 때에는 for문을 사용하는 것이 안전하다. for문을 사용하면 하나의 코루틴이 중단되더라도 다른 코루틴은 계속 작동할 수 있기 때문이다.

역(逆) 분산처리

반대로 여러 개의 코루틴이 하나의 channel에 값을 보낼 수도 있다. 정해진 시간마다 채널에 문자열을 보내는 코루틴을 만들어 보자.

suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}

이제 코루틴을 여러 개 만들고, channel에 값이 어떻게 전달되는지 살펴보자.

val channel = Channel<String>()
launch { sendString(channel, "foo", 200L) }
launch { sendString(channel, "BAR!", 500L) }
repeat(6) { 
    println(channel.receive())
}
coroutineContext.cancelChildren() // 모든 자식 코루틴 취소
foo
foo
BAR!
foo
foo
BAR!

이처럼 여러 개의 코루틴에서 반환하는 결과를 하나의 채널로 모아서 받을 수 있다.

Buffered channels

지금까지 사용한 channel은 버퍼를 사용하지 않았다. 이 경우에는 Producer(channel)와 Consumer(main?)가 만날 때에만 값이 전달된다. 먼저 호출되는 함수에 따라 Producer가 기다릴 수도 있고, Consumer가 기다릴 수도 있다. 기다리는 시간이 길어질수록 성능은 나빠질 것이다.

버퍼를 사용하면 버퍼가 꽉 찰 때까지 값을 계속 보낼 수 있다. 따라서 적어도 Producer가 기다리는 일은 줄어들 것이다. Channel() 또는 produce에서 capacity 매개변수를 지정하면 된다.

val channel = Channel<Int>(4)  // 크기가 4인 버퍼 생성
val sender = launch {          // produce
    repeat(10) {
        println("Sending $it") 
        channel.send(it)       // 버퍼가 꽉 차면 suspend됨
    }
}
delay(1000)                    // 무슨 일이 일어나는지 보자.
sender.cancel()
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4

0부터 3까지는 버퍼에 들어가고, 버퍼가 꽉 찼기 때문에 4는 send되지 않고 suspend된다.

Channel은 평등하다

Channel은 FIFO(First-In, First-Out)으로 작동한다. 먼저 값을 요청한 코루틴이 먼저 값을 받게 되는 것이다. 두 개의 코루틴을 만들어서 실험해 보자.

data class Ball(var hits: Int)

fun main() = runBlocking {
    val table = Channel<Ball>()        // channel, 두 개의 코루틴이 공유함
    launch { player("ping", table) }
    launch { player("pong", table) }
    table.send(Ball(0))                // 공을 보냄
    delay(1000)                        // 1초동안 무슨 일이 일어나는지 보자.
    coroutineContext.cancelChildren()  // 코드 종료
}

suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) {       // receive
        ball.hits++
        println("$name $ball")
        delay(300)              // 잠시 기다렸다가
        table.send(ball)        // 공을 상대편으로 보낸다.
    }
}

먼저 공을 받는 쪽은 ping이다. 하지만 pong도 거의 즉시 실행되어 공을 기다리고 있으므로 ping이 보낸 공은 pong이 받게 된다. ping이 아무리 빨리 줄을 서더라도 pong 앞으로 새치기할 수는 없다.

ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)

Ticker Channel

Ticker channel은 정해진 시간마다 Unit을 생산하는 특별한 channel이다. Ticker를 사용하여 일정 시간마다 반복되는 작업을 쉽게 정의할 수 있다. ticker 함수를 이용하여 Ticker를 만들 수 있으며, Ticker를 종료하고 싶다면 cancel 함수를 사용하면 된다.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
    // 100ms마다 하나씩 생산
    val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) 
    var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Initial element is available immediately: $nextElement")

    nextElement = withTimeoutOrNull(50) { tickerChannel.receive() }
    println("Next element is not ready in 50 ms: $nextElement")

    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("Next element is ready in 100 ms: $nextElement")

    // 뭔가 계산하는 척
    println("Consumer pauses for 150ms")
    delay(150)

    nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Next element is available immediately after large consumer delay: $nextElement")

    // 메인 스레드의 delay와 상관없이 계속 생산한다.
    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } 
    println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")

    // ticker 종료
    tickerChannel.cancel()
}
Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit

Flow와 Channel의 차이

글을 쓰면서 생각해 보니, Channel은 이전에 살펴봤던 Flow와 매우 유사하다는 생각이 들었다. 그렇다면 FlowChannel의 차이점은 무엇일까? 내가 생각한 바로는 다음과 같다.

  • Flow는 직접 값을 반환하지만, Channel은 값이 이동하는 통로이다. 값을 받는 입장에서는 비슷해 보일 수 있지만, 데이터를 직접 만드는지 단순히 전달할 뿐인지의 차이가 있다.
  • Flow는 flow 블럭 안에서만 값을 반환할 수 있지만, Channel은 참조할 수만 있다면 어디에서든 값을 보낼 수 있다.
  • Flow는 누군가가 값을 기다리고 있는지와 상관없이 값을 반환할 수 있지만, Channel은 receiver가 있어야만 값을 전달할 수 있다.

본질적으로 생산전달의 차이라고 본다.