NIFI-3668: Fix purging expired replicate requests.

This closes #1646.

Newly created async response is added before checking map size nor
purging expired ones. If there are already 100 remaining requests,
the added request will not be executed nor removed.
This commit is contained in:
Koji Kawamura 2017-04-03 15:56:21 +09:00 committed by Mark Payne
parent 84f1fb3959
commit 5e62b4ae72
1 changed files with 39 additions and 35 deletions

View File

@ -316,11 +316,47 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
throw new IllegalArgumentException("Cannot replicate request to 0 nodes");
}
// verify all of the nodes exist and are in the proper state
for (final NodeIdentifier nodeId : nodeIds) {
final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeId);
if (status == null) {
throw new UnknownNodeException("Node " + nodeId + " does not exist in this cluster");
}
if (status.getState() != NodeConnectionState.CONNECTED) {
throw new IllegalClusterStateException("Cannot replicate request to Node " + nodeId + " because the node is not connected");
}
}
logger.debug("Replicating request {} {} with entity {} to {}; response is {}", method, uri, entity, nodeIds, response);
// Update headers to indicate the current revision so that we can
// prevent multiple users changing the flow at the same time
final Map<String, String> updatedHeaders = new HashMap<>(headers);
final String requestId = updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID_HEADER, key -> UUID.randomUUID().toString());
long verifyClusterStateNanos = -1;
if (performVerification) {
final long start = System.nanoTime();
verifyClusterState(method, uri.getPath());
verifyClusterStateNanos = System.nanoTime() - start;
}
int numRequests = responseMap.size();
if (numRequests >= MAX_CONCURRENT_REQUESTS) {
numRequests = purgeExpiredRequests();
}
if (numRequests >= MAX_CONCURRENT_REQUESTS) {
final Map<String, Long> countsByUri = responseMap.values().stream().collect(
Collectors.groupingBy(
StandardAsyncClusterResponse::getURIPath,
Collectors.counting()));
logger.error("Cannot replicate request {} {} because there are {} outstanding HTTP Requests already. Request Counts Per URI = {}", method, uri.getPath(), numRequests, countsByUri);
throw new IllegalStateException("There are too many outstanding HTTP requests with a total " + numRequests + " outstanding requests");
}
// create a response object if one was not already passed to us
if (response == null) {
// create the request objects and replicate to all nodes.
@ -342,44 +378,12 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
final Runnable responseConsumedCallback = () -> onResponseConsumed(requestId);
response = new StandardAsyncClusterResponse(requestId, uri, method, nodeIds,
responseMapper, completionCallback, responseConsumedCallback, merge);
responseMapper, completionCallback, responseConsumedCallback, merge);
responseMap.put(requestId, response);
}
// verify all of the nodes exist and are in the proper state
for (final NodeIdentifier nodeId : nodeIds) {
final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeId);
if (status == null) {
throw new UnknownNodeException("Node " + nodeId + " does not exist in this cluster");
}
if (status.getState() != NodeConnectionState.CONNECTED) {
throw new IllegalClusterStateException("Cannot replicate request to Node " + nodeId + " because the node is not connected");
}
}
logger.debug("Replicating request {} {} with entity {} to {}; response is {}", method, uri, entity, nodeIds, response);
if (performVerification) {
final long start = System.nanoTime();
verifyClusterState(method, uri.getPath());
final long nanos = System.nanoTime() - start;
response.addTiming("Verify Cluster State", "All Nodes", nanos);
}
int numRequests = responseMap.size();
if (numRequests >= MAX_CONCURRENT_REQUESTS) {
numRequests = purgeExpiredRequests();
}
if (numRequests >= MAX_CONCURRENT_REQUESTS) {
final Map<String, Long> countsByUri = responseMap.values().stream().collect(
Collectors.groupingBy(
StandardAsyncClusterResponse::getURIPath,
Collectors.counting()));
logger.error("Cannot replicate request {} {} because there are {} outstanding HTTP Requests already. Request Counts Per URI = {}", method, uri.getPath(), numRequests, countsByUri);
throw new IllegalStateException("There are too many outstanding HTTP requests with a total " + numRequests + " outstanding requests");
if (verifyClusterStateNanos > -1) {
response.addTiming("Verify Cluster State", "All Nodes", verifyClusterStateNanos);
}
logger.debug("For Request ID {}, response object is {}", requestId, response);