RxJava Android Integration - Retrofit, RxBinding, and Testing

Published: (December 30, 2025 at 10:25 PM EST)
5 min read
Source: Dev.to

Source: Dev.to

1️⃣ Gradle 의존성

implementation 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
implementation 'io.reactivex.rxjava2:rxkotlin:2.1.0'
implementation 'com.jakewharton.rxbinding2:rxbinding-kotlin:2.0.0'   // UI 이벤트 바인딩
implementation "com.netflix.rxjava:rxjava-apache-http:0.20.7"   // Apache HTTP Client

2️⃣ Retrofit + RxJava 설정

APIClient.kt

class APIClient {
    private var retrofit: Retrofit? = null

    enum class LogLevel {
        LOG_NOT_NEEDED,
        LOG_REQ_RES,
        LOG_REQ_RES_BODY_HEADERS,
        LOG_REQ_RES_HEADERS_ONLY
    }

    fun getClient(logLevel: LogLevel): Retrofit {
        val interceptor = HttpLoggingInterceptor()
        when (logLevel) {
            LogLevel.LOG_NOT_NEEDED -> interceptor.level = HttpLoggingInterceptor.Level.NONE
            LogLevel.LOG_REQ_RES -> interceptor.level = HttpLoggingInterceptor.Level.BASIC
            LogLevel.LOG_REQ_RES_BODY_HEADERS -> interceptor.level = HttpLoggingInterceptor.Level.BODY
            LogLevel.LOG_REQ_RES_HEADERS_ONLY -> interceptor.level = HttpLoggingInterceptor.Level.HEADERS
        }

        val client = OkHttpClient.Builder()
            .connectTimeout(3, TimeUnit.MINUTES)
            .writeTimeout(3, TimeUnit.MINUTES)
            .readTimeout(3, TimeUnit.MINUTES)
            .addInterceptor(interceptor)
            .build()

        if (retrofit == null) {
            retrofit = Retrofit.Builder()
                .baseUrl(Constants.BASE_URL)
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())   // RxJava Adapter
                .client(client)
                .build()
        }

        return retrofit!!
    }

    fun getAPIService(logLevel: LogLevel = LogLevel.LOG_REQ_RES_BODY_HEADERS) =
        getClient(logLevel).create(APIService::class.java)
}

APIService.kt

interface APIService {
    @POST(Constants.GET_TODO_LIST)
    fun getToDoList(): Observable

    @POST(Constants.EDIT_TODO)
    fun editTodo(@Body todo: String): Observable

    @POST(Constants.ADD_TODO)
    fun addTodo(@Body todo: String): Observable
}

3️⃣ API 호출 예시

private fun fetchTodoList() {
    APIClient()
        .getAPIService()
        .getToDoList()
        .subscribeOn(Schedulers.computation())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeBy(
            onNext = { response ->
                adapter.setDataset(response.data)
            },
            onError = { e ->
                e.printStackTrace()
            }
        )
}

4️⃣ UI 이벤트를 Observable 로 변환

클릭 이벤트

itemView.clicks()
    .subscribeBy {
        onClickTodoSubject.onNext(Pair(itemView, todoItem))
    }

텍스트 변경 이벤트

textview.textChanges()
    .subscribeBy { changedText ->
        Log.d("Text Changed", changedText.toString())
    }

FAB 클릭을 Subject 에 연결

fab.clicks()
    .map { Pair(fab, "a") }
    .subscribe(onClickTodoSubject)

5️⃣ Apache HTTP Client 사용 예시

val httpClient = HttpAsyncClients.createDefault()
httpClient.start()

ObservableHttp.createGet("http://example.com/api", httpClient)
    .toObservable()
    .flatMap { response ->
        response.content.map { bytes -> String(bytes) }
    }
    .onErrorReturn { "Error Parsing data" }
    .subscribe {
        println(it)
        httpClient.close()
    }

6️⃣ 커스텀 연산자 – 시리얼 번호 부여

연산자 구현

class AddSerialNumber : ObservableOperator<T, T> {
    private val counter = AtomicInteger()

    override fun apply(observer: Observer<T>): Observer<T> {
        return object : Observer<T> {
            override fun onComplete() = observer.onComplete()
            override fun onSubscribe(d: Disposable) = observer.onSubscribe(d)
            override fun onError(e: Throwable) = observer.onError(e)
            override fun onNext(t: T) {
                observer.onNext(Pair(counter.incrementAndGet(), t))
            }
        }
    }
}

사용 예시

Observable.range(10, 20)
    .lift(AddSerialNumber())
    .subscribeBy(
        onNext = { println("Next $it") },
        onError = { it.printStackTrace() },
        onComplete = { println("Completed") }
    )

확장 함수 형태

fun <T> Observable<T>.addSerialNumber(): Observable<Pair<Int, T>> =
    lift(AddSerialNumber())

// 사용
Observable.range(10, 20)
    .addSerialNumber()
    .subscribe { println("Next $it") }

7️⃣ 스케줄러 관리 – ObservableTransformer

구현

class SchedulerManager(
    private val subscribeScheduler: Scheduler,
    private val observeScheduler: Scheduler
) : ObservableTransformer<Any, Any> {
    override fun apply(upstream: Observable<Any>): ObservableSource<Any> =
        upstream.subscribeOn(subscribeScheduler)
                .observeOn(observeScheduler)
}

사용 예시

Observable.range(1, 10)
    .map { println("map - ${Thread.currentThread().name} $it"); it }
    .compose(SchedulerManager(Schedulers.computation(), Schedulers.io()))
    .subscribe { println("onNext - ${Thread.currentThread().name} $it") }

확장 함수 형태

