import copy
import json
  2. def batch_json_dry_run(json_responses, operations):
  3. """
  4. Performs batch operations on a list of JSON responses for dry-run scenarios.
  5. Args:
  6. json_responses (list): A list of JSON strings or dictionaries.
  7. operations (list): A list of operations to perform on each JSON response.
  8. Each operation is a dictionary with 'type' and 'value' keys.
  9. 'type' can be 'replace', 'add', or 'remove'.
  10. 'value' contains the operation's details.
  11. Returns:
  12. list: A list of modified JSON responses (strings or dictionaries).
  13. """
  14. modified_responses = []
  15. for i, response in enumerate(json_responses):
  16. try:
  17. # Load JSON if it's a string
  18. if isinstance(response, str):
  19. data = json.loads(response)
  20. else:
  21. data = response # Assume it's already a dictionary
  22. for operation in operations:
  23. operation_type = operation['type']
  24. if operation_type == 'replace':
  25. # Replace a key's value
  26. key = operation['value']['key']
  27. new_value = operation['value']['value']
  28. if key in data:
  29. data[key] = new_value
  30. else:
  31. print(f"Warning: Key '{key}' not found in response {i+1}")
  32. elif operation_type == 'add':
  33. # Add a new key-value pair
  34. key = operation['value']['key']
  35. value = operation['value']['value']
  36. data[key] = value
  37. elif operation_type == 'remove':
  38. # Remove a key
  39. key = operation['value']['key']
  40. if key in data:
  41. del data[key]
  42. else:
  43. print(f"Warning: Key '{key}' not found in response {i+1}")
  44. else:
  45. print(f"Warning: Unknown operation type '{operation_type}'")
  46. # Convert back to JSON string if the original was a string
  47. if isinstance(response, str):
  48. modified_responses.append(json.dumps(data, indent=4)) # indent for readability
  49. else:
  50. modified_responses.append(data)
  51. except json.JSONDecodeError:
  52. print(f"Error decoding JSON in response {i+1}")
  53. modified_responses.append(response) #Return original if decode fails.
  54. except Exception as e:
  55. print(f"An unexpected error occurred while processing response {i+1}: {e}")
  56. modified_responses.append(response) #Return original if any other error.
  57. return modified_responses
  58. if __name__ == '__main__':
  59. # Example usage
  60. json_responses = [
  61. '{"name": "Alice", "age": 30}',
  62. {"city": "New York", "population": 8400000}
  63. ]
  64. operations = [
  65. {'type': 'replace', 'value': {'key': 'age', 'value': 31}},
  66. {'type': 'add', 'value': {'key': 'occupation', 'value': 'Engineer'}},
  67. {'type': 'remove', 'value': {'key': 'population'}} #remove population from the second response
  68. ]
  69. modified_responses = batch_json_dry_run(json_responses, operations)
  70. for i, response in enumerate(modified_responses):
  71. print(f"Modified Response {i+1}:\n{response}\n")

Add your comment