Version: 8.x (Current)

Kafka2Rest Kotlin Library

This library lets you consume Kafka messages and handle them by making HTTP REST calls.

Install#

To include the library as a dependency of your Maven or Gradle project, use the following snippets.

Install Kafka2Rest#

Maven#

<dependency>
    <groupId>eu.mia-platform</groupId>
    <artifactId>kafka2rest</artifactId>
    <version>{VERSION}</version>
</dependency>

Gradle#

implementation 'eu.mia-platform:kafka2rest:{VERSION}'

Install Kafka2Rest dependencies#

You need to add the Fuel repository to your build.gradle or pom.xml.

Gradle#

repositories {
    maven {
        name 'spring-lib-release'
        url 'https://repo.spring.io/libs-release/'
    }
    ...
}

Usage#

Kafka2Rest is based on Filters and Processors. You can develop custom Filters and Processors by implementing FilterInterface and ProcessorInterface, or use the existing ones.

Initialization#

First, initialize Kafka2Rest with a Kafka consumer and a configuration object.

Config Parameters#

Parameter | Meaning
maxIntervalBetweenPolls | The maximum time interval (Duration) between two consecutive polls. When this timeout is exceeded, the method isRunning() returns false.
topicList | The list of topics to subscribe to.
pollTimeout | The time (Duration) spent waiting in poll if data is not available in the buffer. If 0, poll returns immediately with any records currently available in the buffer, or with an empty result if there are none. Must not be negative.
onExceptionTopic | (Optional) A topic where messages that cause unhandled processor exceptions are stored. Kafka2Rest enriches the message headers with two properties, retryNumber and errors.
retriesLimit |
sleepPeriod |

Code Sample#

import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

// Standard Kafka consumer configuration
val props = Properties()
props["bootstrap.servers"] = "http://your-kafka-host:9092"
props["group.id"] = "consumer-group-id"
props["key.deserializer"] = StringDeserializer::class.java
props["value.deserializer"] = StringDeserializer::class.java
props["max.poll.interval.ms"] = Int.MAX_VALUE
props["connections.max.idle.ms"] = Int.MAX_VALUE
props["enable.auto.commit"] = false
val consumer = KafkaConsumer<String, String>(props)

// Kafka2Rest configuration (see Config Parameters)
val config = ConfigBuilder
    .addTopicList(listOf("topic1", "topic2"))
    .addPollTimeout(Duration.ofSeconds(10L))
    .addMaxIntervalBetweenPolls(Duration.ofMinutes(5L))
    .build()

val kafka2rest = Kafka2Rest<String, String>(consumer, config)

Messages Handling#

Next, set the Filter and Processor pairs. A Filter parses the Kafka message and returns a boolean. If it returns true, the linked Processor runs its process() method.

kafka2rest.set(Filter1(), HTTPPostProcessor())
kafka2rest.set(Filter1(), MyCustomProcessor())
kafka2rest.set(Filter2(), OtherProcessor())
// ...
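
The pairing of filters and processors can be pictured with a small self-contained sketch. The SketchFilter and SketchProcessor interfaces and the dispatch function below are hypothetical stand-ins written for this illustration. They are not the library's FilterInterface and ProcessorInterface, whose actual signatures may differ. The sketch assumes only what is stated above: a filter returns a boolean, and the linked processor runs when that boolean is true.

```kotlin
// Hypothetical stand-ins for illustration only; the library's real
// FilterInterface and ProcessorInterface may have different signatures.
fun interface SketchFilter {
    fun filter(message: String): Boolean
}

fun interface SketchProcessor {
    fun process(message: String)
}

// Runs every processor whose paired filter accepts the message.
// The same filter may appear in more than one pair.
fun dispatch(
    pairs: List<Pair<SketchFilter, SketchProcessor>>,
    message: String
) {
    for ((filter, processor) in pairs) {
        if (filter.filter(message)) {
            processor.process(message)
        }
    }
}

fun main() {
    val handled = mutableListOf<String>()
    val isOrder = SketchFilter { it.startsWith("order:") }
    val isInvoice = SketchFilter { it.startsWith("invoice:") }

    val pairs = listOf(
        isOrder to SketchProcessor { handled.add("post:$it") },
        isOrder to SketchProcessor { handled.add("audit:$it") },
        isInvoice to SketchProcessor { handled.add("invoice:$it") }
    )

    dispatch(pairs, "order:42")
    println(handled) // prints [post:order:42, audit:order:42]
}
```

In this sketch, registering the same filter twice runs both linked processors for a matching message, which mirrors the two kafka2rest.set(Filter1(), ...) calls above.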

Start#

Then start Kafka2Rest. When you invoke the start() method, Kafka2Rest runs on a new thread and listens to the Kafka topics.

kafka2rest.start()

Health Check#

Kafka2Rest offers two methods to verify service and connection health.

  • isRunning(): verifies the interval between two consecutive polls. It returns true as long as the configured maxIntervalBetweenPolls timeout is respected, which ensures that the consumer is polling and the processors are not stuck.
  • isConnected(): verifies that the consumer can list the broker topics, which requires a connection to the Kafka server. It returns true when more than 0 topics are listed.
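
To make the isRunning() semantics concrete, here is a minimal sketch of a poll-interval check. PollWatchdog is a hypothetical helper written for this illustration and is not part of the Kafka2Rest API. It also assumes that an elapsed time exactly equal to maxIntervalBetweenPolls still counts as healthy, which the library may treat differently.

```kotlin
import java.time.Duration
import java.time.Instant

// Hypothetical helper for illustration; not part of the Kafka2Rest API.
class PollWatchdog(
    private val maxIntervalBetweenPolls: Duration,
    startedAt: Instant
) {
    @Volatile
    private var lastPollAt: Instant = startedAt

    // Called by the polling loop each time a poll completes.
    fun recordPoll(now: Instant) {
        lastPollAt = now
    }

    // True while the time since the last poll is within the allowed interval
    // (the inclusive boundary is an assumption of this sketch).
    fun isRunning(now: Instant): Boolean =
        Duration.between(lastPollAt, now) <= maxIntervalBetweenPolls
}

fun main() {
    val start = Instant.parse("2024-01-01T00:00:00Z")
    val watchdog = PollWatchdog(Duration.ofMinutes(5), start)

    println(watchdog.isRunning(start.plusSeconds(60)))   // true: 1 minute since last poll
    println(watchdog.isRunning(start.plusSeconds(600)))  // false: 10 minutes since last poll
    watchdog.recordPoll(start.plusSeconds(590))
    println(watchdog.isRunning(start.plusSeconds(600)))  // true: 10 seconds since last poll
}
```

A processor that blocks for longer than the configured interval would delay the next poll, so a check of this kind reports the consumer as not running until polling resumes.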

ProducerManager (beta)#

You can also send messages to a Kafka host by building a ProducerManager and using its sendMessage() method.

val producerManager = ProducerManager("localhost:9092")
producerManager.sendMessage(myKafkaMessage, "topic1")