이동식 저장소

[Kotlin] Coroutines - Asynchronous Flow 본문

Primary/Kotlin

[Kotlin] Coroutines - Asynchronous Flow

해스끼 2021. 1. 22. 23:35

suspending 함수를 이용하여 비동기적으로 값을 계산할 수 있다. 그런데 비동기적으로 계산되는 값이 여러 개라면 어떻게 할까? 예를 들어 1번부터 n번째까지의 피보나치 수를 1초에 하나씩 계산해서 반환하는 함수가 있다고 하자. 물론 모든 결과가 계산될 때까지 기다리는 방법도 있지만, 결과값이 반환될 때마다 하나씩 출력하는 것이 성능 측면에서 더 좋을 수 있다.

코틀린의 Flow를 이용하여 비동기적으로 계산된 값을 하나씩 받을 수 있다.

컬렉션 복습

collections을 이용하여 여러 개의 값을 표현할 수 있다. 다음의 코드에서 simple 함수는 세 개의 정수로 이루어진 List를 반환한다.

fun simple(): List<Int> = listOf(1, 2, 3)

fun main() {
    simple().forEach { value -> println(value) } 
}
1
2
3

Sequence 복습

계산 과정에서 CPU가 많이 필요하다면 Sequence를 사용할 수 있다. 여기서는 CPU를 사용하는 작업을 흉내내기 위해 100ms동안 sleep했다.

fun simple(): Sequence<Int> = sequence { // sequence builder
    for (i in 1..3) {
        Thread.sleep(100) // 계산하는 척
        yield(i) // 값을 하나씩 반환
    }
}

fun main() {
    simple().forEach { value -> println(value) } 
}
1
2
3

suspending

하지만 Thread.sleep은 메인 스레드를 block한다. 스레드를 block하지 않고 계산하기 위해 delay 함수를 사용해 보자. 함수 내부에서 suspending 함수를 호출하므로 simple 역시 suspending 함수가 되어야 한다.

suspend fun simple(): List<Int> {
    delay(1000) // 뭔가 계산하고 있는 척
    return listOf(1, 2, 3)
}

fun main() = runBlocking<Unit> {
    simple().forEach { value -> println(value) } 
}
1
2
3

1초 후에 숫자가 출력된다.

Flow

그런데 위의 코드를 보면 결과값을 List로 반환하고 있기 때문에, 모든 값을 계산하기 전까지는 결과값을 반환할 수 없다. 위에서 Sequence를 이용하여 동기적으로 계산되는 값을 하나씩 반환했다. 비동기적으로 계산되는 값은 Flow를 사용하여 반환할 수 있다.


fun simple(): Flow<Int> = flow { // flow builder
    for (i in 1..3) {
        delay(100) // 계산하는 척
        emit(i)    // 값 반환
    }
}

fun main() = runBlocking<Unit> {
    // main 스레드가 block되었는지 확인하는 용도
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    simple().collect { value -> println(value) } 
}

Flow를 통해 값이 하나씩 반환되고 있다. 한편 main 스레드는 block되지 않았기 때문에 다른 작업(코루틴 등)을 수행할 수 있다.

I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3

위 코드의 내용을 정리하면 다음과 같다.

  • 함수 flow를 이용하여 Flow를 만들 수 있다. List를 만들 때 listOf를 사용하듯이.
  • flow {...} 블럭 안의 코드는 suspend될 수 있다. 따라서 블럭 안에서 비동기 작업을 수행할 수 있다.
  • simple 함수 앞에 suspend 키워드가 붙지 않았다.
  • Flowemit 함수를 이용하여 값을 반환한다.
  • Flow의 값은 collect 함수를 이용하여 얻을 수 있다.

Flow는 게으르다

flow 블럭 안의 코드는 flow가 collect되기 전에는 실행되지 않는다.


fun simple(): Flow<Int> = flow { 
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling simple function...")
    val flow = simple()
    println("Calling collect...")
    flow.collect { value -> println(value) } 
    println("Calling collect again...")
    flow.collect { value -> println(value) } 
}
Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3

