import java.io.File;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Schedules directories for processing on an {@link ExecutorService} while
 * bounding how many are processed at once and how quickly new work is started.
 *
 * <p>Concurrency is bounded by a {@link Semaphore} holding
 * {@code maxConcurrentDirectories} permits: a permit is acquired before a
 * worker is submitted and released when that worker finishes. The submission
 * rate is bounded by making {@link #scheduleDirectory(File)} take at least
 * {@code rateLimitSeconds} seconds per call.
 *
 * <p>{@link #scheduleDirectory(File)} is {@code synchronized}, so concurrent
 * callers are serialized, and it blocks the calling thread while waiting for a
 * free slot and while enforcing the rate limit.
 */
public class DirectoryScheduler {
    private final int rateLimitSeconds;
    private final BlockingQueue<File> directoryQueue;
    private final Semaphore semaphore;
    private final ExecutorService executorService;

    /**
     * Creates a scheduler backed by a fixed thread pool with one thread per
     * concurrent slot.
     *
     * @param maxConcurrentDirectories maximum number of directories processed at once; must be positive
     * @param rateLimitSeconds minimum number of seconds each scheduling call takes;
     *        zero or a negative value disables the rate limit
     * @throws IllegalArgumentException if {@code maxConcurrentDirectories} is not positive
     */
    public DirectoryScheduler(int maxConcurrentDirectories, int rateLimitSeconds) {
        this(maxConcurrentDirectories, rateLimitSeconds,
                Executors.newFixedThreadPool(maxConcurrentDirectories));
    }

    /**
     * Creates a scheduler that submits its workers to the given executor.
     *
     * @param maxConcurrentDirectories maximum number of directories processed at once; must be positive
     * @param rateLimitSeconds minimum number of seconds each scheduling call takes;
     *        zero or a negative value disables the rate limit
     * @param executorService executor that runs the processing tasks; must not be null
     * @throws IllegalArgumentException if {@code maxConcurrentDirectories} is not positive
     * @throws NullPointerException if {@code executorService} is null
     */
    public DirectoryScheduler(int maxConcurrentDirectories, int rateLimitSeconds, ExecutorService executorService) {
        // With zero permits the first scheduling call would block forever.
        if (maxConcurrentDirectories < 1) {
            throw new IllegalArgumentException(
                    "maxConcurrentDirectories must be positive: " + maxConcurrentDirectories);
        }
        if (executorService == null) {
            throw new NullPointerException("executorService must not be null");
        }
        this.rateLimitSeconds = rateLimitSeconds;
        this.directoryQueue = new LinkedBlockingQueue<>();
        this.semaphore = new Semaphore(maxConcurrentDirectories);
        this.executorService = executorService;
    }

    /**
     * Queues a directory and submits a worker to process it.
     *
     * <p>Blocks until a concurrency slot is free and until the rate-limit
     * interval for this call has elapsed. If the calling thread is interrupted,
     * its interrupt flag is restored and a message is written to
     * {@code System.err}; a directory whose worker was not yet submitted is
     * not scheduled in that case.
     *
     * @param directory the directory to process; must not be null
     * @throws NullPointerException if {@code directory} is null
     * @throws RejectedExecutionException if the executor refuses the worker
     *         (for example after {@link #shutdown()})
     */
    public synchronized void scheduleDirectory(File directory) {
        if (directory == null) {
            throw new NullPointerException("directory must not be null");
        }
        try {
            scheduleNext(directory);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.err.println("Scheduler interrupted: " + e.getMessage());
        }
    }

    /**
     * Acquires a slot, enqueues the directory, submits one worker for it and
     * then sleeps for whatever remains of the rate-limit interval.
     *
     * @throws InterruptedException if interrupted while waiting for a slot or
     *         while sleeping to honour the rate limit
     */
    private void scheduleNext(File directory) throws InterruptedException {
        long startNanos = System.nanoTime();
        semaphore.acquire(); // Wait for a slot to become available

        // Enqueue only once a permit is held, so an interrupted wait never
        // leaves a directory in the queue without a worker to consume it.
        directoryQueue.offer(directory);
        Runnable worker = this::processNextQueuedDirectory;
        try {
            executorService.submit(worker);
        } catch (RejectedExecutionException e) {
            // No worker will run: undo the enqueue and hand the permit back so
            // later calls do not block on a slot that is never released.
            directoryQueue.remove(directory);
            semaphore.release();
            throw e;
        }

        // long arithmetic avoids the int overflow of rateLimitSeconds * 1000.
        long rateLimitMillis = TimeUnit.SECONDS.toMillis(rateLimitSeconds);
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        if (elapsedMillis < rateLimitMillis) {
            Thread.sleep(rateLimitMillis - elapsedMillis); // Sleep to maintain rate limit
        }
    }

    /**
     * Worker body: takes the oldest queued directory, processes it, and always
     * releases the concurrency slot afterwards.
     */
    private void processNextQueuedDirectory() {
        try {
            // Exactly one directory is enqueued per submitted worker, so a
            // non-blocking poll is sufficient and can never hang the worker.
            File directory = directoryQueue.poll();
            if (directory == null) {
                return;
            }
            processDirectory(directory);
            System.out.println("Directory processed: " + directory.getAbsolutePath());
        } catch (Exception e) {
            System.err.println("Error processing directory: " + e.getMessage());
        } finally {
            semaphore.release(); // Release the semaphore after processing
        }
    }

    /**
     * Processes a single directory. This default implementation only simulates
     * work by sleeping for two seconds; subclasses may override it to do the
     * real processing.
     *
     * @param directory the directory to process
     * @throws Exception if processing fails or the worker thread is interrupted
     */
    protected void processDirectory(File directory) throws Exception {
        // Simulate directory processing
        try {
            Thread.sleep(2000); // Simulate work
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new Exception("Interrupted during directory processing", e);
        }
    }

    /**
     * Initiates an orderly shutdown of the underlying executor: workers that
     * were already submitted still run, new ones are rejected.
     */
    public void shutdown() {
        executorService.shutdown();
    }

    /**
     * Example usage: schedules five directories with at most three processed
     * concurrently and one scheduling call per second.
     */
    public static void main(String[] args) throws InterruptedException {
        int maxConcurrent = 3;
        int rateLimit = 1;
        DirectoryScheduler scheduler = new DirectoryScheduler(maxConcurrent, rateLimit);
        try {
            // Simulate adding directories to schedule
            for (int i = 1; i <= 5; i++) {
                scheduler.scheduleDirectory(new File("dir" + i));
            }
            Thread.sleep(10000); // Let the scheduler run for a while
        } finally {
            // Always stop the pool so its non-daemon threads do not keep the JVM alive.
            scheduler.shutdown();
        }
    }
}
// Add your comment