Configuration
Environment Variables
Variable | Description | Default |
---|---|---|
LOG_LEVEL | the maximum log level to emit. Accepted levels are trace, debug, info, warn, error | info |
CONFIGURATION_FOLDER | the filepath to the folder under which the configuration file is located | <HOME>/.df/aggregation |
FARM_DATA_ALGORITHM | which type of aggregation algorithm should be employed. Values can be default (incremental aggregation) and full (aggregation only on the HEAD) | default |
OTEL_EXPORTER_OTLP_ENDPOINT | specifies the OpenTelemetry OTLP endpoint where traces and metrics should be pushed. When not set, telemetry is not exported | |
Currently, the <HOME> value is set to /home/farm_data, based on how the service image is built.
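As an illustration only, these variables can be provided through the container definition. The following Kubernetes container fragment (in JSON form) is a minimal sketch; the container name and the OTLP collector address are assumptions, not values mandated by the service:
{
  "name": "farm-data",
  "env": [
    { "name": "LOG_LEVEL", "value": "debug" },
    { "name": "FARM_DATA_ALGORITHM", "value": "default" },
    { "name": "OTEL_EXPORTER_OTLP_ENDPOINT", "value": "http://otel-collector:4317" }
  ]
}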
Configuration File
The Farm Data service configuration is stored in a config.json file, which specifies various aspects of the system, including connections, consumers, producers, and data processing.
Its content follows the JSON schema provided below. The next sections further explain and clarify its properties.
Raw JSON Schema
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Configuration",
"type": "object",
"properties": {
"id": {
"type": "string",
"minLength": 8,
"maxLength": 16,
"pattern": "^[A-Za-z][A-Za-z0-9_]*$"
},
"consumers": {
"$ref": "#/definitions/ConsumersConfig"
},
"producer": {
"$ref": "#/definitions/ProducerConfig"
},
"connections": {
"description": "map of connection names to connection configurations",
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/ConnectionConfig"
}
},
"processor": {
"$ref": "#/definitions/AggregationConfig"
},
"controlPlane": {
"anyOf": [
{
"$ref": "#/definitions/ControlPlaneConfig"
},
{
"type": "null"
}
]
},
"server": {
"default": {
"ip": "0.0.0.0",
"port": 3000,
"apiPrefix": "/"
},
"allOf": [
{
"$ref": "#/definitions/ServerSettings"
}
]
}
},
"required": [
"id",
"consumers",
"producer",
"connections",
"processor"
],
"definitions": {
"ConsumersConfig": {
"oneOf": [
{
"type": "object",
"properties": {
"type": {
"type": "string",
"const": "kafka"
},
"config": {
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/KafkaConsumerConfig"
}
}
},
"required": [
"type",
"config"
]
}
]
},
"KafkaConsumerConfig": {
"type": "object",
"properties": {
"topic": {
"description": "name of the Kafka topic from which the consumer will poll",
"type": "string"
},
"connectionName": {
"description": "connection name to use from the connections map (must be type kafka)",
"type": "string"
},
"commitIntervalMs": {
"description": "number of milliseconds between one commit and another",
"type": "integer",
"format": "uint64",
"minimum": 0,
"default": 500
}
},
"required": [
"topic",
"connectionName"
],
"additionalProperties": {
"description": "librdkafka consumer properties | https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md",
"$ref": "#/definitions/Secret"
}
},
"Secret": {
"examples": [
"my-secret",
{
"type": "env",
"key": "CUSTOM_ENV_VAR"
},
{
"type": "env",
"key": "CUSTOM_ENV_VAR",
"encoding": "base64"
},
{
"type": "file",
"path": "/path/to/file"
}
],
"anyOf": [
{
"type": "string"
},
{
"type": "object",
"properties": {
"type": {
"const": "env"
},
"key": {
"type": "string"
},
"encoding": {
"description": "Define which type of encoding the library supports when it needs to read the actual secret value.",
"type": "string",
"enum": [
"base64"
]
}
},
"required": [
"type",
"key"
]
},
{
"type": "object",
"properties": {
"type": {
"const": "file"
},
"key": {
"type": "string"
},
"path": {
"type": "string"
},
"encoding": {
"description": "Define which type of encoding the library supports when it needs to read the actual secret value.",
"type": "string",
"enum": [
"base64"
]
}
},
"required": [
"type",
"path"
]
}
]
},
"ProducerConfig": {
"oneOf": [
{
"type": "object",
"properties": {
"type": {
"type": "string",
"const": "kafka"
},
"config": {
"description": "librdkafka producer config | https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md",
"$ref": "#/definitions/KafkaProducerConfig"
}
},
"required": [
"type",
"config"
]
}
]
},
"KafkaProducerConfig": {
"type": "object",
"properties": {
"connectionName": {
"description": "connection name to use from the connections map (must be type kafka)",
"type": "string"
},
"topic": {
"description": "name of the Kafka topic to which the producer will send messages",
"type": "string"
}
},
"required": [
"connectionName",
"topic"
],
"additionalProperties": {
"$ref": "#/definitions/Secret"
}
},
"ConnectionConfig": {
"oneOf": [
{
"type": "object",
"properties": {
"type": {
"type": "string",
"const": "kafka"
},
"config": {
"$ref": "#/definitions/KafkaConnectionConfig"
}
},
"required": [
"type",
"config"
]
}
]
},
"KafkaConnectionConfig": {
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/Secret"
}
},
"AggregationConfig": {
"type": "object",
"properties": {
"graph": {
"$ref": "#/definitions/AggregationGraph"
},
"persistence": {
"$ref": "#/definitions/PersistenceConfig"
},
"internalUpdates": {
"$ref": "#/definitions/InternalUpdatesConfig"
},
"mode": {
"default": "read-delete",
"allOf": [
{
"$ref": "#/definitions/OutputMode"
}
]
}
},
"required": [
"graph",
"persistence",
"internalUpdates"
]
},
"AggregationGraph": {
"description": "Configuration given by the user of a DAG of nodes, linked by each other\nwith edges that contains filter to retrieve records from/to a given record",
"type": "object",
"properties": {
"edges": {
"type": "array",
"items": {
"$ref": "#/definitions/Edge"
}
},
"nodes": {
"type": "array",
"items": {
"$ref": "#/definitions/Node"
}
}
},
"required": [
"edges",
"nodes"
]
},
"Edge": {
"type": "object",
"properties": {
"id": {
"description": "identifier of an edge in the graph. must be unique",
"type": "string"
},
"filter": {
"description": "filter to retrieve data from node A to node B",
"default": true,
"allOf": [
{
"$ref": "#/definitions/GenericFilter"
}
]
}
},
"required": [
"id"
]
},
"GenericFilter": {
"title": "DownwardFilter",
"description": "A filter for constraining how two nodes are linked together",
"anyOf": [
{
"description": "Logical and operator between two filters",
"type": "object",
"properties": {
"$and": {
"type": "array",
"items": {
"$ref": "#/definitions/GenericFilter"
}
}
},
"required": [
"$and"
],
"additionalProperties": false
},
{
"description": "Logical or operator between two filters",
"type": "object",
"properties": {
"$or": {
"type": "array",
"items": {
"$ref": "#/definitions/GenericFilter"
}
}
},
"required": [
"$or"
],
"additionalProperties": false
},
{
"description": "Logical negation of the filter",
"type": "object",
"properties": {
"$not": {
"$ref": "#/definitions/GenericFilter"
}
},
"required": [
"$not"
],
"additionalProperties": false
},
{
"description": "Equality operation among two fields",
"type": "object",
"properties": {
"$eq": {
"type": "array",
"maxItems": 2,
"minItems": 2,
"items": [
{
"$ref": "#/definitions/RepositoryFilter"
},
{
"$ref": "#/definitions/DownwardRight"
}
]
}
},
"required": [
"$eq"
],
"additionalProperties": false
},
{
"description": "Inequality operation among two fields",
"type": "object",
"properties": {
"$ne": {
"type": "array",
"maxItems": 2,
"minItems": 2,
"items": [
{
"$ref": "#/definitions/RepositoryFilter"
},
{
"$ref": "#/definitions/DownwardRight"
}
]
}
},
"required": [
"$ne"
],
"additionalProperties": false
},
{
"description": "Greater than operator among two fields",
"type": "object",
"properties": {
"$gt": {
"type": "array",
"maxItems": 2,
"minItems": 2,
"items": [
{
"$ref": "#/definitions/RepositoryFilter"
},
{
"$ref": "#/definitions/DownwardRight"
}
]
}
},
"required": [
"$gt"
],
"additionalProperties": false
},
{
"description": "Greater than or equal operator among two fields",
"type": "object",
"properties": {
"$gte": {
"type": "array",
"maxItems": 2,
"minItems": 2,
"items": [
{
"$ref": "#/definitions/RepositoryFilter"
},
{
"$ref": "#/definitions/DownwardRight"
}
]
}
},
"required": [
"$gte"
],
"additionalProperties": false
},
{
"description": "Lower than operator among two fields",
"type": "object",
"properties": {
"$lt": {
"type": "array",
"maxItems": 2,
"minItems": 2,
"items": [
{
"$ref": "#/definitions/RepositoryFilter"
},
{
"$ref": "#/definitions/DownwardRight"
}
]
}
},
"required": [
"$lt"
],
"additionalProperties": false
},
{
"description": "Lower than or equal operator among two fields",
"type": "object",
"properties": {
"$lte": {
"type": "array",
"maxItems": 2,
"minItems": 2,
"items": [
{
"$ref": "#/definitions/RepositoryFilter"
},
{
"$ref": "#/definitions/DownwardRight"
}
]
}
},
"required": [
"$lte"
],
"additionalProperties": false
},
{
"description": "Length equality operator among two array fields",
"type": "object",
"properties": {
"$sizeEq": {
"type": "array",
"maxItems": 2,
"minItems": 2,
"items": [
{
"$ref": "#/definitions/RepositoryFilter"
},
{
"$ref": "#/definitions/DownwardRight"
}
]
}
},
"required": [
"$sizeEq"
],
"additionalProperties": false
},
{
"description": "Length inequality operator among two array fields",
"type": "object",
"properties": {
"$sizeNe": {
"type": "array",
"maxItems": 2,
"minItems": 2,
"items": [
{
"$ref": "#/definitions/RepositoryFilter"
},
{
"$ref": "#/definitions/DownwardRight"
}
]
}
},
"required": [
"$sizeNe"
],
"additionalProperties": false
},
{
"description": "Length greater than operator among two array fields",
"type": "object",
"properties": {
"$sizeGt": {
"type": "array",
"maxItems": 2,
"minItems": 2,
"items": [
{
"$ref": "#/definitions/RepositoryFilter"
},
{
"$ref": "#/definitions/DownwardRight"
}
]
}
},
"required": [
"$sizeGt"
],
"additionalProperties": false
},
{
"description": "Length greater than or equal operator among two array fields",
"type": "object",
"properties": {
"$sizeGte": {
"type": "array",
"maxItems": 2,
"minItems": 2,
"items": [
{
"$ref": "#/definitions/RepositoryFilter"
},
{
"$ref": "#/definitions/DownwardRight"
}
]
}
},
"required": [
"$sizeGte"
],
"additionalProperties": false
},
{
"description": "Length lower than operator among two array fields",
"type": "object",
"properties": {
"$sizeLt": {
"type": "array",
"maxItems": 2,
"minItems": 2,
"items": [
{
"$ref": "#/definitions/RepositoryFilter"
},
{
"$ref": "#/definitions/DownwardRight"
}
]
}
},
"required": [
"$sizeLt"
],
"additionalProperties": false
},
{
"description": "Length lower than or equal operator among two array fields",
"type": "object",
"properties": {
"$sizeLte": {
"type": "array",
"maxItems": 2,
"minItems": 2,
"items": [
{
"$ref": "#/definitions/RepositoryFilter"
},
{
"$ref": "#/definitions/DownwardRight"
}
]
}
},
"required": [
"$sizeLte"
],
"additionalProperties": false
},
{
"description": "Local field must be contained in foreign array fields",
"type": "object",
"properties": {
"$in": {
"type": "array",
"maxItems": 2,
"minItems": 2,
"items": [
{
"$ref": "#/definitions/RepositoryFilter"
},
{
"$ref": "#/definitions/DownwardRight"
}
]
}
},
"required": [
"$in"
],
"additionalProperties": false
},
{
"description": "Local field must not be contained in foreign array fields",
"type": "object",
"properties": {
"$nin": {
"type": "array",
"maxItems": 2,
"minItems": 2,
"items": [
{
"$ref": "#/definitions/RepositoryFilter"
},
{
"$ref": "#/definitions/DownwardRight"
}
]
}
},
"required": [
"$nin"
],
"additionalProperties": false
},
{
"description": "Local array field must contain in foreign fields",
"type": "object",
"properties": {
"$contains": {
"type": "array",
"maxItems": 2,
"minItems": 2,
"items": [
{
"$ref": "#/definitions/RepositoryFilter"
},
{
"$ref": "#/definitions/DownwardRight"
}
]
}
},
"required": [
"$contains"
],
"additionalProperties": false
},
{
"description": "Local array field must not contain in foreign fields",
"type": "object",
"properties": {
"$ncontains": {
"type": "array",
"maxItems": 2,
"minItems": 2,
"items": [
{
"$ref": "#/definitions/RepositoryFilter"
},
{
"$ref": "#/definitions/DownwardRight"
}
]
}
},
"required": [
"$ncontains"
],
"additionalProperties": false
},
{
"description": "Verifies whether field is of type array",
"type": "object",
"properties": {
"$isArray": {
"$ref": "#/definitions/RepositoryFilter"
}
},
"required": [
"$isArray"
],
"additionalProperties": false
},
{
"description": "Matches a field against a regular expression",
"type": "object",
"properties": {
"$regexMatch": {
"$ref": "#/definitions/RegexMatchArgs"
}
},
"required": [
"$regexMatch"
],
"additionalProperties": false
},
{
"description": "Either prevent any link among two nodes (`false`) or perform\nthe cartesian product among records of the two nodes (`true`)",
"type": "boolean"
}
]
},
"RepositoryFilter": {
"type": "object",
"properties": {
"foreign": {
"$ref": "#/definitions/FieldPath"
}
},
"required": [
"foreign"
]
},
"FieldPath": {
"description": "A path of fields that identifies the position of a value in a generic struct",
"type": "array",
"items": {
"$ref": "#/definitions/FieldSegment"
}
},
"FieldSegment": {
"anyOf": [
{
"type": "string"
},
{
"type": "integer",
"format": "uint",
"minimum": 0
}
]
},
"DownwardRight": {
"anyOf": [
{
"$ref": "#/definitions/LocalFilter"
},
{
"$ref": "#/definitions/FilterValue"
}
]
},
"LocalFilter": {
"type": "object",
"properties": {
"local": {
"$ref": "#/definitions/FieldPath"
}
},
"required": [
"local"
]
},
"FilterValue": {
"anyOf": [
{
"type": "string"
},
{
"type": "integer",
"format": "int64"
},
{
"type": "number",
"format": "double"
},
{
"type": "boolean"
},
{
"type": "array",
"items": {
"type": "string"
}
},
{
"type": "array",
"items": {
"type": "number",
"format": "double"
}
},
{
"type": "array",
"items": {
"type": "boolean"
}
},
{
"type": "null"
}
]
},
"RegexMatchArgs": {
"type": "object",
"properties": {
"input": {
"$ref": "#/definitions/RepositoryFilter"
},
"regex": {
"type": "string"
},
"options": {
"type": [
"string",
"null"
]
}
},
"required": [
"input",
"regex"
]
},
"Node": {
"description": "A node represents a data stream appearing in the aggregation.",
"type": "object",
"properties": {
"edges": {
"$ref": "#/definitions/EdgeLinks"
},
"id": {
"description": "identifier of a node. must be unique",
"type": "string"
}
},
"required": [
"edges",
"id"
]
},
"EdgeLinks": {
"description": "Defines lists of inner and outer nodes of an edge.",
"type": "object",
"properties": {
"in": {
"description": "This represents and array of edges necessary to reach a node, as in, to reach X you need both Y and Z",
"type": "array",
"items": {
"type": "string"
}
},
"out": {
"description": "This represents and array of edges that can be traversed to reach other nodes",
"type": "array",
"items": {
"type": "string"
}
}
},
"required": [
"in",
"out"
]
},
"PersistenceConfig": {
"oneOf": [
{
"type": "object",
"properties": {
"type": {
"type": "string",
"const": "mongo"
},
"config": {
"$ref": "#/definitions/MongoConfig"
}
},
"required": [
"type",
"config"
]
}
]
},
"MongoConfig": {
"type": "object",
"properties": {
"url": {
"$ref": "#/definitions/Secret"
},
"database": {
"type": [
"string",
"null"
]
},
"appName": {
"type": [
"string",
"null"
]
}
},
"required": [
"url"
]
},
"InternalUpdatesConfig": {
"oneOf": [
{
"type": "object",
"properties": {
"type": {
"type": "string",
"const": "kafka"
},
"config": {
"$ref": "#/definitions/KafkaInternalUpdatesConfig"
}
},
"required": [
"type",
"config"
]
}
]
},
"KafkaInternalUpdatesConfig": {
"type": "object",
"properties": {
"topic": {
"description": "name of the Kafka topic from which the consumer will poll\nand the producer will send messages",
"type": "string"
},
"connectionName": {
"description": "connection name to use from the connections map (must be type kafka)",
"type": "string"
},
"compressionWindowMs": {
"description": "the timespan of the window to attempt internal update messages compression",
"type": "integer",
"format": "uint64",
"minimum": 0,
"default": 250
},
"consumer": {
"default": {
"commitIntervalMs": 0
},
"allOf": [
{
"$ref": "#/definitions/KafkaInternalUpdatesConsumerConfig"
}
]
},
"producer": {
"default": {},
"allOf": [
{
"$ref": "#/definitions/KafkaInternalUpdatesProducerConfig"
}
]
}
},
"required": [
"topic",
"connectionName"
]
},
"KafkaInternalUpdatesConsumerConfig": {
"type": "object",
"properties": {
"commitIntervalMs": {
"description": "number of milliseconds between one commit and another",
"type": "integer",
"format": "uint64",
"minimum": 0,
"default": 500
}
},
"additionalProperties": {
"$ref": "#/definitions/Secret"
}
},
"KafkaInternalUpdatesProducerConfig": {
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/Secret"
}
},
"OutputMode": {
"description": "Select which strategy is employed to generate the output event from HEAD aggregation units.",
"oneOf": [
{
"description": "The [`OutputMode::OperationForwarding`] mode, ensures that Debezium operations occurring on the\nHEAD unit are forwarded, while operations occurring on internal nodes are produced as updates.\n\nNote: before and after properties of the payload are set accordingly to the selected DebeziumOperation",
"type": "string",
"const": "operation-forwarding"
},
{
"description": "[`OutputMode::ReadDelete`], which is default, transforms Debezium create/update operations into read operations, so that\n`before` field is never set in the payload.\n\nNote: this mode reduces the size of the output payload.",
"type": "string",
"const": "read-delete"
},
{
"description": "[`OutputMode::KeyOnly`], transforms Debezium create/update operations into read operations, also\ndoes NOT send the payload.\n\nPayload can be recovered by third-party consumer using 'farm-data' Rest API `/heads/{node}/items/{key}`\nwhere the key is the received base64 URL-safe not padded UTF-8 string representing the message key.\n\nNote: this mode reduces the size of the output payload to few bytes.",
"type": "string",
"const": "key-only"
}
]
},
"ControlPlaneConfig": {
"type": "object",
"properties": {
"grpcAddress": {
"description": "Address to the gRPC server that should receive service feedback events",
"type": "string",
"examples": [
"http://control-plane-operator:50052"
]
},
"feedbackInterval": {
"description": "Interval in milliseconds that must elapse between two feedback events sent to Control Plane Operator.\nIt defaults to `3000` ms when not provided during deserialization.",
"type": "integer",
"format": "uint64",
"minimum": 0,
"default": 3000
},
"resumeAfterMs": {
"description": "The number of milliseconds to wait before running the processing logic\nwhen connection with Control Plane Operator failed\nand no desired fast data state was ever received.",
"type": [
"integer",
"null"
],
"format": "uint64",
"minimum": 0,
"default": null
}
},
"required": [
"grpcAddress"
]
},
"ServerSettings": {
"type": "object",
"properties": {
"ip": {
"description": "Server bind IP",
"type": "string",
"format": "ipv4",
"default": "0.0.0.0"
},
"port": {
"description": "Server port",
"type": "integer",
"format": "uint16",
"minimum": 0,
"maximum": 65535,
"default": 3000
},
"apiPrefix": {
"description": "Server REST api prefix",
"default": "/",
"allOf": [
{
"$ref": "#/definitions/ApiPrefix"
}
]
}
}
},
"ApiPrefix": {
"title": "string",
"type": "string",
"pattern": "^/(?:[a-zA-Z0-9._~-]+(?:/[a-zA-Z0-9._~-]+)*)?$"
}
}
}
Example
{
"$schema": "https://docs.mia-platform.eu/schemas/fast_data/farm-data.0.5.3.schema.json",
"id": "toy_model",
"connections": {
"local": {
"type": "kafka",
"config": {
"bootstrap.servers": {
"type": "file",
"path": "/run/secrets/kafka/bootstrap.servers"
},
"auto.offset.reset": "earliest",
"queued.max.messages.kbytes": "8192",
"queued.min.messages": "500"
}
}
},
"consumers": {
"type": "kafka",
"config": {
"users": {
"topic": "fd.toy-model.users.raw",
"connectionName": "local",
"group.id": "fd.toy-model.sv_users"
},
"posts": {
"topic": "fd.toy-model.posts.raw",
"connectionName": "local",
"group.id": "fd.toy-model.sv_users"
},
"comments": {
"topic": "fd.toy-model.comments.raw",
"connectionName": "local",
"group.id": "fd.toy-model.sv_users"
}
}
},
"producer": {
"type": "kafka",
"config": {
"topic": "fd.toy-model.sv_users.aggregated",
"connectionName": "local"
}
},
"processor": {
"persistence": {
"type": "mongo",
"config": {
"url": {
"type": "file",
"path": "/run/secrets/mongodb/url"
}
}
},
"internalUpdates": {
"type": "kafka",
"config": {
"topic": "fd.toy-model.sv_users.internal-updates",
"connectionName": "local",
"consumer": {
"group.id": "fd.toy-model.sv_users",
"client.id": "fd.toy-model.internal-updates",
"queued.max.messages.kbytes": "96",
"queued.min.messages": "160",
"fetch.message.max.bytes": "40320"
}
}
},
"graph": {
"nodes": [
{
"id": "users",
"edges": {
"in": [],
"out": [
"users->posts"
]
}
},
{
"id": "posts",
"edges": {
"in": [
"users->posts"
],
"out": [
"posts->comments"
]
}
},
{
"id": "comments",
"edges": {
"in": [
"posts->comments"
],
"out": []
}
}
],
"edges": [
{
"id": "users->posts",
"filter": {
"$eq": [
{
"foreign": [
"userId"
]
},
{
"local": [
"id"
]
}
]
}
},
{
"id": "posts->comments",
"filter": {
"$eq": [
{
"foreign": [
"postId"
]
},
{
"local": [
"id"
]
}
]
}
}
]
}
}
}
The raw JSON schema can also be found here.
In addition, Kafka configurations and MongoDB persistence properties support secret resolution.
Connections
The connections field is a map where each key is a connection name and its value is a ConnectionConfig. Currently, ConnectionConfig can only be of type kafka.
A Kafka connection configuration can include additional properties defined by the Secret schema.
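As a sketch, a minimal connections map with a single Kafka connection could look like the following, where the broker address is a placeholder and bootstrap.servers is provided as a direct (plain string) secret:
{
  "connections": {
    "local": {
      "type": "kafka",
      "config": {
        "bootstrap.servers": "localhost:9092"
      }
    }
  }
}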
Consumers
The consumers configuration defines how the system consumes data. Currently, ConsumersConfig can only be of type kafka. The config property within consumers is an object where each key represents a specific Kafka consumer configuration. Each KafkaConsumerConfig requires:
- topic: the name of the Kafka topic from which the consumer will poll.
- connectionName: the name of the Kafka connection to use from the connections map.
- commitIntervalMs: an optional property defining the number of milliseconds between commits, with a default of 500 ms and a minimum of 0. Should the need to change this value arise, we recommend assigning the same value to all the consumers.
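For instance, a consumers entry combining the required properties with a librdkafka property could look like the following sketch (topic and group names are illustrative, taken from the example above):
{
  "consumers": {
    "type": "kafka",
    "config": {
      "users": {
        "topic": "fd.toy-model.users.raw",
        "connectionName": "local",
        "commitIntervalMs": 500,
        "group.id": "fd.toy-model.sv_users"
      }
    }
  }
}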
Additional properties for Kafka consumer configuration can be found in the librdkafka documentation.
When configuring the service, it is important to configure the following Kafka consumer properties:
- queued.min.messages
- queued.max.messages.kbytes
Since they regulate the amount of memory consumed by the service, these values should be tuned depending on the resources (CPU and memory) that can be assigned to the service and the size of the underlying database. Indeed, those external factors affect the processing throughput; therefore, retaining more messages in the consumer fetch queue than the service can process risks wasting service memory.
For example, when using a MongoDB M30 instance with a maximum of 2400 IOPS for read+write, we recommend starting by setting these values to:
queued.min.messages=1000
queued.max.messages.kbytes=16384
When configuring consumers, it is important to know that different configurations (e.g. group.id) trigger the creation of different consumer clients. This may be necessary to enable the same stream to be aggregated in different manners.
Please notice that instantiating additional consumers may require increasing the service memory requests and limits, since these consumers have their own internal fetch queue.
The following Kafka consumer properties are not configurable:
- allow.auto.create.topics → "false"
- enable.auto.commit → "false"
The first property is included to enforce user responsibility over topic creation, so that the proper configurations, such as number of partitions, replication factor, and retention policy, are set. The latter property disables the driver auto-commit feature in favour of an ad-hoc internal logic.
Producer
The producer configuration defines how the system produces data. Currently, ProducerConfig can only be of type kafka. A KafkaProducerConfig requires:
- connectionName: the name of the Kafka connection to use from the connections map.
- topic: the name of the Kafka topic to which the producer will send messages.
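As a sketch, a minimal producer configuration could be the following; the topic name is taken from the example above, and client.id is an illustrative librdkafka property:
{
  "producer": {
    "type": "kafka",
    "config": {
      "topic": "fd.toy-model.sv_users.aggregated",
      "connectionName": "local",
      "client.id": "fd.toy-model.producer"
    }
  }
}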
Additional properties for Kafka producer configuration can be found in the librdkafka documentation.
The Kafka producer is configured to compress messages by default using the snappy algorithm, which reduces the disk space consumed on the Kafka broker.
Furthermore, the following properties are not configurable:
- allow.auto.create.topics → "false"
- enable.idempotence → "true"
- acks → "all"
The first property is included to enforce user responsibility over topic creation, so that the proper configurations, such as number of partitions, replication factor, and retention policy, are set. The latter properties ensure that no duplicated messages are produced on Kafka brokers.
Processor
The processor field defines the aggregation configuration. It requires a graph, a persistence, and an internalUpdates configuration; an optional mode property selects the output strategy (see OutputMode in the schema), defaulting to read-delete.
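For orientation, a sketch of the processor layout, with the sub-configurations elided using the comment convention of the examples on this page and mode shown with its default value:
{
  "processor": {
    "mode": "read-delete",
    "graph": {
      // nodes and edges, see Aggregation Graph
    },
    "persistence": {
      // see Persistence
    },
    "internalUpdates": {
      // see Internal Updates
    }
  }
}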
Persistence
The persistence configuration defines how aggregated data is stored. Currently, PersistenceConfig can only be of type mongo. The MongoConfig requires a url, which is a Secret, and can optionally specify a database and an appName.
For proper functioning, sink collections need to be created. The indexer
CLI can
be run to automatically create the necessary indexes.
Example of persistence configuration:
{
// other properties
"persistence": {
"type": "mongo",
"config": {
"url": "mongodb://localhost:27017/farm-data",
"database": "/farm-data",
"appName": "eu.miaplatfor.farm-data.lakes"
}
},
// other properties
}
The Farm Data service relies heavily on the persistence layer to cache the current stream state. Consequently, it is highly recommended to configure the MongoDB instance to sustain a heavy I/O load, setting maximum IOPS to at least 2400 (1200 read + 1200 write); higher values will only benefit the service throughput.
Internal Updates
The internalUpdates configuration specifies how internal updates are handled within the system. Currently, InternalUpdatesConfig can only be of type kafka. The KafkaInternalUpdatesConfig requires:
- topic: the name of the Kafka topic for both consuming and producing internal update messages.
- connectionName: the name of the Kafka connection to use from the connections map.
- compressionWindowMs: the timespan of the window during which internal update messages are compressed, that is, only the last message with the same key within the selected time-frame is processed. It defaults to 250 ms and must be a non-negative integer.
- consumer: an optional Kafka consumer configuration. It is recommended to set group.id to the same value as all the other consumers defined previously, and client.id to its own identifier (for example iu-consumer), to instantiate an ad-hoc consumer dedicated to processing internal-update events.
- producer: an optional Kafka producer configuration.
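Putting it together, a sketch of an internalUpdates configuration with an ad-hoc consumer (topic and group names are illustrative, taken from the example above):
{
  "internalUpdates": {
    "type": "kafka",
    "config": {
      "topic": "fd.toy-model.sv_users.internal-updates",
      "connectionName": "local",
      "compressionWindowMs": 250,
      "consumer": {
        "group.id": "fd.toy-model.sv_users",
        "client.id": "iu-consumer"
      }
    }
  }
}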
Aggregation Graph
See the aggregation section in the Usage page.
Secret Management
The schema supports flexible secret management through the Secret type, which can be:
- Direct value: a plain string value.
- Environment variable (type: "env"): read from an environment variable.
  - key: the environment variable name.
  - encoding: optional encoding (e.g., "base64").
- File reference (type: "file"): read from the file system.
  - path: the file path containing the secret.
  - key: optional key to read a single value among other properties (for .ini or .json files). When not provided, the whole file content is loaded as the secret value.
  - encoding: optional encoding (e.g., "base64").
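As a sketch, the three secret forms can appear side by side within a Kafka connection configuration; sasl.username, sasl.password, and security.protocol are standard librdkafka properties, while the paths and environment variable names below are placeholders:
{
  "type": "kafka",
  "config": {
    "bootstrap.servers": { "type": "file", "path": "/run/secrets/kafka/bootstrap.servers" },
    "security.protocol": "SASL_SSL",
    "sasl.username": { "type": "env", "key": "KAFKA_USERNAME" },
    "sasl.password": { "type": "env", "key": "KAFKA_PASSWORD", "encoding": "base64" }
  }
}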
Properties that support the Secret interface are:
- Kafka producer configuration
- Kafka consumer configuration
- persistence connection string
For more details, please refer to the secrets resolution section of the config maps documentation page and the secret_rs library documentation.
Control Plane Support
The service implements the interface for connecting to a Control Plane Operator. However, complete Runtime Management support, that is, with the Control Plane UI and a central Control Plane instance, will be added in the future.
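A sketch of a controlPlane configuration, using the address example and the defaults documented in the schema above:
{
  "controlPlane": {
    "grpcAddress": "http://control-plane-operator:50052",
    "feedbackInterval": 3000
  }
}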
Recommended Kafka Configuration
When configuring the Kafka consumer, it is advised to set appropriate values for constraining the consumer internal queue. In this manner:
- the maximum amount of memory employed by the service can be finely tuned to avoid wasting resources, since only the number of messages that can effectively be processed in real time should be pulled into memory;
- the consumer is ensured to continuously poll the broker, avoiding its exit from the consumer group, since a lower number of buffered messages can trigger a new fetch to replenish the queue.
The main values to tune are:
- queued.max.messages.kbytes: maximum number of kilobytes of queued pre-fetched messages in the local consumer queue;
- queued.min.messages: minimum number of messages per topic+partition that librdkafka tries to maintain in the local consumer queue.
It is recommended to set queued.min.messages to a value greater than, but close to, the average message consumption rate. To check the average values, it is possible to observe:
- kafka_consumer_rx_msgs_total → messages read
- farm_data_processed_msg → total number of processed messages
For the Farm Data service, an example configuration could be the following:
{
"queued.max.messages.kbytes": "8192",
"queued.min.messages": "500"
}
Another important property that might need to be tuned is fetch.message.max.bytes, which, however, should be set only when queued.max.messages.kbytes is set to a value lower than 1024.
Internal Updates Consumer
The internal-updates consumer requires an ad-hoc consumer configuration due to the uniqueness of its messages. In fact, internal-update messages are very small (in the range of bytes), but they trigger a larger computation that may require several milliseconds to complete.
Due to the combination of these factors, using the default queue parameters, or even the ones adopted for the input streams, is not recommended. Indeed, the Kafka consumer tries to fetch and buffer a large number of events, since they are small, but it takes a considerable amount of time to clear them from the queue. This prevents the consumer from fetching newer messages within the constraint set by max.poll.interval.ms (a default interval of 5 minutes). Once that time elapses, the Kafka broker considers the consumer instance dead and forces it to leave the group, triggering a restart of the service since its event stream has terminated.
To prevent this unwanted situation, which hinders the advancement of event processing, it has been observed that modifying the consumer parameters can improve the stability of the service. Below is the recommended configuration to apply to the Kafka consumer of the internal-updates configuration:
{
"queued.max.messages.kbytes": "96",
"queued.min.messages": "160",
"fetch.message.max.bytes": "40320"
}
Note that the fetch.message.max.bytes parameter has also been changed, since it governs how many bytes are fetched per topic+partition the first time the consumer connects. Leaving the default value of 1 MB would lead to a behavior where the service starts, aggregates events for about 5 minutes, and then restarts because it has been forced out of the consumer group.
When a consumer instance is forced out of its consumer group, it may not have the chance to commit the work it has already carried out. Thus, setting the proper values is fundamental to guarantee service stability and progress in consuming messages from the Kafka topic.
The queued.max.messages.kbytes value is expressed in kilobytes, whereas fetch.message.max.bytes is expressed in bytes. Thus, the latter appears larger, though it isn't: the recommended 96 KBytes correspond to 98304 bytes, more than twice the 40320 bytes set for fetch.message.max.bytes.
Kubernetes
Resources
When the plugin is deployed on Kubernetes, it is advised to set its resource requests and limits. The recommended ones are provided here, although they can be changed according to your needs:
Recommended
- requests:
  - CPU: 100m
  - Memory: 100MB
- limits:
  - CPU: 1000m
  - Memory: 250MB
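Expressed as a Kubernetes container resources fragment (in JSON form), this would be a sketch; Mi is used here as the closest Kubernetes unit for the MB values above:
{
  "resources": {
    "requests": { "cpu": "100m", "memory": "100Mi" },
    "limits": { "cpu": "1000m", "memory": "250Mi" }
  }
}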
Memory usage also depends on the parameters described in the previous sections, which are:
- the number of input topics
- the number of spawned internal consumers
- how each Kafka consumer queue is configured
As a result, it is advised to adjust the requests and limits accordingly.
Status Probes
The service exposes the liveness and readiness status probes as HTTP endpoints, which help Kubernetes determine when the service has successfully started and when it may need to be restarted.
The endpoints are:
- liveness probe: /-/healthz
- readiness probe: /-/ready
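As a sketch, the probes could be wired into a Kubernetes container spec as follows (JSON form); the port assumes the default server port 3000 from the schema above, and the timing values are illustrative:
{
  "livenessProbe": {
    "httpGet": { "path": "/-/healthz", "port": 3000 },
    "periodSeconds": 10
  },
  "readinessProbe": {
    "httpGet": { "path": "/-/ready", "port": 3000 },
    "periodSeconds": 10
  }
}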