# Kafka

## Diff with other solutions

* vs RabbitMQ: Kafka has \~1000x better performance, as it uses a unidirectional, append-only log without per-message handshakes
* vs Redis: Redis (Pub/Sub) is not built for streaming data and keeps no history of past messages
* vs Amazon SQS: mainly worse performance, and it is not cheap
* vs Amazon SNS: very similar, but more expensive
* vs Azure Streaming Services: not really meant for general pub/sub
* vs Google Pub/Sub: same as Amazon SNS, but more expensive

## Distributions

* Confluent Cloud
* Cloudera has its own version
* Strimzi provides a Kubernetes operator (installable with a Helm chart) to deploy Kafka to K8s

## Architecture

<figure><img src="/files/rgcOFAaUFavjXlSHU8MF" alt=""><figcaption></figcaption></figure>

### Zookeeper

* Zookeeper manages brokers (keeps a list of them)
* Zookeeper helps perform leader election for partitions
* Zookeeper sends notifications to Kafka in case of changes (new topic, broker dies or comes up)
* Kafka 2.x can't work without Zookeeper
* Kafka 3.x can work without Zookeeper (KIP-500), using KRaft (Kafka Raft) instead
* Kafka 4.x no longer supports Zookeeper (KRaft only)
* Zookeeper by design operates with an odd number of servers (1, 3, 5, 7), because a majority (quorum) of the servers must be alive
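
The odd-number rule comes from majority quorums: an ensemble of n servers needs floor(n/2) + 1 of them alive. A small Python sketch of that arithmetic (the function names are just illustrative):

```python
def quorum_size(servers: int) -> int:
    """Smallest majority of an ensemble: floor(n / 2) + 1."""
    return servers // 2 + 1


def tolerated_failures(servers: int) -> int:
    """How many servers may fail while a majority is still alive."""
    return servers - quorum_size(servers)


for n in range(1, 8):
    print(f"{n} servers: quorum={quorum_size(n)}, tolerates {tolerated_failures(n)} failure(s)")
```

Three and four servers both tolerate one failure, and five and six both tolerate two, so the extra even server adds no fault tolerance.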

### Topic vs Queue

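Unlike a queue, a topic does not delete messages once they are consumed; instead it has a retention configuration:

* Time (`retention.ms`): keep messages for up to N days, then delete them
* Size (`retention.bytes`): once a partition grows beyond X bytes, delete the oldest messages
* None (`retention.ms=-1`): keep all messages forever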
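
A simplified Python model of the three options. `retention.ms` and `retention.bytes` are the real topic settings, but this sketch is only an approximation: it deletes individual messages, while Kafka deletes whole log segments, and it treats the list as a single partition. `apply_retention` and `Message` are hypothetical names.

```python
from dataclasses import dataclass
from typing import List, Optional

DAY_MS = 24 * 60 * 60 * 1000


@dataclass(frozen=True)
class Message:
    offset: int
    timestamp_ms: int
    size_bytes: int


def apply_retention(log: List[Message], now_ms: int,
                    retention_ms: Optional[int] = None,
                    retention_bytes: Optional[int] = None) -> List[Message]:
    """Return the messages that survive retention (None or -1 means unlimited)."""
    kept = list(log)
    # Time-based: drop messages older than retention_ms.
    if retention_ms is not None and retention_ms >= 0:
        kept = [m for m in kept if now_ms - m.timestamp_ms <= retention_ms]
    # Size-based: drop the oldest messages until the log fits into retention_bytes.
    if retention_bytes is not None and retention_bytes >= 0:
        total = sum(m.size_bytes for m in kept)
        while kept and total > retention_bytes:
            total -= kept.pop(0).size_bytes
    return kept


# Five 100-byte messages, one per day; "now" is day 5.
log = [Message(offset=i, timestamp_ms=i * DAY_MS, size_bytes=100) for i in range(5)]
now = 5 * DAY_MS
print([m.offset for m in apply_retention(log, now, retention_ms=3 * DAY_MS)])  # [2, 3, 4]
print([m.offset for m in apply_retention(log, now, retention_bytes=250)])      # [3, 4]
print([m.offset for m in apply_retention(log, now, retention_ms=-1)])          # [0, 1, 2, 3, 4]
```

Surviving messages keep their original offsets; deletion only moves the start of the log forward.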

### Kafka Broker Discovery

<figure><img src="/files/XQw6Zhiuho48fNomGF5Q" alt=""><figcaption></figcaption></figure>

Every Kafka broker can act as a "bootstrap server": a client connects to any one broker and receives the metadata (all brokers, topics and partitions), because each broker knows the metadata of the whole cluster.

### Partitioning

<figure><img src="/files/skJRuSTmkNcP2O5x1u4Z" alt=""><figcaption></figcaption></figure>

Messages are <mark style="background-color:orange;">ordered</mark> only within a partition, not across partitions.

Each message in a partition gets an id (<mark style="background-color:orange;">offset</mark>) that only ever increases.

Once data is written to a partition, it cannot be changed (<mark style="background-color:orange;">immutability</mark>).

The <mark style="background-color:orange;">partitioner</mark> uses the <mark style="background-color:orange;">key to distribute</mark> the messages to the correct <mark style="background-color:orange;">partition</mark>.

Consumers read from partitions and track an offset per partition.

More partitions allow more consumers of a group to read in parallel => scalability
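
A minimal in-memory sketch of these ideas (the `Topic` and `Consumer` classes are hypothetical, not the Kafka client API): each partition is an append-only list whose index is the offset, and the consumer keeps a separate offset per partition, so ordering only exists inside one partition.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


class Topic:
    """A topic as a set of append-only partition logs."""

    def __init__(self, partitions: int) -> None:
        self.logs: List[List[str]] = [[] for _ in range(partitions)]

    def append(self, partition: int, value: str) -> int:
        """Append a message and return its offset; existing entries are never modified."""
        log = self.logs[partition]
        log.append(value)
        return len(log) - 1


class Consumer:
    """Tracks the next offset to read, separately for every partition."""

    def __init__(self) -> None:
        self.offsets: Dict[int, int] = defaultdict(int)

    def poll(self, topic: Topic, partition: int) -> List[Tuple[int, str]]:
        """Return (offset, value) pairs not read yet and advance this partition's offset."""
        start = self.offsets[partition]
        records = list(enumerate(topic.logs[partition][start:], start=start))
        self.offsets[partition] = start + len(records)
        return records


topic = Topic(partitions=2)
topic.append(0, "a")
topic.append(1, "b")
topic.append(0, "c")
consumer = Consumer()
print(consumer.poll(topic, 0))  # [(0, 'a'), (1, 'c')]
print(consumer.poll(topic, 1))  # [(0, 'b')]
print(consumer.poll(topic, 0))  # []: partition 0 already read up to offset 2
```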

#### Example data distribution

E.g. Topic-A has 3 partitions, Topic-B has 2 partitions.

<figure><img src="/files/OCEkfLWtvnT53pOsZfxR" alt=""><figcaption></figcaption></figure>

### Replication

Topics in PROD should have replication factor > 1 (3 is better). E.g. Topic-A has 2 partitions and replication factor 2.

<figure><img src="/files/YdGJshMXhyE0pb7hYnr8" alt=""><figcaption></figcaption></figure>

{% hint style="danger" %}
Topic durability:

With a replication factor of N, you can permanently lose up to N-1 brokers and still recover your data.
{% endhint %}
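
A Python sketch of the durability rule, assuming a simple round-robin replica placement (Kafka's real placement logic is more involved, e.g. rack-aware): every partition gets N distinct brokers, and the check removes every possible set of dead brokers. Broker ids and function names are illustrative.

```python
from itertools import combinations
from typing import Dict, List


def assign_replicas(partitions: int, brokers: List[int], replication_factor: int) -> Dict[int, List[int]]:
    """Place each partition on `replication_factor` distinct brokers, round-robin."""
    if replication_factor > len(brokers):
        raise ValueError("replication factor cannot exceed the number of brokers")
    return {
        p: [brokers[(p + i) % len(brokers)] for i in range(replication_factor)]
        for p in range(partitions)
    }


def survives_any_loss(assignment: Dict[int, List[int]], brokers: List[int], lost: int) -> bool:
    """True if every partition keeps a replica for every possible set of `lost` dead brokers."""
    for dead in combinations(brokers, lost):
        for replicas in assignment.values():
            if all(b in dead for b in replicas):
                return False
    return True


# Topic-A from the example: 2 partitions, replication factor 2, on three brokers.
brokers = [101, 102, 103]
layout = assign_replicas(partitions=2, brokers=brokers, replication_factor=2)
print(layout)                                      # {0: [101, 102], 1: [102, 103]}
print(survives_any_loss(layout, brokers, lost=1))  # True: N-1 = 1 broker lost
print(survives_any_loss(layout, brokers, lost=2))  # False: losing N brokers can lose data
```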

### Leader for a Partition

* At any time only ONE broker can be the leader for a given partition
* Producers can only send data to the broker that is the leader of a partition
* Consumers read from the leader by default (since Kafka 2.4, KIP-392 lets them fetch from the closest replica instead)
* The other brokers replicate the data
* \=> each partition has one leader and one or more followers; the replicas that are caught up with the leader form the ISR (in-sync replicas)

<figure><img src="/files/UJDgP1s1wkjr4fewVlMW" alt=""><figcaption></figcaption></figure>
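
A simplified model of these rules (the `Partition` class and its methods are hypothetical; in real Kafka the controller, backed by ZooKeeper or KRaft, performs the election): only the leader accepts writes, and when the leader's broker dies the next in-sync replica takes over.

```python
from dataclasses import dataclass, field
from typing import List


class NotLeaderError(Exception):
    """Raised when a write is sent to a broker that does not lead the partition."""


@dataclass
class Partition:
    replicas: List[int]  # broker ids holding a copy of the partition
    isr: List[int]       # replicas fully caught up with the leader (leader included)
    leader: int
    log: List[str] = field(default_factory=list)

    def produce(self, broker: int, value: str) -> int:
        """Accept a write only on the leader; return the new offset."""
        if broker != self.leader:
            raise NotLeaderError(f"broker {broker} is not the leader (leader is {self.leader})")
        self.log.append(value)
        return len(self.log) - 1

    def on_broker_failure(self, broker: int) -> None:
        """Drop a dead broker from the ISR; if it led the partition, promote the next in-sync replica."""
        self.isr = [b for b in self.isr if b != broker]
        if broker == self.leader:
            if not self.isr:
                raise RuntimeError("no in-sync replica left to become leader")
            self.leader = self.isr[0]


p = Partition(replicas=[1, 2, 3], isr=[1, 2, 3], leader=1)
p.produce(1, "hello")   # accepted by the leader
p.on_broker_failure(1)  # broker 1 dies
print(p.leader)         # 2
```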

## Producer

<figure><img src="/files/KDzBy6ok9VkOxSjyK54g" alt=""><figcaption></figcaption></figure>

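```
producer ->
                topic-A/partition-0
                topic-A/partition-1
```

The optional message key helps to find a partition for the message:

* if `key == null`: messages are spread across partitions (round robin; since Kafka 2.4 the Java client's default partitioner is "sticky" and fills a batch for one partition before moving to the next)
* else: messages with the same key always go to the same partition (hash of the key modulo the number of partitions)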
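
A Python sketch of that rule, assuming the Java client's keyed path: murmur2 over the serialized key bytes, made non-negative with `& 0x7fffffff`, then modulo the number of partitions. The `murmur2` function is a best-effort port of the client's `Utils.murmur2`; check it against the client source before relying on exact partition numbers. The `Partitioner` class is hypothetical, and null keys simply use round robin here.

```python
import itertools
from typing import Iterator, Optional

MASK32 = 0xFFFFFFFF


def murmur2(data: bytes) -> int:
    """32-bit murmur2 (unsigned result), ported from the Java client's default partitioner."""
    length = len(data)
    seed, m, r = 0x9747B28C, 0x5BD1E995, 24
    h = (seed ^ length) & MASK32
    for i in range(length // 4):
        i4 = i * 4
        k = data[i4] | (data[i4 + 1] << 8) | (data[i4 + 2] << 16) | (data[i4 + 3] << 24)
        k = (k * m) & MASK32
        k ^= k >> r
        k = (k * m) & MASK32
        h = ((h * m) & MASK32) ^ k
    # Mix in the last 1-3 bytes (mirrors the Java switch fall-through).
    tail, rest = length & ~3, length % 4
    if rest == 3:
        h ^= data[tail + 2] << 16
    if rest >= 2:
        h ^= data[tail + 1] << 8
    if rest >= 1:
        h ^= data[tail]
        h = (h * m) & MASK32
    h ^= h >> 13
    h = (h * m) & MASK32
    h ^= h >> 15
    return h


class Partitioner:
    def __init__(self, num_partitions: int) -> None:
        self.num_partitions = num_partitions
        self._round_robin: Iterator[int] = itertools.cycle(range(num_partitions))

    def partition(self, key: Optional[bytes]) -> int:
        if key is None:
            # No key: hand out partitions in turn.
            return next(self._round_robin)
        # Same key -> same hash -> same partition, as long as the partition count is unchanged.
        return (murmur2(key) & 0x7FFFFFFF) % self.num_partitions


partitioner = Partitioner(num_partitions=3)
print([partitioner.partition(None) for _ in range(4)])              # [0, 1, 2, 0]
print(partitioner.partition(b"user-42") == partitioner.partition(b"user-42"))  # True
```

Because the partition is `hash % num_partitions`, adding partitions to an existing topic changes where a given key lands.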

<figure><img src="/files/fkSYvBltI0B5igJoFtWS" alt="" width="375"><figcaption></figcaption></figure>

### Producer delivery semantics (acks)

Producers can choose to receive acknowledgment of data writes:

* **at most once, acks=0:** Producer won't wait for the ack (possible data loss)

<figure><img src="/files/1tFoRozh5HlBNeyrfoj5" alt=""><figcaption><p>what can go wrong</p></figcaption></figure>

* **at least once, acks=1:** Producer will wait for the leader ack (limited data loss)
  * If no acknowledgment is received for a sent message, the producer retries based on its retry configuration. The `retries` property defaulted to 0 before Kafka 2.1; since 2.1 it defaults to Integer.MAX\_VALUE, and `delivery.timeout.ms` bounds the total time spent retrying.

<details>

<summary>Idempotent producer</summary>

As the producer writes messages to the broker in batches, if a write fails and the producer retries, messages within the batch may be written to Kafka more than once.

**To avoid this duplication, Kafka introduced the idempotent producer.**

Essentially, to get at-least-once delivery without duplicates in Kafka, we’ll need to:

* **set `enable.idempotence` to `true` on the producer side**
* **set `acks` to `all` on the producer side** (the idempotent producer requires it)
* **set `enable.auto.commit` to `false` on the consumer side** and commit offsets after processing
* the producer client then **attaches a producer id and a sequence number to each batch** automatically

The Kafka broker detects duplicate writes to a partition using the producer id and sequence number.
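
A simplified sketch of that broker-side check (the `PartitionLog` class is hypothetical; real brokers keep this state per producer id and partition and validate whole batches): a retried batch carries the same producer id and sequence number, so the second copy is recognized and not written again.

```python
from typing import Dict, List


class PartitionLog:
    """One partition that de-duplicates writes by (producer_id, sequence)."""

    def __init__(self) -> None:
        self.records: List[str] = []
        self.last_seq: Dict[int, int] = {}  # producer_id -> last sequence number written

    def append(self, producer_id: int, sequence: int, value: str) -> bool:
        """Return True if written, False if it duplicates an earlier write."""
        last = self.last_seq.get(producer_id, -1)
        if sequence <= last:
            return False  # already written: a retry after a lost ack
        if sequence != last + 1:
            raise ValueError(f"out-of-order sequence {sequence}, expected {last + 1}")
        self.records.append(value)
        self.last_seq[producer_id] = sequence
        return True


log = PartitionLog()
log.append(producer_id=7, sequence=0, value="order-1")  # written
# The ack for sequence 0 is lost, so the producer retries the same batch:
log.append(producer_id=7, sequence=0, value="order-1")  # recognized as a duplicate
log.append(producer_id=7, sequence=1, value="order-2")
print(log.records)  # ['order-1', 'order-2']
```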

</details>

* **acks=all:** Producer will wait for the leader and all in-sync replicas to ack (no data loss)
  * An important related property is `min.insync.replicas`: with replication factor 3, set it to 2, so that Kafka can tolerate the failure of 1 broker and still accept writes
  * **together with the idempotent producer, all messages are delivered without duplication**
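
A sketch of the broker-side rule behind this setting (the `write_accepted` function is hypothetical; in Kafka a rejected `acks=all` write fails with a `NotEnoughReplicasException`): `min.insync.replicas` is only checked for `acks=all`, and the ISR count includes the leader.

```python
def write_accepted(acks: str, isr_size: int, min_insync_replicas: int) -> bool:
    """Simplified: does the leader accept the write, given the current ISR size (leader included)?"""
    if isr_size < 1:
        return False  # no leader available at all
    if acks in ("0", "1"):
        return True  # min.insync.replicas is only enforced for acks=all
    if acks == "all":
        return isr_size >= min_insync_replicas
    raise ValueError(f"unknown acks value: {acks}")


replication_factor, min_isr = 3, 2
for failed in range(replication_factor):
    isr = replication_factor - failed
    print(f"{failed} broker(s) down -> ISR={isr}, acks=all accepted: {write_accepted('all', isr, min_isr)}")
```

With replication factor 3 and `min.insync.replicas=2`, one broker can fail and `acks=all` writes continue; with two brokers down they are rejected rather than stored on a single copy.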

## Consumer

{% hint style="warning" %}
Producers and consumers use serializers/deserializers for messages. If the data type of the messages changes => create a new topic!
{% endhint %}

* Consumers act as a group by sharing the same consumer group id
* Each partition is assigned to exactly one consumer of the group (a consumer can get several partitions); a partition is never shared by two members of the same group
  * if number of consumers > number of partitions => some consumers will be idle
  * Several applications can read from the same topic independently by using different consumer group ids:

    <figure><img src="/files/zNPJAxJtGffcaFLt7yQe" alt=""><figcaption></figcaption></figure>
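* A rebalance of the group is triggered when a member joins or leaves the group, or has not sent a heartbeat for too long (`session.timeout.ms`)
* With the classic (eager) protocol, a rebalance is a stop-the-world event that ensures every partition is consumed by some member of the group (incremental cooperative rebalancing, KIP-429, avoids stopping all members)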
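
A Python sketch of how a group's partitions could be split, assuming a plain round-robin strategy (the Kafka consumer ships several assignors, such as range, round-robin, sticky and cooperative-sticky, so real assignments can differ): each partition goes to exactly one member, and a rebalance amounts to recomputing the assignment for the new member list. `assign` is a hypothetical helper.

```python
from typing import Dict, List


def assign(partitions: List[int], consumers: List[str]) -> Dict[str, List[int]]:
    """Give each partition to exactly one consumer of the group, in turn."""
    if not consumers:
        raise ValueError("a group needs at least one consumer")
    assignment: Dict[str, List[int]] = {c: [] for c in consumers}
    for i, partition in enumerate(sorted(partitions)):
        assignment[consumers[i % len(consumers)]].append(partition)
    return assignment


print(assign([0, 1, 2], ["c1", "c2"]))              # {'c1': [0, 2], 'c2': [1]}
print(assign([0, 1, 2], ["c1", "c2", "c3", "c4"]))  # c4 gets [] and stays idle
# A rebalance recomputes the assignment after membership changes, e.g. c2 leaves:
print(assign([0, 1, 2], ["c1"]))                    # {'c1': [0, 1, 2]}
```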

## [Kafka Connect](https://docs.confluent.io/platform/current/connect/kafka_connectors.html)

* Source Connectors - transfer data from a source to Kafka
* Sink Connectors - transfer data from Kafka to a sink

### Standalone

<figure><img src="/files/iyrOsA1UEMQc6taTy6Ph" alt=""><figcaption></figcaption></figure>

### Consumer delivery semantics

* <mark style="background-color:orange;">At least once</mark> (usually preferred)
  * offsets are committed after the message is processed
  * if processing goes wrong, the message will be read again
    * a message can be processed more than once => make processing idempotent

<details>

<summary>Idempotent consumers</summary>

**In order to avoid processing the same event multiple times, we can use idempotent consumers**.

Essentially an idempotent consumer can consume a message multiple times but only processes it once.

**The combination of the following approaches enables idempotent consumers in at-least-once delivery:**

* The <mark style="background-color:orange;">producer</mark> assigns a unique *<mark style="background-color:orange;">messageId</mark>* to each message.
* The <mark style="background-color:orange;">consumer</mark> maintains a record of all <mark style="background-color:orange;">processed messages in a database</mark>.
* When a new message arrives, the <mark style="background-color:orange;">consumer checks</mark> it against the <mark style="background-color:orange;">existing</mark> *<mark style="background-color:orange;">messageIds</mark>* in the persistent storage table.
* In case there’s a <mark style="background-color:orange;">match, the consumer updates the offset without re-processing the message, sends the acknowledgment back, and effectively marks the message as consumed</mark>.
* When the event is <mark style="background-color:orange;">not</mark> already <mark style="background-color:orange;">present</mark>, a database transaction is started and the new *<mark style="background-color:orange;">messageId</mark>* <mark style="background-color:orange;">is inserted</mark>. Next, the message is processed with whatever business logic is required. When processing completes, the transaction is committed.
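
A minimal sketch of these steps with SQLite from the Python standard library (the table names, `handle_message` and the "add to a balance" business logic are hypothetical): the messageId insert and the business update run in one transaction, so a failure rolls back both and the message can be retried, while a redelivered message is recognized and skipped. Committing the Kafka offset afterwards is left out.

```python
import sqlite3
from typing import Callable


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    with conn:
        conn.execute("CREATE TABLE IF NOT EXISTS processed_messages (message_id TEXT PRIMARY KEY)")
        conn.execute("CREATE TABLE IF NOT EXISTS balance (id INTEGER PRIMARY KEY, total INTEGER NOT NULL)")
        conn.execute("INSERT OR IGNORE INTO balance (id, total) VALUES (1, 0)")
    return conn


def add_to_balance(conn: sqlite3.Connection, amount: int) -> None:
    """Hypothetical business logic."""
    conn.execute("UPDATE balance SET total = total + ? WHERE id = 1", (amount,))


def handle_message(conn: sqlite3.Connection, message_id: str, amount: int,
                   process: Callable[[sqlite3.Connection, int], None] = add_to_balance) -> bool:
    """Process a message once; return False if its messageId was already processed."""
    with conn:  # commits on success, rolls back if anything raises
        seen = conn.execute(
            "SELECT 1 FROM processed_messages WHERE message_id = ?", (message_id,)
        ).fetchone()
        if seen is not None:
            return False  # duplicate delivery: skip processing, just move the offset on
        # The PRIMARY KEY also rejects a concurrent duplicate insert (IntegrityError -> rollback).
        conn.execute("INSERT INTO processed_messages (message_id) VALUES (?)", (message_id,))
        process(conn, amount)
    return True


def balance(conn: sqlite3.Connection) -> int:
    return conn.execute("SELECT total FROM balance WHERE id = 1").fetchone()[0]


conn = open_store()
handle_message(conn, "msg-1", 10)  # True: processed
handle_message(conn, "msg-1", 10)  # False: redelivered duplicate, skipped
handle_message(conn, "msg-2", 5)   # True
print(balance(conn))               # 15
```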

</details>

* <mark style="background-color:orange;">At most once</mark>
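  * offsets are committed as soon as messages are received, before processing
  * if processing fails, the messages are lost (they won't be read again)
  * Setting `enable.auto.commit` to `true` at the consumer is often suggested for At-Most-Once semantics, but auto-commit runs periodically inside `poll()`, so a crash can still cause re-reads; to strictly get at-most-once, commit offsets (e.g. `commitSync()`) right after `poll()` and before processing.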
* <mark style="background-color:orange;">Exactly once</mark>
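
A small simulation contrasting the two commit orders above (all names are hypothetical): in both runs the consumer crashes between processing and committing the message at offset 2, in whichever order it does those steps, then restarts from the last committed offset.

```python
from collections import Counter
from typing import List


def run(messages: List[str], commit_first: bool, crash_at: int) -> Counter:
    """Consume with one crash, restart from the committed offset, return how often each message was processed."""
    processed: Counter = Counter()
    committed = 0  # next offset to read after a restart
    for offset in range(len(messages)):
        if commit_first:
            committed = offset + 1            # at most once: commit first...
            if offset == crash_at:
                break                         # ...crash before processing -> message lost
            processed[messages[offset]] += 1
        else:
            processed[messages[offset]] += 1  # at least once: process first...
            if offset == crash_at:
                break                         # ...crash before committing -> message redelivered
            committed = offset + 1
    # Restart: resume from the last committed offset.
    for offset in range(committed, len(messages)):
        processed[messages[offset]] += 1
    return processed


msgs = ["m0", "m1", "m2", "m3"]
print(dict(run(msgs, commit_first=True, crash_at=2)))   # {'m0': 1, 'm1': 1, 'm3': 1}: m2 lost
print(dict(run(msgs, commit_first=False, crash_at=2)))  # {'m0': 1, 'm1': 1, 'm2': 2, 'm3': 1}: m2 twice
```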

## Local deploy

<details>

<summary>3 zookeepers 3 brokers</summary>

```yaml
---
version: '3'
services:
  zookeeper-1:
    image: confluentinc/cp-zookeeper:7.4.1
    hostname: zookeeper-1
    container_name: zookeeper-1
    volumes:
      - ./zookeeper-1_data:/var/lib/zookeeper/data
      - ./zookeeper-1_log:/var/lib/zookeeper/log
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      # cp-zookeeper reads ZOOKEEPER_* variables; ZOO_MY_ID / ZOO_SERVERS belong to the official "zookeeper" image
      ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888

  zookeeper-2:
    image: confluentinc/cp-zookeeper:7.4.1
    hostname: zookeeper-2
    container_name: zookeeper-2
    volumes:
      - ./zookeeper-2_data:/var/lib/zookeeper/data
      - ./zookeeper-2_log:/var/lib/zookeeper/log
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_SERVER_ID: 2
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888

  zookeeper-3:
    image: confluentinc/cp-zookeeper:7.4.1
    hostname: zookeeper-3
    container_name: zookeeper-3
    volumes:
      - ./zookeeper-3_data:/var/lib/zookeeper/data
      - ./zookeeper-3_log:/var/lib/zookeeper/log
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_SERVER_ID: 3
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888


  broker-1:
    image: confluentinc/cp-kafka:7.4.1
    hostname: broker-1
    container_name: broker-1
    volumes:
      - ./broker-1-data:/var/lib/kafka/data
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    ports:
      - 9092:9092
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
      KAFKA_ADVERTISED_LISTENERS: HOST://localhost:9092,INTERNAL://broker-1:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: HOST:PLAINTEXT,INTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_SNAPSHOT_TRUST_EMPTY: true

  broker-2:
    image: confluentinc/cp-kafka:7.4.1
    hostname: broker-2
    container_name: broker-2
    volumes:
      - ./broker-2-data:/var/lib/kafka/data
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
      - broker-1
    ports:
      - 9093:9093
      - 29093:29093
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
      KAFKA_ADVERTISED_LISTENERS: HOST://localhost:9093,INTERNAL://broker-2:29093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: HOST:PLAINTEXT,INTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_SNAPSHOT_TRUST_EMPTY: true

  broker-3:
    image: confluentinc/cp-kafka:7.4.1
    hostname: broker-3
    container_name: broker-3
    volumes:
      - ./broker-3-data:/var/lib/kafka/data
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
      - broker-1
      - broker-2
    ports:
      - 9094:9094
      - 29094:29094
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
      KAFKA_ADVERTISED_LISTENERS: HOST://localhost:9094,INTERNAL://broker-3:29094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: HOST:PLAINTEXT,INTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_SNAPSHOT_TRUST_EMPTY: true


  rest-proxy:
    image: confluentinc/cp-kafka-rest:7.4.1
    ports:
      - "8082:8082"
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
      - broker-1
      - broker-2
      - broker-3
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker-1:29092,broker-2:29093,broker-3:29094'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"

```

</details>

{% code fullWidth="true" %}

```bash
kafka-topics --bootstrap-server localhost:9092 --create --topic first
kafka-topics --bootstrap-server localhost:9092 --create --topic second --partitions 5
kafka-topics --bootstrap-server localhost:9092 --create --topic third --replication-factor 2

kafka-topics --bootstrap-server localhost:9092 --topic first --describe
kafka-topics --bootstrap-server localhost:9092 --list
kafka-topics --bootstrap-server localhost:9092 --topic second --delete
```

{% endcode %}


