NIFI-2324: Log number of requests per URI if we have too many outstanding requests to replicate

This closes #683

Signed-off-by: jpercivall <joepercivall@yahoo.com>
This commit is contained in:
Mark Payne 2016-07-19 16:59:14 -04:00 committed by jpercivall
parent 76fe751c22
commit 108c815988
1 changed files with 20 additions and 13 deletions

View File

@ -160,7 +160,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
}
});
maintenanceExecutor.scheduleWithFixedDelay(new PurgeExpiredRequestsTask(), 3, 3, TimeUnit.SECONDS);
maintenanceExecutor.scheduleWithFixedDelay(() -> purgeExpiredRequests(), 1, 1, TimeUnit.SECONDS);
}
@Override
@ -301,9 +301,18 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
verifyState(method, uri.getPath());
}
final int numRequests = responseMap.size();
int numRequests = responseMap.size();
if (numRequests >= MAX_CONCURRENT_REQUESTS) {
logger.debug("Cannot replicate request because there are {} outstanding HTTP Requests already", numRequests);
numRequests = purgeExpiredRequests();
}
if (numRequests >= MAX_CONCURRENT_REQUESTS) {
final Map<String, Long> countsByUri = responseMap.values().stream().collect(
Collectors.groupingBy(
StandardAsyncClusterResponse::getURIPath,
Collectors.counting()));
logger.error("Cannot replicate request {} {} because there are {} outstanding HTTP Requests already. Request Counts Per URI = {}", method, uri.getPath(), numRequests, countsByUri);
throw new IllegalStateException("There are too many outstanding HTTP requests with a total " + numRequests + " outstanding requests");
}
@ -741,9 +750,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
void onCompletion(NodeResponse nodeResponse);
}
private class PurgeExpiredRequestsTask implements Runnable {
@Override
public void run() {
private synchronized int purgeExpiredRequests() {
final Set<String> expiredRequestIds = ThreadPoolRequestReplicator.this.responseMap.entrySet().stream()
.filter(entry -> entry.getValue().isOlderThan(30, TimeUnit.SECONDS)) // older than 30 seconds
.filter(entry -> entry.getValue().isComplete()) // is complete
@ -751,6 +758,6 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
.collect(Collectors.toSet());
expiredRequestIds.forEach(id -> onResponseConsumed(id));
}
return responseMap.size();
}
}