1. import java.util.HashMap;
  2. import java.util.Map;
  3. import java.util.concurrent.ConcurrentHashMap;
  4. import java.util.concurrent.Executors;
  5. import java.util.concurrent.ScheduledExecutorService;
  6. import java.util.concurrent.TimeUnit;
  7. public class LogStreamDependencyResolver {
  8. private final Map<String, LogStream> logStreams = new ConcurrentHashMap<>();
  9. private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
  10. public void registerLogStream(String streamId, LogStream logStream) {
  11. logStreams.put(streamId, logStream);
  12. }
  13. public boolean validateDependencies(String streamId) {
  14. LogStream logStream = logStreams.get(streamId);
  15. if (logStream == null) {
  16. return false; // Stream not found
  17. }
  18. for (String dependencyId : logStream.getDependencies()) {
  19. LogStream dependency = logStreams.get(dependencyId);
  20. if (dependency == null) {
  21. System.err.println("Dependency " + dependencyId + " not found for stream " + streamId);
  22. return false; // Dependency not found
  23. }
  24. if (!dependency.isValid()) {
  25. System.err.println("Dependency " + dependencyId + " is invalid for stream " + streamId);
  26. return false; // Dependency is invalid
  27. }
  28. }
  29. return true; // All dependencies are valid
  30. }
  31. public void startRetryInterval(String streamId, long intervalSeconds) {
  32. ScheduledExecutorService.scheduleAtFixedRate(() -> {
  33. if (validateDependencies(streamId)) {
  34. System.out.println("Log stream " + streamId + " dependencies validated successfully.");
  35. } else {
  36. System.err.println("Log stream " + streamId + " dependency validation failed. Retrying...");
  37. }
  38. }, 0, intervalSeconds, TimeUnit.SECONDS);
  39. }
  40. // Inner class to represent a log stream
  41. public static class LogStream {
  42. private final String id;
  43. private final String[] dependencies;
  44. public LogStream(String id, String[] dependencies) {
  45. this.id = id;
  46. this.dependencies = dependencies;
  47. }
  48. public String getId() {
  49. return id;
  50. }
  51. public String[] getDependencies() {
  52. return dependencies;
  53. }
  54. public boolean isValid() {
  55. // Simulate validation logic (replace with actual validation)
  56. return true;
  57. }
  58. }
  59. public static void main(String[] args) {
  60. // Example Usage
  61. LogStream streamA = new LogStream("streamA", new String[]{"streamB", "streamC"});
  62. LogStream streamB = new LogStream("streamB", new String[] {});
  63. LogStream streamC = new LogStream("streamC", new String[] {"streamA"});
  64. LogStreamDependencyResolver resolver = new LogStreamDependencyResolver();
  65. resolver.registerLogStream("streamA", streamA);
  66. resolver.registerLogStream("streamB", streamB);
  67. resolver.registerLogStream("streamC", streamC);
  68. resolver.startRetryInterval("streamA", 5); // Retry every 5 seconds
  69. resolver.startRetryInterval("streamC", 10); // Retry every 10 seconds
  70. //Demonstrating validation
  71. System.out.println("Validating streamA dependencies: " + resolver.validateDependencies("streamA"));
  72. System.out.println("Validating streamD dependencies: " + resolver.validateDependencies("streamD")); //Stream D doesn't exist
  73. }
  74. }

Add your comment