RxJava Concurrency and Flowable - Threading and Backpressure
Source: Dev.to
Schedulers
// Unbounded worker thread pool, 재사용
Schedulers.io()
// CPU 코어 수만큼 제한된 스레드
Schedulers.computation()
// 항상 새 스레드 생성
Schedulers.newThread()
// 단일 스레드
Schedulers.single()
// 호출 스레드 (기본값)
Schedulers.trampoline()
// Custom Executor
val executor = Executors.newFixedThreadPool(2)
val scheduler = Schedulers.from(executor)
기본 사용 예시
Observable.range(1, 10)
.subscribeOn(Schedulers.computation()) // emit 스레드
.observeOn(Schedulers.io()) // 이후 연산 스레드
.subscribe { println(it) }
subscribeOn: 전체 subscription(emit 포함)에 사용할 스레드observeOn:observeOn이후의 연산에 사용할 스레드
observable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { updateUI(it) }
Custom Scheduler 사용 예시
val executor = Executors.newFixedThreadPool(2)
val scheduler = Schedulers.from(executor)
Observable.range(1, 10)
.subscribeOn(scheduler)
.subscribe {
runBlocking { delay(200) }
println("Observable1 Item Received $it - ${Thread.currentThread().name}")
}
Observable.range(21, 10)
.subscribeOn(scheduler)
.subscribe {
runBlocking { delay(100) }
println("Observable2 Item Received $it - ${Thread.currentThread().name}")
}
Flowable and Backpressure
Observable은 메모리 사용량이 급증할 수 있어, 대량 아이템을 처리할 때는 Flowable을 사용해 Backpressure를 관리합니다.
- 사용 시점: 10,000개 이상의 아이템 & 비동기 처리
- 특징: 128개의 요소를 emit하고 consumer가 처리할 때까지 대기 → 메모리 안전
Backpressure 전략
| 전략 | 설명 |
|---|---|
BackpressureStrategy.BUFFER (기본) | 버퍼에 쌓고 기다림 |
BackpressureStrategy.ERROR | MissingBackpressureException 발생 |
BackpressureStrategy.DROP | 버퍼 초과분 버림 |
BackpressureStrategy.LATEST | DROP과 같지만 마지막 값은 보존 |
BackpressureStrategy.MISSING | 커스터마이징 가능 |
Flowable 예시
Flowable.range(1, 1000)
.map { MyItem(it) }
.observeOn(Schedulers.io())
.subscribe({
println("Received $it")
runBlocking { delay(50) }
}, { it.printStackTrace() })
Flowable.create with BUFFER
val flowable = Flowable.create({
for (i in 1..10) {
it.onNext(i)
}
it.onComplete()
}, BackpressureStrategy.BUFFER)
flowable.observeOn(Schedulers.io())
.subscribe(subscriber)
Converting Observable to Flowable
val source = Observable.range(1, 1000)
// 기본 BUFFER
source.toFlowable(BackpressureStrategy.BUFFER)
.subscribe { println(it) }
// MISSING + explicit buffer (동일 효과)
source.toFlowable(BackpressureStrategy.MISSING)
.onBackpressureBuffer()
.subscribe { println(it) }
// 용량 제한 (20개 초과 시 ERROR)
source.toFlowable(BackpressureStrategy.MISSING)
.onBackpressureBuffer(20)
.subscribe { println(it) }
// DROP with side‑effect
source.toFlowable(BackpressureStrategy.MISSING)
.onBackpressureDrop { println("Dropped $it") }
.subscribe { println("Received $it") }
// LATEST
source.toFlowable(BackpressureStrategy.MISSING)
.onBackpressureLatest()
.subscribe { println("Received $it") }
Flowable.generate 예시
object GenerateFlowableItem {
var item: Int = 0
get() {
field += 1
return field
}
}
val flowable = Flowable.generate {
it.onNext(GenerateFlowableItem.item)
}
flowable.map { MyItem(it) }
.observeOn(Schedulers.io())
.subscribe {
runBlocking { delay(100) }
println("Next $it")
}
Connectable Flowable
val connectableFlowable = listOf("String 1", "String 2", "String 3")
.toFlowable()
.publish()
connectableFlowable.subscribe({
println("Subscription 1: $it")
runBlocking { delay(1000) }
println("Subscription 1 delay")
})
connectableFlowable.subscribe { println("Subscription 2: $it") }
connectableFlowable.connect()
Error Handling
기본 onErrorReturn
Observable.just(1, 2, 3, 4, 5)
.map { it / (3 - it) }
.onErrorReturn { -1 } // 에러 시 -1 반환
.subscribe { println("Received $it") }
onErrorResumeNext 로 다른 Observable 전환
Observable.just(1, 2, 3, 4, 5)
.map { it / (3 - it) }
.onErrorResumeNext(Observable.range(10, 5))
.subscribe { println("Received $it") }
재시도 (retry)
Observable.just(1, 2, 3, 4, 5)
.map { it / (3 - it) }
.retry(3) // 3번 재시도
.subscribeBy(
onNext = { println("Received $it") },
onError = { println("Error") }
)
Predicate 사용 예시
var retryCount = 0
Observable.just(1, 2, 3, 4, 5)
.map { it / (3 - it) }
.retry { _, _ -> (++retryCount) }
// 데이터 가져오기
Observable.just(resource)
}, { resource: Resource ->
// 리소스 해제
resource.close()
}).subscribe {
println("Resource Data ${it.data}")
}
Note:
Observable을MainActivity에서create하면, Observable이 반환되기 전까지MainActivity가 반환되지 않아 메모리 릭이 발생할 수 있습니다.subscribe()호출 시 다른 스레드에서 emit 작업 중에는 반환되지 않는 문제가 있습니다.
Originally published at