Skip to main content
Version: 12.x (Current)

Ingestion Reloader Configuration

This page describes how to configure the Ingestion Reloader service, that performs the "reingestion" of the messages (the publishing of old ingestion messages stored on the bucket), to a specified Kafka topic.

Overview

The reingestion is triggered by a POST route, and 3 types of reingestions are supported.

  • File Reingestion: where the user defines a specific file on the bucket to be reingested to the Kafka topic
  • Topic Reingestion where the user defines an entire topic to be reingested on the Kafka topic. It's also possible to specify a Date interval for the files' selection.

Service Configuration

In order to connect and authenticate correctly with the bucket and Kafka, please check the relative pages:

Environment variables

NameRequiredDescriptionDefault Value
HTTP_PORTfalsePort exposed by the service3000
LOG_LEVELfalseLog level used by the service (e.g. DEBUG, INFO, WARN, ERROR)INFO
QUARKUS_SHUTDOWN_TIMEOUTfalseTimeout to shutdown Quarkus application30
KAFKA_MAX_POLL_MSfalseMaximum amount of milliseconds a poll operation waits before returning obtained records500
KAFKA_BROKERStrueComma separated list of nodes address belonging to a Kafka cluster-
KAFKA_GROUP_IDtrueConsumer group identifier employed by this application to share how partitions are consumed among multiple instances of the application-
KAFKA_USERNAMEtrueThe Kafka username-
KAFKA_PASSWORDtrueThe Kafka password-
KAFKA_CLIENT_IDfalseClient identifier employed by this applicationingestion-reloader
KAFKA_SASL_MECHANISMfalseSASL mechanism to employ for logging in Kafka clusterSCRAM-SHA-256
KAFKA_DEQUEUE_STRATEGYfalseWhen no consumer group is defined on a topic, it defines which strategy should be applied to consume from the topic the first timelatest
KAFKA_MAX_POLL_RECORDSfalseDefines the maximum number of messages that each poll operation can return. Independently of this number, each poll operation can return at most a certain amount of bytes configured in the consumer500
BUCKET_NAMEtrueThe name of the bucket-
BUCKET_TYPEtrueThe type of the bucket. Can be google or s3-
KEY_PARTITIONS_RELATIVE_PATHtrueThe relative path where the refined by key files are stored in the bucket.-
BUCKET_SPLITStrueThe number of possible partition splits that the bucket has been configured with. An example could be 5000-
GOOGLE_APPLICATION_CREDENTIALSfalseThe path to the credentials file that allows the access to the Google bucket. Required if BUCKET_TYPE is set to Google-
CONFIGURATIONS_PATHfalseDefines the base path of the configurations files-

Routes

Openapi file of the Ingestion Reloader

openapi: 3.0.3
info:
title: Ingestion Reloader APIs
description: This document provides an overview of the different APIs exposed by Ingestion Reloader service
version: 1.1.0
paths:
/reingestion/topic:
post:
tags:
- Raw File Re-Ingestion
description: This API allows to reload messages regarding a specific ingestion topic that were saved on the bucket into another selected topic. When no time range bound is provided, all the messages of selected ingestion topic are produced to the re-ingestion topic.
requestBody:
content:
application/json:
schema:
type: object
properties:
topic:
type: string
description: name of the ingestion topic from which messages were read to be stored on the bucket
reIngestionTopic:
type: string
description: name of the topic where messages should be produced to
startDate:
type: string
format: date-time
description: 'timestamp that defines the lower bound (oldest) on the time interval from which messages should be reloaded. <br>**Note:** the timestamp refers to when messages were produced on Kafka ingestion topic, which may differ from the business timestamp of the record itself'
stopDate:
type: string
format: date-time
description: 'timestamp that defines the upper bound (newest) on the time interval from which messages should be reloaded. <br>**Note:** the timestamp refers to when messages were produced on Kafka ingestion topic, which may differ from the business timestamp of the record itself'
customSelectorName:
type: string
description: 'name of the custom selection script to be used in the filtering process. `CustomSelectorName` is the name of the file present in $CONFIGURATIONS_PATH/selectors/ without the `.kts` extension'
required:
- topic
- reIngestionTopic
additionalProperties: false
responses:
202:
description: OK
content:
application/json:
schema:
type: object
properties:
message:
type: string
code:
type: string
400:
description: OK
content:
application/json:
schema:
type: object
properties:
message:
type: string
code:
type: string
/reingestion/file:
post:
tags:
- Raw File Re-Ingestion
description: This API allows to reload messages saved within a specific file stored on the bucket into selected re-ingestion topic.
requestBody:
content:
application/json:
schema:
type: object
properties:
fileName:
type: string
description: full file-path to the file that contains the messages to be produced into the re-ingestion topic
reIngestionTopic:
type: string
description: name of the topic where messages should be produced to
customSelectorName:
type: string
description: 'name of the custom selection script to be used in the filtering process. `CustomSelectorName` is the name of the file present in $CONFIGURATIONS_PATH/selectors/ without the `.kts` extension'
required:
- fileName
- reIngestionTopic
additionalProperties: false
responses:
202:
description: OK
content:
application/json:
schema:
type: object
properties:
message:
type: string
code:
type: string
400:
description: OK
content:
application/json:
schema:
type: object
properties:
message:
type: string
code:
type: string
/reingestion/stop-reingestion:
post:
tags:
- Raw File Re-Ingestion
description: This API allows to stop a reloading process that is currently progress, based on the selected ingestion topic name. This feature can stop only the reingestion by topic process.
requestBody:
content:
application/json:
schema:
type: object
properties:
topic:
type: string
description: ingestion topic name that is currently being reloaded by the service
required:
- topic
additionalProperties: false
responses:
200:
description: OK
content:
application/json:
schema:
type: object
properties:
message:
type: string
code:
type: string
400:
description: OK
content:
application/json:
schema:
type: object
properties:
message:
type: string
code:
type: string

