1. import multiprocessing
  2. import queue
  3. import time
  4. import os
  5. def worker(q, task_id):
  6. """Worker function to process tasks from the queue."""
  7. while True:
  8. try:
  9. task = q.get(timeout=1) # Get task, timeout after 1 second
  10. if task is None:
  11. break # Exit if None is received
  12. print(f"Worker {task_id}: Processing task {task}")
  13. time.sleep(0.5) # Simulate task processing
  14. print(f"Worker {task_id}: Finished task {task}")
  15. q.task_done() # Signal task completion
  16. except queue.Empty:
  17. # Queue is empty, check for more tasks
  18. pass
  19. def create_queue_based_script(num_workers, num_tasks):
  20. """
  21. Creates a queue-based script for multiprocessing.
  22. Args:
  23. num_workers: Number of worker processes.
  24. num_tasks: Total number of tasks to be processed.
  25. """
  26. task_queue = queue.Queue()
  27. processes = []
  28. # Create and start worker processes
  29. for i in range(num_workers):
  30. p = multiprocessing.Process(target=worker, args=(task_queue, i))
  31. processes.append(p)
  32. p.start()
  33. # Add tasks to the queue
  34. for i in range(num_tasks):
  35. task_queue.put(i)
  36. # Block until all tasks are done
  37. task_queue.join()
  38. # Signal workers to exit
  39. for _ in range(num_workers):
  40. task_queue.put(None)
  41. # Wait for all worker processes to finish
  42. for p in processes:
  43. p.join()
  44. if __name__ == '__main__':
  45. num_workers = 4
  46. num_tasks = 20
  47. create_queue_based_script(num_workers, num_tasks)
  48. print("All tasks completed.")

Add your comment