Part 5: Python Kafka Producer | Hands-On
A producer is a application/program/system/utility that publishes messages/data to Kafka topics.Install Python package to communicate with Kafka Server/Kafka Broker
sudo pip install kafka-python
Example program for Python Kafka Producer
from kafka import KafkaProducer
from datetime import datetime
import time
from json import dumps
import random
KAFKA_TOPIC_NAME_CONS = "testtopic"
KAFKA_BOOTSTRAP_SERVERS_CONS = '34.73.102.250:9092'
if __name__ == "__main__":
print("Kafka Producer Application Started ... ")
kafka_producer_obj = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS_CONS,
value_serializer=lambda x: dumps(x).encode('utf-8'))
transaction_card_type_list = ["Visa", "MasterCard", "Maestro"]
message = None
for i in range(10):
i = i + 1
message = {}
print("Sending message to Kafka topic: " + str(i))
event_datetime = datetime.now()
message["transaction_id"] = str(i)
message["transaction_card_type"] = random.choice(transaction_card_type_list)
message["transaction_amount"] = round(random.uniform(5.5,555.5), 2)
message["transaction_datetime"] = event_datetime.strftime("%Y-%m-%d %H:%M:%S")
print("Message to be sent: ", message)
kafka_producer_obj.send(KAFKA_TOPIC_NAME_CONS, message)
time.sleep(1)
Part 6: Python Kafka Consumer | Hands-On
A consumer is a application/program/system/utility that subscribe and consumes messages/data from Kafka topics for further processing.Example program for Python Kafka Producer
from kafka import KafkaConsumer
from json import loads
KAFKA_CONSUMER_GROUP_NAME_CONS = "test_consumer_group"
KAFKA_TOPIC_NAME_CONS = "testtopic"
#KAFKA_OUTPUT_TOPIC_NAME_CONS = "outputtopic"
KAFKA_BOOTSTRAP_SERVERS_CONS = '34.73.102.250:9092'
if __name__ == "__main__":
print("Kafka Consumer Application Started ... ")
# auto_offset_reset='latest'
# auto_offset_reset='earliest'
consumer = KafkaConsumer(
KAFKA_TOPIC_NAME_CONS,
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS_CONS,
auto_offset_reset='latest',
enable_auto_commit=True,
group_id=KAFKA_CONSUMER_GROUP_NAME_CONS,
value_deserializer=lambda x: loads(x.decode('utf-8')))
for message in consumer:
#print(dir(message))
#print(type(message))
print("Key: ", message.key)
message = message.value
print("Message received: ", message)
Happy Learning !!!
0 Comments