simple 함수가 suspend가 아닌 이유이기도 하다. simple()은 호출된 즉시 Flow 객체를 반환하지만, 내부의 코드를 실행하지는 않는다. collect 함수가 호출되기 전까지는 블럭 안의 코드가 실행되지 않는다.

Flow는 collect 함수가 호출될 때마다 새로 시작된다. Flow started라는 문자열이 두 번 출력된 것을 보면 알 수 있다. 또 main 스레드는 collect가 끝날 때까지 non-blocking 상태로 기다린다. 값을 비동기적으로 출력하고 싶다면 다음과 같이 별도의 코루틴을 만들어야 한다.


fun simple(): Flow<Int> = flow { 
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling simple function...")
    val flow = simple()
    launch {
        println("Calling collect...")
        flow.collect { value -> println(value) } 
    }
    launch {
        println("Calling collect again...")
        flow.collect { value -> println(value) } 
    }
}
Calling simple function...
Calling collect...
Flow started
Calling collect again...
Flow started
1
1
2
2
3
3

Flow 취소

Flow도 일반적인 코루틴 취소의 원칙을 따른다. Flow는 취소 가능한 suspending 함수로 인해 중단되었을 때 취소될 수 있다. 예를 들어 flow에 취소 명령이 내려졌다면, flow 내부에서 delay 등이 실행될 때 자신이 취소 대상임을 인지하고 취소된다.


fun simple(): Flow<Int> = flow { 
    for (i in 1..3) {
        delay(100)          
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) { // 250ms 후 종료: 3은 emit되지 않는다.
        simple().collect { value -> println(value) } 
    }
    println("Done")
}
Emitting 1
1
Emitting 2
2
Done

Flow의 취소에 대해 글 후반부에서 다시 살펴볼 것이다.

Flow를 만드는 다른 방법

Flow는 flow {...} 함수를 이용해서 만들 수도 있고, 다른 함수를 통해서도 만들 수 있다.

  • flowOf 함수를 이용하여 반환할 값을 정할 수 있다.
  • 대부분의 Collections과 sequence에서 .asFlow() 확장 함수를 사용하여 flow를 만들 수 있다.
// Range를 Flow로 변환
(1..3).asFlow().collect { value -> println(value) }
1
2
3

Flow 연산

Collections에 mapfilter 등의 연산을 적용하듯이 flow에도 연산을 적용할 수 있다. Flow에 연산을 적용하면 flow가 반환되며, 반환된 flow 역시 collect되기 전까지는 실행되지 않는다. 연산 블럭 안에서 suspending 함수를 호출할 수 있다는 점을 기억하자.


suspend fun performRequest(request: Int): String {
    delay(1000) // 뭔가 열심히 계산하는 척
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // flow
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}

위 코드를 실행하면 1초에 하나씩 문자열이 출력된다. map 블럭 안에서 suspend 함수를 실행하고 있다는 점에 주목하자.

response 1
response 2
response 3

Transform

가장 많이 사용되는 Flow 연산 중 하나로 transform이 있다. transform을 사용하면 map 등의 간단한 연산을 직접 할 수도 있고, 더 복잡한 연산을 수행할 수도 있다. transform 연산을 사용하면 임의의 값을 임의의 횟수만큼 emit할 수 있다.

(1..3).asFlow() // a flow of requests
    .transform { request ->
        emit("Making request $request") 
        emit(performRequest(request)) 
    }
    .collect { response -> println(response) }
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3

performRequest 함수를 실행하기 전에 문자열을 하나 반환하고, 함수의 결과값을 반환한다. 이처럼 transform을 이용하여 실행 한 번당 여러 개의 값을 반환할 수 있다.

처음 두 개만 받고 싶어요!

take 함수를 사용하여 가져올 값의 개수를 정할 수 있다. 주의할 점은 정해진 개수만큼 가져온 후 flow취소된다는 점이다. 코루틴이 취소될 때는 항상 exception이 발생하므로, 일반적인 리소스 해제 코드(try-finally등)를 그대로 사용할 수 있다.

