import time
import threading
import queue
class TaskQueueCleaner:
    """Drain a task queue, reporting every task that has expired.

    Each item fetched from the queue is consumed and acknowledged with
    ``task_done()``. Expired items are additionally reported on stdout.
    Items that are not expired are consumed as well; they are NOT put back
    on the queue.
    """

    def __init__(self, queue, timeout=5):
        """Remember the queue to drain and how long to wait for new items.

        Args:
            queue: Queue-like object exposing ``get(timeout=...)`` and
                ``task_done()`` (for example a ``queue.Queue``).
            timeout: Seconds ``clean_queue`` waits for another item before
                treating the queue as empty and returning.
        """
        self.queue = queue
        self.timeout = timeout
        # Serializes concurrent clean_queue() calls made on this instance.
        self.lock = threading.Lock()

    def clean_queue(self) -> None:
        """Consume items until the queue stays empty for ``timeout`` seconds.

        For every item fetched, a message is printed if the item is expired,
        and ``task_done()`` is always signalled so that ``join()`` on the
        queue can complete.

        Raises:
            Exception: Whatever ``_is_expired`` raises for a malformed item
                propagates to the caller; the item is still acknowledged
                with ``task_done()`` before the exception leaves this method.
        """
        with self.lock:
            while True:
                # Only the blocking get() can raise queue.Empty, so keep the
                # try body limited to it.
                try:
                    item = self.queue.get(timeout=self.timeout)
                except queue.Empty:
                    # Nothing arrived within the timeout: stop draining.
                    break

                try:
                    if self._is_expired(item):
                        print(f"Removing expired task: {item}")
                finally:
                    # Acknowledge the item even if the expiry check failed;
                    # otherwise the queue's unfinished-task count never
                    # reaches zero and join() would block forever.
                    self.queue.task_done()

    def _is_expired(self, item) -> bool:
        """Return True if *item* is a dict whose ``expiration_time`` has passed.

        Items that are not dicts, or dicts without an ``'expiration_time'``
        key, are never considered expired. The stored value is compared
        against ``time.time()``.

        This is a placeholder; replace with your actual expiration logic.
        """
        return (
            isinstance(item, dict)
            and 'expiration_time' in item
            and time.time() > item['expiration_time']
        )
if __name__ == '__main__':
    # Example usage: a background cleaner drains the queue while the main
    # thread keeps adding tasks.
    task_queue = queue.Queue()

    # Add some tasks with expiration times
    task_queue.put({'data': 'task1', 'expiration_time': time.time() + 2})
    task_queue.put({'data': 'task2', 'expiration_time': time.time() + 5})
    task_queue.put({'data': 'task3', 'expiration_time': time.time() + 1})
    task_queue.put({'data': 'task4', 'expiration_time': time.time() + 8})

    # The cleaner stops once the queue has been idle for `timeout` seconds.
    # The longest pause between puts below is 2 seconds, so the idle timeout
    # must exceed that; with a shorter timeout the cleaner thread would exit
    # before task5/task6 arrive and task_queue.join() would block forever.
    cleaner = TaskQueueCleaner(task_queue, timeout=3)
    cleaner_thread = threading.Thread(target=cleaner.clean_queue)
    # Daemon thread: lets the program exit while the cleaner is still
    # waiting out its final idle timeout.
    cleaner_thread.daemon = True
    cleaner_thread.start()

    # Simulate adding new tasks
    time.sleep(2)
    task_queue.put({'data': 'task5', 'expiration_time': time.time() + 3})
    time.sleep(1)
    task_queue.put({'data': 'task6', 'expiration_time': time.time() + 6})

    # Wait until every queued task has been acknowledged with task_done()
    task_queue.join()
    print("All tasks processed.")
# Add your comment