Ever since the reactive paradigm arrived on the scene, it immediately started to gain popularity and become one of the most favoured choice for building distributed and highly concurrent cloud-native backends. The performance, scalability and reliability benefits with reactive has inspired the community to do more innovations in this area and those concerted efforts are emerging in the form of new specifications , drivers and protocols to further support reactive adoption across stacks and layers.
One of the reason for popularity of reactive is because it allows to program at higher level of abstraction by using reusable operators to form composable pipelines.
Threads and schedulers that run the pipeline are used efficiently behind the scenes as enablers and abstracted away from developers.
However, in order to develop a deeper understanding of reactive it's very important to un-peel the abstractions and unravel how composability works under the hood.
This post focuses on explaining reactive stream specifications and the pattern that is used to form composable pipelines. The pattern described here is also relevant to understand how ProjectReactor implements and provides reusable operators.
Reactive Streams
In order to arrive at standardization on reactive paradigm for different languages, reactive streams specifications was formulated.
The specification consists of following major APIs which perhaps is a fusion of observer and iterator design pattern. The reactive streams specifications allow for push-pull model with back-pressure control; a concept that was hitherto missing in webserver and webclient libraries.
Publisher
Publisher , as the name suggests, is the source of the stream of events that flow to the subscriber(s)
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
Subscriber
Subscriber is the entity in the interaction which subscribes to publisher, makes the demand and then receive the stream of the events through onNext
.
Completion of stream is signaled by onComplete
and error is reported by onError.
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
Subscription
Subscription is sort of handle given by publisher to subscriber through which subscriber can control the rate of data emitted to it and cancel/terminate the demand.
public interface Subscription {
public void request(long n);
public void cancel();
}
The richness of reactive is in its range of out-of-the-box operators which are used to form composable pipelines. The operators are essentially various implementations of publisher and subscriber interfaces - explained further.
Reactive Lifecycle
The reactive pipeline is composition of various operators and the pipeline has two phases - assembly and subscription.
Assembly phase:
In this phase the operators are chained together (think LinkedList ).Each operator takes a source publisher as input, wraps it and passes itself to the next operator in the chain.
Creating a fluent pattern for reactive pipeline is not a must-have but it gives it DSL kind of readability and makes it more intuitive and intention revealing.
In order to create fluent pattern, we will put DSL constructs inside a class. In ProjectReactor , this ask is accomplished by Mono and Flux - they both contain the DSL constructs for various operators.
Lets implement a simple Mono to understand the principle
public abstract class MyMono<T> implements Publisher<T> {
static <T> MyMono<T> from(T[] a){
return new ArrayPublisher(a);
}
<R> MyMono<T> map(Function<T,R> function){
return new MapPublisher<T, R>(this,function);
}
MyMono<T> filter(Predicate<T> function){
return new FilterPublisher<T>(this,function);
}
}
With this simplest Mono, we can compose a pipeline like
MyMono.from(IntStream.range(1,10).boxed().toArray(Integer[]::new))
.map(i -> i *2)
.filter(x -> x%4==0)
.subscribe(System.out::println)
map
is resolved to aMapPublisher
which wraps the sourceArrayPublisher
.filter
wrapsMapPublisher
and returnsFilterPublisher
- finally, subscribe is actually called on
FilterPublisher
The above is the assembly phase. It is only weaving the chain of publisher but, the logic (i.e. lambda's inside each operator) is weaved and executed the subscription happens.
We will see this in next section.
Subscription phase
Subscription phase is where the execution happens, like in java streams where nothing happens till we add a terminal operation, similarly, in reactive pipeline nothing happens until a subscriber arrives.
From analogy standpoint, we can think of assembly phase as setting up a conveyer belt and subscription phase is when an item is put on that conveyer belt.
In the pipeline defined above, subscription happens immediately because we have called subscribe with an executable instruction ( i.e System.out::println
) immediately without any delay.
However, if we think of web server scenario then subscription phase begins when a request is made to the server and socket is opened. Reactor netty is an example of this and we will see it in detail in our next blog
For, now lets understand how business logic gets weaved in during subscription phase. We will take the sample pipeline defined below
MyMono.<Integer>from(IntStream.range(1,10).boxed().toArray(Integer[]::new))
.map(i-> i*2)
.filter(i -> i%4==0)
.subscribe(new PrintSubscriber());
For this pipeline, following diagram illustrates the sequence of actions
- The publisher at the root of the operator chain (in this case
ArrayPublisher
) is the one that is responsible to emit/publish the data down the operator chain. It is also the source of subscription instance which is created and passed to subscriber.
public static class ArrayPublisher<T> extends MyFlux<T>{
private T[] a;
public ArrayPublisher(T[] a) {
this.a = a;
}
@Override
public void subscribe(Subscriber<? super T> s) {
s.onSubscribe(new Subscription() {
private AtomicInteger read = new AtomicInteger();
@Override
public void request(long n) {
if(read.get()==a.length){
s.onComplete();
return;
}
while(read.get()<=a.length-1 && n >0){
s.onNext(a[read.getAndIncrement()]);
n--;
}
}
@Override
public void cancel() {
s.onComplete();
}
});
}
}
- The intermediate operators (like
map
,filter
etc) use decorator pattern to augment the actual subscriber with specific business logic pertaining to that operator.
public static class MapPublisher<T,R> extends MyFlux<T>{
Publisher<T> source;
Function<T,R> mapper;
public MapPublisher(Publisher<T> source, Function<T, R> mapper) {
this.source = source;
this.mapper = mapper;
}
@Override
public void subscribe(Subscriber<? super T> s) {
source.subscribe(new MapSubscriber(s,mapper));
}
}
public static class MapSubscriber<T,R> implements Subscriber<T>{
Subscriber<R> destination;
Function<T,R> mapper;
boolean done;
public MapSubscriber(Subscriber<R> destination,Function<T,R> mapper) {
this.destination = destination;
this.mapper = mapper;
}
@Override
public void onSubscribe(Subscription s) {
destination.onSubscribe(s);
}
@Override
public void onNext(T integer) {
destination.onNext(mapper.apply(integer));
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
if(done){
return;
}
done=true;
System.out.println("Calling from Map");
destination.onComplete();
}
}
Everything starts when you subscribe
Following are the sequence of steps with respect to the diagram above
Step 1 - Chained Subscribes
PrintSubscriber
subscribes toFilterPublisher
FilterPublisher
createsFilterSubscriber
and with that subscribes toMapPublisher
MapPublisher
createsMapSubscriber
and with that subscribes toArrayPublisher
Step 2 - Receive Subscription
ArrayPublisher
creates and passes subscription toMapSubscriber
MapSubscriber
propagates subscription toFilterSubscriber
FilterSubscriber
propagates subscription toPrintSubscriber
Step 3 - Request Data
PrintSubscriber
requests data fromArrayPublisher
by callingrequest(1)
onsubscription
Step 4 - OnNext
ArrayPublisher
emit an element and callsonNext
onMapSubscriber
MapSubscriber
applies mapper function on the element and callsonNext
onFilterSubscriber
FilterSubscriber
uses predicate on received element and iftrue
the callsonNext
onPrintSubscriber
. Iffalse
then requests next element by calling request() on subscriptionPrintSubscriber
prints the element and callsrequest()
on subscription to requestArrayPublisher
for next element
Step 4 - OnComplete
- When all the elements are emitted from
ArrayPublisher
then it callsonComplete
on the subscriber onComplete
is then propagated to all subsequent subscribers
The code for this post is at : github.com/lruchandani/learning-reactive.git
In the next post, we will see an example of reactive webserver using netty and see what happens under the hood when client makes a request to the server. How client establishes the subscription and how server handles and responds to the request in reactive parlance.