일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
- pandas
- Gradle
- Coroutines
- MyVoca
- Codeforces
- 프로그래머스
- 백준
- GitHub
- Python
- MiTweet
- 코루틴
- Kotlin
- livedata
- 암호학
- 코드포스
- 쿠링
- ProGuard
- Compose
- relay
- AWS
- Hilt
- Rxjava
- androidStudio
- TEST
- boj
- Coroutine
- android
- architecture
- activity
- textfield
- Today
- Total
이동식 저장소
[Kotlin] Coroutines - Channels 본문
목차
- Channel
- Channel 종료
- Producer-Consumer 패턴
- 파이프라인
- 분산처리
- 역(逆) 분산처리
- Buffered Channels
- Channel은 평등하다
- Ticker Channel
- Flow와 Channel의 차이
Deferred
를 사용하여 코루틴 간에 하나의 값을 전달할 수 있다. 여러 개의 값을 전달할 때는 channel을 사용할 수 있다.
Channel
Channel
은 개념상 BlockingQueue
와 매우 비슷하다. 차이점이 있다면 BlockingQueue
는 값을 전달하는 과정에서 스레드를 block하지만, Channel
은 스레드를 block하지 않고 suspending 한다.
val channel = Channel<Int>()
launch {
for (x in 1..5) channel.send(x * x) // 값을 보냄
}
repeat(5) { println(channel.receive()) } // 값을 받음
println("Done!")
send
를 이용하여 값을 보낼 수 있고, receive
를 이용하여 값을 받을 수 있다. 이 모든 과정이 non-blocking 으로 이루어진다.
1
4
9
16
25
Done!
Channel 종료
더이상 보낼 값이 더 없을 때 channel을 닫을 수 있다. 값을 수신하는 코드에서는 값이 몇 개나 올 지 모르기 때문에 위에서처럼 횟수를 정하는 대신 range-based for문을 이용하여 값을 받는 것이 좋다.
val channel = Channel<Int>()
launch {
for (x in 1..5) channel.send(x * x)
channel.close() // 더 보낼 값이 없음, close
}
// range-based for
for (y in channel) println(y)
println("Done!")
1
4
9
16
25
Done!
Producer-Consumer 패턴
운영체제 등의 과목에서 Producer-Consumer
패턴을 들어본 일이 있을 것이다. produce
코루틴 빌더와 consumeEach
함수를 사용하여 이 패턴을 정확히 구현할 수 있다. 두 함수를 사용하면 Channel
객체를 명시적으로 만들지 않아도 된다.
fun CoroutineScope.produceSquares() = produce {
for (x in 1..5) send(x * x) // 값을 하나씩 produce
}
fun main() = runBlocking {
val squares = produceSquares()
squares.consumeEach { println(it) } // 값을 하나씩 consume
println("Done!")
}
1
4
9
16
25
Done!
파이프라인
파이프라인이란 유한 또는 무한 개의 값을 만드는 코루틴을 의미한다.
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1
while (true) send(x++) // 1부터 시작하여 모든 자연수를 produce
}
파이프라인이 만드는 값을 다른 코루틴에서 사용할 수도 있다. 예를 들어 이렇게.
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
for (x in numbers) send(x * x) // 주어진 수의 제곱을 produce
}
이제 main 함수에서 두 개의 파이프라인을 연결해 보자.
val numbers = produceNumbers()
val squares = square(numbers)
repeat(5) {
println(squares.receive()) // 처음 5개만 출력
}
println("Done!")
coroutineContext.cancelChildren() // 파이프라인 중단
produceNumbers
와 square
은 무한히 실행되기 때문에, 두 코루틴을 명시적으로 종료할 필요가 있다. 물론 Sequence
나 Iterator
로도 위의 코드를 그대로 구현할 수 있으나, 내부 계산에 비동기적인 부분이 있는 경우 channel을 사용하는 것이 좋다. Sequence
와 Iterator
는 suspension을 허용하지 않기 때문이다.
분산처리
Channel에서 생성되는 값을 여러 개의 코루틴이 나누어 처리할 수도 있다. 다음의 코드는 자연수를 100ms당 하나씩 생산한다.
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1
while (true) {
send(x++) // 생산
delay(100) // 100ms 기다린 후 다시 생산
}
}
produceNumber
가 생산하는 값을 여러 코루틴에 분산해 보자. 여기서는 일단 테스트를 위해 받은 값을 그대로 출력해 보았다.
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
for (msg in channel) {
println("Processor #$id received $msg")
}
}
이제 코루틴 5개를 만들어서 무슨 일이 일어나는지 살펴보자.
val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // 모든 코루틴을 중단하기 위해 channel을 중단
Processor #0 received 1
Processor #0 received 2
Processor #1 received 3
Processor #2 received 4
Processor #3 received 5
Processor #4 received 6
Processor #0 received 7
Processor #1 received 8
Processor #2 received 9
Processor #3 received 10
생산되는 값이 여러 코루틴에 분배됨을 알 수 있다. 위의 예시에서는 consumeEach
대신 for문을 사용했는데, 하나의 channel에서 생산하는 값을 여러 개의 코루틴에서 받을 때에는 for문을 사용하는 것이 안전하다. for문을 사용하면 하나의 코루틴이 중단되더라도 다른 코루틴은 계속 작동할 수 있기 때문이다.
역(逆) 분산처리
반대로 여러 개의 코루틴이 하나의 channel에 값을 보낼 수도 있다. 정해진 시간마다 채널에 문자열을 보내는 코루틴을 만들어 보자.
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
while (true) {
delay(time)
channel.send(s)
}
}
이제 코루틴을 여러 개 만들고, channel에 값이 어떻게 전달되는지 살펴보자.
val channel = Channel<String>()
launch { sendString(channel, "foo", 200L) }
launch { sendString(channel, "BAR!", 500L) }
repeat(6) {
println(channel.receive())
}
coroutineContext.cancelChildren() // 모든 자식 코루틴 취소
foo
foo
BAR!
foo
foo
BAR!
이처럼 여러 개의 코루틴에서 반환하는 결과를 하나의 채널로 모아서 받을 수 있다.
Buffered channels
지금까지 사용한 channel은 버퍼를 사용하지 않았다. 이 경우에는 Producer
(channel)와 Consumer
(main?)가 만날 때에만 값이 전달된다. 먼저 호출되는 함수에 따라 Producer
가 기다릴 수도 있고, Consumer
가 기다릴 수도 있다. 기다리는 시간이 길어질수록 성능은 나빠질 것이다.
버퍼를 사용하면 버퍼가 꽉 찰 때까지 값을 계속 보낼 수 있다. 따라서 적어도 Producer
가 기다리는 일은 줄어들 것이다. Channel()
또는 produce
에서 capacity
매개변수를 지정하면 된다.
val channel = Channel<Int>(4) // 크기가 4인 버퍼 생성
val sender = launch { // produce
repeat(10) {
println("Sending $it")
channel.send(it) // 버퍼가 꽉 차면 suspend됨
}
}
delay(1000) // 무슨 일이 일어나는지 보자.
sender.cancel()
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
0부터 3까지는 버퍼에 들어가고, 버퍼가 꽉 찼기 때문에 4는 send
되지 않고 suspend된다.
Channel은 평등하다
Channel은 FIFO(First-In, First-Out)으로 작동한다. 먼저 값을 요청한 코루틴이 먼저 값을 받게 되는 것이다. 두 개의 코루틴을 만들어서 실험해 보자.
data class Ball(var hits: Int)
fun main() = runBlocking {
val table = Channel<Ball>() // channel, 두 개의 코루틴이 공유함
launch { player("ping", table) }
launch { player("pong", table) }
table.send(Ball(0)) // 공을 보냄
delay(1000) // 1초동안 무슨 일이 일어나는지 보자.
coroutineContext.cancelChildren() // 코드 종료
}
suspend fun player(name: String, table: Channel<Ball>) {
for (ball in table) { // receive
ball.hits++
println("$name $ball")
delay(300) // 잠시 기다렸다가
table.send(ball) // 공을 상대편으로 보낸다.
}
}
먼저 공을 받는 쪽은 ping
이다. 하지만 pong
도 거의 즉시 실행되어 공을 기다리고 있으므로 ping
이 보낸 공은 pong
이 받게 된다. ping
이 아무리 빨리 줄을 서더라도 pong
앞으로 새치기할 수는 없다.
ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
Ticker Channel
Ticker
channel은 정해진 시간마다 Unit
을 생산하는 특별한 channel이다. Ticker
를 사용하여 일정 시간마다 반복되는 작업을 쉽게 정의할 수 있다. ticker
함수를 이용하여 Ticker
를 만들 수 있으며, Ticker
를 종료하고 싶다면 cancel
함수를 사용하면 된다.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking<Unit> {
// 100ms마다 하나씩 생산
val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0)
var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Initial element is available immediately: $nextElement")
nextElement = withTimeoutOrNull(50) { tickerChannel.receive() }
println("Next element is not ready in 50 ms: $nextElement")
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 100 ms: $nextElement")
// 뭔가 계산하는 척
println("Consumer pauses for 150ms")
delay(150)
nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Next element is available immediately after large consumer delay: $nextElement")
// 메인 스레드의 delay와 상관없이 계속 생산한다.
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
// ticker 종료
tickerChannel.cancel()
}
Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
Flow와 Channel의 차이
글을 쓰면서 생각해 보니, Channel
은 이전에 살펴봤던 Flow
와 매우 유사하다는 생각이 들었다. 그렇다면 Flow
와 Channel
의 차이점은 무엇일까? 내가 생각한 바로는 다음과 같다.
Flow
는 직접 값을 반환하지만,Channel
은 값이 이동하는 통로이다. 값을 받는 입장에서는 비슷해 보일 수 있지만, 데이터를 직접 만드는지 단순히 전달할 뿐인지의 차이가 있다.Flow
는 flow 블럭 안에서만 값을 반환할 수 있지만,Channel
은 참조할 수만 있다면 어디에서든 값을 보낼 수 있다.Flow
는 누군가가 값을 기다리고 있는지와 상관없이 값을 반환할 수 있지만,Channel
은 receiver가 있어야만 값을 전달할 수 있다.
본질적으로 생산과 전달의 차이라고 본다.
'Primary > Kotlin' 카테고리의 다른 글
[Kotlin] Coroutines - Shared Mutable State and concurrency (0) | 2021.01.26 |
---|---|
[Kotlin] Coroutines - Exception Handling and Supervision (0) | 2021.01.25 |
[Kotlin] Coroutines - Asynchronous Flow (1) | 2021.01.22 |
[Kotlin] Coroutines - Context and Dispatchers (0) | 2021.01.21 |
[Kotlin] Coroutines - Suspending 함수 활용하기 (0) | 2021.01.20 |