fun numbers(): Flow<Int> = flow {
    try {                          
        emit(1)
        emit(2) 
        println("This line will not execute")
        emit(3)    
    } finally {
        println("Finally in numbers")
    }
}

fun main() = runBlocking<Unit> {
    numbers() 
        .take(2) // 처음 두 개의 값만 collect
        .collect { value -> println(value) }
}            
1
2
Finally in numbers

종료 연산

종료 연산이란 flow의 값을 가져오는 suspending 함수를 의미한다.

  • collect 함수도 종료 연산이다.
  • Collection으로의 변환(toList, toSet 등)도 종료 연산이다.
  • Flow의 첫 번째 값을 받는 first 연산과 Flow가 반환할 단 하나의 값을 받는 single 연산도 종료 연산이다. 두 연산의 차이점은 링크를 참고하길 바란다.
  • reducefold도 종료 연산이다. 까먹었다면 링크 참고.
val sum = (1..5).asFlow()
    .map { it * it }          // (Intermediate Operator, 중간 연산)
    .reduce { a, b -> a + b } // 합을 구함 (Terminal Operator, 종료 연산)
println(sum)
55

순차적으로 실행된다

Flow는 특별히 여러 개의 flow를 연산하지 않는 이상 순차적으로 연산된다. Sequence와 같은 방법으로 연산된다. 각 원소마다 연산을 수행하는데, 중간에 연산이 종료될 경우(조건을 만족하지 못하는 경우 등) 바로 다음 원소로 넘어간다. 아래의 예시를 보면 이해될 것이다.

값을 가져오는 과정은 별도의 코루틴을 실행하지 않고, 종료 연산을 호출한 코루틴에서 바로 시작된다.

(1..5).asFlow()
    .filter {
        println("Filter $it")
        it % 2 == 0              
    }              
    .map { 
        println("Map $it")
        "string $it"
    }.collect { 
        println("Collect $it")
    }    
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5

filter의 조건에 맞지 않는 1, 3 등은 map으로 넘어가지 않음을 알 수 있다.

Flow 문맥

Flow의 값을 가져오는 과정은 항상 종료 연산을 실행한 코루틴의 문맥에서 진행된다. 예를 들어 다음의 코드는 context 문맥에서 실행된다.

withContext(context) {
    simple().collect { value ->
        println(value) // 문맥이 context로 지정됨
    }
}

이것을 문맥 보존 이라고 한다. 기본적으로 flow {...} 내부의 코드는 종료 연산을 실행한 문맥에서 진행된다. 정말 그런지 살펴보자.

fun simple(): Flow<Int> = flow {
    log("Started simple flow")
    for (i in 1..3) {
        emit(i)
    }
}  

fun main() = runBlocking<Unit> {
    simple().collect { value -> log("Collected $value") } 
}            
[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3

Flow가 시작된 코루틴과 값을 collect하는 코루틴이 같음을 알 수 있다. Caller를 block하지 않고, 코드가 실행되는 문맥이 중요하지 않은 경우 이렇게 사용하면 좋다. 사실 이게 기본값이긴 하다.

문맥을 잘못 사용한 경우

하지만 flow 내부에서 오래 걸리는 코드를 실행하는 경우에는 Dispatchers.Default를 사용해야 한다. 마찬가지로 UI를 업데이트하는 코드는 Dispatchers.Main을 사용해야 한다. withContext를 사용하여 코루틴의 문맥을 바꿀 수 있지만, flow {...}의 경우 자신이 실행된 문맥과 같은 문맥에만 값을 반환할 수 있기 때문에 문제가 된다.

어디 한번 withContext로 문맥을 바꿔 보자. 미리 말하는데, 이 코드는 틀린 코드다.

fun simple(): Flow<Int> = flow {
    // 잘못된 코드
    kotlinx.coroutines.withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) // 뭔가 계산하는 척
            emit(i)           // 값 반환
        }
    }
}

