## Configuration

The service is configured through a set of environment variables, which describe its basic needs, alongside a main configuration file.
### Environment Variables
| Variable | Description | Default |
|---|---|---|
| LOG_LEVEL | The maximum log level to emit. Accepted levels are `trace`, `debug`, `info`, `warn`, `error` | info |
| HTTP_PORT | The HTTP port on which Kubernetes status routes and metrics are exposed | 3000 |
| CONFIGURATION_FOLDER | The path to the folder containing the configuration file | `<HOME>/.df/mongezium` |
| OTEL_EXPORTER_OTLP_ENDPOINT | The OpenTelemetry OTLP endpoint where traces and metrics should be pushed. When not set, telemetry is not exported | - |
Currently, the `<HOME>` value is set to `/home/mongezium`, based on how the service image is built.
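As a sketch, when the service runs as a Kubernetes container, these variables might be set in the container spec, shown here in Kubernetes JSON form (the image reference, the `debug` level, and the OTLP endpoint are illustrative, not defaults of the service):

```json
{
  "name": "mongezium",
  "image": "mongezium:latest",
  "env": [
    { "name": "LOG_LEVEL", "value": "debug" },
    { "name": "HTTP_PORT", "value": "3000" },
    { "name": "OTEL_EXPORTER_OTLP_ENDPOINT", "value": "http://otel-collector:4317" }
  ]
}
```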
### Configuration File

The application requires a configuration file, named `config.json`, which must conform to the following JSON Schema.
**Raw JSON Schema**

```json
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "MongeziumConfiguration",
"type": "object",
"properties": {
"collections": {
"description": "Describe which collection should be monitored for changes",
"type": "array",
"items": {
"$ref": "#/definitions/CollectionConfiguration"
}
},
"persistence": {
"description": "Describe how to connect to the persistence layer, that is a MongoDB cluster",
"allOf": [
{
"$ref": "#/definitions/PersistenceConfiguration"
}
]
},
"stream": {
"description": "Describe how to connect to the stream layer, that is a Kafka cluster",
"allOf": [
{
"$ref": "#/definitions/StreamConfiguration"
}
]
},
"connections": {
"description": "Includes the connection configurations to be used by stream and persistence layers.",
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/ConnectionConfig"
},
"default": {}
}
},
"required": [
"collections",
"persistence",
"stream"
],
"definitions": {
"CollectionConfiguration": {
"description": "Contains the properties that describe which MongoDB collection should be monitored\nand how its changes should be processed.",
"type": "object",
"properties": {
"topic": {
"description": "Name of the Kafka topic onto which change events related to this collection will be produced",
"type": "string"
},
"namespace": {
"description": "MongoDB namespace representing a collection within a database. It is expected in the form `<database-name>.<collection-name>`.",
"type": "string"
},
"snapshot": {
"description": "Describe how the service should address the _snapshot_ operation, that is the whole collection content\nis loaded onto the Kafka topic before producing events related to changes occurring to the collection records.\n\n_Snapshot_ operations are usually launched the first time the service starts monitoring a collection,\nwhen the business requires, such as when a new clean state is needed onto the Kafka topic.\n\nBy default, _snapshot_ operation is carried only when no message has\never been written to the selected topic for this collection.",
"default": "initial",
"allOf": [
{
"$ref": "#/definitions/SnapshotMode"
}
]
},
"tombstone": {
"description": "Specifies whether for each DELETE event also a _Tombstone_ event is generated.\nA _Tombstone_ event is a Kafka message with a non-null key and a null payload. This\ntype of event enables deleting old messages not needed anymore when using the Kafka\n[log compaction](https://docs.confluent.io/kafka/design/log_compaction.html#compaction-enables-deletes) policy.",
"type": "boolean",
"default": false
}
},
"required": [
"topic",
"namespace"
]
},
"SnapshotMode": {
"description": "Describe when the snapshot operation should be carried out.",
"oneOf": [
{
"description": "Execute snapshot operation when no message\nhas ever been produced to the topic or when no Change Stream resume token is found",
"type": "string",
"const": "when_needed"
},
{
"description": "Execute snapshot operation only when no message\nhas ever been produced to the topic",
"type": "string",
"const": "initial"
}
]
},
"PersistenceConfiguration": {
"description": "Configuration of the persistence layer, that is MongoDB connection string.",
"type": "object",
"properties": {
"url": {
"description": "MongoDB connection string",
"anyOf": [
{
"$ref": "#/definitions/Secret"
},
{
"type": "null"
}
]
},
"database": {
"description": "MongoDB database where events will be written to. To be included if not specified in the url.",
"type": [
"string",
"null"
]
},
"connectionName": {
"description": "Reference to the MongoDB configuration stored inside the Connection (`MongodbConnectionConfig`)",
"type": [
"string",
"null"
]
},
"appName": {
"description": "Client application name, which is used to identify the application in MongoDB logs.",
"type": [
"string",
"null"
]
}
}
},
"Secret": {
"examples": [
"my-secret",
{
"type": "env",
"key": "CUSTOM_ENV_VAR"
},
{
"type": "env",
"key": "CUSTOM_ENV_VAR",
"encoding": "base64"
},
{
"type": "file",
"path": "/path/to/file"
}
],
"anyOf": [
{
"type": "string"
},
{
"type": "object",
"properties": {
"type": {
"const": "env"
},
"key": {
"type": "string"
},
"encoding": {
"description": "Define which type of encoding the library supports when it needs to read the actual secret value.",
"type": "string",
"enum": [
"base64"
]
}
},
"required": [
"type",
"key"
]
},
{
"type": "object",
"properties": {
"type": {
"const": "file"
},
"key": {
"type": "string"
},
"path": {
"type": "string"
},
"encoding": {
"description": "Define which type of encoding the library supports when it needs to read the actual secret value.",
"type": "string",
"enum": [
"base64"
]
}
},
"required": [
"type",
"path"
]
}
]
},
"StreamConfiguration": {
"description": "Configuration of the stream layer, that is Kafka clients.",
"type": "object",
"properties": {
"consumer": {
"description": "Configurations related to the Kafka Consumer employed to retrieve\nthe latest state produced by the service alongside previous change events.\n\nIt must include either the `connectionName` property or the `bootstrap.servers` property. If included,\nthe property MUST have the value of the corresponding property in the `producer` configuration.",
"allOf": [
{
"$ref": "#/definitions/KafkaConfiguration"
}
]
},
"producer": {
"description": "Configurations related to the Kafka Producer for sending to Kafka topics\nchange events occurring on the persistence layer.\n\nIt must include either the `connectionName` property or the `bootstrap.servers` property. If included,\nthe property MUST have the value of the corresponding property in the consumer` configuration.",
"allOf": [
{
"$ref": "#/definitions/KafkaConfiguration"
}
]
}
},
"required": [
"consumer",
"producer"
]
},
"KafkaConfiguration": {
"description": "librdkakfa client configuration, as specified by library documentation: https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html",
"type": "object",
"properties": {
"connectionName": {
"description": "Reference to the Kafka configuration stored inside the Connection (`KafkaConnectionConfig`)",
"type": [
"string",
"null"
]
}
},
"additional_properties": {
"$ref": "#/definitions/Secret"
},
"required": [
"bootstrap.servers"
]
},
"ConnectionConfig": {
"description": "Describes the possible type of connections that can be defined in the configuration file",
"oneOf": [
{
"type": "object",
"properties": {
"type": {
"type": "string",
"const": "kafka"
},
"config": {
"$ref": "#/definitions/KafkaConnectionConfig"
}
},
"required": [
"type",
"config"
]
},
{
"type": "object",
"properties": {
"type": {
"type": "string",
"const": "mongodb"
},
"config": {
"$ref": "#/definitions/MongodbConnectionConfig"
}
},
"required": [
"type",
"config"
]
}
]
},
"KafkaConnectionConfig": {
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/Secret"
}
},
"MongodbConnectionConfig": {
"type": "object",
"properties": {
"url": {
"description": "MongoDB connection string",
"allOf": [
{
"$ref": "#/definitions/Secret"
}
]
},
"database": {
"description": "MongoDB database where events will be written to. To be included if not specified in the url.",
"type": [
"string",
"null"
]
},
"appName": {
"description": "The application name employed by MongoDB driver when performing queries.\nThis is useful for debugging purposes, such as recognizing which application\nis launching a query towards the database.",
"type": [
"string",
"null"
],
"default": null
}
},
"required": [
"url"
]
}
}
}
```

**Example**

```json
{
"$schema": "https://docs.mia-platform.eu/schemas/fast_data/mongezium.0.4.3.schema.json",
"connections": {
"mongo-connection": {
"type": "mongo",
"url": {
"type": "file",
"path": "/run/secrets/mongodb/url"
}
},
"kafka-connection": {
"type": "kafka",
"bootstrap.servers": {
"type": "file",
"path": "/run/secrets/kafka/bootstrap.servers"
}
}
},
"persistence": {
"connectionName": "mongo-connection"
},
"collections": [
{
"topic": "fd.topic-1",
"snapshot": "when_needed",
"namespace": "fd.collection-1"
},
{
"topic": "fd.topic-2",
"snapshot": "initial",
"namespace": "fd.collection-2"
},
{
"topic": "fd.topic-3",
"snapshot": "when_needed",
"namespace": "fd.collection-3"
}
],
"stream": {
"consumer": {
"connectionName": "kafka-connection",
"group.id": "fd.mongezium"
},
"producer": {
"connectionName": "kafka-connection",
"message.timeout.ms": "5000",
"compression.type": "snappy"
}
}
}
```
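The example above does not exercise the `tombstone` flag. A sketch of a collection entry enabling it (the topic and namespace names are illustrative) would look as follows:

```json
{
  "topic": "fd.topic-4",
  "namespace": "fd.collection-4",
  "snapshot": "initial",
  "tombstone": true
}
```

With `tombstone` set to `true`, every DELETE event is followed by a Kafka message with a non-null key and a null payload, so that the log compaction policy can eventually delete the old messages for that record.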
Starting from version v0.4.3, the Mongezium configuration file supports a `connections` property.
This property allows you to define Kafka and MongoDB connections once and then reference them within the `stream` and `persistence` properties, respectively.
This feature, already present in other Fast Data services like Stream Processor,
Farm Data, and Kango,
simplifies the configuration file by eliminating the need to repeat connection details.
Although you can currently define Kafka or MongoDB connections either within the `connections` property or directly in their respective configuration sections, please be aware that support for the latter is deprecated:
future versions of Mongezium will exclusively support connection definitions inside the `connections` property. It is therefore highly recommended to adopt this approach, as in the sketch below, as soon as you upgrade to Mongezium v0.4.3 or a later version.
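As a minimal sketch of the recommended approach (the connection name and URL are illustrative, and the required `collections` and `stream` properties are omitted for brevity), the MongoDB connection is defined once under `connections` and referenced by name from `persistence`, instead of placing the `url` directly inside `persistence`:

```json
{
  "connections": {
    "my-mongo": {
      "type": "mongodb",
      "config": {
        "url": "mongodb://localhost:27017/my-database"
      }
    }
  },
  "persistence": {
    "connectionName": "my-mongo"
  }
}
```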
In addition, Kafka configurations and MongoDB persistence properties support secret resolution.
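Wherever a secret is accepted, the `Secret` definition in the schema above allows a plain string, an environment variable reference (optionally base64-encoded), or a file reference. The accepted forms, taken from the schema's own examples, are:

```json
[
  "my-secret",
  { "type": "env", "key": "CUSTOM_ENV_VAR" },
  { "type": "env", "key": "CUSTOM_ENV_VAR", "encoding": "base64" },
  { "type": "file", "path": "/path/to/file" }
]
```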
The raw JSON Schema can also be found at https://docs.mia-platform.eu/schemas/fast_data/mongezium.0.4.3.schema.json.
## Kubernetes

### Resources

When the plugin is deployed on Kubernetes, it is advised to set its resource requests and limits. The recommended values are listed below, followed by a sketch of the corresponding manifest fragment, although they can be adjusted according to your needs:
**Recommended**

- requests:
  - CPU: 250m
  - Memory: 50MB
- limits:
  - CPU: 1000m
  - Memory: 150MB
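As a sketch, these recommendations translate into the following container `resources` stanza, expressed here in Kubernetes JSON form (assuming the MB figures above map to the decimal `M` quantity suffix):

```json
{
  "resources": {
    "requests": { "cpu": "250m", "memory": "50M" },
    "limits": { "cpu": "1000m", "memory": "150M" }
  }
}
```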
### Status Probes

The service exposes liveness and readiness status probes as HTTP endpoints, which
help Kubernetes determine when the service has successfully started and when it may need to be restarted.
The endpoints are:

- liveness probe: `/-/healthz`
- readiness probe: `/-/ready`
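As a sketch, assuming the default HTTP_PORT of 3000, the probes might be wired into the container spec as follows (Kubernetes JSON form; the timing values are illustrative):

```json
{
  "livenessProbe": {
    "httpGet": { "path": "/-/healthz", "port": 3000 },
    "initialDelaySeconds": 10,
    "periodSeconds": 10
  },
  "readinessProbe": {
    "httpGet": { "path": "/-/ready", "port": 3000 },
    "periodSeconds": 5
  }
}
```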