1. import time
  2. import threading
  3. import queue
  4. class TaskQueueCleaner:
  5. def __init__(self, queue, timeout=5):
  6. self.queue = queue
  7. self.timeout = timeout
  8. self.lock = threading.Lock() # For thread safety
  9. def clean_queue(self):
  10. """
  11. Cleans the queue by removing expired tasks.
  12. """
  13. with self.lock: # Acquire lock for thread safety
  14. while True:
  15. try:
  16. item = self.queue.get(timeout=self.timeout) # Get item with timeout
  17. # Process the item (e.g., check if it's expired)
  18. if self._is_expired(item):
  19. print(f"Removing expired task: {item}")
  20. self.queue.task_done() # Signal task completion
  21. else:
  22. self.queue.task_done() # Signal task completion
  23. except queue.Empty:
  24. # Queue is empty, exit the loop
  25. break
  26. def _is_expired(self, item):
  27. """
  28. Checks if a task is expired based on its expiration time.
  29. This is a placeholder; replace with your actual expiration logic.
  30. """
  31. # Replace with your logic to check task expiration
  32. if isinstance(item, dict) and 'expiration_time' in item:
  33. if time.time() > item['expiration_time']:
  34. return True
  35. return False
  36. if __name__ == '__main__':
  37. # Example Usage
  38. task_queue = queue.Queue()
  39. # Add some tasks with expiration times
  40. task_queue.put({'data': 'task1', 'expiration_time': time.time() + 2})
  41. task_queue.put({'data': 'task2', 'expiration_time': time.time() + 5})
  42. task_queue.put({'data': 'task3', 'expiration_time': time.time() + 1})
  43. task_queue.put({'data': 'task4', 'expiration_time': time.time() + 8})
  44. # Create a cleaner thread
  45. cleaner = TaskQueueCleaner(task_queue, timeout=1)
  46. cleaner_thread = threading.Thread(target=cleaner.clean_queue)
  47. cleaner_thread.daemon = True # Allow the main thread to exit even if the cleaner is running
  48. cleaner_thread.start()
  49. # Simulate adding new tasks
  50. time.sleep(2)
  51. task_queue.put({'data': 'task5', 'expiration_time': time.time() + 3})
  52. time.sleep(1)
  53. task_queue.put({'data': 'task6', 'expiration_time': time.time() + 6})
  54. # Wait for all tasks to be processed
  55. task_queue.join()
  56. print("All tasks processed.")

Add your comment