The Ingestion Reloader exposes four POST routes to trigger its functionalities.

Reingestion of a Topic

This route allows the user to reingest a topic partially or from the beginning. The user has to specify the "stored" topic name and the "reingestion" topic name. Optionally, a filter on the timestamp of the topic may also be specified (with the startDate and stopDate properties). In this last case, only the records included in the specified time interval will be reingested on the selected topic. Additionally, the user can specify a customSelectorName to use a custom selector script for filtering the messages to be reingested.

Reingestion of a File

This route allows the user to reingest a single file from the bucket into the topic specified in the request. The user has to specify the file name and the reingestion topic. Additionally, the user can specify a customSelectorName to use a custom selector script for filtering the messages to be reingested.

Stopping reingestion

This route allows the user to stop the ongoing reingestion of a topic. The user has to specify the ingestion topic that is currently being reingested.

caution

This route can stop only the reingestion of a topic, not the one of a single file.

Please refer to the Custom Selector Feature section for more details on how to use the custom selector feature for precise and targeted filtering of messages to be reingested.

Customer Selectors

The customSelector feature of the Ingestion Reloader service is a tool that provides additional flexibility when re-ingesting messages from a bucket into a Kafka topic. This feature allows you to implement your own custom filtering logic to select specific messages for re-ingestion.

Let's consider a case where an organization has a Kafka topic that contains messages from different departments, each identified by a department_id in the message payload. Suppose there's a requirement to re-ingest only the messages that pertain to a specific department, say the Marketing department, into a new topic.

The custom selector feature would be ideal for this. The organization can create a custom selector script that checks if the department_id in the message payload is equal to the id of the Marketing department. This script will return true for all messages from the "Marketing" department and false for messages from other departments. When the re-ingestion request is made, only the messages from the Marketing department would be selected for re-ingestion into the new Kafka topic.

The following is an example of a .kts custom selector script for this scenario:

import kotlinx.serialization.json.JsonElement

class MarketingSelector {
fun check(payload: JsonElement): Boolean {
return payload.jsonObject["department_id"]?.jsonPrimitive?.content == "marketing"
}
}

MarketingSelector()
You can trigger the re-ingestion with a cURL command:

Here's an example of a cURL command to trigger the re-ingestion process using the custom selector script:

curl -X POST "http://<your_address>:<your_port>/reingestion/topic" \
-H "accept: */*" \
-H "Content-Type: application/json" \
-d "{
\"topic\":\"source_topic\",
\"reIngestionTopic\":\"destination_topic\",
\"customSelectorName\":\"CustomSelector\"
}"

How to Use the Custom Selector Feature

To utilize the customSelector feature, users can define their own Kotlin scripting logic that adheres to Custom Selector Script Format.

When making a POST request to either the /reingestion/file or /reingestion/topic endpoint, users can include the customSelectorName field in the JSON body to specify the name of the custom selector script to be used for filtering.
customSelectorName should correspond to the name of the custom selector file without the .kts extension, an in-depth detail of the configuration can be found here.

During the re-ingestion process, each message retrieved from the bucket is evaluated against selected custom selector script.
If the check function returns true for a particular message, it is selected for re-ingestion into the Kafka topic.
If the check function returns false, the message is excluded from the re-ingestion process.

Custom Selector Script Format

The custom selector script must adhere to a specific format:

  • The script must define a class that includes a public check method with the following characteristics:
    • It must accept only one parameter of type JsonElement.
    • It must return a Boolean.
  • The script must return an instance of the class itself.

The Ingestion Reloader service performs validation on the custom selector script.

For an illustrative example, please refer to the following section.

Directory Structure and ConfigMaps for Custom Selector

The custom selector feature requires a specific directory structure to function correctly.
The CONFIGURATIONS_PATH environment variable should point to a directory that contains a subdirectory named /selectors.
This /selectors subdirectory should contain all the custom selector scripts in .kts format.

note

Every custom selector script is fetched and loaded at startup

For example, if CONFIGURATIONS_PATH is set to /app/config, the directory structure should look like this:

/app
/config
/selectors
customSelector1.kts
customSelector2.kts
...

Each .kts file in the /selectors subdirectory represents a custom selector script. In a Kubernetes environment, you can use ConfigMaps to manage and provide the custom selector scripts. You can create a ConfigMap that contains all your .kts files and mount it to the pod running the Ingestion Reloader service. The mount path for the ConfigMap should be the CONFIGURATIONS_PATH directory. This way, the Ingestion Reloader service can load the custom selector scripts at startup from the specified directory.

This approach allows for easy updates to the custom selector scripts. If you need to update a script or add a new one, you can simply update the ConfigMap and restart the pods. The updated or new scripts will be available to the Ingestion Reloader service at the next startup.

Custom Selector Script Example

Here's an example of a .kts custom selector script:

import kotlinx.serialization.json.JsonElement

class CustomSelector {
fun check(payload: JsonElement): Boolean {
// Custom filtering logic goes here
return payload.jsonObject["property"]?.jsonPrimitive?.content == "value"
}
}

// Return the instance
CustomSelector()

In this example, the script defines a class named CustomSelector with a single function check.
The check function accepts a JsonElement parameter named payload, which represents the message payload retrieved from the bucket.
The function performs custom filtering logic on the payload, checking if a specific property (property) has a certain value (value in this case).
If the condition is met, the function returns true; otherwise, it returns false.

Please note that this is just an example, and you can customize the check function in the script to implement your own custom filtering logic based on the requirements of your re-ingestion process.