Building an actor with RxJS

The actor model - an actor in RxJS
Julien Zolli
An actor starts with an address, an initial state and an onMessage function to deal with incoming messages. It processes one message at a time.
The function that deals with incoming messages can
  • send new messages to known actor addresses
  • create new actors (whose addresses will then be known)
  • adapt its reaction to future messages (i.e. change its state)

Actor

An actor is a unit of computation that communicates through messages.
We decided to design it with:
  • a system: holds the channel$ stream that passes messages, plus convenience functions shared by multiple actors, such as logging.
  • a _state$ BehaviorSubject with an initial value.
  • _behaviors: a dictionary of behavior$ functions, one per messageType.
A behavior$(message, state$, system) returns an observable that is triggered when the actor receives a message with the right messageType and that completes when it is done dealing with this message.
If the behavior$ doesn’t complete, the actor is effectively crashed: messages are processed one at a time, so it can never process the next one.
For instance, a reducer function that takes a message, a state and a system and returns a newState can be turned into a behavior$ this way:
javascript
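import {Observable} from "rxjs"

// Wraps a plain reducer into a behavior$ that updates the state, then completes.
const reducer$ = reducer => (message, state$, system) => {
  return new Observable(subscriber => {
    state$.next(reducer(message, state$.value, system))
    subscriber.complete()
  })
}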
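Hooking an actor to a system is therefore equivalent to the following subscription:
javascript
system.channel$.pipe(
  // Keep only the messages this actor has a behavior for.
  filterTypes(Object.keys(_behaviors)),
  // concatMap waits for each behavior$ to complete before handling the next message.
  concatMap(message => _behaviors[message.type](message, _state$, system))
).subscribe()
Where filterTypes is a simple operator that keeps only the messages whose type is in the given list.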
javascript
const filterTypes = types => filter(message => types.includes(message.type))
An actor has a subscribe method that subscribes to the _state$ as a new observable.

Examples

The usual counter example!
javascript
import {createActor, send} from "./system"
import {reducer$} from "./utils"

const counter = createActor(
  "COUNTER",
  {
    INCREMENT: reducer$((_, state) => state + 1),
    DECREMENT: reducer$((_, state) => state - 1),
  },
  0
)

counter.subscribe(console.log)
send({type: "INCREMENT"}) // Logs 1
NOTE: It is possible to “get” an actor that has not been created yet, as the actor(actorName) method returns an empty actor.
javascript
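import {get, create, send} from "./system"
import {reducer$} from "./utils"
import {COUNTER, INCREMENT, DECREMENT} from "./constants"

// Subscribe before the actor is created.
get(COUNTER).subscribe(console.log)

const counter = create(
  COUNTER,
  {
    // Computed keys, so that the imported constants are used as message types.
    [INCREMENT]: reducer$((_, state) => state + 1),
    [DECREMENT]: reducer$((_, state) => state - 1),
  },
  0
)

send({type: INCREMENT}) // Logs 1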

A simple actor

A new actor requires an address, an optional initial_state and an onMessage function.
An actor has a mailbox$ (RxJS Subject), a private _state$ (RxJS BehaviorSubject) and a private _adressBook list (the addresses of the actors it has created).
Sending a message to an actor is therefore equivalent to calling next on its mailbox$.
Once the actor is created, it can only respond to messages. Even to call itself, it should in theory go through a message.
Most of the time
  • the initial_state will be an RxJS BehaviorSubject with a given initial value.
  • the onMessage function will be a function that takes a state and a message and returns a new state. This function CAN be impure: it may render, send messages, create actors, fetch content…
The onMessage function is used in an observable pipeline that ensures messages are handled one at a time, in order.
javascript
mailbox$.pipe(
  concatMap(message => _state$.pipe(
    take(1), // read the current state once so the inner observable completes
    map(state => onMessage(message, state)),
    tap(newState => _state$.next(newState))
  ))
)
An actor has a createChild method that creates a new actor and stores its address in the _adressBook property.

Huge simplification toward pubsub

System

Like in Akka, we need a “system” that is able to pass messages from one actor to another and to create actors.
The channel could simply have been an actor, in charge of spawning the actors and passing messages through to the right ones… But this would have killed all concurrency in message passing if we kept the idea of one message at a time!
Our channel is a singleton that isn’t accessible directly.
In the near future, this channel will be able to dispatch and receive messages by connecting to browser API channels.

Actors are publisher/subscriber

We decided that every actor sends its messages to all the other actors, and each actor only keeps the ones it likes!
Therefore we no longer need addresses.
As with Akka actors, we decided to replace the onMessage function (a single reducer) with a dictionary of behaviors, where each key is the name of the message that triggers the behavior.

Actor mixins

Let’s say we want all our actors to react to the same message in the same way.
For instance, we would like them to return their state to the sender when they receive a returnState message.
{"returnState": {to: [sender_address]}}
Enabling a child actor to send a message to its parent actor is fairly common.
How do we do that? With actor mixins!
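The text does not say how mixins are implemented, so the following is only one possible sketch: a mixin is treated as a dictionary of reducers that gets merged into an actor's own behaviors. `withMixins`, `returnStateMixin`, the `{type, to}` message shape and the "state" reply message are all assumptions.

```javascript
// Hypothetical mixin: a dictionary of reducers shared by several actors.
const returnStateMixin = {
  returnState: (message, state, system) => {
    // Reply to every requested address with the current state.
    message.to.forEach(address =>
      system.send({ type: "state", to: address, state })
    );
    return state; // the state itself is unchanged
  },
};

// Merge mixins first, so that an actor's own behaviors take precedence.
const withMixins = (behaviors, ...mixins) =>
  Object.assign({}, ...mixins, behaviors);

// Fake system that records what is sent, for the demonstration only.
const sent = [];
const system = { send: message => sent.push(message) };

const counterBehaviors = withMixins(
  { INCREMENT: (message, state) => state + 1 },
  returnStateMixin
);

const nextState = counterBehaviors.returnState(
  { type: "returnState", to: ["parent"] },
  3,
  system
);

console.log(nextState); // 3
console.log(sent);      // [ { type: 'state', to: 'parent', state: 3 } ]
```

Each reducer in the merged dictionary could then be wrapped with reducer$ like any other behavior.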

Limits

If you try to create a pure actor model, you’ll rapidly feel constrained by visualizing and maintaining the “actor reference” tree.
Sending one message to one address is also a huge pain: it requires knowing each actor reference, and reaching several actors means sending the same message multiple times… A bit like in the imperative world.