Observables represent a progressive way of handling events, async activity, and multiple values. Another way to phrase it is that observables are functions that throw values.
Objects call observers define callback functions for next()
, error()
, and complete()
.
The following is an Observable
that pushes the values (1, 2, 3) synchronously when subscribed, pushes the value 4 after one second has passed since the subscribe call, and then completes:
import { Observable } from 'rxjs';const observable = new Observable(subscriber => {subscriber.next(1);subscriber.next(2);subscriber.next(3);setTimeout(() => {subscriber.next(4);subscriber.complete();}, 1000);});
Observables are not like EventEmitters
, nor are they like Promises for multiple values. Observables may act like EventEmitters
in some cases, namely when they are multicasted using RxJS Subjects, but they don’t typically act like EventEmitters
.
import { Observable } from 'rxjs';const foo = new Observable(subscriber => {console.log('Hi');subscriber.next(21);});foo.subscribe(x => {console.log(x);});foo.subscribe(y => {console.log(y);});
"Hi"
21
"Hi"
21
Observables are:
new Observable
or a creation operatornext
/ error
/ complete
notifications to the Observer, and their execution may be disposed.These aspects are all encoded in an Observable instance, but some of these aspects are related to other types, like Observer and Subscription.
Core Observable
concerns:
Observable
Observer
is a consumer of values delivered by an Observable
. Observers are simply a set of callbacks. There is one for each type of notification produced by the Observable
: next
, error
, and complete
.
const observer = {next: n => console.log('Observer got next value: ' + n),error: err => console.error('Observer got an error: ' + err),complete: () => console.log('Observer got a complete notification'),};
To use the Observer
, provide it to the subscribe of an Observable
:
observable.subscribe(observer);
Observers are just objects with three callbacks, one for each type of notification that an Observable
may deliver.
Observers in RxJS may also be partial. If you don’t provide one of the callbacks, the execution of the Observable
will still happen normally, except some types of notifications will be ignored. This is because they don’t have a corresponding callback in the Observer
.
The example below is an Observer
without the complete callback:
const observer = {next: x => console.log('Observer got next value: ' + x),error: err => console.error('Observer got an error: ' + err),};
Operators are the essential pieces that allow complex asynchronous code to be easily composed in a declarative manner.
Operators are functions. There are two kinds of operators:
Pipeable operators can be piped to observables using the syntax observableInstance.pipe(operator())
. This includes filter(...)
and mergeMap(...)
.
When called, pipeable operators do not change the existing Observable instance. Instead, they return a new Observable
whose subscription logic is based on the first Observable
.
A Pipeable Operator is a function that takes an Observable
as its input and returns another Observable
. It is a pure operation, so the previous Observable
stays unmodified.
Creation operators can be called as standalone functions to create a new Observable.
import { interval } from 'rxjs';const observable = interval(1000 /* number of milliseconds */);
Free Resources