import time
import threading
from collections import deque
class MessageQueueNormalizer:
    """Normalize messages while throttling calls with a sliding-window limit.

    At most ``rate_limit_requests_per_second`` calls to
    :meth:`normalize_and_rate_limit` are admitted within any one-second
    window; additional callers block until a slot frees up.

    Attributes are guarded by ``lock`` and may be used from several threads.
    """

    # Length, in seconds, of the sliding window over which requests are counted.
    _WINDOW_SECONDS = 1.0

    def __init__(self, rate_limit_requests_per_second=10):
        """Create a normalizer with the given per-second request budget.

        Args:
            rate_limit_requests_per_second: Maximum number of requests
                admitted per one-second window. Must be greater than zero.

        Raises:
            ValueError: If ``rate_limit_requests_per_second`` is not positive.
        """
        if rate_limit_requests_per_second <= 0:
            raise ValueError(
                "rate_limit_requests_per_second must be greater than zero"
            )
        self.rate_limit_requests_per_second = rate_limit_requests_per_second
        # time.monotonic() readings of admitted requests, oldest first.
        self.request_timestamps = deque()
        # Protects request_timestamps against concurrent mutation.
        self.lock = threading.Lock()

    def normalize_and_rate_limit(self, message):
        """Wait for a rate-limit slot, then return the normalized message.

        Args:
            message: The message to normalize; passed to
                :meth:`_normalize_message` unchanged.

        Returns:
            The value returned by :meth:`_normalize_message`.
        """
        self._acquire_slot()
        # Normalization does not touch shared state, so it runs outside the lock.
        return self._normalize_message(message)

    def _acquire_slot(self):
        """Block until the window has capacity, then record this request."""
        window = self._WINDOW_SECONDS
        while True:
            with self.lock:
                # Monotonic clock: immune to wall-clock adjustments.
                now = time.monotonic()
                cutoff = now - window
                stamps = self.request_timestamps
                # Drop entries that have left the sliding window.
                while stamps and stamps[0] <= cutoff:
                    stamps.popleft()
                if len(stamps) < self.rate_limit_requests_per_second:
                    stamps.append(now)
                    return
                # Window is full: wait exactly until the oldest entry expires.
                # max() guards against a tiny negative value from float rounding,
                # which time.sleep() would reject.
                wait_time = max(0.0, stamps[0] + window - now)
            # Sleep with the lock released so other threads (and
            # clear_rate_limit) are not stalled; then re-check, because another
            # thread may have taken the freed slot in the meantime.
            time.sleep(wait_time)

    def _normalize_message(self, message):
        """Return the normalized form of ``message``.

        Placeholder implementation: converts the message to uppercase via
        ``message.upper()``. Replace with the real normalization logic.
        """
        return message.upper()

    def clear_rate_limit(self):
        """Reset the rate limiter by forgetting all recorded requests."""
        with self.lock:
            self.request_timestamps.clear()
# Add your comment