fun main() = runBlocking<Unit> {
    simple().collect { value -> println(value) } 
}            
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
        Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
        but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, Dispatchers.Default].
        Please refer to 'flow' documentation or use 'flowOn' instead
    at ...

방금 이야기한 내용이 에러 메시지에 그대로 나와 있다. 종료 연산을 실행한 문맥(BlockingCoroutine)과 값이 반환된 문맥(DispatchedCoroutine)이 서로 다르기 때문에 에러가 발생했다. 메시지를 더 자세히 읽어보면 flowOn을 사용하라고 하고 있다.

flowOn

Flow가 값을 반환하는 문맥을 바꾸려면 다음과 같이 해야 한다. 실제로 문맥이 바뀌었는지 확인하기 위해 로그를 찍어 보았다.

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // 뭔가 계산하는 척
        log("Emitting $i")
        emit(i)           // 값 반환
    }
}.flowOn(Dispatchers.Default) // 올바른 방법

fun main() = runBlocking<Unit> {
    simple().collect { value ->
        log("Collected $value") 
    } 
}            
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3

Flow가 별도의 스레드에서 값을 반환했지만, main 스레드에서 값을 정상적으로 가져왔다. 이처럼 flowOnCoroutineDispatcher가 바뀌는 경우 별도의 스레드를 만들어 flow를 실행한다.

버퍼링

Flow를 모으는 시간을 단축하기 위해 위에서처럼 여러 개의 flow를 서로 다른 코루틴에서 실행할 수 있다. 특히 각 flow의 실행 시간이 오래 걸릴때 더욱 그렇다. 예를 들어 다음의 예제에서는 값을 계산하는 데 100ms, 계산된 값을 처리하는 데 300ms가 걸린다.

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // 뭔가 계산하는 척
        emit(i)    // 값 반환
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        simple().collect { value -> 
            delay(300) // 뭔가 처리하는 척
            println(value) 
        } 
    }   
    println("Collected in $time ms")
}

값 하나를 계산하고 처리하는 데 400ms가 걸리므로, 총 실행시간은 대략 1200ms 정도가 나온다.

1
2
3
Collected in 1218 ms

그런데 사실 이렇게 할 필요가 없다. 값을 계산하기 위해 이전 값이 처리될 때까지 기다릴 필요는 없기 때문이다. 따라서 buffer 연산을 이용하여 계산된 값을 대기시킬 수 있다. 이렇게 하면 대기중인 값을 바로 사용할 수 있다. 서류 쌓이는 속도가 나를 기다려주지 않듯이.

val time = measureTimeMillis {
    simple()
        .buffer()      // buffer: 계산되면 바로바로 반환
        .collect { value -> 
            delay(300) // 뭔가 계산하는 척
            println(value) 
        } 
}   
println("Collected in $time ms")

버퍼링을 적용하면 첫 번째 값을 처리하는 동안 다른 값을 모두 계산할 수 있다. 따라서 총 수행 시간은 약 1000ms가 된다.

1
2
3
Collected in 1049 ms

flowOn으로 Dispatcher를 바꿔도 값을 버퍼링할 수 있다.

최신 값만 처리하기

항상 모든 값을 처리해야 하는 것은 아니다. 예를 들어 주식 가격을 가져오는 데 10ms가 걸리는데, 가져온 가격을 보여주는 데 100ms가 걸린다고 가정하자. 물론 이따구로 앱을 만들면 아무도 안 쓰겠지만 어쨌든 가정해 보자. 그러면 내가 가격 하나를 보여주는 동안 10개의 가격이 새로 대기하게 된다. 그런데 나는 10개의 가격을 모두 보여줄 필요가 없다. 가장 최근 가격만 보여주면 되지 않겠는가?

이런 경우에 conflate 연산을 사용할 수 있다. 값의 처리가 늦어지는 경우, 가장 최근에 계산된 값만 처리하도록 하는 것이다.

val time = measureTimeMillis {
    simple()
        .conflate()         // conflate: 가장 최근 값만 처리
        .collect { value -> 
            delay(300)      // 뭔가 연산하는 척
            println(value) 
        } 
}   
println("Collected in $time ms")
1
3
Collected in 748 ms

