import queue
import threading
import time
import json
import requests
class TaskQueue:
    """Thread-backed queue that submits web-form tasks to a staging server.

    A task is a dict with two keys: ``form_id`` (appended to the staging URL
    as the final path segment) and ``form_data`` (sent as the URL-encoded
    POST body). Tasks are added with :meth:`enqueue` and consumed by daemon
    worker threads started through :meth:`run` or :meth:`start_worker`.
    Because the workers are daemon threads, call :meth:`join` before letting
    the program exit, otherwise in-flight tasks are abandoned.
    """

    def __init__(self, staging_url, dry_run=False, request_timeout=10):
        """Create an empty task queue.

        Args:
            staging_url: Base URL of the staging environment.
            dry_run: When true, tasks are validated and reported but no HTTP
                request is sent.
            request_timeout: Seconds to wait for the staging server before a
                request is abandoned; without a timeout a stalled server
                would block a worker forever.
        """
        self.queue = queue.Queue()
        self.staging_url = staging_url
        self.dry_run = dry_run
        self.request_timeout = request_timeout
        # queue.Queue does its own locking, so this lock is not needed by the
        # class itself; the attribute is kept for code that references it.
        self.lock = threading.Lock()

    def enqueue(self, task):
        """Add a task to the queue."""
        self.queue.put(task)

    def dequeue(self):
        """Return the next task, or ``None`` if none arrives within 1 second.

        Callers that consume tasks through this method and also rely on
        :meth:`join` must call ``self.queue.task_done()`` once per task.
        """
        try:
            return self.queue.get(timeout=1)
        except queue.Empty:
            return None

    def start_worker(self, worker_id):
        """Start one daemon thread that processes tasks until the program exits.

        Args:
            worker_id: Identifier used in the thread's name.

        Returns:
            The started ``threading.Thread``.
        """
        def worker():
            while True:
                # Read the queue directly rather than through dequeue(): the
                # Empty exception distinguishes "no task yet" from a task
                # that happens to be falsy (``{}``, ``None``).
                try:
                    task = self.queue.get(timeout=1)
                except queue.Empty:
                    continue
                try:
                    self.process_task(task)
                except Exception as e:
                    # Thread boundary: report and keep the worker alive.
                    print(f"Error processing task: {e}")
                finally:
                    # Always mark the task finished so join() can return.
                    self.queue.task_done()

        thread = threading.Thread(
            target=worker,
            name=f"TaskQueue-worker-{worker_id}",
            daemon=True,  # do not keep the process alive on their own
        )
        thread.start()
        return thread

    def _build_url(self, form_id):
        """Return the submission URL for ``form_id`` without a doubled slash."""
        return f"{self.staging_url.rstrip('/')}/{form_id}"

    def process_task(self, task):
        """Validate one task and submit its form data to the staging server.

        Errors are reported on stdout rather than raised.

        Args:
            task: Dict containing ``form_id`` and ``form_data``.

        Returns:
            ``True`` if the task was submitted successfully (or accepted in
            dry-run mode), ``False`` if it was invalid or the request failed.
        """
        is_valid = (
            isinstance(task, dict)
            and 'form_id' in task
            and 'form_data' in task
        )
        if not is_valid:
            print("Task error Invalid task format")
            return False

        form_id = task['form_id']
        form_data = task['form_data']
        url = self._build_url(form_id)

        if self.dry_run:
            # Dry-run: show what would happen, touch nothing on the network.
            print(f"[dry-run] Task {form_id} would be POSTed to {url}")
            return True

        headers = {'Content-Type': 'application/x-www-form-urlencoded'}
        try:
            response = requests.post(
                url,
                data=form_data,
                headers=headers,
                timeout=self.request_timeout,
            )
            response.raise_for_status()  # turn 4xx/5xx into HTTPError
        except requests.exceptions.RequestException as e:
            print(f"Request error for task {form_id}: {e}")
            return False
        except Exception as e:
            # Preserve the "never raises" contract for direct callers.
            print(f"Unexpected error processing task {form_id}: {e}")
            return False

        print(f"Task {form_id} successful. Response: {response.text}")
        return True

    def join(self):
        """Block until every enqueued task has been processed by a worker.

        Blocks indefinitely if tasks are queued but no worker was started.
        """
        self.queue.join()

    def run(self, num_workers=2):
        """Start the worker threads.

        Args:
            num_workers: Number of workers to start; at least one worker is
                always started, even for values below 1.
        """
        for worker_id in range(1, max(num_workers, 1) + 1):
            self.start_worker(worker_id)


if __name__ == '__main__':
    # Example usage
    staging_url = "https://staging.example.com/forms"  # Replace with your staging URL
    task_queue = TaskQueue(staging_url, dry_run=False)  # Set dry_run to True for testing

    # Sample tasks
    tasks = [
        {"form_id": "form123", "form_data": {"field1": "value1", "field2": "value2"}},
        {"form_id": "form456", "form_data": {"field1": "value3", "field2": "value4"}},
        {"form_id": "form789", "form_data": {"field1": "value5", "field2": "value6"}},
    ]
    for task in tasks:
        task_queue.enqueue(task)

    task_queue.run(num_workers=3)
    # Workers are daemon threads: wait for the queue to drain, otherwise the
    # interpreter exits immediately and kills them mid-request.
    task_queue.join()
# Add your comment