오늘은 RxJava의 설명을 듣다 보면 빠지지 않는 마블 다이어그램과 옵저버를 알아보려 한다.
마블 다이어그램
이 마블 다이어그램은 반응형 프로그래밍에서 일어나는 비동기적인 데이터 흐름을 시각화한 도표이다.
이 마블 다이어그램을 읽는 방법을 알아보자.
읽는 방법
위 사진에서 설명하는 것처럼 읽으면 되는데, 위 사진에서 Observable을 언급한다.
이 Observable을 알아야 마블 다이어그램을 읽을 수 있을 것 같으니 한번 알아보자.
Observable
RxJava에서는 Observable을 구독하는 Observer가 존재하며, Observable이 순차적으로 발행하는 데이터에 대해서 반응한다.
이 Observable은 3가지의 이벤트를 사용하여 동작한다.
- onNext() : 하나의 소스 Observable에서 Observer까지 한 번에 하나씩 순차적으로 데이터를 발행한다.
- onComplete() : 데이터 발행이 끝났음을 알리는 완료 이벤트를 Observer에 전달하여 더는 onNext() 호출이 발생하지 않음을 나타낸다.
- onError() : 오류가 발생했음을 Observer에 전달한다.
위 세 가지 이벤트 메서드들은 Emitter라는 인터페이스에 선언된다. 데이터 및 오류 내용을 발행할 때 null을 발행할 수 없으니 주의하자.
Observable 생성하기
RxJava에서는 연산자(Operator)라고 부르는 여러 정적 메서드를 통해 기존 데이터를 참조하거나 변형하여 Observable을 생성할 수도 있다. 여기서 소개한 연산자보다 훨씬 많은 연산자가 있으니 궁금하면 찾아보자.
create() 연산자
Observable.create()를 사용하면 Emitter를 사용하여 직접 아이템을 발행하고, 아이템 발행의 완료 및 오류의 알림을 직접 설정할 수 있다. 아래 예제를 살펴보자.
var s:Observable<String> = Observable.create {
it.onNext("Hello")
it.onNext("World")
it.onComplete()
}
s.subscribe(System.out::print)
아 참고로 Observable을 사용할 때 자동완성이 3개가 나오는데
빨간색으로 표시해둔 게 RxJava에서 지원하는 Observable이다.
위 코드에서 emitter를 통해 문자열 "Hello"와 "World"를 발행했다.
Observable을 구독하도록 subscribe() 메서드를 호출하여 Observer 또는 Consumer를 추가할 수 도 있다.
그리고 아이템의 발행이 끝났다면 반드시 onComplete()를 호출해야 한다. onComplete()를 호출 후에는 아이템이 추가로 발행하더라도 구독자는 데이터를 받지 못한다.
var s:Observable<String> = Observable.create {
it.onNext("Hello")
it.onComplete()
it.onNext("World")
}
s.subscribe(System.out::print)
즉, 위와 같이 코드를 작성하면 Hello만 출력이 된다는 소리이다.
또 만약 오류가 발생했을 시에는 Emitter를 통해 onError(Throwable)를 호출해야 하며, 구독자는 이를 적절히 처리해야만 한다.
var s:Observable<String> = Observable.create {
it.onNext("Hello")
it.onError(Throwable())
it.onNext("World")
}
s.subscribe(System.out::println) { e -> println(e) }
위 코드를 보면 중간에 에러를 발생하게 해 놓았다. 그다음 람다식을 이용해 subscribe의 에러를 처리하였다.
이렇게 간단한 예제들을 살펴보았다. 하지만 실제로는 create() 연산자는 개발자가 직접 Emitter를 제어하므로 주의하여 사용해야 한다. 예를 들어 Observable이 폐기되었을 때 등록된 콜백을 모두 해제하지 않으면 메모리 누수가 발생하고, BackPressure(배압)를 직접 처리해야 한다. 폐기와 배압은 다음에 다뤄보자.
just() 연산자
just() 연산자는 해당 아이템을 그대로 발행하는 Observable을 생성해 준다. just() 연산자의 인자로 넣은 아이템을 차례로 발행하며, 한 개의 아이템을 넣을 수도 있고, 타입이 같은 여러 개의 아이템을 넣을 수도 있다.
쉽게 이해하자면 아래 사진과 같다.
아무튼 코드를 보자.
var s:Observable<String> = Observable.just("Hello","World")
s.subscribe(System.out::println)
여기서 just의 아이템으로 null을 넣으면 에러가 난다. 그러니 만약 아이템이 없는 Observable을 만들고 싶다면, Observable.empty() 연산자를 사용하자.
간단히 Observable로 변환하기
이미 참조할 수 있는 배열 및 리스트 등의 자료 구조나 Future, Callable 또는 Publisher가 있다면, from으로 시작하는 연산자를 통해 Observable로 변환이 가능하다.
이 from과 관련된 메서드들을 알아보자.
연산자 이름 | 설명 |
fromArray() | 배열을 ObservableSource로 변환하여 아이템을 순차적으로 발행한다. |
fromIterable() | ArrayList, HashSet처럼 Iterable을 구현한 모든 객체를 ObservableSource로 변환하여 아이템을 순차적으로 발행한다. |
fromFurute() | Future 인터페이스를 지원하는 모든 객체를 ObservableSource로 변환하여 Future.get() 메서드를 호출한 값을 반환한다. |
fromPublisher() | Publisher를 Observable로 변환한다. |
fromCallable() | Callable을 Observable로 변환한다. |
fromArray() 연산자
가지고 있는 아이템들이 배열일 경우에는 fromArray() 연산자를 이용하여 아이템을 순차적으로 발행할 수 있다.
코드를 보자.
val a = arrayOf("A","B","C")
val s = Observable.fromArray(*a)
s.subscribe(System.out::println)
참고로 fromArray(*배열)을 하지 않고 a.toObservable을 하는 방법도 있다. 또한 val로 설정하지 않고 var로 설정하면 포인터가 안 찍힌다. (포인터가 없으면 배열의 주소 값을 출력함)
fromIterable() 연산자
Arraylist, HashSet 등과 같이 일반적으로 Iterable을 구현한 자료 구조 클래스는 fromIterable() 연산자를 통해 쉽게 Observable로 변환이 가능하다. 이것도 코드를 보자.
val a:ArrayList<String> = ArrayList()
a.add("AA")
a.add("BB")
a.add("CC")
val s = Observable.fromIterable(a)
s.subscribe(System.out::println)
fromFuture() 연산자
Future 인터페이스는 비동기적인 작업의 결과를 구할 때 사용한다. 보통 Executor Service를 통해 비동기적인 작업을 할 때 사용된다. Future 또한 fromFuture() 연산자를 통해 Observable로 변환이 가능하다. Emitter는 Observable 내부에서 Future.get() 메서드를 호출하고, Future의 작업이 끝나기 전까지 스레드는 블로킹된다.
그럼 코드를 보자.
val a = Executors.newSingleThreadExecutor()
.submit<String> { "Hello World" }
val s = Observable.fromFuture(a)
s.subscribe(System.out::println)
fromPublisher() 연산자
Publisher는 잠재적인 아이템 발행을 제공하는 생산자로 Subscriber로부터 요청을 받아 아이템을 발행한다. fromPublisher() 연산자를 통해 Publisher를 Obsevable로 변환할 수 있다.
코드를 보자.
val a = Publisher<String> {
it.onNext("AAA")
it.onNext("BBB")
it.onNext("CCC")
it.onComplete()
}
val s = Observable.fromPublisher(a)
s.subscribe(System.out::println)
fromCallable() 연산자
Callable 인터페이스는 비동기적인 실행 결과를 반환한다는 점이 Runnable과 다르다.
fromCallable() 연산자를 통해 Callable을 Observable로 변환하고, 비동기적으로 아이템을 발행할 수 있다.
코드를 보자.
val a = Callable{ "Hello World" }
val s = Observable.fromCallable(a)
s.subscribe(System.out::println)
이렇게 오늘은 마블 다이어그램을 보는 법과 Observable, Observable의 연산자들을 알아보았다. 아직 Observable이 끝난 것이 아니기에 더 공부해서 다음에 들고 오겠다.
'안드로이드 > RxJava' 카테고리의 다른 글
[안드로이드/Kotlin] RxJava - RxJava란? (0) | 2022.06.24 |
---|