RxJava Android Integration - Retrofit, RxBinding, and Testing
Source: Dev.to
1️⃣ Gradle 의존성
// Retrofit call adapter for RxJava 2 (RxJava2CallAdapterFactory)
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'
// Android schedulers for RxJava 2 (AndroidSchedulers)
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
// Kotlin extensions for RxJava 2 (subscribeBy, toObservable)
implementation 'io.reactivex.rxjava2:rxkotlin:2.1.0'
// UI event bindings
implementation 'com.jakewharton.rxbinding2:rxbinding-kotlin:2.0.0'
// Apache HTTP client
implementation 'com.netflix.rxjava:rxjava-apache-http:0.20.7'
2️⃣ Retrofit + RxJava 설정
APIClient.kt
/**
 * Builds and caches the [Retrofit] instance used to talk to the backend.
 *
 * The instance is created on the first [getClient] call and reused by every later call on the
 * same [APIClient]; the log level passed to later calls therefore has no effect.
 */
class APIClient {
    // Stays null until getClient() is first asked for a client.
    private var retrofit: Retrofit? = null

    /** Selects which [HttpLoggingInterceptor.Level] the HTTP logging interceptor uses. */
    enum class LogLevel {
        LOG_NOT_NEEDED,
        LOG_REQ_RES,
        LOG_REQ_RES_BODY_HEADERS,
        LOG_REQ_RES_HEADERS_ONLY
    }

    /**
     * Returns the cached [Retrofit] instance, building it first if none exists yet.
     *
     * @param logLevel HTTP logging verbosity; only used when the instance is actually built.
     */
    fun getClient(logLevel: LogLevel): Retrofit =
        retrofit ?: buildRetrofit(logLevel).also { retrofit = it }

    /** Creates the [APIService] implementation backed by [getClient]. */
    fun getAPIService(logLevel: LogLevel = LogLevel.LOG_REQ_RES_BODY_HEADERS): APIService =
        getClient(logLevel).create(APIService::class.java)

    /** Assembles the logging interceptor, the OkHttp client and the Retrofit instance. */
    private fun buildRetrofit(logLevel: LogLevel): Retrofit {
        val interceptor = HttpLoggingInterceptor()
        // Exhaustive over LogLevel on purpose: no else branch, so a new level must be mapped here.
        interceptor.level = when (logLevel) {
            LogLevel.LOG_NOT_NEEDED -> HttpLoggingInterceptor.Level.NONE
            LogLevel.LOG_REQ_RES -> HttpLoggingInterceptor.Level.BASIC
            LogLevel.LOG_REQ_RES_BODY_HEADERS -> HttpLoggingInterceptor.Level.BODY
            LogLevel.LOG_REQ_RES_HEADERS_ONLY -> HttpLoggingInterceptor.Level.HEADERS
        }

        val client = OkHttpClient.Builder()
            .connectTimeout(3, TimeUnit.MINUTES)
            .writeTimeout(3, TimeUnit.MINUTES)
            .readTimeout(3, TimeUnit.MINUTES)
            .addInterceptor(interceptor)
            .build()

        return Retrofit.Builder()
            .baseUrl(Constants.BASE_URL)
            .addConverterFactory(GsonConverterFactory.create())
            .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // RxJava adapter
            .client(client)
            .build()
    }
}
APIService.kt
/**
 * Retrofit declaration of the to-do endpoints; each call is exposed as an Rx `Observable`.
 *
 * NOTE(review): the `Observable` return types below carry no type argument, which Kotlin
 * requires. The response model is not visible here (fetchTodoList reads `.data` from the
 * emitted value) — restore the intended type argument before compiling.
 */
interface APIService {
// NOTE(review): the list is fetched with POST — confirm this matches the backend contract.
@POST(Constants.GET_TODO_LIST)
fun getToDoList(): Observable
// Sends `todo` as the request body to the edit endpoint.
@POST(Constants.EDIT_TODO)
fun editTodo(@Body todo: String): Observable
// Sends `todo` as the request body to the add endpoint.
@POST(Constants.ADD_TODO)
fun addTodo(@Body todo: String): Observable
}
3️⃣ API 호출 예시
/**
 * Requests the to-do list and passes the response's `data` to the list adapter on the main
 * thread. Errors are only printed.
 *
 * NOTE(review): the Disposable returned by `subscribeBy` is dropped, so the request cannot be
 * cancelled by the caller — consider keeping it and disposing it with the owning screen.
 */
