Kafka is a distributed event streaming platform that can store and process very large volumes of messages (large deployments handle trillions of messages per day). Applications can read these event streams as events arrive, which makes Kafka a common backbone for event-driven applications. In this article, we’ll use the kafka-python client to publish and consume messages.

Overview

  1. Create Kafka Docker Instance
  2. Install the Kafka Python library
  3. Import the producer and consumer classes from the kafka-python library
  4. Produce a message to a topic
  5. Consume messages from a topic

Links

Kafka-python Git: https://github.com/dpkp/kafka-python

Documentation: https://kafka-python.readthedocs.io/en/master/apidoc/modules.html

My Git Repository: https://github.com/psujit775/docker-kafka-python-client.git

Steps

1. Create Kafka Docker Instance

Create a docker-compose.yml file with the following content:

---
version: '3'
services:
  zookeeper:
    # Pinned to a 7.x release: Confluent Platform 8.x images no longer support ZooKeeper mode
    image: confluentinc/cp-zookeeper:7.5.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-kafka:7.5.0
    container_name: broker
    ports:
    # To learn about configuring Kafka for access across networks see
    # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
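The two advertised listeners are what let clients on both sides of the Docker network reach the broker: a Python script running on your host machine bootstraps against localhost:9092, while a client in another container on the same Compose network would typically use broker:29092. The sketch below uses a hypothetical helper, parse_listeners (not part of kafka-python or Docker), that simply splits the KAFKA_ADVERTISED_LISTENERS value from the file above to make that mapping explicit.

```python
from typing import Dict, Tuple


def parse_listeners(advertised: str) -> Dict[str, Tuple[str, int]]:
    """Map each listener name to its (host, port) pair.

    Hypothetical helper for illustration: expects comma-separated entries
    of the form NAME://host:port, as in KAFKA_ADVERTISED_LISTENERS.
    """
    listeners = {}
    for entry in advertised.split(","):
        name, address = entry.strip().split("://", 1)
        host, port = address.rsplit(":", 1)
        listeners[name] = (host, int(port))
    return listeners


# Value copied from the docker-compose.yml above
ADVERTISED = "PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092"

for name, (host, port) in parse_listeners(ADVERTISED).items():
    print(f"{name:<20} -> {host}:{port}")
```

The scripts in this article run on the host, which is why they use bootstrap_servers='localhost:9092'.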

Run the following command in a terminal to start the Kafka instance:

docker-compose up -d
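The broker can take several seconds to accept connections after docker-compose up -d returns, and kafka-python raises kafka.errors.NoBrokersAvailable when it cannot reach any bootstrap server. A minimal retry sketch follows; wait_for is a hypothetical helper that takes any zero-argument factory, so it can be tried without Kafka running.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def wait_for(
    factory: Callable[[], T],
    attempts: int = 10,
    delay_s: float = 2.0,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
) -> T:
    """Call factory() until it succeeds or the attempts run out.

    Hypothetical helper: in real use, factory would build a KafkaProducer
    or KafkaConsumer and retry_on would be (NoBrokersAvailable,).
    """
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return factory()
        except retry_on as exc:
            last_error = exc
            print(f"Attempt {attempt}/{attempts} failed: {exc!r}")
            if attempt < attempts:
                time.sleep(delay_s)  # give the broker time to finish starting
    raise RuntimeError(f"gave up after {attempts} attempts") from last_error
```

Once kafka-python is installed (step 2), a call might look like wait_for(lambda: KafkaProducer(bootstrap_servers='localhost:9092'), retry_on=(NoBrokersAvailable,)), with NoBrokersAvailable imported from kafka.errors.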

2. Install the Kafka Python library:

pip install kafka-python

3. Import the producer and consumer classes from the kafka-python library:

from kafka import KafkaProducer, KafkaConsumer

4. Produce a message to a topic:

With the library installed, you can produce and consume messages, list topics, and more. kafka-python also ships a lower-level KafkaClient class, but the examples below use the higher-level KafkaProducer and KafkaConsumer classes.

Here is an example that uses KafkaProducer to send messages to a topic.

Create a file named producer.py with the following content:

from kafka import KafkaProducer
from time import sleep
from json import dumps

# Create a producer that JSON-encodes each value before sending it
producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         value_serializer=lambda x: dumps(x).encode('utf-8'))

# Send the integers 0-99 to the "my-topic" topic, one every 5 seconds
for i in range(100):
    producer.send("my-topic", i)
    print("Sending data : {}".format(i))
    sleep(5)

# Block until every buffered message has been delivered
producer.flush()
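The value_serializer turns each Python value into bytes before it is sent, so the integer 5 travels to the broker as b'5'. A consumer can undo this with a matching deserializer. The pair below sketches that symmetry using only the standard library; the function names are illustrative and not part of kafka-python.

```python
import json
from typing import Any


def serialize(value: Any) -> bytes:
    """Same transformation as the producer's value_serializer lambda."""
    return json.dumps(value).encode("utf-8")


def deserialize(raw: bytes) -> Any:
    """Inverse of serialize(); usable as a consumer value_deserializer."""
    return json.loads(raw.decode("utf-8"))


# Round-trip a few sample values to show what goes over the wire
for value in (5, "hello", {"id": 7, "tags": ["a", "b"]}):
    raw = serialize(value)
    print(raw, "->", deserialize(raw))
```

Passing value_deserializer=deserialize to KafkaConsumer would make message.value a Python object again (an int for this producer) instead of raw bytes.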

Open a terminal and run the file with the following command:

python3 producer.py

Sample Output:

Sending data : 0
Sending data : 1
Sending data : 2
...
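producer.send() is asynchronous: it returns a future immediately and a background thread delivers the message. To confirm that a message actually reached the broker, the future returned by kafka-python provides get(timeout=...), which blocks and returns record metadata (topic, partition, offset) or raises if delivery fails. A minimal sketch, written against a duck-typed producer argument so a fake can stand in for a real KafkaProducer; send_and_confirm is a hypothetical name.

```python
from typing import Any, Tuple


def send_and_confirm(producer: Any, topic: str, value: Any,
                     timeout_s: float = 10.0) -> Tuple[str, int, int]:
    """Send one message and block until the broker acknowledges it.

    Assumes a kafka-python style producer: send() returns a future whose
    get(timeout=...) returns metadata with topic, partition and offset.
    """
    future = producer.send(topic, value)
    metadata = future.get(timeout=timeout_s)  # raises if delivery fails
    return metadata.topic, metadata.partition, metadata.offset
```

Blocking on every message trades throughput for certainty. For a loop like the one in producer.py, a single producer.flush() before the script exits is usually enough.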

5. Consume messages from a topic:

Create a file named consumer.py with the following content:

from kafka import KafkaConsumer, TopicPartition

# Create a consumer in the "my-group" consumer group
consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'],
                         enable_auto_commit=True,
                         group_id='my-group',
                         auto_offset_reset='earliest')

# Read partition 0 of "my-topic" (an auto-created topic has one partition by default)
topic_partition = TopicPartition("my-topic", 0)
consumer.assign([topic_partition])

# Replay the partition from the start. Remove this line to resume from the
# group's last committed offset instead.
consumer.seek_to_beginning()

for message in consumer:
    print(message.value.decode('utf-8'))
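Where the consumer starts reading depends on an explicit seek, the offset last committed for my-group, and auto_offset_reset. The function below is a simplified model of that decision for a single partition (it ignores edge cases such as a committed offset that has fallen outside the retained log); starting_offset and its parameters are illustrative, not kafka-python API.

```python
from typing import Optional


def starting_offset(log_start: int, log_end: int, committed: Optional[int],
                    auto_offset_reset: str = "latest",
                    seek_to_beginning: bool = False) -> int:
    """Return the first offset a single-partition consumer reads (simplified model)."""
    if seek_to_beginning:
        return log_start   # an explicit seek always wins
    if committed is not None:
        return committed   # resume where the group left off
    if auto_offset_reset == "earliest":
        return log_start   # no commit yet: start of the log
    if auto_offset_reset == "latest":
        return log_end     # no commit yet: only messages produced from now on
    raise ValueError(f"unsupported auto_offset_reset: {auto_offset_reset!r}")
```

With the settings in consumer.py, removing seek_to_beginning() therefore does not by itself mean "only new messages": the consumer resumes from the group's committed offset and, if there is none, starts at the beginning because auto_offset_reset='earliest'. Reading only new messages would need auto_offset_reset='latest' with no committed offset, or an explicit consumer.seek_to_end().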

Open a terminal and run the file with the following command:

python3 consumer.py

Sample Output:

0
1
2
...
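The for message in consumer loop never ends on its own, so the script runs until you stop it with Ctrl+C. The sketch below, assuming you want the consumer closed cleanly on exit, wraps the loop so close() always runs; in kafka-python, KafkaConsumer.close() also commits offsets when auto-commit is enabled. consume_until_interrupted is a hypothetical helper, and handle stands for whatever you do with each message.

```python
from typing import Any, Callable, Iterable


def consume_until_interrupted(consumer: Iterable[Any],
                              handle: Callable[[Any], None]) -> int:
    """Pass each message to handle() until Ctrl+C, then close the consumer.

    Assumes consumer is iterable and has close(), as KafkaConsumer does.
    Returns the number of messages handled.
    """
    count = 0
    try:
        for message in consumer:
            handle(message)
            count += 1
    except KeyboardInterrupt:
        print("Stopping consumer...")
    finally:
        consumer.close()  # runs on Ctrl+C and on any other exception
    return count
```

Alternatively, passing consumer_timeout_ms to KafkaConsumer makes iteration stop after that many milliseconds without a new message.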

Conclusion

In this article, we sent simple messages with the kafka-python producer and read them back with a consumer. The same client can be used to build larger pipelines in which Kafka brokers sit between producers and consumers, decoupling the applications that write data from those that read it.