1. import argparse
  2. import datetime
  3. import time
  4. import threading
  5. class ResourceSync:
  6. def __init__(self, name="default"):
  7. self.name = name
  8. self.last_value = None
  9. self.lock = threading.Lock()
  10. def get_value(self):
  11. with self.lock:
  12. return self.last_value
  13. def set_value(self, value):
  14. with self.lock:
  15. self.last_value = value
  16. def sync(self, new_value):
  17. with self.lock:
  18. if self.last_value is None or self.last_value != new_value:
  19. self.set_value(new_value)
  20. print(f"Resource '{self.name}' synced to {new_value}")
  21. class DiagnosticsSync:
  22. def __init__(self):
  23. self.resources = {}
  24. def add_resource(self, name, initial_value=None):
  25. if name in self.resources:
  26. raise ValueError(f"Resource '{name}' already exists.")
  27. self.resources[name] = ResourceSync(name)
  28. if initial_value is not None:
  29. self.resources[name].set_value(initial_value)
  30. def get_resource(self, name):
  31. if name not in self.resources:
  32. raise ValueError(f"Resource '{name}' not found.")
  33. return self.resources[name]
  34. def sync_all(self, data):
  35. for name, value in data.items():
  36. self.get_resource(name).sync(value)
  37. def get_all_values(self):
  38. values = {}
  39. for name, resource in self.resources.items():
  40. values[name] = resource.get_value()
  41. return values
  42. def display_values(self):
  43. values = self.get_all_values()
  44. for name, value in values.items():
  45. print(f"{name}: {value}")
  46. def main():
  47. parser = argparse.ArgumentParser(description="Sync and manage diagnostic resources.")
  48. parser.add_argument("action", choices=["add", "sync", "display"])
  49. parser.add_argument("--name", help="Resource name (required for add)")
  50. parser.add_argument("--value", type=str, help="Resource value (required for sync)")
  51. parser.add_argument("--initial_value", type=str, help="Initial value for a new resource")
  52. parser.add_argument("--data", type=str, help="JSON data for syncing multiple resources")
  53. parser.add_argument("--display", action="store_true", help="Display all resource values")
  54. args = parser.parse_args()
  55. sync = DiagnosticsSync()
  56. if args.action == "add":
  57. if not args.name:
  58. print("Error: --name is required for 'add' action.")
  59. return
  60. try:
  61. sync.add_resource(args.name, args.initial_value)
  62. except ValueError as e:
  63. print(e)
  64. elif args.action == "sync":
  65. if not args.value:
  66. print("Error: --value is required for 'sync' action.")
  67. return
  68. sync.sync_all({args.name: args.value})
  69. elif args.action == "display":
  70. sync.display_values()
  71. elif args.action == "data":
  72. if not args.data:
  73. print("Error: --data is required for 'data' action.")
  74. return
  75. try:
  76. data = eval(args.data) #Use eval cautiously
  77. sync.sync_all(data)
  78. except (SyntaxError, NameError):
  79. print("Error: Invalid JSON data.")
  80. if __name__ == "__main__":
  81. main()

Add your comment