Dead Letter Queue (DLQ) in Kafka

Medium Article Link: https://medium.com/p/29418e0ec6cf

Installation

!pip install kafka-python

Implementation

from kafka import KafkaProducer, KafkaConsumer
import json

bootstrap_servers = ['localhost:9092']
primary_topic = 'primary-topic-name'
dlq_topic = 'dlq-topic-name'
dlq_producer = KafkaProducer(
    bootstrap_servers=bootstrap_servers,
    value_serializer=lambda x: x.encode('utf-8'),
    acks='all'
)
consumer = KafkaConsumer(
    primary_topic,
    bootstrap_servers=bootstrap_servers,
    auto_offset_reset='latest',
    enable_auto_commit=True,
    value_deserializer=lambda x: x.decode('utf-8')
)
for msg in consumer:
    print(f'\nReceived:\nPartition: {msg.partition} \tOffset: {msg.offset}\tValue: {msg.value}')
    
    try:
        data = json.loads(msg.value)
        print('Data Received:', data)
            
    except:
        print(f'Value {msg.value} not in JSON format')
        dlq_producer.send(dlq_topic, value=msg.value)
        print('Message sent to DLQ Topic')
        
Received:
Partition: 0 	Offset: 542	Value: {"test":"1"}
Data Received: {'test': '1'}

Received:
Partition: 0 	Offset: 543	Value: test
Value test not in JSON format
Message sent to DLQ Topic