1. import json
  2. import os
  3. def load_task_queue_config(config_file="task_queue_config.json"):
  4. """Loads task queue configuration from a JSON file."""
  5. try:
  6. with open(config_file, "r") as f:
  7. config = json.load(f)
  8. return config
  9. except FileNotFoundError:
  10. print(f"Error: Configuration file '{config_file}' not found.")
  11. return {}
  12. except json.JSONDecodeError:
  13. print(f"Error: Invalid JSON format in '{config_file}'.")
  14. return {}
  15. def replace_task_queues(config):
  16. """Replaces task queues in a local utility based on the configuration."""
  17. # Example usage - replace with your actual utility replacement logic
  18. if not config:
  19. print("No configuration loaded. Skipping replacement.")
  20. return
  21. # Assuming 'task_queues' is a key in the config file
  22. if 'task_queues' in config:
  23. task_queues = config['task_queues']
  24. # Simulate replacing task queues in a local utility (replace with actual logic)
  25. for module, queue in task_queues.items():
  26. try:
  27. # Example: Modify a file or environment variable
  28. with open(module, "w") as f:
  29. f.write(f"Using queue: {queue}\n")
  30. print(f"Replaced task queue in {module} with {queue}")
  31. except Exception as e:
  32. print(f"Error replacing queue in {module}: {e}")
  33. else:
  34. print("No 'task_queues' section found in configuration.")
  35. if __name__ == "__main__":
  36. # Example usage
  37. config = load_task_queue_config()
  38. replace_task_queues(config)

Add your comment