Configuration
To deploy and use the Kafka2Rest service successfully, the following are required:
- a global configuration, provided through environment variables (such as the Kafka connection details and the config map paths)
- a mapping between Kafka topics and their target base URLs
- a library of JavaScript functions that can be employed to generate the path of a target service
- a library of JavaScript functions that can be employed to generate the request body of a target service
- a library of JavaScript functions that can be employed to decide whether a message should be forwarded to the target service
Environment
This section lists and explains the environment variables needed to run an instance of the Kafka2Rest service.
- TOPICS_REST_CONFIG_PATH: the path of the kafka-topic:base-url configuration map
- PATH_PROCESSORS_PATH: the path of the path processors JavaScript file
- BODY_PROCESSORS_PATH: the path of the request body processors JavaScript file
- VALIDATOR_PROCESSORS_PATH: the path of the message validator processors JavaScript file
- KAFKA_CLIENT_ID: the identifier used to recognize this service within the Kafka environment
- KAFKA_GROUP_ID: the consumer group identifier used by the service to read messages from Kafka
- KAFKA_BROKERS_LIST: a string containing a comma-separated (,) list of Kafka broker addresses. At least one broker must be provided
- KAFKA_AUTH_METHOD: the authentication method adopted by the service to connect to the Kafka brokers. It can take one of the following values, depending on your credentials:
  - plain
  - scram-sha-256
  - scram-sha-512
- KAFKA_SASL_USERNAME: the username used by the service to authenticate with the Kafka brokers
- KAFKA_SASL_PASSWORD: the secret used by the service to authenticate with the Kafka brokers
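As a rough illustration of the list above, the following sketch checks an environment object before the service is started. The helper name `findEnvironmentProblems` is hypothetical and not part of Kafka2Rest; treating every listed variable as mandatory is also an assumption of this sketch.

```javascript
'use strict'

// Variable names come from the list above. Treating all of them as
// mandatory in every setup is an assumption of this sketch.
const REQUIRED_VARIABLES = [
  'TOPICS_REST_CONFIG_PATH',
  'PATH_PROCESSORS_PATH',
  'BODY_PROCESSORS_PATH',
  'VALIDATOR_PROCESSORS_PATH',
  'KAFKA_CLIENT_ID',
  'KAFKA_GROUP_ID',
  'KAFKA_BROKERS_LIST',
  'KAFKA_AUTH_METHOD',
  'KAFKA_SASL_USERNAME',
  'KAFKA_SASL_PASSWORD',
]

const SUPPORTED_AUTH_METHODS = ['plain', 'scram-sha-256', 'scram-sha-512']

// Hypothetical helper: returns a list of human-readable problems,
// which is empty when the environment looks complete.
function findEnvironmentProblems(env) {
  const problems = REQUIRED_VARIABLES
    .filter((name) => !env[name])
    .map((name) => `missing ${name}`)

  // The brokers list is a comma-separated string with at least one entry.
  const brokers = (env.KAFKA_BROKERS_LIST || '')
    .split(',')
    .map((broker) => broker.trim())
    .filter(Boolean)
  if (env.KAFKA_BROKERS_LIST && brokers.length === 0) {
    problems.push('KAFKA_BROKERS_LIST must contain at least one broker')
  }

  const authMethod = env.KAFKA_AUTH_METHOD
  if (authMethod && !SUPPORTED_AUTH_METHODS.includes(authMethod)) {
    problems.push(`unsupported KAFKA_AUTH_METHOD: ${authMethod}`)
  }

  return problems
}
```

Calling `findEnvironmentProblems(process.env)` in a start-up script and printing the returned list would surface a misconfiguration before the service attempts to connect to Kafka.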
Kafka Topics - REST (Base URL) Config Map
This configuration tells the service which topics it has to listen on and maps each topic
to its corresponding target service (identified by its base URL).
The following JSON schema defines the configuration for each topic:
{
"type": "object",
"required": [
"baseUrl",
"messageSchema",
"pathProcessor",
"bodyProcessor"
],
"properties": {
"authentication": {
"type": "string",
"enum": ["none", "basic", "privateKeyJwt"],
"default": "none",
"description": "optional: how the service should request a token from the client credentials service to authenticate requests to the target URL"
},
"baseUrl": {
"type": "string",
"description": "the target base url onto which requests should be performed. This MUST terminate with a /"
},
"messageSchema": {
"type": "object",
"description": "a JSON schema employed to validate that the incoming messages on this topic are correct"
},
"pathProcessor": {
"type": "string",
"description": "the name of the path processor employed to generate the request path - selected from those available in the path processors file"
},
"bodyProcessor": {
"type": "string",
"description": "the name of the body processor employed to generate the request body - selected from those available in the body processors file"
},
"validatorProcessor": {
"type": "string",
"description": "the name of the validator processor employed to filter out invalid messages - selected from those available in the validator processors file"
},
"tokenIssuerUrl": {
"type": "string",
"description": "the base url of the client credentials (required if authentication is basic or privateKeyJwt)"
},
"clientId": {
"type": "string",
"description": "id of the client to request the auth token (required if authentication is basic or privateKeyJwt)"
},
"clientSecret": {
"type": "string",
"description": "client secret to request the auth token (required if authentication is basic)"
},
"privateKeyPath": {
"type": "string",
"description": "private key to use to request the auth token (required if authentication is privateKeyJwt)"
},
"miaJwtIss": {
"type": "string",
"description": "this value is set as aud in the jwt assertion (required if authentication is privateKeyJwt)"
},
"kid": {
"type": "string",
"description": "id of the private key (required if authentication is privateKeyJwt)"
},
"requestedAudiences": {
"type": "array",
"items": { "type": "string" },
"description": "list of the audiences to insert in the auth token"
}
}
}
The baseUrl value must end with a trailing slash (/); otherwise, the complete URL might not
be computed correctly.
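How Kafka2Rest joins the base URL and the computed path internally is not described here. As an illustration only, the sketch below uses the standard WHATWG `URL` resolution available in Node.js to show one way a missing trailing slash can silently change the target; the helper name `resolveTarget` is hypothetical.

```javascript
// Resolve a relative path against a base URL using the WHATWG URL API.
// This is an assumption about the joining strategy, used only to show
// why the trailing slash matters.
function resolveTarget(baseUrl, path) {
  return new URL(path, baseUrl).href
}

// With the trailing slash, the path is appended to the base URL.
const withSlash = resolveTarget('http://my-service/notify/', '4023901/mario/game-over')
// withSlash === 'http://my-service/notify/4023901/mario/game-over'

// Without it, the last segment of the base URL ("notify") is replaced.
const withoutSlash = resolveTarget('http://my-service/notify', '4023901/mario/game-over')
// withoutSlash === 'http://my-service/4023901/mario/game-over'
```

Plain string concatenation fails in a different way: without the slash it would produce `http://my-service/notify4023901/mario/game-over`.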
The following is an example of a configuration map:
{
"space-topic": {
"baseUrl": "http://super-cool-service/notify/",
"messageSchema": {
"type": "object",
"required": [
"score"
],
"properties": {
"score": {
"type": "number"
},
"user": {
"type": "string"
}
}
},
"pathProcessor": "computed",
"bodyProcessor": "payloadBased",
"validatorProcessor": "winner"
},
"notification-topic": {
"authentication": "privateKeyJwt",
"tokenIssuerUrl": "http://client-credentials",
"clientId": "pk-client",
"privateKeyPath": "./path/to/the/id_rsa_priv.pem",
"kid": "key-id",
"miaJwtIss": "mia-idp",
"requestedAudiences": ["dih"],
"baseUrl": "http://space-notifications/",
"messageSchema": {
"type": "object",
"required": [
"result"
],
"properties": {
"result": {
"type": "string",
"enum": [
"OK",
"KO"
]
}
}
},
"pathProcessor": "constant",
"bodyProcessor": "constantBody",
"validatorProcessor": "default"
},
"interesting-topic": {
"authentication": "basic",
"tokenIssuerUrl": "http://client-credentials",
"clientId": "my-client",
"clientSecret": "if i tell you i have to kill you",
"requestedAudiences": ["dih"],
"baseUrl": "http://target-url-with-auth/",
"messageSchema": {
"type": "object",
"required": [
"result"
],
"properties": {
"result": {
"type": "string",
"enum": [
"OK",
"KO"
]
}
}
},
"pathProcessor": "coolPathProcessor",
"bodyProcessor": "coolBodyProcessor",
"validatorProcessor": "winner"
}
}
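The schema descriptions state that some fields become required depending on the value of authentication, but the schema itself does not enforce this. The following sketch checks those conditional requirements for a whole configuration map. The helper names `findMissingAuthFields` and `checkConfigMap` are hypothetical, and the per-method field lists are taken from the "required if" notes in the schema descriptions.

```javascript
// Fields required for each authentication value, as stated in the
// "required if authentication is ..." notes of the schema descriptions.
const REQUIRED_BY_AUTHENTICATION = new Map([
  ['none', []],
  ['basic', ['tokenIssuerUrl', 'clientId', 'clientSecret']],
  ['privateKeyJwt', ['tokenIssuerUrl', 'clientId', 'privateKeyPath', 'miaJwtIss', 'kid']],
])

// Hypothetical helper: lists the authentication fields missing from a
// single topic configuration.
function findMissingAuthFields(topicConfig) {
  // "none" is the default declared in the schema.
  const authentication = topicConfig.authentication || 'none'
  const required = REQUIRED_BY_AUTHENTICATION.get(authentication)
  if (!required) {
    throw new Error(`unknown authentication: ${authentication}`)
  }
  return required.filter((field) => typeof topicConfig[field] !== 'string')
}

// Hypothetical helper: maps each misconfigured topic to its missing fields.
function checkConfigMap(configMap) {
  const report = {}
  for (const [topic, topicConfig] of Object.entries(configMap)) {
    const missing = findMissingAuthFields(topicConfig)
    if (missing.length > 0) {
      report[topic] = missing
    }
  }
  return report
}
```

An empty report means that every topic provides the fields its authentication method needs; it says nothing about whether the credentials themselves are valid.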
Path Processors
This is a JavaScript file that exports a set of functions whose inputs are the key and payload
of incoming Kafka messages. These functions must return a string that can be joined with
the base URL to create the complete target URL.
An example file is shown below:
'use strict'
module.exports = {
computed: (key, payload) => {
if (!key) {
throw new Error('missing key')
}
return `${key}/${payload.user}/game-over`
},
constant: (key, payload) => "commander"
}
With the following Kafka message:
{
"key": "4023901",
"payload": {
"user": "mario",
"code": "342M"
}
}
and the following base URL: http://my-service/notify/,
the service would perform a request to http://my-service/notify/4023901/mario/game-over
using the function computed, or to http://my-service/notify/commander
using the function constant.
Body Processors
This is a JavaScript file that exports a set of functions whose inputs are the key and payload
of incoming Kafka messages. These functions must return an object, which will be used as the
request body when executing the REST call to the target service.
An example file is shown below:
'use strict'
module.exports = {
payloadBased: (key, payload) => ({ score: payload.score }),
constantBody: (key, payload) => {
return { "msg": "Ion cannon ready to rumble!" }
}
}
With the following Kafka message:
{
"key": "4023901",
"payload": {
"code": "342M",
"score": 249
}
}
the service would compute the following request body using the function payloadBased:
{
"score": 249
}
or this request body, if using the function constantBody:
{
"msg": "Ion cannon ready to rumble!"
}
Validator Processors
This is a JavaScript file that exports a set of functions whose inputs are the key and payload
of incoming Kafka messages. These functions must return a boolean that tells whether the message
can be sent to the target service or should be discarded.
An example file is shown below:
'use strict'
module.exports = {
default: (key, payload) => true,
winner: (key, payload) => payload.score > 250
}
With the following Kafka message:
{
"key": "1010010001010101",
"payload": {
"code": "10000111100101100",
"score": 160
}
}
the service would discard the message if using the winner validator,
because its score (160) is not greater than 250.
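To tie the three kinds of processors together, the sketch below shows how a single message could be turned into a request description. It is not the service's actual implementation: the helper name `buildRequest`, the shape of the `processors` object, the order of the steps and the plain string concatenation of base URL and path are all assumptions, and validation against messageSchema is left out.

```javascript
'use strict'

// Processor libraries, reusing functions from the examples in this page.
const processors = {
  path: {
    computed: (key, payload) => `${key}/${payload.user}/game-over`,
  },
  body: {
    payloadBased: (key, payload) => ({ score: payload.score }),
  },
  validator: {
    winner: (key, payload) => payload.score > 250,
  },
}

// Topic configuration in the shape described by the config map schema.
const topicConfig = {
  baseUrl: 'http://super-cool-service/notify/',
  pathProcessor: 'computed',
  bodyProcessor: 'payloadBased',
  validatorProcessor: 'winner',
}

// Hypothetical helper: returns null when the validator rejects the
// message, otherwise the target URL and the request body.
function buildRequest(config, library, message) {
  const { key, payload } = message
  const validatorName = config.validatorProcessor
  if (validatorName && !library.validator[validatorName](key, payload)) {
    return null
  }
  const path = library.path[config.pathProcessor](key, payload)
  const body = library.body[config.bodyProcessor](key, payload)
  // baseUrl ends with "/", so the two parts can be concatenated directly.
  return { url: `${config.baseUrl}${path}`, body }
}

const accepted = buildRequest(topicConfig, processors, {
  key: '4023901',
  payload: { user: 'mario', score: 300 },
})
// accepted.url === 'http://super-cool-service/notify/4023901/mario/game-over'
// accepted.body is { score: 300 }

const rejected = buildRequest(topicConfig, processors, {
  key: '4023901',
  payload: { user: 'mario', score: 160 },
})
// rejected === null, because 160 is not greater than 250
```

Running the validator first avoids computing a path and a body for messages that would be discarded anyway.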