Skip to main content
Version: 10.9.x

Rest2Kafka Kotlin Library

This library allows to easily configure a Ktor Web Server which produces kafka messages.

Getting Started

Install

To include the library as a dependency of your Maven/Gradle project use the following snippets.

Maven

<dependency>
<groupId>eu.mia-platform</groupId>
<artifactId>rest2kafka</artifactId>
<version>1.0.1</version>
</dependency>

Gradle

compile 'eu.mia-platform:rest2kafka:1.0.1'

Quick Start

The following basic usage of the library allows to start a server which expose the route POST /my-route.
For each incoming request, a kafka message to TutorialTopic will be produced.
The value of the message will be the body of the incoming request.

val configurations : MutableList<Rest2KafkaOptions<out Any>> = mutableListOf(
Rest2KafkaOptions(
RouteOptions(
"POST",
"/my-route",
Any::class
),
KafkaOptions(
"TutorialTopic"
)
)
)

val rest2kafka = Rest2kafka("localhost:9092", configurations = configurations)
rest2kafka.start()

N.B. The explicit type MutableList<Rest2KafkaOptions<out Any>> is mandatory to initialize the Rest2Kafka class.

Routes options

The only two mandatory route options are path and method.

By default the library does not validate the request body. To validate the request body we use Yavi or JSON schema.

Yavi request body validation

In order to validate the request body, you need to provide the following route options:

  • schema: the Kotlin class against which to validate the request body
  • validator: the Yavi Validator instance containing all the desired constraints of each field of the schema.
data class MySchema(
val myStringField: String,
val myComplexField: ComplexField
) {
data class ComplexField(
val firstField: String,
val secondField: Int
)
}

val mySchemaValidator: Validator<MySchema> = ValidatorBuilder.of()
.konstraint(MySchema::myStringField) {
notNull()
.notEmpty()
.lessThanOrEqual(3)
}
.build()

val options : MutableList<Rest2KafkaOptions<out Any>> = mutableListOf(
Rest2KafkaOptions(
RouteOptions(
"POST",
"/my-route",
schema = MySchema::class,
validator = mySchemaValidator
),
KafkaOptions(
"my-topic"
)
)
)

val rest2kafka = Rest2kafka(SuccessfulProducer(), configurations = options)

For full list of available constraints please refer to Yavi documentation. Other examples of Validators can be found here.
This library also provides some utilities to validate the date strings.

Validating Int Fields

In order to validate any Int field it is mandatory to declare the property as nullable (Int?) in the Schema. Then, if that field is required, you can use the notNull() constraint in the validator.

JSON Schema validation

In order to validate the request body, you need to provide the following route options:

data class MySchema(
val myStringField: String,
val myComplexField: ComplexField
) {
data class ComplexField(
val firstField: String,
val secondField: Int
)
}

val configurations : MutableList<Rest2KafkaOptions<out Any>> = mutableListOf(
Rest2KafkaOptions(
RouteOptions(
"POST",
"/",
schema=MySchema::class,
jsonSchemaUrl = javaClass.getResource("/json-schema.json")
),
KafkaOptions(
"my-topic"
)
)
)

val rest2kafka = Rest2kafka("my-host", configurations = configurations)
rest2kafka.start()

JSON Schema validation is executed by Medeia Validator.

Kafka options

The only mandatory kafka option is the topic.
The other options are all the possible parameters to instantiate a Kafka ProducerRecord.
Every produced record will be initialized with the given options. The record's value is set to the body of the received request unless value is provided in the KafkaOptions.

Response body

With the configuration above, the API /my-route will respond with 204 No Content.
Eventually, you can specify a response body as last parameter of the Rest2KafkaOptions. The response body can be either a string (Content-Type will be text/plain) or a Kotlin class (Content-Type will be application/json). It will be the response body of every incoming request for that route.

Advanced usage

It is possible to use the library in advanced mode to customize either the response body or the Kafka message.
This is done by using an Adapter. It is an interface with only one method adapt. You can provide an adapter to Rest2kafka using an ad-hoc constructor.

val configurations : MutableList<Rest2KafkaOptions<out Any>> = mutableListOf(
Rest2KafkaOptions(
RouteOptions(
"POST",
"/my-route",
Any::class
),
KeyGeneratingAdapterFromBody(
KeyGeneratingAdapter.KafkaOptions(
"tutorialTopic"
)
)
)
)

val rest2kafka = Rest2kafka("localhost:9092", configurations = configurations)
rest2kafka.start()

This method is the one that actually creates both the ProducerRecord and the http response given the information about the incoming request.

As the adapt method is a suspend function, custom adapters can also do asynchronous operations such as http requests as it is done here.

If the adapt method throws a BadRequestException() then the service will respond with status code specified in the exception; If the adapt function throws any other exception then the service will respond 500 with the message of the exception.

Some adapters are provided by the library and described below.

Key Generating Adapter from Body

The KeyGeneratingAdapterFromBody is an adapter that is initialized with some KafkaOptions. Its adapt method always generates an AdapterResponse having:

  • As response body, an id that is the hash of the body of the received request
  • As ProducerRecord, a record with:
    • key: the hash of the body of the received request;
    • value: the body of received request;
    • other kafkaOptions: the ones provided when the Adapter is initialized.

Health routes

Besides the routes set in the configuration, the library exposes, by default, three health routes:

  • /-/healthz always returns 200
  • /-/ready always returns 200
  • /-/check-up always return 200, unless you use the constructor with checkUpTopic parameter to instantiate the library. In this case, it tries to produce a message on the Kafka topic specified in the parameter: if succeeds returns 200; if fails returns 503.

The response body of these routes will contain both the name and the version of the service.
Thus your project must contain a project.properties file like this one where the library will read these information from.
Remember to update the version in this file every time you tag your service.

Customize your health routes

The Rest2Kafka constructor accepts a third optional parameter that can be:

  • checkUpTopic of type string. The meaning is explained above.
  • healthRouting an extension function of the Routing Ktor class. This will override the default health routes.

Logging

By default, the amount of logs of ktor is huge. In order to adjust the logging settings add a logback.xml file like this one in the resources of your project.


Changelog

All notable changes to this project will be documented in this file.

The format is based on Keep a Changelog, and this project adheres to Semantic Versioning.

v1.1.0

  • add possibility to customize response status code

v1.0.3

Added

  • default /check-up route can produce a kafka message

v1.0.2

Fixed

  • Reduce Kafka logs

v1.0.1

Fixed

  • DefaultResponseBody can be Any and not only String

v1.0.0

Changed

  • DefaultResponseBody and KafkaOptions are now attributes of the Adapter
  • KeyGeneratingAdapter provided

v0.2.1

Fixed

  • Preserve backward compatibility with version 0.1.2

v0.2.0

Changed

  • Json Schema based input validation with medeia-validator-gson.
  • Version script in build.gradle updated.
  • .gitlab-ci uses gradlew everywhere.

v0.1.2

Changed

  • publish script updated into the build.gradle file