private fun fetchTodoList() {
    APIClient()
        .getAPIService()
        .getToDoList()
        // A network call spends its time blocked on I/O, so run it on the I/O scheduler;
        // the computation scheduler is meant for CPU-bound work.
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeBy(
            onNext = { response -> adapter.setDataset(response.data) },
            onError = { e -> e.printStackTrace() }
        )
}
4️⃣ UI 이벤트를 Observable 로 변환
클릭 이벤트
// Forward every click on the row, paired with its to-do item, to the click subject.
val rowClicks = itemView.clicks()
rowClicks.subscribeBy(
    onNext = { onClickTodoSubject.onNext(Pair(itemView, todoItem)) }
)
텍스트 변경 이벤트
// Log the new text every time the view's content changes.
val textUpdates = textview.textChanges()
textUpdates.subscribeBy(
    onNext = { changedText -> Log.d("Text Changed", changedText.toString()) }
)
FAB 클릭을 Subject 에 연결
// Map each FAB click to a (fab, "a") pair and let the subject observe the stream directly.
val fabClickPairs = fab.clicks().map { Pair(fab, "a") }
fabClickPairs.subscribe(onClickTodoSubject)
5️⃣ Apache HTTP Client 사용 예시
// Wraps an Apache async HTTP GET in an Observable and prints the body chunks as text.
val httpClient = HttpAsyncClients.createDefault()
httpClient.start()
ObservableHttp.createGet("http://example.com/api", httpClient)
    .toObservable()
    .flatMap { response ->
        // NOTE(review): String(bytes) decodes with the platform default charset — confirm
        // this matches the server's encoding.
        response.content.map { bytes -> String(bytes) }
    }
    .onErrorReturn { "Error Parsing data" }
    .subscribe(
        { println(it) },
        { error ->
            error.printStackTrace()
            httpClient.close()
        },
        // Close on termination rather than inside onNext: closing per item would shut the
        // client after the first chunk and never close it for an empty body.
        { httpClient.close() }
    )
6️⃣ 커스텀 연산자 – 시리얼 번호 부여
연산자 구현
/**
 * Custom operator for `lift` that pairs every upstream item with a serial number starting at 1.
 *
 * @param T type of the upstream items; downstream observers receive `Pair<Int, T>`.
 */
class AddSerialNumber<T> : ObservableOperator<Pair<Int, T>, T> {
    /**
     * Wraps the downstream [observer] in an upstream-facing observer that forwards all
     * signals unchanged except `onNext`, which is emitted as `Pair(serial, item)`.
     */
    override fun apply(observer: Observer<in Pair<Int, T>>): Observer<in T> =
        object : Observer<T> {
            // One counter per wrapped observer, so each subscription numbers from 1
            // instead of continuing where a previous subscription stopped.
            private val counter = AtomicInteger()

            override fun onSubscribe(d: Disposable) = observer.onSubscribe(d)

            override fun onNext(t: T) {
                observer.onNext(Pair(counter.incrementAndGet(), t))
            }

            override fun onError(e: Throwable) = observer.onError(e)

            override fun onComplete() = observer.onComplete()
        }
}
사용 예시
// Lift the custom operator onto a range of 20 integers starting at 10 and print every event.
val numberedRange = Observable.range(10, 20).lift(AddSerialNumber())
numberedRange.subscribeBy(
    onComplete = { println("Completed") },
    onError = { error -> error.printStackTrace() },
    onNext = { numbered -> println("Next $numbered") }
)
확장 함수 형태
/** Extension shortcut that applies [AddSerialNumber] to the receiver through `lift`. */
fun <T> Observable<T>.addSerialNumber(): Observable<Pair<Int, T>> {
    return this.lift(AddSerialNumber())
}
// Usage
val serialNumbered = Observable.range(10, 20).addSerialNumber()
serialNumbered.subscribe { numbered -> println("Next $numbered") }
7️⃣ 스케줄러 관리 – ObservableTransformer
구현
/**
 * Transformer for `compose` that applies `subscribeOn` and `observeOn` in one step.
 *
 * Generic in the element type so the composed stream keeps its item type instead of
 * widening to `Any`.
 *
 * @param T item type of the stream, unchanged by this transformer.
 * @param subscribeScheduler scheduler passed to `subscribeOn`.
 * @param observeScheduler scheduler passed to `observeOn`.
 */
class SchedulerManager<T>(
    private val subscribeScheduler: Scheduler,
    private val observeScheduler: Scheduler
) : ObservableTransformer<T, T> {
    /** Returns [upstream] with both schedulers applied. */
    override fun apply(upstream: Observable<T>): ObservableSource<T> =
        upstream
            .subscribeOn(subscribeScheduler)
            .observeOn(observeScheduler)
}
사용 예시
// Print the current thread name inside map and inside the subscriber.
Observable.range(1, 10)
    .map { value ->
        println("map - ${Thread.currentThread().name} $value")
        value
    }
    .compose(SchedulerManager(Schedulers.computation(), Schedulers.io()))
    .subscribe { value ->
        println("onNext - ${Thread.currentThread().name} $value")
    }
확장 함수 형태
/**
 * Extension shortcut that composes the receiver with a [SchedulerManager] built from the
 * two given schedulers.
 */
fun <T> Observable<T>.scheduler(
    subscribeScheduler: Scheduler,
    observeScheduler: Scheduler
): Observable<T> {
    return this.compose(SchedulerManager(subscribeScheduler, observeScheduler))
}
// Usage
Observable.range(1, 10)
    .scheduler(Schedulers.computation(), Schedulers.io())
    .subscribe { value -> println(value) }
