1. import json
  2. import logging
  3. # Configure logging
  4. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
  5. def convert_message(message, input_format, output_format):
  6. """
  7. Converts a message from one queue format to another.
  8. Args:
  9. message (str): The message as a JSON string.
  10. input_format (str): The format of the input message ('raw', 'json', 'kafka').
  11. output_format (str): The desired format of the output message ('raw', 'json', 'kafka').
  12. Returns:
  13. str: The converted message as a JSON string, or None if conversion fails.
  14. """
  15. try:
  16. if input_format == 'json':
  17. data = json.loads(message)
  18. elif input_format == 'kafka':
  19. # Simulate Kafka message format (replace with actual Kafka parsing if needed)
  20. data = json.loads(message) # Assuming Kafka messages are JSON
  21. elif input_format == 'raw':
  22. data = message # treat raw input as a string
  23. else:
  24. logging.error(f"Invalid input format: {input_format}")
  25. return None
  26. if output_format == 'json':
  27. return json.dumps(data)
  28. elif output_format == 'kafka':
  29. # Simulate Kafka message format (replace with actual Kafka serialization if needed)
  30. return json.dumps(data)
  31. elif output_format == 'raw':
  32. return str(data)
  33. else:
  34. logging.error(f"Invalid output format: {output_format}")
  35. return None
  36. except json.JSONDecodeError as e:
  37. logging.error(f"JSON decoding error: {e}")
  38. return None
  39. except Exception as e:
  40. logging.error(f"Conversion error: {e}")
  41. return None
  42. if __name__ == '__main__':
  43. # Example Usage and Edge Case Handling
  44. # Test case 1: JSON to Kafka
  45. message1 = '{"name": "Alice", "age": 30}'
  46. converted_message1 = convert_message(message1, 'json', 'kafka')
  47. if converted_message1:
  48. print(f"Test 1 (JSON to Kafka): {converted_message1}")
  49. else:
  50. print("Test 1 failed.")
  51. # Test case 2: Kafka to JSON
  52. message2 = '{"product": "Laptop", "price": 1200}'
  53. converted_message2 = convert_message(message2, 'kafka', 'json')
  54. if converted_message2:
  55. print(f"Test 2 (Kafka to JSON): {converted_message2}")
  56. else:
  57. print("Test 2 failed.")
  58. # Test case 3: Raw to JSON
  59. message3 = "Name: Bob, Age: 25"
  60. converted_message3 = convert_message(message3, 'raw', 'json')
  61. if converted_message3:
  62. print(f"Test 3 (Raw to JSON): {converted_message3}")
  63. else:
  64. print("Test 3 failed.")
  65. # Test case 4: Invalid format
  66. message4 = '{"city": "New York"}'
  67. converted_message4 = convert_message(message4, 'invalid_format', 'json')
  68. if converted_message4:
  69. print(f"Test 4 (Invalid format): {converted_message4}")
  70. else:
  71. print("Test 4 failed (as expected).")
  72. #Test case 5: Invalid JSON
  73. message5 = '{"city": "New York"' #missing closing brace
  74. converted_message5 = convert_message(message5, 'json', 'json')
  75. if converted_message5:
  76. print(f"Test 5 (Invalid JSON): {converted_message5}")
  77. else:
  78. print("Test 5 failed (as expected).")

Add your comment