Skip to main content
Version: 12.x (Current)

Configuration

In order to successfully deploy and use the Kafka2Rest service it requires

  • a global configuration, via environmental variables such as Kafka details and config maps path
  • a mapping between Kafka topics and their target base URL
  • a library of Javascript functions that can be employed to generate the path of a target service
  • a library of Javascript functions that can be employed to generate the request body of a target service

Environment

In this section are listed and explained the environmental variables needed to run an instance of Kafka2Rest service.

  • TOPICS_REST_CONFIG_PATH: the path where is located the kafka-topic:base-url configuration map
  • PATH_PROCESSORS_PATH: the path where is located the path processors Javascript file
  • BODY_PROCESSORS_PATH: the path where is located the request body processors Javascript file
  • VALIDATOR_PROCESSORS_PATH: the path where is located the message validator processors Javascript file
  • KAFKA_CLIENT_ID: the identifier which can be employed to recognize this service within Kafka environment
  • KAFKA_GROUP_ID: the consumer group identifier employed by the service to read messages from Kafka
  • KAFKA_BROKERS_LIST: a string containing a comma (,) separated list of Kafka brokers addresses. At least one broker should be provided
  • KAFKA_AUTH_METHOD: the authentication method adopted by the service to connect to Kafka brokers. It can assume one of the following values, depending on your credentials:
    • plain
    • scram-sha-256
    • scram-sha-512
  • KAFKA_SASL_USERNAME: the username key used by the service to authenticate onto Kafka brokers
  • KAFKA_SASL_PASSWORD: the secret used by the service to authenticate onto Kafka brokers

Kafka Topics - REST (Base URL) Config Map

This configuration is required to inform the service that it has to start listening on a particular topic and allow the service to perform the matching between those topics and their corresponding target services (identified by their base url).
Here is provided the JSON schema that defines the configuration for each topic

{
"type": "object",
"required": [
"baseUrl",
"messageSchema",
"pathProcessor",
"bodyProcessor"
],
"properties": {
"authentication": {
"type": "string",
"enum": ["none", "basic", "privateKeyJwt"],
"default": "none",
"description": "specify (optionally) the way the service should request a token to the client credentials to authenticate the request to the target url"
},
"baseUrl": {
"type": "string",
"description": "the target base url onto which requests should be performed. This MUST terminate with a /"
},
"messageSchema": {
"type": "object",
"description": "a JSON schema employed to validate that the incoming messages on this topic are correct"
},
"pathProcessor": {
"type": "string",
"description": "the name of a path processor to be employed to generate the request path - selected from one of the available in pathProcessors file"
},
"bodyProcessor": {
"type": "string",
"description": "the name of a body processor to be employed to generate the request body - selected from one of the available in bodyProcessors file"
},
"validatorProcessor": {
"type": "string",
"description": "the name of a validator processor to be employed to filter invalid message - selected from one of the available in validatorProcessor file"
},
"tokenIssuerUrl": {
"type": "string",
"description": "the base url of the client credentials (required if authentication is basic or privateKeyJwt)"
},
"clientId": {
"type": "string",
"description": "id of the client to request the auth token (required if authentication is basic or privateKeyJwt)"
},
"clientSecret": {
"type": "string",
"description": "client secret to request the auth token (required if authentication is basic)"
},
"privateKeyPath": {
"type": "string",
"description": "private key to use to request the auth token (required if authentication is privateKeyJwt)"
},
"miaJwtIss": {
"type": "string",
"description": "this value is set as aud in the jwt assertion (required if authentication is privateKeyJwt)"
},
"kid": {
"type": "string",
"description": "id of the private key (required if authentication is privateKeyJwt)"
},
"requestedAudiences": {
"type": "array",
"items": { "type": "string" },
"description": "list of the audiences to insert in the auth token"
},
}
}
caution

baseUrl value must terminate with a trailing slash (/), otherwise the complete URL might not be correctly computed.

An example of a configuration map is the following one:

{
"space-topic": {
"baseUrl": "http://super-cool-service/notify/",
"messageSchema": {
"type": "object",
"required": [
"score"
],
"properties": {
"score": {
"type": "number"
},
"user": {
"type": "string"
}
}
},
"pathProcessor": "computed",
"bodyProcessor": "payloadBased",
"validatorProcessor": "winner"
},
"notification-topic": {
"authentication": "privateKeyJwt",
"tokenIssuerUrl": "http://client-credentials",
"clientId": "pk-client",
"privateKeyPath": "./path/to/the/id_rsa_priv.pem",
"kid": "key-id",
"miaJwtIss": "mia-idp",
"requestedAudiences": ["dih"],
"baseUrl": "http://space-notifications/",
"messageSchema": {
"type": "object",
"required": [
"result"
],
"properties": {
"result": {
"type": "string",
"enum": [
"OK",
"KO"
]
}
}
},
"pathProcessor": "constant",
"bodyProcessor": "constantBody",
"validatorProcessor": "default"
},
"interesting-topic": {
"authentication": "basic",
"tokenIssuerUrl": "http://client-credentials",
"clientId": "my-client",
"clientSecret": "if i tell you i have to kill you",
"requestedAudiences": ["dih"],
"baseUrl": "http://target-url-with-auth",
"messageSchema": {
"type": "object",
"required": [
"result"
],
"properties": {
"result": {
"type": "string",
"enum": [
"OK",
"KO"
]
}
}
},
"pathProcessor": "coolPathProcessor",
"bodyProcessor": "coolBodyProcessor",
"validatorProcessor": "winner"
},
}

Path Processors

It is a Javascript file that exports a set of functions whose input are the key and payload of incoming Kafka messages. These functions must return a string that can be joined with the base url to create the complete target URL.

Below is shown an example of file

'use strict'

module.exports = {
computed: (key, payload) => {
if (!key) {
throw new Error('missing key')
}
return `${key}/${payload.user}/game-over`
},
constant: (key, payload) => "commander"
}

With the following kafka message:

{
"key": "4023901",
"payload": {
"user": "mario",
"code": "342M",
}
}

and the following base url: http://my-service/notify/, the service would perform a request to http://my-service/notify/4023901/mario/game-over using the function computed, or to http://my-service/notify/commander using the function constant.

Body Processors

It is a Javascript file that exports a set of functions whose input are the key and payload of incoming Kafka messages. These functions must return an object, which will be used as request body when executing the REST call to the target service.

Below is shown an example of file

'use strict'

module.exports = {
payloadBased: (key, payload) => ({ score: payload.score }),
constantBody: (key, payload) => {
return { "msg": "Ion cannon ready to rumble!" }
}
}

With the following kafka message:

{
"key": "4023901",
"payload": {
"code": "342M",
"score": 249
}
}

the service would compute the following request body using the function payloadBased:

{
"score": 249
}

or this request body, if using the function constantBody:

{
"msg": "Ion cannon ready to rumble!"
}

Validator Processors

It is a Javascript file that exports a set of functions whose input are the key and payload of incoming Kafka messages. These functions must return a boolean, telling if the message could be sent to the service or should be removed.

Below is shown an example of file

'use strict'

module.exports = {
default: (key, payload) => true,
winner: (key, payload) => payload.score > 250
}

With the following kafka message:

{
"key": "1010010001010101",
"payload": {
"code": "10000111100101100",
"score": 160
}
}

the service would remove the message from the queue if using the winner validator.