Version: 14.4.0

Configuration

The service is configured through a set of environment variables, which describe its basic runtime needs, together with a main configuration file and a sandbox processor function.

Environment Variables

| Variable | Description | Default |
| --- | --- | --- |
| LOG_LEVEL | the maximum log level to emit. Accepted levels are trace, debug, info, warn and error | info |
| HTTP_PORT | the HTTP port on which Kubernetes status routes and metrics are exposed | 3000 |
| CONFIGURATION_FOLDER | the path to the folder containing the configuration file and the sandbox processor user-defined function | <HOME>/.df/stream-processor |
| OTEL_EXPORTER_OTLP_ENDPOINT | the OpenTelemetry OTLP endpoint to which traces and metrics are pushed. When not set, telemetry is not exported | - |
info

Currently, <HOME> is set to /home/stream_processor, which depends on how the service image is built.
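As a minimal sketch, the following Node.js snippet shows how a deployment or test script could resolve the variables above with their documented defaults. The function name resolveServiceEnv is hypothetical and this is not the service's own implementation; it assumes that <HOME> is /home/stream_processor, as stated above, and that LOG_LEVEL values are written in lowercase.

```javascript
// Hypothetical helper mirroring the documented defaults of the Stream Processor
// environment variables. It is NOT the service's own implementation.
const path = require("node:path");

// Accepted levels, as listed in the table above (assumed lowercase).
const LOG_LEVELS = ["trace", "debug", "info", "warn", "error"];
// Assumption: <HOME> resolves to /home/stream_processor, as in the service image.
const DEFAULT_HOME = "/home/stream_processor";

function resolveServiceEnv(env = process.env) {
  const logLevel = env.LOG_LEVEL ?? "info";
  if (!LOG_LEVELS.includes(logLevel)) {
    throw new Error(`LOG_LEVEL must be one of ${LOG_LEVELS.join("|")}, got "${logLevel}"`);
  }

  const httpPort = Number(env.HTTP_PORT ?? "3000");
  if (!Number.isInteger(httpPort) || httpPort < 1 || httpPort > 65535) {
    throw new Error(`HTTP_PORT must be a valid TCP port, got "${env.HTTP_PORT}"`);
  }

  const configurationFolder =
    env.CONFIGURATION_FOLDER ?? path.posix.join(DEFAULT_HOME, ".df", "stream-processor");

  // When OTEL_EXPORTER_OTLP_ENDPOINT is unset, telemetry is not exported.
  const otlpEndpoint = env.OTEL_EXPORTER_OTLP_ENDPOINT ?? null;

  return {
    logLevel,
    httpPort,
    configurationFolder,
    otlpEndpoint,
    telemetryEnabled: otlpEndpoint !== null,
  };
}
```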

Configuration Files

To configure the Stream Processor service, place the main configuration file, config.json, and the processor function within the folder pointed to by the CONFIGURATION_FOLDER environment variable. The service expects config.json at the root of the configuration folder and the processor function nested under the processors folder. Since only JavaScript is currently supported, the processing function must be written in an index.js file located under the processors/javascript folder.

info

Folders under the processors directory should be named after the language used to write the processor function.

The configuration folder should have the following layout:

├── config.json             # --> the configuration file
└── processors
    └── javascript
        └── index.js        # --> user-defined processing function
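A minimal pre-flight check for this layout could look like the sketch below, run for example before mounting the folder into the container. The helper names expectedLayout and missingFiles are hypothetical, and the snippet only checks that both files exist, not that their content is valid.

```javascript
// Hypothetical pre-flight check: verifies that a configuration folder follows
// the layout expected by the Stream Processor before the service is started.
const fs = require("node:fs");
const path = require("node:path");

// Only JavaScript processors are currently supported, hence the fixed "javascript" folder.
function expectedLayout(configurationFolder) {
  return {
    config: path.join(configurationFolder, "config.json"),
    processor: path.join(configurationFolder, "processors", "javascript", "index.js"),
  };
}

// Returns the expected files that are missing or are not regular files
// (an empty array means the layout is valid).
function missingFiles(configurationFolder) {
  return Object.values(expectedLayout(configurationFolder)).filter(
    (file) => !fs.existsSync(file) || !fs.statSync(file).isFile(),
  );
}
```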

Service Settings file

The main configuration file describes the connection details for the streaming platform, the customizations to apply to the consumer and the producer, whether one or more cache connections should be instantiated, and how the sandbox should behave.

The file follows a JSON schema, whose fields are explained in detail in the following paragraphs.

note

The raw JSON schema can also be found here.

In addition, Kafka configurations and cache persistence properties support secret resolution.

Required Fields

The configuration requires these main fields:

  • connections: defines the connections used by the service
  • consumer: defines how the service consumes messages from the data source
  • producer: defines how the service produces messages to the output destination
  • processor: defines the processing engine and its settings
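Before deploying, a quick check on the parsed config.json can catch a missing top-level field early. The sketch below is hypothetical (missingRequiredFields is not part of the service) and is no substitute for validating the file against the official JSON schema.

```javascript
// Hypothetical sanity check on a parsed config.json: reports which of the
// four main fields listed above are missing.
const REQUIRED_FIELDS = ["connections", "consumer", "producer", "processor"];

function missingRequiredFields(config) {
  // The root of config.json is expected to be a JSON object.
  if (config === null || typeof config !== "object" || Array.isArray(config)) {
    throw new TypeError("config.json must contain a JSON object");
  }
  return REQUIRED_FIELDS.filter((field) => !(field in config));
}
```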

Connections (Optional)

The connections field is a map where each key is a connection name and its value is a ConnectionConfig. A Kafka connection configuration can include additional properties defined by the Secret schema. The connection name must be referenced in both the consumer and the producer configurations through the connectionName field.
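Since a mistyped connectionName only surfaces when the service starts, a hypothetical check like the following could verify that the consumer and the producer reference an existing key of the connections map. It assumes the parsed config.json is available as a plain object; unresolvedConnections is an illustrative name.

```javascript
// Hypothetical check: the connectionName used by the consumer and by the
// producer must be a key of the connections map.
function unresolvedConnections(config) {
  const known = new Set(Object.keys(config.connections ?? {}));
  const problems = [];
  for (const section of ["consumer", "producer"]) {
    const settings = config[section];
    if (settings && !known.has(settings.connectionName)) {
      problems.push(`${section} references unknown connection "${settings.connectionName}"`);
    }
  }
  return problems;
}
```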

Consumer Configuration

The consumer field configures the input stream source. It currently supports:

  • Kafka Consumer (type: "kafka"): configures connection to a Kafka topic for message consumption
    • type: must be set to "kafka"
    • topic: name of the Kafka topic to read messages from
    • connectionName: reference to a connection defined in the connections field
    • commitIntervalMs: interval between offset commits (default: 500ms)
    • config: additional librdkafka consumer configuration properties (client ID, group ID, offset reset behavior, queue settings, etc.). The field "group.id" is required.

Example:

{
  "consumer": {
    "type": "kafka",
    "topic": "<INPUT_TOPIC>",
    "connectionName": "kafka",
    "config": {
      "client.id": "<CLIENT_ID>",
      "group.id": "<CONSUMER_GROUP_ID>",
      "auto.offset.reset": "earliest",
      "queued.max.messages.kbytes": "32840",
      "queued.min.messages": "5000"
    }
  }
}
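The consumer rules listed above can be expressed as a small normalization function. This is a sketch that assumes the consumer section has already been parsed from config.json; normalizeKafkaConsumer is a hypothetical name, and the service may apply further checks not shown here.

```javascript
// Hypothetical helper applying the documented consumer rules: type must be
// "kafka", topic and connectionName must be set, config["group.id"] is
// required and commitIntervalMs defaults to 500.
function normalizeKafkaConsumer(consumer) {
  if (consumer?.type !== "kafka") {
    throw new Error('consumer.type must be "kafka"');
  }
  for (const field of ["topic", "connectionName"]) {
    if (typeof consumer[field] !== "string" || consumer[field] === "") {
      throw new Error(`consumer.${field} must be a non-empty string`);
    }
  }
  const config = consumer.config ?? {};
  // group.id may also be a Secret object, so only its presence is checked.
  if (!config["group.id"]) {
    throw new Error('consumer.config["group.id"] is required');
  }
  // Return a copy so the caller's object is left untouched.
  return { ...consumer, commitIntervalMs: consumer.commitIntervalMs ?? 500, config: { ...config } };
}
```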

Producer Configuration

The producer field configures the output stream destination. It currently supports:

  • Kafka Producer (type: "kafka"): configures connection to a Kafka topic for message production
    • type: must be set to "kafka"
    • topic: name of the Kafka topic to write processed messages to
    • connectionName: reference to a connection defined in the connections field
    • config: additional librdkafka producer configuration properties (client ID, compression settings, etc.)

Example:

{
  "producer": {
    "type": "kafka",
    "topic": "<OUTPUT_TOPIC>",
    "connectionName": "kafka",
    "config": {
      "client.id": "<CLIENT_ID>",
      "compression.type": "snappy"
    }
  }
}

Processor Configuration

The processor field configures the processing engine. It currently supports:

  • JavaScript Sandbox (type: "javascript"): configures the JavaScript execution environment

    • interruptMs: maximum execution time per function call (default: 5000ms)

    • consoleBuffer: console buffer size in bytes (default: 1024)

    • maxStackSize: maximum stack size in bytes (optional, uses QuickJS default)

    • maxHeapSize: maximum heap size in bytes (optional, uses QuickJS default)

    • payloadSerdeStrategy: serialization/deserialization strategy for message payloads

      For more details, please read the corresponding section in the Usage page.

  • onError: defines the behavior when an error occurs during message processing

    • failFast: (default) stop the processing engine upon encountering an error and fail the service
    • dlq: send the failed message to a Dead Letter Queue (DLQ) topic

    When onError is set to dlq, an additional configuration field, dlq, must be provided in the form of a Kafka producer configuration object, which defines how to connect to the DLQ topic.
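The processor defaults described above can be summarized with the hypothetical normalizeProcessor function below. Since the exact schema shape is not reproduced on this page, it assumes that onError is a string ("failFast" or "dlq") and that both onError and dlq are fields of the processor object; check the JSON schema before relying on this layout.

```javascript
// Hypothetical normalization of the "processor" section using the documented
// defaults. Assumption: onError and dlq live inside the processor object.
function normalizeProcessor(processor) {
  if (processor?.type !== "javascript") {
    throw new Error('processor.type must be "javascript"');
  }
  const onError = processor.onError ?? "failFast";
  if (onError !== "failFast" && onError !== "dlq") {
    throw new Error(`unsupported onError value "${onError}"`);
  }
  // The DLQ target must be described as a Kafka producer configuration.
  if (onError === "dlq" && processor.dlq?.type !== "kafka") {
    throw new Error('onError "dlq" requires a Kafka producer configuration in processor.dlq');
  }
  return {
    ...processor,
    interruptMs: processor.interruptMs ?? 5000,
    consoleBuffer: processor.consoleBuffer ?? 1024,
    onError,
    // maxStackSize and maxHeapSize stay undefined when omitted: QuickJS defaults apply.
  };
}
```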

Caches Configuration (Optional)

The caches field is an optional object that defines named caches for use within processing functions. If no cache is defined, the Stateful Stream Processing use case is not supported.

Each cache can be:

  • MongoDB Cache (type: "mongodb"): Persistent cache backed by MongoDB
    • url: MongoDB connection string (supports secrets)
    • collection: collection name for cache storage
    • database: database name (optional)
    • appName: application name for MongoDB connection (optional - useful to track which program is carrying out queries on the database)
  • In-Memory Cache (type: "in-memory"): simple in-memory key-value storage. Use it only when the stream state does not need to be shared across different service instances; whenever possible, prefer a shareable cache.

For more details on how to interact with the caches, please read the dedicated section in the Usage page.
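As an illustration, the hypothetical cacheProblems function below checks the caches map for the fields listed above. It assumes caches is an object keyed by cache name and only verifies presence, because url may also be expressed as a Secret.

```javascript
// Hypothetical check of the optional "caches" map (cache name -> settings).
// MongoDB caches need url and collection; in-memory caches need no settings.
function cacheProblems(caches = {}) {
  const problems = [];
  for (const [name, cache] of Object.entries(caches)) {
    switch (cache?.type) {
      case "mongodb":
        // url may be a plain string or a Secret object, so only presence is checked.
        if (!cache.url) problems.push(`${name}: "url" is required for mongodb caches`);
        if (!cache.collection) problems.push(`${name}: "collection" is required for mongodb caches`);
        break;
      case "in-memory":
        break;
      default:
        problems.push(`${name}: unsupported cache type "${cache?.type}"`);
    }
  }
  return problems;
}
```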

Control Plane Configuration (Optional)

The controlPlane field is optional and configures integration with the Control Plane Operator:

  • grpcAddress: gRPC server address for service feedback events
  • feedbackInterval: interval between feedback events in milliseconds (default: 3000ms)
  • resumeAfterMs: delay before processing when control plane connection fails (optional)

Secret Management

The schema supports flexible secret management through the Secret type, which can be:

  • Direct value: plain string value
  • Environment variable (type: "env"): read from environment variable
    • key: environment variable name
    • encoding: optional encoding (e.g., "base64")
  • File reference (type: "file"): read from file system
    • path: file path containing the secret
    • key: optional key to read a single value among other properties (for .ini or .json files). When not provided, the whole file content is loaded as the secret value
    • encoding: optional encoding (e.g., "base64")

For more details, please refer to the secrets resolution in config maps documentation page and to the secret_rs library documentation.
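To clarify the semantics of each Secret form, here is a minimal Node.js sketch of the resolution rules. It is not secret_rs and may differ from it in edge cases: the helper names are hypothetical, only base64 encoding is handled, .ini files are parsed as plain key=value lines ignoring sections, and when no key is given the whole file content is returned as-is, including any trailing newline.

```javascript
// Minimal sketch of the Secret resolution rules described above. It only
// illustrates the semantics; the service itself relies on secret_rs.
const fs = require("node:fs");
const path = require("node:path");

function decode(value, encoding) {
  if (encoding === undefined) return value;
  if (encoding === "base64") return Buffer.from(value, "base64").toString("utf8");
  throw new Error(`unsupported encoding "${encoding}"`);
}

// Reads a single key from .json files, or from key=value lines otherwise.
// Assumption: .ini sections are ignored and keys are matched by name only.
function readKey(content, filePath, key) {
  if (path.extname(filePath) === ".json") {
    const value = JSON.parse(content)[key];
    if (value === undefined) throw new Error(`key "${key}" not found in ${filePath}`);
    return String(value);
  }
  for (const line of content.split(/\r?\n/)) {
    const eq = line.indexOf("=");
    if (eq > 0 && line.slice(0, eq).trim() === key) return line.slice(eq + 1).trim();
  }
  throw new Error(`key "${key}" not found in ${filePath}`);
}

function resolveSecret(secret, env = process.env) {
  if (typeof secret === "string") return secret; // direct value
  switch (secret?.type) {
    case "env": {
      const raw = env[secret.key];
      if (raw === undefined) throw new Error(`environment variable ${secret.key} is not set`);
      return decode(raw, secret.encoding);
    }
    case "file": {
      const content = fs.readFileSync(secret.path, "utf8");
      // Without a key, the whole file content is the secret value.
      const raw = secret.key === undefined ? content : readKey(content, secret.path, secret.key);
      return decode(raw, secret.encoding);
    }
    default:
      throw new Error(`unsupported secret type "${secret?.type}"`);
  }
}
```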

Sandbox User-Defined Function file

Once the sandbox settings are defined, you can write your custom function in the index.js file.

For more details on how the user-defined processor function can be written and how it works, please read the Usage page.

Control Plane Support

The service implements the interface for connecting to a Control Plane Operator.
However, complete Runtime Management support, that is, support for the Control Plane UI and a central Control Plane instance, will be added in the future.

Kafka Consumer Tuning

When configuring the Kafka consumer, it is advisable to set values that constrain the consumer's internal queue. In this way:

  • the maximum amount of memory used by the service can be finely tuned to avoid wasting resources, since only the messages that can effectively be processed in real time are pulled into memory;
  • the consumer keeps polling the broker regularly and is not evicted from the consumer group, since a smaller number of buffered messages triggers new fetches to replenish the queue.

The main values to tune are:

  • queued.max.messages.kbytes: maximum number of kilobytes of queued pre-fetched messages in the local consumer queue;
  • queued.min.messages: minimum number of messages per topic+partition librdkafka tries to maintain in the local consumer queue;

It is recommended to set queued.min.messages to a value slightly greater than the average message consumption rate. To estimate the average values, you can observe the following metrics:

  • kafka_consumer_rx_msgs_total → messages read
  • sp_processed_messages → total number of processed messages
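Assuming both metrics are monotonically increasing counters, as their descriptions suggest, an average rate can be computed from two samples taken some time apart. The sketch below does this and suggests a queued.min.messages value slightly above the rate; the function names are hypothetical and the 10% headroom is an arbitrary assumption, not a documented rule.

```javascript
// Hypothetical helpers: average rate from two counter samples, then a
// queued.min.messages suggestion with a configurable headroom (assumed 10%).
function averageRate(earlier, later) {
  const seconds = (later.timestampMs - earlier.timestampMs) / 1000;
  if (seconds <= 0) throw new Error("samples must be in chronological order");
  const delta = later.value - earlier.value;
  if (delta < 0) throw new Error("counter decreased (was the service restarted?)");
  return delta / seconds; // messages per second
}

// Returned as a string because librdkafka properties are strings in config.json.
function suggestQueuedMinMessages(ratePerSecond, headroomPercent = 10) {
  return String(Math.ceil((ratePerSecond * (100 + headroomPercent)) / 100));
}
```

For instance, 270000 processed messages over 60 seconds give an average of 4500 messages per second, for which the sketch suggests "4950".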

For the Stream Processor service, an example configuration is the following:

{
  "queued.max.messages.kbytes": "32840",
  "queued.min.messages": "5000"
}

Another important property that might need tuning is fetch.message.max.bytes. However, it should be changed only when queued.max.messages.kbytes is set to a value lower than 1024, since the librdkafka default for fetch.message.max.bytes (1048576 bytes) already corresponds to 1024 KB.
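The reasoning above can be turned into a quick estimate. The hypothetical consumerQueueReport function converts queued.max.messages.kbytes into bytes, assuming librdkafka counts a kilobyte as 1024 bytes, and flags when fetch.message.max.bytes is worth reviewing, assuming its librdkafka default of 1048576 bytes. With the example value of 32840, the queue can hold roughly 32 MiB of pre-fetched messages.

```javascript
// Hypothetical estimate of the consumer pre-fetch queue memory budget.
// Assumptions: "kbytes" means 1024 bytes and fetch.message.max.bytes
// defaults to 1048576 bytes (1024 KB) in librdkafka.
const DEFAULT_FETCH_MESSAGE_MAX_BYTES = 1048576;

function consumerQueueReport(config) {
  const kbytes = Number(config["queued.max.messages.kbytes"]);
  if (!Number.isFinite(kbytes) || kbytes <= 0) {
    throw new Error("queued.max.messages.kbytes must be a positive number");
  }
  const queueBytes = kbytes * 1024;
  const fetchBytes = Number(config["fetch.message.max.bytes"] ?? DEFAULT_FETCH_MESSAGE_MAX_BYTES);
  return {
    queueBytes,
    queueMiB: queueBytes / (1024 * 1024),
    // Below 1024 KB the queue is smaller than a single default-sized fetch.
    reviewFetchMessageMaxBytes: kbytes < 1024,
    fetchExceedsQueue: fetchBytes > queueBytes,
  };
}
```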

Kubernetes

Resources

When the service is deployed on Kubernetes, it is advisable to set its resource requests and limits. The recommended and minimum values are listed below, although they can be adjusted to your needs:

Recommended

  • requests:
    • CPU: 100m
    • Memory: 20MB
  • limits:
    • CPU: 800m/1000m
    • Memory: 100MB

Minimum

  • requests:
    • CPU: 50m
    • Memory: 15MB
  • limits:
    • CPU: 250m
    • Memory: 60MB
note

Memory usage also depends on the consumer queue parameters described in the previous section. Therefore, it is advisable to adjust requests and limits accordingly whenever those parameters change.

Status Probes

The service exposes liveness and readiness status probes as HTTP endpoints on the port set by HTTP_PORT, which help Kubernetes determine when the service has started successfully and when it may need to be restarted.

The endpoints are:

  • liveness probe: /-/healthz
  • readiness probe: /-/ready
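For manual checks outside Kubernetes, for example from a CI job, a small helper can query both endpoints. This sketch assumes the service is reachable on localhost at the port set by HTTP_PORT and requires Node.js 18 or later for the global fetch API; probeUrls and checkProbes are hypothetical names. Note that it treats only 2xx responses as healthy, while Kubernetes HTTP probes accept any status from 200 to 399.

```javascript
// Hypothetical helpers for checking the status probes from a script.
// Assumption: the service listens on localhost at HTTP_PORT (default 3000).
function probeUrls(port = Number(process.env.HTTP_PORT ?? 3000), host = "localhost") {
  return {
    liveness: `http://${host}:${port}/-/healthz`,
    readiness: `http://${host}:${port}/-/ready`,
  };
}

// Requires Node.js 18+ (global fetch). A probe passes on a 2xx status code.
async function checkProbes(port, host) {
  const results = {};
  for (const [name, url] of Object.entries(probeUrls(port, host))) {
    try {
      const response = await fetch(url);
      results[name] = response.ok;
    } catch {
      results[name] = false; // connection refused, DNS failure, and so on
    }
  }
  return results;
}
```

For example, checkProbes(3000).then(console.log) resolves to an object holding one boolean per probe.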