In most cases you probably won't need to write your own operator for RxJS. Version 5 of RxJS ships nearly 100 built-in operators, from basic ones like map to complex ones like mergeScan, and they support a wide range of use cases. If you feel the need to write a custom operator, in many cases the problem can be solved with a simple combination of existing operators. Still, there are edge cases the default operators can't cover, and those require writing a completely new operator.
I sometimes receive proposals to add such custom operators to the core library, and I mostly close them without accepting. RxJS 5 and beyond aims for a smaller library, both in byte size and in API surface, so I strongly recommend creating a custom operator as its own separate module that applications opt in to as needed. Even if an operator is not included in the core set, you can still use it much like the default operators. I wrote this small guide on creating and consuming operators without an upstream PR to modify core. If you'd like additional details, refer to RxJS's guideline.
Writing an operator as a function
Ben Lesh, the lead of RxJS 5, wrote a detailed post about this, Learning Observable By Building Observable, which is worth reading. In short, an RxJS operator is a function that takes an Observable and returns a new Observable that subscribes to the source when it is itself subscribed to. Consider a function noop that just returns a new Observable:
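// RxJS 5: the root import also patches the built-in operators onto Observable.
import { Observable, Subscriber } from 'rxjs';

function noop<T>(source: Observable<T>): Observable<T> {
  return Observable.create(
    (subscriber: Subscriber<T>) => source.subscribe(subscriber)
  );
}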
Executing it and comparing with the original observable gives the same result.
const source = Observable.of(1);
source.subscribe(console.log.bind(console)); // 1
noop(source).subscribe(console.log.bind(console)); // 1
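To make the "subscribes to the source when it is subscribed to" part concrete, here is a minimal sketch that does not use RxJS at all. TinyObservable and TinyObserver are hypothetical stand-ins invented for this illustration (much simpler than the real Observable, with no unsubscription or scheduling); the only point is to show that wrapping a source does no work until someone subscribes to the result.

```typescript
// Hypothetical stand-ins for illustration only, not RxJS types.
interface TinyObserver<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}

class TinyObservable<T> {
  constructor(private readonly producer: (observer: TinyObserver<T>) => void) {}

  // Nothing runs until subscribe is called.
  subscribe(observer: TinyObserver<T>): void {
    this.producer(observer);
  }
}

let sourceSubscriptions = 0;

// A source that counts how many times it has been subscribed to.
const countedSource = new TinyObservable<number>((observer) => {
  sourceSubscriptions += 1;
  observer.next(1);
  observer.complete();
});

// Same shape as noop above: take a source, return a new observable that
// subscribes to the source only when it is subscribed to itself.
function tinyNoop<T>(input: TinyObservable<T>): TinyObservable<T> {
  return new TinyObservable<T>((observer) => input.subscribe(observer));
}

const wrapped = tinyNoop(countedSource);
const countBeforeSubscribe = sourceSubscriptions; // wrapping alone did not touch the source

const received: number[] = [];
wrapped.subscribe({
  next: (value) => { received.push(value); },
  error: (err) => { throw err; },
  complete: () => {},
});
const countAfterSubscribe = sourceSubscriptions; // subscribing reached the source once
```

Calling tinyNoop only builds a description of the work; the source's producer runs once per subscription to the wrapped observable.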
You have just created a noop operator that does nothing. Since the input and output of the operator are now defined, we can freely define its actual behavior. Let's change noop into one of the simplest operators, mapTo.
function mapTo<T, R = T>(source: Observable<T>, toValue: R): Observable<R> {
  return Observable.create(
    (subscriber: Subscriber<R>) => source.subscribe(
      // Emit toValue for every source value; the source value itself is ignored.
      () => subscriber.next(toValue),
      subscriber.error.bind(subscriber),
      subscriber.complete.bind(subscriber)
    )
  );
}
mapTo(noop(source), 'x').subscribe(console.log.bind(console)); // 'x'
There is not much difference: changing only the subscription behavior makes the operator behave differently.
Internally, RxJS implements operators as classes and uses the lift interface to create the new Observable. You can use that approach instead of functions; the RxJS guideline covers it briefly. Usually, if you want to extend the Observable class itself and add new operators to it, you may need to override lift; otherwise, using a function is a bit simpler.
let the operator work
The function we wrote above is a legitimate RxJS operator, but its ergonomics differ slightly from how the default operators are used. Say the new custom operator is called multiple times,
noop(noop(noop(source))).map(...).subscribe();
source.map(...).filter(...).subscribe();
it becomes a series of nested function calls, compared with the default operators' chaining via . notation. RxJS has a specific operator for this use case named let, which takes a function and chains it.
source.let(noop).let(noop).map(...).subscribe();
For a more complex operator that requires input parameters other than the source observable, the implementation's signature needs to change a bit. First, change the operator implementation to conform to let's input parameter signature, (o: Observable<T>) => Observable<R>,
(source: Observable<T>) =>
  Observable.create(
    (subscriber: Subscriber<R>) => source.subscribe(
      () => subscriber.next(toValue),
      ...
    )
  );
then create a higher-order function that returns this function.
const mapTo = <T, R = T>(toValue: R) => (source: Observable<T>) =>
  Observable.create(
    (subscriber: Subscriber<R>) => source.subscribe(
      // Emit toValue for every source value; the source value itself is ignored.
      () => subscriber.next(toValue),
      subscriber.error.bind(subscriber),
      subscriber.complete.bind(subscriber)
    )
  );
Now you can chain this function as an operator via let.
source.let(mapTo('x')).subscribe(...)
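It may help to see why this chaining works. As far as I understand it, let simply hands the current observable to the function and returns whatever the function returns. The sketch below reproduces that idea without RxJS: TinyObservable and TinyObserver are hypothetical stand-ins invented for this illustration, and tinyMapTo is a simplified variant of the mapTo above whose source type parameter is moved to the inner function so it can be inferred at the call site.

```typescript
// Hypothetical stand-ins for illustration only, not RxJS types.
interface TinyObserver<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}

class TinyObservable<T> {
  constructor(private readonly producer: (observer: TinyObserver<T>) => void) {}

  subscribe(observer: TinyObserver<T>): void {
    this.producer(observer);
  }

  // Assumed shape of let: pass this observable to the function and return its result.
  let<R>(operator: (source: TinyObservable<T>) => TinyObservable<R>): TinyObservable<R> {
    return operator(this);
  }
}

// Higher-order function: configure with toValue, get back a function of the source.
const tinyMapTo = <R>(toValue: R) =>
  <T>(source: TinyObservable<T>): TinyObservable<R> =>
    new TinyObservable<R>((observer) =>
      source.subscribe({
        next: () => observer.next(toValue), // ignore the source value
        error: (err) => observer.error(err),
        complete: () => observer.complete(),
      })
    );

const numbers = new TinyObservable<number>((observer) => {
  observer.next(1);
  observer.next(2);
  observer.complete();
});

const output: string[] = [];
numbers
  .let(tinyMapTo('x'))
  .let(tinyMapTo('y')) // the last mapTo in the chain decides the emitted value
  .subscribe({
    next: (value) => { output.push(value); },
    error: (err) => { throw err; },
    complete: () => { output.push('done'); },
  });
// output is now ['y', 'y', 'done']
```

Because let only calls the function it is given, any function with the shape (source) => observable can join a chain without touching the Observable prototype.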
If you'd like to use . chain notation without the let operator, you need to patch the prototype of Observable and augment its type definition, similar to the core RxJS implementation, as mentioned in the guideline. I do not recommend prototype patching unless there is a specific need for it. In most cases, chaining via let works quite well.
Once the operator is implemented, it can be published to npm as a separate module. Application code can opt in to the operator when it is needed. I have created an example operator implementation, endWith, for reference.
A few small additional notes
Regardless of how you write a custom operator, make sure you follow the guidelines. They are mostly contracts an operator should honor, such as managing internal resources and subscriptions, or handling exceptions and forwarding them to Observer::error. Otherwise you will get memory leaks, unexpected subscription termination, or other unexpected side effects that are really hard to debug.
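As a rough illustration of those two contracts, the sketch below shows an operator that catches an exception thrown by user code and forwards it to the observer's error handler, and that releases its subscription to the source once it is finished. It deliberately avoids RxJS: TinyObservable, TinyObserver, Teardown, safeMap, and the manually driven source are all hypothetical names invented here, and the real library handles these concerns through its Subscriber and Subscription classes rather than in this way.

```typescript
// Hypothetical stand-ins for illustration only, not RxJS types.
type Teardown = () => void;

interface TinyObserver<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}

class TinyObservable<T> {
  constructor(private readonly producer: (observer: TinyObserver<T>) => Teardown) {}

  // Subscribing returns a function that releases the producer's resources.
  subscribe(observer: TinyObserver<T>): Teardown {
    return this.producer(observer);
  }
}

const safeMap = <T, R>(project: (value: T) => R) =>
  (source: TinyObservable<T>): TinyObservable<R> =>
    new TinyObservable<R>((observer) => {
      let stopped = false;
      let releaseSource: Teardown | undefined;

      // Stop forwarding and release the source subscription at most once.
      const stop = (): void => {
        stopped = true;
        if (releaseSource) {
          const release = releaseSource;
          releaseSource = undefined;
          release();
        }
      };

      releaseSource = source.subscribe({
        next: (value) => {
          if (stopped) return;
          let result: R;
          try {
            result = project(value); // user code may throw
          } catch (err) {
            stop();
            observer.error(err); // forward instead of letting it escape
            return;
          }
          observer.next(result);
        },
        error: (err) => { if (!stopped) { stop(); observer.error(err); } },
        complete: () => { if (!stopped) { stop(); observer.complete(); } },
      });

      // A synchronous source may have terminated before subscribe returned.
      if (stopped) stop();
      return stop;
    });

// Manually driven source so the example can check that listeners are removed.
let listeners: Array<(n: number) => void> = [];
const emit = (n: number): void => listeners.slice().forEach((listener) => listener(n));
const manual = new TinyObservable<number>((observer) => {
  const listener = (n: number): void => observer.next(n);
  listeners.push(listener);
  return () => { listeners = listeners.filter((l) => l !== listener); };
});

const log: string[] = [];
const timesTenExceptTwo = safeMap((n: number) => {
  if (n === 2) throw new Error('boom');
  return n * 10;
});
const unsubscribe = timesTenExceptTwo(manual).subscribe({
  next: (value) => { log.push(`next ${value}`); },
  error: (err) => { log.push(`error ${(err as Error).message}`); },
  complete: () => { log.push('complete'); },
});

emit(1); // projected and forwarded
emit(2); // project throws: routed to error, source listener removed
emit(3); // nobody is listening anymore
unsubscribe(); // safe to call again after termination
```

The important parts are the try/catch around the user-supplied function and the single stop path shared by error, completion, and unsubscription; skipping either is how the leaks and surprise terminations mentioned above tend to appear.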
Test cases for a newly written operator can follow exactly the same approach RxJS core uses, with marble diagrams. The endWith implementation has such examples, using rxjs-testscheduler-bootstrapper to bootstrap RxJS's TestScheduler.