1. import json
  2. from jsonschema import validate, ValidationError
  3. def flag_json_anomalies(json_data, schema):
  4. """
  5. Flags anomalies in JSON responses based on a provided schema.
  6. Args:
  7. json_data (str): The JSON data as a string.
  8. schema (dict): The JSON schema to validate against.
  9. Returns:
  10. list: A list of anomaly messages. Returns an empty list if no anomalies are found.
  11. """
  12. anomalies = []
  13. try:
  14. data = json.loads(json_data)
  15. validate(instance=data, schema=schema)
  16. except json.JSONDecodeError as e:
  17. anomalies.append(f"JSON Decode Error: {str(e)}")
  18. return anomalies
  19. except ValidationError as e:
  20. anomalies.append(f"Schema Validation Error: {str(e)}")
  21. except Exception as e:
  22. anomalies.append(f"Unexpected Error: {str(e)}")
  23. return anomalies
  24. if __name__ == '__main__':
  25. # Example usage
  26. schema = {
  27. "type": "object",
  28. "properties": {
  29. "timestamp": {"type": "string"},
  30. "status": {"type": "string", "enum": ["success", "failure", "warning"]},
  31. "data": {"type": "object"},
  32. "metric_value": {"type": "number"}
  33. },
  34. "required": ["timestamp", "status", "metric_value"]
  35. }
  36. # Valid JSON
  37. valid_json = '{"timestamp": "2023-10-27T10:00:00Z", "status": "success", "data": {"cpu_usage": 50}, "metric_value": 0.75}'
  38. anomalies = flag_json_anomalies(valid_json, schema)
  39. print(f"Valid JSON Anomalies: {anomalies}")
  40. # Invalid JSON (missing required field)
  41. invalid_json_missing_field = '{"timestamp": "2023-10-27T10:00:00Z", "status": "success", "data": {"cpu_usage": 50}}'
  42. anomalies = flag_json_anomalies(invalid_json_missing_field, schema)
  43. print(f"Invalid JSON Anomalies (Missing Field): {anomalies}")
  44. # Invalid JSON (wrong data type)
  45. invalid_json_wrong_type = '{"timestamp": "2023-10-27T10:00:00Z", "status": "success", "data": {"cpu_usage": "50"}, "metric_value": 0.75}'
  46. anomalies = flag_json_anomalies(invalid_json_wrong_type, schema)
  47. print(f"Invalid JSON Anomalies (Wrong Type): {anomalies}")
  48. #Invalid JSON (invalid enum value)
  49. invalid_json_invalid_enum = '{"timestamp": "2023-10-27T10:00:00Z", "status": "unknown", "data": {"cpu_usage": 50}, "metric_value": 0.75}'
  50. anomalies = flag_json_anomalies(invalid_json_invalid_enum, schema)
  51. print(f"Invalid JSON Anomalies (Invalid Enum): {anomalies}")
  52. #Invalid JSON (invalid format)
  53. invalid_json_invalid_format = '{"timestamp": "2023-10-27 10:00:00", "status": "success", "data": {"cpu_usage": 50}, "metric_value": 0.75}'
  54. anomalies = flag_json_anomalies(invalid_json_invalid_format, schema)
  55. print(f"Invalid JSON Anomalies (Invalid Format): {anomalies}")

Add your comment