import json
import logging
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def convert_message(message, input_format, output_format):
"""
Converts a message from one queue format to another.
Args:
message (str): The message as a JSON string.
input_format (str): The format of the input message ('raw', 'json', 'kafka').
output_format (str): The desired format of the output message ('raw', 'json', 'kafka').
Returns:
str: The converted message as a JSON string, or None if conversion fails.
"""
try:
if input_format == 'json':
data = json.loads(message)
elif input_format == 'kafka':
# Simulate Kafka message format (replace with actual Kafka parsing if needed)
data = json.loads(message) # Assuming Kafka messages are JSON
elif input_format == 'raw':
data = message # treat raw input as a string
else:
logging.error(f"Invalid input format: {input_format}")
return None
if output_format == 'json':
return json.dumps(data)
elif output_format == 'kafka':
# Simulate Kafka message format (replace with actual Kafka serialization if needed)
return json.dumps(data)
elif output_format == 'raw':
return str(data)
else:
logging.error(f"Invalid output format: {output_format}")
return None
except json.JSONDecodeError as e:
logging.error(f"JSON decoding error: {e}")
return None
except Exception as e:
logging.error(f"Conversion error: {e}")
return None
if __name__ == '__main__':
# Example Usage and Edge Case Handling
# Test case 1: JSON to Kafka
message1 = '{"name": "Alice", "age": 30}'
converted_message1 = convert_message(message1, 'json', 'kafka')
if converted_message1:
print(f"Test 1 (JSON to Kafka): {converted_message1}")
else:
print("Test 1 failed.")
# Test case 2: Kafka to JSON
message2 = '{"product": "Laptop", "price": 1200}'
converted_message2 = convert_message(message2, 'kafka', 'json')
if converted_message2:
print(f"Test 2 (Kafka to JSON): {converted_message2}")
else:
print("Test 2 failed.")
# Test case 3: Raw to JSON
message3 = "Name: Bob, Age: 25"
converted_message3 = convert_message(message3, 'raw', 'json')
if converted_message3:
print(f"Test 3 (Raw to JSON): {converted_message3}")
else:
print("Test 3 failed.")
# Test case 4: Invalid format
message4 = '{"city": "New York"}'
converted_message4 = convert_message(message4, 'invalid_format', 'json')
if converted_message4:
print(f"Test 4 (Invalid format): {converted_message4}")
else:
print("Test 4 failed (as expected).")
#Test case 5: Invalid JSON
message5 = '{"city": "New York"' #missing closing brace
converted_message5 = convert_message(message5, 'json', 'json')
if converted_message5:
print(f"Test 5 (Invalid JSON): {converted_message5}")
else:
print("Test 5 failed (as expected).")
Add your comment