import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import org.json.JSONObject;
public class JsonResponseQueue {
private final BlockingQueue<JSONObject> taskQueue;
private final int numConsumers;
private final AtomicInteger taskCount = new AtomicInteger(0);
public JsonResponseQueue(int numConsumers) {
this.taskQueue = new LinkedBlockingQueue<>();
this.numConsumers = numConsumers;
}
public void enqueue(JSONObject task) {
taskQueue.offer(task); // Add task to the queue
}
public void startConsumers() {
ExecutorService executor = Executors.newFixedThreadPool(numConsumers);
for (int i = 0; i < numConsumers; i++) {
executor.submit(new Consumer(taskQueue));
}
}
// Consumer class to process JSON responses with retry logic
private class Consumer implements Runnable {
private final BlockingQueue<JSONObject> queue;
public Consumer(BlockingQueue<JSONObject> queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
try {
JSONObject task = queue.take(); // Block until a task is available
processTask(task);
} catch (InterruptedException e) {
// Handle interruption (e.g., shutdown signal)
Thread.currentThread().interrupt();
return;
}
}
}
private void processTask(JSONObject task) {
int retryCount = 0;
while (retryCount < 3) { // Retry up to 3 times
try {
// Simulate a network call or processing that might fail
if (simulateFailure()) {
retryCount++;
System.out.println("Task failed, retry attempt: " + retryCount);
Thread.sleep(1000); // Wait before retrying
} else {
System.out.println("Task processed successfully: " + task.toString());
return; // Task completed successfully
}
} catch (Exception e) {
System.err.println("Error processing task: " + e.getMessage());
retryCount++;
Thread.sleep(2000); // Wait before retrying
}
}
System.err.println("Task failed after multiple retries: " + task.toString());
}
private boolean simulateFailure() {
// Simulate a failure condition (e.g., network error)
return Math.random() < 0.3; // 30% chance of failure
}
}
public static void main(String[] args) throws InterruptedException {
JsonResponseQueue queue = new JsonResponseQueue(3); // Create a queue with 3 consumers
// Start the consumers
queue.startConsumers();
// Enqueue some tasks
for (int i = 0; i < 10; i++) {
JSONObject task = new JSONObject().put("id", i).put("data", "some data");
queue.enqueue(task);
Thread.sleep(200); // Simulate task generation
}
// Keep the main thread alive for a while to allow consumers to process tasks
Thread.sleep(5000);
}
}
Add your comment