-
@ 이 글은 글의 최하단에 위치해있는 References의 도서(RxJS 6 버전)를 기준으로 작성하였습니다.
@ Github: (https://github.com/ReactiveX/rxjs 6버전은 6.x 브랜치로 이동하세요)
@ RxJS 7이 공식 릴리즈되었습니다. (rxjs.dev/guide/overview/)
@ deprecated 되는 연산자도 있으니 참고하세요 (7 버전 변경사항 rxjs-dev.firebaseapp.com/deprecations/breaking-changes)
@ 각 연산자마다 타입 정의를 작성하지만, 이 외에도 오버로딩되는 함수들이 여럿있으니 github이나 에디터에서 참고하시길 바랍니다.
@ 각 함수의 마블 다이어그램은 공식 문서 또는 다음 링크에서 확인할 수 있습니다.
RxVisualizer: https://rxviz.com/
RxMarbles: (5 버전) https://rxmarbles.com/
Note
소스 옵저버블: (lift 함수 참고) 연산자를 연결할 때 호출하는 옵저버블
MonoTypeOperatorFunction<T> = 연산자가 적용된 옵저버블을 리턴하는 함수이고 pipe함수에서 이 함수를 호출함으로써 옵저버블 인스턴스를 얻는다.
1] 필터링 연산자
filter
JavaScript의 Array.prototype.filter와 같은 함수이다.
특정한 조건만을 만족하는 것들만 걸러서 옵저버블을 리턴한다
// filter.d.ts filter<T>( predicate: (value: T, index: number) => boolean, thisArg?: any ): MonoTypeOperatorFunction<T>; import { range, pipe } from 'rxjs' import { filter } from 'rxjs/operators' range(1, 10) .pipe(filter(x => x % 5 === 0)) .subscribe(x => console.log(x)) // 5 // 10
first
첫 번째 값 1개만 발행하고 더 이상 값을 발행하지 않는(complete 함수를 호출하는) 연산자
// first.d.ts first<T, D = T>( predicate: (value: T, index: number, source: Observable<T>) => boolean, defaultValue?: D ): OperatorFunction<T, T | D>; import { range, pipe } from 'rxjs' import { first } from 'rxjs/operators' range(1, 10).pipe(first((x, i) => i === 3 || x === 7)).subscribe(x => console.log(x)) // 4
last
마지막 값 1개만 발행하는 연산자다. 마찬가지로 complete함수를 호출한다.
// last.d.ts last<T, D = T>( predicate: (value: T, index: number, source: Observable<T>) => boolean, defaultValue?: D ): OperatorFunction<T, T | D>; import { range, pipe } from 'rxjs' import { first } from 'rxjs/operators' range(1, 10).pipe(last((x, i) => i === 3 || x === 7)).subscribe(x => console.log(x)) // 7
명시적으로 구독 해제하지 않도록 돕는 연산자
- 옵저버 패턴에서는 구독과 구독 해제가 있는데, subscribe와 unsubscribe가 여기저기 흩어져있다면 특정 개수만큼만 옵저버블을 구독하고 실행을 종료하거나 특정 조건을 만족하는 값을 발행할 때만 구독하다가 해당 조건을 만족하지 않을 때 구독 해제할 수 있다. 반대로 특정 조건을 만족하기 전까지 값을 발행하다가 조건을 만족하면 complete 함수를 호출해 구독완료할 수 있도록 만들 수 있다.
take 접두사가 붙은 연산자는 complete 함수를 제어할 수 있는 연산자이다.
take
정해진 개수만큼 구독하고 구독을 해제한다.
// take.d.ts take<T>(count: number): MonoTypeOperatorFunction<T>; import { interval, pipe } from 'rxjs' import { take } from 'rxjs/operators' // codeSandBox에서 한다면, interval을 가장 나중에 입력하세요! interval(1000).pipe(take(5)).subscribe(x => console.log(`result: ${x}`)) // result: 0 // result: 1 // result: 2 // result: 3 // result: 4
takeUntil
특정 이벤트가 발생할 때까지 옵저버블을 구독해야 한다면 이 연산자가 유용하다.
// takeUntil.d.ts takeUntil<T>(notifier: Observable<any>): MonoTypeOperatorFunction<T>; import { interval, fromEvent } from 'rxjs' import { takeUntil } from 'rxjs/operators' const source = interval(1000); const clicks = fromEvent(document, 'click'); const result = source.pipe(takeUntil(clicks)); result.subscribe(x => console.log(x));
takeWhile
소스 옵저버블에서 발행하는 값을 predicate 함수의 인자 사용해 조건을 만족하는 동안 값을 발행하고 조건을 만족하지 않으면 부모 옵저버블에서 더 이상 값을 발행하지 않도록 구독을 해제한다.
// takeWhile.d.ts takeWhile<T>( predicate: (value: T, index: number) => boolean, inclusive?: boolean ): MonoTypeOperatorFunction<T>; import { pipe } from 'rxjs' import { takeWhile } from 'rxjs/operators'
takeLast
마지막에 발행한 값을 기준으로 인수로 설정한 수만큼 값을 발행하고 complete 함수를 호출한다.
// takeLast.d.ts takeLast<T>(count: number): MonoTypeOperatorFunction<T>; import { pipe } from 'rxjs' import { takeLast } from 'rxjs/operators'
필요없는 값을 발행하지 않는 연산자
take와는 반대로 특정 조건을 만족하지 않을 때 값을 버려야 하는 경우에는 skip 접두사가 붙은 연산자를 이용해서 값을 버리고 그 이후부터 값을 발행할 수 있다.
skip
인수(argument)에 숫자를 넣으면 해당 숫자만큼 값들을 건너뛰고 그 다음부터 값을 발행한 후에 complete 함수를 호출한다.
// skip.d.ts skip<T>(count: number): MonoTypeOperatorFunction<T>; import { range } from 'rxjs' import { skip } from 'rxjs/operators' // 1부터 5개 -> 1부터 5까지 값을 발행하고 2개를 건너뛰어 3부터 5까지만 값이 발행된다 range(1, 5).pipe(skip(2)).subscribe(a => console.log(a)) // 3 // 4 // 5
skipUntil
takeUntil처럼 옵저버블을 인자로 사용하지만 인자로 사용한 옵저버블의 값 발행을 시작할 때까지 소스 옵저버블에서 발행하는 값을 건너뛴다.
// skipUntil.d.ts skipUntil<T>(notifier: Observable<any>): MonoTypeOperatorFunction<T>; import { interval } from 'rxjs' import { skipUntil, take } from 'rxjs/operators' interval(1000) .pipe(skipUntil(interval(5000)), take(3)) .subscribe(a => console.log(a)) /* 1 2 3 4 5 (초) 0 1 2 3 4 5 6 (interval이 발행하는 값) 5 이전까지는 무시하고 take로 3개를 받아오면 4 5 6이 출력된다 */
skipWhile
predicate 함수의 조건에 만족할 때 값 발행을 건너뛴다. 조건을 만족하지 않으면 조건과 상관없이 계속 값을 발행한다.
// skipWhile.d.ts skipWhile<T>(predicate: (value: T, index: number) => boolean): MonoTypeOperatorFunction<T>; import { range } from 'rxjs' import { skipWhile } from 'rxjs/operators' interval(1000) .pipe(skipWhile(x => x > 4), take(3)) .subscribe(a => console.log(a)) // 0 // 1 // 2
값 발행 후 일정 시간을 기다리는 연산자
debounce 접두사가 붙는 연산자들은 다음 특징이 있다.
- 소스 옵저버블의 값을 바로 발행하지 않고 일정 시간 기다린다.
- 일정 시간 동안 소스 옵저버블에서 새 값을 발행하지 않으면 조건에 따라 특정 값을 발행한다.
- 일정 시간동안 새 값을 발행하면 다시 일정 시간 동안 발행하는 값이 없는지 기다린다.
- 조건을 만족할 때는 값 발행을 건너뛰다가 처음으로 조건을 만족하지 않는 순간부터 계속 값을 발행한다.
빠른 비동기 요청에서 오는 응답이나 이벤트(빠른 속도의 키보드 타이핑이나 마우스 클릭) 중 일정 시간 안에 발생한 것만 처리할 때 유용하다.
debounce
Selector함수로 소스 옵저버블에서 발행하는 값을 인자로 사용한다.
해당 선택자 함수에서 리턴하는 옵저버블이나 프로미스는 소스 옵저버블에서 발행한 다음 값을 전달받지 않으면 값을 발행한다.
선택자 함수는 소스 옵저버블에서 어떤 값을 전달받느냐에 따라 그에 상응하는 옵저버블이나 프로미스를 리턴할 수 있다.
// debounce.d.ts debounce<T>( durationSelector: (value: T) => SubscribableOrPromise<any> ): MonoTypeOperatorFunction<T>; import { fromEvent } from 'rxjs' import { debounce, take } from 'rxjs/operators' const clicks = fromEvent(document, 'click') clicks.pipe(take(4), debounce(i => timer(200 * i))).subscribe(x => console.log(x)) /* MouseEvent {isTrusted: true, screenX: 1575, screenY: 387, clientX: 105, clientY: 105…} */
debounceTime
값 발행을 기다리는 일정 시간을 인자로 설정한 후 해당 시간 안에 소스 옵저버블애소 발행한 다음 값을 전달받지 않으면 최근 값을 그대로 발행하는 연산자다. 시간은 ms 단위로 설정한다.
// debounceTime.d.ts debounceTime<T>(dueTime: number, scheduler?: SchedulerLike): MonoTypeOperatorFunction<T>; import { interval } from 'rxjs' import { debounceTime } from 'rxjs/operators' interval(400) .pipe( take(3), debounceTime(300) ) .subscribe((x) => console.log(x)); // 소스 옵저버블보다 발행 간격이 짧으면 모든 값을 발행한다. // 0 // 1 // 2 interval(400) .pipe( take(3), debounceTime(500) ) .subscribe((x) => console.log(x)); // 소스 옵저버블보다 발행 간격이 길면 마지막 1개만 값으로 발행하고 // 발행할 값이 없다면 마지막 값을 complete함수 호출과 동시에 발행한다. // 2
중복 값을 발행하지 않는 연산자
distinct
내부에서 자체 구현한 Set 자료구조로 이미 발행된 값을 중복 없이 저장했다가 같은 값을 전달받으면 발행하지 않는다.
해당 자료구조는 배열 형태이며 해당 배열의 indexOf 함수를 사용하여 얻은 index 값이 -1과 다른지 검사하여 값의 유무를 확인한다.
// distinct.d.ts distinct<T, K>(keySelector?: (value: T) => K, flushes?: Observable<any>): MonoTypeOperatorFunction<T>; import { of } from 'rxjs' import { distinct } from 'rxjs/operators' of(1, 1, 4, 2, 3, 2, 4, 1, 4, 6, 0) .pipe(distinct()) .subscribe((x) => console.log(x)); // 1 // 4 // 2 // 3 // 6 // 0
- keySelector 함수를 이용한 객체 타입 값의 중복 확인
객체는 참조로 식별하기 때문에 새로 만든 객체는 객체 안의 값이 같아도 참조가 다르기 때문에 서로 다른 객체로 인식한다.
따라서 단순히 distinct 함수를 호출하면 모든 객체가 출력된다.
이를 방지하기 위해서는 객체의 키-값으로 구분할 수 있다.
import { of } from 'rxjs' import { distinct } from 'rxjs/operators' of( { id: 1, value: 20 }, { id: 2, value: 50 }, { id: 3, value: 80 }, { id: 1, value: 20 }, { id: 2, value: 50 }, { id: 3, value: 70 }, ) .pipe(distinct(obj => obj.id)) .subscribe((x) => console.log(x)); /* {id: 1, value: 20} {id: 2, value: 50} {id: 3, value: 80} */
- flushes를 이용하여 중복 값 검사를 초기화
import { interval } from 'rxjs' import { take, map, distinct } from 'rxjs/operators' interval(200) .pipe( take(25), map(y => ({ id: y, value: y % 5})), distinct(obj => obj.value, interval(2100)) ).subscribe((x) => console.log(x)); /* {id: 0, value: 0} {id: 1, value: 1} {id: 2, value: 2} {id: 3, value: 3} {id: 4, value: 4} => Set 자료구조를 초기화 {id: 10, value: 0} {id: 11, value: 1} {id: 12, value: 2} {id: 13, value: 3} {id: 14, value: 4} => Set 자료구조를 초기화 {id: 20, value: 0} {id: 21, value: 1} {id: 22, value: 2} {id: 23, value: 3} {id: 24, value: 4} */
distinctUntilChanged
같은 값이 연속적으로 있는지 검사하는 연산자다.
연속해서 중복 값이 있다면 최초 1개만 발행하고 그 외에는 정상적으로 발행한다.
단, 연속해서 중복 값이 있는 것이 아니라면 중복 값 발행을 허용한다.
// distinctUntilChanged.d.ts distinctUntilChanged<T>(compare?: (x: T, y: T) => boolean): MonoTypeOperatorFunction<T>; distinctUntilChanged<T, K>( compare: (x: K, y: K) => boolean, keySelector: (x: T) => K ): MonoTypeOperatorFunction<T>; import { of } from 'rxjs' import { distinctUntilChanged } from 'rxjs/operators' of(1, 6, 7, 7, 6, 2, 5, 5, 2) .pipe( distinctUntilChanged() ).subscribe((x) => console.log(x)); /* 1 6 7 6 2 5 2 */
- compare 함수로 값 비교
compare 함수로 키 값 a와 b의 값이 같을 때 객체의 값이 같다고 검사한다.
import { of } from 'rxjs' import { distinctUntilChanged } from 'rxjs/operators' of( { id: 1, value: 20 }, { id: 2, value: 40 }, { id: 2, value: 50 }, { id: 1, value: 30 }, ) .pipe( distinctUntilChanged((a, b) => a.id === b.id) ).subscribe((x) => console.log(x)); /* {id: 1, value: 20} {id: 2, value: 40} {id: 1, value: 30} */
- keySelector 함수로 값 비교
키 값으로 발행할 값을 비교한다.
1) compare 함수 없이 keySelector 함수를 사용하면 동등성을 기준으로 값을 비교한다.
import { of } from 'rxjs' import { distinctUntilChanged } from 'rxjs/operators' of( { id: 1, value: 40 }, { id: 2, value: 40 }, { id: 2, value: 50 }, { id: 1, value: 50 }, ) .pipe( distinctUntilChanged(null, n => n.value) ).subscribe((x) => console.log(x)); /* {id: 1, value: 40} {id: 2, value: 50} */
2) compare 함수와 keySelector를 동시에 사용할 때
import { of } from 'rxjs' import { distinctUntilChanged } from 'rxjs/operators' of( { id: 1, value: 40 }, { id: 2, value: 40 }, { id: 2, value: 50 }, { id: 1, value: 50 }, ) .pipe( distinctUntilChanged((a, b) => a.id === b.id, n => n.value) ).subscribe((x) => console.log(x)); /* { id: 1, value: 40 } */
샘플링 연산자
스트림에서 발행하는 값 중 모든 값이 필요하지 않고 일부 샘플만 있어도 충분할 때가 있다.
불필요하게 모든 값을 발행하는 것은 자원 낭비이기에 적절한 시점에 값들을 대표할 수 있는 표본만 발행하도록 하는 연산자가 sample과 sampleTime이다.
sample
notifier라는 옵저버블을 인자로 사용해 notifier 옵저버블에서 값을 발행할 때마다 소스 옵저버블의 가장 최근 값을 발행한다.
notifier에서 complete 함수를 호출해도 소스 옵저버블의 가장 최근 값은 발행한다.
단, 이후에는 소스 옵저버블에서 값을 발행해도 해당 값을 발행하지 않으니 주의해야 한다.
sample 연산자로 값을 발행한 후 소스 옵저버블에서 다음 값을 발행하기 전 notifier에서 또 값을 발행해도 최근 값을 중복해서 발행하지 않는다.
// sample.d.ts sample<T>(notifier: Observable<any>): MonoTypeOperatorFunction<T>; import { interval, fromEvent } from 'rxjs' import { sample } from 'rxjs/operators' const seconds = interval(1000); const clicks = fromEvent(document, 'click'); const result = seconds.pipe(sample(clicks)); result.subscribe(x => console.log(x)) /* 0 4 5 6 8 9 10 40 */ // --------------------------------------------------------------- const sampleSize = 3 const sourceInterval = 200 const sampleDelay = 100 interval(sourceInterval) .pipe( sample( timer(sourceInterval + sampleDelay, sourceInterval * sampleSize) ), take(4) ) .subscribe(res => console.log(res)) /* 0 3 6 9 */
sampleTime (@deprecated) [8 버전에서 삭제 예정, sampleTime === sample(interval(period, scheduler?)) ]
ms 단위의 발행 간격을 인자로 설정한 후 해당 발행 간격 사이에 있는 소스 옵저버블의 최근 값을 확인해 발행하는 연산자
연속해서 발생하는 이벤트 중 일정 간격으로 가장 최근 것 하나만 뽑아 처리할 때 유용하다.
// sampleTime.d.ts sampleTime<T>(period: number, scheduler?: SchedulerLike): MonoTypeOperatorFunction<T>; import { timer } from 'rxjs' import { sampleTime, take } from 'rxjs/operators' // source: 0(300ms) 1(700ms) 2(1100ms) 3(1500ms) 4(1900ms) 5(2300ms) // sample: 800ms 1600ms 2400ms const sourcePoint = 300 const sourceDelay = 400 const sampleCount = 2 const samplePeriod = sourceDelay * sampleCount // 800ms timer(sourcePoint, sourceDelay) // 300ms, 400ms .pipe( sampleTime(samplePeriod), // 800ms take(3) ) .subscribe(res => console.log(res)) /* 1 3 5 */
2] 변환 연산자
map
JavaScript의 Array.prototype.map과 같다.
다만 배열 map 함수의 콜백함수의 매개변수 중에 array 파라미터가 있지만 RxJS에서는 없다.map<T, R>(project: (value: T, index: number) => R, thisArg?: any): OperatorFunction<T, R> import { interval } from 'rxjs' import { take, map } from 'rxjs/operators' interval(1000) .pipe( take(5), map(x => x * 10) ) .subscribe((x) => console.log(x)); /* 0 10 20 30 40 */
pluck (@deprecated) [8 버전에서 삭제 예정, pluck 대신 map 연산자를 사용하고 optional chaining 권장]
map 연산자처럼 동작하지만 소스 옵저버블에서 객체를 리턴할 때 해당 객체의 속성(property)을 기준으로 변환하는 연산자다. => 객체의 Array.prototype.flat 버전
// pluck.d.ts pluck<T, R>(...properties: string[]): OperatorFunction<T, R>; import { of } from 'rxjs' import { pluck } from 'rxjs/operators' of( { x: { y: 1 } }, { x: { y: 2 } }, { x: { y: 3 } }, { x: { y: 4 } }, { x: { y: 5 } }, { x: { y: 6 } }, ) .pipe( pluck('x', 'y') ) .subscribe((x) => console.log(x)); /* 1 2 3 4 5 6 */
mergeMap
(* 공식문서의 내용을 작성자가 한국어 번역)옵저버블의 각 소스 값에 옵저버블을 합치는 연산자
project 함수를 인자로 사용하여 리턴된 옵저버블 인스턴스를 구독하는 map 연산자
// mergeMap.d.ts mergeMap<T, R, O extends ObservableInput<any>>( project: (value: T, index: number) => O, resultSelector: (outerValue: T, innerValue: ObservedValueOf<O>, outerIndex: number, innerIndex: number) => R, concurrent?: number ): OperatorFunction<T, R>; import { timer, range } from 'rxjs' import { map, mergeMap } from 'rxjs/operators' const timers = [ timer(2000).pipe(map(v => `req1`)), timer(1000).pipe(map(v => `req2`)), timer(1500).pipe(map(v => `req3`)), ] range(0, 3) .pipe( mergeMap((x) => timers[x]) ) .subscribe((x) => console.log(x)); /* req2 req3 req1 */
- 배열이나 유사 배열 사용
range(0, 3) .pipe(mergeMap(x => [x + 1, x + 2])) .subscribe(x => console.log(x)) /* - range: 0 - 1 2 - range: 1 - 2 3 - range: 2 - 3 4 */ // ========================== // 유사 배열 객체 range(0, 3) .pipe(mergeMap(x => ({ length: 2, 0: x + 1, 1: x + 2 }))) .subscribe(x => console.log(x)) // 위와 동일
- 프로미스 사용
range(0, 3) .pipe(mergeMap(x => new Promise(resolve => setTimeout(() => resolve(`req${x + 1}`), Math.floor(Math.random() * 2000))))) .subscribe(x => console.log(x)) // req1, req2, req3 중에 랜덤 순서로 발행
- 이터러블 사용
iterable protocol을 준수한 이터러블이면 이터러블의 next 함수를 호출하면서 (순회하면서) 구독한다.
done값이 true이면 complete 함수를 호출한다.
range(0, 3).pipe(mergeMap(r => { const nextMap = new Map(); nextMap.set('keyName', r) nextMap.set('valueName', r + 1) return nextMap })).subscribe(x => { const [key, value] = x console.log(`key: ${key}, value: ${value}`) }) /* key: keyName, value: 0 => range: 0 key: valueName, value: 1 => range: 0 key: keyName, value: 1 => range: 1 key: valueName, value: 2 => range: 1 key: keyName, value: 2 => range: 2 key: valueName, value: 3 => range: 2 */
- 최대 동시 요청 수 정하기
concurrent를 지정하면 최대로 구독할 수 있는 옵저버블의 수를 제한할 수 있다.
소스 옵저버블에서 발행한 값을 빠른 속도로 전달하더라도 mergeMap 연산자에서 구독 완료되지 않은 옵저버블 수가 concurrent 개수만큼이라면 해당 값을 연산자 내부에 구현해놓은 버퍼(배열)에 잠시 저장해뒀다가 옵저버블 중 하나라도 구독을 해제한다면 버퍼에 저장한 순서대로 값을 하나씩 꺼내서 project 함수에서 새 옵저버블을 만들어 구독한다.
switchMap
mergeMap연산자는 새 옵저버블을 반환하고 구독하는 연산자였다면 switchMap은 새 옵저버블을 반환하고 구독하기 전에 연산자에서 완료되지 않은 작업이 있다면 해당 옵저버블의 구독을 해제하고 새 옵저버블을 구독한다.
// switchMap (resultSelector는 6버전에서 권장되지 않음) switchMap<T, R, O extends ObservableInput<any>>( project: (value: T, index: number) => O, resultSelector: ( outerValue: T, innerValue: ObservedValueOf<O>, outerIndex: number, innerIndex: number ) => R ): OperatorFunction<T, R>; interval(600).pipe( take(5), switchMap(x => interval(250).pipe( map(y => ({x, y})), take(3) )) ).subscribe(res => console.log(`next x:${res.x}, y: ${res.y}`)) /* next x:0, y: 0 next x:0, y: 1 next x:1, y: 0 next x:1, y: 1 next x:2, y: 0 next x:2, y: 1 next x:3, y: 0 next x:3, y: 1 next x:4, y: 0 next x:4, y: 1 next x:4, y: 2 마지막 4에서는 더 이상 구독할 옵저버블이 없기 때문에 y가 2까지 출력된다 */
concatMap
project 함수에서 리턴하는 옵저버블을 구독한 후 값 발행을 완료해야 다음 옵저버블을 구독하는 연산자다.
동기에서는 mergeMap이나 concatMap이나 동일하게 작동하지만 비동기에서는 concatMap이 구독 중인 옵저버블이 값을 발행해야 다음 옵저버블을 구독하기 때문에 버퍼에 저장할 수 있다.
// concatMap.d.ts concatMap<T, R, O extends ObservableInput<any>>( project: (value: T, index: number) => O, resultSelector: ( outerValue: T, innerValue: ObservedValueOf<O>, outerIndex: number, innerIndex: number ) => R ): OperatorFunction<T, R>; range(0, 3).pipe( concatMap(x => req[x]) ) .subscribe(res => console.log(`from ${res}`)) /* from req1 from req2 from req3 */
scan
scan 연산자는 reduce 연산자와 매우 유사하다.
reduce 연산자는 최종 값 1개만 발행하지만 scan 연산자는 next 함수로 값을 발행할 때마다 호출해 중간에 누적된 값을 매번 발행한다.
reduce 연산자는 값을 1개만 발행하기 떄문에 소스 옵저버블에서 complete 함수를 호출해야 값을 발행한다.
scan 연산자에 초기값이 원시값으로 설정되어있으면 불변값으로 되기 때문에 다시 구독할 때 변하는 것을 우려하지 않아도 된다.
참조값으로 설정하면 초기값이 변할 수 있으니 주의해야 한다. 불변을 유지하고 싶다면 팩토리 함수를 이용하거나 새 객체를 매번 생성해서 반환하면 된다.
// scan.d.ts scan<T, R>( accumulator: (acc: R, value: T, index: number) => R, seed: R ): OperatorFunction<T, R>; // 초기값이 없는 경우 range(0, 3).pipe( scan((acc, cur) => { console.log(`acc: ${acc}, cur: ${cur}`) return acc + cur }) ) .subscribe(res => console.log(`${res}`)) /* 0 acc: 0, cur: 1 1 acc: 1, cur: 2 3 */ // 초기값이 0인 경우 range(0, 3).pipe( scan((acc, cur) => { console.log(`acc: ${acc}, cur: ${cur}`) return acc + cur }, 0) ) .subscribe(res => console.log(`${res}`)) /* acc: 0, cur: 0 0 acc: 0, cur: 1 1 acc: 1, cur: 2 3 */ // 초기값이 참조인 경우 const observable$ = interval(500).pipe( take(7), scan((acc, cur) => { const a = acc.a acc.a = acc.b acc.b += a return acc }, { a: 1, b: 0 }), pluck('a') ) observable$.subscribe(res => console.log(`${res}`)) setTimeout(() => { observable$.subscribe(res => console.log(`res2: ${res}`)) }, 3000) /* 0 1 1 2 3 5 8 res2: 13 res2: 21 res2: 34 res2: 55 res2: 89 res2: 144 res2: 233 */
partition
filter 연산자처럼 predicate 함수를 호출하면 2개의 옵저버블을 배열로 반환한다.
첫 번째 요소는 predicate 함수의 조건을 만족하는 옵저버블이고
두 번째 요소는 조건에 만족하지 않는 옵저버블이다.
filter 연산자는 소스 옵저버블을 감싸 1개의 옵저버블을 반환한다면
partition 연산자는 같은 소스 옵저버블을 둔 서로 반대되는 조건의 filter 연산자를 적용한 2개의 옵저버블을 배열로 반환한다.
// partition.d.ts partition<T>( predicate: (value: T, index: number) => boolean, thisArg?: any ): UnaryFunction<Observable<T>, [Observable<T>, Observable<T>]>; const ob$ = range(0, 6).pipe(partition(x => x % 2 === 0)) const [first, second] = ob$ console.log(`first!`) first.subscribe(res => console.log(`${res}`)) console.log(`second!`) second.subscribe(res => console.log(`${res}`)) /* first! 0 2 4 second! 1 3 5 */
groupBy
소스 옵저버블에서 발행하는 값을 특정 기준을 정해 같은 그룹에 속해있는 값들을 각각의 옵저버블로 묶어서 발행한다.
이때 그룹을 묶는 기준을 '키'라고 하고 각 값에서 키 값을 만드는 함수를 keySelector라고 한다.
같은 그룹에 속한 값들을 발행하는 옵저버블은 groupedObservable이라고 하며 각 groupedObservable마다 적절한 연산자를 결합해서 사용할 수 있다.
// groupBy.d.ts groupBy<T, K, R>( keySelector: (value: T) => K, elementSelector?: (value: T) => R, durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>, subjectSelector?: () => Subject<R> ): OperatorFunction<T, GroupedObservable<K, R>>; const ob$ = range(0, 6) .pipe( groupBy(x => x % 2 === 0), mergeMap(grouped => grouped.key === true ? grouped.pipe(map(x => `${x} = 짝수!`)) : grouped.pipe(map(x => `${x} = 홀수!`)) ) ) ob$.subscribe(res => console.log(`${res}`)) /* 0 = 짝수! 1 = 홀수! 2 = 짝수! 3 = 홀수! 4 = 짝수! 5 = 홀수! */
- keySelector
하나 또는 여러 개의 key를 만들 수 있는 함수이다.키에 해당하는 옵저버블이 없어 만들어서 값을 발행하는 인스턴스 => (groupedObservable)
keySelector 함수는 옵저버블 인스턴스 내부에 this.key에 키를 저장한다.
- elementSelector
keySelector 함수로 생성한 groupedObservable 인스턴스에 전달하는 값을 바꿔준다. map 연산자에 인자로 사용하는 함수처럼 소스 옵저버블에서 전달하는 값을 바꿔서 리턴한 후 키에 해당하는 groupedObservable로 값을 발행한다.
- durationSelector
groupedObservable을 사용하는 함수이고 여기서 리턴하는 옵저버블은 키에 해당하는 새 groupedObservable을 발행할 때 함께 구독한다. 여기서 어떤 값이든 발행하는 시점에 키와 groupedObservable의 매핑을 끊고 complete 함수를 호출해 구독을 완료한다. 이 때 durationSelector 함수에서 리턴한 옵저버블의 구독도 완료한다.
이후 같은 키에 해당하는 값을 소스 옵저버블에서 발행하면 키에 관한 매핑이 없으므로 durationSelector 함수에서 리턴하는 옵저버블을 새로 매핑해 구독한다.
- subjectSelector
멀티캐스팅을 공부하고 다시 보도록 하자
buffer
소스 옵저버블에서 발행하는 값을 순서대로 일정 기준으로 묶어서 하나의 배열로 발행하는 연산자다.
묶을 기준에 해당하는 시점에 배열을 만들어 발행하는 값들을 해당 배열에 쌓아둔 후, 일정 조건을 충족했을 때 쌓아둔 배열을 발행하고 다시 새로 배열에 쌓고 발행하는 일을 반복한다.
// buffer.d.ts buffer<T>(closingNotifier: Observable<any>): OperatorFunction<T, T[]>; const msg = 'Chifuyu의 개발 블로그입니다.' const ob$ = interval(50) .pipe( take(msg.length), map(x => { const c = msg.charAt(x) console.log(c) return c }), buffer(interval(400)) ) ob$.subscribe(res => console.log(`${res}`)) /* C h i f u y u C,h,i,f,u,y,u 의 개 발 블 로 그 의, ,개,발, ,블,로,그 입 니 다 . */
bufferCount
값 각각을 옵저버블 스트림으로 전달할 때 이를 일정 개수만큼 묶은 후 서버에 한 번의 요청을 하는 상황이 있거나
묶은 값을 풀어서 처리해야 한다면 bufferCount가 적절하다.
// bufferCount.d.ts bufferCount<T>(bufferSize: number, startBufferEvery?: number): OperatorFunction<T, T[]>; const msg = 'Chifuyu의 개발 블로그입니다.' const ob$ = interval(50) .pipe( filter(r => msg.charAt(r) !== ' '), take(msg.length), map(x => { const c = msg.charAt(x) console.log(c) return c }), bufferCount(8) ) ob$.subscribe(res => console.log(`${res}`)) /* C h i f u y u 의 C,h,i,f,u,y,u,의 개 발 블 로 그 입 니 다 개,발,블,로,그,입,니,다 . "" "" .,, */
- bufferSize
몇 개의 값을 버퍼에 저장할지 정하는 정수값을 설정한다.
- startBufferEvery
버퍼 크기만큼 값을 저장한 후 얼마나 시프트 이동해서 값을 자를지 설정한다. (buffer의 첫 원소로부터 이동)
값이 없다면 중복 값 없이 버퍼 크기 수만큼 시프트 이동해 여러 값을 묶는다. (bufferSize만큼 시프트 이동함)
window
buffer 연산자의 배열 대신 중첩 옵저버블(Nested Observable)로 값을 발행하는 연산자다.
windowBoundaries는 묶은 단위의 값을 발행하는 옵저버블이다.
// window window<T>(windowBoundaries: Observable<any>): OperatorFunction<T, Observable<T>>; const msg = 'Chifuyu의 개발 블로그입니다.' const ob$ = interval(50) .pipe( take(msg.length), map(x => { const c = msg.charAt(x) console.log(c) return c }), window(interval(350)), concatMap(x => { console.log(`window 받음`) return x.pipe( filter(x => x !== ' '), take(3), scan((acc, cur) => acc + cur, ''), last() ) }) ) ob$.subscribe(res => console.log(`result: ${res}`)) /* window 받음 C h i result: Chi f u y window 받음 u 의 개 발 result: 의개발 블 window 받음 로 그 입 니 result: 그입니 다 . */
windowCount
bufferCount의 window 버전이다
// windowCount windowCount<T>( windowSize: number, startWindowEvery?: number ): OperatorFunction<T, Observable<T>>;
3] 조합 연산자
merge
mergeMap 연산자와 비슷하지만 merge 연산자는 기존 옵저버블이 발행하는 값을 변환하는 것이 아니라 여러 옵저버블을 인자로 사용해 옵저버블 하나로 만들고 인자로 나열된 각각의 옵저버블을 함께 구독한다.
합쳐진 옵저버블은 각 옵저버블에서 먼저 발행된 값부터 발행한다. 인자로 나열된 순서와 상관없이 여러 옵저버블 중 먼저 값을 발행한 것을 한 곳에서 발행할 때 적절한 연산자다.
마지막에 숫자를 넣으면 동시에 구독할 수 있는 수를 설정할 수 있다.
// merge.d.ts merge<T, R>(...observables: (ObservableInput<any> | SchedulerLike | number)[]): Observable<R>; const req1$ = timer(2000).pipe(map(v => `req1`)) const req2$ = timer(1000).pipe(map(v => `req2`)) const req3$ = timer(1500).pipe(map(v => `req3`)) const ob$ = merge(req1$, req2$, req3$) .pipe( ) ob$.subscribe(res => console.log(`result: ${res}`)) /* result: req2 result: req3 result: req1 */
concat
merge 연산자와 달리 옵저버블의 구독 순서를 보장한다.
연산자 내부에서는 merge 연산자를 이용하고 동시에 구독할 수 있는 concurrent 값을 1로 설정했기 때문에 동시에 하나의 옵저버블만 구독할 수 있다. => merge 연산자의 concurrent 값이 1인 연산자로 동시에 구독할 수 있는 옵저버블의 최대 개수가 1인 연산자
첫 번째 인자로 있는 옵저버블을 먼저 구독하고 그 뒤에 오는 옵저버블은 순서대로 배열에 저장한다.
옵저버블 구독을 완료하면 배열에 저장한 순서대로 1개씩만 꺼내서 구독한다. 따라서 각 옵저버블을 순서대로 구독 완료해야 다음 옵저버블을 구독할 수 있다.
동시성을 보장할 수 없어 모든 옵저버블 구독을 완료할 시간은 각 옵저버블이 완료될 때까지 걸리는 시간의 합이다.
// concat.d.ts function concat<R>(...observables: ObservableInput<any>[]): Observable<R>; const req1$ = timer(2000).pipe(map(v => `req1`)) const req2$ = timer(1000).pipe(map(v => `req2`)) const req3$ = timer(1500).pipe(map(v => `req3`)) const ob$ = concat(req1$, req2$, req3$) .pipe( ) ob$.subscribe(res => console.log(`result: ${res}`)) /* 위 코드는 아래와 같다. merge(req1$, req2$, req3$, 1).subscribe(res => console.log(`result: ${res}`)) */ /* req1 req2 req3 */
forkJoin
6버전에선 함수로 변경되었다
Promise.all은 인자로 나열된 각 프로미스의 결과값을 배열에 담아 반환한다.
마찬가지로 forkJoin도 각 옵저버블이 마지막에 발행한 값을 배열에 저장한다.
// forkJoin.d.ts forkJoin<T>(...sources: ObservableInput<T>[]): Observable<T[]>; const r1 = timer(0, 1000).pipe(take(2), map(v => `r1: ${v}`)) const r2 = timer(0, 2000).pipe(take(3), map(v => `r2: ${v}`)) const r3 = timer(0, 1500).pipe(take(4), map(v => `r3: ${v}`)) forkJoin(r1, r2, r3) .subscribe(res => console.log(`${res}`)) // r1: 1, r2: 2, r3: 3
combineLatest
combineLatest는 forkJoin과 달리 가장 최신 값을 합해서 바로 발행한다.
scan : reduce = combineLatest : forkJoin처럼 생각하면 되겠다.
zip
각 옵저버블을 동시에 구독한 후 발행하는 값 각각을 버퍼에 저장한다. 그리고 같은 순서에 해당하는 값이 모두 준비되었을 때 합한 값을 발행한다. 각 옵저버블에서 값을 발행하는 순서에 맞게 짝을 맞춰 발행한다는 것이 항상 최신의 값만을 합치는 combineLatest와 다르다.
짝이 맞지 않다면 짝이 맞는 것끼리만 합한다.
// zip.d.ts function zip<T, R>( ...observables: Array<ObservableInput<T> | ((...values: Array<T>) => R)> ): OperatorFunction<T, R>;
startWith
구독하는 어떤 옵저버블이 특정 값을 발행하기 전 미리 나열한 인자를 발행하는 역할을 한다.
단, 나열된 값을 그대로 발행하기만 한다. 예를 들어 옵저버블을 넣었다면 옵저버블 자체를 먼저 발행할 뿐 해당 옵저버블을 구독하지 않는다.
4] 수학 및 결합 연산자
reduce
max
min
count
=> ES6의 메서드와 같은 역할(이름에서도 알 수 있는 역할입니다.)을 하므로 세부 내용과 코드는 생략합니다.
5] 유틸리티 연산자
tap
5 버전에서는 do라는 연산자였지만 자바스크립트 예약어와 겹친다는 이유로 이름이 tap으로 변경되었다.
부수 효과(side effect)가 필요할 때 사용하는 연산자다.
map, filter 연산자들은 각각의 고유 기능이 있는 순수함수였지만 tap 연산자는 소스 옵저버블을 다른 옵저버블로 바꾸지 않는다.
소스 옵저버블에서 발행하는 값을 전달받은 후 인자로 사용하는 함수를 호출하고 소스 옵저버블에서 발행한 값을 그대로 발행한다.
// tab.d.ts tap<T>( next: (value: T) => void, error: null | undefined, complete: () => void ): MonoTypeOperatorFunction<T>; /* next(nextOrObserver)는 소스 옵저버블에서 발행하는 다음 값을 next 콜백함수로 동작한다. 객체라면 next, error, complete 함수가 있는 옵저버 객체로 다룬다. */ range(1, 10).pipe( tap( x => console.log(`Stream 1 (range 1, 10) ${x}`), err => console.error(`tab Error: ${err}`), ), filter(x => x % 2 === 0), tap(x => console.log(`Stream 2 (filter x % 2 === 0) ${x}`)), map(x => x + 1), tap(x => console.log(`Stream 3 (map x + 1) ${x}`)) ).subscribe(x => console.log(`result: ${x}`)) /* Stream 1 (range 1, 10) 1 Stream 1 (range 1, 10) 2 Stream 2 (filter x % 2 === 0) 2 Stream 3 (map x + 1) 3 result: 3 Stream 1 (range 1, 10) 3 Stream 1 (range 1, 10) 4 Stream 2 (filter x % 2 === 0) 4 Stream 3 (map x + 1) 5 result: 5 Stream 1 (range 1, 10) 5 Stream 1 (range 1, 10) 6 Stream 2 (filter x % 2 === 0) 6 Stream 3 (map x + 1) 7 result: 7 Stream 1 (range 1, 10) 7 Stream 1 (range 1, 10) 8 Stream 2 (filter x % 2 === 0) 8 Stream 3 (map x + 1) 9 result: 9 Stream 1 (range 1, 10) 9 Stream 1 (range 1, 10) 10 Stream 2 (filter x % 2 === 0) 10 Stream 3 (map x + 1) 11 result: 11 */
finalize
옵저버블 스트림 실행을 완료하거나 에러가 발생했을 때 인자로 사용하는 콜백 함수를 호출허는 연산자다
이 연산자도 tap 연산자와 마찬가지로 5 버전에서는 finally라는 연산자였다가 변경되었다.
finalize는 옵저버블 라이프 사이클을 전달받는 콜백 함수를 등록하는 ReactiveX do 연산자 종류 중 하나다.
기존 구독하는 소스 옵저버블에 영향을 주지 않고 옵저버블 라이프 사이클이 끝날 때 호출되는 콜백 함수를 인자로 사용한다.
// finalize.d.ts finalize<T>(callback: () => void): MonoTypeOperatorFunction<T>; range(1, 3).pipe( finalize(() => console.log(`finalize callback`)) ).subscribe( x => console.log(x), err => console.error(err), () => console.log(`Complete`) ) /* 1 2 3 Complete finalize callback */
toPromise
https://medium.com/@benlesh/rxjs-observable-interop-with-promises-and-async-await-bebb05306875
옵저버블을 구독하여 이를 프로미스로 감싼 형태라고 소개하고 있다.
이전까지 작성된 연산자들은 새로운 옵저버블로 감싸사 리턴하고 이를 구독해서 해당 연산자를 동작시키는 형태였지만 toPromise 함수는 호출 후 독해서 동작하는 것이 아니라 호출하자마자 새로 생성한 프로미스를 리턴해준다.
프로미스 안 함수에서 소스 옵저버블인 this를 사용해 구독하므로 프로미스 생성과 동시에 소스 옵저버블을 구독한다.
그리고 toPromise 함수에서 리턴받은 프로미스는 소스 옵저버블 구독이 완료되었을 때 가장 최근 값을 resolve로 갖는다.
interval(100).pipe( take(10), tap(x => console.log(`interval tap ${x}`)) ).toPromise().then( value => console.log(`프로미스 결과 ${value}`), reason => console.error(`프로미스 에러 ${reason}`) ) /* interval tap 0 interval tap 1 interval tap 2 interval tap 3 interval tap 4 interval tap 5 interval tap 6 interval tap 7 interval tap 8 interval tap 9 프로미스 결과 9 */ interval(100).pipe( take(10), tap(x => console.log(`interval tap ${x < 3 ? x : x.test()}`)) ).toPromise().then( value => console.log(`프로미스 결과 ${value}`), reason => console.error(`프로미스 에러 ${reason}`) ) /* interval tap 0 interval tap 1 interval tap 2 프로미스 에러 TypeError: x.test is not a function */
toArray
소스 옵저버블에서 발행한 값을 내부에 생성한 배열에 저장하다가 소스 옵저버블 구독이 완료되면 해당 배열을 next 함수로 발행하도록 동작하는 연산자다.
toPromise 함수와 이름이 비슷하지만 연산자에 해당한다. 다른 연산자와 마찬가지로 새로운 옵저버블을 리턴하고 이를 구독해야만 옵저버블 안에서 결과를 전달받을 수 있으며 subscribe 함수 호출만으로 구독하는 일은 없기 때문이다.
// toArray.d.ts toArray<T>(): OperatorFunction<T, T[]>; // 소스 옵저버블에서 에러가 발생하면 error 함수를 호출하는 연산자다. // 구독을 완료할 때까지 배열에 값을 저장하므로 무한 스트림에서 사용하지 않도록 해야 한다. // 너무 많은 값을 저장하면 배열이 커지므로 메모리 이슈에 주의! range(1, 3).pipe( toArray() ).subscribe( v => console.log(`isArray: ${Array.isArray(v)}, Array: ${v}`) ) // isArray: true, Array: 1,2,3
timeout
일정 시간동안 소스 옵저버블에서 값을 발행하지 않으면 에러를 발생시키는 연산자다.
서버에 어떤 요청을 하거나 상황에 따라서 기대한 시간보다 오래 걸릴 수 있는 작업에 옵저버블을 사용해야 할 때 유용하다.
인자로 설정한 시간보다 작업 시간이 더 오래 걸리면 타임아웃 에러를 발생시키므로 유용하다.
// timeout.d.ts timeout<T>(due: number | Date, scheduler?: SchedulerLike): MonoTypeOperatorFunction<T>; const source$ = defer( () => fetch(`https://httpbin.org/delay/${3000}`) .then(x => x.json()) ) source$.pipe( timeout(2000) ).subscribe( x => console.log(`${JSON.stringify(x)}`), err => { console.error(err) process.exit(1) } ) // Error {}
6] 조건 연산자
조건 연산자는 특정 조건에 맞는지 알려주는 boolean 값을 리턴해 특정 조건에 해당할 때 정해진 값을 발행하는 연산자다.
이미 발행한 값이 아닌 소스 옵저버블의 특성에 따라서 조건 자체를 분기해야 할 때는 조건 연산자가 유용하다.
defaultIfEmpty
소스 옵저버블이 empty 함수로 생성한 옵저버블(empty 옵저버블)일 떄 인자로 설정한 기본값을 발행해주는 연산자다.
=> 소스 옵저버블이 empty 옵저버블이면 complete 함수를 호출했을 때 기본값을 발행하고 그렇지 않으면 소스 옵저버블의 원래 동작을 실행하는 것이다.
// defaultIfEmpty.d.ts defaultIfEmpty<T, R>(defaultValue?: R): OperatorFunction<T, T | R>; const ranges = count => range(1, count) function isEmpty(count) { ranges(count).pipe( defaultIfEmpty('EMPTY') ).subscribe(value => console.log(`개수(count): ${count}, 값(value): ${value}`)) } isEmpty(0) // 개수(count): 0, 값(value): EMPTY isEmpty(3) /* 개수(count): 3, 값(value): 1 개수(count): 3, 값(value): 2 개수(count): 3, 값(value): 3 */
isEmpty
true / false를 값으로 발행해 소스 옵저버블이 empty 옵저버블인지 아닌지를 알려주고 구독 완료하는 연산자다.
defaultIfEmpty 연산자와 달리 소스 옵저버블이 empty 옵저버블이 아닐 때 소스 옵저버블의 원래 동작을 그대로 실행하지 앟는다.
에러가 발생할 때를 제외하고 소스 옵저버블에서 next나 complete 함수를 호출하면 그 자리에서 empty 옵저버블인지만 확인해 true / false 값을 발행하고 구독을 완료한다.
소스 옵저버블에서 값을 발행해서 next 함수를 호출하면 해당 값을 발행하지 않고 false를 발행하며 구독 완료 후 소스 옵저버블에서 complete 함수를 호출하면 true를 발행하고 완료한다.
// isEmpty.d.ts isEmpty<T>(): OperatorFunction<T, boolean>; const ranges = count => range(1, count) function sourceIsEmpty(count) { ranges(count).pipe( isEmpty() ).subscribe(value => console.log(`개수(count): ${count}, 값(value): ${value}`)) } sourceIsEmpty(0) // 개수(count): 0, 값(value): true sourceIsEmpty(3) // 개수(count): 3, 값(value): false
find
인자로 사용하는 predicate 함수로 소스 옵저버블에서 발행하는 값 중 처음으로 함수 조건을 만족했을 때 true를 리턴하는 값을 발행하고 구독을 완료하는 연산자다.
구독을 완료할 때까지 조건을 만족하는 값이 없었다면 undefined라는 값을 발행한다.
소스 옵저버블에서 에러가 발생하거나 predicate 함수를 호출해 동작하는 도중에 에러가 발생하면 error 함수를 호출해 에러를 전달받는다.
// find.d.ts find<T>( predicate: (value: T, index: number, source: Observable<T>) => boolean, thisArg?: any ): OperatorFunction<T, T | undefined>; const ranges = count => range(1, count) function sourceIsEmpty(count) { ranges(count).pipe( find((x) => x > 2) ).subscribe(value => console.log(`개수(count): ${count}, 값(value): ${value}`)) } sourceIsEmpty(5) // 개수(count): 5, 값(value): 3 sourceIsEmpty(2) // 개수(count): 2, 값(value): undefined
7] 에러 처리 연산자
에러가 발생하여 error 함수로 에러가 전달된 후에는 구독하던 스트림을 종료한다.
RxJS에서는 에러 핸들링을 할 수 있는 특별한 연산자들을 제공해 예외 처리를 구현할 수 있다.
catchError
에러가 발생했을 때 인자로 사용하는 선택자 함수로 해당 에러를 전달하여 여기에서 리턴하는 옵저버블 대신 구독하는 연산자다.
try-catch문처럼 에러가 발생했을 때 이를 확인해 예외 처리하는 방식과 같다.
subscribe 함수에 error 함수를 넣어도 catchError 연산자가 에러를 적절히 처리해줘서 error 함수를 호출하지 않는다,
그러나 catchError 연산자의 선택자 함수에서 리턴받아 구독한 옵저버블에서 에러가 발생한다면 error 함수를 호출한다.
// catchError.d.ts catchError<T, O extends ObservableInput<any>>( selector: (err: any, caught: Observable<T>) => O ): OperatorFunction<T, T | ObservedValueOf<O>>; const integers = ['1', '2', '3', 'r', '5'] from(['1', '2', '3', 'r', '5', '6', 'u', '8']).pipe( mergeMap(x => of(x).pipe( tap(value => { if (!Number.isInteger(parseInt(value, 10))) { throw new TypeError(`정수가 아님! ${value}`) } }), catchError(err => of(err.message)) ) ) ).subscribe(x => console.log(x), err => console.error(err))
retry
에러가 발생했을 때 인자로 설정한 정수값만큼 소스 옵저버블 구독을 재시도하는 연산자다.
기본값은 -1이며 에러가 발생하지 않는다면 재시도하지 않는다.
구독을 재시도할 때 소스 옵저버블을 다시 구독해서 처음부터 값을 발행한다.
재시도 후 에러가 발생하지 않으면 해당 값을 발행하고 스트림을 계속 처리한다.
retry 연산자는 재시도할 때 소스 옵저버블을 새로 구독한다.
(책 속의 팁: 소스 옵저버블 구독을 처음부터 재시도하려면 mergeMap 연산자를 사용하지 않아도 되지만 여러 값을 발행하는 스트림 각각에 재시도해야 한다면 mergeMap 연산자를 사용해야 한다.)
// retry.d.ts retry<T>(count?: number): MonoTypeOperatorFunction<T>;
retryWhen
소스 옵저버블 구독을 재시도하는 retry 연산자와 비슷하지만 구독을 재시도하기 전 에러를 전달받아 특정한 옵저버블에서 발행한 후 구독을 재시도하는 연산자다.
notifier 함수는 소스 옵저버블에서 에러가 발생했을 때 해당 에러를 errors라는 옵저버블로 다루는데 이 옵저버블의 스트림을 전달받아 notifier에서 리턴하는 옵저버블을 구독한다.
notifier에서 리턴하는 옵저버블의 값을 발행하면 이어서 소스 옵저버블 구독을 재시도한다.
여기서 에러가 발생하면 전체 스트림의 구독을 종료한다. complete 함수를 호출하면 동일하게 complete 함수를 호출한다.
// retryWhen.d.ts retryWhen<T>( notifier: (errors: Observable<any>) => Observable<any> ): MonoTypeOperatorFunction<T>;
8] 서브젝트
기본개념은 공식문서에 설명이 잘 되어있으니 읽어보면 좋을 것이다. => https://rxjs.dev/guide/subject
콜드 옵저버블(Cold Observable): 멀티캐스팅을 지원하지 않는 옵저버블, 지금껏 작성되었던 모든 것들이 콜드 옵저버블이다.
BehaviorSubject
초기값으르 가지고 있고 언제 구독해도 항상 값을 발행한다.
const a = { next: x => console.log(`a: ${x}`), error: err => console.error(`err a: ${err}`), complete: () => console.log('complete a') } const b = { next: x => console.log(`b: ${x}`), error: err => console.error(`err b: ${err}`), complete: () => console.log('complete b') } const c = { next: x => console.log(`c: ${x}`), error: err => console.error(`err c: ${err}`), complete: () => console.log('complete c') } const bSubject = new BehaviorSubject('초기값') bSubject.subscribe(a) bSubject.next('값1') bSubject.subscribe(b) bSubject.next('값2') bSubject.subscribe(c) bSubject.next('값3') bSubject.next('값4') bSubject.next('값5') /* a: 초기값 a: 값1 b: 값1 a: 값2 b: 값2 c: 값2 <------------ 멀티캐스팅 시작 -----------> a: 값3 b: 값3 c: 값3 a: 값4 b: 값4 c: 값4 a: 값5 b: 값5 c: 값5 */
ReplaySubject
AsyncSubject
9] 멀티캐스팅 연산자
핫 옵저버블(Hot Observable) & 콜드 옵저버블(Cold Observable)
핫 옵저버블은 옵저버블이 푸시하는 값을 여러 옵저버에 멀티캐스팅하는 옵저버블이다. -> 커넥터블 옵저버블
서브젝트처럼 멀티캐스팅을 지원하지만 옵저버블이기에 next, error, complete 함수를 제공하지 않고 옵저버블 내부에서 멀티캐스팅할 값을 푸시한다.
multicast
소스 옵저버블로부터 multicast를 호출할 때 서브젝트 팩토리 함수를 사용해 커넥터블 옵저버블을 만들어 핫 옵저버블을 다룰 수 있다.
publish
multicast 연산자는 서브젝트나 서브젝트의 팩토리 함수를 직접 사용해야 하지만 publish는 서브젝트나 서브젝트 팩토리 함수를 사용할 필요가 없도록 추상화된 연산자다.
selector 함수를 제공하지 않을 때,
(1. 같은 서브젝트 객체를 공유하므로 소스 옵저버블 구독을 완료하면 내부에 생성한 서브젝트도 사용할 수 없다.
2. connect 함수를 호출한 후 소스 옵저버블을 구독하다 완료하면 다시 connect 함수를 호출해도 이후 구독하는 옵저버들이 값을 받을 수 없다. => 서브젝트를 사용할 수 없기 때문에 이를 구독하는 옵저버들은 값을 받을 수 없다.)
-> publish 연산자 대신 multicast 연산자와 매번 서브젝트를 새로 생성하는 서브젝트 팩토리 함수를 사용하거나, share 연산자를 사용한다.
publishBehavior, publishReplay, publishLast
특정 서브젝트 자체를 멀티캐스팅하는 연산자들이다.
publishBehavior는 BehaviorSubject를, publishReplay는 ReplaySubject, publishLast는 AsyncSubject를 multicast 연산자에서 사용해 커넥터블 옵저버블을 만들어준다. publish와 다른 점은 선택자 함수를 사용하지 않고, 해당 서브젝트를 만드는데 필요한 것만 사용한다.
3가지 연산자는 무조건 커넥터블 옵저버블을 반환한다.
refCount (deprecated)
커넥터블 옵저버블을 구독하는 옵저버의 수를 센 후 최초로 1이 되면 connect 함수를 자동으로 호출한다.
옵저버블 구독을 1개 해제할 때마다 count를 1씩 줄이다가 0이 되면 unsubscribe 함수까지 자동으로 호출해준다.
* 7 버전 문서에서는 share 연산자로 대체된다는 deprecation note가 있다.
share
share는 pipe(publish(), refCount())를 추상화한 연산자다.
기존 옵저버블을 커넥터블 옵저버블로 바꾸고 refCount 연산자를 사용해 connect 함수를 호출할 필요없는 핫 옵저버블로 바꾼다.
pipe(publish(), refCount()) 와는 다른 점은 재구독이 가능하다는 것인데, 구현 코드에서 서브젝트가 없다면 팩토리 함수로 서브젝트를 만들기 때문에 가능하다. 마치 multicast 연산자에 팩토리 함수를 넣은 것처럼 재구독이 가능하다. 반면 publish는 new Subject()로 팩토리 함수가 아니기 떄문에 재구독 시도를 하면 실패한다.
아래는 7버전에서의 구현코드이다.
export function share<T>(options?: ShareConfig<T>): OperatorFunction<T, T> { options = options || {}; const { connector = () => new Subject<T>(), resetOnComplete = true, resetOnError = true, resetOnRefCountZero = true } = options; let connection: SafeSubscriber<T> | null = null; let subject: SubjectLike<T> | null = null; let refCount = 0; let hasCompleted = false; let hasErrored = false; const reset = () => { connection = subject = null; hasCompleted = hasErrored = false; }; return operate((source, subscriber) => { refCount++; subject = subject ?? connector(); subscriber.add(() => { refCount--; if (resetOnRefCountZero && !refCount && !hasErrored && !hasCompleted) { const conn = connection; reset(); conn?.unsubscribe(); } }); subject.subscribe(subscriber); if (!connection) { connection = new SafeSubscriber({ next: (value: T) => subject!.next(value), error: (err: any) => { hasErrored = true; const dest = subject!; if (resetOnError) { reset(); } dest.error(err); }, complete: () => { hasCompleted = true; const dest = subject!; if (resetOnComplete) { reset(); } dest.complete(); }, }); from(source).subscribe(connection); } }); }
10] 스케줄러
스케줄러는 옵저버가 옵저버블을 구독할 때 값을 전달받는 순서와 실행 컨텍스트를 관리하는 역할을 하는 자료구조다.
공식문서에서는 자료구조, 실행 컨텍스트, 가상 시계 3가지 요소로 정의하고 있다.
자료구조(data structure): 우선순위나 다른 기준에 따라서 저장하고 task를 저장하고 큐에 넣는다.
실행 컨텍스트(execution context): task가 언제 실행되고 어디에서 실행되는지를 나타낸다. 예를 들면 즉시 실행, setTimeout, process.nextTick, requestAnimationFrame 같은 콜백 메커니즘을 말한다.
가상 시계(virtual clock): now 함수라는 스케줄러의 시간을 가리키는 getter 메서드를 제공한다. 특정 스케줄러에 스케줄한 작업들은 클락으로 설정한 시간에 맞춰 동작한다.
스케줄러를 사용하는 방법은 크게 2가지인데,
1. subscribeOn과 observeOn이란 연산자나 스케줄러를 인자로 사용하는 연산자를 사용하는 것이다.
2. 직접 연산자를 구현할 때 스케줄러에 있는 schedule 함수를 호출하는 방법이다. 내부에서 스케줄러가 어떤 순서로 작업을 처리할지 직접 정할 수 있다.
스케줄러는 schedule 함수를 호출해서 동작한다. schedule 함수를 직접 호출하지 않고 연산자의 인자로 스케줄러를 사용할 수 있지만 내부 구현은 해당 스케줄러의 schedule 함수를 호출한다.
schedule 함수는 해당 스케줄러와 매칭하는 액션 객체를 생성해 해당 액션을 실행한다.
AsyncScheduler
(asyncScheduler로 import 해야 한다.)
AsyncScheduler는 스케줄러(대표 스케줄러: AsapScheduler, QueueScheduler...)들이 상속받는 부모 스케줄러라고 할 수 있다.
각 작업 단위 기준으로 보면 setTimeout처럼 동작한다.
(공식문서에서 setTimeout의 시간을 0으로 설정하는 것은 asyncScheduler 대신 asapScheduler를 사용하라고 권고하고 있다.)
asyncScheduler.schedule(function work(value) { value = value || 0 console.log(`value: ${value}`) const selfAction = this; // console.log(selfAction) selfAction.schedule(value + 1, 1000) }, 1000) /* value: 0 value: 1 value: 2 . . . */
AsapScheduler
(asapScheduler로 import 해야 한다.)
각 플랫폼에 맞게 동기로 작업을 처리한 후 가능하면 빠르게 비동기로 작업을 처리하는 스케줄러다.
현재 이벤트 처리의 끝이나 현재 실행 로직 다음에 실행해야 할 이벤트 처리보다 더 빠르게 처리해야 하는 작업이 있을 때 사용한다.
QueueScheduler
(queueScheduler로 import 해야 한다.)
동기방식의 스케줄러이다. actions 배열을 반복 실행하며 먼저 들어온 값을 먼저 사용하는 큐 자료구조를 사용한다.
delay를 지정하면 asyncScheduler처럼 동작한다. null을 넣으면 delay를 넣지 않겠다는 뜻이다.
(도서의 필자: 큐를 사용해서 순서를 동기로 조절해야 하는 특수한 상황이 아니면 사용할 일이 없어 실용성이 다소 낮은 스케줄러)
AnimationFrameScheduler
(animationFrameScheduler로 import 해야 한다.)
애니메이션 구현 시 프레임 손실을 막는 window.requestAnimationFrame으로 구현한 스케줄러다.
마찬가지로 delay를 사용하면 asyncScheduler처럼 동작한다.
내부에서는 AnimationFrame.js라는 유틸리티 성격의 함수를 참조해 플랫폼에 적당한 애니메이션 프레임 객체를 선택해 사용한다.
따라서 화면에 프레임별로 손실없이 애니메이션을 그려줄 때(프레임별로 공식을 적용해 CSS 설정을 바꿔야 할 때) 사용할 수 있다.
이 동영상은 해당 스케줄러를 사용하여 CSS 변환 방법을 알려준다. youtu.be/jKqWMvdTuE8
subscribeOn
구독하는 옵저버블 자체를 인자로 사용할 스케줄러로 바꿔준다.
만약 여러 개의 subscribeOn 연산자가 있다면 맨 처음 호출한 subscribeOn 연산자에 있는 스케줄러를 선택해 사용한다.
그러므로 특수 상황(이미 subscribeOn으로 스케줄러가 바뀌어 이를 덮어쓰는 상황)을 제외하고는 subscribeOn 연산자를 여러 번 사용할 이유는 없으니 적절한 위치에 한 번만 사용하면 된다.
const source = Observable.create(observer => { console.log(`begin source`) observer.next(1) observer.next(2) observer.next(3) observer.complete() console.log(`end source`) }) console.log(`before subscribe`) source.pipe(subscribeOn(asyncScheduler, 1000)).subscribe(x => console.log(x)) console.log(`end subscribe`) /* before subscribe end subscribe begin source 1 2 3 end source */
observeOn
subscribeOn 연산자를 사용하면 모든 스트림이 스케줄러로 바뀌는데, 중간에 다른 스케줄러로 바꿔주고 싶을 때 observeOn 연산자를 사용하여 observeOn이 호출된 이후부터 스케줄러를 바꿔 실행할 수 있다.
여러 개의 observeOn 연산자가 연속으로 연결되었을 때 가장 마지막에 호출된 연산자의 스케줄러가 우선 적용된다.
- AsyncScheduler 사용
- AsapScheduler 사용
- QueueScheduler 사용
11] 테스트
모카
RxJS 테스트
createHotObservable / ColdObservable 메서드
12] 프로젝트
React
13] redux-observable
14] 사용자 정의 생성 함수 및 연산자
사용자 정의 파이퍼블 연산자
사용자 정의 생성 함수
References
RxJS: https://rxjs-dev.firebaseapp.com/ (7 버전), v6.rxjs.dev/resources (6 버전)
RxJS 프로그래밍: https://www.hanbit.co.kr/media/books/book_view.html?p_code=B2578596304
Redux-Observable: https://redux-observable.js.org/
@@@
'RxJS' 카테고리의 다른 글
RxJS 기본개념 정리 (0) 2021.04.26