Kafka is a distributed event streaming platform capable of storing and processing trillions of messages per day. Applications can tap into this real-time data to build event-driven systems. In this article, we'll use the Kafka client in Python (kafka-python) to publish and consume messages.
Overview
- Create Kafka Docker Instance
- Install the Kafka Python library
- Import the producer and consumer classes from the kafka-python library
- Produce a message to a topic
- Consume messages from a topic
Links
kafka-python GitHub: https://github.com/dpkp/kafka-python
Documentation: https://kafka-python.readthedocs.io/en/master/apidoc/modules.html
My Git Repository: https://github.com/psujit775/docker-kafka-python-client.git
Steps
1. Create Kafka Docker Instance
Create a docker-compose.yml file with the content below.
---
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-kafka:latest
    container_name: broker
    ports:
      # To learn about configuring Kafka for access across networks see
      # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
Run the command below in a terminal to start the Kafka instance:
docker-compose up -d
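You can confirm that the zookeeper and broker containers are up before moving on:
docker-compose ps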
2. Install the Kafka Python library:
pip install kafka-python
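To verify the installation, print the library version (kafka-python exposes it as kafka.__version__):
python3 -c "import kafka; print(kafka.__version__)"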
3. Import the producer and consumer classes from the kafka-python library:
from kafka import KafkaProducer, KafkaConsumer
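The library also ships an admin client. As a sketch (not required for this tutorial, since the broker above auto-creates topics), you could create my-topic explicitly with KafkaAdminClient:
from kafka.admin import KafkaAdminClient, NewTopic

# Connect to the broker started in step 1
admin = KafkaAdminClient(bootstrap_servers='localhost:9092')

# Create "my-topic" with a single partition and replication factor 1
admin.create_topics([NewTopic(name="my-topic", num_partitions=1, replication_factor=1)])
admin.close()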
4. Use KafkaProducer to produce a message to a topic:
You can now use these classes to perform various operations, such as producing and consuming messages, listing topics, and more.
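For instance, here is a minimal sketch of listing the topics the broker currently knows about (assuming the broker from step 1 is reachable on localhost:9092):
from kafka import KafkaConsumer

# A consumer with no subscription can still query cluster metadata
consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
print(consumer.topics())  # set of topic names, e.g. {'my-topic'}
consumer.close()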
Here is an example of using KafkaProducer to produce messages to a topic:
Create a file named producer.py with the content below.
from kafka import KafkaProducer
from time import sleep
from json import dumps

# Create a producer that JSON-encodes each value before sending
producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         value_serializer=lambda x: dumps(x).encode('utf-8'))

# Send a message to the "my-topic" topic every 5 seconds
for i in range(100):
    producer.send("my-topic", i)
    print("Sending data : {}".format(i))
    sleep(5)
Open a terminal and execute the file using the command below.
python3 producer.py
Sample Output:
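Based on the print statement in the loop, you should see one line every five seconds:
Sending data : 0
Sending data : 1
Sending data : 2
...

Note that producer.send() is asynchronous: it returns a future and batches messages in the background. If a script may exit before delivery, one pattern (sketched below with the same producer settings as above) is to block on the future or call flush():
from kafka import KafkaProducer
from json import dumps

producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         value_serializer=lambda x: dumps(x).encode('utf-8'))

# get() blocks until the broker acknowledges the message (or raises on failure)
metadata = producer.send("my-topic", 42).get(timeout=10)
print(metadata.topic, metadata.partition, metadata.offset)

# flush() blocks until all buffered messages have been sent
producer.flush()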
5. Use KafkaConsumer to consume messages from a topic:
Create a file named consumer.py with the content below.
from kafka import KafkaConsumer, TopicPartition

# Create a consumer
consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'],
                         enable_auto_commit=True,
                         group_id='my-group',
                         auto_offset_reset='earliest')

# Manually assign partition 0 of "my-topic" to this consumer
topic_partition = TopicPartition("my-topic", 0)
consumer.assign([topic_partition])

# Comment out the line below to read only new messages.
consumer.seek_to_beginning()

for message in consumer:
    print(message.value.decode('utf-8'))
Open a terminal and execute the file using the command below.
python3 consumer.py
Sample Output:
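Because the producer JSON-encodes plain integers, the consumer prints the raw values:
0
1
2
...

Note that this consumer uses assign() to pin itself to partition 0 of the topic. As an alternative sketch, you could let Kafka's group coordinator assign partitions automatically by subscribing to the topic instead:
from kafka import KafkaConsumer

# Group-managed subscription: partitions are assigned automatically
consumer = KafkaConsumer('my-topic',
                         bootstrap_servers=['localhost:9092'],
                         group_id='my-group',
                         auto_offset_reset='earliest')

for message in consumer:
    print(message.value.decode('utf-8'))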
Conclusion
In this article, we learned how to send and receive simple messages with the Python Kafka client. The same pattern scales to larger, high-throughput workloads, with the Kafka broker acting as the intermediary between producers and consumers.