Part 2: Operators
Our very first example returned a stream of click events on a particular element. But, most of the time, the only part of a pointer event that you care about is where somebody clicked. Let’s make an operator that takes a stream of pointer events and returns a stream of points.
As you’ve likely noticed, an instance of an Observable is called a stream. A stream represents a series of values over time, but we often need to refer to those values individually too. Thus, we have a convention: a stream’s name ends in
$
, but its individual values do not. In our case, the stream is calledpointerEvent$
, but each individual value is justpointerEvent
. This makes it clear to someone reading our code when we’re referring to a stream, and when we’re referring to a value from that stream.
First, let’s write the connect function. The event source that it’s going to connect the observer to is the pointer event stream.
Note: To make the code easier to follow, I’m going to pass an anonymous
function into subscribe
. The connect function expects an observer, but
subscribe
knows to wrap a function with one before passing it along.
function connectObserverToPointerEvent$(observer) {
pointerEvent$.subscribe(
(pointerEvent) => {
observer.next({
x: pointerEvent.pageX,
y: pointerEvent.pageY
})
}
});
}
This is pretty similar to the connect
functions we wrote earlier. Instead
of calling addEventListener
, we’re calling pointerEvent$.subscribe
, but
the concept is identical.
Let’s wrap that connect
function in another function that returns our
point$
:
function createPoint$(pointerEvent$) {
return new IndefiniteObservable(
(observer) => {
const subscription = pointerEvent$.subscribe(
(pointerEvent) => {
observer.next({
x: pointerEvent.pageX,
y: pointerEvent.pageY
})
}
);
return subscription.unsubscribe;
}
);
}
createPoint$(pointerEvent$).subscribe(
({ x, y }) => {
console.log(`The pointer is at (${ x }, ${ y }).`);
}
);
Pretty cool, huh? We can take any stream of pointer events (click, down,
move, up, etc.) and turn it into a stream of { x, y }
. The observer
doesn’t know (or care) that those x
and y
s came from a pointer event. We
can write observers that think of the world in terms of points and let our
little operator worry about morphing pointer events into simple points.
I must admit, though: createPoint$
is a pretty dense function. Operators
are what make streams powerful, but that was a lot of code for what was
effectively (event) => ({ x: event.pageX, y: event.pageY })
. Let’s write
a higher order function that lets us reuse all that boilerplate for other
operators:
// source is our input stream; transform is the function we want
// to apply to all the values on that input stream.
function makeOperator(source, transform) {
return new IndefiniteObservable(
(observer) => {
const subscription = source.subscribe({
next(value) {
observer.next(
transform(value)
);
}
});
return subscription.unsubscribe;
}
);
}
const point$ = makeOperator(
pointerEvent$,
pointerEvent => ({
x: pointerEvent.pageX,
y: pointerEvent.pageY
})
);
Great - now we can write just our transformation function and let
makeOperator
do all the subscribing work for us.
But if we wanted to add another transformation function, we’d have to call
makeOperator
again. That’s hard to read:
const x$ = makeOperator(
makeOperator(
pointerEvent$,
pointerEvent => ({
x: pointerEvent.pageX,
y: pointerEvent.pageY
})
),
point => point.x
)
Let’s make our own class to store our operators on:
class CustomObservable extends IndefiniteObservable {
// makeOperator applies a function to every item in a
// sequence and returns the transformed sequence, just
// like map on an Array. So, let's call it `map`.
map(transform) {
return new CustomObservable(
observer => {
const subscription = this.subscribe({
next(value) {
observer.next(
transform(value)
)
}
});
return subscription.unsubscribe;
}
);
}
// If you have a stream of objects and you just care
// about one value in each object, use pluck.
pluck(key) {
return this.map(dict => dict[key]);
}
// tap lets us inspect the pipeline by calling a function
// on every value without affecting the rest of the
// pipeline. It's really useful for logging.
tap(transform) {
return this.map(
value => {
transform(value);
return value;
}
);
}
}
See how much nicer it is to write operators like pluck
and tap
now that
we’ve abstracted all the stream-creation boilerplate into map
? They can
focus on just their own logic and not even have to think about streams.
And since we’ve stored the operators in a class, chaining them together is easier too:
const x$ = pointerEvent$.map(
pointerEvent => ({
x: pointerEvent.pageX,
y: pointerEvent.pageY
})
).pluck('x');