Sunday, November 8, 2015

Process Managers and Event Sourcing

In my latest application I use the Domain-Driven Design, Event Sourcing and Command-Query Responsibility Segregation patterns.

I implemented an Event Sourcing infrastructure layer for SQL Server.

To read events back from the event store I use a polling mechanism and also a "fast channel" (you can read the details here).

I need to read events for two reasons:
1) To build the read model
2) To process events using Process Managers

So let me first explain that in my implementation the database contains a global index of all events written to it. This means that when the event store writes an event, it is written into some event stream (implemented as a SQL table in the infrastructure) and the event is also indexed in a central index called "All".

The index contains:
- A global sequence number, called Position
- The stream name
- The version within the stream

The poller polls that index and, if there are new events after a given global position, fetches them and feeds them to the event handlers.
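To make this concrete, here is a minimal sketch of what the index entry and the polling loop could look like. All names here (AllIndexEntry, IEventStoreReader, Poller, ReadAllAfter) are my illustrative assumptions, not the actual API of the infrastructure.

using System;
using System.Collections.Generic;

public sealed class AllIndexEntry
{
    public long Position { get; set; }    // global sequence number
    public string StreamName { get; set; }
    public int Version { get; set; }      // version inside the stream
}

public interface IEventStoreReader
{
    // Returns index entries (with their event payloads) after the given global position.
    IReadOnlyList<(AllIndexEntry Entry, object Event)> ReadAllAfter(long position, int maxCount);
}

public sealed class Poller
{
    private readonly IEventStoreReader _reader;
    private readonly Action<AllIndexEntry, object> _handler;
    private long _lastPosition;

    public Poller(IEventStoreReader reader, Action<AllIndexEntry, object> handler, long startPosition)
    {
        _reader = reader;
        _handler = handler;
        _lastPosition = startPosition;
    }

    public void PollOnce()
    {
        var batch = _reader.ReadAllAfter(_lastPosition, 512);
        foreach (var (entry, @event) in batch)
        {
            _handler(entry, @event);        // feed read-model / Process Manager handlers
            _lastPosition = entry.Position; // remember how far we got
        }
    }
}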

As I said, there are two types of event handlers: handlers that rebuild the read model and handlers that run business processes.

There is a very important difference between those two kinds of handlers though:

- The read model should be rebuilt in sequential order, event after event. We cannot parallelize this process, otherwise we will never have a consistent snapshot of the read model, which must be eventually consistent. (I am still thinking about this "must" and hope there is a solution; otherwise, relying on sequential processing in a distributed world is really not a good thing.)

- Business processes, on the other hand, are modeled using Process Managers, which are effectively state machines. Process Managers are ready for asynchronous and out-of-sequence events because they have their own state. For example, given an Order Processing Process Manager that receives two events out of order, CreditCardChargedEvent and ItemReservedEvent, it can use its state to collect this information and fire a ShipItemsCommand once all the necessary events have been consumed.
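Here is a minimal sketch of such a state machine. Only CreditCardChargedEvent, ItemReservedEvent and ShipItemsCommand come from the example above; the shapes of the types and the Handle/MaybeShip methods are my assumptions.

using System;
using System.Collections.Generic;

public sealed class CreditCardChargedEvent { public Guid OrderId { get; set; } }
public sealed class ItemReservedEvent { public Guid OrderId { get; set; } }
public sealed class ShipItemsCommand { public Guid OrderId { get; set; } }

public sealed class OrderProcessManager
{
    // Persistent state: which events have already been observed.
    public bool CreditCardCharged { get; private set; }
    public bool ItemsReserved { get; private set; }

    // Each handler updates the state and emits commands once the state is complete,
    // so the order in which the two events arrive does not matter.
    public IEnumerable<object> Handle(CreditCardChargedEvent e)
    {
        CreditCardCharged = true;
        return MaybeShip(e.OrderId);
    }

    public IEnumerable<object> Handle(ItemReservedEvent e)
    {
        ItemsReserved = true;
        return MaybeShip(e.OrderId);
    }

    private IEnumerable<object> MaybeShip(Guid orderId)
    {
        if (CreditCardCharged && ItemsReserved)
            yield return new ShipItemsCommand { OrderId = orderId };
    }
}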

Until now I had a simplistic Process Manager processor which executed Process Managers in sequential order.
What I wanted was to use some kind of queue with competing consumers plus an actor model on top of the existing infrastructure, that is, without adding a dependency on a queuing system or an actor framework.

So I sketched my thoughts on how I would implement this.

The rings on the picture are sequential event providers (using the Disruptor structure). "All" is the global index of all Process Manager "inbox" streams (similar to actor mailboxes in the actor model; they play a very important role in deduplication and in restarting in-memory actors when their in-memory inbox queues are lost). The subscriber is a persistent table containing the Process Manager stream positions (for deduplication) and the global "all" position, which the top-right handler uses to catch up with "all". After the top-right ring handler gets events, it should somehow create actor-like objects, one per Process Manager instance (differentiated by correlationId + Process Manager name).
For creating the actor-like objects I used the Retlang library.

The actors are in fact simple class instances that have state and process one event at a time, while in reality they run on a thread pool and in parallel.

They persist their state (not shown here) and finally submit their positions:
1) to the subscriber table
2) to the bottom-right ring, which has one handler persisting the lowest sequentially processed "all" position
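To make the role of the subscriber table a bit more concrete, here is a minimal sketch of what it might hold. All names (SubscriberRecord, ISubscriberStore and its methods) are my own illustrative assumptions, not the actual schema.

public sealed class SubscriberRecord
{
    public string ProcessManagerStream { get; set; }  // e.g. "{processManagerName}-{correlationId}"
    public int StreamPosition { get; set; }           // last processed inbox version, used for deduplication
}

public interface ISubscriberStore
{
    void SavePosition(string processManagerStream, int streamPosition); // submitted by the actors (1)
    int LoadPosition(string processManagerStream);                      // read back after a restart
    void SaveGlobalAllPosition(long position);   // lowest sequentially processed "all" position (2)
    long LoadGlobalAllPosition();                // used by the top-right ring to catch up with "all"
}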

(Everything shown in the picture above is actually persisted in an event store separate from the original one the events come from. The only part of the picture that involves the original event store is the two arrows on the left, where the fast and slow channels bring in new events to process: in-memory/in-process notifications (fast) and a persistent catch-up subscription (slow).)

So the process has the following flow:
The left ring gets events from the global list (the original event store) and writes them back into an event store (the worker event store), per Process Manager correlationId. Since the Process Managers define the logic for deriving the correlation from an event, this is easily doable. For example, if there is an event named "UserCreatedEvent" and there are three Process Managers defined in the system, all three interested in that event, three streams will be created and the single event will be copied three times (in the worker event store). The stream names will include the correlation identifier calculated from the event.
During this step it is important to write the event to the stream only once, so I used a unique index to check whether the eventId had already been written to the stream; that ensures the event is written for processing idempotently, only once.
Once persisted again with a new stream version in a new stream (e.g., in the original event store the version was 26974, while in the new stream it could be 0, given it is the first event to be processed by the Process Manager instance named "identity-user-processManager-ffcf1c15-4bf1-4eaf-ad0d-6d2857cd5b09"), the event is in durable storage and is ready to eventually be processed by the corresponding Process Manager instance.
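Here is a minimal sketch of that copy step under my assumptions: a WorkerEvents table with a unique index on (StreamName, EventId), a hypothetical IProcessManagerDefinition that knows how to derive the correlation id, and a tryAppend delegate that returns false when the unique index rejects a duplicate insert. None of these names are the real API.

using System;
using System.Collections.Generic;

public static class WorkerStoreSchema
{
    public const string CreateUniqueIndexSql = @"
        CREATE UNIQUE INDEX UX_WorkerEvents_Stream_EventId
            ON WorkerEvents (StreamName, EventId);";
}

public interface IProcessManagerDefinition
{
    string Name { get; }
    string GetCorrelationId(object @event);
}

public sealed class PerProcessManagerCopier
{
    private readonly Func<string, Guid, object, bool> _tryAppend; // false => unique-index violation (duplicate)

    public PerProcessManagerCopier(Func<string, Guid, object, bool> tryAppend) => _tryAppend = tryAppend;

    public void Copy(Guid eventId, object @event, IEnumerable<IProcessManagerDefinition> interestedProcessManagers)
    {
        foreach (var pm in interestedProcessManagers)
        {
            // The Process Manager defines how to derive the correlation id from the event.
            var correlationId = pm.GetCorrelationId(@event);
            var streamName = $"{pm.Name}-{correlationId}";

            // Idempotent: if (StreamName, EventId) already exists, the insert is rejected and we move on.
            var appended = _tryAppend(streamName, eventId, @event);
            // appended == false simply means the event was copied into this stream earlier.
        }
    }
}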

There is a global index of all events written to the per-Process Manager streams; here it is also called "all" (it is actually a different "All" from the original event store's; I call this second store the worker store, and its index covers the events written a second time into the per-PM streams).

The top-right ring reads events from that index (worker.All) in sequence and asynchronously feeds them into per-actor "fibers" (see the Retlang unit tests and docs; fibers are just queues for actors), which serve as the in-memory actor inboxes.
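A sketch of how the top-right ring handler might dispatch events to per-Process-Manager actors. Only PoolFiber, Start and Enqueue come from Retlang itself; the dispatcher, the stream key and the createActorHandler factory are my assumptions.

using System;
using System.Collections.Concurrent;
using Retlang.Fibers;

public sealed class ProcessManagerDispatcher
{
    private sealed class ActorSlot
    {
        public PoolFiber Fiber;
        public Action<object> Handle;
    }

    // One fiber (in-memory inbox) per Process Manager instance, keyed by "{processManagerName}-{correlationId}".
    private readonly ConcurrentDictionary<string, ActorSlot> _actors = new ConcurrentDictionary<string, ActorSlot>();
    private readonly Func<string, Action<object>> _createActorHandler; // builds/loads the actor for a stream

    public ProcessManagerDispatcher(Func<string, Action<object>> createActorHandler)
        => _createActorHandler = createActorHandler;

    // Called from the single top-right ring handler, so creation is not racy.
    public void Dispatch(string processManagerStream, object @event)
    {
        var slot = _actors.GetOrAdd(processManagerStream, key =>
        {
            var fiber = new PoolFiber();  // executes queued actions on the thread pool, one at a time
            fiber.Start();
            return new ActorSlot { Fiber = fiber, Handle = _createActorHandler(key) };
        });

        // FIFO per Process Manager instance, parallel across instances.
        slot.Fiber.Enqueue(() => slot.Handle(@event));
    }
}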

Actors work by processing events from their FIFO inboxes and persisting the per-PM stream version of each event into worker.Subscriber, thus marking the farthest position in the stream to which they have progressed.
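A minimal sketch of an actor's handling step, reusing the hypothetical ISubscriberStore interface sketched earlier; the version check also covers the redelivery case described further below. Again, the names are illustrative assumptions.

using System;

public sealed class ProcessManagerActor
{
    private readonly string _inboxStream;          // e.g. "identity-user-processManager-{correlationId}"
    private readonly ISubscriberStore _subscriber; // persists per-stream positions (worker.Subscriber)
    private readonly Action<object> _applyToProcessManager; // loads state, handles the event, emits commands
    private int _lastProcessedVersion;

    public ProcessManagerActor(string inboxStream, ISubscriberStore subscriber, Action<object> applyToProcessManager)
    {
        _inboxStream = inboxStream;
        _subscriber = subscriber;
        _applyToProcessManager = applyToProcessManager;
        _lastProcessedVersion = subscriber.LoadPosition(inboxStream);
    }

    // Called one event at a time from the actor's fiber (its in-memory inbox).
    public void Handle(int inboxVersion, object @event)
    {
        if (inboxVersion <= _lastProcessedVersion)
            return; // duplicate delivered after a restart: ignore

        _applyToProcessManager(@event);                        // state change + commands (persisted elsewhere)
        _subscriber.SavePosition(_inboxStream, inboxVersion);  // mark how far we have progressed
        _lastProcessedVersion = inboxVersion;
    }
}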

There is one more ring, the bottom-right ring, which just tracks the lowest sequentially processed Position of the PM global index; the top-right ring uses that position after restarts/failures to fetch the events that come after it.

For better performance, the handler of the bottom-right ring does not persist the lowest sequentially processed Position after each notification; it persists it once a second (this is actually configurable).
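A minimal sketch of such a throttled checkpoint. The bookkeeping of in-flight positions and all the names here are my assumptions about one way to track the lowest sequentially processed position, not the actual handler.

using System;
using System.Collections.Generic;

public sealed class LowestPositionCheckpointer
{
    private readonly Action<long> _persist;        // writes the position to the subscriber table
    private readonly TimeSpan _flushInterval;      // e.g. one second, configurable
    private readonly SortedSet<long> _outstanding = new SortedSet<long>(); // dispatched but not yet acknowledged
    private long _lowestContiguous;
    private DateTime _lastFlush = DateTime.UtcNow;

    public LowestPositionCheckpointer(Action<long> persist, TimeSpan flushInterval, long startPosition)
    {
        _persist = persist;
        _flushInterval = flushInterval;
        _lowestContiguous = startPosition;
    }

    public void Started(long position) => _outstanding.Add(position);

    public void Completed(long position)
    {
        _outstanding.Remove(position);

        // Everything below the smallest still-outstanding position has been safely processed.
        _lowestContiguous = _outstanding.Count == 0
            ? Math.Max(_lowestContiguous, position)
            : _outstanding.Min - 1;

        if (DateTime.UtcNow - _lastFlush >= _flushInterval)
        {
            _persist(_lowestContiguous);   // throttled: roughly once per interval, not per event
            _lastFlush = DateTime.UtcNow;
        }
    }
}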

If, after a restart, events appear a second time to the same actor (because persisting the lowest position is throttled), the actor ignores them, as it has its own position, which is the event version of its inbox.

The subjective feeling and my teammates' feedback are very satisfying: the system is really fast.

We can control what to parallelize by creating a separate instance of a Process Manager using a new correlationId.

As a last thought, I would like to explain why I used the term Process Manager rather than Saga, which is often used instead.

Actually I use another construct in the framework which is more like a Saga, but I will leave that for the next post. I just want to call things by their real names: a Process Manager is a known messaging pattern describing a messaging system element that can persist its state; it receives events, modifies its state and issues commands or other messages based on that state. A Saga does not have central state; it relies fully on the state inside the incoming message to decide what to do next.






