Kafka Projection Updates Configuration
Overview
The Kafka Projection Updates is a JSON file which describes for each projection a Projection Updates topic and a Strategy to generate the projection changes identifier. This file is typically used by:
- the Real-Time Updater, to generate a
pr-update
message; - the Single View Trigger Generator, to execute a strategy starting from a
pr-update
message.
Configuration Properties
The Kafka Projection Updates is a JSON object where each key represents the name of a projection. Each entry has the following fields:
updatesTopic
: The topic where the Projection Updates messages are sent, if you haven't created a topic for the projection yet take a look to our naming conventions for the Projection Updates topics.strategy
: the type of strategies that you want to use onto this projection. You can choose either__automatic__
or__fromFile__
.
There will be different behaviors based on the strategy value. The JSON configuration can contain both automatic and manual strategies.
- Automatic
- Manual
{
"PROJECTION_NAME": {
"updatesTopic": "PROJECTION_UPDATE_TOPIC",
"strategy": "__automatic__"
}
}
The automatic strategy handles the generation of the projection changes identifier accordingly to the paths in the Projection Changes Schema config map containing the projection.
{
"PROJECTION_NAME": {
"updatesTopic": "PROJECTION_UPDATE_TOPIC",
"strategy": "__fromFile__[customFunction.js]"
}
}
The __fromFile__
keyword lets you specify a Javascript module inside the squared brackets, which will execute the a manual Strategy on the Projection Updates message.
The file
should be mountend into a specific config map of the micro-service. Please refer to Real-Time Updater and Single View Trigger Generator configuration pages.
This Javascript file should export a default async generator function with the following parameters:
strategyContext
: object made of two properties:logger
: Pino loggerdbMongo
: the configured MongoDB Database instance where the projections are stored
updateEvent
: The Update Event object created from the Projection Update message.
The function yields one or more identifier retrieved from the update event.
Here's a couple of examples example of what a custom strategy could look like:
Simple Custom Strategy
// note: this has to be an AsyncGenerator
module.exports = async function* myCustomStrategy ({ logger, dbMongo }, { after }) {
yield { IDENTIFIER_FIELD: after.FIELD }
}
Custom Strategy with 1:n lookup
module.exports = async function* myCustomStrategy ({ logger, dbMongo }, { after }) {
const lookupRecords = await dbMongo.collection('lookup-collection').find({ field: after.FIELD })
for await(const lookup of lookupRecords) {
yield { IDENTIFIER_FIELD: lookup.FIELD }
}
}
While we only extracted the after
property in the examples before, the whole object has actually the following structure:
{
operationType: string
operationTimestamp: string
before?: Document | null
after?: Document | null
primaryKeys: (string | number | symbol)[]
__internal__kafkaInfo: {
timestamp: string
topic: string
key: string
partition: number
offset: number
};
}
You might notice that it does not look quite like the incoming pr-update
message you might have,
this is because we transform the message into a unique format to represent the various versions of the message.
So if you switch your Real-time Updater (pr-update version 1.0) for a Projection Storer (pr-update version 2.0)
your manual strategy will still work exactly as it did before.