import time
import logging
import queue
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class MessageQueueCompressor:
def __init__(self, queue_name, batch_size=100, compression_interval=5):
"""
Initializes the MessageQueueCompressor.
Args:
queue_name (str): The name of the message queue.
batch_size (int): The number of messages to batch for compression.
compression_interval (int): Time (seconds) between compression attempts.
"""
self.queue_name = queue_name
self.batch_size = batch_size
self.compression_interval = compression_interval
self.queue = queue.Queue() # Simulate message queue
self.is_running = False
def enqueue(self, message):
"""
Adds a message to the queue.
Args:
message (any): The message to add.
"""
self.queue.put(message)
logging.info(f"Message enqueued: {message}")
def compress(self):
"""
Compresses messages in the queue.
"""
if self.queue.empty():
logging.info("Queue is empty. No compression needed.")
return
try:
messages = []
for _ in range(self.batch_size):
messages.append(self.queue.get())
if messages:
# Simulate compression (e.g., serialize to JSON)
compressed_messages = [str(msg) for msg in messages] # Simple string conversion
logging.info(f"Compressed {len(messages)} messages.")
# Simulate putting compressed messages back into the queue
for compressed_msg in compressed_messages:
self.enqueue(compressed_msg)
else:
logging.info("No messages to compress in this batch.")
except queue.Empty:
logging.warning("Queue is empty during compression.")
except Exception as e:
logging.error(f"Error during compression: {e}")
def run(self):
"""
Starts the compression process in a loop.
"""
self.is_running = True
while self.is_running:
self.compress()
time.sleep(self.compression_interval)
def stop(self):
"""
Stops the compression process.
"""
self.is_running = False
logging.info("Compression stopped.")
if __name__ == '__main__':
# Example usage
compressor = MessageQueueCompressor("my_queue", batch_size=5, compression_interval=2)
# Simulate adding messages to the queue
for i in range(20):
compressor.enqueue(f"Message {i}")
time.sleep(0.5)
# Start the compression process
compressor.run()
# Let it run for a while (e.g., 10 seconds)
time.sleep(10)
# Stop the compression process
compressor.stop()
Add your comment