import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
  7. class TaskValidator {
  8. /**
  9. * Batches operations from a list of task queues for validation checks.
  10. *
  11. * @param queues A list of task queues. Each queue contains tasks to be validated.
  12. * @param validationFunction A function to validate a single task.
  13. * @param batchSize The size of the batch to process at a time.
  14. * @return A list of validation results.
  15. */
  16. public static List<String> batchValidate(List<BlockingQueue<Object>> queues, ValidationFunction validationFunction, int batchSize) {
  17. List<String> results = new ArrayList<>(); // Store validation results
  18. ExecutorService executor = Executors.newFixedThreadPool(queues.size()); // Create a thread pool
  19. for (BlockingQueue<Object> queue : queues) {
  20. executor.submit(() -> { // Submit tasks from each queue to the thread pool
  21. while (true) {
  22. try {
  23. Object task = queue.take(); // Retrieve a task from the queue
  24. if (task == null) {
  25. break; // Exit if a null task is encountered (queue is empty)
  26. }
  27. String validationResult = validationFunction.validate(task); // Validate the task
  28. results.add(validationResult); // Add the result to the list
  29. } catch (InterruptedException e) {
  30. Thread.currentThread().interrupt(); // Restore interrupted status
  31. break;
  32. }
  33. }
  34. });
  35. }
  36. executor.shutdown(); // Shutdown the thread pool
  37. try {
  38. executor.awaitTermination(Long.MAX_VALUE, java.util.concurrent.TimeUnit.NANOSECONDS); // Wait for all tasks to complete
  39. } catch (InterruptedException e) {
  40. Thread.currentThread().interrupt(); // Restore interrupted status
  41. }
  42. return results; // Return the list of validation results
  43. }
  44. /**
  45. * Functional interface for validation functions.
  46. */
  47. @FunctionalInterface
  48. public interface ValidationFunction {
  49. String validate(Object task); // Validates a single task and returns a string result
  50. }
  51. public static void main(String[] args) {
  52. // Example usage
  53. BlockingQueue<Object> queue1 = new LinkedBlockingQueue<>();
  54. BlockingQueue<Object> queue2 = new LinkedBlockingQueue<>();
  55. // Add some sample tasks to the queues
  56. queue1.add("task1");
  57. queue1.add("task2");
  58. queue2.add("task3");
  59. queue2.add("task4");
  60. List<BlockingQueue<Object>> queues = List.of(queue1, queue2);
  61. int batchSize = 1;
  62. List<String> validationResults = batchValidate(queues, task -> {
  63. // Simulate a validation check
  64. if (task.equals("task1")) {
  65. return "task1 is valid";
  66. } else if (task.equals("task2")) {
  67. return "task2 is invalid";
  68. } else if (task.equals("task3")) {
  69. return "task3 is valid";
  70. } else {
  71. return "task4 is invalid";
  72. }
  73. }, batchSize);
  74. // Print the validation results
  75. for (String result : validationResults) {
  76. System.out.println(result);
  77. }
  78. }
  79. }

Add your comment