1. import time
  2. import logging
  3. import queue
  4. # Configure logging
  5. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
  6. class MessageQueueCompressor:
  7. def __init__(self, queue_name, batch_size=100, compression_interval=5):
  8. """
  9. Initializes the MessageQueueCompressor.
  10. Args:
  11. queue_name (str): The name of the message queue.
  12. batch_size (int): The number of messages to batch for compression.
  13. compression_interval (int): Time (seconds) between compression attempts.
  14. """
  15. self.queue_name = queue_name
  16. self.batch_size = batch_size
  17. self.compression_interval = compression_interval
  18. self.queue = queue.Queue() # Simulate message queue
  19. self.is_running = False
  20. def enqueue(self, message):
  21. """
  22. Adds a message to the queue.
  23. Args:
  24. message (any): The message to add.
  25. """
  26. self.queue.put(message)
  27. logging.info(f"Message enqueued: {message}")
  28. def compress(self):
  29. """
  30. Compresses messages in the queue.
  31. """
  32. if self.queue.empty():
  33. logging.info("Queue is empty. No compression needed.")
  34. return
  35. try:
  36. messages = []
  37. for _ in range(self.batch_size):
  38. messages.append(self.queue.get())
  39. if messages:
  40. # Simulate compression (e.g., serialize to JSON)
  41. compressed_messages = [str(msg) for msg in messages] # Simple string conversion
  42. logging.info(f"Compressed {len(messages)} messages.")
  43. # Simulate putting compressed messages back into the queue
  44. for compressed_msg in compressed_messages:
  45. self.enqueue(compressed_msg)
  46. else:
  47. logging.info("No messages to compress in this batch.")
  48. except queue.Empty:
  49. logging.warning("Queue is empty during compression.")
  50. except Exception as e:
  51. logging.error(f"Error during compression: {e}")
  52. def run(self):
  53. """
  54. Starts the compression process in a loop.
  55. """
  56. self.is_running = True
  57. while self.is_running:
  58. self.compress()
  59. time.sleep(self.compression_interval)
  60. def stop(self):
  61. """
  62. Stops the compression process.
  63. """
  64. self.is_running = False
  65. logging.info("Compression stopped.")
  66. if __name__ == '__main__':
  67. # Example usage
  68. compressor = MessageQueueCompressor("my_queue", batch_size=5, compression_interval=2)
  69. # Simulate adding messages to the queue
  70. for i in range(20):
  71. compressor.enqueue(f"Message {i}")
  72. time.sleep(0.5)
  73. # Start the compression process
  74. compressor.run()
  75. # Let it run for a while (e.g., 10 seconds)
  76. time.sleep(10)
  77. # Stop the compression process
  78. compressor.stop()

Add your comment