fun <T> Observable<T>.scheduler(
    subscribeScheduler: Scheduler,
    observeScheduler: Scheduler
): Observable<T> = compose(SchedulerManager(subscribeScheduler, observeScheduler))

// 사용
Observable.range(1, 10)
    .scheduler(
        subscribeScheduler = Schedulers.computation(),
        observeScheduler = Schedulers.io()
    )
    .subscribe { println(it) }

8️⃣ 블로킹 연산 (blockingSubscribe, blockingFirst, blockingLast, blockingGet)

blockingSubscribe

val emissionsCount = AtomicInteger()
Observable.range(1, 10)
    .subscribeOn(Schedulers.computation())
    .blockingSubscribe { _ -> emissionsCount.incrementAndGet() }

assertEquals(10, emissionsCount.get())

blockingFirst & blockingLast

val observable = listOf(2, 10, 5, 6, 9, 8, 7, 1, 4, 3)
    .toObservable()
    .sorted()

val firstItem = observable.blockingFirst()
assertEquals(1, firstItem)

val lastItem = observable.blockingLast()
assertEquals(10, lastItem)

blockingGet (Monad)

// Single
val firstElement: Single<Int> = observable.first(0)
val firstItem = firstElement.blockingGet()

// Maybe
val maybeElement: Maybe<Int> = observable.firstElement()
val item = maybeElement.blockingGet()

📚 정리

  • Retrofit + RxJava2 로 네트워크 레이어를 구성하고, Observable 로 API 응답을 받아 UI 스레드에서 처리합니다.
  • RxBinding 을 활용해 클릭·텍스트 변경 등 UI 이벤트를 Observable 로 변환합니다.
  • Apache HTTP Clientrxjava-apache-http 를 사용하면 기존 HttpAsyncClient 를 Rx 형태로 감쌀 수 있습니다.
  • 커스텀 연산자와 확장 함수 로 재사용성을 높이고, SchedulerManager 로 스레드 전환 로직을 깔끔하게 캡슐화합니다.
  • 필요 시 blocking 연산을 이용해 동기식 테스트 혹은 초기값 획득에 활용합니다.

이제 위 예시들을 프로젝트에 적용해 보세요! 🚀

Reactive Streams Examples (Kotlin & Java)

1. blockingIterable

val list = listOf(2, 10, 5, 6, 9, 8, 7, 1, 4, 3)
val observable = list.toObservable().sorted()
val iterable = observable.blockingIterable()
assertEquals(list.sorted(), iterable.toList())

2. blockingForEach (prevent OOM)

// Even‑number filter
val list = listOf(
    2, 10, 5, 6, 9, 8, 7, 1, 4, 3,
    12, 20, 15, 16, 19, 18, 17, 11, 14, 13
)
val observable = list.toObservable().filter { it % 2 == 0 }
observable.blockingForEach { item ->
    assertTrue { item % 2 == 0 }
}

3. Using TestObserver

val list = listOf(
    2, 10, 5, 6, 9, 8, 7, 1, 4, 3,
    12, 20, 15, 16, 19, 18, 17, 11, 14, 13
)
val observable = list.toObservable().sorted()
val testObserver = TestObserver<Int>()

observable.subscribe(testObserver)

testObserver.assertSubscribed()
testObserver.awaitTerminalEvent()   // block until completion
testObserver.assertNoErrors()
testObserver.assertComplete()
testObserver.assertValueCount(20)
testObserver.assertValues(
    1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
    11, 12, 13, 14, 15, 16, 17, 18, 19, 20
)

4. Time‑based testing with TestScheduler

val testScheduler = TestScheduler()
val observable = Observable.interval(5, TimeUnit.MINUTES, testScheduler)
val testObserver = TestObserver<Long>()

observable.subscribe(testObserver)
testObserver.assertSubscribed()
testObserver.assertValueCount(0)

testScheduler.advanceTimeBy(100, TimeUnit.MINUTES)
testObserver.assertValueCount(20)

testScheduler.advanceTimeBy(400, TimeUnit.MINUTES)
testObserver.assertValueCount(100)

Note: Requires Java 8+ and Android SDK 26+.

compile 'io.projectreactor:reactor-core:3.1.1.RELEASE'

5. Reactor Flux & Mono examples (Kotlin)

val flux = Flux.just("Item 1", "Item 2", "Item 3")
flux.subscribe(object : Consumer<String> {
    override fun accept(item: String) {
        println("Got Next $item")
    }
})

val consumer = object : Consumer<String> {
    override fun accept(item: String) {
        println("Got $item")
    }
}

val emptyMono = Mono.empty<String>()
emptyMono.log().subscribe(consumer)

val monoWithData = Mono.justOrEmpty("A String")
monoWithData.log().subscribe(consumer)

val monoByExtension = "Another String".toMono()
monoByExtension.log().subscribe(consumer)

*Originally published at *

Back to Blog

Related posts

Read more »

RxJava Fundamentals - Reactive Programming on Android

RxJava의 기본 개념과 안드로이드에서의 활용법을 알아봅니다. RxJava란? RxJava는 Reactive + Functional Programming을 결합한 라이브러리입니다. 핵심 개념 - 데이터와 처리를 분리하고, 데이터는 처리에 푸시만 합니다. - Threading을 라이브러...

ProGuard Android Guide - Code Shrinking and Obfuscation

Android ProGuard 설정과 문제 해결 방법 Shrinking - 사용되지 않는 메서드, 클래스, 필드 등을 제거합니다. Optimizing - 메서드 바이트코드 최적화 및 인라인inline 처리를 수행합니다. Obfuscating - 이름을 의미 없는 짧은 문자열로 변경하여...