Learning Kafka
Let's start with an obvious question.
Why do we need Kafka?
You must be hearing about "Kafka" a lot these days. It has become a common messaging system for many companies. It started at LinkedIn in 2011 and was later adopted by many companies like Twitter, Uber, Airbnb, PayPal, Spotify, and Netflix, just to name a few.
It allows you to store large amounts of data redundantly, acts as a message bus with high throughput (millions of messages per second), and helps in real-time stream processing of the data that goes through it. Isn't that interesting?
Transferring data is essentially messaging. Now let's understand the problems and challenges in this messaging world and how Kafka solves them.
Problem
Let's forget Kafka for now and understand the problems people were facing before Kafka came into the picture.
Messaging happens between a publisher and a subscriber via a broker.
Limited scalability, as the broker becomes the bottleneck
In the pre-Kafka era, the broker was hosted on a single server, which means all data lived on that one machine's local storage. If a subscriber is slow, data accumulates on the broker machine for a long time. As a result, the broker may go down, breaking the continuity of messaging.
No scope for reprocessing data in case of application faults
It is possible that the subscriber application has a bug that processed the data the wrong way, and you want to reprocess the data after fixing the bug. But you can't, because the subscriber has already consumed the data from the message queues in the broker. As you know, in a message queue, once data is consumed, it gets popped/removed. One way to solve this is for the subscriber to stash the consumed raw data and replay it after fixing the bug, but that requires a configuration change for the subscriber. The whole thing becomes another task in itself.
Maintaining data consistency
Different publishers may have different logic for writing to a broker, and there can be many layers between the publishers and the broker as well. For example, say in a billing system account data is provided by two sources: one provides the initial details and another provides the updated details. If one source manages to push the initial data to the broker but the other hits an issue while sending, the updated account details never reach the broker, and the subscriber consumes the initial details only.
And there are many more problems with pre-Kafka era (enterprise) messaging systems.
How Kafka Solves these problems
Kafka is a distributed, horizontally scalable, fault-tolerant streaming platform that lets you publish and subscribe to streams of records, retains messages, and processes streams as they occur.
Let's understand each term one by one.
Distributed
A distributed system means multiple running servers/machines work together in a cluster and appear as a single machine to the end user. The advantages of this are high availability and fault tolerance: if one machine or node in the cluster goes down, the system keeps working as if nothing happened, keeping the end user happy.
Horizontally scalable
Let's understand vertical scalability first. You have a laptop, which is nothing but a machine. To increase its computing power, you can add more RAM or upgrade its hardware, but only up to a certain limit, as far as your laptop supports. This is called vertical scalability. It also requires downtime: you won't be able to use your laptop during the upgrade. Now think about a big organization; it faces a similar problem, and it may not be able to afford the downtime.
Horizontal scalability solves this problem by adding more machines. Adding a new machine causes no downtime, and there is no limit to how many machines you can add. All these machines work together in a cluster, and capacity grows with the number of machines. This is called horizontal scalability.
Fault-tolerant
A non-distributed system has a single point of failure (SPoF): if your database server fails, you are stuck. In a distributed system, the nodes are designed in such a way that even if one node fails, the cluster still works fine, making the whole system fault-tolerant.
Retention of Messages in a Replicated Way
In Kafka, messages are stored inside the broker for a certain retention time. Messages are kept in a commit log (also known as a write-ahead log or transaction log), an ordered data structure that supports only appends at the end. This enables sequential reads by consumers, providing ordering of messages (not completely though; we will learn about that later).
Reads and writes are independent of each other: writing doesn't lock the log, and neither does reading.
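Below is a toy sketch of such an append-only log in Java. It is only an illustration of the data structure, not Kafka's actual implementation (Kafka's log is a set of segment files on disk), but it shows the two properties that matter: writes only go to the end, and reads are positional.

import java.util.ArrayList;
import java.util.List;

// Toy append-only commit log. Records can only be appended at the end,
// and each record is identified by its position in the log (its offset).
class CommitLog {
    private final List<String> records = new ArrayList<>();

    // Writes always go to the end; existing records are never modified.
    synchronized long append(String record) {
        records.add(record);
        return records.size() - 1; // offset of the newly appended record
    }

    // Reads are by offset, so a reader can scan sequentially from any
    // position without blocking the writer.
    String read(long offset) {
        return records.get((int) offset);
    }
}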
In the Kafka world, the publisher becomes the producer and the subscriber becomes the consumer.
In the next section, we will learn about Kafka architecture.
Kafka Architecture
As we discussed in the last session, Kafka is basically a messaging system at its core. It works on the publisher-subscriber model, i.e. the pub-sub model.

The publisher/producer is an application that generates events at one end of the data pipeline.
The subscriber/consumer is an application that consumes the events at the other end of the data pipeline.
In between these two components there is a server/machine/node that stores the data and keeps it ready for consumption. In Kafka, we call it a broker.
Multiple brokers can act together in a Kafka cluster.
So let's discuss this Kafka cluster in more detail.
A Kafka cluster may consist of one or more brokers.
We can say,
Cluster size = no. of brokers present in the cluster.
In the diagram, we can see there are 4 brokers present; hence the size of the cluster is 4.
These brokers may be present on the same machine or on different machines.
In the diagram, broker 1 and broker 2 are present on different machines, each having its own CPU/RAM,
whereas broker 3 and broker 4 are present on the same machine, sharing system resources.
The brokers in the cluster are differentiated by their ID, which is nothing but a unique number.
We can see in the diagram that there is a topic spread across two brokers, i.e. broker 1 and broker 2.
A topic stores data for a certain period of time, called the retention time (by default 7 days; if you set it to -1, the data is stored in the Kafka topic forever).
A topic can have zero, one, or many consumers that subscribe to the data written to it.
A topic can contain one or multiple partitions. You define the number of partitions while creating a topic, and you can even alter the number of partitions later.
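For instance, with the Java AdminClient you can create a topic with a chosen partition count and later raise it. This is a minimal sketch; the broker address (localhost:9092) and the topic name (demo-topic) are placeholders, and note that the partition count can only ever be increased, never decreased.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicAdmin {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Create "demo-topic" with 3 partitions, replication factor 1.
            admin.createTopics(Collections.singleton(
                    new NewTopic("demo-topic", 3, (short) 1))).all().get();

            // Later, raise the partition count from 3 to 6.
            admin.createPartitions(Collections.singletonMap(
                    "demo-topic", NewPartitions.increaseTo(6))).all().get();
        }
    }
}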
In the diagram, you can see there are two partitions in topic 1, but each partition resides on a single broker as a whole. That means a partition is not spread across brokers sitting on different machines, which saves network input-output (I/O).

Partitions increase parallel processing. It is one of the best practices to match the number of consumers to the number of partitions, so that each consumer consumes data from exactly one partition. We will come back to this design later.
No. of consumers = No. of Partitions
Each partition is an ordered, immutable sequence of records that is continuously appended to a structured commit log. The records in a partition are each assigned a sequential ID number called the offset, which uniquely identifies each record within the partition.
The messages inside a partition are ordered, but this ordering is not guaranteed across partitions.
So, if a topic has exactly one partition, the total ordering of messages is maintained.
Now let's understand how distributed processing is handled in Kafka.
Distributed Processing
Distributed processing becomes possible because of the Kafka cluster. Kafka uses Apache ZooKeeper to maintain the list of brokers present in a cluster. When a broker starts, it registers itself with its ID in ZooKeeper by creating an ephemeral node. Different Kafka components subscribe to the /brokers/ids path in ZooKeeper, where brokers are registered, so that they get notified when brokers are added or removed.
Since each ephemeral node corresponds to a unique ID, if you start another broker with the same ID, you will get an error.
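This registration is visible to any ZooKeeper client. The sketch below watches /brokers/ids and prints the live broker IDs whenever brokers join or leave; Kafka's own components do this internally, so you would rarely write it yourself (ZooKeeper is assumed at localhost:2181).

import java.util.List;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class BrokerWatcher {
    public static void main(String[] args) throws Exception {
        // Connect to the same ZooKeeper ensemble the brokers use.
        ZooKeeper zk = new ZooKeeper("localhost:2181", 10_000, event -> { });

        Watcher watcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                try {
                    // Watches fire only once, so re-register before printing.
                    List<String> ids = zk.getChildren("/brokers/ids", this);
                    System.out.println("Live brokers: " + ids);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };

        // The initial read registers the watch.
        System.out.println("Live brokers: " + zk.getChildren("/brokers/ids", watcher));
        Thread.sleep(Long.MAX_VALUE); // keep the process alive to keep watching
    }
}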
The Controller
The controller is one of the brokers and is additionally responsible for electing partition leaders. We will discuss partition leaders later.
The first broker that starts in the cluster becomes the controller by creating an ephemeral node in ZooKeeper called /controller. When other brokers start and try to create the same node, they get an error saying the cluster already has a controller. These brokers then place a ZooKeeper watch on the controller node so that they are notified of changes to it. As a result, when the controller broker stops or loses connectivity to ZooKeeper, the other brokers are notified and try to create the controller node themselves. The first broker to create it becomes the new controller of the cluster.
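This is the classic ephemeral-node election pattern; here is a simplified illustration with the raw ZooKeeper client, not Kafka's actual controller code.

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ControllerElection {
    // Try to become the controller by creating the ephemeral /controller node.
    // Exactly one client can succeed; everyone else gets NodeExistsException,
    // i.e. "the cluster already has a controller".
    static boolean tryBecomeController(ZooKeeper zk, int brokerId) throws Exception {
        try {
            zk.create("/controller",
                    String.valueOf(brokerId).getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL); // vanishes if this session dies
            return true; // we won the election
        } catch (KeeperException.NodeExistsException e) {
            // Lost the race: watch the node so we can retry when the current
            // controller dies and its ephemeral node disappears.
            zk.exists("/controller", event -> System.out.println("controller changed"));
            return false;
        }
    }
}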
Now let's understand replication.
Replication
Replication is at the heart of Kafka's architecture. It guarantees availability and durability when a broker stops or fails.

As we discussed earlier, data in Kafka is organized into topics. Each topic is partitioned, and each partition can have multiple replicas. While creating a topic, one can provide the replication factor for the partitions in the topic. The replicas are stored on brokers, and each broker typically stores hundreds of replicas belonging to different topics and partitions.
There are two types of replicas:
Leader replica
Each partition has a single replica designated as the leader. All produce and consume requests go through the leader in order to guarantee consistency.
Follower replica
All replicas of a partition that are not the leader are called followers. Their only job is to copy messages from the leader; they don't serve client requests.
In the diagram, we can see topic1 has 4 partitions: part 1, part 2, part 3, and part 4.
Each partition has 3 replicas, present on different brokers for high availability.
topic1-part1 has its leader replica on broker 1, and its follower replicas reside on brokers 2 and 3.
Similarly, topic1-part2 has its leader replica on broker 2, and its follower replicas are present on brokers 3 and 4.
A replication factor of 3 means there are 3 copies of the same partition in total. In the diagram shown, the replication factor is 3.
The replica status is maintained at the ZooKeeper path
/brokers/topics/[topic]/partitions/[partition]/state
Another task of the leader is knowing which of the follower replicas are up to date. Replicas that keep up with the leader are called in-sync replicas, or ISR. Only in-sync replicas are eligible to be elected as partition leader in case the existing leader fails.
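From Java, you can see the leader and ISR of every partition with the AdminClient. This is a small sketch; it assumes a broker at localhost:9092 and the topic1 from the diagram.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class ReplicaInspector {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription desc = admin.describeTopics(
                    Collections.singleton("topic1")).all().get().get("topic1");

            // For each partition: which broker leads it, and which replicas
            // are currently in sync with that leader (the ISR).
            desc.partitions().forEach(p ->
                    System.out.printf("partition %d: leader=%s isr=%s%n",
                            p.partition(), p.leader(), p.isr()));
        }
    }
}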

Now let's see what happens when a broker fails. When a broker leaves the cluster, the controller notices by watching the relevant ZooKeeper path. The departed broker may hold the leader replicas of some partitions and follower replicas of others. The controller then goes over all the partitions that need a new leader, checks the in-sync replicas of each, and makes one of them the new leader. The chosen leader knows that it now needs to serve client requests, while the followers know that they need to start replicating messages from the new leader.
Similarly, when a broker joins the cluster, the controller notices and uses the broker ID to check whether replicas already exist on this broker. If there are any, the controller notifies both the new and the existing brokers of the change, and the replicas on the new broker start replicating messages from the current leaders.
Installation and Setup
Download Kafka from the location below, by clicking on kafka_2.11-2.3.1.tgz:
http://archive.apache.org/dist/kafka/2.3.1/
The file name says the Scala version used is 2.11 and the Kafka version is 2.3.1.
If you are using Scala version 2.12, you can click on the kafka_2.12-2.3.1.tgz file instead.
Extract the file:
tar -xvf kafka_2.11-2.3.1.tgz
Make a directory each for Kafka and ZooKeeper data:
mkdir $HOME/data-kafka
mkdir $HOME/data-zk
Go to the config directory inside the extracted directory:
cd kafka_2.11-2.3.1/config
Open zookeeper.properties and change dataDir to the data-zk path.
Open server.properties and change log.dirs to the data-kafka path.
Check server.properties for other important default configuration.
Let's set KAFKA_HOME in the .bashrc file to point to kafka_2.11-2.3.1.
Open the .bashrc file and add:
export KAFKA_HOME=<path-to-kafka-dir>/kafka_2.11-2.3.1
export PATH=$PATH:$KAFKA_HOME/bin
source .bashrc
Now, go to the KAFKA_HOME directory:
cd $KAFKA_HOME
Start ZooKeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
Open another terminal and start a broker:
bin/kafka-server-start.sh config/server.properties
Open another terminal and create a topic called "words":
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic words
You can list the topics to check whether the topic was created:
bin/kafka-topics.sh --list --zookeeper localhost:2181
You can also check the number of partitions, replication factor, etc. using the describe command:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic words
Start a console producer:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic words
and write some input:
Hello World
I'm from India
Open another terminal and start a console consumer:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic words --from-beginning
You can see the input from the producer in the console. Whatever you now type in the producer terminal will show up in the consumer terminal.
Congrats!!! You have successfully set up Kafka on your system.
Additional Note:
For Windows, you can follow similar commands as below.
# Open a command prompt and run
> bin\windows\zookeeper-server-start.bat config\zookeeper.properties
# Open another command prompt and run
> bin\windows\kafka-server-start.bat config\server.properties
# Open another command prompt and run
> bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic words
> bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
> bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic words
# Whatever you type here will be visible in the consumer prompt
Hello World
I am from India
# Open another command prompt and run
> bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic words --from-beginning
# Whatever is typed in the producer prompt will be shown here
Hello World
I am from India
Additional Setup
Multiple Partitions
In the last session, we saw topics with a single partition.
In this session, let's create a topic with multiple partitions.
cd $KAFKA_HOME
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic words2
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic words2
You can see multiple partitions listed, each with a leader replica.
Multiple Brokers on a Single Machine
As there is only one broker, we can't increase the replication factor.
Now, let's create multiple brokers.
Create the directories below:
mkdir -p $HOME/data-kafka-1
mkdir -p $HOME/data-kafka-2
mkdir -p $HOME/data-kafka-3
mkdir $HOME/data-zk #create this directory from scratch
Create 3 server.properties files, one for each broker, inside the config directory of the Kafka installation folder:
cd $KAFKA_HOME/config
cp server.properties server1.properties
cp server1.properties server2.properties
cp server1.properties server3.properties
Now open server1.properties and make the changes below (write the log directory as an absolute path; Kafka does not expand $HOME in properties files):
broker.id=1
port=9092
log.dir=$HOME/data-kafka-1
Now open server2.properties and make the changes below:
broker.id=2
port=9093
log.dir=$HOME/data-kafka-2
Now open server3.properties and make the changes below:
broker.id=3
port=9094
log.dir=$HOME/data-kafka-3
Start ZooKeeper:
cd $KAFKA_HOME
bin/zookeeper-server-start.sh config/zookeeper.properties
Now you can start each Kafka broker in its own console.
Start broker 1
# Start the first broker in its own terminal session
$ env JMX_PORT=9999 bin/kafka-server-start.sh config/server1.properties
Open another terminal and start broker 2
# Start the second broker in its own terminal session
$ env JMX_PORT=10000 bin/kafka-server-start.sh config/server2.properties
Open another terminal and start broker 3
# Start the third broker in its own terminal session
$ env JMX_PORT=10001 bin/kafka-server-start.sh config/server3.properties
Here is a summary of the configured network interfaces and ports that the brokers will listen on:
        Broker 1     Broker 2      Broker 3
------------------------------------------------
Kafka   *:9092/tcp   *:9093/tcp    *:9094/tcp
JMX     *:9999/tcp   *:10000/tcp   *:10001/tcp
Now let's create a topic with replication factor 3, as that's the maximum replication factor we can have with 3 brokers.
Open another terminal and run:
cd $KAFKA_HOME
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic words3
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic words3
Now let's produce some data to Kafka:
bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic words3
and give some input.
Open another terminal and start the consumer:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic words3 --from-beginning
You can see the input from the producer in the consumer console.
We have now completed the setup of multiple Kafka brokers on a single machine.
Producers
Producers publish data to a specific topic, and they can also choose the partition to which they publish. The producer does this using a partitioner, which either balances the load in round-robin fashion or uses a hash function on the message key. The partitioner ensures that the same non-empty key is always sent to the same partition. If there is no key (the key is null), a partition is selected in round-robin fashion by default.
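To make the key-to-partition mapping concrete, here is a simplified stand-in. Kafka's real DefaultPartitioner murmur2-hashes the serialized key bytes, but plain hashCode() is enough to show the idea: the same non-null key always maps to the same partition.

public class PartitionerSketch {
    // Simplified stand-in for Kafka's default key-based partitioning.
    static int partitionFor(String key, int numPartitions) {
        // Mask off the sign bit so the modulo result is never negative.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // The same key yields the same partition on every call.
        System.out.println(partitionFor("account-42", 3));
        System.out.println(partitionFor("account-42", 3));
    }
}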
As we discussed in the last session, all produce requests go through the leader replica. The producer configuration determines the trade-off between throughput, latency, and durability of message delivery.
The acks setting is the number of acknowledgments the producer requires the leader to have received before considering a request complete.
Acks   Description          Throughput   Latency   Durability
---------------------------------------------------------------
0      No acknowledgments   high         low       no guarantee
1      Only leader acks     medium       medium    leader only
-1     Full ISR acks        low          high      full ISR
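In the Java producer, this trade-off is a single configuration entry. A minimal sketch, reusing the words topic and the broker from our setup:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AcksDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // "0" = no acknowledgment, "1" = leader only,
        // "all" (same as "-1") = wait for the full ISR.
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("words", "hello"));
        } // close() flushes any buffered records
    }
}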
Demo
Now we will see how to write a simple producer that publishes messages to Kafka using Java.
Let's create a Kafka topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic words
Start a console consumer, which will show the data published to the Kafka broker:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic words --from-beginning
NB: Please refer to the previous session to start ZooKeeper and the Kafka brokers before running the application.
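Here is a minimal sketch of such a producer (it needs the kafka-clients library on the classpath and the brokers from the previous session). The word itself is used as the key, so the same word always lands on the same partition.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class WordsProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // The three brokers we started in the previous session.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092,localhost:9093,localhost:9094");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            String[] words = {"Hello", "World", "I'm", "from", "India"};
            for (String word : words) {
                producer.send(new ProducerRecord<>("words", word, word),
                        (metadata, exception) -> {
                            if (exception != null) {
                                exception.printStackTrace();
                            } else {
                                System.out.printf("sent to partition %d at offset %d%n",
                                        metadata.partition(), metadata.offset());
                            }
                        });
            }
        } // close() flushes pending records before exiting
    }
}

Run it, and each word appears in the console consumer you started above.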
Consumers
Consumers belong to a consumer group in Kafka. The consumers in a consumer group cooperate to consume data from the same topic.
As we discussed earlier,
No. of Partitions = No. of Consumers
Now this rule needs a small modification. The more accurate formula is:
No. of Partitions = No. of Consumers in a Consumer Group

In the diagram, there are two servers (brokers) in the Kafka cluster, hosting 4 partitions of a topic.
The four partitions are P0, P1, P2, and P3.
There are two consumer groups: consumer group A has two consumers, whereas consumer group B has four.
Consumption is implemented by dividing the partitions over the consumer instances in a consumer group.
For consumer group A, all data of partitions P0 and P3 is consumed by consumer instance C1, and all data of P1 and P2 is consumed by consumer C2.
For consumer group B, each partition is mapped to exactly one consumer.
This means that a partition's data cannot be consumed by two different consumer instances belonging to the same consumer group.
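Here is a minimal sketch of a consumer in one of those groups. Every instance started with the same group.id shares the topic's partitions with the other instances of that group, exactly as in the diagram (the group name group-a is just a placeholder).

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class WordsConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Consumers with the same group.id divide the partitions among themselves.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-a");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton("words"));
            while (true) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}

Start two copies of this program and you will see the topic's partitions split between them; start a third copy with a different group.id and it will receive every message independently.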