diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java index 1d4ea69460..318b1a0737 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java @@ -104,8 +104,8 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse { } @Override - public boolean isComplete() { - return getMergedResponse() != null; + public synchronized boolean isComplete() { + return failure != null || mergedResponse != null || requestsCompleted.get() >= responseMap.size(); } @Override @@ -125,6 +125,10 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse { public synchronized NodeResponse getMergedResponse(final boolean triggerCallback) { if (failure != null) { + if (completedResultFetchedCallback != null) { + completedResultFetchedCallback.run(); + } + throw failure; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java index 3c782a7ac1..88a883660c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java @@ -77,6 +77,36 @@ public class TestThreadPoolRequestReplicator { System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/conf/nifi.properties"); } + @Test + public void testFailedRequestsAreCleanedUp() { + withReplicator(replicator -> { + final Set nodeIds = new HashSet<>(); + nodeIds.add(new NodeIdentifier("1", "localhost", 8000, "localhost", 8001, "localhost", 8002, 8003, false)); + final URI uri = new URI("http://localhost:8080/processors/1"); + final Entity entity = new ProcessorEntity(); + + // set the user + final Authentication authentication = new NiFiAuthenticationToken(new NiFiUserDetails(StandardNiFiUser.ANONYMOUS)); + SecurityContextHolder.getContext().setAuthentication(authentication); + + final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true, true); + + // We should get back the same response object + assertTrue(response == replicator.getClusterResponse(response.getRequestIdentifier())); + + assertEquals(HttpMethod.GET, response.getMethod()); + assertEquals(nodeIds, response.getNodesInvolved()); + + assertTrue(response == replicator.getClusterResponse(response.getRequestIdentifier())); + + final NodeResponse nodeResponse = response.awaitMergedResponse(3, TimeUnit.SECONDS); + assertEquals(8000, nodeResponse.getNodeId().getApiPort()); + assertEquals(ClientResponse.Status.FORBIDDEN.getStatusCode(), nodeResponse.getStatus()); + + assertNull(replicator.getClusterResponse(response.getRequestIdentifier())); + }, Status.FORBIDDEN, 0L, null); + } + /** * If we replicate a request, whenever we obtain the merged response from * the AsyncClusterResponse object, the response should no longer be