건너뛰기

또는 값을 처리하는 도중에 계산된 값이 들어오는 경우, 처리 과정을 취소하고 다음 값을 처리할 수도 있다. map, filter 등의 이름에 Latest를 붙이면 된다. 예를 들어 collectLatest를 사용하면, 값을 처리하는 도중 새 값이 계산됐을 때 처리 과정을 중단하고 새 값을 처리하기 시작한다. mapLatest 등도 동일하다.

val time = measureTimeMillis {
    simple()
        .collectLatest { value -> // 취소 가능!
            println("Collecting $value") 
            delay(300)            // 뭔가 처리하는 척
            println("Done $value") 
        } 
}   
println("Collected in $time ms")
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms

1, 2를 각각 처리하는 도중 새 값(2, 3)이 계산되었기 때문에 처리 과정이 취소되고 다시 시작되었다.

Flow 결합

Zip

Sequence.zip처럼 flow에도 zip 연산이 존재한다. Flow 2개를 zip하면 새로운 flow가 만들어진다.

val nums = (1..3).asFlow() 
val strs = flowOf("one", "two", "three") 
nums.zip(strs) { a, b -> "$a -> $b" } // 문자열 flow 생성
    .collect { println(it) }          // collect, 출력
1 -> one
2 -> two
3 -> three

Combine

zip을 사용하면 두 flow가 값을 반환할 때까지 기다린다. 따라서 zip의 실행 속도는 더 느린 flow의 속도에 맞춰진다.

val nums = (1..3).asFlow().onEach { delay(300) }               // 300ms마다 반환
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 400ms마다 반환
val startTime = System.currentTimeMillis()
nums.zip(strs) { a, b -> "$a -> $b" } 
    .collect { value ->
        println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    } 
1 -> one at 426 ms from start
2 -> two at 826 ms from start
3 -> three at 1228 ms from start

반환 속도가 느린 strs에 맞춰 zip도 400ms마다 하나씩 값을 처리한다.

이번에는 flow의 최신 값만을 사용하는 경우를 생각해 보자. zip 과정에서 두 flow의 최신 값이 갱신될 때마다 새로운 값을 반환해야 하는 경우가 있을 수 있다. 이럴 때는 combine 연산을 사용해야 한다.

val nums = (1..3).asFlow().onEach { delay(300) }               // 300ms마다 반환
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 400ms마다 반환
val startTime = System.currentTimeMillis()
nums.combine(strs) { a, b -> "$a -> $b" } 
    .collect { value ->
        println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    } 
1 -> one at 452 ms from start
2 -> one at 651 ms from start
2 -> two at 854 ms from start
3 -> two at 952 ms from start
3 -> three at 1256 ms from start

nums가 값을 반환할 때마다 새 문자열이 반환되고, 마찬가지로 strs가 값을 반환할 때마다 새 문자열이 반환된다. combine을 사용하면 두 flow의 최신 값을 계속 갱신받을 수 있다.

Flattening Flows

Flow는 비동기적인 sequence를 표현하기 때문에, sequence의 각 값이 또다른 sequence를 만들어낼 수 있다. 예를 들어 다음의 함수는 두 개의 문자열을 500ms 간격으로 반환한다.

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500)           // 500ms 기다린 후에 반환
    emit("$i: Second")    
}

이제 3개의 정수 flow에 대해 requestFlow를 실행해 보자.

val flow1 = (1..3).asFlow().map { requestFlow(it) }

requestFlowFlow<String>을 반환하므로 flow1의 타입은 Flow<Flow<String>>이 된다. 여기서 뭔가 더 연산을 수행하려면 flow1flatten 해야 한다. flatten은 다차원 collection을 1차원 collection으로 변환하는 작업이다. 그러나 flow는 비동기적이라는 특성상 flatten하는 방법이 다양하게 존재한다.

flatMapConcat,

