1. import json
  2. import time
  3. import threading
  4. import logging
  5. from collections import deque
# Configure the root logger once at import time: INFO and above, with each
# record rendered as "<timestamp> - <LEVEL> - <message>".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
  7. class PayloadScheduler:
  8. def __init__(self, max_payload_size_kb=1024):
  9. self.payloads = deque() # Use deque for efficient adding/removing from both ends
  10. self.max_payload_size_kb = max_payload_size_kb
  11. self.running = False
  12. def add_payload(self, payload_json, execution_delay_sec):
  13. """Adds a JSON payload and its execution delay to the scheduler."""
  14. try:
  15. payload = json.loads(payload_json)
  16. except json.JSONDecodeError:
  17. logging.error("Invalid JSON payload.")
  18. return False
  19. if self._payload_size_exceeds_limit(payload_json):
  20. logging.warning("Payload size exceeds the limit. Will be skipped.")
  21. return False
  22. self.payloads.append((payload, execution_delay_sec))
  23. logging.info(f"Payload added. Executing in {execution_delay_sec} seconds.")
  24. return True
  25. def _payload_size_exceeds_limit(self, payload_json):
  26. """Checks if the payload size exceeds the defined limit.
  27. This is a basic implementation and can be improved.
  28. """
  29. try:
  30. payload = json.loads(payload_json)
  31. payload_size_kb = len(json.dumps(payload).encode('utf-8')) / 1024
  32. return payload_size_kb > self.max_payload_size_kb
  33. except (json.JSONDecodeError, TypeError):
  34. return True # Assume large if invalid JSON
  35. def _execute_payload(self, payload):
  36. """Executes the given payload."""
  37. try:
  38. logging.info(f"Executing payload: {payload}")
  39. # Replace this with your actual payload execution logic
  40. # This is just a placeholder.
  41. # Example:
  42. # exec(payload) # Use with caution!
  43. time.sleep(2) # Simulate some work
  44. logging.info(f"Payload execution completed: {payload}")
  45. except Exception as e:
  46. logging.error(f"Error executing payload: {e}")
  47. def run(self):
  48. """Starts the scheduler in a separate thread."""
  49. self.running = True
  50. while self.running:
  51. if self.payloads:
  52. payload, delay = self.payloads.popleft()
  53. time.sleep(delay)
  54. threading.Thread(target=self._execute_payload, args=(payload,)).start()
  55. else:
  56. time.sleep(0.1) # Check for new payloads periodically
  57. def stop(self):
  58. """Stops the scheduler."""
  59. self.running = False
  60. logging.info("Scheduler stopped.")
  61. if __name__ == '__main__':
  62. scheduler = PayloadScheduler()
  63. # Example usage:
  64. payload1 = '{"message": "Hello from payload 1"}'
  65. payload2 = '{"number": 123}'
  66. payload3 = '{"data": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]}' #potentially large
  67. scheduler.add_payload(payload1, 1) # Execute after 1 second
  68. scheduler.add_payload(payload2, 3) # Execute after 3 seconds
  69. scheduler.add_payload(payload3, 5) # Execute after 5 seconds
  70. scheduler.run()
  71. time.sleep(10) # Let the scheduler run for a while
  72. scheduler.stop()

Add your comment