import threading
import time
from collections import deque


  4. class MessageQueueNormalizer:
  5. def __init__(self, rate_limit_requests_per_second=10):
  6. self.rate_limit_requests_per_second = rate_limit_requests_per_second
  7. self.request_timestamps = deque() # Store timestamps of recent requests
  8. self.lock = threading.Lock() # Ensure thread-safe access
  9. def normalize_and_rate_limit(self, message):
  10. """
  11. Normalizes a message and applies rate limiting.
  12. """
  13. with self.lock:
  14. now = time.time()
  15. # Remove timestamps older than 1 second
  16. while self.request_timestamps and self.request_timestamps[0] <= now - 1:
  17. self.request_timestamps.popleft()
  18. # Check if rate limit is exceeded
  19. if len(self.request_timestamps) >= self.rate_limit_requests_per_second:
  20. wait_time = 1 / self.rate_limit_requests_per_second
  21. time.sleep(wait_time)
  22. # Add current timestamp to the queue
  23. self.request_timestamps.append(now)
  24. # Perform normalization on the message (replace with your actual normalization logic)
  25. normalized_message = self._normalize_message(message)
  26. return normalized_message
  27. def _normalize_message(self, message):
  28. """
  29. Placeholder for message normalization. Replace with your logic.
  30. """
  31. # Example: Convert message to uppercase
  32. return message.upper()
  33. def clear_rate_limit(self):
  34. """Resets the rate limit counter"""
  35. with self.lock:
  36. self.request_timestamps.clear()

Add your comment