가장 직관적인 flatten 메소드이다. 외부 flow의 각 값에 대해 블럭 안의 코드를 실행하고, 블럭에서 반환된 flow가 반환하는 값을 순서대로 이어붙인다. 출력 결과를 보면 바로 이해할 수 있을 것이다.

val startTime = System.currentTimeMillis() 
(1..3).asFlow().onEach { delay(100) }      // 100ms마다 실행
    .flatMapConcat { requestFlow(it) } 
    .collect { value -> 
        println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    } 
1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start

flatMapMerge

이 메소드는 여러 개의 flow를 동시에 실행할 수 있으며, 각 flow가 반환하는 값을 반환되는 순서대로 이어붙인다. 즉 먼저 반환되면 먼저 이어붙여진다. 그런데 flow를 동시에 너무 많이 실행하면 flow가 CPU를 독점하게 될 수도 있기 때문에, concurrency 매개변수를 사용하여 동시에 실행될 수 있는 flow의 개수를 제한할 수 있다. 기본값은 DEFAULT_CONCURRENCY이다.

val startTime = System.currentTimeMillis() 
(1..3).asFlow().onEach { delay(100) }       // 100ms마다 flow 실행
    .flatMapMerge { requestFlow(it) }             
    .collect { value ->
        println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    } 
1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start

먼저 오는 값이 먼저 collect됨을 알 수 있다.

flatMapLatest

collectLatest과 비슷하게 최신 값만 flatten할 수도 있다. flatMapLatest 연산을 사용하면 새로 실행된 flow에서 값이 반환됐을 때 이전에 실행 중이었던 모든 flow가 종료된다.

val startTime = System.currentTimeMillis() 
(1..3).asFlow().onEach { delay(100) }       // 100ms마다 flow 실행
    .flatMapLatest { requestFlow(it) }             
    .collect { value ->
        println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    } 
1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start

flatMapLatest는 블럭 안의 모든 코드를 종료한다. 위의 예시에서는 { requestFlow(it) } 부분이다. 여기서는 requestFlowsuspending 함수가 아니라서 상관 없지만, 블럭 안에서 delay 등의 suspending 함수를 사용한다면 주의할 필요가 있다. 모든 코루틴은 종료될 때 Exception을 던지기 때문이다.

Flow Exceptions

Flow 내부에서 값을 반환할 때, 또는 반환된 값을 처리하는 과정에서 excpetion이 발생할 수 있다. Exception을 처리하는 방법은 다음과 같다.

Collector try-catch

collect를 호출하는 부분에서 try-catch 블럭을 사용할 수 있다.

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value ->         
            println(value)
            check(value <= 1) { "Collected $value" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    } 
}            

중간의 check(value <= 1) 부분에서 조건이 거짓일 때 IllegalStateException을 던진다. 던져진 Exception은 catch 문에서 처리된다.

Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2

Collector는 전부 catch한다

위의 코드는 flow의 내부에서 발생한 예외와 처리 과정에서 발생한 예외를 모두 잡아낼 수 있다. 정말 그런지 Flow 내부에서 예외를 발생시켜 확인해 보자.

fun simple(): Flow<String> = 
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }                 
        "string $value"
    }

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } catch (e: Throwable) {
        println("Caught $e")
    } 
}            
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2

예외가 어디서 발생하던 다 잡아낸다.

Transparency를 지키려면

그런데 예외를 emit 과정에서 처리할 수는 없을까?

Flow는 transparent to exception 원칙을 지켜야 한다. 그런데 지금처럼 예외를 emit 바깥에서 처리하는 것은 이 원칙에 위배된다. collect를 호출한 부분에서 항상 예외를 처리해야 하기 때문이다. 그냥 flow에서 예외를 처리하면 안 될까?

catch 연산을 이용하여 예외 처리를 캡슐화할 수 있다. 예외의 종류에 따라 catch 안에서 적절한 처리 방법을 선택할 수 있다.

  • 예외를 떠넘기거나 (위의 예시처럼)
  • 적절한 값을 emit하거나
  • 무시하거나, 로그를 찍거나 등등 다른 작업을 수행할 수 있다.

