보통 RxJS를 사용하면서 연산자(operator)를 추가로 작성해야 하는 경우는 거의 없다. 5 버전을 기준으로 RxJS의 기본 연산자 수는 거의 100여개에 가까우며 map
과 같은 기본적인 연산자부터 mergeScan
과 같은 복잡한 연산자에 이르기까지 많은 작업에 필요한 동작을 지원한다. 코드를 작성하면서 지원되지 않는 연산자를 새로 작성해야 한다는 생각이 드는 경우 이미 지원되는 연산자의 조합을 이용하면 해결되는 경우가 대부분이다. 그러나 모든 상황을 만족할 수 있는 연산자를 구비하는 것이 불가능한 까닭에 완전히 새로운 연산자를 작성해야 할 때도 있다.
이런 연산자들을 RxJS의 기본 라이브러리에 추가해 달라는 요청을 종종 받게 되는데 어지간한 경우 수락하지 않고 있다. RxJS 5 및 이후의 버전은 기본 라이브러리의 연산자 숫자를 줄이는 대신 기본 라이브러리에 포함되지 않은 연산자들은 별도의 모듈로 작성하여 필요한 경우 추가로 설치하도록 권장하기 때문이다. 연산자가 기본 라이브러리에 직접 포함되지 않아도 불편함 없이 Observable
및 기본 연산자들과 함께 사용할 수 있으며 이 과정을 간략하게 다루어 본다.
연산자를 함수로 작성해보기
RxJS 5의 리드인 Ben Lesh가 포스팅 Learning Observable By Building Observable
을 통해 이 주제를 이미 잘 설명해 두었으니 이를 먼저 읽어보는것을 권장한다. 요약하면 RxJS에서의 연산자는 Observable
을 입력으로 받고 사용자가 Subscribe
할 때 원래의 Observable
에 Subscribe
하는 새 Observable
을 반환하는 함수이다. 아래와 같은 함수를 생각해 보자.
function noop<T>(source: Observable<T>): Observable<T> {
return Observable.create(
(subscriber: Subscriber<T>) => source.subscribe(subscriber)
);
}
noop
함수는 단순하게 새 Observable
을 생성하고 반환한다.
const source = Observable.of(1);
source.subscribe(console.log.bind(console)); // 1
noop(source).subscribe(console.log.bind(console)); // 1
실행 결과를 비교해 보면 둘의 결과가 동일한 것을 확인할 수 있다. 아무것도 하지 않는 noop
연산자를 구현한 셈이다.
기본적인 입력과 출력을 구현했으니 이제 연산자의 동작을 자유로이 정의할 수 있다. 기본 연산자 가운데 가장 단순한 연산자인 mapTo
연산자를 위에서 작성한 noop
연산자를 변경해서 구현해 보면 아래와 같다.
function mapTo<T, R = T>(source: Observable<T>, toValue: R): Observable<R> {
return Observable.create(
(subscriber: Subscriber<T>) => source.subscribe(
subscriber.next.bind(subscriber)(toValue),
subscriber.error.bind(subscriber),
subscriber.complete.bind(subscriber)
)
);
}
mapTo(noop(source), 'x').subscribe(console.log.bind(console)); // 'x'
실제 RxJS의 내부 구현은 함수가 아닌 class
를 이용해 연산자를 구현하고 새 Observable
을 생성하기 위해 lift
인터페이스를 사용한다. 이 방식으로 사용자 정의 연산자를 구현하는 것도 가능하며 RxJS의 가이드라인에서 간략하게 다루고 있다. 보통 Observable
자체를 확장할 경우 lift
를 오버라이드하는 경우가 많고, 단순히 연산자만을 추가하는 경우라면 함수를 이용하는 쪽이 조금 더 간편하다.
조금 더 쉽게 연산자를 연결하기
지금까지 작성한 함수들은 분명 RxJS의 연산자이고 원하는 대로 동작하지만 사용 방법은 기본 연산자와 차이가 있다. 새로 작성한 연산자를 반복적으로 이용하는 경우 코드는
noop(noop(noop(source))).map(...).subscribe();
source.map(...).filter(...).subscribe();
과 같이 기본 연산자의 .
연결과 달리 중첩된 함수 호출의 형태로 나타나게 된다. RxJS의 let
연산자는 이러한 함수 연산자들을 다른 연산자들과 연결할 수 있게 도와주는 연산자이다. 위 예제의 noop
함수를 let
을 이용해 다시 작성하면 다른 기본 연산자들처럼 연결해서 이용할 수 있다.
source.let(noop).let(noop).map(...).subscribe();
mapTo
연산자와 같이 연산자에 추가적인 입력이 필요하다면 연산자의 구현을 조금 변경해 주어야 한다. 연산자 함수 구현을 먼저 let
연산자의 입력값인 (o: Observable<T>) => Observable<R>
형태로 변경하고,
(source: Observable<T>) =>
Observable.create(
(subscriber: Subscriber<T>) => source.subscribe(
subscriber.next.bind(subscriber)(toValue),
...
)
);
이 함수를 반환값으로 하는 고차함수를 만든다.
const mapTo = <T, R = T>(toValue: R) => (source: Observable<T>) =>
Observable.create(
(subscriber: Subscriber<T>) => source.subscribe(
subscriber.next.bind(subscriber)(toValue),
subscriber.error.bind(subscriber),
subscriber.complete.bind(subscriber)
)
);
이제 이 고차함수를 다른 연산자들과 함께 사용할 수 있다.
source.let(mapTo('x')).subscribe(...)
let
연산자를 이용하지 않고 다른 기본 연산자들 처럼 .
연결을 그대로 이용하고 싶을 경우 RxJS의 기본 연산자들이 현재 사용하는 것과 마찬가지로 Observable.prototype
과 타입 정보를 수정해 주면 되는데, 특별한 경우가 아니라면 권장하지 않는다. 이에 대한 설명은 가이드라인에 좀 더 상세히 기재되어 있다.
npm
에 위 방법을 통해 구현한 예제 연산자인 endWith
를 참조용으로 업로드 해 두었다. 마찬가지 방법으로 새 연산자를 npm
에 업로드 하면 기본 라이브러를 수정하지 않고 새로운 연산자를 추가로 필요할 때 마다 설치하여 사용가능하다.
몇 가지 추가 사항들
연산자 작성시 RxJS의 가이드라인에서 강조하고 있는 부분들을 유의해야 한다. 제대로 관리되지 않은 subscription
이나 리소스들은 메모리 유출을 유발하고, Observer:error
로 전달되지 않는 예외는 전체 subscription
을 예기치 않은 방법으로 멈추게 만들고 디버깅을 어렵게 한다.
더불어 작성한 연산자의 테스트 케이스는 RxJS의 연산자와 동일한 방법으로 작성할 수 있다. endWith
연산자의 경우 rxjs-testscheduler-bootstrapper
를 이용해 TestSchduler
를 설정하고 RxJS와 동일한 방법으로 테스트를 작성하였다.