8️⃣ 블로킹 연산 (blockingSubscribe, blockingFirst, blockingLast, blockingGet)
blockingSubscribe
// Count the items delivered through blockingSubscribe, then verify the total.
val emissionsCount = AtomicInteger()
val rangeOnComputation = Observable.range(1, 10).subscribeOn(Schedulers.computation())
rangeOnComputation.blockingSubscribe { emissionsCount.incrementAndGet() }
assertEquals(10, emissionsCount.get())
blockingFirst & blockingLast
// Sort ten shuffled numbers, then read the first and last item synchronously.
val shuffledNumbers = listOf(2, 10, 5, 6, 9, 8, 7, 1, 4, 3)
val observable = shuffledNumbers.toObservable().sorted()

val firstItem = observable.blockingFirst()
assertEquals(1, firstItem)

val lastItem = observable.blockingLast()
assertEquals(10, lastItem)
blockingGet (Monad)
// Single: take the first element (0 is passed to first()) and read it synchronously.
val firstElement = observable.first(0)
val firstItem = firstElement.blockingGet()

// Maybe: same idea through firstElement(), read synchronously.
val maybeElement = observable.firstElement()
val item = maybeElement.blockingGet()
📚 정리
- Retrofit + RxJava2 로 네트워크 레이어를 구성하고, Observable로 API 응답을 받아 UI 스레드에서 처리합니다.
- RxBinding 을 활용해 클릭·텍스트 변경 등 UI 이벤트를 Observable로 변환합니다.
- Apache HTTP Client 와 rxjava-apache-http를 사용하면 기존 HttpAsyncClient를 Rx 형태로 감쌀 수 있습니다.
- 커스텀 연산자와 확장 함수로 재사용성을 높이고, SchedulerManager 로 스레드 전환 로직을 깔끔하게 캡슐화합니다.
- 필요 시 blocking 연산을 이용해 동기식 테스트 혹은 초기값 획득에 활용합니다.
이제 위 예시들을 프로젝트에 적용해 보세요! 🚀
Reactive Streams Examples (Kotlin & Java)
1. blockingIterable
// The blocking iterable over the sorted stream must match the list sorted directly.
val list = listOf(2, 10, 5, 6, 9, 8, 7, 1, 4, 3)
val iterable = list.toObservable().sorted().blockingIterable()
assertEquals(list.sorted(), iterable.toList())
2. blockingForEach (prevent OOM)
// Keep only the even numbers and assert on each one as it is delivered.
val list = listOf(
    2, 10, 5, 6, 9, 8, 7, 1, 4, 3,
    12, 20, 15, 16, 19, 18, 17, 11, 14, 13
)
val observable = list.toObservable().filter { number -> number % 2 == 0 }
observable.blockingForEach { item -> assertTrue(item % 2 == 0) }
3. Using TestObserver
// Subscribe a TestObserver to the sorted stream and check every recorded signal.
val list = listOf(
    2, 10, 5, 6, 9, 8, 7, 1, 4, 3,
    12, 20, 15, 16, 19, 18, 17, 11, 14, 13
)
val testObserver = TestObserver<Int>()
list.toObservable().sorted().subscribe(testObserver)

with(testObserver) {
    assertSubscribed()
    awaitTerminalEvent() // block until completion
    assertNoErrors()
    assertComplete()
    assertValueCount(20)
    assertValues(
        1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
        11, 12, 13, 14, 15, 16, 17, 18, 19, 20
    )
}
4. Time‑based testing with TestScheduler
// Drive a 5-minute interval with virtual time instead of waiting for real time to pass.
val testScheduler = TestScheduler()
val testObserver = TestObserver<Long>()
Observable.interval(5, TimeUnit.MINUTES, testScheduler).subscribe(testObserver)

testObserver.assertSubscribed()
testObserver.assertValueCount(0)

// 100 virtual minutes at one tick per 5 minutes: 20 values expected.
testScheduler.advanceTimeBy(100, TimeUnit.MINUTES)
testObserver.assertValueCount(20)

// 400 more virtual minutes (500 in total): 100 values expected.
testScheduler.advanceTimeBy(400, TimeUnit.MINUTES)
testObserver.assertValueCount(100)
Note: Requires Java 8+ and Android SDK 26+.
// Reactor core; declared with `implementation` because the `compile` configuration is deprecated
implementation 'io.projectreactor:reactor-core:3.1.1.RELEASE'
5. Reactor Flux & Mono examples (Kotlin)
// Flux: print each of the three items through a Consumer.
val flux = Flux.just("Item 1", "Item 2", "Item 3")
flux.subscribe(Consumer<String> { item -> println("Got Next $item") })

// Consumer shared by the three Mono examples.
val consumer = Consumer<String> { item -> println("Got $item") }

val emptyMono = Mono.empty<String>()
val monoWithData = Mono.justOrEmpty("A String")
val monoByExtension = "Another String".toMono()

// Subscribe in the same order: empty, with data, built by the extension function.
listOf(emptyMono, monoWithData, monoByExtension).forEach { mono ->
    mono.log().subscribe(consumer)
}
*Originally published at *