
Ingestion Storer Configuration

This page describes how to configure the Ingestion Storer service which is responsible for consuming messages from a message streaming platform and storing them into a Cloud Storage Bucket.

Overview

The Ingestion Storer service reads ingestion messages from the chosen message streaming platform. Those messages are then grouped by topic name and partition before being saved in the bucket. As a result, each stored file contains only messages coming from a single partition, so that multiple service replicas do not interfere with each other. Moreover, this grouping simplifies any later reorganization of the messages.

Each time a file is written to the bucket, a corresponding output event is emitted on a dedicated topic to notify that a write operation has completed successfully. Optionally, the service also re-emits the ingested messages onto post-ingestion topics.

Given these functionalities, the Ingestion Storer service can be introduced within an event-driven architecture in different ways, such as:

  • sequentially with respect to other services, to store messages on a bucket so that downstream components start reading ingestion messages only once those records have actually been written to the bucket
  • in parallel with other services, to store messages transparently with respect to any other message processing

A concrete example of these two architectures can be observed in the Fast Data context, where the Real-Time Updater service can read messages either from the post-ingestion topics fed by the Ingestion Storer (sequential architecture) or directly from the ingestion topics alongside the Ingestion Storer (parallel architecture). In the latter case, post-ingestion message generation may be disabled, since no consumer would read those messages.

Service Configuration

In order to connect and authenticate correctly with the bucket and Kafka, please check the related pages:

Environment variables

| Name | Required | Description | Default Value |
|------|----------|-------------|---------------|
| HTTP_PORT | false | Port exposed by the service | 3000 |
| LOG_LEVEL | false | Log level used by the service | INFO |
| QUARKUS_SHUTDOWN_TIMEOUT | false | Timeout to shut down the Quarkus application | 30 |
| KAFKA_MAX_POLL_MS | false | Maximum amount of milliseconds a poll operation waits before returning obtained records | 500 |
| KAFKA_BROKERS | true | Comma-separated list of node addresses belonging to a Kafka cluster | - |
| KAFKA_GROUP_ID | true | Consumer group identifier employed by this application to share how partitions are consumed among multiple instances of the application | - |
| KAFKA_USERNAME | true | The Kafka username | - |
| KAFKA_PASSWORD | true | The Kafka password | - |
| KAFKA_CLIENT_ID | false | Client identifier employed by this application | ingestion-reloader |
| KAFKA_SASL_MECHANISM | false | SASL mechanism to employ for logging into the Kafka cluster | SCRAM-SHA-256 |
| KAFKA_DEQUEUE_STRATEGY | false | When no consumer group is defined on a topic, defines which strategy should be applied to consume from the topic the first time | latest |
| KAFKA_MAX_POLL_RECORDS | false | Maximum number of messages that each poll operation can return. Independently of this number, each poll operation can return at most the amount of bytes configured in the consumer | 500 |
| BUCKET_NAME | true | The name of the bucket | - |
| BUCKET_TYPE | true | The type of the bucket. Can be google or s3 | - |
| BSS_EVENTS_TOPIC | true | Topic where the service should produce messages notifying that a file has been written to the bucket | - |
| BSS_ENABLE_POST_INGESTION | false | Whether ingestion messages should be re-published towards the post-ingestion topics | true |
| GOOGLE_APPLICATION_CREDENTIALS | false | The path to the credentials file that allows access to the Google bucket. Required if BUCKET_TYPE is set to google | - |
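
For reference, the variables could be set as in the following hypothetical docker-compose style environment block; every value shown here (broker addresses, group id, topic and bucket names, credential path) is a placeholder to adapt to your own setup:

environment:
  KAFKA_BROKERS: "kafka-broker-1:9092,kafka-broker-2:9092"   # placeholder broker addresses
  KAFKA_GROUP_ID: "ingestion-storer-consumer-group"          # placeholder consumer group
  KAFKA_USERNAME: "storer-user"
  KAFKA_PASSWORD: "<kafka-password>"
  KAFKA_CLIENT_ID: "ingestion-storer"
  BUCKET_NAME: "my-ingestion-bucket"                          # placeholder bucket name
  BUCKET_TYPE: "google"
  BSS_EVENTS_TOPIC: "fd-bucket-storage.DEV.bucket-storage-events"   # placeholder topic name
  BSS_ENABLE_POST_INGESTION: "true"
  GOOGLE_APPLICATION_CREDENTIALS: "/home/app/configs/gcp-service-account.json"   # placeholder path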

Custom Configuration

When the application is built, the main configuration file is included in it. It is designed so that most configurable values can be customized through environment variables. However, custom service configuration, such as the mapping between the ingestion topics and the post-ingestion ones, can and should be customized by the end user. This can be achieved by providing an additional application.yaml file in the configs folder located alongside the application launcher file.

Example of folder structure:

|--app
|  |--application-launcher
|  |--configs
|     |--application.yaml

This configuration defines the mapping between each ingestion topic and its corresponding post-ingestion topics, which can be one or more. The service subscribes to all the ingestion topics provided in the map and, when post-ingestion is enabled, forwards the incoming messages to the corresponding post-ingestion topics.

bss:
  topics-config:
    data-topics-mapping:
      - ingestion: <input-topic-name-1>
        post-ingestion:
          - <output-topic-name-1>
      - ingestion: <input-topic-name-2>
        post-ingestion:
          - <output-topic-name-2>
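
For instance, using the ingestion topic that appears in the file naming example further below, a filled-in mapping could look as follows (the post-ingestion topic name is purely illustrative):

bss:
  topics-config:
    data-topics-mapping:
      - ingestion: fd-bucket-storage.DEV.restaurant.orders.ingestion
        post-ingestion:
          - fd-bucket-storage.DEV.restaurant.orders.post-ingestion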

File format

The service can subscribe to a number of different ingestion topics. For each topic, a folder is created on the bucket to contain all the files related to that ingestion topic. Each created file is composed of messages coming from a single partition of that ingestion topic.
Within the file, messages are saved one per line, adopting the following JSON format:

{
  "timestamp": {
    "type": "string",
    "format": "datetime",
    "description": "timestamp when the message was received on the cluster"
  },
  "partition": {
    "type": "number",
    "description": "partition identifier containing this message"
  },
  "offset": {
    "type": "number",
    "description": "offset of the message"
  },
  "key": {
    "type": "string",
    "description": "message key encoded as JSON string"
  },
  "payload": {
    "type": "string",
    "description": "message payload encoded as JSON string"
  }
}
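
For illustration only, a single line of a stored file could then look like the following record; the key and payload contents are invented, while the timestamp matches the file naming example below:

{"timestamp": "2022-11-14T13:26:37.728Z", "partition": 0, "offset": 184, "key": "{\"id\":\"order-1\"}", "payload": "{\"id\":\"order-1\",\"status\":\"created\"}"}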

File naming

Each file is named after the data it contains based on the following convention:

<iso-timestamp-first-message-in-batch>_<topic-name>_<partition-number>_<timestamp-message-was-consumed>.txt

For example, this is a generated filename:

2022-11-14T13:26:37.728Z_fd-bucket-storage.DEV.restaurant.orders.ingestion_0_1668432398047.txt

Event Format

Once the service finishes processing a set of messages coming from a single partition of an ingestion topic, it produces a new message on the output events Kafka topic (see the BSS_EVENTS_TOPIC environment variable) to notify downstream components of such action. The produced event employs the structure reported below:

Message Key

{
  "topic": {
    "type": "string",
    "description": "topic name from which messages were consumed"
  },
  "partition": {
    "type": "number",
    "description": "partition identifier from which messages were consumed"
  }
}

Message Payload

{
  "filePath": {
    "type": "string",
    "description": "full filepath to retrieve saved file from bucket"
  },
  "batchTimeStart": {
    "type": "string",
    "format": "datetime",
    "description": "timestamp of the first message contained in the batch"
  },
  "batchTimeEnd": {
    "type": "string",
    "format": "datetime",
    "description": "timestamp of the last message contained in the batch"
  },
  "batchOffsetStart": {
    "type": "number",
    "description": "offset of the first message contained in the batch"
  },
  "batchOffsetEnd": {
    "type": "number",
    "description": "offset of the last message contained in the batch"
  },
  "batchSize": {
    "type": "number",
    "description": "number of messages contained in the stored file"
  }
}
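
As a purely illustrative example, an event referring to the file from the naming example above might look like this; offsets, batch size, the end timestamp, and the folder part of the file path are hypothetical:

Message Key

{
  "topic": "fd-bucket-storage.DEV.restaurant.orders.ingestion",
  "partition": 0
}

Message Payload

{
  "filePath": "fd-bucket-storage.DEV.restaurant.orders.ingestion/2022-11-14T13:26:37.728Z_fd-bucket-storage.DEV.restaurant.orders.ingestion_0_1668432398047.txt",
  "batchTimeStart": "2022-11-14T13:26:37.728Z",
  "batchTimeEnd": "2022-11-14T13:26:59.101Z",
  "batchOffsetStart": 184,
  "batchOffsetEnd": 233,
  "batchSize": 50
}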