import json
import time
import threading
import logging
from collections import deque
# Configure the root logger once at import time: INFO and above, with a
# timestamp and level prefix on every record.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
class PayloadScheduler:
    """Queue JSON payloads and dispatch each one after a delay.

    Accepted payloads are stored in FIFO order in ``self.payloads`` as
    ``(parsed_payload, delay_seconds)`` tuples.  The scheduler loop takes one
    entry at a time, waits that entry's delay, then hands the payload to a
    fresh worker thread.  Delays are therefore applied one after another
    (each measured from the moment the entry is taken off the queue), not
    from the moment ``add_payload`` was called.
    """

    def __init__(self, max_payload_size_kb=1024):
        """Create an idle scheduler.

        Args:
            max_payload_size_kb: Largest accepted payload, in KiB, measured
                on the UTF-8 encoding of the re-serialized JSON.
        """
        self.payloads = deque()  # FIFO of (parsed_payload, delay_seconds)
        self.max_payload_size_kb = max_payload_size_kb
        self.running = False
        # Set by stop() so that a pending delay can be interrupted at once.
        self._stop_event = threading.Event()
        # Background thread running _run_loop(); None until run() is called.
        self._thread = None

    def add_payload(self, payload_json, execution_delay_sec):
        """Validate a JSON payload and queue it with its execution delay.

        Args:
            payload_json: JSON document as text (anything ``json.loads``
                accepts).
            execution_delay_sec: Non-negative, finite number of seconds the
                scheduler waits before dispatching this payload.

        Returns:
            bool: True if the payload was queued; False if the JSON is
            invalid, the delay is unusable, or the payload is too large.
        """
        try:
            payload = json.loads(payload_json)
        except (ValueError, TypeError):
            # ValueError covers json.JSONDecodeError; TypeError covers
            # non-string input such as None.
            logging.error("Invalid JSON payload.")
            return False

        if not self._is_valid_delay(execution_delay_sec):
            # Rejecting here keeps a bad delay from crashing the scheduler
            # loop later, when it would be passed to a timed wait.
            logging.error("Invalid execution delay: %r", execution_delay_sec)
            return False

        if self._payload_size_exceeds_limit(payload_json):
            logging.warning("Payload size exceeds the limit. Will be skipped.")
            return False

        self.payloads.append((payload, execution_delay_sec))
        logging.info(
            "Payload added. Executing in %s seconds.", execution_delay_sec
        )
        return True

    @staticmethod
    def _is_valid_delay(delay):
        """Return True if *delay* is a finite, non-negative int or float."""
        if not isinstance(delay, (int, float)):
            return False
        # NaN fails both comparisons, so it is rejected along with negative
        # and infinite values.
        return 0 <= delay < float("inf")

    def _payload_size_exceeds_limit(self, payload_json):
        """Report whether the payload is larger than ``max_payload_size_kb``.

        The size is taken from the UTF-8 bytes of the payload after a
        parse / re-serialize round trip, so insignificant whitespace in the
        input does not count.  Input that cannot be parsed is reported as
        exceeding the limit.
        """
        try:
            payload = json.loads(payload_json)
            payload_size_kb = len(json.dumps(payload).encode('utf-8')) / 1024
        except (ValueError, TypeError):
            return True  # Treat unparseable input as too large.
        return payload_size_kb > self.max_payload_size_kb

    def _execute_payload(self, payload):
        """Execute one payload (placeholder: logs and simulates work)."""
        try:
            logging.info("Executing payload: %s", payload)
            # Replace this with the real payload handling logic.
            # SECURITY: payloads are external data -- never pass them to
            # exec()/eval().
            time.sleep(2)  # Simulate some work
            logging.info("Payload execution completed: %s", payload)
        except Exception as e:
            logging.error("Error executing payload: %s", e)

    def run(self):
        """Start the scheduler loop in a background daemon thread.

        Returns immediately.  Calling ``run()`` while the scheduler is
        already running is a no-op.  It may be called again after ``stop()``
        to resume processing the queue.
        """
        worker = self._thread
        if worker is not None and worker.is_alive():
            if self.running:
                return  # Already running; do not start a second loop.
            # A previous loop is still winding down after stop(); wait for
            # it so two loops never consume the queue at the same time.
            worker.join()

        self._stop_event.clear()
        self.running = True
        self._thread = threading.Thread(
            target=self._run_loop, name="PayloadScheduler", daemon=True
        )
        self._thread.start()

    def _run_loop(self):
        """Dispatch queued payloads until the scheduler is stopped."""
        while self.running:
            if not self.payloads:
                # Idle: check for new payloads periodically, waking early
                # if stop() is called.
                self._stop_event.wait(0.1)
                continue

            payload, delay = self.payloads.popleft()
            if self._stop_event.wait(delay):
                # Stopped while waiting: put the payload back so it is not
                # lost and can be dispatched by a later run().
                self.payloads.appendleft((payload, delay))
                return
            threading.Thread(
                target=self._execute_payload, args=(payload,)
            ).start()

    def stop(self):
        """Stop the scheduler loop; payloads already executing finish."""
        self.running = False
        self._stop_event.set()  # Interrupt any delay currently in progress.
        logging.info("Scheduler stopped.")
if __name__ == '__main__':
    scheduler = PayloadScheduler()

    # Example usage: three small JSON payloads.
    payload1 = '{"message": "Hello from payload 1"}'
    payload2 = '{"number": 123}'
    payload3 = '{"data": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]}'

    # The scheduler waits each delay in turn, so these are dispatched
    # roughly 1 s, 4 s and 9 s after the scheduler starts.
    scheduler.add_payload(payload1, 1)
    scheduler.add_payload(payload2, 3)
    scheduler.add_payload(payload3, 5)

    # Launch run() on a daemon helper thread so this script keeps control
    # and can reach stop() below whether or not run() itself blocks.
    runner = threading.Thread(target=scheduler.run, daemon=True)
    runner.start()

    time.sleep(10)  # Let the scheduler run for a while
    scheduler.stop()
    runner.join(timeout=1)
# (Stray web-page text "Add your comment" -- not part of the script.)