Configuration
The service is configured through a set of environment variables describing basic service needs, alongside a main configuration file and a sandbox processor function.
Environment Variables
Variable | Description | Default |
---|---|---|
LOG_LEVEL | the maximum log level to emit; accepted levels are `trace`, `debug`, `info`, `warn`, `error` | `info` |
HTTP_PORT | the HTTP port on which Kubernetes status routes and metrics are exposed | `3000` |
CONFIGURATION_FOLDER | the path to the folder containing the configuration file and the sandbox processor user-defined function | `<HOME>/.df/stream-processor` |
OTEL_EXPORTER_OTLP_ENDPOINT | the OpenTelemetry OTLP endpoint where traces and metrics should be pushed; when not set, telemetry is not exported | - |
Currently, the `<HOME>` value is set to `/home/stream_processor`, based on how the service image is built.
Configuration Files
To configure the Stream Processor service, a main configuration file `config.json` and the processor function should be placed within the folder pointed to by the `CONFIGURATION_FOLDER` environment variable. The `config.json` file is expected at the root of the configuration folder, while the processor function is nested under the `processors` folder.
Folders under the `processors` directory should be named after the language used to write the processor function. Since only `javascript` is currently supported, all processing functions are written in an `index.js` file nested under the `javascript` folder.
The configuration folder should have the following layout:
├── config.json # --> the configuration file
└── processors
└── javascript
└── index.js # --> user-defined processing function
Service Settings file
The main configuration file describes the connection details of the streaming platform, which customizations should be applied to the consumer and producer, whether one or more cache connections should be instantiated, and how the sandbox should behave.
The file respects the following JSON schema, which is explained in detail in the following paragraphs.
Raw JSON Schema:
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Configuration",
"type": "object",
"properties": {
"caches": {
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/Cache"
},
"default": {}
},
"consumer": {
"$ref": "#/definitions/ConsumerConfig"
},
"controlPlane": {
"anyOf": [
{
"$ref": "#/definitions/ControlPlaneConfig"
},
{
"type": "null"
}
]
},
"processor": {
"$ref": "#/definitions/ProcessorConfig"
},
"producer": {
"$ref": "#/definitions/ProducerConfig"
}
},
"required": [
"consumer",
"producer",
"processor"
],
"definitions": {
"Cache": {
"oneOf": [
{
"type": "object",
"properties": {
"type": {
"type": "string",
"const": "in-memory"
}
},
"required": [
"type"
]
},
{
"type": "object",
"properties": {
"type": {
"type": "string",
"const": "mongodb"
}
},
"allOf": [
{
"$ref": "#/definitions/MongodbCacheConfig"
}
],
"required": [
"type"
]
}
]
},
"ConsumerConfig": {
"oneOf": [
{
"type": "object",
"properties": {
"type": {
"type": "string",
"const": "kafka"
}
},
"allOf": [
{
"$ref": "#/definitions/KafkaConsumerConfig"
}
],
"required": [
"type"
]
}
]
},
"ControlPlaneConfig": {
"type": "object",
"properties": {
"feedbackInterval": {
"description": "Interval in milliseconds that must elapse between two feedback events sent to Control Plane Operator.\nIt defaults to `3000` ms when not provided during deserialization.",
"type": "integer",
"format": "uint64",
"default": 3000,
"minimum": 0
},
"grpcAddress": {
"description": "Address to the gRPC server that should receive service feedback events",
"type": "string",
"examples": [
"http://control-plane-operator:50052"
]
},
"resumeAfterMs": {
"description": "The number of milliseconds to wait before running the processing logic\nwhen connection with control plane operator failed\nand no desired fast data state was ever received.",
"type": [
"integer",
"null"
],
"format": "uint64",
"default": null,
"minimum": 0
}
},
"required": [
"grpcAddress"
]
},
"KafkaConsumerConfig": {
"type": "object",
"properties": {
"commitIntervalMs": {
"description": "number of milliseconds between one commit and another",
"type": "integer",
"format": "uint64",
"default": 500,
"minimum": 0
},
"config": {
"description": "librdkafka Kafka consumer configuration properties | https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md",
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/Secret"
}
},
"topic": {
"description": "name of the Kafka topic from which the consumer will read messages",
"type": "string"
}
},
"required": [
"config",
"topic"
]
},
"KafkaProducerConfig": {
"type": "object",
"properties": {
"config": {
"description": "librdkafka Kafka producer configuration properties | https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md",
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/Secret"
}
},
"topic": {
"description": "name of the Kafka topic to which the producer will send messages",
"type": "string"
}
},
"required": [
"config",
"topic"
]
},
"MongodbCacheConfig": {
"type": "object",
"properties": {
"appName": {
"type": [
"string",
"null"
]
},
"collection": {
"type": "string"
},
"database": {
"type": [
"string",
"null"
]
},
"url": {
"$ref": "#/definitions/Secret"
}
},
"required": [
"url",
"collection"
]
},
"ProcessorConfig": {
"oneOf": [
{
"type": "object",
"properties": {
"type": {
"type": "string",
"const": "javascript"
}
},
"allOf": [
{
"$ref": "#/definitions/SandboxConfig"
}
],
"required": [
"type"
]
}
]
},
"ProducerConfig": {
"oneOf": [
{
"type": "object",
"properties": {
"type": {
"type": "string",
"const": "kafka"
}
},
"allOf": [
{
"$ref": "#/definitions/KafkaProducerConfig"
}
],
"required": [
"type"
]
}
]
},
"SandboxConfig": {
"type": "object",
"properties": {
"consoleBuffer": {
"description": "Size in bytes available to the console object in the sandbox",
"type": "integer",
"format": "uint",
"default": 1024,
"minimum": 0
},
"interruptMs": {
"description": "Max time in milliseconds a single function can be running within the sandbox",
"type": "integer",
"format": "uint64",
"default": 5000,
"minimum": 0
},
"maxHeapSize": {
"description": "Max heap size in bytes. When not set quickjs's default will be used",
"type": [
"integer",
"null"
],
"format": "uint",
"default": null,
"minimum": 0
},
"maxStackSize": {
"description": "Max stack size in bytes. When not set quickjs's default will be used",
"type": [
"integer",
"null"
],
"format": "uint",
"default": null,
"minimum": 0
},
"payloadSerdeStrategy": {
"description": "Option to configure the deserialization\nfor incoming payload, a.k.a. the method to call\non payload before to inject it in the sandbox message argument",
"allOf": [
{
"$ref": "#/definitions/SerdeSettings"
}
],
"default": {
"deserialize": "json"
}
}
}
},
"Secret": {
"anyOf": [
{
"type": "string"
},
{
"type": "object",
"properties": {
"encoding": {
"description": "Define which type of encoding the library supports when it needs to read the actual secret value.",
"type": "string",
"enum": [
"base64"
]
},
"key": {
"type": "string"
},
"type": {
"const": "env"
}
},
"required": [
"type",
"key"
]
},
{
"type": "object",
"properties": {
"encoding": {
"description": "Define which type of encoding the library supports when it needs to read the actual secret value.",
"type": "string",
"enum": [
"base64"
]
},
"key": {
"type": "string"
},
"path": {
"type": "string"
},
"type": {
"const": "file"
}
},
"required": [
"type",
"path"
]
}
],
"examples": [
"my-secret",
{
"key": "CUSTOM_ENV_VAR",
"type": "env"
},
{
"encoding": "base64",
"key": "CUSTOM_ENV_VAR",
"type": "env"
},
{
"path": "/path/to/file",
"type": "file"
}
]
},
"SerdeMode": {
"description": "Describe which serialization or deserialization strategy\nshould be applied to the key of a Kafka message",
"oneOf": [
{
"description": "serialize/deserialize the content as a JSON Object",
"type": "string",
"const": "json"
},
{
"description": "serialize/deserialize the content as a JSON Object\nwith compatibility for schema+payload when\nKafka uses a schema registry. The payload is in a\nsubkey payload",
"type": "string",
"const": "jsonWithSchema"
},
{
"description": "serialize/deserialize the key as a string: useful\nwhen payload bytes have to be processed raw inside the\nsandbox",
"type": "string",
"const": "string"
}
]
},
"SerdeSettings": {
"type": "object",
"properties": {
"deserialize": {
"allOf": [
{
"$ref": "#/definitions/SerdeMode"
}
],
"default": "json"
}
}
}
}
}
Example:
{
"$schema": "https://docs.mia-platform.eu/schemas/fast_data/stream-processor.0.5.2.schema.json",
"consumer": {
"type": "kafka",
"topic": "fd.stream-processor.input",
"config": {
"client.id": "stream-processor-consumer",
"bootstrap.servers": {
"type": "file",
"path": "/run/secrets/kafka/bootstrap.servers"
},
"group.id": "fd.stream-processor",
"auto.offset.reset": "earliest"
},
"commitIntervalMs": 500
},
"producer": {
"type": "kafka",
"topic": "fd.stream-processor.output",
"config": {
"client.id": "stream-processor-producer",
"bootstrap.servers": {
"type": "file",
"path": "/run/secrets/kafka/bootstrap.servers"
},
"compression.type": "snappy"
}
},
"processor": {
"type": "javascript"
}
}
The raw JSON schema can also be found here.
In addition, Kafka configurations and cache persistence properties support secret resolution.
Required Fields
The configuration requires three mandatory fields:
- `consumer`: defines how the service consumes messages from the data source
- `producer`: defines how the service produces messages to the output destination
- `processor`: defines the processing engine and its settings
Consumer Configuration
The `consumer` field configures the input stream source. Currently supported:
- Kafka Consumer (`type: "kafka"`): configures the connection to a Kafka topic for message consumption
  - `config`: `librdkafka` consumer configuration properties (connection strings, authentication, etc.)
  - `topic`: name of the Kafka topic to read messages from
  - `commitIntervalMs`: interval between offset commits (default: `500` ms)
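As an illustration, the `config` map can mix plain values with secret references. The snippet below is a sketch: the broker addresses and environment variable names are placeholders, and `sasl.username`/`sasl.password` are standard librdkafka properties used here purely as examples of secret-backed values:

```json
{
  "consumer": {
    "type": "kafka",
    "topic": "fd.stream-processor.input",
    "commitIntervalMs": 500,
    "config": {
      "group.id": "fd.stream-processor",
      "bootstrap.servers": "broker-1:9092,broker-2:9092",
      "sasl.username": { "type": "env", "key": "KAFKA_SASL_USERNAME" },
      "sasl.password": { "type": "env", "key": "KAFKA_SASL_PASSWORD", "encoding": "base64" }
    }
  }
}
```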
Producer Configuration
The `producer` field configures the output stream destination. Currently supported:
- Kafka Producer (`type: "kafka"`): configures the connection to a Kafka topic for message production
  - `config`: `librdkafka` producer configuration properties (connection strings, authentication, etc.)
  - `topic`: name of the Kafka topic to write processed messages to
Processor Configuration
The `processor` field configures the processing engine. Currently supported:
- JavaScript Sandbox (`type: "javascript"`): configures the JavaScript execution environment
  - `interruptMs`: maximum execution time per function call (default: `5000` ms)
  - `consoleBuffer`: console buffer size in bytes (default: `1024`)
  - `maxStackSize`: maximum stack size in bytes (optional, uses QuickJS default)
  - `maxHeapSize`: maximum heap size in bytes (optional, uses QuickJS default)
  - `payloadSerdeStrategy`: serialization/deserialization strategy for message payloads

For more details, please read the corresponding section in the Usage page.
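For instance, a `processor` block that tightens the sandbox limits and switches payload deserialization to raw strings could look like the following (the numeric values are illustrative, not recommendations):

```json
{
  "processor": {
    "type": "javascript",
    "interruptMs": 2000,
    "consoleBuffer": 4096,
    "maxHeapSize": 33554432,
    "payloadSerdeStrategy": { "deserialize": "string" }
  }
}
```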
Caches Configuration (Optional)
The `caches` field is an optional object that defines named caches for use within processing functions. When no cache is defined, the Stateful Stream Processing use case cannot be supported.
Each cache can be:
- MongoDB Cache (`type: "mongodb"`): persistent cache backed by MongoDB
  - `url`: MongoDB connection string (supports secrets)
  - `collection`: collection name for cache storage
  - `database`: database name (optional)
  - `appName`: application name for the MongoDB connection (optional; useful to track which program is issuing queries against the database)
- In-Memory Cache (`type: "in-memory"`): simple in-memory key-value storage. Use it only when sharing the stream state across different service instances is not necessary; otherwise, prefer a cache whose state can be shared.

For more details on how to interact with the caches, please read the dedicated section in the Usage page.
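Following the schema above, a `caches` object declaring one persistent and one in-memory cache could look like this sketch (the cache names `orders` and `lookup`, and the database and collection names, are arbitrary examples):

```json
{
  "caches": {
    "orders": {
      "type": "mongodb",
      "url": { "type": "env", "key": "MONGODB_URL" },
      "database": "stream-cache",
      "collection": "orders-state",
      "appName": "stream-processor"
    },
    "lookup": {
      "type": "in-memory"
    }
  }
}
```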
Control Plane Configuration (Optional)
The `controlPlane` field is optional and configures the integration with the Control Plane Operator:
- `grpcAddress`: gRPC server address for service feedback events
- `feedbackInterval`: interval between feedback events in milliseconds (default: `3000` ms)
- `resumeAfterMs`: delay before processing starts when the control plane connection fails (optional)
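A minimal `controlPlane` block, using the address shown in the schema's examples and illustrative interval values, could be:

```json
{
  "controlPlane": {
    "grpcAddress": "http://control-plane-operator:50052",
    "feedbackInterval": 3000,
    "resumeAfterMs": 10000
  }
}
```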
Secret Management
The schema supports flexible secret management through the `Secret` type, which can be:
- Direct value: plain string value
- Environment variable (`type: "env"`): read from an environment variable
  - `key`: environment variable name
  - `encoding`: optional encoding (e.g., `base64`)
- File reference (`type: "file"`): read from the file system
  - `path`: path of the file containing the secret
  - `key`: optional key to read a single value among other properties (for `.ini` or `.json` files); when not provided, the whole file content is loaded as the secret value
  - `encoding`: optional encoding (e.g., `base64`)
For more details, please refer to the secrets resolution section in the config maps documentation page and to the `secret_rs` library documentation.
Sandbox User-Defined Function file
Once the sandbox settings are defined, it is possible to write your custom function in the `index.js` file.
For more details on how the user-defined processor function can be written and how it works, please read the Usage page.
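As a minimal sketch only: the exact signature and export convention of the user-defined function are defined in the Usage page. Here we assume a function that receives a message whose `payload` has already been JSON-deserialized (the default `payloadSerdeStrategy`) and returns the transformed message; the function name `process`, the message shape, and the export style are assumptions for illustration:

```javascript
// Hypothetical index.js sketch: the real contract (function name,
// export style, message shape) is documented in the Usage page.
function process(message) {
  // With the default payloadSerdeStrategy, the payload is a plain JS object.
  const payload = message.payload;
  // Example transformation: uppercase the `name` field, keep the rest intact.
  return {
    ...message,
    payload: { ...payload, name: String(payload.name).toUpperCase() },
  };
}

module.exports = process; // assuming a CommonJS-style export
```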
Control Plane Support
The service implements the interface for connecting to a Control Plane Operator.
However, complete Runtime Management support, that is, with the Control Plane UI
and a central Control Plane instance, will be added in the future.
Recommended Kafka Configuration
When configuring the Kafka consumer, it is advised to set appropriate values for constraining the consumer's internal queue. In this manner:
- the maximum amount of memory employed by the service can be finely tuned to avoid wasting resources, since only the number of messages that can effectively be processed in real time should be pulled into memory;
- the consumer is ensured to continuously poll the broker, avoiding its eviction from the consumer group, since a lower number of buffered messages can trigger a new fetch to replenish the queue.
The main values to tune are:
- `queued.max.messages.kbytes`: maximum number of kilobytes of queued pre-fetched messages in the local consumer queue;
- `queued.min.messages`: minimum number of messages per topic+partition that librdkafka tries to maintain in the local consumer queue.

It is recommended to set `queued.min.messages` to a value greater than, but close to, the average message consumption rate. To estimate the average values, it is possible to observe the following metrics:
- `kafka_consumer_rx_msgs_total` → messages read
- `sp_processed_messages` → total number of processed messages
For Stream Processor service, an example of configuration can be the following one:
{
"queued.max.messages.kbytes": "32840",
"queued.min.messages": "5000"
}
Another important property that might need to be tuned is `fetch.message.max.bytes`, which however should be changed only in case `queued.max.messages.kbytes` is set to a value lower than `1024`.
Kubernetes
Resources
When the plugin is deployed on Kubernetes, it is advised to set its resource requests and limits. The recommended and minimum values are provided below, although they can be changed according to your needs:
Recommended
- requests:
  - CPU: 100m
  - Memory: 20MB
- limits:
  - CPU: 800m/1000m
  - Memory: 100MB
Minimum
- requests:
  - CPU: 50m
  - Memory: 15MB
- limits:
  - CPU: 250m
  - Memory: 60MB
Memory usage also depends on the parameter described in the previous section. As a result, it is advised to adjust the requests and limits accordingly when changing consumer queue parameters.
Status Probes
The service exposes the liveness and readiness status probes as HTTP endpoints, which
help Kubernetes determine when the service has successfully started and when it may need to be restarted.
The endpoints are:
- liveness probe: `/-/healthz`
- readiness probe: `/-/ready`