import queue
import threading
import time
  4. class TaskQueue:
  5. def __init__(self):
  6. self.task_queue = queue.Queue()
  7. self.running = True
  8. def enqueue(self, task):
  9. """Adds a task to the queue."""
  10. self.task_queue.put(task)
  11. def dequeue(self):
  12. """Retrieves and removes a task from the queue. Blocks if empty."""
  13. return self.task_queue.get()
  14. def stop(self):
  15. """Stops the queue processing."""
  16. self.running = False
  17. def worker(self, worker_id):
  18. """Worker thread to process tasks from the queue."""
  19. while self.running:
  20. try:
  21. task = self.dequeue()
  22. task() # Execute the task
  23. except queue.Empty:
  24. pass #Queue is empty, continue looping
  25. except Exception as e:
  26. print(f"Worker {worker_id} encountered an error: {e}")
  27. def start_workers(self, num_workers):
  28. """Starts a specified number of worker threads."""
  29. threads = []
  30. for i in range(num_workers):
  31. t = threading.Thread(target=self.worker, args=(i,))
  32. threads.append(t)
  33. t.daemon = True # Allow main thread to exit even if workers are running
  34. t.start()
  35. if __name__ == '__main__':
  36. # Example Usage
  37. def my_task(task_id):
  38. """A sample task."""
  39. print(f"Task {task_id} started by thread {threading.current_thread().name}")
  40. time.sleep(2) # Simulate work
  41. print(f"Task {task_id} completed by thread {threading.current_thread().name}")
  42. task_queue = TaskQueue()
  43. num_workers = 3
  44. # Enqueue some tasks
  45. for i in range(5):
  46. task_queue.enqueue(lambda i=i: my_task(i)) #Capture i in the lambda
  47. # Start worker threads
  48. task_queue.start_workers(num_workers)
  49. # Let the workers process tasks for a while
  50. time.sleep(10)
  51. # Stop the queue and wait for workers to finish
  52. task_queue.stop()
  53. print("Stopping queue...")
  54. for t in task_queue.task_queue.queue:
  55. print(f"Remaining tasks: {t}")
  56. print("Queue stopped.")

Add your comment