import time
import threading
import queue
class TaskQueue:
    """A FIFO queue of zero-argument callables drained by worker threads.

    Tasks are added with :meth:`enqueue` and executed by the threads created
    in :meth:`start_workers`. Calling :meth:`stop` clears the ``running``
    flag; each worker notices the change within ``POLL_INTERVAL`` seconds
    (or as soon as its current task returns) and exits its loop.
    """

    # Seconds a worker waits for a task before re-checking ``running``.
    # Without a timeout an idle worker would block in ``get()`` forever and
    # never observe ``stop()``.
    POLL_INTERVAL = 0.1

    def __init__(self):
        """Create an empty queue in the running state."""
        self.task_queue = queue.Queue()
        self.running = True

    def enqueue(self, task):
        """Add a task (a callable taking no arguments) to the queue."""
        self.task_queue.put(task)

    def dequeue(self, timeout=None):
        """Retrieve and remove the next task from the queue.

        Args:
            timeout: Maximum number of seconds to wait for a task. ``None``
                (the default) blocks until a task is available.

        Returns:
            The task at the head of the queue.

        Raises:
            queue.Empty: If ``timeout`` elapses before a task is available.
        """
        return self.task_queue.get(timeout=timeout)

    def stop(self):
        """Ask the workers to stop; tasks still queued are left unprocessed."""
        self.running = False

    def worker(self, worker_id):
        """Run tasks from the queue until :meth:`stop` is called.

        Args:
            worker_id: Identifier used only in error messages.
        """
        while self.running:
            try:
                task = self.dequeue(timeout=self.POLL_INTERVAL)
            except queue.Empty:
                # Nothing to do right now; loop to re-check the stop flag.
                continue
            try:
                task()  # Execute the task
            except Exception as e:
                # A failing task must not take the worker thread down.
                print(f"Worker {worker_id} encountered an error: {e}")
            finally:
                # Keep the queue's unfinished-task count accurate so that
                # ``task_queue.join()`` can be used to wait for completion.
                self.task_queue.task_done()

    def start_workers(self, num_workers):
        """Start ``num_workers`` daemon worker threads.

        Args:
            num_workers: Number of threads to create.

        Returns:
            The list of started ``threading.Thread`` objects, so callers can
            ``join()`` them after calling :meth:`stop`.
        """
        threads = []
        for i in range(num_workers):
            # Daemon threads allow the main thread to exit even if workers
            # are still running.
            t = threading.Thread(target=self.worker, args=(i,), daemon=True)
            t.start()
            threads.append(t)
        return threads
if __name__ == '__main__':
    # Demonstration: queue five sample tasks, run them on three workers,
    # then report whatever is still queued once the workers are told to stop.

    def my_task(task_id):
        """Print a start message, sleep two seconds, print a completion message."""
        thread_name = threading.current_thread().name
        print(f"Task {task_id} started by thread {thread_name}")
        time.sleep(2)  # Stand-in for real work
        print(f"Task {task_id} completed by thread {thread_name}")

    task_queue = TaskQueue()
    num_workers = 3

    # Queue the tasks before any worker exists. The default argument binds
    # the current loop value so each lambda keeps its own task id.
    for n in range(5):
        task_queue.enqueue(lambda task_id=n: my_task(task_id))

    # Launch the worker threads.
    task_queue.start_workers(num_workers)

    # Give the workers a fixed window in which to drain the queue.
    time.sleep(10)

    # Signal the workers to stop. Nothing here joins the worker threads;
    # they are daemon threads, so they do not keep the interpreter alive.
    task_queue.stop()
    print("Stopping queue...")

    # Peek at the Queue's underlying container to report anything left over.
    for pending in task_queue.task_queue.queue:
        print(f"Remaining tasks: {pending}")
    print("Queue stopped.")
# Add your comment