
Configuration

Environment Variables

| Variable | Description | Default |
| --- | --- | --- |
| LOG_LEVEL | the maximum log level to emit. Accepted levels are trace\|debug\|info\|warn\|error | info |
| CONFIGURATION_FOLDER | the filepath to the folder under which the configuration file is located | <HOME>/.df/aggregation |
| FARM_DATA_ALGORITHM | which type of aggregation algorithm should be employed. Values can be default (incremental aggregation) and full (aggregation only on the HEAD) | default |
| OTEL_EXPORTER_OTLP_ENDPOINT | the OpenTelemetry OTLP endpoint where traces and metrics should be pushed. When not set, telemetry is not exported | - |
info

Currently, the <HOME> value is set to /home/farm_data, based on how the service image is built.
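
For convenience, here is a sketch of how these variables could be set in a Kubernetes container spec, expressed in JSON to match the other examples in this page; the values shown are placeholders:

{
  "env": [
    { "name": "LOG_LEVEL", "value": "debug" },
    { "name": "CONFIGURATION_FOLDER", "value": "/home/farm_data/.df/aggregation" },
    { "name": "FARM_DATA_ALGORITHM", "value": "default" }
  ]
}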

Configuration File

The Farm Data service configuration is stored in the config.json file, which specifies various aspects of the system, including connections, consumers, producers, and data processing. Its content must conform to the JSON schema provided below. The following sections further explain and clarify its properties.

note

The raw JSON schema can also be found here.

In addition, Kafka configurations and MongoDB persistence properties support secret resolution.

Connections

The connections field is a map where each key is a connection name and its value is a ConnectionConfig. Currently, ConnectionConfig can only be of type kafka. A Kafka connection configuration can include additional properties defined by the Secret schema.
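
As an illustrative sketch only, assuming a connection named my-kafka and the same type/config layout used by the other examples in this page (refer to the JSON schema for the authoritative shape), a connections map could look like:

{
  "connections": {
    "my-kafka": {
      "type": "kafka",
      "config": {
        "bootstrap.servers": "kafka-1:9092,kafka-2:9092",
        "sasl.password": {
          "type": "env",
          "key": "KAFKA_PASSWORD"
        }
      }
    }
  }
}

Here bootstrap.servers is a standard librdkafka property, while sasl.password illustrates a value resolved through the Secret schema.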

Consumers

The consumers configuration defines how the system consumes data. Currently, ConsumersConfig can only be of type kafka. The config property within consumers is an object where each key represents a specific Kafka consumer configuration. Each KafkaConsumerConfig has the following properties:

  • topic: The name of the Kafka topic from which the consumer will poll.
  • connectionName: The name of the Kafka connection to use from the connections map.
  • commitIntervalMs: An optional property defining the number of milliseconds between commits, with a default of 500 ms and a minimum of 0. Should the need to change this value arise, we recommend assigning the same value to all the consumers.

Additional properties for Kafka consumer configuration can be found in the librdkafka documentation.
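
For illustration purposes only, a consumers configuration could then look like the following sketch, where the entry name orders-consumer, the topic, and the group.id value are placeholders:

{
  "consumers": {
    "type": "kafka",
    "config": {
      "orders-consumer": {
        "topic": "orders",
        "connectionName": "my-kafka",
        "commitIntervalMs": 500,
        "group.id": "farm-data-aggregator"
      }
    }
  }
}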

warning

When configuring the service, it is important to set the following Kafka consumer properties:

  • queued.min.messages
  • queued.max.messages.kbytes

Since they regulate the amount of memory consumed by the service, these values should be tuned depending on the resources (CPU and memory) that can be assigned to the service and on the size of the underlying database. Indeed, those external factors affect the processing throughput; therefore, retaining more messages in the consumer fetch queue than the service can process risks wasting service memory.

For example, when using a MongoDB M30 instance with a maximum of 2400 IOPS for read+write, we recommend starting by setting these values to:

{
  "queued.min.messages": "1000",
  "queued.max.messages.kbytes": "16384"
}
info

When configuring consumers, it is important to know that different configurations (e.g. group.id) trigger the creation of different consumer clients. This may be necessary to enable the same stream to be aggregated in different manners.

Please notice that instantiating additional consumers may require increasing the service memory requests and limits, since each consumer has its own internal fetch queue.

note

The following Kafka consumer properties are not configurable:

  • allow.auto.create.topics: "false"
  • enable.auto.commit: "false"

The first parameter is included to enforce user responsibility over topic creation, so that the proper configurations, such as the number of partitions, replication factor, and retention policy, are set. In addition, the latter property disables the driver auto-commit feature in favour of ad-hoc internal logic.

Producer

The producer configuration defines how the system produces data. Currently, ProducerConfig can only be of type: "kafka". A KafkaProducerConfig requires:

  • connectionName: The name of the Kafka connection to use from the connections map.
  • topic: The name of the Kafka topic to which the producer will send messages.

Additional properties for Kafka producer configuration can be found in the librdkafka documentation.
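
For illustration purposes only (connection and topic names are placeholders), a producer configuration could look like:

{
  "producer": {
    "type": "kafka",
    "config": {
      "connectionName": "my-kafka",
      "topic": "farm-data-output"
    }
  }
}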

note

The Kafka producer is configured to compress messages by default using the snappy algorithm. This reduces the disk space consumed on the Kafka broker.

Furthermore, the following properties are not configurable:

  • allow.auto.create.topics: "false"
  • enable.idempotence: "true"
  • acks: "all"

The first parameter is included to enforce user responsibility over topic creation, so that the proper configurations, such as the number of partitions, replication factor, and retention policy, are set. In addition, the latter two properties ensure that no duplicated messages are produced on the Kafka brokers.

Processor

The processor field defines the aggregation configuration. It requires a graph, persistence, and internalUpdates configuration.
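
As an overview of the expected structure (a sketch only; refer to the JSON schema for the authoritative shape), the processor field combines the three configurations described in the following sections:

{
  "processor": {
    "graph": {
      // aggregation graph, see the Usage page
    },
    "persistence": {
      // see the Persistence section below
    },
    "internalUpdates": {
      // see the Internal Updates section below
    }
  }
}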

Persistence

The persistence configuration defines how aggregated data is stored. Currently, PersistenceConfig can only be of type mongo. The MongoConfig requires a url, which is a Secret, and can optionally specify a database.
For proper functioning, sink collections need to be created. The indexer CLI can be run to automatically create the necessary indexes.

Example of connection configuration:

{
  // other properties
  "persistence": {
    "type": "mongo",
    "config": {
      "url": "mongodb://localhost:27017/farm-data",
      "database": "/farm-data",
      "appName": "eu.miaplatfor.farm-data.lakes"
    }
  },
  // other properties
}
warning

The Farm Data service heavily relies on the persistence layer to cache the current stream state. Consequently, it is highly recommended to configure the MongoDB instance to sustain a heavy I/O load, setting the maximum IOPS to at least 2400 (1200 read + 1200 write); higher values will further benefit the service throughput.

Internal Updates

The internalUpdates configuration specifies how internal updates are handled within the system. Currently, InternalUpdatesConfig can only be of type: "kafka". The KafkaInternalUpdatesConfig requires:

  • topic: The name of the Kafka topic for both consuming and producing internal update messages.
  • connectionName: The name of the Kafka connection to use from the connections map.
  • compressionWindowMs: The timespan of the window used to attempt internal update messages compression, that is, only the last message with the same key within the selected time frame is processed. It defaults to 250 ms and must be an integer greater than or equal to 0.
  • consumer: An optional Kafka consumer configuration. For this property it is recommended to set group.id to the same value as all the other consumers previously defined, and client.id to its own identifier (for example iu-consumer), to instantiate an ad-hoc consumer dedicated to processing internal-update events.
  • producer: An optional Kafka producer configuration. A configuration sketch is provided below.
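
For illustration purposes only (topic, connection, and client identifiers are placeholders), an internalUpdates configuration could look like:

{
  "internalUpdates": {
    "type": "kafka",
    "config": {
      "topic": "farm-data-internal-updates",
      "connectionName": "my-kafka",
      "compressionWindowMs": 250,
      "consumer": {
        "group.id": "farm-data-aggregator",
        "client.id": "iu-consumer"
      }
    }
  }
}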

Aggregation Graph

See the aggregation section in the Usage page.

Secret Management

The schema supports flexible secret management through the Secret type, which can be:

  • Direct value: plain string value
  • Environment variable (type: "env"): read from environment variable
    • key: environment variable name
    • encoding: optional encoding (e.g., "base64")
  • File reference (type: "file"): read from file system
    • path: file path containing the secret
    • key: optional key to read a single value among other properties (for .ini or .json files). When not provided, the whole file content is loaded as the secret value
    • encoding: optional encoding (e.g., "base64")
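
The following sketch illustrates the three forms; the property names and values are placeholders, and any property supporting the Secret type accepts each of them:

{
  "directSecret": "my-plain-value",
  "envSecret": {
    "type": "env",
    "key": "MY_SECRET_VAR",
    "encoding": "base64"
  },
  "fileSecret": {
    "type": "file",
    "path": "/run/secrets/credentials.json",
    "key": "password"
  }
}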

Properties that support the Secret interface are:

  • Kafka producer configuration
  • Kafka consumer configuration
  • persistence connection string

For more details, please refer to the secrets resolution in config maps documentation page and the secret_rs library documentation.

Control Plane Support

The service implements the interface for connecting towards a Control-Plane Operator.
However, complete Runtime Management support, that is, integration with the Control Plane UI and a central Control Plane instance, will be added in the future.

When configuring the Kafka consumer, it is advised to set appropriate values to constrain the consumer internal queue. In this manner:

  • the maximum amount of memory employed by the service can be finely tuned to avoid wasting resources, since only the number of messages that can effectively be processed in real time should be pulled into memory;
  • the consumer is ensured to continuously poll the broker, avoiding its expulsion from the consumer group, since a lower number of buffered messages can trigger a new fetch to replenish the queue.

The main values to tune are:

  • queued.max.messages.kbytes: maximum number of kilobytes of queued pre-fetched messages in the local consumer queue;
  • queued.min.messages: minimum number of messages per topic+partition librdkafka tries to maintain in the local consumer queue;

It is recommended to set queued.min.messages to a value greater than, but close to, the average message consumption rate. It is possible to observe the following metrics:

  • kafka_consumer_rx_msgs_total → messages read
  • farm_data_processed_msg → total number of processed messages

to check the average values.

For Farm Data service, an example of configuration can be the following one:

{
  "queued.max.messages.kbytes": "8192",
  "queued.min.messages": "500"
}

Another important property that might need tuning is fetch.message.max.bytes; however, it should be set only when queued.max.messages.kbytes is configured to a value lower than 1024.

Internal Updates Consumer

The internal-updates consumer requires an ad-hoc consumer configuration due to the peculiar nature of its messages. In fact, internal-update messages are very small (in the range of a few bytes), but each triggers a larger computation that may require several milliseconds to complete.

Due to the combination of these factors, using the default queue parameters, or even the ones adopted for the input streams, is not recommended. Indeed, the Kafka consumer tries to fetch and buffer a large number of events, since they are small, but it takes a considerable amount of time to clear them from the queue. This prevents the consumer from fetching newer messages within the constraint set by max.poll.interval.ms (a default interval of 5 minutes). Once that time elapses, the consumer instance is considered dead by the Kafka broker, which forces it to leave the group, triggering a restart of the service since its event stream has terminated.

To prevent this unwanted situation, which hinders the advancement of event processing, it has been observed that modifying the consumer parameters can improve the stability of the service itself. Thus, the recommended configuration to apply to the Kafka consumer of the internal-updates configuration is provided below:

{
  "queued.max.messages.kbytes": "96",
  "queued.min.messages": "160",
  "fetch.message.max.bytes": "40320"
}

As can be observed, the fetch.message.max.bytes parameter is also changed here, since it governs how many bytes are fetched per topic+partition the first time the consumer connects. Consequently, leaving the default value of 1 MB would lead to a behavior where the service starts, aggregates events for about 5 minutes, and then restarts because it has been forced out of the consumer group.

danger

When a consumer instance is forced out of its consumer group, it may not have the chance to commit the work it has already carried out. Thus, setting the proper values is fundamental to guarantee service stability and progress in consuming messages from the Kafka topic.

note

The queued.max.messages.kbytes value is expressed in kilobytes, whereas fetch.message.max.bytes is expressed in bytes. Thus, the latter appears larger, although it is not.

Kubernetes

Resources

When the plugin is deployed on Kubernetes, it is advised to set its resource requests and limits. The recommended minimum values are provided below, although they can be changed according to your needs:

  • requests:
    CPU: 100m
    Memory: 100MB
  • limits:
    CPU: 1000m
    Memory: 250MB
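
For reference, here is a sketch of the corresponding Kubernetes resources stanza, expressed in JSON to match the other examples in this page (memory values use the Kubernetes Mi notation, assuming mebibytes are intended):

{
  "resources": {
    "requests": {
      "cpu": "100m",
      "memory": "100Mi"
    },
    "limits": {
      "cpu": "1000m",
      "memory": "250Mi"
    }
  }
}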
note

Memory usage also depends on the parameters described in the previous sections, namely:

  • the number of input topics
  • the number of spawned internal consumers
  • how each Kafka consumer queue is configured

As a result, it is advised to adjust the requests and limits accordingly.

Status Probes

The service exposes liveness and readiness status probes as HTTP endpoints, which help Kubernetes determine when the service has successfully started and when it may need to be restarted.

The endpoints are:

  • liveness probe: /-/healthz
  • readiness probe: /-/ready
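
As a sketch only, the corresponding Kubernetes probes could be configured as follows, expressed in JSON to match the other examples in this page; the container port shown here (3000) is a placeholder and must match the port the service HTTP server actually listens on:

{
  "livenessProbe": {
    "httpGet": {
      "path": "/-/healthz",
      "port": 3000
    }
  },
  "readinessProbe": {
    "httpGet": {
      "path": "/-/ready",
      "port": 3000
    }
  }
}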