I want to share implementation details of Sagas and Process Managers.
In my implementation, both Sagas and Process Managers consume event messages and execute actions in response.
A Saga has no state, while a Process Manager does.
Another difference: a Process Manager is a state machine, a Saga is not.
So a Saga does not have:
- state machine state
- data state (some persisted data)
And a Process Manager has:
- state machine state
- data state (some persisted data)
A Saga can decide what to do based only on the content of the incoming event.
A Process Manager can decide what to do based on:
- Content of incoming message
- Current state of state machine
- Current data state
Both Saga and Process Manager have one of two outcomes after processing an incoming event:
- Processing was successful
- Exception was thrown
Both Saga and Process Manager are fed with events by a Saga / Process Manager Event Provider.
In both cases, the Event Provider:
- Marks the event as processed if event processing was successful
- Copies the event to the dead event stream and marks the event as processed if event processing raised an exception
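The two outcomes and the Event Provider's reaction to them can be sketched roughly as follows. This is a minimal in-memory illustration, not the framework's code: the names `Event`, `EventProvider`, `processed_ids`, `dead_events` and `handle` are all assumptions made for the example.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class Event:
    event_id: int
    payload: dict


@dataclass
class EventProvider:
    """Hypothetical provider: feeds events to a handler and records outcomes."""
    handler: Callable[[Event], None]
    processed_ids: List[int] = field(default_factory=list)
    dead_events: List[Event] = field(default_factory=list)

    def feed(self, event: Event) -> None:
        try:
            self.handler(event)
        except Exception:
            # Outcome 2: an exception was thrown, so the event is copied
            # to the dead event stream.
            self.dead_events.append(event)
        # In both outcomes the event ends up marked as processed.
        self.processed_ids.append(event.event_id)


def handle(event: Event) -> None:
    """Stand-in for a Saga / Process Manager handler (assumption)."""
    if event.payload.get("fail"):
        raise ValueError("cannot process event")


provider = EventProvider(handler=handle)
provider.feed(Event(1, {"fail": False}))
provider.feed(Event(2, {"fail": True}))
```

After these two calls both events are marked as processed, and only the second one sits in the dead event stream.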
Both Saga and Process Manager can use a durable event scheduler. The scheduler allows sending an event to a Saga / Process Manager instance in the future: the Event Provider will wake up the instance and pass that event to it. Scheduled events can carry information we might be interested in later.
Both Saga and Process Manager are responsible for successful processing. If they cannot process an event, they should either use the scheduler to schedule another attempt in the future or fire compensating transaction logic.
To continue with more technical implementation details, let me introduce two basic structures I use in the framework.
They are:
- Event Stream
- Aggregate Root
Event Stream is a class with public API methods (command handlers call them after validating commands). Calling a method on an Event Stream produces one or more events. An Event Stream can be persisted to the event store by the Event Stream Repository. An Event Stream has no state: it cannot reconstitute its event history from the event store once the repository has written the events there. Effectively, an Event Stream is write-only. (There is actually a way to read the stream's events, but the Event Stream itself does not need them, because it has no state and does not validate against state.)
Aggregate Root is an event-sourced aggregate implementation. It has API methods (command handlers call them after validating commands) and produces events based on the method called. An Aggregate Root must be initialized before use: the Aggregate Root Repository fetches the whole history of the aggregate and reconstitutes its state. Methods on an Aggregate Root validate against that state and, if the action is doable, produce one or more events. After an operation is performed on an Aggregate Root, its state is persisted in the event store by the Aggregate Root Repository.
The Aggregate Root class inherits from the Event Stream class.
The Event Stream Repository saves events to the event store without an optimistic concurrency check.
The Aggregate Root Repository does perform an optimistic concurrency check.
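To make the difference between the two repositories concrete, here is a rough sketch. Everything in it is an assumption for illustration: the in-memory `EventStore`, the use of the stream length as a version number, and all class and method names. The real framework may track versions differently.

```python
from typing import Dict, List, Optional


class ConcurrencyError(Exception):
    """Raised when the stream changed since the aggregate was loaded."""


class EventStore:
    """In-memory stand-in for an event store (hypothetical)."""
    def __init__(self) -> None:
        self.streams: Dict[str, List[dict]] = {}

    def append(self, stream_id: str, events: List[dict],
               expected_version: Optional[int] = None) -> None:
        stream = self.streams.setdefault(stream_id, [])
        # Optimistic concurrency check, applied only when a version is given.
        if expected_version is not None and len(stream) != expected_version:
            raise ConcurrencyError(stream_id)
        stream.extend(events)


class EventStream:
    """Write-only: produces events and never reads its own history."""
    def __init__(self, stream_id: str) -> None:
        self.stream_id = stream_id
        self.uncommitted: List[dict] = []

    def raise_event(self, event: dict) -> None:
        self.uncommitted.append(event)


class AggregateRoot(EventStream):
    """Adds a version that is reconstituted from history."""
    def __init__(self, stream_id: str) -> None:
        super().__init__(stream_id)
        self.version = 0

    def load(self, history: List[dict]) -> None:
        self.version = len(history)  # a real aggregate would also rebuild state


class EventStreamRepository:
    def __init__(self, store: EventStore) -> None:
        self.store = store

    def save(self, stream: EventStream) -> None:
        self.store.append(stream.stream_id, stream.uncommitted)  # no check
        stream.uncommitted = []


class AggregateRootRepository:
    def __init__(self, store: EventStore) -> None:
        self.store = store

    def load(self, aggregate: AggregateRoot) -> None:
        aggregate.load(self.store.streams.get(aggregate.stream_id, []))

    def save(self, aggregate: AggregateRoot) -> None:
        self.store.append(aggregate.stream_id, aggregate.uncommitted,
                          expected_version=aggregate.version)
        aggregate.version += len(aggregate.uncommitted)
        aggregate.uncommitted = []


store = EventStore()

# Two write-only streams with the same id can both be saved.
stream_repo = EventStreamRepository(store)
a, b = EventStream("saga-1"), EventStream("saga-1")
a.raise_event({"type": "EmailSent"})
b.raise_event({"type": "EmailSent"})
stream_repo.save(a)
stream_repo.save(b)

# Two aggregates loaded at the same version: the second save conflicts.
aggregate_repo = AggregateRootRepository(store)
first, second = AggregateRoot("user-1"), AggregateRoot("user-1")
aggregate_repo.load(first)
aggregate_repo.load(second)
first.raise_event({"type": "UserCreated"})
second.raise_event({"type": "UserCreated"})
aggregate_repo.save(first)
try:
    aggregate_repo.save(second)
    conflict = False
except ConcurrencyError:
    conflict = True
```

In this sketch the only thing that separates the two repositories is whether `expected_version` is passed to the store.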
So, returning to Saga and Process Manager:
The Saga class inherits from the Event Stream class.
The Process Manager class inherits from the Aggregate Root class.
As a Saga does not have state, we can run many instances of a Saga with the same correlation id but different events. In other words, we can process Saga events in parallel, even if they correlate to the same instance.
This also means we can create a new instance, fire a handler on it and forget it right after that, as we don't need to cache anything.
You cannot run two Process Manager instances in parallel if they resolve to the same correlation id. The instances will conflict when writing to the store, and there will also be a non-deterministic race condition: the process progresses to one state or another depending on which instance wins. Also, if a given state has two possible outcome states and only one path should be taken, running in parallel makes it possible to perform both actions; one of the two would then end in an error, but its action cannot be undone.
The best way I know to run a Process Manager is to make it a single instance per system, with that single instance responsible for caching its state (so it does not have to be reconstituted before each event). If you use an actor framework, that could be an actor whose address contains the Process Manager name and correlationId.
And if you have already read my previous post, Process Managers and Event Sourcing, you can see how I solved this by using Retlang Fibers and Channels per Process Manager instance.
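As a rough illustration of "one cached instance, events handled one at a time", here is a sketch that uses a plain lock per correlation id. This is a swapped-in technique, not the actor or fiber approach mentioned in the text, and `ProcessManagerHost`, `CountingManager`, `dispatch` and `instance_for` are hypothetical names.

```python
import threading
from typing import Callable, Dict


class CountingManager:
    """Toy Process Manager whose cached state is a counter (assumption)."""
    def __init__(self, correlation_id: str) -> None:
        self.correlation_id = correlation_id
        self.handled = 0

    def handle(self, event: dict) -> None:
        # Read-then-write: unsafe if two threads ran this concurrently.
        current = self.handled
        self.handled = current + 1


class ProcessManagerHost:
    """Keeps one cached instance per correlation id and serializes its events."""
    def __init__(self, factory: Callable[[str], CountingManager]) -> None:
        self._factory = factory
        self._instances: Dict[str, CountingManager] = {}
        self._locks: Dict[str, threading.Lock] = {}
        self._registry_lock = threading.Lock()

    def instance_for(self, correlation_id: str) -> CountingManager:
        with self._registry_lock:
            if correlation_id not in self._instances:
                self._instances[correlation_id] = self._factory(correlation_id)
                self._locks[correlation_id] = threading.Lock()
            return self._instances[correlation_id]

    def dispatch(self, correlation_id: str, event: dict) -> None:
        instance = self.instance_for(correlation_id)
        # Only one event at a time reaches a given instance.
        with self._locks[correlation_id]:
            instance.handle(event)


host = ProcessManagerHost(CountingManager)


def worker() -> None:
    for i in range(50):
        host.dispatch("order-42", {"n": i})


threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

The lock plays the role an actor mailbox would play: events for different correlation ids can still run in parallel, while events for the same id are processed sequentially against the cached instance.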
How to model a business process using a Saga:
A Saga defines the list of events it is interested in and, for each event, a lambda expression that returns the correlation id based on the incoming event.
The Saga's event processing method should use a try-catch construct and, in the try block, perform an action in response to the incoming event by analysing its content. The action can be of any type: it can be issuing a new command or sending an email. A Saga can be subscribed to many events, so it can wait for successful command execution by waiting for the events resulting from it. Also in the try block, a Saga can use the scheduler to wake itself up, for example when we want to query for the result of a command we issued. The scheduler time can be set to the desired timeout value. In the catch block, a Saga can reschedule the event or schedule a new, richer event. For example, if we model email retry logic, we can schedule a new event similar to the incoming one but enriched with one extra property, retryNumber.
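The email retry case could look roughly like the sketch below. It is only an illustration under stated assumptions: `Scheduler` merely records what was scheduled instead of being durable, the event is a plain dict, `retry_number` stands in for the retryNumber property, and the names `EmailSaga`, `correlations`, `max_retries` and the 30-second delay are invented for the example. Re-raising after the last retry is also an assumption; firing compensating logic there would be equally valid.

```python
from typing import Callable, Dict, List, Tuple


class Scheduler:
    """Stand-in for the durable scheduler: records what was scheduled."""
    def __init__(self) -> None:
        self.scheduled: List[Tuple[float, dict]] = []

    def schedule(self, delay_seconds: float, event: dict) -> None:
        self.scheduled.append((delay_seconds, event))


class EmailSaga:
    # Event type -> lambda returning the correlation id for that event.
    correlations: Dict[str, Callable[[dict], str]] = {
        "SendEmailRequested": lambda e: e["email_id"],
    }
    max_retries = 3  # hypothetical limit

    def __init__(self, scheduler: Scheduler,
                 send_email: Callable[[str], None]) -> None:
        self.scheduler = scheduler
        self.send_email = send_email

    def handle(self, event: dict) -> None:
        try:
            # The decision is based only on the content of the incoming event.
            self.send_email(event["to"])
        except Exception:
            retry = event.get("retry_number", 0) + 1
            if retry > self.max_retries:
                raise  # give up; the caller decides what happens next
            # Schedule a richer copy of the event carrying the retry number.
            self.scheduler.schedule(30.0 * retry,
                                    {**event, "retry_number": retry})


def failing_send(to: str) -> None:
    raise ConnectionError("mail server unavailable")


scheduler = Scheduler()
saga = EmailSaga(scheduler, failing_send)
event = {"type": "SendEmailRequested", "email_id": "e-1", "to": "a@example.com"}
saga.handle(event)
```

Because the retry count travels inside the event, the Saga instance itself stays stateless and can be discarded after each call.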
How to model a business process using a Process Manager:
A Process Manager defines its state machine structure using an expression very similar to the Given, When, Then form known from BDD tests.
Given
- state machine state is "New"
When
- event "UserCreatedEvent" happened
Then
- Execute method named OnUserCreated(UserCreatedEvent e)
The Process Manager's event processing method should similarly use a try-catch block and use the scheduler in a similar way. In addition, a Process Manager can:
- persist data (the Process Manager class inherits from the Aggregate Root class)
- access persisted data (the Process Manager class inherits from the Aggregate Root class)
- move to another state machine state (and thus narrow down which events it is interested in next; this is picked up by the Event Handler feeding it with events)
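The Given / When / Then structure can be sketched as a lookup table keyed by (state, event type). This is only an illustration: the state "New", `UserCreatedEvent` and the `OnUserCreated` handler come from the example above, while `UserRegistrationProcess`, `EmailConfirmedEvent`, the other state names and the in-memory `data` dict (standing in for persisted data) are assumptions.

```python
from typing import Callable, Dict, Set, Tuple


class UserRegistrationProcess:
    def __init__(self) -> None:
        self.state = "New"
        self.data: Dict[str, object] = {}  # stands in for persisted data
        # (Given state, When event type) -> Then handler
        self.transitions: Dict[Tuple[str, str], Callable[[dict], None]] = {
            ("New", "UserCreatedEvent"): self.on_user_created,
            ("AwaitingConfirmation", "EmailConfirmedEvent"): self.on_email_confirmed,
        }

    def interested_in(self) -> Set[str]:
        """Event types that are relevant in the current state."""
        return {etype for (state, etype) in self.transitions
                if state == self.state}

    def handle(self, event: dict) -> bool:
        handler = self.transitions.get((self.state, event["type"]))
        if handler is None:
            return False  # not expected in the current state
        handler(event)
        return True

    def on_user_created(self, event: dict) -> None:
        self.data["user_id"] = event["user_id"]
        self.state = "AwaitingConfirmation"

    def on_email_confirmed(self, event: dict) -> None:
        self.data["confirmed"] = True
        self.state = "Completed"


pm = UserRegistrationProcess()
initial_interest = pm.interested_in()
ignored = pm.handle({"type": "EmailConfirmedEvent"})
accepted = pm.handle({"type": "UserCreatedEvent", "user_id": "u-1"})
```

Moving to a new state changes what `interested_in()` returns, which is the information a component feeding events to the Process Manager would need in order to narrow its subscriptions.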
In my experience with both Process Managers and Sagas, it is easy to split a Process Manager into several smaller Process Managers, but with Sagas it is even easier.
And if you have already read my previous post, Process Managers and Event Sourcing, you can see how I can parallelise work by splitting a single Process Manager into three small Process Managers.
I find it useful to use a UML Sequence Diagram to visualise a business process that spans many Sagas or Process Managers.
3 comments:
This doesn't really sound like a saga to me.
See: http://vasters.com/clemensv/2012/09/01/Sagas.aspx
It is implemented in the context of the infrastructure that was available to me (I don't have a dependency on a queueing system).
So essentially it is not different. I can create a Saga named CarHotelFlightSaga,
create an event named StartBookingEvent,
and subscribe the Saga to two events: StartBookingEvent and SagaProgressEvent.
SagaProgressEvent can contain the complete routing slip.
The StartBookingEvent handler will issue the initial SagaProgressEvent with a shiny new routing slip.
The SagaProgressEvent handler will inspect the message content, including the routing slip, and either compensate or proceed to the next step.
Sending to queue addresses can be done by manipulating the message content right before re-issuing the event.
Or, another possibility: the address of the queue channel can be the saga stream name, which is the saga name + correlationid.
There could be many small non-correlated sagas (in classic terms, working as single event handlers, aka endpoints) instead of one. They could have different correlation ids, like having different queue channel addresses, and they could share an event with the routing slip inside it.
Would be awesome to actually see your code for your implementation.