Flow Manager Configuration
The Flow Manager needs a few environment variables and a configurations file to work.
Moreover, it requires a service to interact with to get and store saga information during the flow.
Environment variables
Almost all configurations are taken from the configurations file, so the Flow Manager needs only two environment variables:
- SAGA_ID_PREFIX: the prefix to prepend to the ID of the Saga generated by the service
- CONFIGURATIONS_FILE_PATH: the file path of the service configurations file
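As a minimal sketch of how a wrapper script might check these two variables before starting the service, the function below reads them from an environment-like map (for example `process.env` in Node.js). The function name, the returned shape, and the choice to throw on a missing variable are assumptions made for illustration, not the Flow Manager's own startup code.

```typescript
interface FlowManagerEnv {
  sagaIdPrefix: string
  configurationsFilePath: string
}

// Reads the two documented variables from an environment-like map and
// fails fast when one of them is not defined (assumed behaviour).
function readFlowManagerEnv(env: Record<string, string | undefined>): FlowManagerEnv {
  const sagaIdPrefix = env["SAGA_ID_PREFIX"]
  const configurationsFilePath = env["CONFIGURATIONS_FILE_PATH"]
  if (sagaIdPrefix === undefined) {
    throw new Error("SAGA_ID_PREFIX is not set")
  }
  if (configurationsFilePath === undefined) {
    throw new Error("CONFIGURATIONS_FILE_PATH is not set")
  }
  return { sagaIdPrefix, configurationsFilePath }
}
```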
Configurations file
The configurations file required by the Flow Manager is in JSON format and is divided into four sections:
- communicationProtocols: the section dedicated to the Communication protocols used to communicate with other services
- persistencyManagement: the section that contains the configurations of the Persistency manager used to get and update the saga
- machineDefinition: the core section, with the Finite state machine configurations, used by the Flow Manager to drive the saga through the flow
- settings: the section dedicated to general Settings. If not defined, no additional settings are enabled.
The Flow Manager validates the configurations file: pay attention to the schema rules, or the service will not be deployed. The first three sections above are required and must not be empty when you deploy.
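The rule on the three required sections can be sketched as a small pre-deploy check. This is not the Flow Manager's schema validation, only an illustration of the rule stated above; the function names and the error messages are hypothetical.

```typescript
function isNonEmptyObject(value: unknown): boolean {
  return typeof value === "object" && value !== null && !Array.isArray(value) &&
    Object.keys(value).length > 0
}

// Returns one message for each required section that is missing or empty.
// The optional `settings` section is deliberately not checked.
function checkRequiredSections(config: Record<string, unknown>): string[] {
  const problems: string[] = []
  const protocols = config["communicationProtocols"]
  if (!Array.isArray(protocols) || protocols.length === 0) {
    problems.push("communicationProtocols must be a non-empty list")
  }
  for (const section of ["persistencyManagement", "machineDefinition"]) {
    if (!isNonEmptyObject(config[section])) {
      problems.push(section + " must be a non-empty object")
    }
  }
  return problems
}
```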
The details of each section follow.
Communication Protocols
This section contains a list of JSON Objects, each with the configurations of one communication protocol (channel, from now on).
NB. each channel could have a custom configurations schema.
Each channel type must be one of the supported types. The supported channel types are:
- kafka: the channel that connects to an Apache Kafka server and allows the service to publish/consume messages (commands/events)
- rest: the channel that allows the service to send commands and receive events through REST API
Kafka communication protocol
This channel type allows the service to connect to a kafka server and consume/produce messages on it.
The channel properties are the following:
- id (required): the id of the channel, will be used in the Machine Definitions to choose the channel to use for messages
- type (required): the type of the channel, must be kafka
- configurations (required): all other Kafka configurations:
  - brokers (required): either a list of strings that represent the Kafka brokers, or a single string with the comma-separated list of brokers
  - inputTopics: a list of strings with the topics to subscribe to in order to consume messages; NB. this property is required if the property outputTopics is missing
  - outputTopics: a list of strings with the topics to send messages to; NB. this property is required if the property inputTopics is missing
  - consumerGroup: the consumer group id used by the Kafka Consumer to consume messages; NB. this property is required if inputTopics is set
  - authenticationProperties: a JSON Object with the Kafka authentication properties. It must be KafkaJS compliant
The Flow Manager behaves as follows:
- for each channel with the property outputTopics set, it creates a Kafka Producer, used to send messages whenever that channel is the one chosen for a message
- for each channel with the property inputTopics set, it creates a Kafka Consumer, subscribes it to the topics and starts listening
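The rules above (two accepted forms for brokers, at least one of the two topic lists, consumerGroup mandatory with inputTopics) can be sketched as follows. The helper names are hypothetical, and two details are assumptions of this sketch rather than documented behavior: whitespace around broker entries is trimmed, and an empty topic list is treated as missing.

```typescript
interface KafkaConfigurations {
  brokers: string[] | string
  inputTopics?: string[]
  outputTopics?: string[]
  consumerGroup?: string
}

// Accepts both documented forms of `brokers` and always returns a list.
function normalizeBrokers(brokers: string[] | string): string[] {
  const list = typeof brokers === "string" ? brokers.split(",") : brokers
  return list.map((broker) => broker.trim()).filter((broker) => broker.length > 0)
}

// Assumption: an empty list counts as "not set".
function isSet(topics: string[] | undefined): boolean {
  return topics !== undefined && topics.length > 0
}

// Returns one message per violated rule.
function checkKafkaConfigurations(config: KafkaConfigurations): string[] {
  const problems: string[] = []
  if (normalizeBrokers(config.brokers).length === 0) {
    problems.push("brokers is required")
  }
  if (!isSet(config.inputTopics) && !isSet(config.outputTopics)) {
    problems.push("at least one of inputTopics and outputTopics is required")
  }
  if (isSet(config.inputTopics) && !config.consumerGroup) {
    problems.push("consumerGroup is required when inputTopics is set")
  }
  return problems
}

// Which Kafka clients a channel needs, following the behavior described above.
function clientsFor(config: KafkaConfigurations): { producer: boolean; consumer: boolean } {
  return { producer: isSet(config.outputTopics), consumer: isSet(config.inputTopics) }
}
```

For instance, a channel that only declares outputTopics yields `{ producer: true, consumer: false }` and needs no consumerGroup.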
Some examples follow; each title summarizes what the configuration does.
Kafka - consumer only - ssl auth - brokers as list
{
"id": "myKafkaConsumer",
"type": "kafka",
"configurations": {
"brokers": [
"myKafkaNode1:9092",
"myKafkaNode2:9092"
],
"inputTopics": [
"topic-to-subscribe"
],
"consumerGroup": "myConsumer",
"authenticationProperties": {
"ssl": true,
"sasl": {
"mechanism": "plain",
"username": "username",
"password": "password"
}
}
}
}
Kafka - producer only - no auth - brokers as string
{
"id": "myKafkaProducer",
"type": "kafka",
"configurations": {
"brokers": "myKafkaNode1:9092,myKafkaNode2:9092",
"outputTopics": [
"topic-to-publish-on"
]
}
}
Kafka - complete configurations
{
"id": "myKafka",
"type": "kafka",
"configurations": {
"brokers": [
"myKafka:9092"
],
"inputTopics": [
"topic-to-subscribe"
],
"outputTopics": [
"topic-to-publish-on"
],
"consumerGroup": "myConsumer",
"authenticationProperties": {
"ssl": true,
"sasl": {
"mechanism": "plain",
"username": "username",
"password": "password"
}
}
}
}
Sending commands
The Flow Manager sends commands via Kafka messages using the Saga id as key, while the message payload should be in the form:
{
"messageLabel": "The label of the command",
"messagePayload": "The payload needed by the called service to execute the command"
}
Receiving events
The Flow Manager receives events via Kafka messages using the Saga id as key, while the message payload should be in the form:
{
"messageLabel": "The label of the event",
"messagePayload": "The payload of the event"
}
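A service that exchanges messages with the Flow Manager over Kafka could build and read them along these lines. The sketch assumes the message value is the JSON serialization of the object shown above; the `KafkaRecord` type and both function names are hypothetical and no Kafka client library is involved.

```typescript
interface FlowMessage {
  messageLabel: string
  messagePayload: unknown
}

// Minimal key/value pair; real Kafka clients carry more fields.
interface KafkaRecord {
  key: string
  value: string
}

// The saga ID is the message key, the label and payload form the value.
function encodeMessage(sagaId: string, message: FlowMessage): KafkaRecord {
  return { key: sagaId, value: JSON.stringify(message) }
}

function decodeMessage(record: KafkaRecord): { sagaId: string; message: FlowMessage } {
  const parsed: unknown = JSON.parse(record.value)
  if (typeof parsed !== "object" || parsed === null) {
    throw new Error("message value must be a JSON object")
  }
  const fields = parsed as Record<string, unknown>
  const messageLabel = fields["messageLabel"]
  if (typeof messageLabel !== "string") {
    throw new Error("messageLabel must be a string")
  }
  return { sagaId: record.key, message: { messageLabel, messagePayload: fields["messagePayload"] } }
}
```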
REST communication protocol
This channel type allows the service to exchange commands and events with external services through REST API.
The channel properties are the following:
- id (required): the id of the channel, will be used in the Machine Definitions to choose the channel to use for messages
- type (required): the type of the channel, must be rest
- configurations (required): all other REST configurations:
  - protocol (required): the protocol to use; it must be one of the allowed values --> [http, https]
  - endpoint (required): the endpoint of the service, without the final slash (/)
  - method (required): the method to use to send the command; it must be one of the allowed values --> [POST, PUT, PATCH]
  - path: the path to interact with the service (the default one is the root, the simple slash /)
  - headers: the headers to use
  - port: the port to use
The Flow Manager behavior will be the following:
- for each channel, it will create a proxy to send commands to the specified service
- regardless of the number of channels, it will expose a route to receive events from the external services
More details in the following sections.
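The required properties and allowed values of a REST channel can be checked with a sketch like the one below. It is an illustration of the rules listed above, not the service's own schema; the function names and messages are hypothetical.

```typescript
const ALLOWED_PROTOCOLS = ["http", "https"]
const ALLOWED_METHODS = ["POST", "PUT", "PATCH"]

function isOneOf(value: unknown, allowed: string[]): boolean {
  return typeof value === "string" && allowed.indexOf(value) !== -1
}

// Returns one message per problem found in a REST channel definition.
function checkRestChannel(channel: Record<string, unknown>): string[] {
  const problems: string[] = []
  if (typeof channel["id"] !== "string") problems.push("id is required")
  if (channel["type"] !== "rest") problems.push("type must be rest")
  const raw = channel["configurations"]
  if (typeof raw !== "object" || raw === null) {
    problems.push("configurations is required")
    return problems
  }
  const config = raw as Record<string, unknown>
  if (!isOneOf(config["protocol"], ALLOWED_PROTOCOLS)) {
    problems.push("protocol must be http or https")
  }
  const endpoint = config["endpoint"]
  if (typeof endpoint !== "string" || endpoint.length === 0) {
    problems.push("endpoint is required")
  } else if (endpoint.slice(-1) === "/") {
    problems.push("endpoint must not end with a slash")
  }
  if (!isOneOf(config["method"], ALLOWED_METHODS)) {
    problems.push("method must be POST, PUT or PATCH")
  }
  return problems
}
```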
Sending commands
The Flow Manager sends commands to the services through the following route:
{{method}} - {{protocol}}://{{endpoint}}:{{port}}{{path}}
with the following body:
{
"key": "The id of the saga",
"value": {
"messageLabel": "The label of the command",
"messagePayload": "The payload needed by the called service to execute the command"
}
}
The Flow Manager expects a successful response (code 2xx), and it does not care about the response payload.
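To make the route template concrete, the sketch below assembles the method, URL and body of a command from a channel's configurations. The function names are hypothetical. Dropping the `:port` segment when no port is configured is an assumption of this sketch; the default path `/` comes from the property list above.

```typescript
interface RestConfigurations {
  protocol: "http" | "https"
  endpoint: string
  method: "POST" | "PUT" | "PATCH"
  path?: string
  port?: number
  headers?: Record<string, string>
}

// Builds the pieces of the HTTP request without sending it.
function buildCommandRequest(
  config: RestConfigurations,
  sagaId: string,
  messageLabel: string,
  messagePayload: unknown,
) {
  // Assumption: without a configured port the URL carries no port segment.
  const port = config.port === undefined ? "" : ":" + config.port
  const path = config.path === undefined ? "/" : config.path
  return {
    method: config.method,
    url: config.protocol + "://" + config.endpoint + port + path,
    headers: config.headers === undefined ? {} : config.headers,
    body: { key: sagaId, value: { messageLabel, messagePayload } },
  }
}

// Only the status code matters to the Flow Manager: any 2xx is a success.
function isSuccessful(statusCode: number): boolean {
  return statusCode >= 200 && statusCode < 300
}
```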
Receiving events
To receive events, the Flow Manager exposes the route POST - /event
which accepts a body that must match the following JSON schema:
{
"type": "object",
"properties": {
"key": {
"type": "string",
"description": "The id of the saga"
},
"value": {
"type": "object",
"description": "The data about the event",
"properties": {
"messageLabel": {
"type": "string",
"description": "The label of the event"
},
"messagePayload": {
"description": "The payload of the event"
}
},
"required": ["messageLabel", "messagePayload"],
"additionalProperties": false
}
},
"required": ["key", "value"],
"additionalProperties": false
}
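A service that posts events to this route may want to check its body before sending it. The type guard below is a hand-written sketch that mirrors the schema above (required fields, string types, no additional properties); it does not use a JSON Schema library, and its names are hypothetical.

```typescript
interface EventBody {
  key: string
  value: { messageLabel: string; messagePayload: unknown }
}

function isPlainObject(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null && !Array.isArray(value)
}

// Mirrors "additionalProperties": false.
function hasOnlyKeys(obj: Record<string, unknown>, allowed: string[]): boolean {
  return Object.keys(obj).every((key) => allowed.indexOf(key) !== -1)
}

function isEventBody(body: unknown): body is EventBody {
  if (!isPlainObject(body) || !hasOnlyKeys(body, ["key", "value"])) return false
  if (typeof body["key"] !== "string") return false
  const value = body["value"]
  if (!isPlainObject(value) || !hasOnlyKeys(value, ["messageLabel", "messagePayload"])) return false
  // messagePayload is required but may have any type, so only its presence is checked.
  return typeof value["messageLabel"] === "string" && "messagePayload" in value
}
```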
Persistency Manager
This section contains the configurations for the Persistency Manager, that is responsible for:
- inserting/updating a saga
- retrieving a saga
The persistency manager type must be one of the supported types. The supported types are:
- rest: will use the REST API to interact with an external persistency manager service
- mongo: will directly connect to a MongoDB instance to interact with the persistency manager collection
- crud: will use CRUD Service REST API to persist saga information
The main use cases of the three types are:
- rest: decouples the Flow Manager from the underlying data store and enables post-processing of metadata before saving them persistently
- mongo: avoids the creation of a custom service, coupling the Flow Manager with a specific database to save data directly
- crud: avoids the creation of a custom service, coupling the Flow Manager with the CRUD Service to save data
The persistencyManagement section must contain a JSON object with the configurations. See the following sections for the details by type.
For more about the Persistency Manager, see the dedicated documentation.
REST Persistency Manager
The REST persistency manager uses the REST protocol to contact the service in order to read and insert/update the sagas.
The properties for this type are the following:
- type (required): the type of the persistency manager, it must be rest
- configurations (required): the other configurations of the manager
  - protocol (required): the protocol to use; it must be one of the allowed values --> [http, https]
  - endpoint (required): the endpoint of the service, without the final slash (/)
  - method (required): the method to use to upsert the saga (the same endpoint and path are used for both insert and update)
  - path: the path to interact with the service (the default one is the root, the simple slash /)
  - headers: the headers to use
  - port: the port to use
NB. the interactions between the Flow Manager and the Persistency Manager are the following:
insert/update: the Flow Manager calls the route
{{method}} - {{protocol}}://{{endpoint}}:{{port}}{{path}}/{{sagaId}}
with the following body:
{
"latestEvent": "The last event received by the manager",
"history": "The updated history of the saga",
"currentState": "The state of the saga (taken from the *Finite state machine* configurations)",
"associatedEntityId": "The ID of the entity associated to the saga (e.g. the ID of a food delivery order, or of a policy and so on)",
"isFinal": "Boolean that indicates if the new state is a final state (DEPRECATED)",
"businessStateId": "The ID of the business state (explained later)",
"businessStateDescription": "The description of the business state (explained later)",
"metadata": "The metadata of the saga, a JSON Object with all the business data related to the saga that is unknown to the Flow Manager"
}
get: the Flow Manager calls the route
GET - {{protocol}}://{{endpoint}}:{{port}}{{path}}/{{sagaId}}
and the service must return this body:
{
"currentState": "The state of the saga",
"metadata": "All the metadata of the saga",
"history": "The history of the saga"
}
REST Persistency Manager use case
With the following configurations:
{
"type": "rest",
"configurations": {
"protocol": "http",
"endpoint": "my-persistency-manager",
"port": 3000,
"method": "POST"
}
}
and with the sagaId EXAMPLE_123456_SAGA, the operations will be:
insert/update:
POST - http://my-persistency-manager:3000/EXAMPLE_123456_SAGA
get:
GET - http://my-persistency-manager:3000/EXAMPLE_123456_SAGA
Mongo Persistency Manager
The Mongo persistency manager directly connects to a MongoDB cluster for reading and inserting/updating the saga information.
The properties for this type, all of which are required, follow:
- type: the type of the persistency manager, it must be mongo
- configurations: an object containing the other configurations of the manager
  - connectionUri: the connection string of the Mongo instance (e.g. mongodb://...)
  - collectionName: the collection in which the sagas will be stored
NB. the interactions between the Flow Manager and the Persistency Manager are the following:
at boot: the Flow Manager connects to the MongoDB instance using the connectionUri
insert/update: the Flow Manager performs a replaceOne operation with the following content:
{
"latestEvent": "The last event received by the manager",
"history": "The updated history of the saga",
"currentState": "The state of the saga (taken from the *Finite state machine* configurations)",
"associatedEntityId": "The ID of the entity associated to the saga (e.g. the ID of a food delivery order, or of a policy and so on)",
"isFinal": "Boolean that indicates if the new state is a final state",
"businessStateId": "The ID of the business state (explained later)",
"businessStateDescription": "The description of the business state (explained later)",
"metadata": "The metadata of the saga, a JSON Object with all the business data related to the saga that is unknown to the Flow Manager",
"sagaId": "The generated saga identification code"
}
get: the Flow Manager performs a findOne operation that must return this body:
{
"currentState": "The state of the saga",
"metadata": "All the metadata of the saga",
"history": "The history of the saga"
}
Mongo Persistency Manager use case
With the following configurations:
{
"type": "mongo",
"configurations": {
"connectionUri": "mongodb://localhost:27017/myDB",
"collectionName": "my-collection"
}
}
and with the sagaId EXAMPLE_123456_SAGA, the operations will be:
insert/update:
collection(my-collection)
replaceOne(EXAMPLE_123456_SAGA)
get:
collection(my-collection)
findOne(EXAMPLE_123456_SAGA)
CRUD Persistency Manager
The CRUD persistency manager connects to a CRUD service instance for reading and inserting/updating the saga information.
To configure the CRUD Persistency Manager use the following properties:
- type (required): the type of the persistency manager, it must be crud
- configurations (required): an object containing the other configurations of the manager
  - protocol: the protocol to use (it must be one of http or https, it defaults to http)
  - host: the host of the CRUD service (it defaults to crud-service)
  - collectionName (required): the URL path of the collection in which the sagas will be stored
  - headers: the headers to use
  - port: the port to use (it defaults to 80)
Furthermore, you need to create a CRUD collection from the console, with a URL path equal to collectionName, using this schema.
Remember to create a unique index for the collection on the sagaId field and to set the default state for new documents to PUBLIC.
NB. the interactions between the Flow Manager and the Persistency Manager are the following:
insert/update: the Flow Manager calls the route
POST - {{protocol}}://{{host}}:{{port}}/{{collectionName}}/upsert-one?sagaId={{sagaId}}
with the following body:
{
"$set": {
"sagaId": "The generated saga identification code",
"latestEvent": "The last event received by the manager",
"history": "The updated history of the saga",
"currentState": "The state of the saga (taken from the *Finite state machine* configurations)",
"associatedEntityId": "The ID of the entity associated to the saga (e.g. the ID of a food delivery order, or of a policy and so on)",
"isFinal": "Boolean that indicates if the new state is a final state (DEPRECATED)",
"businessStateId": "The ID of the business state (explained later)",
"businessStateDescription": "The description of the business state (explained later)",
"metadata": "The metadata of the saga, a JSON Object with all the business data related to the saga that is unknown to the Flow Manager"
}
}
get: the Flow Manager calls the route
GET - {{protocol}}://{{host}}:{{port}}/{{collectionName}}/?sagaId={{sagaId}}
CRUD Persistency Manager use case
With the following configurations:
{
"type": "crud",
"configurations": {
"protocol": "http",
"host": "my-crud-service",
"port": 80,
"collectionName": "my-collection-path"
}
}
and with the sagaId EXAMPLE_123456_SAGA, the operations will be:
insert/update:
POST - http://my-crud-service:80/my-collection-path/upsert-one?sagaId=EXAMPLE_123456_SAGA
get:
GET - http://my-crud-service:80/my-collection-path/?sagaId=EXAMPLE_123456_SAGA
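The way the documented defaults (http, crud-service, 80) combine with collectionName can be sketched as a small route builder. The function name is hypothetical, and two points are assumptions of this sketch: the query parameter is spelled sagaId in both routes, and the saga ID is URL-encoded.

```typescript
interface CrudConfigurations {
  collectionName: string
  protocol?: "http" | "https"
  host?: string
  port?: number
}

// Builds the two routes used for a given saga, applying the documented defaults.
function crudRoutes(config: CrudConfigurations, sagaId: string): { upsert: string; get: string } {
  const protocol = config.protocol === undefined ? "http" : config.protocol
  const host = config.host === undefined ? "crud-service" : config.host
  const port = config.port === undefined ? 80 : config.port
  const base = protocol + "://" + host + ":" + port + "/" + config.collectionName
  const query = "sagaId=" + encodeURIComponent(sagaId)
  return {
    upsert: base + "/upsert-one?" + query, // called with POST
    get: base + "/?" + query, // called with GET
  }
}
```

With only collectionName set to my-collection-path, the upsert route resolves against `http://crud-service:80/my-collection-path`.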
Machine Definition
This is the core section of the Flow Manager configurations, because it contains the Finite State Machine definition.
The Machine Definition configurations are the following:
- initialState (required): the saga starting point state; when a new saga is created, the state will be the initial state
- creationEvent (required): the event that triggers the saga creation; it is just a placeholder passed to the Persistency Manager when the saga is created (it can be useful if the Persistency Manager stores a history of the saga's events)
- states (required): an array that contains the list of the states of the Finite state machine (explained below)
- businessStates (required): an array that contains the list of the states that matter for business (explained below)
- businessEvents: an array that contains the list of the events that matter for business (explained below)
States of the machine
The Flow Manager uses the states of the machine to move the saga forward through the flow.
Each state must have the following configurations:
- id (required): a string that contains the ID of the state (usually a camelCase, human-readable string that tells what the state represents)
- description: the state description
- isFinal (required): a boolean that indicates if the state is a final one
- businessStateId (required): the ID of the business state (NB. must exist in the businessStates list)
- outputCommand: a JSON Object that contains the details of the command to send in output when the saga lands on the state (it is not mandatory since a state may not need to send any command):
  - channel (required): the ID of the channel used to send the command (NB. must exist in the communicationProtocols configurations)
  - label (required): the label of the command (usually an imperative sentence, e.g. updateTheRequest)
  - hook: a JSON Object that contains the details of the custom function that generates the payload of the command
    - type (required): only file is accepted
    - encoding: accepted values are plain and base64. By default it is plain
    - content (required): the custom function implementation, encoded accordingly; the signature is described in the Command Hook section
- sideEffects: an array of outputCommand objects to execute alongside the main command in a fire-and-forget manner
- outgoingTransitions: a list of JSON Objects that contain the details of the events that can be received in the current state (if an unexpected event is received, the Flow Manager will log an error and ignore it; it is not mandatory because a state may not point to other states, e.g. a final one):
  - inputEvent (required): a string with the event that will trigger the transition from the current state
  - targetState (required): the ID of the state to land on (NB. a state with this ID must exist)
  - businessEventId: the ID of the business event (NB. must exist in the businessEvents list)
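The role of outgoingTransitions can be illustrated with a minimal lookup: given the current state and an incoming event, find the state to land on, and report unexpected events without changing state. This is a sketch of the behavior described above, not the Flow Manager's implementation; the function name and the injected `logError` callback are hypothetical.

```typescript
interface Transition {
  inputEvent: string
  targetState: string
  businessEventId?: number | string
}

interface MachineState {
  id: string
  isFinal: boolean
  businessStateId: number | string
  outgoingTransitions?: Transition[]
}

// Returns the ID of the state to land on, or undefined when the event is
// not expected in the current state (the event is then ignored).
function nextState(
  states: MachineState[],
  currentStateId: string,
  inputEvent: string,
  logError: (message: string) => void,
): string | undefined {
  const current = states.filter((state) => state.id === currentStateId)[0]
  if (current === undefined) {
    logError("unknown state " + currentStateId)
    return undefined
  }
  const transitions = current.outgoingTransitions === undefined ? [] : current.outgoingTransitions
  const transition = transitions.filter((candidate) => candidate.inputEvent === inputEvent)[0]
  if (transition === undefined) {
    logError("unexpected event " + inputEvent + " in state " + currentStateId)
    return undefined
  }
  return transition.targetState
}
```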
Business states of the machine
The business states are a higher-level view of the machine: they represent the states of the saga that matter for the business.
Each business state can contain one or more states of the machine (defined above).
The configurations of the business states are the following:
- id (required): the ID of the business state (usually an integer code)
- description: a full description of the business state
NB. the id and the description of the business state will be stored by the Persistency Manager.
Business events of the machine
The business events are a higher-level view of the machine: they represent the events of the saga that matter for the business.
Each business event can be associated with one or more outgoingTransitions of the machine (defined above).
The configurations of the business events are the following:
- id (required): the ID of the business event (usually an integer code)
- description: a full description of the business event
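The "must exist" notes in the Machine Definition (business state IDs, business event IDs, target states, channels) lend themselves to a cross-reference check before deploying. The sketch below collects every dangling reference; its names are hypothetical, and the check that initialState refers to a declared state is an assumption, since the text above does not state it explicitly.

```typescript
type Id = number | string

interface MachineDefinition {
  initialState: string
  states: Array<{
    id: string
    businessStateId: Id
    outputCommand?: { channel: string; label: string }
    outgoingTransitions?: Array<{ inputEvent: string; targetState: string; businessEventId?: Id }>
  }>
  businessStates: Array<{ id: Id }>
  businessEvents?: Array<{ id: Id }>
}

// `channelIds` are the ids declared in communicationProtocols.
function checkReferences(machine: MachineDefinition, channelIds: string[]): string[] {
  const problems: string[] = []
  const stateIds = machine.states.map((state) => state.id)
  const businessStateIds = machine.businessStates.map((item) => item.id)
  const businessEventIds = (machine.businessEvents === undefined ? [] : machine.businessEvents).map((item) => item.id)
  // Assumption: the initial state must be one of the declared states.
  if (stateIds.indexOf(machine.initialState) === -1) {
    problems.push("initialState " + machine.initialState + " is not a declared state")
  }
  for (const state of machine.states) {
    if (businessStateIds.indexOf(state.businessStateId) === -1) {
      problems.push(state.id + ": unknown businessStateId " + state.businessStateId)
    }
    if (state.outputCommand !== undefined && channelIds.indexOf(state.outputCommand.channel) === -1) {
      problems.push(state.id + ": unknown channel " + state.outputCommand.channel)
    }
    for (const transition of state.outgoingTransitions === undefined ? [] : state.outgoingTransitions) {
      if (stateIds.indexOf(transition.targetState) === -1) {
        problems.push(state.id + ": unknown targetState " + transition.targetState)
      }
      if (transition.businessEventId !== undefined && businessEventIds.indexOf(transition.businessEventId) === -1) {
        problems.push(state.id + ": unknown businessEventId " + transition.businessEventId)
      }
    }
  }
  return problems
}
```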
Command hook
The Command hook is a custom function that generates the command payload.
It takes as input an object with two fields: the entire Saga entity and the message label of the command; the output is an object with the payload to use.
The signature is the following:
type HookFunc = (input: {
saga: Record<string, unknown>,
messageLabel: string,
}) => {
// Body of REST call or value of Kafka message
payload: Record<string, unknown>
}
Here is an example:
export default ({ saga, messageLabel }) => { return { payload: { sagaId: saga.sagaId, metadata: saga.metadata } }}
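To see what such a hook returns, the same logic can be written with explicit types and called by hand. The saga object below is hypothetical and reduced to a few fields; how the Flow Manager loads and invokes the function from the hook's content is not shown here.

```typescript
type Saga = Record<string, unknown>
type HookFunc = (input: { saga: Saga; messageLabel: string }) => { payload: Record<string, unknown> }

// Same logic as the example above, with explicit types.
const sagaIdAndMetadataHook: HookFunc = ({ saga }) => {
  return { payload: { sagaId: saga["sagaId"], metadata: saga["metadata"] } }
}

// Hypothetical saga entity, reduced to the fields relevant for the example.
const exampleSaga: Saga = {
  sagaId: "EXAMPLE_123456_SAGA",
  currentState: "sagaCreated",
  metadata: { orderId: "order-1" },
}

const hookResult = sagaIdAndMetadataHook({
  saga: exampleSaga,
  messageLabel: "redirectTheUserToThePaymentPage",
})
// hookResult.payload keeps only the saga ID and the metadata:
// { sagaId: "EXAMPLE_123456_SAGA", metadata: { orderId: "order-1" } }
```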
Settings
Here is the list of supported additional settings:
- deepMergeMetadata: a JSON Object that defines whether the metadata must be deep merged. This operation is implemented with the Lodash mergeWith function; by default the customizer function is undefined
  - enabled: a boolean value that defines if this feature is enabled or not. By default it is false
  - hook: a JSON Object that contains the details of the custom function to be used as customizer
    - type (required): only file is accepted
    - encoding: accepted values are plain and base64. By default it is plain
    - content (required): the customizer function implementation, encoded accordingly
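The effect of a customizer on a deep merge can be illustrated with a simplified stand-in. The function below is not Lodash: it returns a new object instead of mutating the target, and without a customizer it replaces arrays rather than merging them index by index as Lodash does. It only keeps the one rule that matters here: when the customizer returns undefined, the default merge applies. All names are hypothetical.

```typescript
type Customizer = (objValue: unknown, srcValue: unknown) => unknown

function isPlainObject(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null && !Array.isArray(value)
}

// Simplified stand-in for a customizable deep merge (see the caveats above).
function mergeMetadata(
  target: Record<string, unknown>,
  source: Record<string, unknown>,
  customizer?: Customizer,
): Record<string, unknown> {
  const result: Record<string, unknown> = { ...target }
  for (const key of Object.keys(source)) {
    const objValue = result[key]
    const srcValue = source[key]
    const custom = customizer === undefined ? undefined : customizer(objValue, srcValue)
    if (custom !== undefined) {
      result[key] = custom // the customizer decided the merged value
    } else if (isPlainObject(objValue) && isPlainObject(srcValue)) {
      result[key] = mergeMetadata(objValue, srcValue, customizer) // default: recurse
    } else {
      result[key] = srcValue // default: the new value wins
    }
  }
  return result
}

// Concatenates arrays and leaves every other value to the default merge.
const concatArrays: Customizer = (objValue, srcValue) =>
  Array.isArray(objValue) ? objValue.concat(srcValue) : undefined
```

With `concatArrays`, merging `{ tags: ["x"] }` and `{ tags: ["y"] }` keeps both entries; without it, this stand-in keeps only `["y"]`.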
Configuration example
The following example configurations can be used as templates.
Kafka example
{
"communicationProtocols": [
{
"id": "mainKafka",
"type": "kafka",
"configurations": {
"brokers": [
"kafkaCloud:9092"
],
"inputTopics": [
"common-events",
"orders-events"
],
"outputTopics": [
"commands"
],
"consumerGroup": "flow-manager-consumer",
"authenticationProperties": {
"ssl": true,
"sasl": {
"mechanism": "plain",
"username": "username",
"password": "password"
}
}
}
}
],
"persistencyManagement": {
"type": "rest",
"configurations": {
"protocol": "https",
"endpoint": "saga-store",
"port": 3000,
"path": "/saga",
"method": "POST",
"headers": {
"managerId": "flowManager"
}
}
},
"machineDefinition": {
"initialState": "sagaCreated",
"creationEvent": "__created__",
"states": [
{
"id": "sagaCreated",
"description": "The saga has been created",
"isFinal": false,
"businessStateId": 0,
"outputCommand": {
"channel": "mainKafka",
"label": "redirectTheUserToThePaymentPage"
},
"outgoingTransitions": [
{
"inputEvent": "userRedirectedToThePaymentPage",
"targetState": "waitingForPayment"
},
{
"inputEvent": "redirectTheUserToThePaymentPageError",
"targetState": "sagaFailed",
"businessEventId": 0
}
]
},
{
"id": "waitingForPayment",
"description": "The user is paying",
"isFinal": false,
"businessStateId": 0,
"outgoingTransitions": [
{
"inputEvent": "paymentCompleted",
"targetState": "paymentCompleted",
"businessEventId": 1
},
{
"inputEvent": "paymentFailed",
"targetState": "paymentFailed",
"businessEventId": 2
}
]
},
{
"id": "paymentCompleted",
"description": "The payment is completed",
"isFinal": true,
"businessStateId": 1
},
{
"id": "paymentFailed",
"description": "Payment error",
"isFinal": true,
"businessStateId": 2
},
{
"id": "sagaFailed",
"description": "The saga had a fatal error",
"isFinal": true,
"businessStateId": 3
}
],
"businessStates": [
{
"id": 0,
"description": "The saga has been created"
},
{
"id": 1,
"description": "Payment received"
},
{
"id": 2,
"description": "Payment failed"
},
{
"id": 3,
"description": "Saga error"
}
],
"businessEvents": [
{
"id": 0,
"description": "Saga failure"
},
{
"id": 1,
"description": "Payment successful"
},
{
"id": 2,
"description": "Payment unsuccessful"
}
]
}
}
REST example
{
"communicationProtocols": [
{
"id": "paymentPageRedirect",
"type": "rest",
"configurations": {
"protocol": "https",
"endpoint": "localhost",
"port": 3001,
"path": "/payment-page",
"method": "POST",
"headers": {}
}
},
{
"id": "pay",
"type": "rest",
"configurations": {
"protocol": "https",
"endpoint": "localhost",
"port": 3001,
"path": "/pay",
"method": "POST",
"headers": {}
}
}
],
"persistencyManagement": {
"type": "rest",
"configurations": {
"protocol": "https",
"endpoint": "saga-store",
"port": 3000,
"path": "/saga",
"method": "POST",
"headers": {
"managerId": "flowManager"
}
}
},
"machineDefinition": {
"initialState": "sagaCreated",
"creationEvent": "__created__",
"states": [
{
"id": "sagaCreated",
"description": "The saga has been created",
"isFinal": false,
"businessStateId": 0,
"outputCommand": {
"channel": "paymentPageRedirect",
"label": "redirectTheUserToThePaymentPage",
"hook": {
"type": "file",
"encoding": "plain",
"content": "export default ({ saga, messageLabel }) => { return { payload: { saga } }}"
}
},
"sideEffects": [
{
"channel": "paymentPageRedirect",
"label": "additionalCommand"
}
],
"outgoingTransitions": [
{
"inputEvent": "userRedirectedToThePaymentPage",
"targetState": "waitingForPayment"
},
{
"inputEvent": "redirectTheUserToThePaymentPageError",
"targetState": "sagaFailed",
"businessEventId": 0
}
]
},
{
"id": "waitingForPayment",
"description": "The user is paying",
"isFinal": false,
"businessStateId": 0,
"outputCommand": {
"channel": "pay",
"label": "pay"
},
"outgoingTransitions": [
{
"inputEvent": "paymentCompleted",
"targetState": "paymentCompleted",
"businessEventId": 1
},
{
"inputEvent": "paymentFailed",
"targetState": "paymentFailed",
"businessEventId": 2
}
]
},
{
"id": "paymentCompleted",
"description": "The payment is completed",
"isFinal": true,
"businessStateId": 1
},
{
"id": "paymentFailed",
"description": "Payment error",
"isFinal": true,
"businessStateId": 2
},
{
"id": "sagaFailed",
"description": "The saga had a fatal error",
"isFinal": true,
"businessStateId": 3
}
],
"businessStates": [
{
"id": 0,
"description": "The saga has been created"
},
{
"id": 1,
"description": "Payment received"
},
{
"id": 2,
"description": "Payment failed"
},
{
"id": 3,
"description": "Saga error"
}
],
"businessEvents": [
{
"id": 0,
"description": "Saga failure"
},
{
"id": 1,
"description": "Payment successful"
},
{
"id": 2,
"description": "Payment unsuccessful"
}
]
},
"settings": {
"deepMergeMetadata": {
"enabled": true,
"hook": {
"type": "file",
"encoding": "plain",
"content": "export default (objValue, srcValue) => { if (Array.isArray(objValue)) { return objValue.concat(srcValue) } }"
}
}
}
}
Troubleshooting
This section lists common configuration mistakes.
Communication Protocol configurations
Kafka - missing topics
{
"id": "myKafkaConsumer",
"type": "kafka",
"configurations": {
"brokers": [
"myKafkaConsumer:9092"
]
}
}
The JSON above is NOT valid because both inputTopics and outputTopics are missing.
Kafka - missing consumer group
{
"id": "myKafkaConsumer",
"type": "kafka",
"configurations": {
"brokers": [
"myKafkaConsumer:9092"
],
"inputTopics": [
"inputTopic"
]
}
}
The JSON above is NOT valid because there are inputTopics but the consumerGroup property is missing.
REST - missing protocol
{
"id": "rest",
"type": "rest",
"configurations": {
"endpoint": "rest-service",
"port": 80,
"path": "/command",
"method": "POST",
"headers": {}
}
}
The JSON above is NOT valid because the protocol property inside the configuration section is missing.
REST - missing method
{
"id": "rest",
"type": "rest",
"configurations": {
"protocol": "https",
"endpoint": "rest-service",
"port": 80,
"path": "/command",
"headers": {}
}
}
The JSON above is NOT valid because the method property inside the configuration section is missing.