1. import queue
  2. import threading
  3. import time
  4. import json
  5. import requests
  6. class TaskQueue:
  7. def __init__(self, staging_url, dry_run=False):
  8. self.queue = queue.Queue() # Queue for tasks
  9. self.staging_url = staging_url # URL for staging environment
  10. self.dry_run = dry_run # Dry-run mode flag
  11. self.lock = threading.Lock() # Lock for thread safety
  12. def enqueue(self, task):
  13. """Adds a task to the queue."""
  14. with self.lock:
  15. self.queue.put(task)
  16. def dequeue(self):
  17. """Retrieves a task from the queue."""
  18. try:
  19. return self.queue.get(timeout=1) # Wait for 1 second
  20. except queue.Empty:
  21. return None
  22. def start_worker(self, worker_id):
  23. """Starts a worker thread to process tasks."""
  24. def worker():
  25. while True:
  26. task = self.dequeue()
  27. if task:
  28. try:
  29. self.process_task(task)
  30. except Exception as e:
  31. print(f"Error processing task: {e}")
  32. else:
  33. time.sleep(0.1) # avoid busy waiting
  34. thread = threading.Thread(target=worker)
  35. thread.daemon = True # Allow the main thread to exit even if workers are running
  36. thread.start()
  37. def process_task(self, task):
  38. """Processes a single web form task."""
  39. try:
  40. # Validate task data
  41. if not isinstance(task, dict) or 'form_data' not in task:
  42. raise ValueError("Invalid task format")
  43. form_data = task['form_data']
  44. form_id = task['form_id']
  45. # Construct the staging URL
  46. url = f"{self.staging_url}/{form_id}"
  47. # Prepare the request
  48. headers = {'Content-Type': 'application/x-www-form-urlencoded'}
  49. response = requests.post(url, data=form_data, headers=headers)
  50. response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
  51. print(f"Task {form_id} successful. Response: {response.text}")
  52. except requests.exceptions.RequestException as e:
  53. print(f"Request error for task {form_id}: {e}")
  54. except ValueError as e:
  55. print(f"Task error {e}")
  56. except Exception as e:
  57. print(f"Unexpected error processing task {form_id}: {e}")
  58. def run(self, num_workers=2):
  59. """Starts the task queue and worker threads."""
  60. self.start_worker(1)
  61. for i in range(2, num_workers + 1):
  62. self.start_worker(i)
  63. if __name__ == '__main__':
  64. # Example usage
  65. staging_url = "https://staging.example.com/forms" # Replace with your staging URL
  66. task_queue = TaskQueue(staging_url, dry_run=False) # Set dry_run to True for testing
  67. # Sample tasks
  68. tasks = [
  69. {"form_id": "form123", "form_data": {"field1": "value1", "field2": "value2"}},
  70. {"form_id": "form456", "form_data": {"field1": "value3", "field2": "value4"}},
  71. {"form_id": "form789", "form_data": {"field1": "value5", "field2": "value6"}},
  72. ]
  73. for task in tasks:
  74. task_queue.enqueue(task)
  75. task_queue.run(num_workers=3)

Add your comment