import threading
import time
from collections import deque
  3. class TaskQueueThrottler:
  4. def __init__(self, max_requests, interval):
  5. """
  6. Initializes the throttler.
  7. Args:
  8. max_requests (int): The maximum number of requests allowed within the interval.
  9. interval (float): The time interval in seconds.
  10. """
  11. self.max_requests = max_requests
  12. self.interval = interval
  13. self.request_queue = deque() # Queue to store timestamps of requests
  14. self.lock = time.Lock()
  15. def is_allowed(self):
  16. """
  17. Checks if a request is allowed based on the throttle.
  18. Returns:
  19. bool: True if the request is allowed, False otherwise.
  20. """
  21. with self.lock:
  22. now = time.time()
  23. while self.request_queue and self.request_queue[0] <= now - self.interval:
  24. self.request_queue.popleft() # Remove expired requests
  25. if len(self.request_queue) < self.max_requests:
  26. self.request_queue.append(now)
  27. return True
  28. else:
  29. return False
  30. def submit(self):
  31. """
  32. Submits a request to the queue, throttling if necessary.
  33. """
  34. if self.is_allowed():
  35. # Simulate task execution (replace with your actual task)
  36. print("Executing task...")
  37. time.sleep(1) # Simulate task taking 1 second
  38. else:
  39. print("Throttled. Request denied.")
  40. #Add timestamp to request queue for throttling
  41. with self.lock:
  42. self.request_queue.append(time.time())
  43. time.sleep(self.interval) #Wait for the interval
  44. if __name__ == '__main__':
  45. throttler = TaskQueueThrottler(max_requests=3, interval=2) # Allow 3 requests every 2 seconds
  46. for i in range(5):
  47. if throttler.is_allowed():
  48. throttler.submit()
  49. else:
  50. print(f"Skipping request {i}")

Add your comment