-
RxJS 기본개념 정리RxJS 2021. 4. 26. 11:56
@ 이 문서는 RxJS 6 버전을 기준으로 작성하였습니다.
@ 작성 시점에서는 7 버전 (베타)가 있습니다. (마스터 브랜치) https://github.com/ReactiveX/rxjs
@ 6 버전은 6.x 브랜치로 이동하세요
@ 7 버전이 공식적으로 배포되기 전에 6 버전에 대해 알아두면 7 버전 학습에 도움이 될 것 같아 학습해봤습니다.
1] RxJS에서 말하는 개념
싱글 멀티플 pull 함수 (Function) 이터레이터 (iterator) push 프로미스 (Promise) 옵저버블 (Observable) 싱글: 하나의 값이나 이벤트를 다루는 것
멀티플: 여러 개의 값이나 이벤트를 다루는 것
pull: 데이터를 받을지 결정하는 것
push: 데이터를 보낼지 결정하는 것
2] 옵저버블의 라이프 사이클
1. 옵저버블 생성 (Creating Observables)
2. 옵저버블 구독 (Subscribing to Observables)
3. 옵저버블 실행 (Executing the Observable)
4. 옵저버블 구독 해제 (Disposing Observables)
3] 옵저버블
옵저버블의 생성은 Observable.create나 new Observable로 생성할 수 있다
import rxjs from 'rxjs' const Observable1$ = Observable.create() const Observable2$ = new Observable()
구독 (subscribe)
구독은 생성된 옵저버블 인스턴스에 있는 subscribe 함수를 호출해 옵저버블을 구독하고 실행한다.
반환 값은 Subscription 객체이다
구독 해제(unsubscribe)
구독을 해제한다. => 더 이상 옵저버블의 실행을 하지 않고 멈춘다.
옵저버 (Observer)
옵저버는 next, error, complete라는 3가지 함수로 구성된 객체이다
옵저버블은 각 연산자를 거쳐 subscribe 함수 안 옵저버로 값을 전달한다
subscribe 안 함수 각각을 사용해 옵저버 객체를 생성하고 이 옵저버 객체로 함수 각각을 호출해 값을 발행한다
4] 서브젝트 (Subject)
서브젝트는 멀티캐스팅을 지원하는 객체이다
멀티캐스팅이란 여러 옵저버가 이벤트 변경이나 값 전달을 관찰하도록 옵저버블을 구독한 후 실제 이벤트 변경이나 값 전달이 발생했을 때 이를 알린다는 뜻이다. => 구독 중인 모든 옵저버가 호출되어 같은 값을 전달받는다는 뜻
또한 서브젝트는 옵저버블이면서 옵저버 역할도 한다
옵저버블이므로 어러 옵저버가 옵저버블을 구독할 수 있고, 옵저버이기도 하므로 next, error, complete 함수를 호출해 같은 결과를 전달받을 수 있다.
import { Subject } from 'rxjs' const subject = new Subject() subject.subscribe({ next: function(v) { console.log(`observer A: ${v}`) } }) subject.subscribe({ next: function(v) { console.log(`observer B: ${v}`) } }) subject.next(1) subject.next(2) /* observer A: 1 observer B: 1 observer A: 2 observer B: 2 */
5] 연산자 (Operator)
JavaScript의 map, filter와 같은 메소드처럼 RxJS에서도 옵저버블에 대한 연산 메소드를 지원한다
다만 옵저버블에만 적용할 수 있고 사용하려면 옵저버블을 생성해야 한다.
import { range, pipe } from 'rxjs' import { filter } from 'rxjs/operators' // range도 옵저버블! // 아래 range 함수는 RxJS에서 range 함수를 정의한 모듈 파일에서 가져온 것이다. 옵저버블을 반환함 // export declare function range(start?: number, count?: number, scheduler?: SchedulerLike): Observable<number>; range(1, 10).pipe(filter(v => v % 2 == 0)).subscribe(x => console.log(x)) /* 2 4 6 8 10 */
// 생성 함수 (creation function) /* RxJS 5에서는 옵저버블의 정적 연산자와 인스턴스 연산자를 따로 구분했지만, 6 버전에서는 이러한 개념이 사라졌습니다. 따라서, 정적 연산자처럼 옵저버블을 생성하는 함수를 정의해야 합니다. 이러한 함수를 생성 함수라 합니다. */
pipable operator
생성 함수로 만들어진 옵저버블 인스턴스를 pipe 함수 안에서 다룰 수 있는 연산자
기본적으로 rxjs/operators 안에 정의되어 있다.
6] 스케줄러
옵저버가 옵저버블을 구독할 때 어떤 순서로 어떻게(동기/비동기 등) 실행할지 실행 컨텍스트를 관리하는 역할의 자료구조다
ex)
-> setTimeout, setInterval 함수 또는 마이크로 큐를 이용해 실행하는 asapScheduler, asyncScheduler
-> 동기 방식: (Trampoline) 방식으로 큐를 사용하는 queueScheduler (재귀 방식으로 구현했다면 콜 스택을 사용하지 않고 큐를 이용해 반복적으로 해제하는 방식을 지원) => 따라서 재귀 호출에서 발생할 수 있는 스택 오버플로우를 방지할 수 있다.
7] 마블 다이어그램
공식 문서에서는 옵저버블의 연산자를 설명하는 마블 다이어그램(Marble Diagram)이라는 그림을 제공한다.
다만 5 버전 기준이니 마블 다이어그램을 보는 방법을 익혀두면 좋을 것 같다.
마블 다이어그램 링크: https://rxmarbles.com/
1. 위 쪽의 가로줄은 시간에 따라 next함수에서 발행하는 값들을 표시하는데 입력 옵저버블이라고도 한다. 2. 구슬처럼 생긴 원 안에는 실제 값이 들어있다. 3. | 처럼 생긴 것은 complete함수가 호출되어 구독이 완료되었음을 알린다. 4. X 처럼 생긴건 에러가 발생하여 옵저버블 실행이 종료했다라는 뜻이다. 5. 위 쪽 가로줄 아래에 있는 사각형은 연산자를 가리킨다. 6. 가운데 연산자 아래에 있는 옵저버블은 출력 옵저버블이라고도 하며 각 값들은 색, 숫자로 구분해서 어떤 입력값으로 어떤 값을 발행했는지 구분할 수 있게 해주는데 동그라미, 세모, 네모 다양한 도형을 사용해서 구분하기도 한다.
8. 생성 함수
create
새로운 옵저버블을 만드는 함수이고 Observable의 정적 메서드이다.
import { Observable } from 'rxjs' const observer = (obs) => { for (let i = 0; i < 10; i++) { obs.next(i) } obs.complete() // => obs.complete나 obs.error 함수를 실행하면 구독이 해제되기 때문에 아래의 code... 부분은 실행되지 않는다. // code... return () => console.log(`unsubscribed`) // return 부분은 구독이 해제되었을 때 실행된다. } new Observable(observer).subscribe( (v) => console.log(`next ${v}`), (err) => console.error(err), () => console.log(`completed`) ) Observable.create(observer).subscribe( (v) => console.log(`next ${v}`), (err) => console.error(err), () => console.log(`completed`) ) // new 키워드로 만드나 정적 메서드인 create로 만드나 결과는 동일하다. // next 1 // next 2 // next 3 // . // . // . // next 8 // next 9 // completed // unsubscribed
of
순차적으로 값을 발행하는 옵저버블을 생성한다.
마지막 인자의 타입이 스케줄러이면 옵저버블의 next 함수를 비동기로 처리한다.
// of.d.ts export decleare function of<T>(...args: (T | SchedulerLike)[]): Observable<T>; import { of } from 'rxjs' of(1, 2, 3, Error('no').message).subscribe( (v) => console.log(`next ${v}`), (err) => console.error(err), () => console.log(`completed`) ) // next 1 // next 2 // next 3 // next no // completed
from
공식문서와 책의 설명이 조금 다른듯 한데...
책에서는 옵저버블로 변환할 수 있을 만한 객체들을 옵저버블로 변환하는 함수이고 지원하는 객체는 다음과 같다.(Observable, Array, Promise, Iterable, string, ArrayLike)
라고 되어있고
RxJS 공식 문서에 따르면 'Creates an Observable from an Array, an array-like object, a Promise, an iterable object, or an Observable-like object' 라고 되어있다.
그러나 ReactiveX 공식문서에서는 (http://reactivex.io/documentation/ko/operators.html) '다른 객체나 자료 구조를 Observable로 변환한다' 라고 소개가 되어있다. 아무래도 JavaScript만 있는게 아니다보니 이렇게 설명한 것 같기도 하고...
실제로 해보면 RxJS 공식 문서의 설명된 것과 달리 여러 타입을 섞어도 정상적으로 출력이 된다.
사실 배열 내부가 아닌 배열 자체를 이야기하는 것일까? 라는 생각도 든다.
// from.d.ts ObservableInput<T> = SubscribableOrPromise<T> | ArrayLike<T> | Iterable<T>; ObservedValueOf<O> = O extends ObservableInput<infer T> ? T : never; from<O extends ObservableInput<any>>(input: O): Observable<ObservedValueOf<O>>; import { from } from 'rxjs' from([1, '2', window.document.body, () => console.log(0), false, { name: 1 }]).subscribe( (v) => console.log(`next ${v}`), (err) => console.error(err), () => console.log(`completed`) ) // next 1 // next 2 // next [object HTMLBodyElement] // next function () { // return console.log(0); // } // next false // next [object Object] // completed
fromEvent
Nodejs의 옵저버 패턴 구현인 EventEmitter 클래스의 객체와 조합하거나 브라우저에서 발생하는 이벤트를 옵저버블로 바꿀 때 사용한다.
import { fromEvent } from 'rxjs' fromEvent(document.getElementById('app'), 'click').subscribe( (v) => console.log(v.currentTarget), (err) => console.error(err), () => console.log(`completed`) ) // <div id="app">...</div>
defer
팩토리 함수로 옵저버블을 만들어서 구독하는 시점에 팩토리 함수를 호출해 만들어둔 옵저버블을 리턴받아 구독한다.
from함수는 프로미스를 생성해 바로 동작시키지만 defer함수는 옵저버블을 구독하는 시점에 프로미스를 생성할 수 있으므로 동작 시점을 옵저버블의 구독 시점으로 조절할 수 있다.
// types.d.ts export declare type SubscribableOrPromise<T> = Subscribable<T> | Subscribable<never> | PromiseLike<T> | InteropObservable<T>; /** OBSERVABLE INTERFACES */ export interface Subscribable<T> { subscribe(observer?: PartialObserver<T>): Unsubscribable; /** @deprecated Use an observer instead of a complete callback */ subscribe(next: null | undefined, error: null | undefined, complete: () => void): Unsubscribable; /** @deprecated Use an observer instead of an error callback */ subscribe(next: null | undefined, error: (error: any) => void, complete?: () => void): Unsubscribable; /** @deprecated Use an observer instead of a complete callback */ subscribe(next: (value: T) => void, error: null | undefined, complete: () => void): Unsubscribable; subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Unsubscribable; } export declare type ObservableInput<T> = SubscribableOrPromise<T> | ArrayLike<T> | Iterable<T>; /** @deprecated use {@link InteropObservable } */ export declare type ObservableLike<T> = InteropObservable<T>; export declare type InteropObservable<T> = { [Symbol.observable]: () => Subscribable<T>; }; // defer.d.ts export declare function defer<R extends ObservableInput<any> | void>(observableFactory: () => R): Observable<ObservedValueOf<R>>;
from vs defer 함수 선택
from: 프로미스 안 구현 부분이 언제 실행되든 상관없을 때, 이미 실행 중이거나 완료한 프로미스를 옵저버블로 만들 때
defer: 옵저버블을 구독하는 시점에 프로미스를 생성하여 프로미스 안 구현 부분이 실행되어야 할 때, 프로미스 실행 시점(프로미스 객체 생성 시즘)이 구독하는 시점이어야 할 때
range
시작하고자 하는 숫자부터 몇 번 더 값을 정하면 그만큼 값을 발행하고 숫자는 1씩 증가한다.
// range import { range } from 'rxjs' range(1, 3).subscribe( (value) => console.log(`next! ${value}`), (err) => console.log(`err! ${err}`), () => console.log(`completed!`) ) // next! 1 // next! 2 // next! 3 // completed! // range.d.ts (export declare function이 함수 앞에 붙어있음) range(start?: number, count?: number, scheduler?: SchedulerLike): Observable<number>;
시간과 관련된 생성 함수
interval
JavaScript의 setInterval과 비슷하다.
일정한 간격마다 값을 발행하는데 0부터 순차적으로 증가한다.
setInterval을 clearInterval로 제거하지 않으면 무한하게 반복되듯이 interval도 명시적으로 구독해제를 하지 않으면 무한하게 값을 발행하니 주의해야 한다.
// interval.d.ts interval(period?: number, scheduler?: SchedulerLike): Observable<number>; import { interval } from 'rxjs' const x = interval(1000).subscribe( (value) => console.log(`next! ${value}`), (err) => console.log(`err! ${err}`), () => console.log(`completed!`) ) setTimeout(() => { x.unsubscribe() }, 3000); // next! 0 // next! 1 // next! 2
timer
JavaScript의 setTimeout과 비슷하다.
setTimeout처럼 일정 시간 후 딱 한 번 실행된다.
// timer.d.ts (길어서 줄 바꿈 처리) timer( dueTime?: number | Date, periodOrScheduler?: number | SchedulerLike, scheduler?: SchedulerLike ): Observable<number>; import { timer } from 'rxjs' const x = timer(1000).subscribe( (value) => console.log(`next! ${value}`), (err) => console.log(`err! ${err}`), () => console.log(`completed!`) ); // next! 0 // completed! /* 아래 예제를 기준으로 두 번째 인수를 넣으면 1초 후에 0을 발행하고 0.5초마다 1씩 증가하는 값을 발행한다. 구독 해제하지 않으면 무한히 반복한다. */ const y = timer(1000, 500).subscribe( (value) => console.log(`next! ${value}`), (err) => console.log(`err! ${err}`), () => console.log(`completed!`) ); setTimeout(() => { y.unsubscribe(); }, 3000); // next! 0 // next! 1 // next! 2 // next! 3
empty
NEVER
6 버전에 와서는 이 연산자들이 상수로 변경되었고 기존의 empty()와 never()는 deprecated되었다.
EMPTY
// EMPTY // 호출되자마자 complete함수를 호출하여 값 발행을 즉시 중단한다. import { EMPTY } from 'rxjs' EMPTY.subscribe( () => console.log(`next!`), (err) => console.log(`err!`), () => console.log(`completed!`) ) // completed!
NEVER
// NEVER // 호출되자마자 그 어떤 값도 발행하지 않게 하는 상수 옵저버블이다. import { NEVER } from 'rxjs' NEVER.subscribe( () => console.log(`next!`), (err) => console.log(`err!`), () => console.log(`completed!`) ) // 아무것도 출력되지 않음
throwError
호출되자마자 즉시 에러를 발생시킨다.
// throwError import { throwError } from 'rxjs' throwError(`ERROR!`).subscribe( (value) => console.log(`next! ${value}`), (err) => console.log(`err! ${err}`), () => console.log(`completed!`) ) // err! ERROR!
Reference
RxJS: https://rxjs-dev.firebaseapp.com/
RxJS 프로그래밍: https://www.hanbit.co.kr/media/books/book_view.html?p_code=B2578596304
@@@