Observable 특집으로 Observable의 생성과 종류까지 다뤄보고자 한다. 그 전에 Reactive Programming에 대해 정리한 글이 있는데 옵저버 패턴부터 다시 짚어보고자 한다.
0. Observable
관찰할 수 있는 데이터 스트림
관찰할 수 있는 데이터 스트림을 Rx에서는 Observable 이라고 부른다. 이 데이터 스트림을 관찰하는 Observer들은 Observable이 발행하는 데이터 스트림을 구독하여 데이터를 구독하고 처리한다. 이는 옵저버 패턴을 따른 것으로 옵저버 패턴은 관찰할 수 있는 대상의 상태가 변하면 이를 관찰하는 대상에게 알려주는 구조이다.
쉬운 예시로, 식당 예약을 관리하는 애플리케이션이 있다고 하자.
예약하고 싶었던 식당이 이미 인원이 다 차버려 예약할 수 없는 상황이라면 사용자들은 예약 대기를 등록해 놓고, 예약이 취소되면 애플리케이션은 등록을 한 대기자들에게 알람을 동시에 보내준다. 여기서 식당(Subject)의 예약 마감 상태에서 예약 가능으로 상태 변경이 되면 대기 예약에 등록한 사용자들(Observers)은 동시에 알람을 받을 수 있다. Rx에서는 이 Subject가 Observable이라고 불린다. Observable은 Events나 Data를 방출(Emit)하고 이렇게 방출된 데이터들을 Observer이 받는 형태가 Rx에서 이 디자인패턴을 사용하는 방법이다.
1. Observer들은 어떻게 데이터를 받나?
그렇다면 Observer는 어떻게 Observable이 발행하는 데이터를 이벤트 형태로 받아보게 되는가?
이는 Emitter라는 인터페이스가 가지고 있는 세 가지의 이벤트를 활용하여 Observer에게 알릴 수 있다.
interface Observable<T> {
fun subscribe(observer: Observer<T>)
}
Observable interface는 subscribe() 메소드를 가지고 있고 이는 이벤트를 수신하는 주체를 정하는 데에 쓰인다. 이 메소드는 Observer가 Observable에 subscribe 할 수 있도록 invoke하는 역할을 한다. 그리고 Observable에 subscribe을 한 후에는 Observable는 Observer에게 세 개 타입의 이벤트들을 전달한다.
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {
return subscribe(onNext, onError, onComplete, Functions.emptyConsumer());
}
- onNext()
- Observable은 데이터를 방출 할 때마다 onNext 함수를 호출한다. onNext 함수는 인자로 Observable로부터 방출된 데이터를 갖게 된다. - onError()
- Observable이 데이터를 방출하는 것을 실패했거나 다른 에러가 발생했을 경우 호출하며 에러 객체와 함께 에러를 통지한다. 반응형 프로그래밍에서의 에러는 데이터 플로우에서 나오는 한 데이터처럼 다뤄진다. - onComplete()
- onNext() 함수 호출을 마지막으로 한 후 더 이상의 방출할 데이터가 없거나 이벤트가 정상적으로 끝났을 때 onComplete 함수를 호출한다.
- 모든 emissions이 완료되었음을 알 수 있다.
이처럼 서로 다른 세 개의 채널이 있다고 생각하고, Observable의 pipeline이 끝나면 결과값에 따라 최종 목적지인 한 개의 채널로 데이터가 전달될 것이다. 항상 onComplete() 혹은 onError() 둘 중 하나로만 데이터 발행이 종료되어야 한다.
그리고 Observable에서 데이터, 오류 등을 발행할 때 null 을 발행하면 안 된다. null 값을 지정하면 NullPointerException이 발생하고 이를 따로 처리해줘야하는데 Observable은 null 값을 방출하지 않도록 설계되어 있기 때문에 이를 위반하면서까지 NPE를 발생시키면서 에러를 처리해주기 보다는 Rx의 설계 철학을 따르는 것이 좋다.
'💤 RxJava' 카테고리의 다른 글
[RxJava] Single, Maybe and Completable (0) | 2023.09.12 |
---|---|
[RxJava] Observable 생성하기 (0) | 2023.09.12 |
[RxJava] subscribe inside subscribe (0) | 2023.09.06 |
[RxJava] Scheduler로 Multi Thread 관리하기 (0) | 2023.07.04 |
[RxJava] Rxjava + Retrofit = Error Handling (0) | 2023.03.31 |