Kafka

Diff with other solutions

  • vs RabbitMQ: Kafka has much higher throughput (~1000x), as it appends to a unidirectional log without a per-message handshake

  • vs Redis: Redis pub/sub neither handles streaming data nor keeps a history of past messages

  • vs Amazon SQS: throughput is still lower, and it is not cheap

  • vs Amazon SNS: very similar, but more expensive

  • vs Azure Streaming Services: this is not exactly meant for pub/sub in general

  • vs Google pub/sub: same as Amazon SNS, but more expensive

How it is distributed

  • Confluent Cloud

  • Cloudera has its own version

  • Strimzi offers Helm charts to deploy Kafka to K8s

Architecture

Zookeeper

  • Zookeeper manages brokers (keeps list of them)

  • Zookeeper helps in performing leader election for partitions

  • Zookeeper sends notifications to Kafka in case of changes (new topic, broker dies or comes up)

  • Kafka 2.x can't work without Zookeeper

  • Kafka 3.x can work without Zookeeper (KIP-500) - using Kafka Raft instead

  • Kafka 4.x does not have Zookeeper (KRaft only)

  • Zookeeper by design operates with an odd number of servers (1,3,5,7)

Topic vs Queue

Topic has a retention configuration:

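  • Days: keep messages for up to N days and then delete them (retention.ms)

  • Bytes: keep messages until a partition holds more than X bytes and then delete the oldest ones (retention.bytes, applied per partition)

  • None: store all messages from the beginning of time (retention.ms=-1)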
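The retention options above can be illustrated with a small Python model. This is only a sketch under simplifying assumptions: the function apply_retention and its parameters are hypothetical, records are plain (timestamp, size) tuples, and real Kafka deletes whole log segments rather than individual records.

```python
def apply_retention(records, now, retention_seconds=None, retention_bytes=None):
    """Return the records that survive retention.

    records: list of (timestamp_seconds, size_bytes), oldest first.
    Passing None for a limit disables it (the "None" option above).
    """
    kept = list(records)
    if retention_seconds is not None:
        # Time-based retention: drop everything older than the limit.
        kept = [r for r in kept if now - r[0] <= retention_seconds]
    if retention_bytes is not None:
        # Size-based retention: drop the oldest records until the log fits.
        while kept and sum(size for _, size in kept) > retention_bytes:
            kept.pop(0)
    return kept


records = [(0, 100), (50, 100), (90, 100)]
by_time = apply_retention(records, now=100, retention_seconds=60)
by_size = apply_retention(records, now=100, retention_bytes=150)
keep_all = apply_retention(records, now=100)
```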

Kafka Broker Discovery

Each Kafka broker is also called a "bootstrap server". Each broker knows about all brokers, topics and partitions (metadata).

Partitioning

Messages are ordered only within a partition, not across partitions.

Each message in a partition has an id (offset) which is always incremental.

Once data is written to a partition, it cannot be changed (immutability).

The partitioner uses the key to distribute the messages to the correct partition.

Consumers read from partitions and have an offset per partition.

More partitions enable more consumers => scalability
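As a rough illustration of the points above, a partition can be modelled as an append-only list whose index is the offset. The Partition class and the position dictionary are hypothetical names for this sketch, not part of any Kafka client.

```python
class Partition:
    """Append-only log; the offset of a record is its index in the log."""

    def __init__(self):
        self._records = []

    def append(self, value):
        self._records.append(value)
        return len(self._records) - 1  # offset assigned to this record

    def read(self, offset, max_records=10):
        # Reading never modifies the log (immutability).
        return self._records[offset:offset + max_records]


topic = [Partition(), Partition()]
offsets_written = [
    topic[0].append("a"),
    topic[0].append("b"),
    topic[1].append("c"),  # offsets restart at 0 in every partition
]

# A consumer keeps one position per partition and advances each independently.
position = {0: 0, 1: 0}
batch = topic[0].read(position[0])
position[0] += len(batch)
```

Ordering is only meaningful inside one Partition object here, which mirrors the per-partition ordering guarantee.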

Example data distribution

E.g. Topic-A has 3 partitions, Topic-B has 2 partitions.

Replication

Topics in PROD should have replication factor > 1 (3 is better). E.g. Topic-A has 2 partitions and replication factor 2.

Topic durability:

With a replication factor of N, you can permanently lose N-1 brokers and still recover your data.

Leader for a Partition

  • At any time only ONE broker can be a leader for a given partition

  • Producers can only send data to the broker that is a leader of a partition

  • Consumers read from the leader by default (since v2.4 they can also read from a follower replica)

  • The other brokers replicate the data

  • => each partition has one leader and multiple ISR (in-sync replica)
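The relation between leader, ISR and the "lose N-1 brokers" rule can be sketched as follows. This is a simplified model and not the controller's actual election algorithm; elect_leader and the broker ids are hypothetical.

```python
def elect_leader(in_sync_replicas, alive_brokers):
    """Return the first in-sync replica that is still alive, or None."""
    for broker in in_sync_replicas:
        if broker in alive_brokers:
            return broker
    return None


isr = [1, 2, 3]  # replication factor 3: leader 1 plus followers 2 and 3
leader_all_up = elect_leader(isr, alive_brokers={1, 2, 3})
leader_after_two_failures = elect_leader(isr, alive_brokers={3})
leader_after_three_failures = elect_leader(isr, alive_brokers=set())
```

With replication factor 3 the partition still has a leader after two broker failures; only the third failure leaves it without one.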

Producer

producer ->
                topic-A/partition-0
                topic-A/partition-1
                
An optional message key determines the partition for the message.

if (key == null) use round robin (sticky partitioning by default since v2.4)
else messages with the same key always go to the same partition (hashing)
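The key-based routing above can be sketched in Python. This is an illustration only: Kafka's default partitioner hashes the serialized key with murmur2, whereas this sketch substitutes zlib.crc32 from the standard library, and make_partitioner / choose_partition are hypothetical names.

```python
import itertools
import zlib


def make_partitioner(num_partitions):
    """Return a function that maps an optional key to a partition index."""
    round_robin = itertools.cycle(range(num_partitions))

    def choose_partition(key):
        if key is None:
            # No key: spread messages evenly over the partitions.
            return next(round_robin)
        # Same key -> same hash -> same partition (crc32 stands in for murmur2).
        return zlib.crc32(key.encode("utf-8")) % num_partitions

    return choose_partition


choose_partition = make_partitioner(3)
keyless = [choose_partition(None) for _ in range(4)]
same_key = {choose_partition("user-42") for _ in range(5)}
```

Because the partition is derived from the hash modulo the partition count, changing the number of partitions changes where existing keys land.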

Producer delivery semantics (acks)

Producers can choose to receive acknowledgment of data writes:

  • at most once, acks=0: the producer does not wait for an ack (possible data loss)

  • at least once, acks=1: the producer waits for the leader's ack (limited data loss)

    • If no acknowledgment is received for a message, the producer retries sending it according to its retry configuration. The retries property defaults to 0 in clients older than Kafka 2.1 (newer clients default to Integer.MAX_VALUE), so make sure it is set to the desired number.

Idempotent producer

As the producer writes messages to the broker in batches, if a write fails and the producer retries, messages within the batch may be written more than once in Kafka.

To avoid this duplication, Kafka introduced the idempotent producer.

To enable at-least-once semantics in Kafka, we need to:

  • set the “acks” property to “1” (or “all”) on the producer side

  • set the “enable.auto.commit” property to “false” on the consumer side

To additionally avoid duplicates caused by producer retries, we need to:

  • set the “enable.idempotence” property to “true” on the producer (this requires acks=all)

  • attach a sequence number and producer id to each message from the producer (the client does this automatically once idempotence is enabled)

The Kafka broker can then identify duplicate messages on a topic using the sequence number and producer id.

  • acks=all: the producer waits for acks from the leader and all in-sync replicas (no data loss)

    • The min.insync.replicas property is important here: with replication factor = 3, set it to 2 so that Kafka can tolerate the failure of 1 broker.

    • Combined with enable.idempotence=true, all messages are delivered without duplication.
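The duplicate detection used by the idempotent producer can be pictured with a toy broker-side log. This is a heavily simplified sketch: PartitionLog is a hypothetical class, and a real broker tracks sequence numbers per producer id and partition, also handling producer epochs and out-of-order batches.

```python
class PartitionLog:
    """Toy log that ignores retried messages it has already written."""

    def __init__(self):
        self.records = []
        self._last_seq = {}  # producer id -> highest sequence number written

    def append(self, producer_id, sequence, value):
        if sequence <= self._last_seq.get(producer_id, -1):
            # Duplicate caused by a retry: acknowledge, but do not write again.
            return False
        self.records.append(value)
        self._last_seq[producer_id] = sequence
        return True


log = PartitionLog()
log.append(producer_id=7, sequence=0, value="order-created")
# The ack got lost, so the producer retries with the same sequence number.
retry_written = log.append(producer_id=7, sequence=0, value="order-created")
log.append(producer_id=7, sequence=1, value="order-paid")
```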

Consumer

Producers and consumers use serializers/deserializers for the message. If the data types of the message change => create a new topic!

  • Consumers act as a group via the consumer group id

  • Each consumer in the group gets assigned one or more partitions, and a partition is never shared by two members of a group

    • if the number of consumers > the number of partitions => some consumers will be idle

    • Several applications can read from the same topic by each using its own consumer group id

  • A rebalance is triggered when a member leaves the group or has not sent a heartbeat for a long time

  • A rebalance is a stop-the-world event that ensures all partitions are attended by some consumer in the group

  • Source Connectors - transfer data from a source to Kafka

  • Sink Connectors - from Kafka to a Sink
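Returning to consumer groups: the assignment rules described above (one owner per partition, idle consumers when the group is larger than the partition count) can be sketched with a simple round-robin assignment. The assign function and consumer names are hypothetical, and Kafka's real assignors (range, round-robin, sticky, cooperative) are more involved.

```python
def assign(partitions, consumers):
    """Give every partition exactly one owner inside the group."""
    assignment = {consumer: [] for consumer in consumers}
    for index, partition in enumerate(partitions):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    return assignment


partitions = [0, 1, 2]
two_members = assign(partitions, ["c1", "c2"])
four_members = assign(partitions, ["c1", "c2", "c3", "c4"])  # c4 stays idle
# A rebalance after c2 leaves the group recomputes the whole assignment.
after_rebalance = assign(partitions, ["c1"])
```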

Standalone

Consumer delivery semantics

  • At least once (usually preferred)

    • offsets are committed after the message is processed

    • if processing goes wrong, the message will be read again

      • can be duplicate processing of the message => make processing idempotent
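A minimal simulation of the at-least-once behaviour described above, with no Kafka client involved. The messages list, handle and run are hypothetical stand-ins; the handler crashes once after its side effect but before the offset is committed, so the message is read and processed again after the restart.

```python
messages = ["m0", "m1", "m2"]
processed = []


def handle(message):
    processed.append(message)  # the side effect happens first
    if message == "m1" and processed.count("m1") == 1:
        raise RuntimeError("crash before the offset commit")


def run(committed):
    """Consume from the committed offset, committing only after processing."""
    for offset in range(committed, len(messages)):
        try:
            handle(messages[offset])
        except RuntimeError:
            return committed  # the failed message's offset was never committed
        committed = offset + 1  # commit after successful processing
    return committed


after_crash = run(0)
after_restart = run(after_crash)  # "m1" is delivered a second time
```

The duplicate "m1" in processed is the reason the processing itself should be idempotent.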

Idempotent consumers

In order to avoid processing the same event multiple times, we can use idempotent consumers.

Essentially an idempotent consumer can consume a message multiple times but only processes it once.

The combination of the following approaches enables idempotent consumers in at-least-once delivery:

  • The producer assigns a unique messageId to each message.

  • The consumer maintains a record of all processed messages in a database.

  • When a new message arrives, the consumer checks it against the existing messageIds in the persistent storage table.

  • In case there’s a match, the consumer updates the offset without re-consuming, sends the acknowledgment back, and effectively marks the message as consumed.

  • When the event is not already present, a database transaction is started and the new messageId is inserted. Next, the message is processed according to the required business logic. On completion of message processing, the transaction is committed.

  • At most once

    • offsets are committed as soon as a message is received

    • if processing fails, the message will be lost (it won't be read again)

    • In order to enable At-Most-Once semantics in Kafka, we’ll need to set “enable.auto.commit” to “true” at the consumer.

  • Exactly once
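The idempotent-consumer steps listed earlier (check the messageId, then insert it and process within one transaction) can be sketched with the standard-library sqlite3 module. The table names, the balance update and the handle function are assumptions made for this example; a real consumer would also commit the Kafka offset after handle returns.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE processed (message_id TEXT PRIMARY KEY)")
db.execute("CREATE TABLE balance (total INTEGER)")
db.execute("INSERT INTO balance VALUES (0)")
db.commit()


def handle(message_id, amount):
    """Apply the message once; return False if it was already processed."""
    seen = db.execute(
        "SELECT 1 FROM processed WHERE message_id = ?", (message_id,)
    ).fetchone()
    if seen:
        # Duplicate delivery: skip the business logic, the offset can still advance.
        return False
    with db:  # one transaction: record the id and apply the change together
        db.execute("INSERT INTO processed VALUES (?)", (message_id,))
        db.execute("UPDATE balance SET total = total + ?", (amount,))
    return True


results = [handle("msg-1", 10), handle("msg-2", 5), handle("msg-1", 10)]
total = db.execute("SELECT total FROM balance").fetchone()[0]
```

Keeping the messageId insert and the business update in the same transaction means a crash in between rolls back both, so a redelivered message is processed cleanly.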

Local deploy

3 zookeepers 3 brokers
---
version: '3'
services:
  zookeeper-1:
    image: confluentinc/cp-zookeeper:7.4.1
    hostname: zookeeper-1
    container_name: zookeeper-1
    volumes:
      - ./zookeeper-1_data:/var/lib/zookeeper/data
      - ./zookeeper-1_log:/var/lib/zookeeper/log
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      # cp-zookeeper reads ZOOKEEPER_* variables; ZOO_MY_ID / ZOO_SERVERS belong to the official zookeeper image
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888

  zookeeper-2:
    image: confluentinc/cp-zookeeper:7.4.1
    hostname: zookeeper-2
    container_name: zookeeper-2
    volumes:
      - ./zookeeper-2_data:/var/lib/zookeeper/data
      - ./zookeeper-2_log:/var/lib/zookeeper/log
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVER_ID: 2
      ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888

  zookeeper-3:
    image: confluentinc/cp-zookeeper:7.4.1
    hostname: zookeeper-3
    container_name: zookeeper-3
    volumes:
      - ./zookeeper-3_data:/var/lib/zookeeper/data
      - ./zookeeper-3_log:/var/lib/zookeeper/log
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVER_ID: 3
      ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888


  broker-1:
    image: confluentinc/cp-kafka:7.4.1
    hostname: broker-1
    container_name: broker-1
    volumes:
      - ./broker-1-data:/var/lib/kafka/data
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    ports:
      - 9092:9092
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
      KAFKA_ADVERTISED_LISTENERS: HOST://localhost:9092,INTERNAL://broker-1:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: HOST:PLAINTEXT,INTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_SNAPSHOT_TRUST_EMPTY: true

  broker-2:
    image: confluentinc/cp-kafka:7.4.1
    hostname: broker-2
    container_name: broker-2
    volumes:
      - ./broker-2-data:/var/lib/kafka/data
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
      - broker-1
    ports:
      - 9093:9093
      - 29093:29093
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
      KAFKA_ADVERTISED_LISTENERS: HOST://localhost:9093,INTERNAL://broker-2:29093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: HOST:PLAINTEXT,INTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_SNAPSHOT_TRUST_EMPTY: true

  broker-3:
    image: confluentinc/cp-kafka:7.4.1
    hostname: broker-3
    container_name: broker-3
    volumes:
      - ./broker-3-data:/var/lib/kafka/data
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
      - broker-1
      - broker-2
    ports:
      - 9094:9094
      - 29094:29094
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
      KAFKA_ADVERTISED_LISTENERS: HOST://localhost:9094,INTERNAL://broker-3:29094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: HOST:PLAINTEXT,INTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_SNAPSHOT_TRUST_EMPTY: true


  rest-proxy:
    image: confluentinc/cp-kafka-rest:7.4.1
    ports:
      - "8082:8082"
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
      - broker-1
      - broker-2
      - broker-3
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker-1:29092,broker-2:29093,broker-3:29094'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"

Managing topics from the CLI:
kafka-topics --bootstrap-server localhost:9092 --create --topic first
kafka-topics --bootstrap-server localhost:9092 --create --topic second --partitions 5
kafka-topics --bootstrap-server localhost:9092 --create --topic third --replication-factor 2

kafka-topics --bootstrap-server localhost:9092 --topic first --describe
kafka-topics --bootstrap-server localhost:9092 --list
kafka-topics --bootstrap-server localhost:9092 --topic second --delete
