Version: 7.9.x

Kafka2Rest Kotlin Library

This library consumes Kafka messages and handles them by making HTTP REST calls.

Install

To include the library as a dependency of your Maven/Gradle project use the following snippets.

Install Kafka2Rest

Maven

    <dependency>
      <groupId>eu.mia-platform</groupId>
      <artifactId>kafka2rest</artifactId>
      <version>{VERSION}</version>
    </dependency>

Gradle

implementation 'eu.mia-platform:kafka2rest:{VERSION}'

Install Kafka2Rest dependencies

You need to add the Fuel repository to your build.gradle or pom.xml.

Gradle

    repositories {
        maven {
            name 'spring-lib-release'
            url 'https://repo.spring.io/libs-release/'
        }
        ...
    }

Usage

Kafka2Rest is based on Filters and Processors. You can develop custom Filters and Processors by implementing FilterInterface and ProcessorInterface, or use the existing filters and processors.

Initialization

First, initialize Kafka2Rest with a Kafka consumer and a configuration.

Config Parameters

Parameter | Meaning
maxIntervalBetweenPolls | The maximum time interval (Duration) between two consecutive polls. When this interval is exceeded, isRunning() returns false.
topicList | The list of topics to subscribe to.
pollTimeout | The time (Duration) spent waiting in poll if data is not available in the buffer. If 0, poll returns immediately with any records currently available in the buffer, or empty if there are none. Must not be negative.
onExceptionTopic | (Optional) A topic where messages that cause unhandled processor exceptions are stored. Kafka2Rest enriches the message headers with two properties, retryNumber and errors.
retriesLimit |
sleepPeriod |
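
The constraints in the table can be illustrated with a small stand-alone sketch. SketchConfig is a hypothetical class written for this example, not the library's own config type; it only shows how the "must not be negative" rule for pollTimeout and the optional onExceptionTopic could be modelled.

```kotlin
import java.time.Duration

// Hypothetical config holder for illustration; Kafka2Rest's real config class may differ.
data class SketchConfig(
    val topicList: List<String>,
    val pollTimeout: Duration,
    val maxIntervalBetweenPolls: Duration,
    val onExceptionTopic: String? = null // optional, as in the table above
) {
    init {
        // The table states that pollTimeout must not be negative.
        require(!pollTimeout.isNegative) { "pollTimeout must not be negative" }
    }
}
```

A zero pollTimeout is accepted by this sketch, while a negative one fails fast with an IllegalArgumentException at construction time.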

Code Sample

    import java.time.Duration
    import java.util.Properties
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.serialization.StringDeserializer

    // Standard Kafka consumer properties
    val props = Properties()
    props["bootstrap.servers"] = "http://your-kafka-host:9092"
    props["group.id"] = "consumer-group-id"
    props["key.deserializer"] = StringDeserializer::class.java
    props["value.deserializer"] = StringDeserializer::class.java
    props["max.poll.interval.ms"] = Int.MAX_VALUE
    props["connections.max.idle.ms"] = Int.MAX_VALUE
    props["enable.auto.commit"] = false

    val consumer = KafkaConsumer<String, String>(props)

    // Kafka2Rest configuration (see Config Parameters)
    val config = ConfigBuilder
        .addTopicList(listOf("topic1", "topic2"))
        .addPollTimeout(Duration.ofSeconds(10L))
        .addMaxIntervalBetweenPolls(Duration.ofMinutes(5L))
        .build()

    val kafka2rest = Kafka2Rest<String, String>(consumer, config)

Messages Handling

Then set the Filter and Processor pairs. A Filter parses the Kafka message and returns a boolean. If it returns true, the linked Processor runs its process() method on the message.

    kafka2rest.set(Filter1(), HTTPPostProcessor())
    kafka2rest.set(Filter1(), MyCustomProcessor())
    kafka2rest.set(Filter2(), OtherProcessor())
    // ...
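
The filter/processor pairing described above can be sketched without the library. Everything in this block (Message, SketchFilter, SketchProcessor, Dispatcher, demoDispatch) is a hypothetical stand-in; the real FilterInterface and ProcessorInterface signatures may differ. It assumes that each registered pair is evaluated independently, so one filter can drive several processors.

```kotlin
// Hypothetical stand-ins; not the Kafka2Rest types.
data class Message(val key: String, val value: String)

fun interface SketchFilter {
    fun filter(message: Message): Boolean
}

fun interface SketchProcessor {
    fun process(message: Message)
}

class Dispatcher {
    private val pairs = mutableListOf<Pair<SketchFilter, SketchProcessor>>()

    // Registers a filter together with the processor it guards.
    fun set(filter: SketchFilter, processor: SketchProcessor) {
        pairs += filter to processor
    }

    // Runs every processor whose filter accepts the message; returns how many ran.
    fun dispatch(message: Message): Int {
        var ran = 0
        for ((filter, processor) in pairs) {
            if (filter.filter(message)) {
                processor.process(message)
                ran++
            }
        }
        return ran
    }
}

// Registers three pairs and dispatches one message, returning what the processors logged.
fun demoDispatch(): List<String> {
    val log = mutableListOf<String>()
    val dispatcher = Dispatcher()
    val isOrder = SketchFilter { it.key.startsWith("order") }
    dispatcher.set(isOrder, SketchProcessor { log += "post:${it.value}" })
    dispatcher.set(isOrder, SketchProcessor { log += "audit:${it.value}" })
    dispatcher.set(SketchFilter { it.key.startsWith("user") }, SketchProcessor { log += "user:${it.value}" })
    dispatcher.dispatch(Message("order-1", "a"))
    return log
}
```

In this sketch the message with key "order-1" reaches the two processors registered behind the order filter and skips the one behind the user filter.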

Start

Finally, start Kafka2Rest. When you invoke the start() method, Kafka2Rest runs on a new thread and listens to the configured Kafka topics.

    kafka2rest.start()

Health Check

Kafka2Rest offers two methods to verify the health of the service and of its connection.

  • isRunning() This method checks the time elapsed between two consecutive polls. It returns true as long as the configured maxIntervalBetweenPolls is respected. This ensures that the consumer is polling and that the processors are not stuck.
  • isConnected() This method checks that the consumer can list the broker topics, which requires a connection to the Kafka server. It returns true when at least one topic is returned.
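
The isRunning() check described above can be approximated with a small watchdog. PollWatchdog is a hypothetical helper written for this example and is not part of Kafka2Rest; it assumes the consumer loop records a timestamp on every poll, and that the check reports false before the first poll has happened.

```kotlin
import java.time.Duration
import java.time.Instant

// Hypothetical helper, not the library's implementation.
class PollWatchdog(private val maxIntervalBetweenPolls: Duration) {
    @Volatile
    private var lastPoll: Instant? = null

    // Called by the consumer loop each time it polls.
    fun recordPoll(now: Instant) {
        lastPoll = now
    }

    // True while the most recent poll is no older than the allowed interval.
    // Assumption: returns false if no poll has been recorded yet.
    fun isRunning(now: Instant): Boolean {
        val last = lastPoll ?: return false
        return Duration.between(last, now) <= maxIntervalBetweenPolls
    }
}
```

Passing the current time in explicitly keeps the sketch easy to test; a health endpoint would typically call isRunning(Instant.now()).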

ProducerManager (beta)

You can also send messages to a Kafka host by building a ProducerManager and calling its sendMessage() method.

    val producerManager = ProducerManager("localhost:9092")
    producerManager.sendMessage(myKafkaMessage, "topic1")