예외가 발생했을 때 값을 emit하는 방법은 다음과 같다.

simple()
    .catch { e -> emit("Caught $e") }     // emit on exception
    .collect { value -> println(value) }
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2

try-catch를 사용하지 않고도 같은 효과를 얻을 수 있다.

Transparent catch

catch 연산은 자기 코드 위에서 발생한 예외만 잡아낸다. 다음 예시에서처럼 catch 밑의 collect 블럭에서 발생한 예외는 잡아낼 수 없다.

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple()
        .catch { e -> println("Caught $e") } // 아래에서 발생한 예외는 못 잡는다.
        .collect { value ->
            check(value <= 1) { "Collected $value" }                 
            println(value) 
        }
}            
Emitting 1
1
Emitting 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
    at ...

catch가 다 잡으렴

catch가 모든 예외를 잡도록 하려면, 값을 처리하는 코드를 catch 위로 올릴 필요가 있다. 그런데 collect을 올릴 수는 없으니 값을 처리하는 코드를 catch 위의 onEach로 옮겨야 한다.

simple()
    .onEach { value ->
        check(value <= 1) { "Collected $value" }                 
        println(value) 
    }
    .catch { e -> println("Caught $e") }
    .collect()
Emitting 1
1
Emitting 2
Caught java.lang.IllegalStateException: Collected 2

onEach는 값이 반환되었을 때 collect되기 전에 수행할 연산을 정의할 수 있다.

Flow가 종료될 때

Flow가 정상적으로 또는 예외에 의해 종료될 때 특정 작업을 수행해야 하는 경우가 있다. Imperative 또는 declarative 방법으로 마무리 작업을 수행할 수 있다.

Imperative finally block

try-finally 구문을 통해 collect가 종료되었을 때 특정 작업을 수행할 수 있다.

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } finally {
        println("Done")
    }
}            
1
2
3
Done

collect가 종료된 후에 Done이 추가로 출력된다.

Declarative handling

중간 연산 onCompletion을 이용하여 collect가 종료되었을 때 수행할 작업을 정할 수 있다. onCompletion을 이용하여 위의 코드를 다시 작성하면 다음과 같다.

simple()
    .onCompletion { println("Done") }
    .collect { value -> println(value) }

onCompletion이 좋은 이유는 collect가 정상적으로 종료되었는지 아니면 예외에 의해 종료되었는지 알 수 있기 때문이다. 매개변수로 주어지는 Throwable 객체를 참조하면 된다. collect가 정상적으로 종료될 수 있기 때문에 이 값은 nullable이다.

    emit(1)
    throw RuntimeException()
}

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> 
            if (cause != null) println("Flow completed exceptionally")
        }
        .catch { cause -> println("Caught exception") }
        .collect { value -> println(value) }
}            
1
Flow completed exceptionally
Caught exception

onCompletion은 예외를 감지할 수 있지만, 처리하지는 않는다. 위에서 catch에 예외가 전달된 것을 보면 알 수 있다.

제대로 종료됐는가?

onCompletion이 좋은 점은, flow에서 발생한 모든 에러가 전달된다는 점이다. 따라서 onCompletion에 전달된 Throwablenull이라면 해당 flow가 정상적으로 종료되었음을 확신할 수 있다.

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> println("Flow completed with $cause") }
        .collect { value ->
            check(value <= 1) { "Collected $value" }                 
            println(value) 
        }
}
1
Flow completed with java.lang.IllegalStateException: Collected 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2

반대로 onCompletion에 전달된 Throwable 객체가 null이 아니라면, 어디서 발생했는지는 몰라도 Exception이 발생했다는 것을 확실하게 알 수 있다.

Imperative vs. declarative

위에서 flow를 collect하고 에러를 핸들링하는 두 가지 방법-imperative와 declarative에 대해 알아봤다. 그럼 언제 어떤 방법을 사용해야 할까? 코틀린 공식 문서에 따르면 둘 중 아무거나 써도 상관 없으며, 자신이 편한 방법으로 코딩하면 된다고 한다.

