일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
- MiTweet
- Rxjava
- Gradle
- TEST
- 프로그래머스
- textfield
- Codeforces
- 코루틴
- boj
- Python
- architecture
- pandas
- relay
- Hilt
- Coroutines
- Kotlin
- Compose
- AWS
- android
- 쿠링
- livedata
- MyVoca
- GitHub
- androidStudio
- 코드포스
- ProGuard
- activity
- 백준
- Coroutine
- 암호학
- Today
- Total
이동식 저장소
[Kotlin] Coroutines - Asynchronous Flow 본문
suspending 함수를 이용하여 비동기적으로 값을 계산할 수 있다. 그런데 비동기적으로 계산되는 값이 여러 개라면 어떻게 할까? 예를 들어 1번부터 n번째까지의 피보나치 수를 1초에 하나씩 계산해서 반환하는 함수가 있다고 하자. 물론 모든 결과가 계산될 때까지 기다리는 방법도 있지만, 결과값이 반환될 때마다 하나씩 출력하는 것이 성능 측면에서 더 좋을 수 있다.
코틀린의 Flow
를 이용하여 비동기적으로 계산된 값을 하나씩 받을 수 있다.
컬렉션 복습
collections을 이용하여 여러 개의 값을 표현할 수 있다. 다음의 코드에서 simple
함수는 세 개의 정수로 이루어진 List
를 반환한다.
fun simple(): List<Int> = listOf(1, 2, 3)
fun main() {
simple().forEach { value -> println(value) }
}
1
2
3
Sequence 복습
계산 과정에서 CPU가 많이 필요하다면 Sequence
를 사용할 수 있다. 여기서는 CPU를 사용하는 작업을 흉내내기 위해 100ms동안 sleep했다.
fun simple(): Sequence<Int> = sequence { // sequence builder
for (i in 1..3) {
Thread.sleep(100) // 계산하는 척
yield(i) // 값을 하나씩 반환
}
}
fun main() {
simple().forEach { value -> println(value) }
}
1
2
3
suspending
하지만 Thread.sleep
은 메인 스레드를 block한다. 스레드를 block하지 않고 계산하기 위해 delay
함수를 사용해 보자. 함수 내부에서 suspending 함수를 호출하므로 simple
역시 suspending 함수가 되어야 한다.
suspend fun simple(): List<Int> {
delay(1000) // 뭔가 계산하고 있는 척
return listOf(1, 2, 3)
}
fun main() = runBlocking<Unit> {
simple().forEach { value -> println(value) }
}
1
2
3
1초 후에 숫자가 출력된다.
Flow
그런데 위의 코드를 보면 결과값을 List
로 반환하고 있기 때문에, 모든 값을 계산하기 전까지는 결과값을 반환할 수 없다. 위에서 Sequence
를 이용하여 동기적으로 계산되는 값을 하나씩 반환했다. 비동기적으로 계산되는 값은 Flow
를 사용하여 반환할 수 있다.
fun simple(): Flow<Int> = flow { // flow builder
for (i in 1..3) {
delay(100) // 계산하는 척
emit(i) // 값 반환
}
}
fun main() = runBlocking<Unit> {
// main 스레드가 block되었는지 확인하는 용도
launch {
for (k in 1..3) {
println("I'm not blocked $k")
delay(100)
}
}
simple().collect { value -> println(value) }
}
Flow
를 통해 값이 하나씩 반환되고 있다. 한편 main
스레드는 block되지 않았기 때문에 다른 작업(코루틴 등)을 수행할 수 있다.
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
위 코드의 내용을 정리하면 다음과 같다.
- 함수
flow
를 이용하여Flow
를 만들 수 있다.List
를 만들 때listOf
를 사용하듯이. flow {...}
블럭 안의 코드는 suspend될 수 있다. 따라서 블럭 안에서 비동기 작업을 수행할 수 있다.simple
함수 앞에suspend
키워드가 붙지 않았다.Flow
는emit
함수를 이용하여 값을 반환한다.Flow
의 값은collect
함수를 이용하여 얻을 수 있다.
Flow는 게으르다
flow
블럭 안의 코드는 flow가 collect되기 전에는 실행되지 않는다.
fun simple(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking<Unit> {
println("Calling simple function...")
val flow = simple()
println("Calling collect...")
flow.collect { value -> println(value) }
println("Calling collect again...")
flow.collect { value -> println(value) }
}
Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
simple
함수가 suspend
가 아닌 이유이기도 하다. simple()
은 호출된 즉시 Flow
객체를 반환하지만, 내부의 코드를 실행하지는 않는다. collect
함수가 호출되기 전까지는 블럭 안의 코드가 실행되지 않는다.
Flow는 collect
함수가 호출될 때마다 새로 시작된다. Flow started
라는 문자열이 두 번 출력된 것을 보면 알 수 있다. 또 main
스레드는 collect
가 끝날 때까지 non-blocking 상태로 기다린다. 값을 비동기적으로 출력하고 싶다면 다음과 같이 별도의 코루틴을 만들어야 한다.
fun simple(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking<Unit> {
println("Calling simple function...")
val flow = simple()
launch {
println("Calling collect...")
flow.collect { value -> println(value) }
}
launch {
println("Calling collect again...")
flow.collect { value -> println(value) }
}
}
Calling simple function...
Calling collect...
Flow started
Calling collect again...
Flow started
1
1
2
2
3
3
Flow 취소
Flow도 일반적인 코루틴 취소의 원칙을 따른다. Flow는 취소 가능한 suspending 함수로 인해 중단되었을 때 취소될 수 있다. 예를 들어 flow에 취소 명령이 내려졌다면, flow 내부에서 delay
등이 실행될 때 자신이 취소 대상임을 인지하고 취소된다.
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
withTimeoutOrNull(250) { // 250ms 후 종료: 3은 emit되지 않는다.
simple().collect { value -> println(value) }
}
println("Done")
}
Emitting 1
1
Emitting 2
2
Done
Flow의 취소에 대해 글 후반부에서 다시 살펴볼 것이다.
Flow를 만드는 다른 방법
Flow는 flow {...}
함수를 이용해서 만들 수도 있고, 다른 함수를 통해서도 만들 수 있다.
flowOf
함수를 이용하여 반환할 값을 정할 수 있다.- 대부분의 Collections과 sequence에서
.asFlow()
확장 함수를 사용하여 flow를 만들 수 있다.
// Range를 Flow로 변환
(1..3).asFlow().collect { value -> println(value) }
1
2
3
Flow 연산
Collections에 map
과 filter
등의 연산을 적용하듯이 flow에도 연산을 적용할 수 있다. Flow에 연산을 적용하면 flow가 반환되며, 반환된 flow 역시 collect
되기 전까지는 실행되지 않는다. 연산 블럭 안에서 suspending 함수를 호출할 수 있다는 점을 기억하자.
suspend fun performRequest(request: Int): String {
delay(1000) // 뭔가 열심히 계산하는 척
return "response $request"
}
fun main() = runBlocking<Unit> {
(1..3).asFlow() // flow
.map { request -> performRequest(request) }
.collect { response -> println(response) }
}
위 코드를 실행하면 1초에 하나씩 문자열이 출력된다. map
블럭 안에서 suspend
함수를 실행하고 있다는 점에 주목하자.
response 1
response 2
response 3
Transform
가장 많이 사용되는 Flow 연산 중 하나로 transform
이 있다. transform
을 사용하면 map
등의 간단한 연산을 직접 할 수도 있고, 더 복잡한 연산을 수행할 수도 있다. transform
연산을 사용하면 임의의 값을 임의의 횟수만큼 emit
할 수 있다.
(1..3).asFlow() // a flow of requests
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect { response -> println(response) }
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
performRequest
함수를 실행하기 전에 문자열을 하나 반환하고, 함수의 결과값을 반환한다. 이처럼 transform
을 이용하여 실행 한 번당 여러 개의 값을 반환할 수 있다.
처음 두 개만 받고 싶어요!
take
함수를 사용하여 가져올 값의 개수를 정할 수 있다. 주의할 점은 정해진 개수만큼 가져온 후 flow
가 취소된다는 점이다. 코루틴이 취소될 때는 항상 exception이 발생하므로, 일반적인 리소스 해제 코드(try-finally
등)를 그대로 사용할 수 있다.
fun numbers(): Flow<Int> = flow {
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)
} finally {
println("Finally in numbers")
}
}
fun main() = runBlocking<Unit> {
numbers()
.take(2) // 처음 두 개의 값만 collect
.collect { value -> println(value) }
}
1
2
Finally in numbers
종료 연산
종료 연산이란 flow의 값을 가져오는 suspending 함수를 의미한다.
collect
함수도 종료 연산이다.- Collection으로의 변환(
toList
,toSet
등)도 종료 연산이다. - Flow의 첫 번째 값을 받는
first
연산과 Flow가 반환할 단 하나의 값을 받는single
연산도 종료 연산이다. 두 연산의 차이점은 링크를 참고하길 바란다. reduce
와fold
도 종료 연산이다. 까먹었다면 링크 참고.
val sum = (1..5).asFlow()
.map { it * it } // (Intermediate Operator, 중간 연산)
.reduce { a, b -> a + b } // 합을 구함 (Terminal Operator, 종료 연산)
println(sum)
55
순차적으로 실행된다
Flow는 특별히 여러 개의 flow를 연산하지 않는 이상 순차적으로 연산된다. Sequence와 같은 방법으로 연산된다. 각 원소마다 연산을 수행하는데, 중간에 연산이 종료될 경우(조건을 만족하지 못하는 경우 등) 바로 다음 원소로 넘어간다. 아래의 예시를 보면 이해될 것이다.
값을 가져오는 과정은 별도의 코루틴을 실행하지 않고, 종료 연산을 호출한 코루틴에서 바로 시작된다.
(1..5).asFlow()
.filter {
println("Filter $it")
it % 2 == 0
}
.map {
println("Map $it")
"string $it"
}.collect {
println("Collect $it")
}
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
filter
의 조건에 맞지 않는 1
, 3
등은 map
으로 넘어가지 않음을 알 수 있다.
Flow 문맥
Flow의 값을 가져오는 과정은 항상 종료 연산을 실행한 코루틴의 문맥에서 진행된다. 예를 들어 다음의 코드는 context
문맥에서 실행된다.
withContext(context) {
simple().collect { value ->
println(value) // 문맥이 context로 지정됨
}
}
이것을 문맥 보존 이라고 한다. 기본적으로 flow {...}
내부의 코드는 종료 연산을 실행한 문맥에서 진행된다. 정말 그런지 살펴보자.
fun simple(): Flow<Int> = flow {
log("Started simple flow")
for (i in 1..3) {
emit(i)
}
}
fun main() = runBlocking<Unit> {
simple().collect { value -> log("Collected $value") }
}
[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3
Flow가 시작된 코루틴과 값을 collect
하는 코루틴이 같음을 알 수 있다. Caller를 block하지 않고, 코드가 실행되는 문맥이 중요하지 않은 경우 이렇게 사용하면 좋다. 사실 이게 기본값이긴 하다.
문맥을 잘못 사용한 경우
하지만 flow 내부에서 오래 걸리는 코드를 실행하는 경우에는 Dispatchers.Default
를 사용해야 한다. 마찬가지로 UI를 업데이트하는 코드는 Dispatchers.Main
을 사용해야 한다. withContext
를 사용하여 코루틴의 문맥을 바꿀 수 있지만, flow {...}
의 경우 자신이 실행된 문맥과 같은 문맥에만 값을 반환할 수 있기 때문에 문제가 된다.
어디 한번 withContext
로 문맥을 바꿔 보자. 미리 말하는데, 이 코드는 틀린 코드다.
fun simple(): Flow<Int> = flow {
// 잘못된 코드
kotlinx.coroutines.withContext(Dispatchers.Default) {
for (i in 1..3) {
Thread.sleep(100) // 뭔가 계산하는 척
emit(i) // 값 반환
}
}
}
fun main() = runBlocking<Unit> {
simple().collect { value -> println(value) }
}
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, Dispatchers.Default].
Please refer to 'flow' documentation or use 'flowOn' instead
at ...
방금 이야기한 내용이 에러 메시지에 그대로 나와 있다. 종료 연산을 실행한 문맥(BlockingCoroutine)과 값이 반환된 문맥(DispatchedCoroutine)이 서로 다르기 때문에 에러가 발생했다. 메시지를 더 자세히 읽어보면 flowOn
을 사용하라고 하고 있다.
flowOn
Flow가 값을 반환하는 문맥을 바꾸려면 다음과 같이 해야 한다. 실제로 문맥이 바뀌었는지 확인하기 위해 로그를 찍어 보았다.
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100) // 뭔가 계산하는 척
log("Emitting $i")
emit(i) // 값 반환
}
}.flowOn(Dispatchers.Default) // 올바른 방법
fun main() = runBlocking<Unit> {
simple().collect { value ->
log("Collected $value")
}
}
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3
Flow가 별도의 스레드에서 값을 반환했지만, main
스레드에서 값을 정상적으로 가져왔다. 이처럼 flowOn
은 CoroutineDispatcher
가 바뀌는 경우 별도의 스레드를 만들어 flow를 실행한다.
버퍼링
Flow를 모으는 시간을 단축하기 위해 위에서처럼 여러 개의 flow를 서로 다른 코루틴에서 실행할 수 있다. 특히 각 flow의 실행 시간이 오래 걸릴때 더욱 그렇다. 예를 들어 다음의 예제에서는 값을 계산하는 데 100ms, 계산된 값을 처리하는 데 300ms가 걸린다.
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // 뭔가 계산하는 척
emit(i) // 값 반환
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple().collect { value ->
delay(300) // 뭔가 처리하는 척
println(value)
}
}
println("Collected in $time ms")
}
값 하나를 계산하고 처리하는 데 400ms가 걸리므로, 총 실행시간은 대략 1200ms 정도가 나온다.
1
2
3
Collected in 1218 ms
그런데 사실 이렇게 할 필요가 없다. 값을 계산하기 위해 이전 값이 처리될 때까지 기다릴 필요는 없기 때문이다. 따라서 buffer
연산을 이용하여 계산된 값을 대기시킬 수 있다. 이렇게 하면 대기중인 값을 바로 사용할 수 있다. 서류 쌓이는 속도가 나를 기다려주지 않듯이.
val time = measureTimeMillis {
simple()
.buffer() // buffer: 계산되면 바로바로 반환
.collect { value ->
delay(300) // 뭔가 계산하는 척
println(value)
}
}
println("Collected in $time ms")
버퍼링을 적용하면 첫 번째 값을 처리하는 동안 다른 값을 모두 계산할 수 있다. 따라서 총 수행 시간은 약 1000ms가 된다.
1
2
3
Collected in 1049 ms
flowOn
으로 Dispatcher를 바꿔도 값을 버퍼링할 수 있다.
최신 값만 처리하기
항상 모든 값을 처리해야 하는 것은 아니다. 예를 들어 주식 가격을 가져오는 데 10ms가 걸리는데, 가져온 가격을 보여주는 데 100ms가 걸린다고 가정하자. 물론 이따구로 앱을 만들면 아무도 안 쓰겠지만 어쨌든 가정해 보자. 그러면 내가 가격 하나를 보여주는 동안 10개의 가격이 새로 대기하게 된다. 그런데 나는 10개의 가격을 모두 보여줄 필요가 없다. 가장 최근 가격만 보여주면 되지 않겠는가?
이런 경우에 conflate
연산을 사용할 수 있다. 값의 처리가 늦어지는 경우, 가장 최근에 계산된 값만 처리하도록 하는 것이다.
val time = measureTimeMillis {
simple()
.conflate() // conflate: 가장 최근 값만 처리
.collect { value ->
delay(300) // 뭔가 연산하는 척
println(value)
}
}
println("Collected in $time ms")
1
3
Collected in 748 ms
건너뛰기
또는 값을 처리하는 도중에 계산된 값이 들어오는 경우, 처리 과정을 취소하고 다음 값을 처리할 수도 있다. map
, filter
등의 이름에 Latest
를 붙이면 된다. 예를 들어 collectLatest
를 사용하면, 값을 처리하는 도중 새 값이 계산됐을 때 처리 과정을 중단하고 새 값을 처리하기 시작한다. mapLatest
등도 동일하다.
val time = measureTimeMillis {
simple()
.collectLatest { value -> // 취소 가능!
println("Collecting $value")
delay(300) // 뭔가 처리하는 척
println("Done $value")
}
}
println("Collected in $time ms")
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms
1, 2를 각각 처리하는 도중 새 값(2, 3)이 계산되었기 때문에 처리 과정이 취소되고 다시 시작되었다.
Flow 결합
Zip
Sequence.zip
처럼 flow에도 zip
연산이 존재한다. Flow 2개를 zip
하면 새로운 flow
가 만들어진다.
val nums = (1..3).asFlow()
val strs = flowOf("one", "two", "three")
nums.zip(strs) { a, b -> "$a -> $b" } // 문자열 flow 생성
.collect { println(it) } // collect, 출력
1 -> one
2 -> two
3 -> three
Combine
zip
을 사용하면 두 flow가 값을 반환할 때까지 기다린다. 따라서 zip
의 실행 속도는 더 느린 flow의 속도에 맞춰진다.
val nums = (1..3).asFlow().onEach { delay(300) } // 300ms마다 반환
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 400ms마다 반환
val startTime = System.currentTimeMillis()
nums.zip(strs) { a, b -> "$a -> $b" }
.collect { value ->
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
1 -> one at 426 ms from start
2 -> two at 826 ms from start
3 -> three at 1228 ms from start
반환 속도가 느린 strs
에 맞춰 zip
도 400ms마다 하나씩 값을 처리한다.
이번에는 flow의 최신 값만을 사용하는 경우를 생각해 보자. zip
과정에서 두 flow의 최신 값이 갱신될 때마다 새로운 값을 반환해야 하는 경우가 있을 수 있다. 이럴 때는 combine
연산을 사용해야 한다.
val nums = (1..3).asFlow().onEach { delay(300) } // 300ms마다 반환
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 400ms마다 반환
val startTime = System.currentTimeMillis()
nums.combine(strs) { a, b -> "$a -> $b" }
.collect { value ->
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
1 -> one at 452 ms from start
2 -> one at 651 ms from start
2 -> two at 854 ms from start
3 -> two at 952 ms from start
3 -> three at 1256 ms from start
nums
가 값을 반환할 때마다 새 문자열이 반환되고, 마찬가지로 strs
가 값을 반환할 때마다 새 문자열이 반환된다. combine
을 사용하면 두 flow의 최신 값을 계속 갱신받을 수 있다.
Flattening Flows
Flow는 비동기적인 sequence를 표현하기 때문에, sequence의 각 값이 또다른 sequence를 만들어낼 수 있다. 예를 들어 다음의 함수는 두 개의 문자열을 500ms 간격으로 반환한다.
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // 500ms 기다린 후에 반환
emit("$i: Second")
}
이제 3개의 정수 flow에 대해 requestFlow
를 실행해 보자.
val flow1 = (1..3).asFlow().map { requestFlow(it) }
requestFlow
가 Flow<String>
을 반환하므로 flow1
의 타입은 Flow<Flow<String>>
이 된다. 여기서 뭔가 더 연산을 수행하려면 flow1
을 flatten 해야 한다. flatten은 다차원 collection을 1차원 collection으로 변환하는 작업이다. 그러나 flow는 비동기적이라는 특성상 flatten하는 방법이 다양하게 존재한다.
flatMapConcat,
가장 직관적인 flatten 메소드이다. 외부 flow의 각 값에 대해 블럭 안의 코드를 실행하고, 블럭에서 반환된 flow가 반환하는 값을 순서대로 이어붙인다. 출력 결과를 보면 바로 이해할 수 있을 것이다.
val startTime = System.currentTimeMillis()
(1..3).asFlow().onEach { delay(100) } // 100ms마다 실행
.flatMapConcat { requestFlow(it) }
.collect { value ->
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start
flatMapMerge
이 메소드는 여러 개의 flow를 동시에 실행할 수 있으며, 각 flow가 반환하는 값을 반환되는 순서대로 이어붙인다. 즉 먼저 반환되면 먼저 이어붙여진다. 그런데 flow를 동시에 너무 많이 실행하면 flow가 CPU를 독점하게 될 수도 있기 때문에, concurrency
매개변수를 사용하여 동시에 실행될 수 있는 flow의 개수를 제한할 수 있다. 기본값은 DEFAULT_CONCURRENCY
이다.
val startTime = System.currentTimeMillis()
(1..3).asFlow().onEach { delay(100) } // 100ms마다 flow 실행
.flatMapMerge { requestFlow(it) }
.collect { value ->
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start
먼저 오는 값이 먼저 collect
됨을 알 수 있다.
flatMapLatest
collectLatest
과 비슷하게 최신 값만 flatten할 수도 있다. flatMapLatest
연산을 사용하면 새로 실행된 flow에서 값이 반환됐을 때 이전에 실행 중이었던 모든 flow가 종료된다.
val startTime = System.currentTimeMillis()
(1..3).asFlow().onEach { delay(100) } // 100ms마다 flow 실행
.flatMapLatest { requestFlow(it) }
.collect { value ->
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start
flatMapLatest
는 블럭 안의 모든 코드를 종료한다. 위의 예시에서는 { requestFlow(it) }
부분이다. 여기서는 requestFlow
가 suspending 함수가 아니라서 상관 없지만, 블럭 안에서 delay
등의 suspending 함수를 사용한다면 주의할 필요가 있다. 모든 코루틴은 종료될 때 Exception을 던지기 때문이다.
Flow Exceptions
Flow 내부에서 값을 반환할 때, 또는 반환된 값을 처리하는 과정에서 excpetion이 발생할 수 있다. Exception을 처리하는 방법은 다음과 같다.
Collector try-catch
collect
를 호출하는 부분에서 try-catch
블럭을 사용할 수 있다.
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
try {
simple().collect { value ->
println(value)
check(value <= 1) { "Collected $value" }
}
} catch (e: Throwable) {
println("Caught $e")
}
}
중간의 check(value <= 1)
부분에서 조건이 거짓일 때 IllegalStateException
을 던진다. 던져진 Exception은 catch
문에서 처리된다.
Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2
Collector는 전부 catch한다
위의 코드는 flow의 내부에서 발생한 예외와 처리 과정에서 발생한 예외를 모두 잡아낼 수 있다. 정말 그런지 Flow 내부에서 예외를 발생시켜 확인해 보자.
fun simple(): Flow<String> =
flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
.map { value ->
check(value <= 1) { "Crashed on $value" }
"string $value"
}
fun main() = runBlocking<Unit> {
try {
simple().collect { value -> println(value) }
} catch (e: Throwable) {
println("Caught $e")
}
}
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
예외가 어디서 발생하던 다 잡아낸다.
Transparency를 지키려면
그런데 예외를 emit
과정에서 처리할 수는 없을까?
Flow는 transparent to exception 원칙을 지켜야 한다. 그런데 지금처럼 예외를 emit
바깥에서 처리하는 것은 이 원칙에 위배된다. collect
를 호출한 부분에서 항상 예외를 처리해야 하기 때문이다. 그냥 flow에서 예외를 처리하면 안 될까?
catch
연산을 이용하여 예외 처리를 캡슐화할 수 있다. 예외의 종류에 따라 catch
안에서 적절한 처리 방법을 선택할 수 있다.
- 예외를 떠넘기거나 (위의 예시처럼)
- 적절한 값을
emit
하거나 - 무시하거나, 로그를 찍거나 등등 다른 작업을 수행할 수 있다.
예외가 발생했을 때 값을 emit
하는 방법은 다음과 같다.
simple()
.catch { e -> emit("Caught $e") } // emit on exception
.collect { value -> println(value) }
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
try-catch
를 사용하지 않고도 같은 효과를 얻을 수 있다.
Transparent catch
catch
연산은 자기 코드 위에서 발생한 예외만 잡아낸다. 다음 예시에서처럼 catch
밑의 collect
블럭에서 발생한 예외는 잡아낼 수 없다.
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
simple()
.catch { e -> println("Caught $e") } // 아래에서 발생한 예외는 못 잡는다.
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
Emitting 1
1
Emitting 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
at ...
catch가 다 잡으렴
catch
가 모든 예외를 잡도록 하려면, 값을 처리하는 코드를 catch
위로 올릴 필요가 있다. 그런데 collect
을 올릴 수는 없으니 값을 처리하는 코드를 catch
위의 onEach
로 옮겨야 한다.
simple()
.onEach { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
.catch { e -> println("Caught $e") }
.collect()
Emitting 1
1
Emitting 2
Caught java.lang.IllegalStateException: Collected 2
onEach
는 값이 반환되었을 때 collect
되기 전에 수행할 연산을 정의할 수 있다.
Flow가 종료될 때
Flow가 정상적으로 또는 예외에 의해 종료될 때 특정 작업을 수행해야 하는 경우가 있다. Imperative 또는 declarative 방법으로 마무리 작업을 수행할 수 있다.
Imperative finally block
try-finally
구문을 통해 collect
가 종료되었을 때 특정 작업을 수행할 수 있다.
fun simple(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
try {
simple().collect { value -> println(value) }
} finally {
println("Done")
}
}
1
2
3
Done
collect
가 종료된 후에 Done
이 추가로 출력된다.
Declarative handling
중간 연산 onCompletion
을 이용하여 collect
가 종료되었을 때 수행할 작업을 정할 수 있다. onCompletion
을 이용하여 위의 코드를 다시 작성하면 다음과 같다.
simple()
.onCompletion { println("Done") }
.collect { value -> println(value) }
onCompletion
이 좋은 이유는 collect
가 정상적으로 종료되었는지 아니면 예외에 의해 종료되었는지 알 수 있기 때문이다. 매개변수로 주어지는 Throwable
객체를 참조하면 된다. collect
가 정상적으로 종료될 수 있기 때문에 이 값은 nullable이다.
emit(1)
throw RuntimeException()
}
fun main() = runBlocking<Unit> {
simple()
.onCompletion { cause ->
if (cause != null) println("Flow completed exceptionally")
}
.catch { cause -> println("Caught exception") }
.collect { value -> println(value) }
}
1
Flow completed exceptionally
Caught exception
onCompletion
은 예외를 감지할 수 있지만, 처리하지는 않는다. 위에서 catch
에 예외가 전달된 것을 보면 알 수 있다.
제대로 종료됐는가?
onCompletion
이 좋은 점은, flow에서 발생한 모든 에러가 전달된다는 점이다. 따라서 onCompletion
에 전달된 Throwable
이 null
이라면 해당 flow가 정상적으로 종료되었음을 확신할 수 있다.
fun simple(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
simple()
.onCompletion { cause -> println("Flow completed with $cause") }
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
1
Flow completed with java.lang.IllegalStateException: Collected 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
반대로 onCompletion
에 전달된 Throwable
객체가 null
이 아니라면, 어디서 발생했는지는 몰라도 Exception
이 발생했다는 것을 확실하게 알 수 있다.
Imperative vs. declarative
위에서 flow를 collect
하고 에러를 핸들링하는 두 가지 방법-imperative와 declarative에 대해 알아봤다. 그럼 언제 어떤 방법을 사용해야 할까? 코틀린 공식 문서에 따르면 둘 중 아무거나 써도 상관 없으며, 자신이 편한 방법으로 코딩하면 된다고 한다.
Launching flow
Flow를 활용하면 비동기적으로 데이터를 가져오는 상황을 표현할 수 있다. 위의 예시에서 값이 들어올 때마다 동작할 코드를 collect
블럭에 작성하기도 했지만, onEach
블럭에 작성하기도 했다. 하지만 collect
와 다르게 onEach
는 중간 연산이기 때문에 값을 가져오지 못한다. 값을 실제로 가져오려면 collect
를 호출해야 한다.
// 뭔가 비동기적으로 반환하는 척
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.collect() // 모두 collect될 때까지 기다림
println("Done")
}
Event: 1
Event: 2
Event: 3
Done
그런데 flow 자체를 비동기적으로 실행할 수는 없을까? 위에서는 flow가 모두 collect
되기까지 기다리고 있는데, 비동기적으로 계산되는 값을 모두 기다리는 것은 엄청난 낭비이다. 이런 경우에는 onEach
대신 launchIn
을 사용하면 된다. launchIn
을 사용하면 별도의 코루틴에서 flow를 collect
한다.
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.launchIn(this) // 별도의 코루틴에서 collect 수행
println("Done")
}
Done
Event: 1
Event: 2
Event: 3
launchIn
을 사용하려면 반드시 CoroutineScope
을 매개변수로 명시해야 한다. 위의 코드에서는 runBlocking
의 스코프를 사용하였기 때문에, runBlocking
은 flow가 끝날 때까지 기다려야 한다.
안드로이드 앱에서는 수명주기가 있는 객체(Activity 등)의 스코프가 주로 사용될 것이다. 이런 상황에서는 onEach {...}.launchIn(scope)
가 마치 addEventListener
처럼 작동한다. 하지만 removeEventListener
에 대응되는 코드를 작성할 필요는 없다. 해당 스코프에서 알아서 처리하기 때문이다(structured concurrency!).
참고로 launchIn
은 Job
을 반환한다. 반환된 Job
객체를 가지고 flow의 흐름을 제어할 수 있다.
Flow 취소하기
flow
함수를 이용하여 flow를 만들면, 값을 emit
할 때(정확히는 suspending 함수가 실행될 때) 자신이 취소 대상인지 체크한다. 일종의 편의 기능이라고 보면 되겠다.
fun foo(): Flow<Int> = flow {
for (i in 1..5) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
foo().collect { value ->
if (value == 3) cancel()
println(value)
}
}
3까지는 정상적으로 emit
되지만, 4를 반환하려는 순간 자신이 취소 대상임을 인지하기 때문에 CancellationException
이 발생한다.
Emitting 1
1
Emitting 2
2
Emitting 3
3
Emitting 4
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@6d7b4f4c
하지만 대부분의 다른 flow 연산은 내부에서 suspending 함수를 호출하지 않으면 자신이 취소 대상인지 체크하지 않는다. 공식 문서를 보면 성능상의 이유 때문이라고 한다.
fun main() = runBlocking<Unit> {
(1..5).asFlow().collect { value ->
if (value == 3) cancel()
println(value)
}
}
1
2
3
4
5
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled
at kotlinx.coroutines.JobSupport.cancel (JobSupport.kt:1590)
at kotlinx.coroutines.CoroutineScopeKt.cancel (CoroutineScope.kt:217)
at kotlinx.coroutines.CoroutineScopeKt.cancel$default (CoroutineScope.kt:215)
그럼에도 불구하고 취소하고 싶다면
suspending 함수를 호출하지 않는 flow를 종료하고 싶다면, flow가 취소 대상인지를 직접 확인해야 한다. 예를 들어 .onEach { currentCoroutineContext().ensureActive() }
를 써도 되지만, 이런 상황을 위해 cancellable
연산이 이미 구현되어 있다.
fun main() = runBlocking<Unit> {
(1..5).asFlow().cancellable().collect { value ->
if (value == 3) cancel()
println(value)
}
}
1
2
3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@5ec0a365
cancellable
을 사용하면 값을 emit
할 때마다 자신이 취소 대상인지를 확인한다.
'Primary > Kotlin' 카테고리의 다른 글
[Kotlin] Coroutines - Exception Handling and Supervision (0) | 2021.01.25 |
---|---|
[Kotlin] Coroutines - Channels (0) | 2021.01.23 |
[Kotlin] Coroutines - Context and Dispatchers (0) | 2021.01.21 |
[Kotlin] Coroutines - Suspending 함수 활용하기 (0) | 2021.01.20 |
[Kotlin] Coroutines - Cancellation and Timeouts (0) | 2021.01.20 |