import json

from jsonschema import ValidationError, validate
from jsonschema.validators import validator_for
def flag_json_anomalies(json_data, schema):
    """Flag anomalies in a JSON response by checking it against a schema.

    Every schema violation found in the document is reported, not just the
    first one, so a payload with several problems yields several messages.

    Args:
        json_data (str): The JSON document as a string.
        schema (dict): The JSON schema to validate against.

    Returns:
        list: Anomaly messages. Empty when the document parses and satisfies
        the schema. A parse failure yields a single "JSON Decode Error"
        entry; each schema violation yields one "Schema Validation Error"
        entry; any other failure (non-string input, an invalid schema, ...)
        yields a single "Unexpected Error" entry.
    """
    # Step 1: parse. Nothing can be validated if the text is not JSON.
    try:
        data = json.loads(json_data)
    except json.JSONDecodeError as exc:
        return [f"JSON Decode Error: {exc}"]
    except Exception as exc:
        # e.g. TypeError when json_data is not str/bytes/bytearray.
        return [f"Unexpected Error: {exc}"]

    # Step 2: validate. Pick the validator class matching the schema's
    # "$schema" dialect and verify the schema itself first, mirroring what
    # jsonschema.validate() does, then gather all errors instead of raising
    # on the first.
    try:
        validator_cls = validator_for(schema)
        validator_cls.check_schema(schema)
        violations = validator_cls(schema).iter_errors(data)
        return [f"Schema Validation Error: {err}" for err in violations]
    except Exception as exc:
        # Best-effort contract: report (never raise) on a malformed schema
        # or any other validator failure.
        return [f"Unexpected Error: {exc}"]
if __name__ == '__main__':
    # Example usage: one valid payload followed by payloads that each break
    # exactly one rule of the schema below.
    schema = {
        "type": "object",
        "properties": {
            # A bare "type": "string" would accept any text, so the expected
            # UTC timestamp shape is enforced with a pattern.
            "timestamp": {
                "type": "string",
                "pattern": r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$",
            },
            "status": {"type": "string", "enum": ["success", "failure", "warning"]},
            # Constrain the nested field so a string cpu_usage is rejected.
            "data": {
                "type": "object",
                "properties": {"cpu_usage": {"type": "number"}},
            },
            "metric_value": {"type": "number"},
        },
        "required": ["timestamp", "status", "metric_value"],
    }

    # Valid JSON
    valid_json = '{"timestamp": "2023-10-27T10:00:00Z", "status": "success", "data": {"cpu_usage": 50}, "metric_value": 0.75}'
    anomalies = flag_json_anomalies(valid_json, schema)
    print(f"Valid JSON Anomalies: {anomalies}")

    # Invalid JSON (missing required field "metric_value")
    invalid_json_missing_field = '{"timestamp": "2023-10-27T10:00:00Z", "status": "success", "data": {"cpu_usage": 50}}'
    anomalies = flag_json_anomalies(invalid_json_missing_field, schema)
    print(f"Invalid JSON Anomalies (Missing Field): {anomalies}")

    # Invalid JSON (wrong data type: cpu_usage is a string, not a number)
    invalid_json_wrong_type = '{"timestamp": "2023-10-27T10:00:00Z", "status": "success", "data": {"cpu_usage": "50"}, "metric_value": 0.75}'
    anomalies = flag_json_anomalies(invalid_json_wrong_type, schema)
    print(f"Invalid JSON Anomalies (Wrong Type): {anomalies}")

    # Invalid JSON (status is not one of the allowed enum values)
    invalid_json_invalid_enum = '{"timestamp": "2023-10-27T10:00:00Z", "status": "unknown", "data": {"cpu_usage": 50}, "metric_value": 0.75}'
    anomalies = flag_json_anomalies(invalid_json_invalid_enum, schema)
    print(f"Invalid JSON Anomalies (Invalid Enum): {anomalies}")

    # Invalid JSON (timestamp does not match the required pattern)
    invalid_json_invalid_format = '{"timestamp": "2023-10-27 10:00:00", "status": "success", "data": {"cpu_usage": 50}, "metric_value": 0.75}'
    anomalies = flag_json_anomalies(invalid_json_invalid_format, schema)
    print(f"Invalid JSON Anomalies (Invalid Format): {anomalies}")
# (Stray page text "Add your comment" -- not part of the program.)