Launching flow

Flow를 활용하면 비동기적으로 데이터를 가져오는 상황을 표현할 수 있다. 위의 예시에서 값이 들어올 때마다 동작할 코드를 collect 블럭에 작성하기도 했지만, onEach 블럭에 작성하기도 했다. 하지만 collect와 다르게 onEach는 중간 연산이기 때문에 값을 가져오지 못한다. 값을 실제로 가져오려면 collect를 호출해야 한다.

// 뭔가 비동기적으로 반환하는 척
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .collect() // 모두 collect될 때까지 기다림
    println("Done")
}            
Event: 1
Event: 2
Event: 3
Done

그런데 flow 자체를 비동기적으로 실행할 수는 없을까? 위에서는 flow가 모두 collect되기까지 기다리고 있는데, 비동기적으로 계산되는 값을 모두 기다리는 것은 엄청난 낭비이다. 이런 경우에는 onEach 대신 launchIn을 사용하면 된다. launchIn을 사용하면 별도의 코루틴에서 flow를 collect한다.

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this) // 별도의 코루틴에서 collect 수행
    println("Done")
}            
Done
Event: 1
Event: 2
Event: 3

launchIn을 사용하려면 반드시 CoroutineScope을 매개변수로 명시해야 한다. 위의 코드에서는 runBlocking의 스코프를 사용하였기 때문에, runBlocking은 flow가 끝날 때까지 기다려야 한다.

안드로이드 앱에서는 수명주기가 있는 객체(Activity 등)의 스코프가 주로 사용될 것이다. 이런 상황에서는 onEach {...}.launchIn(scope)가 마치 addEventListener처럼 작동한다. 하지만 removeEventListener에 대응되는 코드를 작성할 필요는 없다. 해당 스코프에서 알아서 처리하기 때문이다(structured concurrency!).

참고로 launchInJob을 반환한다. 반환된 Job 객체를 가지고 flow의 흐름을 제어할 수 있다.

Flow 취소하기

flow 함수를 이용하여 flow를 만들면, 값을 emit할 때(정확히는 suspending 함수가 실행될 때) 자신이 취소 대상인지 체크한다. 일종의 편의 기능이라고 보면 되겠다.

fun foo(): Flow<Int> = flow { 
    for (i in 1..5) {
        println("Emitting $i") 
        emit(i) 
    }
}

fun main() = runBlocking<Unit> {
    foo().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}

3까지는 정상적으로 emit되지만, 4를 반환하려는 순간 자신이 취소 대상임을 인지하기 때문에 CancellationException이 발생한다.

Emitting 1
1
Emitting 2
2
Emitting 3
3
Emitting 4
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@6d7b4f4c

하지만 대부분의 다른 flow 연산은 내부에서 suspending 함수를 호출하지 않으면 자신이 취소 대상인지 체크하지 않는다. 공식 문서를 보면 성능상의 이유 때문이라고 한다.

fun main() = runBlocking<Unit> {
    (1..5).asFlow().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}
1
2
3
4
5
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled
 at kotlinx.coroutines.JobSupport.cancel (JobSupport.kt:1590) 
 at kotlinx.coroutines.CoroutineScopeKt.cancel (CoroutineScope.kt:217) 
 at kotlinx.coroutines.CoroutineScopeKt.cancel$default (CoroutineScope.kt:215) 

그럼에도 불구하고 취소하고 싶다면

suspending 함수를 호출하지 않는 flow를 종료하고 싶다면, flow가 취소 대상인지를 직접 확인해야 한다. 예를 들어 .onEach { currentCoroutineContext().ensureActive() }를 써도 되지만, 이런 상황을 위해 cancellable 연산이 이미 구현되어 있다.

fun main() = runBlocking<Unit> {
    (1..5).asFlow().cancellable().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}
1
2
3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@5ec0a365

cancellable을 사용하면 값을 emit할 때마다 자신이 취소 대상인지를 확인한다.

Comments