import redis
import time
import logging
# Root-logger setup: emit INFO-and-above records as "<timestamp> - <level> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
class QueueMirrorer:
    """
    Mirrors messages from a source queue into a Redis list.

    The source side is currently a placeholder (see ``consume_messages``).
    The destination is the Redis list stored under the key
    ``destination_queue_url``.
    """

    def __init__(self, source_queue_url, destination_queue_url, redis_host='localhost', redis_port=6379, redis_db=0):
        """
        Initializes the QueueMirrorer.

        Args:
            source_queue_url: URL of the source message queue (e.g., RabbitMQ).
                Stored on the instance; the placeholder consumer does not use it yet.
            destination_queue_url: Name of the destination queue. It is used as the
                key of the Redis list that messages are pushed onto.
            redis_host: Hostname of the Redis server.
            redis_port: Port of the Redis server.
            redis_db: Redis database number.
        """
        self.source_queue_url = source_queue_url
        self.destination_queue_url = destination_queue_url
        self.redis_host = redis_host
        self.redis_port = redis_port
        self.redis_db = redis_db
        self.redis_client = redis.Redis(host=self.redis_host, port=self.redis_port, db=self.redis_db)

    def mirror_queue(self, polling_interval=5, max_iterations=None):
        """
        Mirrors messages from the source queue to the destination queue.

        Each cycle consumes a batch from the source, publishes it if it is
        non-empty, then sleeps for ``polling_interval`` seconds. Errors raised
        during a cycle are logged with a traceback and the loop carries on.

        Args:
            polling_interval: Time in seconds between polling the source queue.
            max_iterations: Number of polling cycles to run before returning.
                ``None`` (the default) keeps polling forever.
        """
        remaining = max_iterations
        while remaining is None or remaining > 0:
            try:
                # Consume messages from the source queue
                messages = self.consume_messages()
                # Publish messages to the destination queue
                if messages:
                    self.publish_messages(messages)
                else:
                    logging.debug("No messages to mirror.")
            except Exception as e:
                # Top-level boundary of the polling loop: one bad cycle must not
                # stop the mirror, but the traceback is kept for diagnosis.
                logging.exception("Error mirroring queue: %s", e)
            if remaining is not None:
                remaining -= 1
                if remaining <= 0:
                    # Do not sleep after the final requested cycle.
                    break
            time.sleep(polling_interval)

    def consume_messages(self):
        """
        Consumes messages from the source queue. This is a placeholder; implement based on
        your message queue system (e.g., RabbitMQ, Kafka).

        Returns:
            A list of messages. The placeholder always returns the same three
            hard-coded strings.
        """
        # Placeholder for message consumption logic. Replace with your queue library.
        # Simulate a queue with messages
        messages = ["Message 1", "Message 2", "Message 3"]
        return messages

    def publish_messages(self, messages, max_retries=3, retry_delay=1):
        """
        Publishes messages to the destination queue.

        Every message is pushed individually with LPUSH. A failed push is
        retried up to ``max_retries`` times, waiting ``retry_delay`` seconds
        between attempts; a message that still fails is logged and skipped so
        the remaining messages are still published.

        Args:
            messages: Iterable of messages to publish, in order.
            max_retries: Number of extra attempts after the first failure.
                Negative values are treated as 0.
            retry_delay: Time in seconds to wait before each retry.
        """
        total_attempts = max(max_retries, 0) + 1
        for message in messages:
            for attempt in range(1, total_attempts + 1):
                try:
                    # LPUSH prepends to the head of the list, so consumers must pop
                    # from the tail (RPOP/BRPOP) to read messages in FIFO order.
                    self.redis_client.lpush(self.destination_queue_url, message)
                except Exception as e:
                    logging.error(
                        "Error publishing message %s to %s: %s",
                        message, self.destination_queue_url, e,
                    )
                    if attempt < total_attempts:
                        # Give a transient failure a moment to clear before retrying.
                        time.sleep(retry_delay)
                else:
                    logging.info("Published message: %s to %s", message, self.destination_queue_url)
                    break
            else:
                # Every attempt failed: record the loss explicitly.
                logging.error(
                    "Giving up on message %s to %s after %d attempts",
                    message, self.destination_queue_url, total_attempts,
                )
if __name__ == '__main__':
    # Demo entry point: build a mirrorer with the default Redis connection
    # settings and start its polling loop.
    source_queue, destination_queue = 'source_queue', 'staging_queue'
    mirrorer = QueueMirrorer(
        source_queue_url=source_queue,
        destination_queue_url=destination_queue,
    )
    mirrorer.mirror_queue()
# Add your comment