RxJava Concurrency and Flowable - Threading and Backpressure

Published: (December 30, 2025 at 10:24 PM EST)
4 min read
Source: Dev.to

Source: Dev.to

Schedulers

// Unbounded worker thread pool, 재사용
Schedulers.io()

// CPU 코어 수만큼 제한된 스레드
Schedulers.computation()

// 항상 새 스레드 생성
Schedulers.newThread()

// 단일 스레드
Schedulers.single()

// 호출 스레드 (기본값)
Schedulers.trampoline()

// Custom Executor
val executor = Executors.newFixedThreadPool(2)
val scheduler = Schedulers.from(executor)

기본 사용 예시

Observable.range(1, 10)
    .subscribeOn(Schedulers.computation())  // emit 스레드
    .observeOn(Schedulers.io())             // 이후 연산 스레드
    .subscribe { println(it) }
  • subscribeOn: 전체 subscription(emit 포함)에 사용할 스레드
  • observeOn: observeOn 이후의 연산에 사용할 스레드
observable
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { updateUI(it) }

Custom Scheduler 사용 예시

val executor = Executors.newFixedThreadPool(2)
val scheduler = Schedulers.from(executor)

Observable.range(1, 10)
    .subscribeOn(scheduler)
    .subscribe {
        runBlocking { delay(200) }
        println("Observable1 Item Received $it - ${Thread.currentThread().name}")
    }

Observable.range(21, 10)
    .subscribeOn(scheduler)
    .subscribe {
        runBlocking { delay(100) }
        println("Observable2 Item Received $it - ${Thread.currentThread().name}")
    }

Flowable and Backpressure

Observable은 메모리 사용량이 급증할 수 있어, 대량 아이템을 처리할 때는 Flowable을 사용해 Backpressure를 관리합니다.

  • 사용 시점: 10,000개 이상의 아이템 & 비동기 처리
  • 특징: 128개의 요소를 emit하고 consumer가 처리할 때까지 대기 → 메모리 안전

Backpressure 전략

전략설명
BackpressureStrategy.BUFFER (기본)버퍼에 쌓고 기다림
BackpressureStrategy.ERRORMissingBackpressureException 발생
BackpressureStrategy.DROP버퍼 초과분 버림
BackpressureStrategy.LATESTDROP과 같지만 마지막 값은 보존
BackpressureStrategy.MISSING커스터마이징 가능

Flowable 예시

Flowable.range(1, 1000)
    .map { MyItem(it) }
    .observeOn(Schedulers.io())
    .subscribe({
        println("Received $it")
        runBlocking { delay(50) }
    }, { it.printStackTrace() })

Flowable.create with BUFFER

val flowable = Flowable.create({
    for (i in 1..10) {
        it.onNext(i)
    }
    it.onComplete()
}, BackpressureStrategy.BUFFER)

flowable.observeOn(Schedulers.io())
    .subscribe(subscriber)

Converting Observable to Flowable

val source = Observable.range(1, 1000)

// 기본 BUFFER
source.toFlowable(BackpressureStrategy.BUFFER)
    .subscribe { println(it) }

// MISSING + explicit buffer (동일 효과)
source.toFlowable(BackpressureStrategy.MISSING)
    .onBackpressureBuffer()
    .subscribe { println(it) }

// 용량 제한 (20개 초과 시 ERROR)
source.toFlowable(BackpressureStrategy.MISSING)
    .onBackpressureBuffer(20)
    .subscribe { println(it) }

// DROP with side‑effect
source.toFlowable(BackpressureStrategy.MISSING)
    .onBackpressureDrop { println("Dropped $it") }
    .subscribe { println("Received $it") }

// LATEST
source.toFlowable(BackpressureStrategy.MISSING)
    .onBackpressureLatest()
    .subscribe { println("Received $it") }

Flowable.generate 예시

object GenerateFlowableItem {
    var item: Int = 0
        get() {
            field += 1
            return field
        }
}

val flowable = Flowable.generate {
    it.onNext(GenerateFlowableItem.item)
}

flowable.map { MyItem(it) }
    .observeOn(Schedulers.io())
    .subscribe {
        runBlocking { delay(100) }
        println("Next $it")
    }

Connectable Flowable

val connectableFlowable = listOf("String 1", "String 2", "String 3")
    .toFlowable()
    .publish()

connectableFlowable.subscribe({
    println("Subscription 1: $it")
    runBlocking { delay(1000) }
    println("Subscription 1 delay")
})

connectableFlowable.subscribe { println("Subscription 2: $it") }

connectableFlowable.connect()

Error Handling

기본 onErrorReturn

Observable.just(1, 2, 3, 4, 5)
    .map { it / (3 - it) }
    .onErrorReturn { -1 }  // 에러 시 -1 반환
    .subscribe { println("Received $it") }

onErrorResumeNext 로 다른 Observable 전환

Observable.just(1, 2, 3, 4, 5)
    .map { it / (3 - it) }
    .onErrorResumeNext(Observable.range(10, 5))
    .subscribe { println("Received $it") }

재시도 (retry)

Observable.just(1, 2, 3, 4, 5)
    .map { it / (3 - it) }
    .retry(3)  // 3번 재시도
    .subscribeBy(
        onNext = { println("Received $it") },
        onError = { println("Error") }
    )

Predicate 사용 예시

var retryCount = 0
Observable.just(1, 2, 3, 4, 5)
    .map { it / (3 - it) }
    .retry { _, _ -> (++retryCount) }
// 데이터 가져오기
Observable.just(resource)
}, { resource: Resource ->
    // 리소스 해제
    resource.close()
}).subscribe {
    println("Resource Data ${it.data}")
}

Note: ObservableMainActivity에서 create 하면, Observable이 반환되기 전까지 MainActivity가 반환되지 않아 메모리 릭이 발생할 수 있습니다. subscribe() 호출 시 다른 스레드에서 emit 작업 중에는 반환되지 않는 문제가 있습니다.

Originally published at

Back to Blog

Related posts

Read more »

RxJava Fundamentals - Reactive Programming on Android

RxJava의 기본 개념과 안드로이드에서의 활용법을 알아봅니다. RxJava란? RxJava는 Reactive + Functional Programming을 결합한 라이브러리입니다. 핵심 개념 - 데이터와 처리를 분리하고, 데이터는 처리에 푸시만 합니다. - Threading을 라이브러...

ProGuard Android Guide - Code Shrinking and Obfuscation

Android ProGuard 설정과 문제 해결 방법 Shrinking - 사용되지 않는 메서드, 클래스, 필드 등을 제거합니다. Optimizing - 메서드 바이트코드 최적화 및 인라인inline 처리를 수행합니다. Obfuscating - 이름을 의미 없는 짧은 문자열로 변경하여...