Configuration
The service is configured through a set of environment variables, which describe basic service needs, alongside a main configuration file.
Environment Variables
Variable | Description | Default |
---|---|---|
`LOG_LEVEL` | The maximum log level to emit. Accepted levels are `trace`, `debug`, `info`, `warn`, `error` | `info` |
`HTTP_PORT` | The HTTP port on which Kubernetes status routes and metrics are exposed | `3000` |
`CONFIGURATION_FOLDER` | The filepath to the folder under which configuration files are located | `<HOME>/.df/kango` |
`OTEL_EXPORTER_OTLP_ENDPOINT` | The OpenTelemetry OTLP endpoint where traces and metrics should be pushed. When not set, telemetry is not exported | - |
Currently, the `<HOME>` value is set to `/home/kango`, based on how the service image is built.
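For example, when deploying on Kubernetes these variables can be set in the container specification. The following is a minimal sketch in JSON manifest form; the image reference and the OTLP collector address are illustrative assumptions:

```json
{
  "name": "kango",
  "image": "kango:latest",
  "env": [
    { "name": "LOG_LEVEL", "value": "info" },
    { "name": "HTTP_PORT", "value": "3000" },
    { "name": "CONFIGURATION_FOLDER", "value": "/home/kango/.df/kango" },
    { "name": "OTEL_EXPORTER_OTLP_ENDPOINT", "value": "http://otel-collector:4317" }
  ]
}
```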
Configuration File
The application needs a configuration file, named `config.json`, which respects the following JSON schema.
```json
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "Configuration",
"description": "Main Kango service configuration struct",
"type": "object",
"properties": {
"consumer": {
"$ref": "#/$defs/ConsumerConfig"
},
"controlPlane": {
"anyOf": [
{
"$ref": "#/$defs/ControlPlaneConfiguration"
},
{
"type": "null"
}
]
},
"persistence": {
"$ref": "#/$defs/PersistenceConfiguration"
},
"settings": {
"$ref": "#/$defs/Settings",
"default": {
"writeMode": "strict"
}
}
},
"examples": [
{
"consumer": {
"config": {
"bootstrap.servers": "localhost:9092",
"client.id": "kango",
"group.id": "kango"
},
"topic": "test-topic-1",
"type": "kafka"
},
"controlPlane": {
"grpcAddress": "http://localhost:50052"
},
"persistence": {
"collection": "test-collection-1",
"database": "test",
"url": "mongodb://localhost:27017/test?replicaSet=local"
}
}
],
"required": [
"persistence",
"consumer"
],
"$defs": {
"ConsumerConfig": {
"description": "Describes the configuration of streaming layer, that is Kafka, to be adopted by the service",
"oneOf": [
{
"type": "object",
"properties": {
"type": {
"type": "string",
"const": "kafka"
}
},
"$ref": "#/$defs/KafkaConsumerConfig",
"required": [
"type"
]
}
]
},
"ControlPlaneConfiguration": {
"description": "Describes the service configuration to enable the interoperation with Control Plane",
"type": "object",
"properties": {
"feedbackInterval": {
"description": "Interval in milliseconds that must elapse between two feedback events sent to Control Plane Operator.\nIt defaults to `3000` ms when not provided during deserialization.",
"type": "integer",
"format": "uint64",
"default": 3000,
"minimum": 0
},
"grpcAddress": {
"description": "Address to the gRPC server that should receive service feedback events",
"type": "string",
"examples": [
"http://control-plane-operator:50052"
]
},
"resumeAfterMs": {
"description": "The number of milliseconds to wait before running the processing logic\nwhen connection with control plane operator failed\nand no desired fast data state was ever received.",
"type": [
"integer",
"null"
],
"format": "uint64",
"default": null,
"minimum": 0
}
},
"required": [
"grpcAddress"
]
},
"KafkaConsumerConfig": {
"type": "object",
"properties": {
"commitIntervalMs": {
"description": "number of milliseconds between one commit and another",
"type": "integer",
"format": "uint64",
"default": 500,
"minimum": 0
},
"config": {
"description": "librdkafka Kafka consumer configuration properties | https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md",
"type": "object",
"additionalProperties": {
"$ref": "#/$defs/Secret"
}
},
"topic": {
"description": "name of the Kafka topic from which the consumer will read messages",
"type": "string"
}
},
"required": [
"config",
"topic"
]
},
"PersistenceConfiguration": {
"description": "Describes the configuration of persistence layer, that is MongoDB, to be adopted by the service",
"type": "object",
"properties": {
"appName": {
"description": "The application name employed by MongoDB driver when performing queries.\nThis is useful for debugging purposes, such as recognizing which application\nis launching a query towards the database.",
"type": [
"string",
"null"
],
"default": null
},
"collection": {
"description": "MongoDB collection where events will be written to",
"type": "string"
},
"database": {
"description": "MongoDB database where events will be written to",
"type": "string"
},
"maxBatchSize": {
"description": "Number of change events that should be buffered before\noffloading them in bulk onto the persistence layer",
"type": "integer",
"format": "uint",
"default": 1000,
"minimum": 0
},
"url": {
"description": "MongoDB connection string",
"$ref": "#/$defs/Secret"
},
"writeIntervalMs": {
"description": "Number of milliseconds that should elapse between two writes\non persistence layer when the maximum batch size is not reached\nwithin the interval",
"type": "integer",
"format": "uint64",
"default": 1000,
"minimum": 0
}
},
"required": [
"url",
"database",
"collection"
]
},
"Secret": {
"anyOf": [
{
"type": "string"
},
{
"type": "object",
"properties": {
"encoding": {
"description": "Define which type of encoding the library supports when it needs to read the actual secret value.",
"type": "string",
"enum": [
"base64"
]
},
"key": {
"type": "string"
},
"type": {
"const": "env"
}
},
"required": [
"type",
"key"
]
},
{
"type": "object",
"properties": {
"encoding": {
"description": "Define which type of encoding the library supports when it needs to read the actual secret value.",
"type": "string",
"enum": [
"base64"
]
},
"key": {
"type": "string"
},
"path": {
"type": "string"
},
"type": {
"const": "file"
}
},
"required": [
"type",
"path"
]
}
],
"examples": [
"my-secret",
{
"key": "CUSTOM_ENV_VAR",
"type": "env"
},
{
"encoding": "base64",
"key": "CUSTOM_ENV_VAR",
"type": "env"
},
{
"path": "/path/to/file",
"type": "file"
}
]
},
"Settings": {
"description": "Behavior settings",
"type": "object",
"properties": {
"writeMode": {
"$ref": "#/$defs/WriteMode"
}
},
"required": [
"writeMode"
]
},
"WriteMode": {
"oneOf": [
{
"description": "_Insert_ and _update_ operations ensure that fields not found in the `after` payload are\nremoved from the potentially existing document on the persistence layer.\n\n_Inserts_ are executed as replace or insert operations, while _updates_ remove\nthe fields at first level that are not present in the `after` payload,\nbut are found in the `before` payload (use `\"$unset\"` clause).",
"type": "string",
"const": "strict"
},
{
"description": "_Insert_ operations are treated as upserts, while _Update_ ones do not add the `\"$unset\"` clause.\nThus fields not mentioned in the after payload will be left untouched.",
"type": "string",
"const": "partial"
}
]
}
}
}
```
An example of a valid configuration file:

```json
{
"$schema": "https://docs.mia-platform.eu/schemas/fast_data/kango.0.5.1.schema.json",
"persistence": {
"url": {
"type": "file",
"path": "/run/secrets/mongodb/url"
},
"database": "test",
"collection": "fd_kango_output"
},
"consumer": {
"type": "kafka",
"topic": "fd.kango-topic.input",
"config": {
"bootstrap.servers": {
"type": "file",
"path": "/run/secrets/kafka/bootstrap.servers"
},
"group.id": "fd.kango",
"client.id": "fd.kango.consumer"
}
}
}
```
The raw JSON schema can also be found here.
In addition, Kafka configurations and MongoDB persistence properties support secret resolution.
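For example, the MongoDB connection string can be resolved from an environment variable holding a base64-encoded value. A minimal sketch, where `MONGODB_URL` is an illustrative variable name:

```json
{
  "persistence": {
    "url": {
      "type": "env",
      "key": "MONGODB_URL",
      "encoding": "base64"
    },
    "database": "test",
    "collection": "fd_kango_output"
  }
}
```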
Control Plane Support
The service implements the interface for connecting to a Control Plane Operator.
However, complete Runtime Management support, that is, integration with the Control Plane UI and
a central Control Plane instance, will be added in the future.
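A minimal sketch of the `controlPlane` section, based on the schema above (the operator address is illustrative):

```json
{
  "controlPlane": {
    "grpcAddress": "http://control-plane-operator:50052",
    "feedbackInterval": 3000
  }
}
```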
Recommended Kafka Configuration
When configuring the Kafka consumer, it is advised to set appropriate values to constrain the consumer's internal queue. In this manner:
- the maximum amount of memory employed by the service can be finely tuned to avoid wasting resources, since only the number of messages that can effectively be processed in real time should be pulled into memory;
- the consumer is ensured to continuously poll the broker, avoiding its eviction from the consumer group, since a lower number of buffered messages can trigger a new fetch to replenish the queue.
The main values to tune are:
- `queued.max.messages.kbytes`: maximum number of kilobytes of queued pre-fetched messages in the local consumer queue;
- `queued.min.messages`: minimum number of messages per topic+partition that librdkafka tries to maintain in the local consumer queue.
It is recommended to set `queued.min.messages` to a value greater than, but close to, the
average message consumption rate. To check the average values, it is possible to observe the following metrics:
- `kafka_consumer_rx_msgs_total` → messages read;
- `kafka_flushed_messages` → messages written to the persistence layer.
For the Kango service, when connected to a MongoDB M50 cluster instance, an example configuration can be the following:
```json
{
"queued.max.messages.kbytes": "32840",
"queued.min.messages": "5000"
}
```
Another important property that might need to be tuned is `fetch.message.max.bytes`,
which, however, should be changed only when `queued.max.messages.kbytes` is set to
a value lower than `1024`.
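Putting these properties together, a tuned `consumer` section could look like the following sketch; all values are illustrative and assume `queued.max.messages.kbytes` below `1024`, hence the reduced `fetch.message.max.bytes`:

```json
{
  "consumer": {
    "type": "kafka",
    "topic": "fd.kango-topic.input",
    "config": {
      "bootstrap.servers": "localhost:9092",
      "group.id": "fd.kango",
      "queued.max.messages.kbytes": "1000",
      "queued.min.messages": "5000",
      "fetch.message.max.bytes": "512000"
    }
  }
}
```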
Kubernetes
Resources
When the plugin is deployed on Kubernetes, it is advised to set its resource requests and limits. The recommended values are provided below, although they can be adjusted according to your needs:
Recommended:
- requests:
  - CPU: `25m`
  - Memory: `30MB`
- limits:
  - CPU: `300m`
  - Memory: `90MB`
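In a Kubernetes container specification (shown here in JSON manifest form), these recommendations would translate to a fragment like the following sketch:

```json
{
  "resources": {
    "requests": { "cpu": "25m", "memory": "30M" },
    "limits": { "cpu": "300m", "memory": "90M" }
  }
}
```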
Status Probes
The service exposes the `liveness` and `readiness` status probes as HTTP endpoints, which
help Kubernetes determine when the service has successfully started and when it may need to be restarted.
The endpoints are:
- `liveness` probe: `/-/healthz`
- `readiness` probe: `/-/ready`
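A sketch of the corresponding probe configuration in a container specification, assuming the default `HTTP_PORT` of `3000`:

```json
{
  "livenessProbe": {
    "httpGet": { "path": "/-/healthz", "port": 3000 }
  },
  "readinessProbe": {
    "httpGet": { "path": "/-/ready", "port": 3000 }
  }
}
```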