1. import java.util.HashMap;
  2. import java.util.Map;
  3. import java.util.concurrent.ConcurrentHashMap;
  4. import java.util.concurrent.atomic.AtomicInteger;
  5. public class ApiPayloadWrapper {
  6. private static final ConcurrentHashMap<String, RateLimiter> rateLimiters = new ConcurrentHashMap<>();
  7. public static <T> T wrapPayload(String apiEndpoint, Runnable payloadHandler, int maxRequestsPerSecond) {
  8. // Get or create a rate limiter for the endpoint.
  9. RateLimiter limiter = rateLimiters.computeIfAbsent(apiEndpoint, k -> new RateLimiter(maxRequestsPerSecond));
  10. // Acquire a token from the rate limiter. Blocks until a token is available.
  11. try {
  12. limiter.acquire();
  13. } catch (InterruptedException e) {
  14. Thread.currentThread().interrupt(); // Restore interrupted status
  15. throw new RuntimeException("Rate limit interrupted", e);
  16. }
  17. // Execute the payload handler.
  18. try {
  19. payloadHandler.run();
  20. } catch (Exception e) {
  21. //Wrap the exception with a maintenance error.
  22. throw new MaintenanceError("API Payload Error: " + e.getMessage(), e);
  23. }
  24. return null; // Return null or the result of the payload handler, if applicable.
  25. }
  26. public static class RateLimiter {
  27. private final int maxRequestsPerSecond;
  28. private final AtomicInteger requestCount = new AtomicInteger(0);
  29. private long lastResetTime = System.currentTimeMillis();
  30. public RateLimiter(int maxRequestsPerSecond) {
  31. this.maxRequestsPerSecond = maxRequestsPerSecond;
  32. }
  33. public synchronized boolean acquire() throws InterruptedException {
  34. long currentTime = System.currentTimeMillis();
  35. if (currentTime - lastResetTime > 1000) {
  36. // Reset the counter if a second has passed.
  37. requestCount.set(0);
  38. lastResetTime = currentTime;
  39. }
  40. if (requestCount.get() < maxRequestsPerSecond) {
  41. requestCount.incrementAndGet();
  42. return true;
  43. } else {
  44. // Wait until a token is available.
  45. long sleepTime = 1000 - (currentTime - lastResetTime) % 1000; //Ensure we don't wait longer than 1s
  46. Thread.sleep(sleepTime);
  47. return acquire(); //Recursive call to try again
  48. }
  49. }
  50. }
  51. public static class MaintenanceError extends RuntimeException {
  52. private final Throwable cause;
  53. public MaintenanceError(String message, Throwable cause) {
  54. super(message);
  55. this.cause = cause;
  56. }
  57. public Throwable getCause() {
  58. return cause;
  59. }
  60. }
  61. public static void main(String[] args) {
  62. // Example Usage
  63. try {
  64. // Simulate an API endpoint.
  65. Runnable apiCall = () -> {
  66. System.out.println("Making API call...");
  67. //Simulate a possible error
  68. if(Math.random() < 0.2){
  69. throw new RuntimeException("Simulated API error");
  70. }
  71. System.out.println("API call completed.");
  72. };
  73. // Wrap the API call with a rate limiter.
  74. T result = wrapPayload("exampleEndpoint", apiCall, 5); // 5 requests per second
  75. if (result != null) {
  76. System.out.println("API call result: " + result);
  77. }
  78. } catch (MaintenanceError e) {
  79. System.err.println("Maintenance Error: " + e.getMessage());
  80. System.err.println("Caused by: " + e.getCause().getMessage());
  81. }
  82. }
  83. }

Add your comment