1. import redis
  2. import time
  3. import logging
  4. # Configure logging
  5. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
  6. class QueueMirrorer:
  7. def __init__(self, source_queue_url, destination_queue_url, redis_host='localhost', redis_port=6379, redis_db=0):
  8. """
  9. Initializes the QueueMirrorer.
  10. Args:
  11. source_queue_url: URL of the source message queue (e.g., RabbitMQ).
  12. destination_queue_url: URL of the destination message queue (e.g., Redis).
  13. redis_host: Hostname of the Redis server.
  14. redis_port: Port of the Redis server.
  15. redis_db: Redis database number.
  16. """
  17. self.source_queue_url = source_queue_url
  18. self.destination_queue_url = destination_queue_url
  19. self.redis_host = redis_host
  20. self.redis_port = redis_port
  21. self.redis_db = redis_db
  22. self.redis_client = redis.Redis(host=self.redis_host, port=self.redis_port, db=self.redis_db)
  23. def mirror_queue(self, polling_interval=5):
  24. """
  25. Mirrors messages from the source queue to the destination queue.
  26. Args:
  27. polling_interval: Time in seconds between polling the source queue.
  28. """
  29. while True:
  30. try:
  31. # Consume messages from the source queue
  32. messages = self.consume_messages()
  33. # Publish messages to the destination queue
  34. if messages:
  35. self.publish_messages(messages)
  36. else:
  37. logging.debug("No messages to mirror.")
  38. except Exception as e:
  39. logging.error(f"Error mirroring queue: {e}")
  40. time.sleep(polling_interval)
  41. def consume_messages(self):
  42. """
  43. Consumes messages from the source queue. This is a placeholder; implement based on
  44. your message queue system (e.g., RabbitMQ, Kafka).
  45. """
  46. # Placeholder for message consumption logic. Replace with your queue library.
  47. # Example using a dummy queue:
  48. # return [f"Message {i}" for i in range(3)]
  49. #Simulate a queue with messages
  50. messages = ["Message 1", "Message 2", "Message 3"]
  51. return messages
  52. def publish_messages(self, messages):
  53. """
  54. Publishes messages to the destination queue.
  55. """
  56. for message in messages:
  57. try:
  58. self.redis_client.lpush(self.destination_queue_url, message) # Use lpush for FIFO
  59. logging.info(f"Published message: {message} to {self.destination_queue_url}")
  60. except Exception as e:
  61. logging.error(f"Error publishing message {message} to {self.destination_queue_url}: {e}")
  62. #Fallback: Retry after a short delay
  63. time.sleep(1) #Retry after 1 second
  64. if __name__ == '__main__':
  65. # Example Usage
  66. source_queue = 'source_queue'
  67. destination_queue = 'staging_queue'
  68. mirrorer = QueueMirrorer(source_queue, destination_queue)
  69. mirrorer.mirror_queue()

Add your comment