NIFI-3548: Fixed bug that caused failed requests to not get removed from 'request map' and also results in that preventing the purging logic, which would then unintentially throw exceptions. This closes #1555

This commit is contained in:
Mark Payne 2017-03-02 13:55:15 -05:00 committed by Matt Gilman
parent 5990db39ae
commit 4ed64e7561
No known key found for this signature in database
GPG Key ID: DF61EC19432AEE37
2 changed files with 36 additions and 2 deletions

View File

@ -104,8 +104,8 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse {
} }
@Override @Override
public boolean isComplete() { public synchronized boolean isComplete() {
return getMergedResponse() != null; return failure != null || mergedResponse != null || requestsCompleted.get() >= responseMap.size();
} }
@Override @Override
@ -125,6 +125,10 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse {
public synchronized NodeResponse getMergedResponse(final boolean triggerCallback) { public synchronized NodeResponse getMergedResponse(final boolean triggerCallback) {
if (failure != null) { if (failure != null) {
if (completedResultFetchedCallback != null) {
completedResultFetchedCallback.run();
}
throw failure; throw failure;
} }

View File

@ -77,6 +77,36 @@ public class TestThreadPoolRequestReplicator {
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/conf/nifi.properties"); System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/conf/nifi.properties");
} }
@Test
public void testFailedRequestsAreCleanedUp() {
withReplicator(replicator -> {
final Set<NodeIdentifier> nodeIds = new HashSet<>();
nodeIds.add(new NodeIdentifier("1", "localhost", 8000, "localhost", 8001, "localhost", 8002, 8003, false));
final URI uri = new URI("http://localhost:8080/processors/1");
final Entity entity = new ProcessorEntity();
// set the user
final Authentication authentication = new NiFiAuthenticationToken(new NiFiUserDetails(StandardNiFiUser.ANONYMOUS));
SecurityContextHolder.getContext().setAuthentication(authentication);
final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true, true);
// We should get back the same response object
assertTrue(response == replicator.getClusterResponse(response.getRequestIdentifier()));
assertEquals(HttpMethod.GET, response.getMethod());
assertEquals(nodeIds, response.getNodesInvolved());
assertTrue(response == replicator.getClusterResponse(response.getRequestIdentifier()));
final NodeResponse nodeResponse = response.awaitMergedResponse(3, TimeUnit.SECONDS);
assertEquals(8000, nodeResponse.getNodeId().getApiPort());
assertEquals(ClientResponse.Status.FORBIDDEN.getStatusCode(), nodeResponse.getStatus());
assertNull(replicator.getClusterResponse(response.getRequestIdentifier()));
}, Status.FORBIDDEN, 0L, null);
}
/** /**
* If we replicate a request, whenever we obtain the merged response from * If we replicate a request, whenever we obtain the merged response from
* the AsyncClusterResponse object, the response should no longer be * the AsyncClusterResponse object, the response should no longer be