
Configuration

The service is configured through a set of environment variables that describe basic service needs, alongside a main configuration file and a sandbox processor function.

Environment Variables

  • LOG_LEVEL: the maximum log level to emit. Accepted levels are trace|debug|info|warn|error. Default: info
  • HTTP_PORT: the HTTP port on which Kubernetes status routes and metrics are exposed. Default: 3000
  • CONFIGURATION_FOLDER: the filepath to the folder containing the configuration file and the sandbox processor user-defined function. Default: <HOME>/.df/stream-processor
  • OTEL_EXPORTER_OTLP_ENDPOINT: the OpenTelemetry OTLP endpoint where traces and metrics should be pushed. When not set, telemetry is not exported. No default.
info

Currently, <HOME> is set to /home/stream_processor, based on how the service image is built.

Configuration Files

To configure the Stream Processor service, the main configuration file config.json and the processor function should be placed within the folder pointed to by the CONFIGURATION_FOLDER environment variable. The config.json file is expected at the root of the configuration folder, while the processor function is nested under the processors folder. Since only JavaScript is currently supported, the processing function must be written in an index.js file nested under the javascript folder.

info

Folders under the processors directory should be named after the language used to write the processor function.

The configuration folder should have the following layout:

├── config.json             # --> the configuration file
└── processors
    └── javascript
        └── index.js        # --> user-defined processing function

Service Settings file

The main configuration file describes the connection details for the streaming platform, the customizations to apply to the consumer and producer, whether one or more cache connections should be instantiated, and how the sandbox should behave.

The file must respect the following JSON schema, which is explained in detail in the next paragraphs.

note

The raw JSON schema can also be found here.

In addition, Kafka configurations and cache persistence properties support secret resolution.

Required Fields

The configuration requires three mandatory fields:

  • consumer: defines how the service consumes messages from the data source
  • producer: defines how the service produces messages to the output destination
  • processor: defines the processing engine and its settings
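As an orientation, a minimal config.json combining the three required fields could look like the sketch below. The broker address, topic names, and group id are placeholder values, and each field is detailed in the following paragraphs.

{
  "consumer": {
    "type": "kafka",
    "config": { "bootstrap.servers": "my-broker:9092", "group.id": "my-stream-processor" },
    "topic": "input-topic"
  },
  "producer": {
    "type": "kafka",
    "config": { "bootstrap.servers": "my-broker:9092" },
    "topic": "output-topic"
  },
  "processor": {
    "type": "javascript"
  }
}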

Consumer Configuration

The consumer field configures the input stream source. It currently supports:

  • Kafka Consumer (type: "kafka"): configures connection to a Kafka topic for message consumption
    • config: librdkafka consumer configuration properties (connection strings, authentication, etc.)
    • topic: name of the Kafka topic to read messages from
    • commitIntervalMs: interval between offset commits (default: 500ms)
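For example, a consumer reading from an authenticated cluster with a custom commit interval could be sketched as follows. All values are placeholders, and the librdkafka properties shown (security.protocol, sasl.*) represent only one possible authentication setup; in practice the password would typically be provided through a secret, as described in the Secret Management paragraph below.

{
  "consumer": {
    "type": "kafka",
    "config": {
      "bootstrap.servers": "my-broker:9092",
      "group.id": "my-stream-processor",
      "security.protocol": "SASL_SSL",
      "sasl.mechanisms": "SCRAM-SHA-256",
      "sasl.username": "my-user",
      "sasl.password": "my-password"
    },
    "topic": "input-topic",
    "commitIntervalMs": 500
  }
}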

Producer Configuration

The producer field configures the output stream destination. It currently supports:

  • Kafka Producer (type: "kafka"): configures connection to a Kafka topic for message production
    • config: librdkafka producer configuration properties (connection strings, authentication, etc.)
    • topic: name of the Kafka topic to write processed messages to
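Similarly, a producer section could be sketched as follows; compression.codec is a standard librdkafka producer property included purely as an example, and all values are placeholders.

{
  "producer": {
    "type": "kafka",
    "config": {
      "bootstrap.servers": "my-broker:9092",
      "compression.codec": "lz4"
    },
    "topic": "output-topic"
  }
}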

Processor Configuration

The processor field configures the processing engine. It currently supports:

  • JavaScript Sandbox (type: "javascript"): Configures the JavaScript execution environment
    • interruptMs: maximum execution time per function call (default: 5000ms)
    • consoleBuffer: console buffer size in bytes (default: 1024)
    • maxStackSize: maximum stack size in bytes (optional, uses QuickJS default)
    • maxHeapSize: maximum heap size in bytes (optional, uses QuickJS default)
    • payloadSerdeStrategy: serialization/deserialization strategy for message payloads. For more details, please read the corresponding section in the Usage page.
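For instance, a processor section that constrains the sandbox could be sketched as follows. The numeric values are arbitrary examples rather than recommendations, and payloadSerdeStrategy is omitted since its accepted values are documented in the Usage page.

{
  "processor": {
    "type": "javascript",
    "interruptMs": 5000,
    "consoleBuffer": 1024,
    "maxHeapSize": 10485760
  }
}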

Caches Configuration (Optional)

The caches field is an optional object that defines named caches for use within processing functions. If no cache is defined, the Stateful Stream Processing use case cannot be supported.

Each cache can be:

  • MongoDB Cache (type: "mongodb"): Persistent cache backed by MongoDB
    • url: MongoDB connection string (supports secrets)
    • collection: collection name for cache storage
    • database: database name (optional)
    • appName: application name for MongoDB connection (optional - useful to track which program is carrying out queries on the database)
  • In-Memory Cache (type: "in-memory"): Simple in-memory key-value storage. Use it only when sharing the stream state across different service instances is not necessary; when possible, prefer a shareable stream state.
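As an illustration, and assuming each key of the caches object is the cache name as the description above suggests, a configuration declaring both cache types could be sketched as follows. The cache names, connection string, database, and collection are placeholders.

{
  "caches": {
    "my-persistent-cache": {
      "type": "mongodb",
      "url": "mongodb://my-mongo:27017",
      "database": "stream-processor",
      "collection": "cache-entries",
      "appName": "stream-processor"
    },
    "my-local-cache": {
      "type": "in-memory"
    }
  }
}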

For more details on how to interact with the caches, please read the dedicated section in the Usage page.

Control Plane Configuration (Optional)

The controlPlane field is optional and configures integration with the Control Plane Operator:

  • grpcAddress: gRPC server address for service feedback events
  • feedbackInterval: interval between feedback events in milliseconds (default: 3000ms)
  • resumeAfterMs: delay before resuming processing when the control plane connection fails (optional)
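A controlPlane section could be sketched as follows; the address format and the timing values are illustrative placeholders.

{
  "controlPlane": {
    "grpcAddress": "http://control-plane-operator:50051",
    "feedbackInterval": 3000,
    "resumeAfterMs": 10000
  }
}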

Secret Management

The schema supports flexible secret management through the Secret type, which can be:

  • Direct value: plain string value
  • Environment variable (type: "env"): read from environment variable
    • key: environment variable name
    • encoding: optional encoding (e.g., "base64")
  • File reference (type: "file"): read from file system
    • path: file path containing the secret
    • key: optional key to read a single value among other properties (for .ini or .json files). When not provided, the whole file content is loaded as the secret value
    • encoding: optional encoding (e.g., "base64")
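For instance, assuming a secret object can be used wherever a Secret is expected (such as a Kafka configuration value or the MongoDB url), the two indirect forms could be sketched as follows; the variable name, file path, and key are placeholders.

{
  "consumer": {
    "type": "kafka",
    "config": {
      "bootstrap.servers": "my-broker:9092",
      "sasl.password": { "type": "env", "key": "KAFKA_PASSWORD" }
    },
    "topic": "input-topic"
  },
  "caches": {
    "my-persistent-cache": {
      "type": "mongodb",
      "url": { "type": "file", "path": "/run/secrets/mongodb.json", "key": "url" },
      "collection": "cache-entries"
    }
  }
}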

For more details, please refer to the secrets resolution section in the config maps documentation page and to the secret_rs library documentation.

Sandbox User-Defined Function file

Once the sandbox settings are defined, you can write your custom function in the index.js file.

For more details on how the user-defined processor function can be written and how it works, please read the Usage page.

Control Plane Support

The service implements the interface for connecting to a Control Plane Operator.
However, complete Runtime Management support, that is, with a Control Plane UI and a central Control Plane instance, will be added in the future.

When configuring the Kafka consumer, it is advised to set appropriate values to constrain the consumer's internal queue. In this manner:

  • the maximum amount of memory employed by the service can be finely tuned to avoid wasting resources, since only the number of messages that can effectively be processed in real time should be pulled into memory;
  • the consumer is ensured to continuously poll the broker, preventing it from leaving the consumer group, since a lower number of buffered messages triggers a new fetch to replenish the queue;

The main values to tune are:

  • queued.max.messages.kbytes: maximum number of kilobytes of queued pre-fetched messages in the local consumer queue;
  • queued.min.messages: minimum number of messages per topic+partition librdkafka tries to maintain in the local consumer queue;

It is recommended to set queued.min.messages to a value greater than, but close to, the average message consumption rate. It is possible to observe:

  • kafka_consumer_rx_msgs_total → messages read
  • sp_processed_messages → total number of processed messages

to check the average values.

For the Stream Processor service, an example configuration could be the following:

{
  "queued.max.messages.kbytes": "32840",
  "queued.min.messages": "5000"
}

Another important property that might need to be tuned is fetch.message.max.bytes, which, however, should be changed only when queued.max.messages.kbytes is set to a value lower than 1024.
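For instance, if the local queue were capped below 1024 KB, fetch.message.max.bytes could be lowered to match it; the values below are purely illustrative.

{
  "queued.max.messages.kbytes": "512",
  "queued.min.messages": "2000",
  "fetch.message.max.bytes": "524288"
}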

Kubernetes

Resources

When the plugin is deployed on Kubernetes, it is advised to set its resource requests and limits. The recommended and minimum values are provided below, although they can be changed according to your needs:

Recommended

  • requests:
    CPU: 100m
    Memory: 20MB
  • limits:
    CPU: 800m/1000m
    Memory: 100MB

Minimum

  • requests:
    CPU: 50m
    Memory: 15MB
  • limits:
    CPU: 250m
    Memory: 60MB
note

Memory usage also depends on the parameters described in the previous section. As a result, it is advised to adjust the requests and limits accordingly when changing the consumer queue parameters.
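For reference, the recommended values above translate into a container resources block similar to the following sketch (shown in JSON; the equivalent YAML form applies as well, and the CPU limit here picks the upper 1000m figure):

{
  "resources": {
    "requests": { "cpu": "100m", "memory": "20M" },
    "limits": { "cpu": "1000m", "memory": "100M" }
  }
}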

Status Probes

The service exposes liveness and readiness status probes as HTTP endpoints, which help Kubernetes determine when the service has successfully started and when it may need to be restarted.

The endpoints are:

  • liveness probe: /-/healthz
  • readiness probe: /-/ready
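In a Kubernetes deployment, these endpoints can be wired to the standard probes; a minimal sketch (shown in JSON and assuming the default HTTP_PORT of 3000) could be:

{
  "livenessProbe": { "httpGet": { "path": "/-/healthz", "port": 3000 } },
  "readinessProbe": { "httpGet": { "path": "/-/ready", "port": 3000 } }
}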