Projection Storer Configuration
Projection Storer is the service in charge of constantly keeping up-to-date projections records with respect to change events received from the associated System of Record. Once modifications are saved to the storage system, the service triggers downstream components with the proper mechanism, so that Single Views can be regenerated with the latest data.
This service partially overlap with the concerns of Real-Time Updater plugin, in particular converting in near real-time change events into projections records. Nonetheless, it has been designed to offer a more streamlined configuration experience, improved performances and higher reliability.
For an overview of which are Real-Time Updater's features, it is possible to read the introduction documentation here, whereas here can be found a brief introduction to Projection Storer service.
Projection Storer plugin does not support the Fast Data standard architecture. However, it supports all the others architecture where Projection Record Updates messages are emitted by the service as triggers for the Fast Data downstream components. This means that, in order to use the Projection Storer, it will be necessary to instantiate a Single View Trigger Generator plugin to actually trigger the generation of Single Views.
Environment variables
Name | Required | Description | Default |
---|---|---|---|
LOG_LEVEL | ✓ | defines the service logger level, which can be TRACE , DEBUG , INFO , WARN or ERROR | - |
HTTP_PORT | - | defines the service HTTP port where status and metrics endpoints are exposed | 3000 |
CONFIG_FILE_PATH | - | defines the file path where the service configuration is found (it can either be a json or yaml file) | conf/config.json |
Attach to System of Record
To evaluate data from external CDC, the Projections included in the System of Record must be attached to one or more Real-Time Updater or Projection Storer. Services must be created in advance and they can be attached moving to the Services tab of the selected System of Record.
Please remember that after attaching a Projection Storer to the Systems of Record, you must select the projections the service should evaluate to ensure the service updates those projections. To do that, you can use the table in the Projections attached to services section to search the projection and attach it to a specific service. Otherwise, you can access the service configuration page by clicking on the button next to the service name and configure the list of projections from there.
Additionally, note that each projection can be evaluated by only one service.
A click on the "Edit configuration" button in the row of the Projection Storer will lead to a page where the user can configure the Projection Storer.
The page will contain the following configurations:
- the Projections to be managed by the Projection Storer
- a check to enable the Soft Delete (in case of a delete message received from CDC, the projection will be updated with the
__STATE__
set toDELETED
) - a code editor to configure the Consumer configuration of the service
- a code editor to configure the Producer configuration of the service
- a code editor to configure the Storage configuration of the service
All these configurations, after executing a commit to save all the modifications, will automatically generate the Configuration File that will be saved as a config map of the service. This file will be in read-only mode and updated at any change in the service or the System of Record.
Configuration File
In the following sections, each configuration file section is described. It is important to note that Fast Data configurator will provide the option to configure the service without manually writing the file.
Service Settings
Here are described the parameters intrinsic to the service that specify its behaviors. An example of service configuration can be the following one:
"settings": {
"systemOfRecords": "inventory",
"enableSoftDelete": true,
"dataSourceAdapter": {
"type": "debezium"
},
"castFunctions": {
"mapToAddressType": "/app/extensions/mapToAddressType.kts",
}
}
System of Record
Property | Type | Required | Default |
---|---|---|---|
systemOfRecord | string | - |
It is the data source identifier (a name) describing from which system the service is importing data.
Deletion Policy
Property | Type | Required | Default |
---|---|---|---|
enableSoftDelete | boolean | - | true |
Records on the origin system may get deleted and consequently a change event requesting the removal of a document from a projection
is produced. When the service reads this event, it is possible to define which behavior is adopted in processing the delete operation,
depending on your needs.
Indeed, the service can be configured to apply a soft delete policy, meaning that projections records are not delete from the storage system, but
they are marked as DELETED
, so that downstream components would not read those records when aggregating the Single View records.
On the contrary, the service can also adopt a hard delete policy, where there projection record is removed immediately from the storage system.
This flag determines whether the soft delete policy is enabled, which by default it is.
Message Adapter
Property | Type | Required | Default |
---|---|---|---|
dataSourceAdapter | object | - | { "type": "db2" } |
When consuming change events from ingestion topics, the Projection Storer needs to know how to parse them. For this reason it is provided a convenient manner to select the message adapter. Out of the box the service supports message formats employed by some Change Data Capture systems (CDC), which are:
db2
, based on the IBM InfoSphere Data Replication for DB2 CDC"dataSourceAdapter": { "type": "db2" }
golden-gate
, based on the Oracle GoldenGate CDC"dataSourceAdapter": { "type": "golden-gate" }
debezium
, based on the Debezium CDC"dataSourceAdapter": { "type": "debezium" }
In addition to the already existing message adapters, it is also possible to provide to the service a user-defined function
that acts as message adapter. This function takes as input the incoming message associated to a projection, the list of
primary key fields expected for that projection and the service logger. Its goal is to process the message and produce
an object containing the description of the parsed message in a common format.
Thus, besides defining the usage of custom
message adapter type, in the configuration it is also necessary to provide the location of the file
containing the custom message adapter function. For example:
"dataSourceAdapter": {
"type": "custom"
"filepath": "/app/extensions/messageAdapter.kt"
}
It is important to remember to mount the custom message adapter within the plugin instance to the same location that it is defined
in the dataSourceAdapter
configuration.
Here is described the interface of the custom message adapter function:
Adapter Input
message
→ a map representing the ingestion message. It contains the following properties:key
:Buffer
→ a string representing the key of the incoming message from the ingestion topic. It may be parsed as JSON object if needed
value
:Buffer
→ the raw value that contains the payload of the incoming message from the ingestion topic. It can be converted to string and parsed as JSON object
primaryKeys
→ the list of field names that compose the primary key identifier for that specific projectionlogger
→ service logger instance which exports leveled output functions (e.g.info()
,debug()
, ...)
{
"key": "{\"warehouseId\":\"W0A113\",\"productId\":\"P219345\"}",
"value": Buffer.from([123,34,119,97,114,101,104,111,117,115,101,73,100,34,58,34,87,48,65,49,51,51,34,44,34,112,114,111,100,117,99,116,73,100,34,58,34,80,50,49,57,51,52,53,34,44,34,113,117,97,110,116,105,116,121,34,58,50,48,48,44,34,110,101,101,100,82,101,112,108,97,99,101,109,101,110,116,34,58,102,97,108,115,101,125])
}
Adapter Output
operation
→ the identifier of the operation applied on the record, which can beI
(insertion),U
(update) orD
(deletion)key
→ an object/map that contains the primary key fields for the record matching their valuebefore
→ an object/map ornull
, which represents the record before the last change occurredafter
→ an object/map ornull
, which represents the record obtained after applying the change that triggered the ingestion event
{
"operation": "I",
"key": {
"warehouseId": "W0A113",
"productId": "P219345"
},
"before": null,
"after": {
"warehouseId": "W0A113",
"productId": "P219345",
"quantity": 200,
"needReplacement": false
}
}
Taking into account the above details, it is possible to implement user-defined functions in Javascript.
Custom Message Adapter Function (messageAdapter.js)
'use strict'
/* Custom Ingestion Message (this is just an example of how logic can be customized)
{
"data": { ... },
"delete": false
}
*/
// NOTE: the message adapter entry point function must be named `messageAdapter`
function messageAdapter(message, primaryKeys, logger) {
const { value } = message
// in this adapter it is expected that payload is always set (never empty nor null)
const payload = JSON.parse(value.toString())
const key = extractKey(payload?.data, primaryKeys)
logger.trace(JSON.stringify(key))
if (payload.delete) {
return {
operation: 'D',
key,
before: payload?.data,
after: undefined
}
}
return {
operation: 'I',
key,
before: undefined,
after: payload?.data
}
}
function extractKey(obj, wantedKeys) {
return Object.fromEntries(
wantedKeys
.filter(keyEntry => obj[keyEntry] !== undefined)
.map(keyEntry => [keyEntry, obj[keyEntry]])
)
}
// the following code allows to use the same custom function
// in both the Projection Storer and the Real-Time Updater
try {
// export function for Real-Time Updater
module.exports = messageAdapter
} catch (error) {
// ignore error when importing the custom function in the Projection Storer
// since it exploits the function name
}
Custom Message Adapter Function (with empty payload management - messageAdapter.js)
'use strict'
/* Custom Ingestion Message -> db2 like */
// NOTE: the message adapter entry point function must be named `messageAdapter`
function messageAdapter(message, primaryKeys, logger) {
const { value, key: keyAsString } = message
const keyObject = JSON.parse(keyAsString)
// please notice that value is Buffer - to obtain its length,
// the length function should be employed. In addition, to ensure
// compatibility also with Real-Time Updater plugin, the following check has been added
const valueLength = (typeof value?.length === "function")
? value?.length()
: value?.length
// in this adapter it may happen that payload is either empty or null
const payload = (value && valueLength > 0)
? JSON.parse(value.toString())
: null
logger.trace('received message in custom message adapter')
return {
operation: payload ? 'U' : 'D',
key: keyObject,
before: null,
after: payload
}
}
// the following code allows to use the same custom function
// in both the Projection Storer and the Real-Time Updater
try {
// export function for Real-Time Updater
module.exports = messageAdapter
} catch (error) {
// ignore error when importing the custom function in the Projection Storer
// since it exploits the function name
}
Within the custom message adapter script file it is possible to define multiple functions. However, it is mandatory
to define a function named messageAdapter
, which will be treated as the entry point for the custom message adapter.
Moreover, to support both Projection Storer and Real-Time Updater services, the module.export
instruction must be
added, but encapsulated within a try/catch block as shown below:
try {
// export function for Real-Time Updater
module.exports = messageAdapter
} catch (error) {
// ignore error when importing the custom function in the Projection Storer
// since it exploits the function name
}
Cast Functions and Additional Cast Functions
Property | Type | Required | Default |
---|---|---|---|
castFunctions | object | - |
Projection Storer service allows to perform basic transformation logic on each field of projection records before writing them onto the storage system. By default, it offers a set of predefined functions that convert a projection record field from one type into another. For example, it allows to convert a string containing a number into an integer. Below it is shown the list of existing functions:
identity
→ applies the identity function (no change occurs)castToString
→ convert the input value into a stringcastToInteger
→ convert the input value into an integer numbercastToFloat
→ convert the input value into a decimal numbercastUnixTimestampToISOString
→ convert the input value from a Unix timestamp (e.g.1695141357284
) to the same timestamp in ISO 8601 format (e.g.2023-09-19T16:35:57.284Z
)castStringToBoolean
→ convert the string valuetrue
andfalse
to their corresponding boolean valuecastToDate
→ convert a string or a number into a Date objectcastToObject
→ parse a JSON object represented as string into a JSON objectcastToArrayOfObject
→ parse a JSON array represented as string into a JSON array
Whenever these functions do not cover a particular use case, it is possible to configure additional user-defined functions as custom cast functions. These cast functions can be implemented in Javascript, each of them written in their own file. When the files containing the user-defined functions are loaded, the service will search within them for a function named as the key name in the configuration. The function with such name must exist otherwise the service will encounter a processing error.
The castFunctions
property is then configurable as follows: it should be a mapping between names of custom cast functions
and the path on the service where to find the file containing its implementation.
Here it is shown a possible example of configuring two custom cast functions:
"castFunctions": {
"mapToAddressType": "/app/extensions/mapToAddressType.js",
"castToTitleCase": "/app/extensions/castToTitleCase.js"
}
It is important to remember to mount each custom cast function or the folder containing them within the plugin instance.
Then in the castFunctions
property they should be properly named and set the correct location to each file containing
the function's implementation.
Considering the implementation of these cast functions, they expect as input two parameters, that are the value the field
to be transformed and the field name represented as string
. The output of the cast functions should be a single value
in the type expected by the data model for that specific field on which the cast function is applied.
Below you can find several examples about implementation of cast functions.
Custom Cast Function (mapToAddressType.js)
const addressMapping = {
1: "SHIPPING",
2: "BILLING",
3: "LIVING"
};
// NOTE: the name of the function must correspond to
// the key associated to the file containing it
function mapToAddressType(value, fieldName) {
if (typeof value === 'string') {
return addressMapping[parseInt(value)];
} else if (typeof value === 'number') {
return addressMapping[value];
} else {
// NOTE: a basic logger can be accessed via internal binding
logger.debug(`not an address type code: ${value} - fieldName: ${fieldName}`);
return null;
}
}
// the following code allows to use the same custom function
// in both the Projection Storer and the Real-Time Updater
try {
// export function for Real-Time Updater
module.exports = mapToAddressType
} catch(error) {
// ignore error when importing the custom function in the Projection Storer
// since it exploits the function name
}
Custom Cast Function (castToTitleCase.js)
'use strict'
// NOTE: the name of the function must correspond to
// the key associated to the file containing it
function castToTitleCase(value, fieldName) {
const str = value.toString()
// NOTE: a basic logger can be accesses via internal binding
logger.debug(`incoming value: ${value}`)
return str[0].toUpperCase() + str.slice(1).toLowerCase()
}
// the following code allows to use the same custom function
// in both the Projection Storer and the Real-Time Updater
try {
// export function for Real-Time Updater
module.exports = castToTitleCase
} catch(error) {
// ignore error when importing the custom function in the Projection Storer
// since it exploits the function name
}
For those migrating custom cast functions from the ones employed in the Real-Time Updater, please bear in mind that
the logger
parameter is not provided anymore in the function signature, and therefore it should be removed.
For debugging purposes, a logger is still provided via internal bindings, but it is recommended to not log
within cast functions at levels higher than debug
.
Consumer
Property | Type | Required | Default |
---|---|---|---|
type | string | ✓ | kafka |
configuration | object | ✓ |
Describe which type of consumer and its configuration properties the service should employ to read ingestion messages as input events. Currently only Kafka (and platforms adopting Kafka APIs) is supported as consumer.
Kafka Configuration
If you select the Kafka Consumer, the service will create a Kafka Client to consume messages from the ingestion topics you setup.
When Kafka is selected as consumer for the Projection Storer service, it is possible to provide most of the Kafka Consumer properties that are defined in the Apache Kafka documentation.
This is an example of consumer configuration when kafka
is selected as type:
"consumer": {
"type": "kafka",
"configuration": {
"client.id": "galaxy.fast-data.DEV.inventory-projection-storer-consumer",
"bootstrap.servers": "localhost:9092",
"group.id": "galaxy.fast-data.DEV.inventory-projection-storer",
"auto.offset.reset": "latest",
"max.poll.records": 2000,
"max.poll.timeout.ms": 500
}
}
The consumer property max.poll.timeout.ms
is an ad-hoc property, which does not belong to the set of Kafka Consumer properties.
In fact, it is employed by the service to set the maximum number of milliseconds the consumers waits for a poll operation
before returning any event (in case max.poll.records
or message.max.bytes
are not reached earlier than the configured timeout).
The following Kafka Consumer properties cannot be customized by the user, since are managed by the service:
enable.auto.commit
key.deserializer
value.deserializer
Producer
Property | Type | Required | Default |
---|---|---|---|
type | string | ✓ | kafka |
configuration | object | ✓ |
Describe which type of producer and its configuration properties the service should employ to trigger projection update as output events. Currently only Kafka (and platforms adopting Kafka APIs) is supported as producer.
Kafka Configuration
If you select the Kafka Producer, the service will create a Kafka Client to produce messages into the topics related to the PR Updates you setup.
When Kafka is selected as producer for the Projection Storer service, it is possible to provide most of the Kafka Producer properties that are defined in the Apache Kafka documentation.
This is an example of producer configuration when kafka
is selected as type:
"producer": {
"type": "kafka",
"configuration": {
"client.id": "galaxy.fast-data.DEV.inventory-projection-storer-producer",
"bootstrap.servers": "localhost:9092"
}
}
The following Kafka Producer properties cannot be customized by the user, since are managed by the service:
acks
enable.idempotence
key.deserializer
value.deserializer
Storage
Property | Type | Required | Default |
---|---|---|---|
type | string | ✓ | mongodb |
configuration | object | ✓ |
Describe which type of storage system is employed by the service for writing the projections records and which are its configuration properties. Currently only MongoDB is supported as storage system.
MongoDB Configuration
When MongoDB is selected as a storage system for the Projection Storer service, it requires the connections string and the name of the database the service will connect to. The database name is not necessary in case it is already specified in the connection string, although it is recommended to set it in case the connection string is shared with other services.
This is an example of storage configuration when mongodb
is selected as type:
"storage": {
"type": "mongodb",
"configuration": {
"url": "mongodb://localhost:27017/fast-data-inventory-local",
"database": "fast-data-inventory-local"
}
}
Projections Config
This section of the configuration provides all the details related to each projection associated to an instance of the Projection Storer service. The content of this property is mapping between projection names and their configuration the input and output specification together with the mapping configuration that instructs the service on how to transform each ingestion event into a projection record and where to store it.
When using Mia-Platform Console to configure the service, the configuration shown in this section are managed by the UI, so that interacting with the list of projections and their fields in the Systems of Record section will automatically generate the needed configuration for the Projection Storer service.
Topics
Property | Type | Required | Default |
---|---|---|---|
ingestion | object | ✓ | |
prUpdate | object | ✓ |
In this section are specified for each projection their input channel (ingestion), from which change events on the source system (System of Record) will be read, and the output channel (prUpdate), where update notifications will be emitted to trigger Fast Data downstream components.
Ingestion
{
"ingestion": {
"name": "galaxy.fast-data.DEV.orders.ingestion"
}
}
Projection Record Update
{
"prUpdate": {
"name": "galaxy.fast-data.DEV.customers.pr-update"
}
}
Custom Storage Namespace
Property | Type | Required | Default |
---|---|---|---|
customStorageNamespace | string | - | <projection-name> |
Represents the name employed on the storage system for knowing where to store the records of this projection. By default, this value corresponds to the name of the projection itself.
For example, when using MongoDB as storage system, this field allows to define a custom name for the collection where the records of this projection will be saved.
Primary Keys
Property | Type | Required | Default |
---|---|---|---|
primaryKeys | string[] | ✓ |
It is the list of fields names that uniquely identify a record for this projection. The names contained in this list are relative
to the fields of the source record, which are the ones contained in the ingestion event.
There should always be at least one name in this list, so that it is possible to uniquely connect records of different projections among them.
Moreover, the projection key allows Fast Data system to ensure events ordering, so that changes over the same record are processed in
the order they occur.
Fields Mapping
Property | Type | Required | Default |
---|---|---|---|
fieldsMapping | object | ✓ |
This projection configuration property describes which fields of in the incoming record should be extracted and stored. Indeed, not all
the fields of those documents coming from the System of Record may be necessary to construct the projection. From this,
here it is applied a "projection" (filter) operation on the names of the record fields.
For each of these fields of interest of this projection it is necessary to configure the following two settings:
targetField
→ the name for this specific field to be employed when storing the projection record on the storage system. It can potentially be different from the original name, although by default the configuration system tend to match the name from the incoming document with the one saved on the projections storage system to avoid possible confusion.castFunction
→ the identifier of the function to be applied on the value of this field. For a reference of possible values that this property can get, please refer to cast functions section.
Since targetField
property may lead to a renaming of a property on the projection record, it is important
to notice that configuration in primaryKeys
property described above must contain the names of the fields
that are found on the ingestion event document, not the ones saved in the storage system.
The service will then properly forward the updated list when it emits the corresponding projection-record update events.
Considering all the settings explained above, here is displayed a configuration for a projection named
pr_products
. It has its own ingestion and prUpdate topic, its records will be stored on the storage system
under the namespace products
and its fields will maintain the same naming with their value transformed according to
defined cast function.
"projections": {
"pr_products": {
"topics": {
"ingestion": {
"name": "galaxy.fast-data.DEV.products.ingestion"
},
"prUpdate": {
"name": "galaxy.fast-data.DEV.products.pr-update"
}
},
"customStorageNamespace": "products",
"primaryKeys": [
"id"
],
"fieldsMapping": {
"id": {
"targetField": "id",
"castFunction": "identity"
},
"name": {
"targetField": "name",
"castFunction": "castToString"
},
"description": {
"targetField": "description",
"castFunction": "castToString"
},
"weight": {
"targetField": "weight",
"castFunction": "castToFloat"
},
"material": {
"targetField": "material",
"castFunction": "castToString"
},
"price": {
"targetField": "price",
"castFunction": "castToFloat"
}
}
}
}
Runtime Management Config
Starting from version v1.1.0
of Projection Storer service it is possible to instruct it to pause
and resume
the
consumption of change events from ingestion topics. This feature is enabled by specifying the Control Plane configuration
in the dedicated section of the configuration page.
Adding the Control Plane configuration to the service enables it to communicate with an instance of Fast Data Control Plane.
This connection is then employed to establish which status each service ingestion component should apply (either pause
or resume
) and to
provide the corresponding feedback back to the Control Plane service.
By design, every service interacting with the Control Plane starts up in a paused
state, unless the Control Plane
has already resumed the data stream before.
Therefore, when the Projection Storer starts up, consumption from ingestion topics will not start automatically.
In this case, you just need to send a resume
command
to one of the projections managed by the Projection Storer.
In the next paragraphs are described the fields that compose the Projection Storer Control Plane configuration, which at
the top level in the configuration file is identified by the controlPlane
property.
State
This field describe which communication protocol is employed for receiving the Fast Data state from the Fast Data Control Plane instance. This can happen directly via GRPC or indirectly via a topic on a Kafka broker. Here are detailed the fields to be configured depending on the selected communication protocol.
GRPC
Property | Type | Required | Default |
---|---|---|---|
type | string | ✓ |
When type
is set to grpc
then no further configuration under this field is needed by the service.
GRPC communication protocol support is available since version v1.1.1
of Projection Storer
Kafka
Property | Type | Required | Default |
---|---|---|---|
type | string | ✓ | |
channel | string | ✓ | |
configuration | object | ✓ |
When type
is set to kafka
then it is necessary to specify from which channel
, that is the Kafka topic,
Fast Data state events should be read and the Kafka configuration properties to be passed to the Kafka Consumer.
In order to guarantee global order on Fast Data states emitted by Fast Data Control Plane instance, configured topic must have a single partition, from which all the Fast Data services will read the current state to be applied.
The following Kafka Consumer properties cannot be customized by the user, since are managed by the service:
key.deserializer
value.deserializer
Feedback
This field describe which communication protocol is employed for sending a feedback as heartbeat to the Fast Data Control Plane instance. This can happen directly via GRPC or indirectly via a topic on a Kafka broker. Here are detailed the fields to be configured depending on the selected communication protocol.
GRPC
Property | Type | Required | Default |
---|---|---|---|
type | string | ✓ |
When type
is set to grpc
then no further configuration under this field is needed by the service.
GRPC communication protocol support is available since version v1.1.1
of Projection Storer
Kafka
Property | Type | Required | Default |
---|---|---|---|
type | string | ✓ | |
channel | string | ✓ | |
configuration | object | ✓ |
When type
is set to kafka
then it is necessary to specify on which channel
, that is the Kafka topic,
service feedback events should be produced and the Kafka configuration properties to be passed
to the Kafka Producer.
The following Kafka Producer properties cannot be customized by the user, since are managed by the service:
acks
enable.idempotence
key.deserializer
value.deserializer
Shared Settings
Property | Type | Required | Default |
---|---|---|---|
grpc | object |
The property settings
under the Control Plane settings
field contains a set of configurations that are shared by both
state
and feedback
components. In particular, there is a property grpc
that allows to specify the host
and the
port
of the Fast Data Control Plane instance that expose the GRPC server.
When using Mia-Platform Console to configure the settings explained above, please remember to insert only the
content of controlPlane.settings
property in the dedicated space within Projection Storer configuration page. As a reference,
the configuration panel is shown in the screenshot here:
Below are provided three examples of how Control Plane settings can be configured for the Projection Storer service. These are:
- GRPC feedback loop: both Fast Data state and service feedback transit over GRPC communication protocol. This is the recommended configuration, since it is better suited for this kind of inter-service communication and it does not require any additional external resource.
- Kafka feedback loop: both Fast Data state and service feedback transit between services as Kafka messages
- Kafka + GRPC feedback loop: Fast Data state is read from a Kafka topic as a message while and service feedback is sent directly to the Fast Data Control Plane instance via GRPC.
Control Plane Configuration | GRPC feedback loop
"controlPlane": {
"settings": {
"state": {
"type": "grpc"
},
"feedback": {
"type": "grpc"
},
"settings": {
"grpc": {
"host": "<fd-control-plane-k8s-service-name>",
"port": 50051
}
}
},
"bindings": {
...
}
}
Control Plane Configuration | Kafka feedback loop
"controlPlane": {
"settings": {
"state": {
"type": "kafka",
"channel": "<control-plane-state-topic>",
"configuration": {
"client.id": "galaxy.fast-data.DEV.inventory-projection-storer-state",
"bootstrap.servers": "localhost:9092",
"group.id": "galaxy.fast-data.DEV.inventory-projection-storer-control-plane",
/* the following properties define the authentication parameters - please update or remove them after copying the example config */
"sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"<username>\" password=\"<password>\";",
"sasl.mechanism": "SCRAM-SHA-256",
"security.protocol": "SASL_SSL",
"max.poll.records": 1000,
"max.poll.timeout": 1000
}
},
"feedback": {
"type": "kafka",
"channel": "<control-plane-feedback-topic>",
"configuration": {
"client.id": "galaxy.fast-data.DEV.inventory-projection-storer-feedback",
"bootstrap.servers": "localhost:9092",
/* the following properties define the authentication parameters - please update or remove them after copying the example config */
"sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"<username>\" password=\"<password>\";",
"sasl.mechanism": "SCRAM-SHA-256",
"security.protocol": "SASL_SSL"
}
}
},
"bindings": {
...
}
}
Control Plane Configuration | Kafka+GRPC feedback loop
"controlPlane": {
"settings": {
"state": {
"type": "kafka",
"channel": "<control-plane-state-topic>",
"configuration": {
"client.id": "galaxy.fast-data.DEV.inventory-projection-storer-state",
"bootstrap.servers": "localhost:9092",
"group.id": "galaxy.fast-data.DEV.inventory-projection-storer-control-plane",
/* the following properties define the authentication parameters - please update or remove them after copying the example config */
"sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"<username>\" password=\"<password>\";",
"sasl.mechanism": "SCRAM-SHA-256",
"security.protocol": "SASL_SSL",
"max.poll.records": 1000,
"max.poll.timeout": 1000
}
},
"feedback": {
"type": "grpc"
},
"settings": {
"grpc": {
"host": "<fd-control-plane-k8s-service-name>",
"port": 50051
}
}
},
"bindings": {
...
}
}
In order for the service to be aware of which Fast Data pipelines artifacts
and execs it is managing, an additional
bindings
property must be passed within the Control Plane service configuration. This latter configuration maps
pipeline artifacts identifiers with their corresponding details.
Beware that without the bindings
property the Control Plane component will not be enabled on the Projection Storer.
Mia-Platform Console automatically generates for you such mapping starting from the Fast Data configuration and adds it to the service, so that these bindings do not have be added by the user.
Configuration File Example
Below is presented an example of Projection Storer working configuration, both JSON
and YAML
formats, which are the ones
supported by the service.
Projection Storer configuration (JSON)
{
"version": 2,
"settings": {
"systemOfRecords": "inventory",
"enableSoftDelete": true,
"dataSourceAdapter": {
"type": "debezium"
}
},
"consumer": {
"type": "kafka",
"configuration": {
"client.id": "galaxy.fast-data.DEV.inventory-projection-storer-consumer",
"bootstrap.servers": "localhost:9092",
"group.id": "galaxy.fast-data.DEV.inventory-projection-storer",
"auto.offset.reset": "latest",
"max.poll.records": 2000,
"max.poll.timeout.ms": 500,
/* the following properties define the authentication parameters - please update or remove them after copying the example config */
"sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"<username>\" password=\"<password>\";",
"sasl.mechanism": "SCRAM-SHA-256",
"security.protocol": "SASL_SSL"
}
},
"producer": {
"type": "kafka",
"configuration": {
"client.id": "galaxy.fast-data.DEV.inventory-projection-storer-producer",
"bootstrap.servers": "localhost:9092",
/* the following properties define the authentication parameters - please update or remove them after copying the example config */
"sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"<username>\" password=\"<password>\";",
"sasl.mechanism": "SCRAM-SHA-256",
"security.protocol": "SASL_SSL"
}
},
"storage": {
"type": "mongodb",
"configuration": {
"url": "mongodb://localhost:27017",
"database": "fast-data-inventory-dev"
}
},
"projections": {
"pr_customers": {
"topics": {
"ingestion": {
"name": "galaxy.fast-data.DEV.customers.ingestion"
},
"prUpdate": {
"name": "galaxy.fast-data.DEV.customers.pr-update"
}
},
"customStorageNamespace": "customers",
"primaryKeys": [
"id"
],
"fieldsMapping": {
"id": {
"targetField": "id",
"castFunction": "identity"
},
"first_name": {
"targetField": "first_name",
"castFunction": "castToString"
},
"last_name": {
"targetField": "last_name",
"castFunction": "castToString"
},
"email": {
"targetField": "email",
"castFunction": "castToString"
},
"bio": {
"targetField": "bio",
"castFunction": "castToString"
}
}
},
"pr_orders": {
"topics": {
"ingestion": {
"name": "galaxy.fast-data.DEV.orders.ingestion"
},
"prUpdate": {
"name": "galaxy.fast-data.DEV.orders.pr-update"
}
},
"customStorageNamespace": "orders",
"primaryKeys": [
"order_number"
],
"fieldsMapping": {
"order_number": {
"targetField": "order_number",
"castFunction": "identity"
},
"order_date": {
"targetField": "order_date",
"castFunction": "castToDate"
},
"purchaser": {
"targetField": "purchaser",
"castFunction": "identity"
},
"quantity": {
"targetField": "quantity",
"castFunction": "castToInteger"
},
"product_id": {
"targetField": "product_id",
"castFunction": "identity"
},
"price": {
"targetField": "price",
"castFunction": "castToFloat"
},
"delivery": {
"targetField": "delivery",
"castFunction": "castToString"
}
}
},
"pr_products": {
"topics": {
"ingestion": {
"name": "galaxy.fast-data.DEV.products.ingestion"
},
"prUpdate": {
"name": "galaxy.fast-data.DEV.products.pr-update"
}
},
"customStorageNamespace": "products",
"primaryKeys": [
"id"
],
"fieldsMapping": {
"id": {
"targetField": "id",
"castFunction": "identity"
},
"name": {
"targetField": "name",
"castFunction": "castToString"
},
"description": {
"targetField": "description",
"castFunction": "castToString"
},
"weight": {
"targetField": "weight",
"castFunction": "castToFloat"
},
"material": {
"targetField": "material",
"castFunction": "castToString"
},
"price": {
"targetField": "price",
"castFunction": "castToFloat"
}
}
}
},
"controlPlane": {
"settings": {
"state": {
"type": "grpc"
},
"feedback": {
"type": "grpc"
},
"settings": {
"grpc": {
"host": "localhost"
}
}
},
"bindings": {}
}
}
Projection Storer configuration (YAML)
version: 2
settings:
systemOfRecords: inventory
enableSoftDelete: true
dataSourceAdapter:
type: debezium
consumer:
type: kafka
configuration:
"client.id": galaxy.fast-data.DEV.inventory-projection-storer-consumer
"bootstrap.servers": localhost:9092
"group.id": galaxy.fast-data.DEV.inventory-projection-storer
"auto.offset.reset": latest
"max.poll.records": 2000
"max.poll.timeout.ms": 500
# the following properties define the authentication parameters - please update or remove them after copying the example config
"sasl.jaas.config": org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";
"sasl.mechanism": SCRAM-SHA-256
"security.protocol": SASL_SSL
producer:
type: kafka
configuration:
"client.id": galaxy.fast-data.DEV.inventory-projection-storer-producer
"bootstrap.servers": localhost:9092
# the following properties define the authentication parameters - please update or remove them after copying the example config
"sasl.jaas.config": org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";
"sasl.mechanism": SCRAM-SHA-256
"security.protocol": SASL_SSL
storage:
type: mongodb
configuration:
url: mongodb://localhost:27017
database: fast-data-inventory-dev
projections:
pr_customers:
topics:
ingestion:
name: galaxy.fast-data.DEV.customers.ingestion
prUpdate:
name: galaxy.fast-data.DEV.customers.pr-update
customStorageNamespace: customers
primaryKeys:
- id
fieldsMapping:
id:
targetField: id
castFunction: identity
first_name:
targetField: first_name
castFunction: castToString
last_name:
targetField: last_name
castFunction: castToString
email:
targetField: email
castFunction: castToString
bio:
targetField: bio
castFunction: castToString
pr_orders:
topics:
ingestion:
name: galaxy.fast-data.DEV.orders.ingestion
prUpdate:
name: galaxy.fast-data.DEV.orders.pr-update
customStorageNamespace: orders
primaryKeys:
- order_number
fieldsMapping:
order_number:
targetField: order_number
castFunction: identity
order_date:
targetField: order_date
castFunction: castToDate
purchaser:
targetField: purchaser
castFunction: identity
quantity:
targetField: quantity
castFunction: castToInteger
product_id:
targetField: product_id
castFunction: identity
price:
targetField: price
castFunction: castToFloat
delivery:
targetField: delivery
castFunction: castToString
pr_products:
topics:
ingestion:
name: galaxy.fast-data.DEV.products.ingestion
prUpdate:
name: galaxy.fast-data.DEV.products.pr-update
customStorageNamespace: products
primaryKeys:
- id
fieldsMapping:
id:
targetField: id
castFunction: identity
name:
targetField: name
castFunction: castToString
description:
targetField: description
castFunction: castToString
weight:
targetField: weight
castFunction: castToFloat
material:
targetField: material
castFunction: castToString
price:
targetField: price
castFunction: castToFloat
controlPlane:
settings:
state:
type: grpc
feedback:
type: grpc
settings:
grpc:
host: localhost
# bindings map is empty and therefore Control Plane component is disabled
bindings: {}
Migration Guide
In the following section is explained how to migrate the configuration of an existing Real-Time Updater into the one needed by a Projection Storer service. To support the migration operations, a small command-line interface is available, which translates most of the main settings
Migrating a Real-Time Updater is subjected to certain constraints on the Console and services version, which are reported in the table below:
Mia-Platform Console | Real-Time Updater | Projection Storer |
---|---|---|
>= v12.0.0 | v7.x.y | v1.x.y |
Before starting the migration procedure it is strongly recommended to first upgrade the Console version to the v12, since it provides additional features for managing both Real-Time Updater and Projection Storer services within Fast Data configurator.
Requirements
In order to proceed with the migration process it is necessary to have:
the permissions to download
api-console-config.json
andfastdata-config.json
configuration files within your project configurations repositoryaccess to the Mia-Platform internal npm registry
locally installed
nodejs
, so that it is possible to install and exploit the command-line interface dedicated to migrate services configurations.The cli can be installed with the following command:
npm i -g @mia-platform-internal/projection-storer-migration
Migration Process
First, let's open in Console the project that contains the Real-Time Updater service that should be migrated to a Projection Storer. From there, access the project configuration stored within your Company's git provider, using the link in the top-right corner of Mia-Platform Console interface:
Once the project configuration files are visible, navigate within the repository and locate at root level the following configuration files:
api-console-config.json
fastdata-config.json
Please, download them into a selected folder where to generate the new configuration files. For example, the folder may be named <project-name>-real-time-updater-migration
.
Now it is possible to employ the provided command-line interface to generate Projection Storer configurations matching the ones of existing Real-Time Updater. To achieve so, open a terminal on your computer, navigate to the folder where configuration files mentioned above were downloaded and execute the following command:
rtu-to-ps project -cc <filepath-to-api-console-config> \
-fdc <filepath-to-fast-data-config> \
-s <name-system-of-record>
-r <name-service-to-migrate-linked-to-system-of-record> \
rtu-to-ps
is the program name of the command-line interface that have been installed earlier, as explained in the requirements section.
To learn more about its features, it is possible to launch the cli using the --help
flag.
Considering a real-life example, let's suppose that api-console-config.json
and fastdata-config.json
are available in the folder
where the cli is launched, the selected System of Record is inventory
and the Real-Time Updater to migrate is named fast-data-inventory-realtime-updater
.
Consequently, the cli should be launched as follows:
rtu-to-ps project -cc api-console-config.json \
-fdc fastdata-config.json \
-s inventory \
-r fast-data-inventory-realtime-updater
Upon its execution, the cli outputs a series of details regarding generated files, where to find them, possible warnings and how to handle generated configs in order to configure a Projection Storer.
Focusing on the command output, a new folder out
should have been created. The output folder can be customized using the -o
flag of the cli.
Within this folder it should be possible to find another folder named as the selected service, where the reference to Real-Time Updater has been replaced
with the one to Projection Storer. Below is reported the expected structure of the generated folder:
out
└── fast-data-inventory-projection-storer
├── castFunctions # folder found only when at least a custom cast function has been defined
│ └── castToTitleCase.js
├── messageAdapter.js # file found only when a custom message adapter has been previously defined
├── ps-config.json
└── service.env
where:
service.env
→ is the environment file containing all the environment variables need by the serviceps-config.json
→ contains the main configuration of a Projection Storer service- [optional]
messageAdapter.js
→ this file can be found only when a custom message adapter has been previously associated to the Real-Time Updater to be migrated. Represents the custom implementation employed by the existing Real-Time Updater, which can be adapted to be employed within the Projection Storer service - [optional]
castFunctions
→ this folder can be found only when at least one custom cast function was defined in the project. Within the folder can be found the implementation of each custom cast function, which can be loaded also in the Projection Storer service
Then, these generated files can be employed either for configuring a new Projection Storer in Mia-Platform Console, which would replace the existing Real-Time Updater, or for setting up a Projection Storer in your environment.
Extracted user-defined functions, such as the message adapter or the custom cast functions may need to be slightly adjusted before using them in a Projection Storer service. In particular, Projection Storer:
- supports both Real-Time Updater and its own custom message adapter definition. In fact, when the output of existing adapters is mapped internally to the newer one.
However, it is recommended to review existing implementation to reflect the newer return type, as explained in the output paragraph. This allows to have a greater control over the details provided to the Projection Storer - expects that custom cast function do not receive as input parameter the
logger
. It is requested that the implementation of those custom function is updated to remove the logger and obtain a working user-defined function. More details on custom cast functions implementation can be found in the cast function section
Replace existing Real-Time Updater service in Mia-Platform Console
Below are reported all the steps necessary to set-up a Projection Storer that could replace the corresponding Real-Time Updater.
In the project of interest, where the existing Real-Time Updater can be found, open the Marketplace section, search for Projection Storer plugin and create a new service, as shown in the figure:
When the Projection Storer plugin is created, it already contains the proper set of environment variables.
Once the service is created, navigate to the Fast Data Configurator (Systems of Record section) and select the System of Record that contains the Real-Time Updater to be replaced. In the submenu, please select the Services tab, as displayed below:
After entering the services section, the first action to be carried out is to detach from the System of Record the existing Real-Time Updater service. In this manner, the projections that were associated to such service are now free to be assigned to other services.
Then, the next step is to attach to the System of Record the newly created Projection Storer plugin, so that it can be configured from the Fast Data configurator
Eventually, after attaching the Projection Storer, click on the edit microservice button and start configuring its properties. Here, in case it has been followed the previous guide on how to generate a Projection Storer configurator starting from an existing Real-Time Updater, it is possible to just copy and adjust where needed the pieces of the main configuration file.
In particular:
- Projections to be managed by the service can be directly selected in Console from the drop-down menu. This allows to automatically generate the projections configuration needed by the Projection Storer
- Enable Soft Delete value can be found under
settings
property of the main Projection Storer configuration file - Consumer configuration value can be copied and adjusted from
consumer
property of the generated main Projection Storer configuration file (ps-config.json
) - Producer configuration value can be copied and adjusted from
producer
property of the generated main Projection Storer configuration file (ps-config.json
) - Storage configuration value can be copied and adjusted from
storage
property of the generated main Projection Storer configuration file (ps-config.json
)
Finally, please verify whether configured message adapter corresponds to the intended one (found under
settings
property). In case the selected message adapter is custom, then please verify that the user-defined implementation adhere to the expected configuration explained here.
As explained earlier, Projection Storer service is in charge only of importing, clean, filter and validate change events as projection records. Computing which Single View should be re-created given a specific change event is now a responsibility of the Single View Trigger Generator, which should be configured accordingly. In this page can be found an explanation on how to configure it.
In case the System of Record of your concern is currently adopting a Fast Data standard architecture, which means the Real-Time Updater was responsible also of triggering Single Views re-generation, the Single View Trigger Generator plugin has to be introduced in the system, since Projection Storer only supports Fast Data event-driven architectures.
A detailed explanation of how the Single View Trigger Generator service should be introduced and how to migrate Fast Data from standard to event-driven architecture is provided here