1. import java.io.File;
  2. import java.util.concurrent.*;
  3. import java.util.concurrent.atomic.AtomicInteger;
  4. public class DirectoryScheduler {
  5. private final int maxConcurrentDirectories;
  6. private final int rateLimitSeconds;
  7. private final BlockingQueue<File> directoryQueue;
  8. private final Semaphore semaphore;
  9. private final AtomicInteger executionCount = new AtomicInteger(0);
  10. private final ExecutorService executorService;
  11. public DirectoryScheduler(int maxConcurrentDirectories, int rateLimitSeconds) {
  12. this(maxConcurrentDirectories, rateLimitSeconds, Executors.newFixedThreadPool(maxConcurrentDirectories));
  13. }
  14. public DirectoryScheduler(int maxConcurrentDirectories, int rateLimitSeconds, ExecutorService executorService) {
  15. this.maxConcurrentDirectories = maxConcurrentDirectories;
  16. this.rateLimitSeconds = rateLimitSeconds;
  17. this.directoryQueue = new LinkedBlockingQueue<>();
  18. this.semaphore = new Semaphore(maxConcurrentDirectories);
  19. this.executorService = executorService;
  20. }
  21. public synchronized void scheduleDirectory(File directory) {
  22. directoryQueue.offer(directory);
  23. try {
  24. scheduleNext();
  25. } catch (InterruptedException e) {
  26. Thread.currentThread().interrupt();
  27. System.err.println("Scheduler interrupted: " + e.getMessage());
  28. }
  29. }
  30. private void scheduleNext() throws InterruptedException {
  31. if (executionCount.get() >= maxConcurrentDirectories) {
  32. return;
  33. }
  34. long startTime = System.currentTimeMillis();
  35. semaphore.acquire(); // Wait for a slot to become available
  36. try {
  37. executorService.submit(() -> {
  38. try {
  39. processDirectory(directoryQueue.take()); // Take directory from queue
  40. System.out.println("Directory processed: " + directoryQueue.take().getAbsolutePath());
  41. } catch (Exception e) {
  42. System.err.println("Error processing directory: " + e.getMessage());
  43. } finally {
  44. semaphore.release(); // Release the semaphore after processing
  45. }
  46. });
  47. } catch (InterruptedException e) {
  48. Thread.currentThread().interrupt();
  49. System.err.println("Semaphore Interrupted: " + e.getMessage());
  50. }
  51. long endTime = System.currentTimeMillis();
  52. long duration = endTime - startTime;
  53. if (duration < rateLimitSeconds * 1000) {
  54. Thread.sleep(rateLimitSeconds * 1000 - duration); // Sleep to maintain rate limit
  55. }
  56. }
  57. private void processDirectory(File directory) throws Exception {
  58. // Simulate directory processing
  59. try {
  60. Thread.sleep(2000); // Simulate work
  61. } catch (InterruptedException e) {
  62. Thread.currentThread().interrupt();
  63. throw new Exception("Interrupted during directory processing");
  64. }
  65. }
  66. public void shutdown() {
  67. executorService.shutdown();
  68. }
  69. public static void main(String[] args) throws InterruptedException {
  70. // Example usage
  71. int maxConcurrent = 3;
  72. int rateLimit = 1;
  73. DirectoryScheduler scheduler = new DirectoryScheduler(maxConcurrent, rateLimit);
  74. // Simulate adding directories to schedule
  75. File dir1 = new File("dir1");
  76. File dir2 = new File("dir2");
  77. File dir3 = new File("dir3");
  78. File dir4 = new File("dir4");
  79. File dir5 = new File("dir5");
  80. scheduler.scheduleDirectory(dir1);
  81. scheduler.scheduleDirectory(dir2);
  82. scheduler.scheduleDirectory(dir3);
  83. scheduler.scheduleDirectory(dir4);
  84. scheduler.scheduleDirectory(dir5);
  85. Thread.sleep(10000); // Let the scheduler run for a while
  86. scheduler.shutdown();
  87. }
  88. }

Add your comment