Learning Kafka

Let's start with a basic question:

Why do we need Kafka?

You must be hearing about "Kafka" a lot these days. It has become a common messaging system for many companies. It originated at LinkedIn and was open-sourced in 2011, and was later adopted by companies like Twitter, Uber, Airbnb, PayPal, Spotify, and Netflix, just to name a few.

It lets you store large amounts of data redundantly, acts as a message bus with high throughput (millions of messages per second), and supports real-time stream processing of the data that goes through it. Isn't that interesting?

We know that transferring data means messaging. Now let's understand the problems and challenges in this messaging world and how Kafka solves them.

Problem

Let's forget Kafka for now and understand the problems people were facing before Kafka came into the picture.

Messaging happens between a publisher and a subscriber via a broker.

  1. Limited scalability, because the broker becomes the bottleneck

    In the pre-Kafka era, the broker was hosted on a single server, which means only that machine's local storage was available for all the data. If a subscriber is slow, data accumulates on the broker machine for a long time. As a result, the broker may go down, breaking the messaging continuity.

  2. No scope for re-processing data in case of application faults

    Suppose the subscriber application has a bug that processed data the wrong way, and you want to re-process the data after fixing the bug. You can't, because the subscriber has already consumed the data from the message queues in the broker: in a message queue, once data is consumed, it is popped/removed. One way to solve this is for the subscriber to stash the consumed raw data and replay it after fixing the bug, but that requires a configuration change for the subscriber. The whole thing becomes another task.

  3. Maintaining data consistency

    Different publishers may have different logic for writing to a broker, and there can be many layers between the publishers and the broker. For example, say that in a billing system account data is provided by two sources: one provides the initial details and the other provides updated details. If one source pushes the initial data to the broker but the other hits an issue while sending, the updated account details never reach the broker, and the subscriber consumes the initial details only.

And there are many more problems with pre-Kafka era enterprise messaging systems.

How Kafka Solves these problems

Kafka is a distributed, horizontally scalable, fault-tolerant streaming platform that lets you publish and subscribe to streams of records, retains messages, and processes streams as they occur.

Let's understand each word one by one.

A distributed system means multiple running servers/machines work together in a cluster and appear as a single machine to the end user. The advantages of this are high availability and fault tolerance: if one machine or node in the cluster goes down, the system keeps working as if nothing happened, keeping the end user happy.

Horizontally scalable

Let's understand vertical scalability first. You have a laptop, which is nothing but a machine. To increase its computing power, you can add more RAM or upgrade its hardware, but only up to the limit the laptop supports. This is called vertical scalability. It also requires downtime: you won't be able to use your laptop during the upgrade. Now think about a big organization; it faces the same problem, and it may not be able to afford that downtime.

Horizontal scalability solves this problem by adding more machines. Adding a new machine requires no downtime, and there is no hard limit on how many machines you can add. All these machines work together in a cluster, and performance grows roughly in proportion to the number of machines. This is called horizontal scalability.

Fault-tolerant

A non-distributed system has a single point of failure (SPoF): if your database server fails, you are screwed. In a distributed system, the nodes are designed so that even if one node fails, the cluster keeps working, making the whole system fault-tolerant.

Retention of Messages in a Replicated Way

In Kafka, messages are stored inside the broker for a certain retention time. Messages are stored in a commit log (also known as a write-ahead log or transaction log), an ordered data structure that supports only appending at the end. This enables sequential reads by consumers and provides ordering of messages (not completely though; we will learn about that later).

Reads and writes are independent of each other: writing doesn't lock the log, and neither does reading.
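
To make the idea concrete, here is a minimal in-memory sketch of an append-only log in Java. This is a toy illustration of the data structure, not Kafka's actual implementation: appends only go to the end and return a sequential offset, and readers fetch by offset, tracking their own position.

import java.util.ArrayList;
import java.util.List;

public class ToyCommitLog {
    private final List<String> records = new ArrayList<>();

    // Append at the end only; the offset is simply the record's position in the log.
    long append(String record) {
        records.add(record);
        return records.size() - 1;
    }

    // Read by offset; each consumer tracks its own offset and re-reads freely.
    String read(long offset) {
        return records.get((int) offset);
    }

    public static void main(String[] args) {
        ToyCommitLog log = new ToyCommitLog();
        long first = log.append("hello");  // offset 0
        long second = log.append("world"); // offset 1
        System.out.println(log.read(first) + " " + log.read(second)); // hello world
    }
}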

In the Kafka world, the publisher becomes the producer and the subscriber becomes the consumer.

In the next section, we will learn about Kafka architecture.

Kafka Architecture


As we discussed in the last session, Kafka is basically a messaging system at its core. It follows the publisher-subscriber model, i.e. the pub-sub model.

The publisher/producer is an application that generates events at one end of the data pipeline.

The subscriber/consumer is an application that consumes the events at the other end of the data pipeline.

In between these two components there is a server/machine/node that stores the data and keeps it ready for consumption. In Kafka, we call it a broker.

There can be multiple brokers acting together in a Kafka cluster.

So let's discuss this Kafka cluster in more detail.

A Kafka cluster may consist of one or more brokers.

We can say,

Cluster size = no. of brokers present in the cluster.

In the diagram, we can see, there are 4 brokers present. Hence the size of the cluster is 4.

Now these brokers may be present on the same machine or on different machines.

In the diagram, broker 1 and broker 2 are present on different machines, each having its own CPU/RAM,

whereas broker 3 and broker 4 are present on the same machine, sharing system resources.

The brokers in the cluster are differentiated by an ID, which is nothing but a unique number.

We can see in the diagram that there is a topic spread across two brokers, i.e. broker 1 and broker 2.

The topic stores data for a configurable period of time, called the retention time (by default, it is 7 days; if you set it to -1, the data is stored in the Kafka topic forever).

A topic can have zero, one, or many consumers that subscribe to the data written to it.

A topic can contain one or multiple partitions. You define the number of partitions while creating a topic, and you can even increase the number of partitions later (decreasing is not supported).
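
Both of these settings can also be changed programmatically. Below is a minimal sketch using Kafka's Java AdminClient; the topic name "words", the broker address, and the values shown are only examples. It sets retention.ms on the topic and then increases its partition count.

import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.common.config.ConfigResource;

public class TopicAdminDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Set retention to 7 days in milliseconds; "-1" would mean keep forever.
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "words");
            Collection<AlterConfigOp> ops = Collections.singletonList(
                    new AlterConfigOp(new ConfigEntry("retention.ms", "604800000"),
                                      AlterConfigOp.OpType.SET));
            admin.incrementalAlterConfigs(Collections.singletonMap(topic, ops)).all().get();

            // Increase the partition count; it can only go up, never down.
            admin.createPartitions(
                    Collections.singletonMap("words", NewPartitions.increaseTo(6)))
                 .all().get();
        }
    }
}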

In the diagram, you can see there are two partitions in topic 1, but each partition resides on a broker as a whole. That is, a partition is not spread across brokers, even when the brokers are on different machines. This saves network input/output (I/O).

These partitions increase parallel processing. It is a best practice to match the number of consumers to the number of partitions, so that each consumer consumes data from one partition. We will come back to this design later.

No. of consumers = No. of Partitions

Each partition is an ordered, immutable sequence of records that is continuously appended to a structured commit log. Each record in a partition is assigned a sequential ID number called the offset, which uniquely identifies the record within the partition.

The messages inside a partition are ordered, but this ordering is not guaranteed across partitions.

So, if a topic has one partition, then the ordering of messages is maintained.

Now let's understand how distributed processing is handled in Kafka.

Distributed Processing

Distributed processing is possible because of the Kafka cluster. Kafka uses Apache ZooKeeper to maintain the list of brokers in a cluster. When a broker starts, it registers itself with its ID in ZooKeeper by creating an ephemeral node. Various Kafka components subscribe to the /brokers/ids path in ZooKeeper, where brokers are registered, so they get notified when brokers are added or removed.

So there is a unique ID corresponding to each ephemeral node. As a result, if you start a broker with an ID that is already registered, you will get an error.

The Controller

The controller is one of the brokers, and it is responsible for electing partition leaders. We will discuss partition leaders later.

The first broker that starts in the cluster becomes the controller by creating an ephemeral node in ZooKeeper called /controller. When other brokers start and try to create that node, they get a "node already exists" error, telling them the cluster already has a controller. These brokers then place a ZooKeeper watch on the controller node so they get notified of changes to it. As a result, when the controller broker stops or loses connectivity to ZooKeeper, the other brokers are notified and try to create the controller node themselves. The first broker to create the node becomes the new controller of the cluster.
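
You can inspect both of these paths yourself. Here is a minimal sketch using the ZooKeeper Java client (it assumes the org.apache.zookeeper client library is on the classpath and that ZooKeeper is running on localhost:2181, as in the setup below): it lists the registered broker IDs and reads the current controller node.

import org.apache.zookeeper.ZooKeeper;

public class ZkInspect {
    public static void main(String[] args) throws Exception {
        // Connect to the ZooKeeper instance that the Kafka cluster uses.
        ZooKeeper zk = new ZooKeeper("localhost:2181", 3000, event -> { });

        // Brokers register ephemeral nodes under /brokers/ids.
        System.out.println("Registered brokers: " + zk.getChildren("/brokers/ids", false));

        // The current controller is recorded in the ephemeral /controller node.
        byte[] controller = zk.getData("/controller", false, null);
        System.out.println("Controller: " + new String(controller));

        zk.close();
    }
}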

Now let's understand replication.

Replication

Replication is at the heart of Kafka's architecture. It guarantees availability and durability when a broker stops or fails.

As we discussed earlier, data in Kafka is organized into topics. Each topic is partitioned, and each partition can have multiple replicas. While creating a topic, you can provide a replication factor, which applies to every partition in the topic. The replicas are stored on brokers, and each broker typically stores hundreds of replicas belonging to different topics and partitions.

There are two types of replicas

  1. Leader replica

    Each partition has a single replica designated as the LEADER. All produce and consume requests go through the leader in order to guarantee consistency.

  2. Follower replica

    All replicas of a partition that are not the leader are called followers. Their only job is to copy messages from the leader; they don't serve client requests.

In the diagram, we can see topic1 has 4 partitions: part 1, part 2, part 3 and part 4.

Each partition has 3 replicas, present on different brokers for high availability.

topic1-part1 has its leader replica on broker 1, and its follower replicas reside on brokers 2 and 3.

Similarly, topic1-part2 has its leader replica on broker 2, and its follower replicas are present on brokers 3 and 4.

A replication factor of 3 means there are 3 copies of the same partition in total. In the diagram shown, the replication factor is 3.

The replica status is maintained at the ZooKeeper path

/brokers/topics/[topic]/partitions/[partition]/state

Another task of the leader is knowing which of the follower replicas are up to date. Replicas that are in sync are called in-sync replicas, or ISR. Only in-sync replicas are eligible to be elected as partition leader in case the existing leader fails. Let's look at what happens when a broker fails.

When a broker leaves the cluster, the controller notices by watching the relevant ZooKeeper path. That broker may host some leader replicas and some follower replicas. The controller then goes over all the partitions that need a new leader, checks the in-sync replicas for each, and makes one of them the new leader. The chosen leader knows that it needs to serve client requests, while the followers know that they need to start replicating messages from the new leader.

Similarly, when a broker joins the cluster, the controller notices and uses the broker ID to check whether replicas exist on this broker. If there are any, the controller notifies both the new and the existing brokers of the change, and the replicas on the new broker start replicating messages from the existing leaders.
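
You can observe the leader and the ISR of every partition programmatically as well. Here is a minimal sketch using Kafka's Java AdminClient; the topic name "words3" and the broker address are just examples taken from the setup later in this guide.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class DescribeTopicDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription desc = admin.describeTopics(Collections.singletonList("words3"))
                                         .all().get().get("words3");
            // Print the leader and in-sync replicas (ISR) for each partition.
            desc.partitions().forEach(p ->
                System.out.printf("partition %d: leader=%s isr=%s%n",
                        p.partition(), p.leader(), p.isr()));
        }
    }
}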

Installation and Set up

Download Kafka from the location below by clicking on kafka_2.11-2.3.1.tgz

http://archive.apache.org/dist/kafka/2.3.1/

The file name says the Scala version used is 2.11 and the Kafka version is 2.3.1.

If you are using Scala version 2.12, you can click on the kafka_2.12-2.3.1.tgz file instead.

Extract the file

tar -xvf kafka_2.11-2.3.1.tgz

Make directories for the Kafka and ZooKeeper data

mkdir $HOME/data-kafka
mkdir $HOME/data-zk

Go to the config directory inside the extracted directory

cd kafka_2.11-2.3.1/config

Open zookeeper.properties and change dataDir to the data-zk path

Open server.properties and change log.dirs to the data-kafka path

Check the server.properties file for the other important default configurations

Let's set KAFKA_HOME to the kafka_2.11-2.3.1 directory in the .bashrc file.

Open the .bashrc file and add the lines below

export KAFKA_HOME=<path-to-kafka-dir>/kafka_2.11-2.3.1
export PATH=$PATH:$KAFKA_HOME/bin

Then reload it

source .bashrc

Now, go to the KAFKA_HOME directory

cd $KAFKA_HOME

Start Zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

Open another terminal and start broker

bin/kafka-server-start.sh config/server.properties

Open another terminal and create a topic called "words"

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic words

You can list the topics to verify that the topic was created

bin/kafka-topics.sh --list --zookeeper localhost:2181

Also, you can check the number of partitions, replication factor, etc. using the describe command as below

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic words

Start a console producer

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic words

and write some input

Hello World
I'm from India

Open another terminal and start a console consumer

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic words --from-beginning

You will see the input from the producer in the consumer console.

Now type more messages in the producer terminal and watch them appear in the consumer terminal.

Congrats! You have successfully set up Kafka on your system.

Additional Note:

For Windows, you can run the equivalent commands below

# Open a command prompt and run
> bin\windows\zookeeper-server-start.bat config\zookeeper.properties
# Open another command prompt and run
> bin\windows\kafka-server-start.bat config\server.properties
# Open another command prompt and run
> bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic words
> bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
> bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic words
# Whatever you type here will be visible in the consumer prompt
Hello World
I am from India

# Open another command prompt and run
> bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic words --from-beginning
# Whatever is typed in the producer prompt will be shown here
Hello World
I am from India

Additional Setup

Multiple Partitions

In the last session, we saw topics with a single partition.

In this session, let's create a topic with multiple partitions.

cd $KAFKA_HOME
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic words2

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic words2

You can see multiple partitions listed, each with its leader replica.

Multiple Brokers on a Single Machine

As there is only one broker, we can't increase the replication factor.

Now, let's create multiple brokers

Create the directories below

mkdir -p $HOME/data-kafka-1
mkdir -p $HOME/data-kafka-2
mkdir -p $HOME/data-kafka-3
rm -rf $HOME/data-zk && mkdir $HOME/data-zk # recreate this directory from scratch so the old cluster's metadata is dropped

Create 3 server.properties files, one for each broker, inside the config directory of the Kafka installation folder

cd $KAFKA_HOME/config
cp server.properties server1.properties
cp server1.properties server2.properties
cp server1.properties server3.properties

Now open server1.properties and make the changes below. Note that properties files don't expand shell variables, so write the absolute path instead of $HOME; also, listeners is the current replacement for the deprecated port setting.

broker.id=1
listeners=PLAINTEXT://:9092
log.dirs=<absolute-path-to-home>/data-kafka-1

Now open server2.properties and make the changes below

broker.id=2
listeners=PLAINTEXT://:9093
log.dirs=<absolute-path-to-home>/data-kafka-2

Now open server3.properties and make the changes below

broker.id=3
listeners=PLAINTEXT://:9094
log.dirs=<absolute-path-to-home>/data-kafka-3

Start Zookeeper

cd $KAFKA_HOME
bin/zookeeper-server-start.sh config/zookeeper.properties

Now you can start each Kafka broker in its own console

Start broker 1

# Start the first broker in its own terminal session
$ env JMX_PORT=9999  bin/kafka-server-start.sh config/server1.properties

Open another terminal and Start broker 2

# Start the second broker in its own terminal session
$ env JMX_PORT=10000 bin/kafka-server-start.sh config/server2.properties

Open another terminal and Start broker 3

# Start the third broker in its own terminal session
$ env JMX_PORT=10001 bin/kafka-server-start.sh config/server3.properties

Here is a summary of the configured network interfaces and ports that the brokers will listen on:

        Broker 1     Broker 2      Broker 3
        ----------------------------------------------
Kafka   *:9092/tcp   *:9093/tcp    *:9094/tcp
JMX     *:9999/tcp   *:10000/tcp   *:10001/tcp

Now let's create a topic with replication factor 3, as that's the maximum replication factor we can have with 3 brokers.

Open another terminal and run

cd $KAFKA_HOME
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic words3

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic words3

Now let's produce some data to Kafka

bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic words3

and give some input

Open another terminal and start a consumer

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic words3 --from-beginning

You can see the producer's input in the consumer console.

Now we have completed the setup for multiple Kafka brokers on a single machine.

Producers

Producers publish data to a specific topic. They can also choose the partition to which they publish. The producer does this using a partitioner, which either balances the load in a round-robin fashion or uses a hash function on the message key. The partitioner ensures that the same non-empty key is always sent to the same partition. If there is no key (the key is NULL), the partition is selected in a round-robin fashion by default.
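
To see this in action, here is a minimal sketch (the topic "words" and the keys are just examples) that sends keyed records and prints which partition each one lands on; records with the same key always report the same partition.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class PartitionerDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (String key : new String[]{"user-a", "user-b", "user-a"}) {
                // send() returns a Future<RecordMetadata>; get() blocks until the ack arrives.
                RecordMetadata md = producer.send(
                        new ProducerRecord<>("words", key, "value-for-" + key)).get();
                // Records with the same key hash to the same partition.
                System.out.printf("key=%s -> partition=%d offset=%d%n",
                        key, md.partition(), md.offset());
            }
        }
    }
}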

As we discussed in the last session, all producer requests go through the leader replica. The producer's configuration determines the throughput, latency, and durability of message delivery.

The acks level is the number of acknowledgments the producer requires the leader to have received before considering a request complete.

Acks   Description           Throughput   Latency   Durability
---------------------------------------------------------------
0      No acknowledgments    high         low       no guarantee
1      Only leader acks      medium       medium    leader only
-1     Waits for full ISR    low          high      full ISR

Demo

Now we will see how to write a simple producer that publishes messages to Kafka using Java.

Let's create a Kafka topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic words

Start a console consumer, which will show the data published to the Kafka brokers

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic words --from-beginning

NB: Please refer to the previous session to start ZooKeeper and the Kafka brokers before running the application.
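
A minimal producer along these lines could look as follows (a sketch: the class name and message contents are illustrative). It connects to the three local brokers from the previous session and publishes a few records to the words topic with acks=all, matching the durability discussion above.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                  "localhost:9092,localhost:9093,localhost:9094");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // acks=all waits for the full ISR, trading latency for durability.
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("words", "message-" + i));
            }
            // Closing (via try-with-resources) flushes any pending records.
        }
    }
}

If the console consumer from the step above is running, you should see message-0 through message-9 appear there.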

Consumers

Consumers belong to a consumer group in Kafka. The consumers in a consumer group cooperate to consume data from the same topic.

As we discussed earlier,

No. of Partitions = No. of Consumers

Now this rule gets refined a bit. The more correct formula is:

No. of Partitions = No. of Consumers in a Consumer Group

In the diagram, there are two servers (brokers) in the Kafka cluster, hosting 4 partitions of a topic.

The four partitions are P0, P1, P2, and P3.

Now there are two consumer groups: consumer group A has two consumers, while consumer group B has four consumers.

The way consumption is implemented is by dividing the partitions over the consumer instances in a consumer group.

For consumer group A,

all data of partitions P0 and P3 is consumed by consumer instance C1, and all data of P1 and P2 is consumed by consumer C2.

But for consumer group B, each partition is mapped to exactly one consumer.

This means a partition's data cannot be consumed by two different consumer instances belonging to the same consumer group.
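
Here is a minimal consumer sketch (the group name "group-A" and the topic "words" are illustrative). Run two copies of it with the same group.id and Kafka will split the topic's partitions between the two instances, exactly as described above.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Consumers sharing a group.id divide the topic's partitions among themselves.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-A");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("words"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> r : records) {
                    // Within one partition, offsets arrive strictly in order.
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            r.partition(), r.offset(), r.value());
                }
            }
        }
    }
}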
