Dead Letter Queue (DLQ) in Kafka¶
Medium Article Link: https://medium.com/p/29418e0ec6cf¶
Installation¶
!pip install kafka-python
Implementation¶
from kafka import KafkaProducer, KafkaConsumer
import json
bootstrap_servers = ['localhost:9092']
primary_topic = 'primary-topic-name'
dlq_topic = 'dlq-topic-name'
dlq_producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda x: x.encode('utf-8'),
acks='all'
)
consumer = KafkaConsumer(
primary_topic,
bootstrap_servers=bootstrap_servers,
auto_offset_reset='latest',
enable_auto_commit=True,
value_deserializer=lambda x: x.decode('utf-8')
)
for msg in consumer:
print(f'\nReceived:\nPartition: {msg.partition} \tOffset: {msg.offset}\tValue: {msg.value}')
try:
data = json.loads(msg.value)
print('Data Received:', data)
except:
print(f'Value {msg.value} not in JSON format')
dlq_producer.send(dlq_topic, value=msg.value)
print('Message sent to DLQ Topic')
Received:
Partition: 0 Offset: 542 Value: {"test":"1"}
Data Received: {'test': '1'}
Received:
Partition: 0 Offset: 543 Value: test
Value test not in JSON format
Message sent to DLQ Topic