Concepts
Data Source
A data source in the context of Fast Data pipelines represents a static resource that contains data or records that are not in movement. Examples of static data sources are databases and APIs.
Stream
A stream in the context of Fast Data pipelines represents a channel through which data is moved from one location to another. During the movement, data can be transformed, filtered, or aggregated together with data from other streams. An example of a stream is a Kafka topic.
It is possible to associate a name with each stream, usually similar to the name of the underlying channel. This name can be customized as needed, although following a naming convention is recommended. Using the same format for different streams helps to recognize groups of them within the configuration and to better understand their purpose.
For example:
- streams generated as output of a stream-processor service may be suffixed depending on the processor function, such as .validated, .filtered or .preproc
- streams generated by a farm-data service may be suffixed with .aggregated to signal their content was combined by joining different sources
- .aggregated streams transformed and mapped by a stream-processor service may be suffixed as .product, since their content can represent a data product
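As a rough illustration of why a shared suffix convention helps, the sketch below groups stream names by the text after their last dot. The function `group_by_suffix` and all stream names are hypothetical and are not part of any Fast Data tooling.

```python
from collections import defaultdict


def group_by_suffix(stream_names):
    """Group stream names by the text that follows their last dot."""
    groups = defaultdict(list)
    for name in stream_names:
        # rpartition returns ("", "", name) when the name contains no dot
        _, dot, suffix = name.rpartition(".")
        groups[suffix if dot else ""].append(name)
    return dict(groups)


# Hypothetical stream names that follow the convention described above
streams = [
    "orders.validated",
    "orders.filtered",
    "customers.aggregated",
    "customers.product",
    "payments.validated",
]
groups = group_by_suffix(streams)
```

With consistent suffixes, `groups["validated"]` collects every validated stream in one place, regardless of which processor produced it.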
Snapshot / Initial Load
The procedure that reads a whole data source and transforms each of its records into a change event, producing these events onto a stream.
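A minimal sketch of the idea, assuming the data source can be iterated as a collection of dictionaries and using the READ (`r`) operation of the message format described later on this page. The function `snapshot` and its parameters `key_field` and `source_info` are hypothetical names chosen for illustration; producing the events onto an actual stream is left out.

```python
import time


def snapshot(records, key_field, source_info):
    """Yield one (key, payload) change event for each record of a static data source."""
    for record in records:
        key = {key_field: record[key_field]}
        payload = {
            "op": "r",                # READ: the record comes from a snapshot
            "after": dict(record),    # current value of the record
            "before": None,           # no previous value exists for a snapshot
            "source": dict(source_info),
            # time at which this (hypothetical) workload emitted the event
            "ts_ms": int(time.time() * 1000),
        }
        yield key, payload


# Hypothetical records standing in for a database table or collection
records = [
    {"_id": "a", "name": "Laptop Pro"},
    {"_id": "b", "name": "Wireless Mouse"},
]
events = list(snapshot(records, "_id", {"db": "inventory_db", "collection": "products"}))
```

Each record yields exactly one event, so a consumer that reads the whole stream can rebuild the full content of the data source.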
Fast Data Message Format
Within the context of Fast Data, workloads exchange change events following an ad-hoc format. This format provides details about the modification that occurred to the record. In particular, the message contains:
- an identifier of the record that was modified
- the type of operation that was applied to the record, which can be:
  - READ (r) → the record is extracted from the static data source during a snapshot procedure
  - CREATE (c) → the record was just inserted in the static data source
  - UPDATE (u) → the record was updated on the static data source
  - DELETE (d) → the record was deleted from the static data source
- the value of the record before the modification occurred (available in UPDATE and DELETE operations)
- the value of the record after the modification occurred (available in READ, CREATE and UPDATE operations)
- optional metadata describing the data source that contains the record and when the modification occurred on it
- optional metadata describing when the modification was registered by the processing service
In the context of the Fast Data pipelines configuration or this documentation, when a message is required to be Fast Data compliant, it means that it should respect the constraints described in this page and adopt the appropriate message format depending on the selected streaming platform.
Kafka Message Schema
When using Kafka as the stream messaging platform, the identifier of the record must be set in the message key, while the remaining details must be set in the message payload.
The JSON schemas of both message components are provided below:
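The split between key and payload can be sketched as follows, assuming both components are serialized as UTF-8 JSON. The function `to_kafka_record` is a hypothetical helper; passing the resulting bytes to a Kafka producer is not shown, since it depends on the client library in use.

```python
import json


def to_kafka_record(key, payload):
    """Serialize a change event into the (key, value) byte pair of a Kafka message."""
    # Sorting keys and removing whitespace keeps the key bytes identical for the
    # same record identifier, however the key dictionary was built.
    key_bytes = json.dumps(key, sort_keys=True, separators=(",", ":")).encode("utf-8")
    value_bytes = json.dumps(payload).encode("utf-8")
    return key_bytes, value_bytes


key_bytes, value_bytes = to_kafka_record(
    {"_id": "a"},
    {"op": "d", "after": None, "before": {"_id": "a", "stock": 200}},
)
```

Stable key bytes matter because Kafka's default partitioning hashes the message key: events about the same record then land on the same partition and keep their relative order.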
{
"$schema": "https://json-schema.org/draft-07/schema",
"title": "Key Schema",
"type": "object",
"additionalProperties": true
}
{
"$schema": "https://json-schema.org/draft-07/schema",
"title": "Payload Schema",
"type": "object",
"oneOf": [
{
"title": "Snapshot Event",
"required": [
"op",
"after"
],
"properties": {
"op": {
"description": "Change event generated by a snapshot procedure",
"const": "r"
},
"after": {
"type": "object",
"additionalProperties": true,
"description": "The value of the payload after the modification occurred"
},
"before": {
"type": "null",
"description": "The value of the payload before the modification occurred. For snapshot operations this is not available"
},
"source": {
"$ref": "#/definitions/source"
},
"ts_ms": {
"$ref": "#/definitions/workload_ts_ms"
},
"updateDescription": {
"$ref": "#/definitions/updateDescription"
}
}
},
{
"title": "Create Event",
"required": [
"op",
"after"
],
"properties": {
"op": {
"description": "Change event generated when a new record is created on the data source",
"const": "c"
},
"after": {
"type": "object",
"additionalProperties": true,
"description": "The value of the payload after the modification occurred"
},
"before": {
"type": "null",
"description": "The value of the payload before the modification occurred. For create operations this is not available"
},
"source": {
"$ref": "#/definitions/source"
},
"ts_ms": {
"$ref": "#/definitions/workload_ts_ms"
},
"updateDescription": {
"$ref": "#/definitions/updateDescription"
}
}
},
{
"title": "Update Event",
"required": [
"op",
"after",
"before"
],
"properties": {
"op": {
"description": "Change event generated when an existing record was modified on the data source",
"const": "u"
},
"after": {
"type": "object",
"additionalProperties": true,
"description": "The value of the payload after the modification occurred"
},
"before": {
"type": "object",
"additionalProperties": true,
"description": "The value of the payload before the modification occurred"
},
"source": {
"$ref": "#/definitions/source"
},
"ts_ms": {
"$ref": "#/definitions/workload_ts_ms"
},
"updateDescription": {
"$ref": "#/definitions/updateDescription"
}
}
},
{
"title": "Delete Event",
"required": [
"op",
"before"
],
"properties": {
"op": {
"description": "Change event generated when an existing record was deleted from the data source",
"const": "d"
},
"after": {
"type": "null",
"description": "The value of the payload after the modification occurred. For delete operations this is not available"
},
"before": {
"type": "object",
"additionalProperties": true,
"description": "The value of the payload before the modification occurred"
},
"source": {
"$ref": "#/definitions/source"
},
"ts_ms": {
"$ref": "#/definitions/workload_ts_ms"
},
"updateDescription": {
"$ref": "#/definitions/updateDescription"
}
}
}
],
"definitions": {
"source": {
"type": "object",
"properties": {
"db": {
"type": "string",
"description": "Name of the database containing the original record"
},
"collection": {
"type": "string",
"description": "Name of the MongoDB collection containing the original record"
},
"table": {
"type": "string",
"description": "Name of the table containing the original record"
},
"ts_ms": {
"$ref": "#/definitions/source_ts_ms"
}
},
"additionalProperties": true,
"description": "Metadata regarding the data source containing the original record this change event is associated to"
},
"source_ts_ms": {
"type": "integer",
"format": "uint64",
"description": "Unix timestamp in milliseconds representing when the record modification occurred on the data source"
},
"workload_ts_ms": {
"type": "integer",
"format": "uint64",
"description": "Unix timestamp in milliseconds representing when the change event was processed by the current workload"
},
"updateDescription": {
"anyOf": [
{
"type": "object",
"description": "Metadata describing how the record was modified",
"properties": {
"updatedFields": {
"type": "object"
},
"removedFields": {
"type": "object"
},
"truncated_arrays": {
"anyOf": [
{
"type": "array",
"items": {
"type": "object",
"properties": {
"field": {
"type": "string"
},
"newSize": {
"type": "number",
"format": "int32"
}
}
}
},
{
"type": "null"
}
]
},
"disambiguatedPaths": {
"anyOf": [
{
"type": "object"
},
{
"type": "null"
}
]
}
}
},
{
"type": "null"
}
]
}
}
}
The raw JSON schema of the message's payload can also be downloaded from here.
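For a quick sanity check without a JSON Schema library, the per-operation rules of the payload schema can be approximated by hand. The sketch below is a partial check only: it covers `op`, `after`, `before` and `ts_ms`, ignores `source` and `updateDescription`, and is more lenient than the schema in accepting a null `ts_ms`. The function `check_payload` is a hypothetical helper, not a replacement for validating against the schema itself.

```python
def check_payload(payload):
    """Return a list of problems found in a payload; an empty list means none were found."""
    # For each operation: True means the field must be an object,
    # False means it must be null or absent.
    rules = {
        "r": {"after": True, "before": False},
        "c": {"after": True, "before": False},
        "u": {"after": True, "before": True},
        "d": {"after": False, "before": True},
    }
    op = payload.get("op")
    if not isinstance(op, str) or op not in rules:
        return [f"unsupported op: {op!r}"]

    problems = []
    for field, must_be_object in rules[op].items():
        value = payload.get(field)
        if must_be_object and not isinstance(value, dict):
            problems.append(f"'{field}' must be an object when op is '{op}'")
        elif not must_be_object and value is not None:
            problems.append(f"'{field}' must be null or absent when op is '{op}'")

    ts = payload.get("ts_ms")
    # bool is a subclass of int in Python, so it is excluded explicitly
    if ts is not None and (isinstance(ts, bool) or not isinstance(ts, int) or ts < 0):
        problems.append("'ts_ms' must be a non-negative integer")
    return problems


valid = check_payload({"op": "u", "after": {"stock": 49}, "before": {"stock": 50}})
invalid = check_payload({"op": "d", "after": {"stock": 49}, "before": {"stock": 50}})
```

Here `valid` is an empty list, while `invalid` reports that a delete event must not carry an `after` value.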
Examples
This section provides examples of change events, one for each operation type.
Snapshot Event
{
"key": {
"_id": "60e1d1b1c7849c71e2e3f4g5"
},
"payload": {
"op": "r",
"after": {
"_id": "60e1d1b1c7849c71e2e3f4g5",
"product_id": "P-101",
"name": "Laptop Pro",
"price": 1200.00,
"stock": 50
},
"before": null,
"source": {
"db": "inventory_db",
"collection": "products",
"ts_ms": 1678886400000
},
"ts_ms": 1678886401000,
"updateDescription": null
}
}
Create Event
{
"key": {
"_id": "60e1d1b1c7849c71e2e3f4g6"
},
"payload": {
"op": "c",
"after": {
"_id": "60e1d1b1c7849c71e2e3f4g6",
"product_id": "P-102",
"name": "Wireless Mouse",
"price": 25.50,
"stock": 200
},
"before": null,
"source": {
"db": "inventory_db",
"collection": "products",
"ts_ms": 1678886402000
},
"ts_ms": 1678886403000,
"updateDescription": null
}
}
Update Event
{
"key": {
"_id": "60e1d1b1c7849c71e2e3f4g5"
},
"payload": {
"op": "u",
"after": {
"_id": "60e1d1b1c7849c71e2e3f4g5",
"product_id": "P-101",
"name": "Laptop Pro",
"price": 1150.00,
"stock": 49
},
"before": {
"_id": "60e1d1b1c7849c71e2e3f4g5",
"product_id": "P-101",
"name": "Laptop Pro",
"price": 1200.00,
"stock": 50
},
"source": {
"db": "inventory_db",
"collection": "products",
"ts_ms": 1678886404000
},
"ts_ms": 1678886405000,
"updateDescription": null
}
}
Delete Event
{
"key": {
"_id": "60e1d1b1c7849c71e2e3f4g6"
},
"payload": {
"op": "d",
"after": null,
"before": {
"_id": "60e1d1b1c7849c71e2e3f4g6",
"product_id": "P-102",
"name": "Wireless Mouse",
"price": 25.50,
"stock": 200
},
"source": {
"db": "inventory_db",
"collection": "products",
"ts_ms": 1678886406000
},
"ts_ms": 1678886407000,
"updateDescription": null
}
}
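To show how a consumer might interpret such events, the sketch below replays a shortened version of the four examples onto an in-memory dictionary. The function `apply_event`, the abbreviated identifiers `A` and `B`, and the reduced record fields are assumptions made for brevity.

```python
def apply_event(state, key, payload):
    """Apply one change event to an in-memory view keyed by record identifier."""
    record_id = key["_id"]
    if payload["op"] == "d":
        # DELETE: the record no longer exists on the data source
        state.pop(record_id, None)
    else:
        # READ, CREATE and UPDATE all carry the latest record value in 'after'
        state[record_id] = payload["after"]
    return state


# Shortened versions of the snapshot, create, update and delete examples
events = [
    ({"_id": "A"}, {"op": "r", "before": None, "after": {"_id": "A", "stock": 50}}),
    ({"_id": "B"}, {"op": "c", "before": None, "after": {"_id": "B", "stock": 200}}),
    ({"_id": "A"}, {"op": "u", "before": {"_id": "A", "stock": 50}, "after": {"_id": "A", "stock": 49}}),
    ({"_id": "B"}, {"op": "d", "before": {"_id": "B", "stock": 200}, "after": None}),
]

state = {}
for key, payload in events:
    apply_event(state, key, payload)
```

After the replay, only record `A` remains, with its updated stock value, because record `B` was created and then deleted.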