diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java index 8609353400..f4fcc851a2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java @@ -160,7 +160,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { } }); - maintenanceExecutor.scheduleWithFixedDelay(new PurgeExpiredRequestsTask(), 3, 3, TimeUnit.SECONDS); + maintenanceExecutor.scheduleWithFixedDelay(() -> purgeExpiredRequests(), 1, 1, TimeUnit.SECONDS); } @Override @@ -301,9 +301,18 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { verifyState(method, uri.getPath()); } - final int numRequests = responseMap.size(); + int numRequests = responseMap.size(); if (numRequests >= MAX_CONCURRENT_REQUESTS) { - logger.debug("Cannot replicate request because there are {} outstanding HTTP Requests already", numRequests); + numRequests = purgeExpiredRequests(); + } + + if (numRequests >= MAX_CONCURRENT_REQUESTS) { + final Map countsByUri = responseMap.values().stream().collect( + Collectors.groupingBy( + StandardAsyncClusterResponse::getURIPath, + Collectors.counting())); + + logger.error("Cannot replicate request {} {} because there are {} outstanding HTTP Requests already. Request Counts Per URI = {}", method, uri.getPath(), numRequests, countsByUri); throw new IllegalStateException("There are too many outstanding HTTP requests with a total " + numRequests + " outstanding requests"); } @@ -741,16 +750,14 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { void onCompletion(NodeResponse nodeResponse); } - private class PurgeExpiredRequestsTask implements Runnable { - @Override - public void run() { - final Set expiredRequestIds = ThreadPoolRequestReplicator.this.responseMap.entrySet().stream() - .filter(entry -> entry.getValue().isOlderThan(30, TimeUnit.SECONDS)) // older than 30 seconds - .filter(entry -> entry.getValue().isComplete()) // is complete - .map(entry -> entry.getKey()) // get the request id - .collect(Collectors.toSet()); + private synchronized int purgeExpiredRequests() { + final Set expiredRequestIds = ThreadPoolRequestReplicator.this.responseMap.entrySet().stream() + .filter(entry -> entry.getValue().isOlderThan(30, TimeUnit.SECONDS)) // older than 30 seconds + .filter(entry -> entry.getValue().isComplete()) // is complete + .map(entry -> entry.getKey()) // get the request id + .collect(Collectors.toSet()); - expiredRequestIds.forEach(id -> onResponseConsumed(id)); - } + expiredRequestIds.forEach(id -> onResponseConsumed(id)); + return responseMap.size(); } }