Scheduler로 Multi Thread 관리하기
Rx를 이용하면 여러 스레드를 사용해 어떤 작업을 수행하는 도중에도 다른 작업을 실행하는 비동기작업을 할 수 있다. Rx에서는 비동기 처리를 수행하는데 필요한 API를 제공하므로 기존에 구축한 비즈니스 로직에 영향을 주지 않고도 데이터를 통지하는 생산자 측의 처리와 데이터를 받는 측의 처리를 분리하며 서로 다른 스레드에서 실행할 수 있다. 즉 생산자가 무엇을 하더라도 소비자가 받은 데이터의 범주에서만 작업을 하게 된다면 비동기로 쉽게 전환하며 교체할 수 있다. 또한 용도별로 적절히 스레드를 관리하는 클래스를 제공해 직접 스레드를 관리해야 하는 번거로움도 없다는 편리함이 있다. Rx에서 제공해주는 operator들을 적절히 사용하여 스레드 관리를 해보자.
1) Scheduler
RxJava에서는 개발자가 직접 스레드를 관리하지 않게 각 처리 목적에 맞춰 스레드를 관리하는 스케쥴러(Scheduler)를 제공한다.
이 스케줄러를 설정하면 어떤 스레드에서 무엇을 처리할지 제어할 수 있다. 이 Scheduler라는 개념은 멀티스레드 환경에서 여러 스레드간의 Observable들의 흐름을 관장하는 역할을 한다. 스케쥴러는 데이터 생성의 기본이 되는 생산자(Observable)에 지정할 수도 있고 데이터의 수신 처리를 하는 소비자(Observer)에서도 지정할 수 있다. 전자는 데이터 통지를 어떤 스케줄러에서 처리할 지를 제어할 수 있고 후자는 데이터의 수신 처리를 어느 스케줄러에서 할 지를 제어하고 전환하며 비동기처리를 할 수 있는 것이다.
안드로이드에서 많이 쓰는 스케줄러로는,
Schedulers.io() : 동기 I/O를 별도로 처리시켜 비동기 효율을 얻기 위한 스케쥴러. 자체적인 스레드 풀인 CachedThreadPool을 사용한다. API 호출 등 네트워크를 사용한 호출 시 사용된다.
- Schedulers.computation() : 이벤트 룹에서 간단한 연산이나 콜백 처리를 위해 사용된다.RxComputationThreadPool라는 별도의 스레드 풀에서 돌아간다. 최대 cpu갯수 개의 스레드 풀이 순환하면서 실행된다.
- Schedulers.immediate() : 현재 스레드에서 즉시 수행한다. observeOn()이 여러번 쓰였을 경우 immediate()를 선언한 바로 윗쪽의 스레드를 따라간다.
- Schedulers.from(executor) : 특정 executor를 스케쥴러.
- Schedulers.newThread() : 새로운 스레드를 만드는 스케쥴러.
- Schedulers.trampoline() : 큐에 있는 일이 끝나면 이어서 현재 스레드에서 수행하는 스케쥴러.
- AndroidSchedulers.mainThread(): 안드로이드의 UI 스레드에서 동작.
- HandlerScheduler.from(handler) : 특정 핸들러 handler에 의존하여 동작.
2) observeOn, subscribeOn
스케쥴러는 subsctibeOn, observeOn 함수에서 각각 지정할 수 있다.
멀티스레드를 사용하여 여러가지 작업을 Observable 연산자로 묶어 수행하는 경우가 있다. 가령 백그라운드 스레드에서는 네트워크 작업, 많은 연산이 필요한 작업을 해야하고 화면에 보여주기 위해서는 메인 스레드에서 작업을 해야한다. 이 작업들은 Observable 연산자로 묶어 만들 수 있으므로 각각의 작업에 맞게 스레드 지정을 해야한다. 이 때 각각의 작업에 맞게 스레드 지정을 도와주는 메소드가 observeOn,subscribeOn이다. subscribeOn와 observeOn를 모두 지정하면 Observable에서 데이터 흐름이 발생하는 스레드와 처리된 결과를 구독자에게 발행하는 스레드를 분리할 수 있다. RxJava 스케줄러의 핵심은 결국 제공되는 스케줄러의 종류를 선택한 후 subscribeOn과 observeOn 함수를 호출하는 것이다.
subscribeOn
Observable이 어느 스레드에서 작업할 지 지정. Observable에서 구독자가 subscribe 함수를 호출했을 때 데이터 흐름을 발생하는 스레드를 지정.
- Many implementations of ReactiveX use “Schedulers” to govern an Observable’s transitions between threads in a multi-threaded environment. You can instruct an Observable to do its work on a particular Scheduler by calling the Observable’s SubscribeOn operator.
observeOn
처리된 결과를 구독자에게 전달하는 스레드를 지정
- The ObserveOn operator is similar, but more limited. It instructs the Observable to send notifications to observers on a specified Scheduler.
그래서 무엇을 먼저써야하고, 같이 쓸 때는 어떤경우이며 사용하지 않으면 어느 스레드에서 작동할까?
subscribeOn은 Observable이 어떤 스레드에서 오퍼레이팅을 시작할지 정해주고 이후 추가 적인 오퍼레이션이나 스레드를 변경하고 싶을 시 observeOn을 사용하면 된다. observeOn로 작업마다 원하는 스레드로 변경할 수 있다.
subscribeOn은 여러번 호출되더라도 맨 처음 지정한 스레드를 고정시키므로 맨 처음 호출만 영향을 주며 어디에 위치하든 상관없다. 하지만 observeOn은 특정 작업의 스케쥴러를 변경할 수 있어 여러번 호출될 수 있으며 이후에 실행되는 연산에 영향을 주므로 위치가 중요하다.
subscribeOn만 호출하면 처음 지정한 스레드만 지정한 것이므로 Observable의 모든 흐름이 동일한 스레드에서 실행된다. 스케줄러를 별도로 지정하지 않으면 현재(main)스레드에서 동작을 실행한다.
위 마블다이어그램의 예시를 보자.
subscribeOn은 호출 시점에 상관없이 Observable이 어떤 스레드에서 오퍼레이팅을 시작할지 정해준다고 하였다. subscribeOn은 map 오퍼레이터 이후 호출되고 있지만, Observable이 오퍼레이팅을 시작하는 곳은 subscribeOn이 지정한 파란색 삼각형에서 시작하고 있다. 맨 처음 호출하는 observeOn에서는 주황색 삼각형으로 스레드를 작동하길 변경하였으므로 그 다음의 연산은 주황색 삼각형에서 실행되고 있다. 두 번째 observeOn에서는 분홍색 삼각형으로 스레드를 지정해주었고 그 이후의 Observable은 분홍색 삼각형에서 실행되고 있다. 이러한 특징으로 Observable 오퍼레이터들이 체이닝 되는 시점에 여러번 observeOn을 사용할 수 있고 그 이후의 스레드 흐름에 영향을 주는 것을 볼 수 있다.
코드로 예시를 보면,
Observable.just("print") //UI
.map (str -> str.size()) //UI
.observeOn(Schedulers.computation()) // 스레드변경
.map (size -> size * 2) //Schedulers.computation()에서 실행
.subscribe(num -> Log.d("num", num)) // Schedulers.computation()에서 실행
subscribe을 호출하면 먼저 Observable.just() 작업이 실행되고, 해당 작업 스레드는 subscribeOn을 지정해주지 않았으므로 main 스레드가 될 것이다. observeOn(Schedulers.computation()) 을 통해 작업 스레드는 Computation으로 스레드가 변경될 것이고 이 후 처리되는 size * 2 연산은 RxComputationThreadPool 스레드에서 일어날 것이다.
만약 subscribeOn을 두 번 사용한다면 어떻게 될까?
Observable.just("print")
.map(str -> str.size())
.subscribeOn(Schedulers.computation()) // Schedulers.computation()로 스레드 변경
.subscribeOn(Schedulers.io()) // Schedulers.io()로 변경되지 않음
.subscribe(number -> Log.d("", "Number " + number));
맨 처음 지정한 subscribeOn만 실질적으로 실행될 것이다. subscribeOn은 여러번 호출되더라도 맨 처음 지정한 스레드만 고정시킨다.
val observable: Observable<Int> = Observable.create { subscriber ->
// 해당 작업은 subscribeOn(AndroidSchedulers.mainThread()) 로 인해 main 쓰레드로 동작함
for (i in arrayOf(1, 2, 3, 4, 5)) {
Log.d(
"runRx mainThread",
Thread.currentThread().name + " : onNext " + i
)
subscriber.onNext(i)
if (i == 5) {
subscriber.onComplete()
}
}
}
observable
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.io()) // 아래 작업은 observeOn(Schedulers.io()) 로 인해 RxCachedThreadScheduler 쓰레드로 동작함
.doOnNext { i ->
Log.d(
"runRx io",
Thread.currentThread().name + " : onNext " + i
)
}
.observeOn(Schedulers.computation()) // 아래 작업은 observeOn(Schedulers.computation()) 로 인해 RxComputationThreadPool 쓰레드로 동작함
.doOnNext { i ->
Log.d(
"runRx computation",
Thread.currentThread().name + " : onNext " + i
)
}
.observeOn(Schedulers.newThread()) // 아래 작업은 observeOn(Schedulers.newThread()) 로 인해 RxNewThreadScheduler 쓰레드로 동작함
.subscribe(object : Observer<Int?> {
override fun onSubscribe(d: Disposable) {
Log.d("runRx onSubscribe", Thread.currentThread().name)
}
override fun onNext(t: Int) {
Log.d("runRx onNext", Thread.currentThread().name)
}
override fun onError(e: Throwable) {
Log.d("runRx onError", Thread.currentThread().name)
}
override fun onComplete() {
Log.d("runRx onComplete", Thread.currentThread().name)
}
})
D/runRx onSubscribe: main
D/runRx mainThread: main : onNext 1
D/runRx mainThread: main : onNext 2
D/runRx mainThread: main : onNext 3
D/runRx mainThread: main : onNext 4
D/runRx mainThread: main : onNext 5
D/runRx io: RxCachedThreadScheduler-1 : onNext 1
D/runRx io: RxCachedThreadScheduler-1 : onNext 2
D/runRx io: RxCachedThreadScheduler-1 : onNext 3
D/runRx io: RxCachedThreadScheduler-1 : onNext 4
D/runRx io: RxCachedThreadScheduler-1 : onNext 5
D/runRx computation: RxComputationThreadPool-1 : onNext 1
D/runRx computation: RxComputationThreadPool-1 : onNext 2
D/runRx computation: RxComputationThreadPool-1 : onNext 3
D/runRx computation: RxComputationThreadPool-1 : onNext 4
D/runRx computation: RxComputationThreadPool-1 : onNext 5
D/runRx onNext: RxNewThreadScheduler-1
D/runRx onNext: RxNewThreadScheduler-1
D/runRx onComplete: RxNewThreadScheduler-1
실제 일반적으로 사용하는 방법은 별도의 스레드에서 작업 한 후 결과를 메인 스레드로 보여준다.
val observable: Observable<Int> = Observable.create { subscriber ->
for (i in arrayOf(1, 2, 3, 4, 5)) {
Log.d(
"runRx mainThread",
Thread.currentThread().name + " : onNext " + i
)
subscriber.onNext(i)
}
}
observable
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object: Observer<Int?> {
override fun onSubscribe(d: Disposable) {
Log.d("runRx onSubscribe", Thread.currentThread().name)
}
override fun onNext(t: Int) {
Log.d("runRx onNext", Thread.currentThread().name)
}
override fun onError(e: Throwable) {
Log.d("runRx onError", Thread.currentThread().name)
}
override fun onComplete() {
Log.d("runRx onComplete", Thread.currentThread().name)
}
})
D/runRx onSubscribe: main
D/runRx mainThread: RxComputationThreadPool-1 : onNext 1
D/runRx mainThread: RxComputationThreadPool-1 : onNext 2
D/runRx mainThread: RxComputationThreadPool-1 : onNext 3
D/runRx mainThread: RxComputationThreadPool-1 : onNext 4
D/runRx mainThread: RxComputationThreadPool-1 : onNext 5
D/runRx onNext: main
D/runRx onNext: main
참고자료
'💤 RxJava' 카테고리의 다른 글
[RxJava] Single, Maybe and Completable (0) | 2023.09.12 |
---|---|
[RxJava] Observable 생성하기 (0) | 2023.09.12 |
[RxJava] Observable (0) | 2023.09.12 |
[RxJava] subscribe inside subscribe (0) | 2023.09.06 |
[RxJava] Rxjava + Retrofit = Error Handling (0) | 2023.03.31 |