1. import java.util.concurrent.*;
  2. import java.util.concurrent.atomic.AtomicInteger;
  3. import org.json.JSONObject;
  4. public class JsonResponseQueue {
  5. private final BlockingQueue<JSONObject> taskQueue;
  6. private final int numConsumers;
  7. private final AtomicInteger taskCount = new AtomicInteger(0);
  8. public JsonResponseQueue(int numConsumers) {
  9. this.taskQueue = new LinkedBlockingQueue<>();
  10. this.numConsumers = numConsumers;
  11. }
  12. public void enqueue(JSONObject task) {
  13. taskQueue.offer(task); // Add task to the queue
  14. }
  15. public void startConsumers() {
  16. ExecutorService executor = Executors.newFixedThreadPool(numConsumers);
  17. for (int i = 0; i < numConsumers; i++) {
  18. executor.submit(new Consumer(taskQueue));
  19. }
  20. }
  21. // Consumer class to process JSON responses with retry logic
  22. private class Consumer implements Runnable {
  23. private final BlockingQueue<JSONObject> queue;
  24. public Consumer(BlockingQueue<JSONObject> queue) {
  25. this.queue = queue;
  26. }
  27. @Override
  28. public void run() {
  29. while (true) {
  30. try {
  31. JSONObject task = queue.take(); // Block until a task is available
  32. processTask(task);
  33. } catch (InterruptedException e) {
  34. // Handle interruption (e.g., shutdown signal)
  35. Thread.currentThread().interrupt();
  36. return;
  37. }
  38. }
  39. }
  40. private void processTask(JSONObject task) {
  41. int retryCount = 0;
  42. while (retryCount < 3) { // Retry up to 3 times
  43. try {
  44. // Simulate a network call or processing that might fail
  45. if (simulateFailure()) {
  46. retryCount++;
  47. System.out.println("Task failed, retry attempt: " + retryCount);
  48. Thread.sleep(1000); // Wait before retrying
  49. } else {
  50. System.out.println("Task processed successfully: " + task.toString());
  51. return; // Task completed successfully
  52. }
  53. } catch (Exception e) {
  54. System.err.println("Error processing task: " + e.getMessage());
  55. retryCount++;
  56. Thread.sleep(2000); // Wait before retrying
  57. }
  58. }
  59. System.err.println("Task failed after multiple retries: " + task.toString());
  60. }
  61. private boolean simulateFailure() {
  62. // Simulate a failure condition (e.g., network error)
  63. return Math.random() < 0.3; // 30% chance of failure
  64. }
  65. }
  66. public static void main(String[] args) throws InterruptedException {
  67. JsonResponseQueue queue = new JsonResponseQueue(3); // Create a queue with 3 consumers
  68. // Start the consumers
  69. queue.startConsumers();
  70. // Enqueue some tasks
  71. for (int i = 0; i < 10; i++) {
  72. JSONObject task = new JSONObject().put("id", i).put("data", "some data");
  73. queue.enqueue(task);
  74. Thread.sleep(200); // Simulate task generation
  75. }
  76. // Keep the main thread alive for a while to allow consumers to process tasks
  77. Thread.sleep(5000);
  78. }
  79. }

Add your comment