Wednesday, November 11, 2015

Saga vs. Process Manager

As I blogged recently, I am building an application using the Domain-Driven Design, Command Query Responsibility Segregation and Event Sourcing patterns.

In this post I want to share implementation details of Sagas and Process Managers.

In my implementation, both Sagas and Process Managers consume event messages and execute some actions in response.

A Saga has no state, while a Process Manager does.

Another difference: a Process Manager is a state machine; a Saga is not.

So a Saga does not have:
- state machine state
- data state (some persisted data)

And a Process Manager has:
- state machine state
- data state (some persisted data)

When an event is passed to a Saga / Process Manager:

A Saga can decide what to do based only on the content of the incoming event.

A Process Manager can decide what to do based on:
- Content of incoming message
- Current state of state machine
- Current data state
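The difference in decision inputs can be sketched in a few lines. This is a minimal illustration in Python (my framework is .NET-based, so all class and event names here are hypothetical), not the framework's real API:

```python
from dataclasses import dataclass

# Hypothetical event type for illustration only.
@dataclass
class OrderPlaced:
    order_id: str
    amount: float

class Saga:
    """Stateless: the decision depends only on the event's content."""
    def handle(self, event):
        if isinstance(event, OrderPlaced) and event.amount > 100:
            return "SendHighValueAlertCommand"
        return "NoOp"

class ProcessManager:
    """Stateful: the decision also uses the state machine state and data state."""
    def __init__(self):
        self.state = "New"   # state machine state
        self.data = {}       # data state (persisted in a real system)

    def handle(self, event):
        if self.state == "New" and isinstance(event, OrderPlaced):
            self.data["order_id"] = event.order_id
            self.state = "AwaitingPayment"
            return "ChargeCreditCardCommand"
        return "NoOp"
```

The same event can thus produce different outcomes in a Process Manager depending on where its state machine currently is, while a Saga always reacts the same way to the same event.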

Both a Saga and a Process Manager have one of two outcomes after processing an incoming event:
- Processing was successful
- Exception was thrown

Both Sagas and Process Managers are fed with events by a Saga / Process Manager Event Provider.

The event provider, in both cases:
- Marks the event as processed if processing was successful
- Copies the event to the dead event stream and marks it as processed if processing raised an exception

Both Sagas and Process Managers can use a durable event scheduler. The scheduler allows sending an event to a Saga / Process Manager instance in the future - the Event Provider will wake up the instance and pass that event to it. Future events can contain information we might be interested in later.

Both Sagas and Process Managers are responsible for successful processing - if they cannot process an event, they should either use the scheduler to schedule another attempt in the future, or fire compensating transaction logic.
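The event provider behavior described above (mark processed on success, dead-letter then mark processed on exception) can be sketched as follows. This is an illustrative Python sketch, not the framework's actual implementation:

```python
class EventProvider:
    """Feeds events to a Saga / Process Manager handler and tracks outcomes."""
    def __init__(self, handler):
        self.handler = handler
        self.processed = []    # positions marked as processed
        self.dead_events = []  # copies of events whose processing threw

    def feed(self, position, event):
        try:
            self.handler(event)
        except Exception:
            # Copy the failed event to the dead event stream; the event is
            # still marked as processed so the provider can move on.
            self.dead_events.append(event)
        self.processed.append(position)
```

Because a failed event is dead-lettered rather than retried forever, the retry / compensation decision stays inside the Saga or Process Manager itself, as the text above describes.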

To continue with more technical implementation details, let me introduce two basic structures I use in the framework.

They are
- Event Stream
- Aggregate Root

Event Stream is a class with public API methods (command handlers call them after validating commands). Calling methods on an Event Stream produces one or more events. An Event Stream can be persisted to the event store by the Event Stream Repository. An Event Stream has no state - it cannot reconstitute its event history from the event store once the repository has written the events there. Effectively, an Event Stream is write-only. (There is actually a way to read stream events - they are just not needed by the Event Stream itself, because it has no state and does not validate against state.)

Aggregate Root is an Event Sourced Aggregate implementation which has API methods (command handlers call them after validating commands) and produces events based on the API calls. An Aggregate Root should be initialized before usage - initialization is done by fetching the whole history of the aggregate via the Aggregate Root Repository and reconstituting its state. Methods called on an Aggregate Root validate against that state, and if the action is doable, they produce one or more events. After performing an operation on an Aggregate Root, its state is persisted to the event store by the Aggregate Root Repository.

The Aggregate Root class inherits from the Event Stream class.

The Event Stream Repository saves events to the event store without an optimistic concurrency check.

The Aggregate Root Repository does perform an optimistic concurrency check.
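A minimal sketch of this hierarchy and the two repository behaviors, in Python with an in-memory dictionary standing in for the event store (all names are illustrative, not the framework's real API):

```python
class EventStream:
    """Write-only: produces events but never reconstitutes state from history."""
    def __init__(self, stream_id):
        self.stream_id = stream_id
        self.uncommitted = []

    def apply(self, event):
        self.uncommitted.append(event)

class AggregateRoot(EventStream):
    """Event-sourced aggregate: replays history to rebuild state before use."""
    def __init__(self, stream_id):
        super().__init__(stream_id)
        self.version = -1

    def load_from_history(self, events):
        for e in events:
            self.when(e)
            self.version += 1

    def when(self, event):
        pass  # subclasses mutate their state here

store = {}  # stream_id -> list of events (in-memory stand-in for the event store)

def event_stream_save(stream):
    # Event Stream Repository: append with no optimistic concurrency check.
    store.setdefault(stream.stream_id, []).extend(stream.uncommitted)
    stream.uncommitted.clear()

def aggregate_save(aggregate, expected_version):
    # Aggregate Root Repository: fail if someone else appended first.
    current = len(store.get(aggregate.stream_id, [])) - 1
    if current != expected_version:
        raise RuntimeError("concurrency conflict")
    event_stream_save(aggregate)
```

The concurrency check is the only difference between the two save paths: an Event Stream can always append, while an Aggregate Root's write is rejected if the stream moved on since it was loaded.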

So, returning to Sagas and Process Managers:

The Saga class inherits from the Event Stream class.

The Process Manager class inherits from the Aggregate Root class.

Because a Saga has no state, we can run many instances of a Saga with the same correlation id but different events - that is, we can process a Saga's events in parallel, even if they correlate to the same instance.
This also means we can create a new instance, fire the handler on it, and forget it right after that - as we don't need to cache anything.

You cannot run two Process Managers in parallel if they resolve to the same correlation id. The instances will conflict when writing to the store, and there will also be a non-deterministic race condition - randomly progressing to one state or another depending on who wins. Also, if a given state had two outcome states and only one path should have been taken, running in parallel makes it possible to perform both actions; one of the two would then result in an error, but its action cannot be undone.
The best way to run a Process Manager that I know of is to make it a single instance per system, and have that single instance be responsible for caching its state (so it does not have to be reconstituted before each event). If you use an actor framework, that could be an actor whose address contains the Process Manager name and correlationId.
And if you have already read my previous post - Process Managers and Event Sourcing - you can see how I solved this by using Retlang Fibers and Channels per Process Manager instance.

How to model a business process using a Saga:

A Saga defines the list of events it is interested in, and a lambda expression for each event which returns a correlation id based on the incoming event.
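A sketch of this event-to-correlation mapping, in Python (the saga name, event names and field names are hypothetical):

```python
class EmailSaga:
    # Each event type the saga is interested in maps to a lambda that
    # extracts the correlation id from the incoming event.
    correlations = {
        "UserCreatedEvent": lambda e: e["userId"],
        "EmailSentEvent": lambda e: e["userId"],
    }

def correlation_id(saga_cls, event_type, event):
    """Resolve which saga instance an incoming event correlates to."""
    return saga_cls.correlations[event_type](event)
```

The Event Provider can use this mapping both to know which events to subscribe the saga to and to route each event to the right correlation id.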

The Saga's event processing method should use a try-catch construct and try to perform an action in response to the incoming event by analysing its content. The action can be of any type - issuing a new command or sending an email, for example. A Saga can be subscribed to many events, so it can wait for successful command execution by waiting for the events resulting from it. Also, in the try block, a Saga can use the scheduler to wake itself up - for example, to query for the result of a command it issued; the scheduler time can be set to the desired timeout value. In the catch block, a Saga can reschedule the event - or schedule a new, richer event. For example, if we model email retry logic, we can schedule a new event similar to the incoming one but enriched with one extra property: retryNumber.
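The email retry example above can be sketched like this. A minimal Python illustration, assuming a hypothetical `schedule` function standing in for the durable scheduler and a caller-supplied `send_email`:

```python
scheduled = []  # (delay_seconds, event) pairs the durable scheduler would persist

def schedule(delay_seconds, event):
    scheduled.append((delay_seconds, event))

MAX_RETRIES = 3  # illustrative cap, not from the original post

def handle_send_email(event, send_email):
    """Try to send; on failure, schedule a retry enriched with retryNumber."""
    try:
        send_email(event["address"])
    except Exception:
        retry = dict(event, retryNumber=event.get("retryNumber", 0) + 1)
        if retry["retryNumber"] <= MAX_RETRIES:
            schedule(60, retry)  # wake the saga again in 60 seconds
```

Note how all the information needed for the next attempt travels inside the scheduled event itself - the saga remains stateless.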

How to model a business process using a Process Manager:

Process Managers define their state machine structure using an expression very similar to the Given, When, Then style known from BDD tests:

- Given the state machine state is "New"
- When the event "UserCreatedEvent" happens
- Then execute the method named OnUserCreated(UserCreatedEvent e)
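The Given/When/Then triple above can be encoded as a transition table. This is an illustrative Python sketch of such a state machine, with hypothetical states and events, not the framework's fluent syntax:

```python
# (current state, event type) -> handler method name
transitions = {
    ("New", "UserCreatedEvent"): "OnUserCreated",
    ("AwaitingActivation", "UserActivatedEvent"): "OnUserActivated",
}

class IdentityProcessManager:
    def __init__(self):
        self.state = "New"
        self.log = []

    def dispatch(self, event_type, event):
        handler = transitions.get((self.state, event_type))
        if handler is None:
            return  # event not expected in the current state: ignore it
        getattr(self, handler)(event)

    def OnUserCreated(self, e):
        self.log.append(("created", e["userId"]))
        self.state = "AwaitingActivation"

    def OnUserActivated(self, e):
        self.log.append(("activated", e["userId"]))
        self.state = "Done"
```

Moving to a new state automatically narrows down which events the instance will react to next - exactly the property used when the Event Provider feeds it events.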

The Process Manager's event processing method should similarly use a try-catch block, and use the scheduler in the same way. In addition, a Process Manager can:
- persist data (the Process Manager class inherits from the Aggregate Root class)
- access persisted data (for the same reason)
- move to a new state machine state (and thus narrow down which events it is interested in next - this is picked up by the Event Provider feeding it with events)

In my experience, both Process Managers and Sagas can be split into several smaller ones - but with Sagas it is easier.

And if you have already read my previous post - Process Managers and Event Sourcing - you can see how I parallelised work by splitting a single Process Manager into three small Process Managers.

I find it useful to use a UML Sequence Diagram to visualise a business process spanning many Sagas or Process Managers.

Sunday, November 8, 2015

Process Managers and Event Sourcing

In my latest application I use the Domain Driven Design, Event Sourcing and Command-Query Responsibility Segregation patterns.

I implemented the ES infrastructure layer for SQL Server.

To read events back from the event store I use a polling mechanism and also a "fast channel" (you can read the details here).

I need to read events for two reasons:
1) To build the read model
2) To process events using Process Managers

Let me first explain that in my implementation, the database contains a global index of all events written to it. When the event store writes an event, it is written into some event stream (implemented as a SQL table in the infrastructure) and the event is also indexed in a central index called "All".

The index contains
- Global sequence number called Position
- Stream name
- Version

The poller polls that index and, if there are new events after some global position, fetches them and feeds them to the event handlers.
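A minimal sketch of that polling step, in Python, with the "All" index modeled as a list of rows (the row shape and function names are illustrative, not the real SQL schema):

```python
def poll(all_index, last_position, handlers, batch_size=100):
    """Fetch events after last_position from the global 'All' index
    and feed them to the handlers; returns the new position."""
    batch = [row for row in all_index if row["Position"] > last_position][:batch_size]
    for row in batch:
        for handle in handlers:
            handle(row)
        last_position = row["Position"]
    return last_position
```

In a real implementation the filter runs as an indexed SQL query and the returned position is persisted, so the next poll resumes where this one left off.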

As I said, there are two types of event handlers - for rebuilding the RM and for running business processes.

There is a very important difference between those two handlers though:

- The read model should be rebuilt in sequential order - event after event. We cannot parallelize this process - otherwise we will never have a consistent snapshot of the read model, which must be eventually consistent. (I am still thinking about this "must"; I hope there is a solution - otherwise relying on sequential processing in a distributed world is really not a good thing.)

- Business processes, on the other hand, are modeled using Process Managers, which are effectively state machines. Process Managers are ready for asynchronous and out-of-sequence events, because they have their own state. For example, given an Order Processing Process Manager and two events coming into it out of order - CreditCardChargedEvent and ItemReservedEvent - it can use its state to collect this information and fire the "ShipItemsCommand" command once all necessary events have been consumed.
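The out-of-order collection in the Order Processing example can be sketched like this (an illustrative Python sketch; the real Process Manager would persist its state between events):

```python
class OrderProcessingPM:
    """Collects out-of-order events and ships once all prerequisites are in."""
    REQUIRED = {"CreditCardChargedEvent", "ItemReservedEvent"}

    def __init__(self):
        self.seen = set()       # data state collected so far
        self.commands = []      # commands issued by the process manager

    def handle(self, event_type):
        self.seen.add(event_type)
        if self.REQUIRED <= self.seen and "ShipItemsCommand" not in self.commands:
            self.commands.append("ShipItemsCommand")
```

Whichever of the two events arrives first, the command fires exactly once, after both have been consumed - which is why Process Managers tolerate out-of-sequence delivery while read model projection does not.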

Until now I had a simplistic "Process Manager" processor which executed Process Managers in sequential order.
What I wanted was to use some kind of queue with competing consumers plus the actor model, built on existing infrastructure - that is, without adding a dependency on a queuing system or an actor model framework.

So I sketched my thoughts on how I would implement this.

The rings on the picture are sequential event providers (using the Disruptor structure). "All" is a global index of all Process Manager "inbox" streams (similar to actor mailboxes in the actor model - they play a very important role in deduplication and in restarting in-memory actors after the loss of their in-memory inbox queues). The subscriber is a persistent table containing Process Manager stream positions (for deduplication) and the global "all" position - used by the top right handler to catch up with "all". After the top right ring handler gets events, it should somehow create actor-like objects, one per Process Manager instance (identified by correlationId + Process Manager name).
For creating the actor-like objects I used the Retlang library.

The actors are in fact simple class instances that have state and work one event at a time - while in reality they run on a thread pool, in parallel.

They persist their state (not shown here) and finally submit their positions:
1) to the subscriber table
2) to the bottom right ring, which has one handler persisting the lowest sequentially processed "all" position

(Everything shown on the picture above is actually persisted in a separate event store from the original one the events come from. The only part of the picture which involves the original event store is the two arrows on the left, where the fast and slow channels bring new events to process - through in-memory/in-process notifications (fast) and a persistent catch-up subscription (slow).)

So the process has the following flow:
The left ring gets events from the global list (original event store) and feeds them back into an event store (the worker event store) - one stream per Process Manager correlationId. Given that Process Managers define the logic for deriving a correlation id from an event, this is easily doable. For example, if there is an event named "UserCreatedEvent" and there are three Process Managers defined in the system and all three are interested in it - three streams will be created, with the single event copied three times (in the worker event store). The stream names will include the correlation identifier calculated from the event.
During this step it is important to write an event to a stream only once - so I used a unique index to check whether the eventId was already written to the stream. That ensures each event is idempotently written for processing only once.
Once persisted again with a new stream version in a new stream (e.g. in the original ES the version was 26974 - in the new one it could be 0, given it is the first event to be processed in a Process Manager named "identity-user-processManager-ffcf1c15-4bf1-4eaf-ad0d-6d2857cd5b09") - the event is in durable storage and is ready to eventually be processed by the corresponding Process Manager instance.
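The idempotent-write step can be sketched with an in-memory set standing in for the unique SQL index (an illustrative Python sketch; the real check is a unique index on the stream's eventId column):

```python
class WorkerStore:
    """Per-Process-Manager streams with a unique (stream, eventId) constraint."""
    def __init__(self):
        self.streams = {}     # stream name -> list of events
        self.written = set()  # (stream, eventId) pairs: the "unique index"

    def append_once(self, stream, event_id, event):
        key = (stream, event_id)
        if key in self.written:
            return False      # already written: idempotent no-op
        self.written.add(key)
        self.streams.setdefault(stream, []).append(event)
        return True
```

If the left ring crashes and redelivers an event, the second append is simply rejected, so each per-PM stream contains the event exactly once.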

There is a global index of all events written to the per-Process Manager streams - here it is called "all" (it is actually a different All from the original event store's; I call this store the worker store, and its index covers the second-time-written per-PM stream events).

The top right ring reads events from that stream (worker.All) in sequence and asynchronously feeds them into per-actor "fibers" (see the Retlang unit tests and docs - fibers are just blocking queues for actors), which are in-memory actor inboxes.

The actors work by processing events from their FIFO inboxes and persisting the per-PM stream version of each event into worker.Subscriber - thus marking the farthest position in the stream they have progressed to.

There is one more ring - the bottom right ring - which just records the lowest sequentially processed Position of the PM global index. That position is used by the top right ring to fetch events after restarts/failures.

For better performance, the handler of the bottom right ring does not persist the lowest sequentially processed Position after each notification - it persists it once a second (this is actually configurable).

If, after a restart, events appear a second time at the same actor (because of the throttled lowest-position persisting), the actors ignore them - as they have their own position, which is the event version of their inbox stream.
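That deduplication-on-redelivery can be sketched as follows (an illustrative Python sketch of the version comparison, not the Retlang-based implementation):

```python
class ProcessManagerActor:
    """Ignores redelivered events by comparing each event's stream version
    with the highest version this actor has already processed."""
    def __init__(self):
        self.position = -1   # highest inbox stream version processed so far
        self.handled = []

    def receive(self, version, event):
        if version <= self.position:
            return  # redelivered after a restart: ignore
        self.handled.append(event)
        self.position = version
```

Because the per-PM stream versions are contiguous and persisted alongside the actor's state, this check is enough to make redelivery after a restart harmless.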

My subjective feeling and my teammates' feedback are very satisfying - the system is really fast.

We can control what to parallelize by creating a separate instance of a Process Manager with a new correlationId.

As a last thought, I would like to explain why I used the term Process Manager rather than Saga, which is often used instead.

Actually, I use another construct in the framework which is more like a Saga, but I will leave that to the next post. I just want to call things by their real names - Process Manager is a known messaging pattern describing a messaging system element which can persist its state: it receives events, modifies its state, and issues commands or other messages based on that state. A Saga does not have central state - it relies fully on the state inside the incoming message to decide what to do next.

Wednesday, November 4, 2015

Reducing Latency in CQRS Applications

It's been almost a year and a half since I started practicing event sourcing architecture. The application I am working on is implemented using domain-driven design, event sourcing, and CQRS (Command-Query Responsibility Segregation).

One of the most challenging tasks for me during this journey was minimizing latency between write and read models.

The classical model which we implemented was really slow - the latency was around one second, while the recommended latency should be considerably lower.

The model is shown in the picture:

The main reason for the high latency was the path that event messages had to travel to reach the backend server - which then updated the read model accordingly.

We use our own implementation of an event store which runs on SQL Server.

In my implementation, the backend server was using a catch-up subscription, implemented as a poller over the globally sequenced list of events in the event store. So the backend server was polling for events that arrived in the global list after the global "position" - the last position the backend server had already processed.

The system experienced lag during this process, because an event was first persisted and only then eventually polled.

To get rid of the lag, I started thinking about how to notify the backend server about new events through an alternative, faster mechanism.

If the command processor and the backend event handler are hosted in a single process, that faster mechanism could be an in-process queue.

If they are on different servers, I could use some fast in-memory (not persisted) queue.

But what should I do when messages are lost - for example because the in-memory queue went down - or when the backend process restarts? At that moment there are unprocessed events in the event store, so the read model should catch up with it.

I came up with a solution where there are two channels by which events are provided to the backend event processor.

One is the durable channel, as in the picture above; the other is a fast in-memory or in-process channel.

When the system operates normally, the fast channel will always be ahead of the durable channel; when the system is restarted, or when messages are lost, those events are delivered through the catch-up subscription.

To implement this, I needed to somehow join those two channels into one, and subscribe the backend event handler to that one joined channel.

Given that you know the event's global position (sequence number) at the moment events are saved to the event store, this is possible.

The last thing to solve at this point was doing it in an efficient and thread-safe way.

I found a great open source library which is exactly what I needed - LMAX Disruptor.

The library provides a ring buffer construct which allows multiple publishers to push messages into predefined, sequenced cells in a thread-safe way, and also allows multiple subscribers to get those messages in strictly sequential order - one cell at a time. This way, whichever channel inserts a message into the next cell triggers the subscriber to progress to that cell and process the message inside it.
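The essential join-by-position idea (ignoring the Disruptor's lock-free internals and thread safety) can be sketched in a few lines of Python. This is an illustrative single-threaded sketch, not the Disruptor API:

```python
class JoinedChannel:
    """Joins the fast (in-memory) and durable (catch-up) channels into one
    strictly sequential feed, keyed by the event's global position."""
    def __init__(self, handler, start_position=-1):
        self.handler = handler
        self.next = start_position + 1
        self.pending = {}  # positions that arrived ahead of sequence

    def publish(self, position, event):
        if position < self.next:
            return  # duplicate from the slower channel: already processed
        self.pending[position] = event
        # Drain every contiguous position that is now available.
        while self.next in self.pending:
            self.handler(self.pending.pop(self.next))
            self.next += 1
```

Either channel may call publish; whichever delivers a position first wins, and the later copy from the other channel is dropped - so the handler sees every event exactly once, in global order.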

The implementation was really simple.
The old catch-up subscription was modified to insert entries into the Disruptor instead of passing them to the event handlers directly.

The event handler was modified to become a subscriber of the Disruptor; it was also modified to persist the global position of processed events after processing.

The event store interface's Append method was modified to push messages to the Disruptor right after events are persisted.

Using this solution, the lag between the write model and the read model has practically disappeared - events are processed almost instantly after they are persisted.