이동식 저장소

Observable 본문

Secondary/RxJava

Observable

해스끼 2023. 1. 26. 23:33

Disclaimer: 이 글에서는 원문의 느낌을 살리기 위해 일부 용어를 원문 그대로 사용한다. 

 

Rx에서는 observer가 Observable을 subscribe한다. Observer는 Observable이 emit하는 값을 처리한다. Observer 패턴은 특히 비동기 작업에서 유용하다. 데이터가 언제 주어질 지 모르고, 몇 개나 주어질 지도 모르는 상황에서 스레드를 무작정 block해놓고 기다릴 수는 없다. 따라서 작업을 호출할 때 주어진 데이터에 대해 수행할 작업을 지정해놓기만 하고(subscribe), 실제 작업은 데이터가 주어진 시점에 수행하면 된다. Rx에서는 이 패턴을 reactive pattern이라고 부른다.

 

이미지 한 장으로 보면 다음과 같다.

 

장황하게 적긴 했지만, Kotlin의 ``Flow``와 거의 같다고 이해하면 된다. 용어와 코드가 조금 다를 뿐이다.

onNext, onError, onCompleted

위에서 주어진 데이터에 대해 수행할 작업을 지정한다고 했다. 사실 이 작업은 크게 세 가지로 나눌 수 있는데, 1) 데이터가 정상적으로 주어졌을 때 수행할 작업, 2) observe 과정에서 에러가 발생했을 때 수행할 작업, 3) 데이터가 더 이상 주어지지 않을 때(즉 데이터가 모두 주어졌을 때) 수행할 작업으로 나눌 수 있다.

 

Rx에서는 1), 2), 3)에 대응하는 작업을 각각 ``onNext``, ``onError``, ``onCompleted``라고 부른다. Observable을 subscribe할 때 위의 세 가지 작업을 지정해야 한다.

Unsubscribe?

어떤 Rx 구현체에는 observer의 특수한 경우인 ``Subscriber``라는 인터페이스가 있다. ``Subscriber``는 자신이 observe하던 Observable을 ``unsubscribe``할 수 있다. 유튜브 채널을 구독 취소하는 작업과 같다. 공식 문서에 따르면 아무도 구독하지 않는 observable은 데이터를 그만 emit하도록 설정할 수도 있다. 상황에 따라 그럴 수도 있다는 점만 기억해 두자.

Hot vs. Cold

Observable이 데이터를 emit하기 시작하는 시점에 따라 hot Observable과 cold Observable로 구분할 수 있다.

 

Hot Observable은 만들어진 즉시 데이터를 emit할 수 있다. 구독자가 몇 명이건 상관없이 데이터를 emit하는 것이다. 따라서 hot Observable을 구독하는 observer는 (빨리 구독하지 않으면) 데이터의 일부만을 얻을 수도 있다.

 

반대로 cold Observable은 누군가가 자신을 구독한 후에 emit을 시작한다. 따라서 cold Observable을 구독하는 observer는 Observable이 emit하는 모든 값을 구독할 수 있다.

 

Kotlin의 hot/cold stream과는 약간 다르다. 헷갈리지 말자.

Observable에 적용할 수 있는 연산자

사실 observer 패턴 자체는 매우 쉽다. 중요한 것은 꺾이지 않는 마음...이 아니라 emit되는 값 중 원하는 값만을 골라 구독하고, 적절하게 처리하는 연산 기술이다.

 

연산의 종류에 따라 나누면 다음과 같다. 지금은 개별 연산자보다는 연산의 종류만 기억하고 넘어가자.

Observable을 만드는 연산

``Create``, ``Defer``, ``Empty``/``Never``/``Throw``, ``From``, ``Interval``, ``Just``, ``Range``, ``Repeat``, ``Start``, ``Timer``

Observable을 변환하는 연산

``Buffer``, ``FlatMap``, ``GroupBy``, ``Map``, ``Scan``, ``Window``

필터링 연산

``Debounce``, ``Distinct``, ``ElementAt``, ``Filter``, ``First``, ``IgnoreElements``, ``Last``, ``Sample``, ``Skip``, ``SkipLast``, ``Take``, ``TakeLast``

결합 연산

``And``/``Then``/``When``, ``CombineLatest``, ``Join``, ``Merge``, ``StartWith``, ``Switch``, ``Zip``

에러 처리 연산

``Catch``, ``Retry``

유틸리티 연산

``Delay``, ``Do``, ``Materialize``/``Dematerialize``, ``ObserveOn``, ``Serialize``, ``Subscribe``, ``SubscribeOn``, ``TimeInterval``, ``Timeout``, ``Timestamp``, ``Using``

조건 연산

``All``, ``Amb`` ``Contains``, ``DefaultIfEmpty``, ``SequenceEqual``, ``SkipUntil``, ``SkipWhile``, ``TakeUntil``, ``TakeWhile``

수학, 통계 연산

``Average``, ``Concat``, ``Count``, ``Max``, ``Min``, ``Reduce``, ``Sum``

Converting 연산

``To`` (emit되는 값을 다른 자료형으로 변환)

속도 제한 연산

Observer가 처리하는 속도보다 Observable이 값을 만드는 속도가 더 빠를 때, 값을 만드는 속도를 제어하는 연산이다. 문서에서도 대충 그런 게 있다 정도로만 언급하고 있다.

체이닝

연산 뒤에 연산을 덧붙이는 식으로 연산 묶음을 만들 수 있다. 연산이 정의된 순서대로 적용되기 때문에 순서가 매우 중요하다는 사실만 기억하자.

참고문헌

 

ReactiveX - Observable

Observable In ReactiveX an observer subscribes to an Observable. Then that observer reacts to whatever item or sequence of items the Observable emits. This pattern facilitates concurrent operations because it does not need to block while waiting for the Ob

reactivex.io

 

'Secondary > RxJava' 카테고리의 다른 글

Single  (0) 2023.01.28
ReactiveX 공부 시작  (0) 2023.01.26
Comments