Building a real-time big data pipeline (Part 1: Kafka, RESTful, Java)


Updated on January 19, 2021

Kafka is used for building real-time data pipelines and streaming apps.

What is Kafka? The Kafka getting-started guide describes Kafka as a distributed commit log; in a simplistic view it is like an append-only file on a filesystem. Producers append data (echo 'data' >> file.dat) and consumers subscribe to that file (tail -f file.dat). In addition, Kafka provides an ever-increasing counter (the offset) and a timestamp for each consumed message. Kafka uses ZooKeeper to store metadata about brokers, topics, and partitions.
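The log-plus-offset model is easy to sketch in plain Java. The following toy class is purely illustrative (no Kafka client involved; AppendLogDemo and its method names are made up for this sketch): producers only ever append, each record gets a monotonically increasing offset and a timestamp, and each consumer reads independently from whatever offset it chooses.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a single Kafka partition: an append-only log where each
// record receives an ever-increasing offset and a timestamp, and each
// consumer tracks its own read position independently.
public class AppendLogDemo {
    static class Entry {
        final long offset; final long timestampMs; final String value;
        Entry(long offset, long timestampMs, String value) {
            this.offset = offset; this.timestampMs = timestampMs; this.value = value;
        }
    }

    static final List<Entry> log = new ArrayList<>();

    // Producer side: append only, like `echo 'data' >> file.dat`.
    static long append(String value) {
        long offset = log.size(); // ever-increasing counter
        log.add(new Entry(offset, System.currentTimeMillis(), value));
        return offset;
    }

    // Consumer side: read everything from a given offset, like `tail -f file.dat`.
    static List<String> readFrom(long offset) {
        List<String> out = new ArrayList<>();
        for (Entry e : log.subList((int) offset, log.size())) out.add(e.value);
        return out;
    }

    public static void main(String[] args) {
        append("hello");
        append("world");
        System.out.println(readFrom(0)); // [hello, world]
        System.out.println(readFrom(1)); // [world]
    }
}
```

Note that nothing is ever overwritten or deleted here; a second consumer starting at offset 0 would see the same messages in the same order, which is the property the real log gives you.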

Kafka for local development of applications:

There are multiple ways of running Kafka locally for development, but the easiest is Docker Compose. To download Docker Desktop, go to Docker Hub and sign in with your Docker ID.

Docker Compose installs and runs Kafka and ZooKeeper with the help of a docker-compose.yml file:

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost  # advertise on localhost for host access
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

1. Start the Kafka service

Open a terminal, go to the directory containing the docker-compose.yml file, and execute the following command. It starts both services in the background, downloading the images first if needed.

$docker-compose up -d  
Starting kafka_example_zookeeper_1 ... done  
Starting kafka_example_kafka_1     ... done  

To list the running Docker containers, run the following command.

$docker-compose ps  
Name                        Command                         State   Ports
kafka_example_kafka_1       start-kafka.sh                  Up      0.0.0.0:9092->9092/tcp
kafka_example_zookeeper_1   /bin/sh -c /usr/sbin/sshd  ...  Up      0.0.0.0:2181->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp

When you are done, you can shut everything down by executing the following command in another terminal.

$docker-compose down  
Stopping kafka_example_zookeeper_1 ... done  
Stopping kafka_example_kafka_1     ... done  
Removing kafka_example_zookeeper_1 ... done  
Removing kafka_example_kafka_1     ... done  
Removing network kafka_example_default  

Using the following command, check the ZooKeeper logs to verify that ZooKeeper is up and healthy.

$docker-compose logs zookeeper | grep -i binding  

Next, check the Kafka logs to verify that the broker is up and healthy.

$docker-compose logs kafka | grep -i started  

Two fundamental concepts in Apache Kafka are Topics and Partitions.

2. Create a Kafka topic

The Kafka cluster stores streams of records in categories called topics. Each record in a topic consists of a key, a value, and a timestamp. A topic can have zero, one, or many consumers that subscribe to the data written to it.
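A record's key also determines which partition it lands in, which is what preserves per-key ordering. The sketch below is a simplification: Kafka's default partitioner actually hashes the serialized key with murmur2, while PartitionDemo (an illustrative name, not a Kafka API) substitutes String.hashCode() just to show the idea that equal keys always map to the same partition.

```java
// Simplified sketch of key-based partition assignment. Kafka's default
// partitioner uses murmur2 over the serialized key; hashCode() stands in
// here purely for illustration.
public class PartitionDemo {
    static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit so the hash is non-negative, then take the modulus.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int p1 = partitionFor("user-42", 3);
        int p2 = partitionFor("user-42", 3);
        System.out.println(p1 == p2);          // true: same key -> same partition
        System.out.println(p1 >= 0 && p1 < 3); // true: a valid partition index
    }
}
```

This is why records with the same key (e.g., the same user ID) are consumed in the order they were produced, while records with different keys may be spread across partitions and interleaved.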

Use docker-compose exec to execute a command in a running container. By default, docker-compose exec allocates a TTY, so you can use it to get an interactive prompt. Go into the directory where the docker-compose.yml file is present and execute:

$docker-compose exec kafka bash  

(for ZooKeeper: $docker-compose exec zookeeper bash)

Change the directory to /opt/kafka/bin, where you will find scripts such as kafka-topics.sh, kafka-console-producer.sh, and kafka-console-consumer.sh:
cd /opt/kafka/bin

Create, list, or delete topics with kafka-topics.sh. To create a topic:

bash-4.4# ./kafka-topics.sh \  
 --create \  
 --topic mytopic \  
 --partitions 1 \  
 --replication-factor 1 \  
 --bootstrap-server localhost:9092  

Figure 1. Kafka topic partitions layout.

Kafka topics are divided into a number of partitions.

Each partition in a topic is an ordered, immutable sequence of records that is continually appended to.

List the existing topics:

bash-4.4# ./kafka-topics.sh \
 --list \
 --bootstrap-server localhost:9092  

If necessary, delete a topic using the following command.

bash-4.4# ./kafka-topics.sh \
 --delete \
 --topic mytopic \
 --bootstrap-server localhost:9092  

3a. Kafka Producer and Consumer - Kafka from the command line

A Kafka producer maintains a pool of buffer space that holds records not yet transmitted to the server. Kafka consumers subscribe to one or more topics of interest and receive the messages that producers send to those topics.
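That buffering behavior can be sketched as a size-triggered batch. This is a toy model under stated assumptions, not the real producer: the actual client batches per partition, sends asynchronously on background I/O threads, and also flushes on the linger.ms timeout, none of which is modeled here. BatchingProducerDemo is an illustrative name.

```java
import java.util.ArrayList;
import java.util.List;

// Toy sketch of producer-side buffering: records accumulate in memory
// and are "sent" as a batch once the batch is full, or when flush() is
// called explicitly (analogous to producer.flush()).
public class BatchingProducerDemo {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    final List<List<String>> sentBatches = new ArrayList<>(); // stands in for the network send

    BatchingProducerDemo(int batchSize) { this.batchSize = batchSize; }

    void send(String record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) flush(); // size-triggered flush
    }

    void flush() {
        if (!buffer.isEmpty()) {
            sentBatches.add(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        BatchingProducerDemo p = new BatchingProducerDemo(2);
        p.send("a"); p.send("b"); p.send("c");
        p.flush(); // flush the partial batch
        System.out.println(p.sentBatches); // [[a, b], [c]]
    }
}
```

Batching is the reason a record you send is not necessarily on the broker the instant send() returns; it may still be sitting in this buffer.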

Figure 2. Relationship between Kafka components.


A Kafka broker (a.k.a. Kafka server/node) is a server node in the cluster, mainly responsible for hosting partitions of Kafka topics, transferring messages from Kafka producers to Kafka consumers, and providing data replication and partitioning within a Kafka cluster.

The following producer command line reads data from standard input and writes it to a Kafka topic.

bash-4.4# ./kafka-console-producer.sh \
 --broker-list localhost:9092 \
 --topic mytopic

The following command line reads data from a Kafka topic and writes it to standard output.

bash-4.4# ./kafka-console-consumer.sh \
 --bootstrap-server localhost:9092 \
 --topic mytopic \
 --from-beginning
^CProcessed a total of 2 messages  

3b. Kafka Producer and Consumer - Kafka from a Java web application

Another way of reading data from a Kafka topic is to use a Java Spring Boot application.

The following demonstrates how to receive messages from a Kafka topic. First, I create a Spring Kafka consumer that listens for messages sent to a Kafka topic. Then, I create a Spring Kafka producer that sends messages to a Kafka topic.

Figure 3. Kafka producer and consumer in Java.


Download Spring Tool Suite 4 and install it.
In the Eclipse IDE's Package Explorer, click "Create new Spring Starter Project" and set:
Name: SpringBootKafka
Project type: Maven
Spring Boot Version: 2.3.2
Search "kafka" in the New Spring Starter Project dependencies and select "Spring for Apache Kafka"
Click Finish. The Spring Initializr creates the following simple SpringBootKafkaApplication.java class for you:

@SpringBootApplication
public class SpringBootKafkaApplication {
	public static void main(String[] args) {
		SpringApplication.run(SpringBootKafkaApplication.class, args);
	}
}

Configure Kafka through application.yml configuration file

In Spring Boot, properties are kept in the application.properties file under the classpath. The file is located in the src/main/resources directory. Rename application.properties to application.yml, then add the following content.

spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: group_test1
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

Create a Spring Kafka Consumer class

Create a class called KafkaConsumer.java and add a method with the @KafkaListener annotation.

@Service
public class KafkaConsumer {
	@KafkaListener(id = "group_test1", topics = "mytopic")
	public void consumeMessage(String message) {
		System.out.println("Consumed message: " + message);
	}
}

How to run Spring Boot web application in Eclipse?

In the Eclipse Project Explorer, right-click the project name -> select "Run As" -> "Maven Build…"
In the goals, enter spring-boot:run,
then click the Run button.

If you have Spring Tool Suite (STS) plug-in, you see a “Spring Boot App” option under Run As.

Run the following console producer which will enable you to send messages to Kafka:

bash-4.4# ./kafka-console-producer.sh \
 --broker-list localhost:9092 \
 --topic mytopic

Try sending a few messages as above (hello, world, etc.) and watch the application's standard output in the Eclipse console where your Spring Boot application is running.

Eclipse Console:

  .   ____          _            __ _ _  
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \  
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \  
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )  
  '  |____| .__|_| |_|_| |_\__, | / / / /  
 =========|_|==============|___/=/_/_/_/  

:: Spring Boot ::        (v2.2.4.RELEASE)  
2020-01-26 14:26:55.205  INFO 11137 --- [           main] c.e.d.SpringBootKafkaConsumerApplication : Starting   
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=simpleconsumer] Setting newly assigned partitions: test1-0  
2020-01-26 14:26:56.384  INFO 11137 --- [econsumer-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=simpleconsumer] Found no committed offset for partition test1-0  
2020-01-26 14:26:56.408  INFO 11137 --- [econsumer-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-1, groupId=simpleconsumer] Resetting offset for partition test1-0 to offset 2.  
2020-01-26 14:26:56.477  INFO 11137 --- [econsumer-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : simpleconsumer: partitions assigned: [test1-0]  
Consumed message: hello  
Consumed message: world  

The code above receives messages that were sent to a Kafka topic. The following code sends messages to a Kafka topic.

Make sure to add the spring-web dependency to pom.xml.


Add two new Java classes, KafkaProducer.java and KafkaController.java:

@Service
public class KafkaProducer {
	private static final String TOPIC = "mytopic";

	@Autowired
	private KafkaTemplate<String, String> kafkaTemplate;

	public void sendMessage(String message) {
		kafkaTemplate.send(TOPIC, message);
		System.out.println("Produced message: " + message);
	}
}

@RestController
@RequestMapping("/kafka")
public class KafkaController {
	private final KafkaProducer producer;

	KafkaController(KafkaProducer producer) {
		this.producer = producer;
	}

	@PostMapping("/publish")
	public void messagePrint(@RequestParam(value = "message", required = false) String message) {
		this.producer.sendMessage(message);
	}
}

Update application.yml file.

server:
  port: 8080

spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: group_test1
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

Run the Spring Boot web application (see "How to run Spring Boot web application in Eclipse?" above).

Make a POST request using Postman.

Select POST and use the URL http://localhost:8080/kafka/publish
Body: form-data, KEY: message, VALUE: hello

Finally, click Send.

See Eclipse Console for messages:

2020-01-27 13:12:06.911  INFO 31822 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.1  
2020-01-27 13:12:06.912  INFO 31822 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 18a913733fb71c01  
2020-01-27 13:12:06.912  INFO 31822 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1580148726911  
2020-01-27 13:12:06.947  INFO 31822 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: 6R8O95IPSfGoifR4zzwM6g  
Produced message: hello  
Consumed message: hello

Finally, exit the container shell and shut down Docker Compose by executing the following commands.

bash-4.4# exit  

$docker-compose down  

Further reading…

The Power of Kafka Partitions : How to Get the Most out of Your Kafka Cluster