import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
class TaskValidator {
/**
* Batches operations from a list of task queues for validation checks.
*
* @param queues A list of task queues. Each queue contains tasks to be validated.
* @param validationFunction A function to validate a single task.
* @param batchSize The size of the batch to process at a time.
* @return A list of validation results.
*/
public static List<String> batchValidate(List<BlockingQueue<Object>> queues, ValidationFunction validationFunction, int batchSize) {
List<String> results = new ArrayList<>(); // Store validation results
ExecutorService executor = Executors.newFixedThreadPool(queues.size()); // Create a thread pool
for (BlockingQueue<Object> queue : queues) {
executor.submit(() -> { // Submit tasks from each queue to the thread pool
while (true) {
try {
Object task = queue.take(); // Retrieve a task from the queue
if (task == null) {
break; // Exit if a null task is encountered (queue is empty)
}
String validationResult = validationFunction.validate(task); // Validate the task
results.add(validationResult); // Add the result to the list
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Restore interrupted status
break;
}
}
});
}
executor.shutdown(); // Shutdown the thread pool
try {
executor.awaitTermination(Long.MAX_VALUE, java.util.concurrent.TimeUnit.NANOSECONDS); // Wait for all tasks to complete
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Restore interrupted status
}
return results; // Return the list of validation results
}
/**
* Functional interface for validation functions.
*/
@FunctionalInterface
public interface ValidationFunction {
String validate(Object task); // Validates a single task and returns a string result
}
public static void main(String[] args) {
// Example usage
BlockingQueue<Object> queue1 = new LinkedBlockingQueue<>();
BlockingQueue<Object> queue2 = new LinkedBlockingQueue<>();
// Add some sample tasks to the queues
queue1.add("task1");
queue1.add("task2");
queue2.add("task3");
queue2.add("task4");
List<BlockingQueue<Object>> queues = List.of(queue1, queue2);
int batchSize = 1;
List<String> validationResults = batchValidate(queues, task -> {
// Simulate a validation check
if (task.equals("task1")) {
return "task1 is valid";
} else if (task.equals("task2")) {
return "task2 is invalid";
} else if (task.equals("task3")) {
return "task3 is valid";
} else {
return "task4 is invalid";
}
}, batchSize);
// Print the validation results
for (String result : validationResults) {
System.out.println(result);
}
}
}
Add your comment