Configuration

The Flow Manager needs a few environment variables and a configurations file to work.

It also requires a service to interact with, in order to get and store saga information during the flow.

The service configurations are explained below.

Environment variables

Almost all configurations are taken from the configurations file; for this reason the Flow Manager needs only two environment variables:

  • SAGA_ID_PREFIX: the prefix to be prepended to the ID of the Saga generated by the service
  • CONFIGURATIONS_FILE_PATH: the file path of the service configurations file
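
As a minimal sketch of how these two variables might be consumed, the Python snippet below reads both of them and loads the JSON file that CONFIGURATIONS_FILE_PATH points to. The helper name `load_settings` is hypothetical and is not part of the Flow Manager; it only illustrates the roles of the two variables.

```python
import json
import os


def load_settings(env=os.environ):
    """Read the two required variables and load the configurations file.

    Hypothetical helper for illustration; not the Flow Manager's own code.
    """
    required = ("SAGA_ID_PREFIX", "CONFIGURATIONS_FILE_PATH")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))

    # The configurations file is documented as being in the JSON format.
    with open(env["CONFIGURATIONS_FILE_PATH"], encoding="utf-8") as handle:
        configurations = json.load(handle)

    return env["SAGA_ID_PREFIX"], configurations
```

Passing the environment as a parameter keeps the sketch easy to try with a plain dictionary instead of the real process environment.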

Configurations file

The configurations file required by the Flow Manager is in the JSON format and is divided into three sections:

  • communicationProtocols: the section dedicated to the Communication protocols used to communicate with other services
  • persistencyManagement: the section that contains the configurations of the Persistency manager used to get and update the saga
  • machineDefinition: the core section, with the Finite state machine configurations, used by the Flow Manager to drive the saga through the flow

The Flow Manager validates the configurations file: pay attention to the schema rules, otherwise the service will not be deployed.
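
The real validation is done by the Flow Manager against its own schema, which is not reproduced here. As a rough pre-check, assuming that all three top-level sections are required, a sketch like the following can report the missing ones before a deploy (`missing_sections` is a hypothetical helper):

```python
# Top-level sections named in this documentation.
REQUIRED_SECTIONS = (
    "communicationProtocols",
    "persistencyManagement",
    "machineDefinition",
)


def missing_sections(configurations):
    """Return the names of the top-level sections absent from the file.

    Assumption: all three sections are required; the actual schema
    enforced by the Flow Manager is stricter than this check.
    """
    return [name for name in REQUIRED_SECTIONS if name not in configurations]
```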

The details of each section follow.

Communication Protocols

This section contains a list of JSON Objects, each holding the configurations of one communication protocol. From now on they are called channels.

NB. each channel type can have its own configurations schema.

Each channel type must be one of the supported types. The supported channel types are:

  • kafka: the channel that will be connected to an Apache Kafka server and will allow the service to publish/consume messages (commands/events)
  • rest: the channel that will allow the service to send commands and receive events through REST calls

Kafka communication protocol

This channel allows the service to connect to a Kafka server and consume/produce messages on it.

The properties are the following (required properties are in bold, optional ones in italic):

  • id: the ID of the channel, used in the Machine Definition to choose the channel for messages
  • type: the type of the channel, must be kafka
  • configurations: all other Kafka configurations:
    • brokers: a list of strings that represent the Kafka brokers
    • inputTopics: a list of strings that contains the topics to subscribe to in order to consume messages; NB. this property is required if the property outputTopics is missing
    • outputTopics: a list of strings that contains the topics to send messages to; NB. this property is required if the property inputTopics is missing
    • consumerGroup: the consumer group ID, used by the Kafka Consumer to consume messages; NB. this property is required if inputTopics is set
    • authenticationProperties: a JSON Object with the Kafka authentication properties. It must be KafkaJS compliant

The Flow Manager behavior will be the following:

  • for each channel with the property outputTopics set, it will create a Kafka Producer and use it to send messages when the channel is the one chosen to deliver a message
  • for each channel with the property inputTopics set, it will create a Kafka Consumer, subscribe it to those topics and start listening
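
The rules above can be summarized in a small Python sketch. It is not the Flow Manager's implementation: `kafka_roles` is a hypothetical helper, and treating an empty topic list as a missing one is an assumption made here for simplicity.

```python
def kafka_roles(channel):
    """Return the Kafka clients a channel would need, or raise ValueError.

    Hypothetical helper; assumption: an empty list counts as missing.
    """
    conf = channel.get("configurations", {})
    has_input = bool(conf.get("inputTopics"))
    has_output = bool(conf.get("outputTopics"))

    # At least one of the two topic lists must be present.
    if not has_input and not has_output:
        raise ValueError("at least one of inputTopics and outputTopics is required")
    # A consumer cannot subscribe without a consumer group.
    if has_input and not conf.get("consumerGroup"):
        raise ValueError("consumerGroup is required when inputTopics is set")

    roles = set()
    if has_output:
        roles.add("producer")
    if has_input:
        roles.add("consumer")
    return roles
```

A channel with only outputTopics yields a producer, one with only inputTopics (plus consumerGroup) yields a consumer, and a channel with both yields both clients.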

Some examples follow, with explanations.

Kafka valid channel - consumer only - ssl auth

{
  "id": "myKafkaConsumer",
  "type": "kafka",
  "configurations": {
    "brokers": [
      "http://myKafkaConsumer:9092"
    ],
    "inputTopics": [
      "topic-to-subscribe"
    ],
    "consumerGroup": "myConsumer",
    "authenticationProperties": {
      "ssl": true,
      "sasl": {
        "mechanism": "plain",
        "username": "username",
        "password": "password"
      }
    }
  }
}

Kafka valid channel - producer only - no auth

{
  "id": "myKafkaProducer",
  "type": "kafka",
  "configurations": {
    "brokers": [
      "http://myKafkaProducer:9092"
    ],
    "outputTopics": [
      "topic-to-publish-on"
    ]
  }
}

Kafka invalid channel - invalid configurations

{
  "id": "myKafkaConsumer",
  "type": "kafka",
  "configurations": {
    "brokers": [
      "http://myKafkaConsumer:9092"
    ]
  }
}

The JSON above is NOT valid because both inputTopics and outputTopics are missing.

Kafka valid channel - complete configurations

{
  "id": "myKafka",
  "type": "kafka",
  "configurations": {
    "brokers": [
      "http://myKafka:9092"
    ],
    "inputTopics": [
      "topic-to-subscribe"
    ],
    "outputTopics": [
      "topic-to-publish-on"
    ],
    "consumerGroup": "myConsumer",
    "authenticationProperties": {
      "ssl": true,
      "sasl": {
        "mechanism": "plain",
        "username": "username",
        "password": "password"
      }
    }
  }
}

REST communication protocol

This channel allows the service to exchange commands and events with external services through REST calls.

The properties are the following (required properties are in bold, optional ones in italic):

  • id: the ID of the channel, used in the Machine Definition to choose the channel for messages
  • type: the type of the channel, must be rest
  • configurations: all other REST configurations:
    • protocol: the protocol to use; it must be one of the allowed values → [http, https]
    • endpoint: the endpoint of the service, without the final slash (/)
    • method: the method to use to send the command; it must be one of the allowed values → [POST, PUT, PATCH]
    • path: the path to interact with the service (the default is the root, a single slash /)
    • headers: the headers to use
    • port: the port to use

The Flow Manager behavior will be the following:

  • for each channel, it will create a proxy to send commands to the specified service
  • regardless of the number of channels, it will expose a route to receive events from the external services

More details in the following sections.

Sending commands

The Flow Manager sends commands to the services through the following route:

{{method}} - {{protocol}}://{{endpoint}}:{{port}}{{path}}

with the following body:

{
    "key": "The id of the saga",
    "value": {
      "messageLabel": "The label of the command",
      "messagePayload": "The payload needed by the called service to execute the command"
    }
}

The Flow Manager expects a successful response (code 2xx), and it does not care about the response payload.
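
To make the route template and the body concrete, here is a minimal Python sketch that builds the method, URL and JSON body of a command from a REST channel. The function name `build_command_request` is hypothetical, and the sketch assumes that port is always set in the channel configurations.

```python
import json


def build_command_request(channel, saga_id, label, payload):
    """Return (method, url, body) for a command sent over a REST channel.

    Hypothetical helper; assumption: "port" is present in the channel.
    """
    conf = channel["configurations"]
    path = conf.get("path", "/")  # the root path is the documented default
    url = f'{conf["protocol"]}://{conf["endpoint"]}:{conf["port"]}{path}'
    body = json.dumps({
        "key": saga_id,  # the ID of the saga
        "value": {"messageLabel": label, "messagePayload": payload},
    })
    return conf["method"], url, body
```

The receiving service only has to answer with a 2xx status code; whatever it puts in the response payload is not used.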

Receiving events

To receive events, the Flow Manager exposes the route

POST - /event

that accepts the following body:

{
    "type": "object",
    "properties": {
      "key": {
        "type": "string",
        "description": "The id of the saga"
      },
      "value": {
        "type": "object",
        "description": "The data about the event",
        "properties": {
          "messageLabel": {
            "type": "string",
            "description": "The label of the event"
          },
          "messagePayload": {
            "description": "The payload of the event"
          }
        },
        "required": ["messageLabel", "messagePayload"],
        "additionalProperties": false
      }
    },
    "required": ["key", "value"],
    "additionalProperties": false
  }
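
For services that post events to this route, the schema above translates into a few simple checks. The following Python sketch is a hand-written check of a decoded JSON body, not a general JSON Schema validator; `is_valid_event` is a hypothetical name.

```python
def is_valid_event(body):
    """Check a decoded JSON body against the event schema shown above."""
    # Top level: exactly "key" and "value" (additionalProperties is false).
    if not isinstance(body, dict) or set(body) != {"key", "value"}:
        return False
    value = body["value"]
    if not isinstance(body["key"], str) or not isinstance(value, dict):
        return False
    # messagePayload has no declared type, so any JSON value is accepted.
    return (set(value) == {"messageLabel", "messagePayload"}
            and isinstance(value["messageLabel"], str))
```

For instance, a body with key "EXAMPLE_123456_SAGA" and a value holding messageLabel "paymentCompleted" plus any messagePayload passes, while a body that omits messagePayload or adds an extra property does not.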

Persistency Manager

This section contains the configurations for the Persistency Manager, which is responsible for:

  • the insert/update of a saga
  • the retrieval of a saga

The persistency manager type must be one of the supported types. The supported types are:

  • rest: the only supported type right now; it uses the REST protocol to interact with the persistency manager service

The persistencyManagement section must contain a JSON object with the configurations. See the following sections for the details by type.

For more about the Persistency Manager, see the dedicated documentation.

REST Persistency Manager

The REST persistency manager uses the REST protocol to contact the service that reads and inserts/updates the sagas.

The properties for this type are the following (required properties are in bold, optional ones in italic):

  • type: the type of the persistency manager, it must be rest
  • configurations: the other configurations of the manager
    • protocol: the protocol to use; it must be one of the allowed values → [http, https]
    • endpoint: the endpoint of the service, without the final slash (/)
    • method: the method to use to upsert the saga (the same endpoint/path is used for insert and update)
    • path: the path to interact with the service (the default is the root, a single slash /)
    • headers: the headers to use
    • port: the port to use

NB. the interactions between the Flow Manager and the Persistency Manager are the following:

  • insert/update: the Flow Manager calls the route

{{method}} - {{protocol}}://{{endpoint}}:{{port}}{{path}}/{{sagaId}}

with the following body:

{
  "latestEvent": "The last event received by the manager",
  "currentState": "The state of the saga (taken from the *Finite state machine* configurations)",
  "associatedEntityId": "The ID of the entity associated to the saga (e.g. the ID of a food delivery order, or of a policy and so on)",
  "isFinal": "Boolean that indicates if the new state is a final state (DEPRECATED)",
  "businessStateId": "The ID of the business state (explained later)",
  "businessStateDescription": "The description of the business state (explained later)",
  "metadata": "The metadata of the saga: a JSON Object with all the business data related to the saga, which is unknown to the Flow Manager"
}

  • get: the Flow Manager calls the route

GET - {{protocol}}://{{endpoint}}:{{port}}{{path}}/{{sagaId}}

and the service must return this body:

{
  "currentState": "The state of the saga",
  "metadata": "All the metadata of the saga"
}

REST Persistency Manager use case

With the following configurations:

{
  "type": "rest",
  "configurations": {
    "protocol": "http",
    "endpoint": "my-persistency-manager",
    "port": 3000,
    "method": "POST"
  }
}

and with the sagaId EXAMPLE_123456_SAGA, the operations will be:

  • insert/update:

    POST - http://my-persistency-manager:3000/EXAMPLE_123456_SAGA

  • get:

    GET - http://my-persistency-manager:3000/EXAMPLE_123456_SAGA
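
The same mapping can be written as a short Python sketch. `saga_routes` is a hypothetical helper; stripping the trailing slash of the path is an assumption made here so that the default root path produces the single slash shown in the use case above.

```python
def saga_routes(persistency, saga_id):
    """Return the upsert and get routes for a saga as (method, url) pairs.

    Hypothetical helper; assumption: a trailing slash in "path" is dropped
    before appending the saga ID, so the default "/" gives ".../<sagaId>".
    """
    conf = persistency["configurations"]
    path = conf.get("path", "/").rstrip("/")
    url = f'{conf["protocol"]}://{conf["endpoint"]}:{conf["port"]}{path}/{saga_id}'
    return {"upsert": (conf["method"], url), "get": ("GET", url)}
```

With the configurations of the use case and the sagaId EXAMPLE_123456_SAGA, both routes point to http://my-persistency-manager:3000/EXAMPLE_123456_SAGA and differ only in the method.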

Machine Definition

This is the core section of the Flow Manager configurations, because it contains the Finite State Machine definition.

The Machine Definition configurations are the following (required properties are in bold, optional ones in italic):

  • initialState: the saga starting point state; when a new saga is created, the state will be the initial state
  • creationEvent: the event that triggers the saga creation; it is just a placeholder passed to the Persistency Manager when the saga is created (it can be useful if the Persistency Manager stores a history of the saga's events)
  • states: an array that contains the list of the states of the Finite state machine (explained below)
  • businessStates: an array that contains the list of the states that matter for the business (explained below)

States of the machine

The states of the machine are used by the Flow Manager to know how to move the saga forward through the flow.

Each state must have the following configurations (required properties are in bold, optional ones in italic):

  • id: a string that contains the ID of the state (usually a camel case, human readable string that tells what the state represents)
  • description: the state description
  • isFinal: a boolean that indicates if the state is a final one (DEPRECATED)
  • businessStateId: the ID of the business state (NB. must exist in the businessStates list)
  • outputCommand: a JSON Object that contains the details of the command to send in output when the saga lands on the state (it is optional, because a state may not need to send any command):
    • channel: the ID of the channel used to send the command (NB. must exist in the communicationProtocols configurations)
    • label: the label of the command (usually an imperative sentence, e.g. updateTheRequest)
  • outgoingTransitions: a list of JSON Objects that contain the details of the events that can be received on the current state (if an unexpected event is received, the Flow Manager will log an error and ignore it; the list is optional, because a state may not point to other states, e.g. a final one):
    • inputEvent: a string with the event that will trigger the transition from the current state
    • targetState: the ID of the state to land on (NB. a state with this ID must exist)
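
The way outgoingTransitions drive the saga can be sketched in a few lines of Python. This is only an illustration of the lookup described above, with a hypothetical function name `next_state`; it does not send commands or talk to the Persistency Manager.

```python
def next_state(states, current_state_id, input_event):
    """Return the ID of the state to land on, or None for an unexpected event.

    `states` is the "states" array of the machineDefinition section.
    """
    by_id = {state["id"]: state for state in states}
    current = by_id[current_state_id]
    # outgoingTransitions is optional (e.g. for final states).
    for transition in current.get("outgoingTransitions", []):
        if transition["inputEvent"] == input_event:
            return transition["targetState"]
    # Unexpected event: the Flow Manager logs an error and ignores it.
    return None
```

When the returned value is None the saga stays on its current state; otherwise the saga lands on the returned state and, if that state has an outputCommand, the command is sent on the configured channel.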

Business states of the machine

The business states are a higher-level grouping of the machine states: they represent the states of the saga that matter for the business.

Each business state can contain one or more states of the machine (defined above).

The configurations of the business states are the following (required properties are in bold, optional ones in italic):

  • id: the ID of the business state (usually an integer code)
  • description: a full description of the business state

NB. the id and the description of the business state will be stored by the Persistency Manager.
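
The three "must exist" rules given in the NB notes (businessStateId, channel and targetState) are easy to break with a typo. As a rough aid, and not as a replacement for the Flow Manager's own validation, the hypothetical helper `find_broken_references` below lists the references that point to nothing.

```python
def find_broken_references(configurations):
    """Return a list of messages, one per reference that points to nothing.

    Hypothetical helper; it checks only the three documented "must exist" rules.
    """
    channels = {c["id"] for c in configurations["communicationProtocols"]}
    machine = configurations["machineDefinition"]
    state_ids = {s["id"] for s in machine["states"]}
    business_ids = {b["id"] for b in machine["businessStates"]}

    problems = []
    for state in machine["states"]:
        sid = state["id"]
        if state["businessStateId"] not in business_ids:
            problems.append(f"{sid}: unknown businessStateId {state['businessStateId']!r}")
        command = state.get("outputCommand")  # optional
        if command and command["channel"] not in channels:
            problems.append(f"{sid}: unknown channel {command['channel']!r}")
        for transition in state.get("outgoingTransitions", []):  # optional
            if transition["targetState"] not in state_ids:
                problems.append(f"{sid}: unknown targetState {transition['targetState']!r}")
    return problems
```

An empty list means that every state points to an existing business state, channel and target state.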

Configuration examples

The following example configurations can be used as templates.

Kafka example

{
  "communicationProtocols": [
    {
      "id": "mainKafka",
      "type": "kafka",
      "configurations": {
        "brokers": [
          "https://kafkaCloud:9092"
        ],
        "inputTopics": [
          "common-events",
          "orders-events"
        ],
        "outputTopics": [
          "commands"
        ],
        "consumerGroup": "flow-manager-consumer",
        "authenticationProperties": {
          "ssl": true,
          "sasl": {
            "mechanism": "plain",
            "username": "username",
            "password": "password"
          }
        }
      }
    }
  ],
  "persistencyManagement": {
    "type": "rest",
    "configurations": {
      "protocol": "https",
      "endpoint": "saga-store",
      "port": 3000,
      "path": "/saga",
      "method": "POST",
      "headers": {
        "managerId": "flowManager"
      }
    }
  },
  "machineDefinition": {
    "initialState": "sagaCreated",
    "creationEvent": "__created__",
    "states": [
      {
        "id": "sagaCreated",
        "description": "The saga has been created",
        "isFinal": false,
        "businessStateId": 0,
        "outputCommand": {
          "channel": "mainKafka",
          "label": "redirectTheUserToThePaymentPage"
        },
        "outgoingTransitions": [
          {
            "inputEvent": "userRedirectedToThePaymentPage",
            "targetState": "waitingForPayment"
          },
          {
            "inputEvent": "redirectTheUserToThePaymentPageError",
            "targetState": "sagaFailed"
          }
        ]
      },
      {
        "id": "waitingForPayment",
        "description": "The user is paying",
        "isFinal": false,
        "businessStateId": 0,
        "outgoingTransitions": [
          {
            "inputEvent": "paymentCompleted",
            "targetState": "paymentCompleted"
          },
          {
            "inputEvent": "paymentFailed",
            "targetState": "paymentFailed"
          }
        ]
      },
      {
        "id": "paymentCompleted",
        "description": "The payment is completed",
        "isFinal": true,
        "businessStateId": 1
      },
      {
        "id": "paymentFailed",
        "description": "Payment error",
        "isFinal": true,
        "businessStateId": 2
      },
      {
        "id": "sagaFailed",
        "description": "The saga had a fatal error",
        "isFinal": true,
        "businessStateId": 3
      }
    ],
    "businessStates": [
      {
        "id": 0,
        "description": "The saga has been created"
      },
      {
        "id": 1,
        "description": "Payment received"
      },
      {
        "id": 2,
        "description": "Payment failed"
      },
      {
        "id": 3,
        "description": "Saga error"
      }
    ]
  }
}

REST example

{
  "communicationProtocols": [
    {
      "id": "paymentPageRedirect",
      "type": "rest",
      "configurations": {
        "protocol": "https",
        "endpoint": "localhost",
        "port": 3001,
        "path": "/payment-page",
        "method": "POST",
        "headers": {}
      }
    },
    {
      "id": "pay",
      "type": "rest",
      "configurations": {
        "protocol": "https",
        "endpoint": "localhost",
        "port": 3001,
        "path": "/pay",
        "method": "POST",
        "headers": {}
      }
    }
  ],
  "persistencyManagement": {
    "type": "rest",
    "configurations": {
      "protocol": "https",
      "endpoint": "saga-store",
      "port": 3000,
      "path": "/saga",
      "method": "POST",
      "headers": {
        "managerId": "flowManager"
      }
    }
  },
  "machineDefinition": {
    "initialState": "sagaCreated",
    "creationEvent": "__created__",
    "states": [
      {
        "id": "sagaCreated",
        "description": "The saga has been created",
        "isFinal": false,
        "businessStateId": 0,
        "outputCommand": {
          "channel": "paymentPageRedirect",
          "label": "redirectTheUserToThePaymentPage"
        },
        "outgoingTransitions": [
          {
            "inputEvent": "userRedirectedToThePaymentPage",
            "targetState": "waitingForPayment"
          },
          {
            "inputEvent": "redirectTheUserToThePaymentPageError",
            "targetState": "sagaFailed"
          }
        ]
      },
      {
        "id": "waitingForPayment",
        "description": "The user is paying",
        "isFinal": false,
        "businessStateId": 0,
        "outputCommand": {
          "channel": "pay",
          "label": "pay"
        },
        "outgoingTransitions": [
          {
            "inputEvent": "paymentCompleted",
            "targetState": "paymentCompleted"
          },
          {
            "inputEvent": "paymentFailed",
            "targetState": "paymentFailed"
          }
        ]
      },
      {
        "id": "paymentCompleted",
        "description": "The payment is completed",
        "isFinal": true,
        "businessStateId": 1
      },
      {
        "id": "paymentFailed",
        "description": "Payment error",
        "isFinal": true,
        "businessStateId": 2
      },
      {
        "id": "sagaFailed",
        "description": "The saga had a fatal error",
        "isFinal": true,
        "businessStateId": 3
      }
    ],
    "businessStates": [
      {
        "id": 0,
        "description": "The saga has been created"
      },
      {
        "id": 1,
        "description": "Payment received"
      },
      {
        "id": 2,
        "description": "Payment failed"
      },
      {
        "id": 3,
        "description": "Saga error"
      }
    ]
  }
}