From 21503f6353c33063b7acff5915a94397aad72926 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 17 Aug 2022 09:38:34 -0400 Subject: [PATCH] NIFI-10362: When asynchronous node disconnect is issued, do not send disconnect to node if the node becomes reconnected in the interim. Also, addressed the issue in which a disconnected node acts on a replicated request during the first phase by detect that it's the first phase if configured for cluster, not when only when connected to a cluster. This closes #6308 Signed-off-by: David Handermann --- .../coordination/node/NodeClusterCoordinator.java | 13 +++++++++++++ .../apache/nifi/web/api/ApplicationResource.java | 2 +- .../system/clustering/FlowSynchronizationIT.java | 2 ++ 3 files changed, 16 insertions(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java index 19678ed1db..ccee9bb74c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java @@ -989,6 +989,19 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl Exception lastException = null; for (int i = 0; i < attempts; i++) { + // If the node is restarted, it will attempt to reconnect. In that case, we don't want to disconnect the node + // again. So we instead log the fact that the state has now transitioned to this point and consider the task completed. + final NodeConnectionState currentConnectionState = getConnectionState(nodeId); + if (currentConnectionState == NodeConnectionState.CONNECTING || currentConnectionState == NodeConnectionState.CONNECTED) { + reportEvent(nodeId, Severity.INFO, String.format( + "State of Node %s has now transitioned from DISCONNECTED to %s so will no longer attempt to notify node that it is disconnected.", nodeId, currentConnectionState)); + future.completeExceptionally(new IllegalStateException("Node was marked as disconnected but its state transitioned from DISCONNECTED back to " + currentConnectionState + + " before the node could be notified. This typically indicates that the node was restarted.")); + + return; + } + + // Try to send disconnect notice to the node try { senderListener.disconnect(request); reportEvent(nodeId, Severity.INFO, "Node disconnected due to " + request.getExplanation()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java index ffde6d9683..8006caa6ac 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java @@ -371,7 +371,7 @@ public abstract class ApplicationResource { */ protected boolean isTwoPhaseRequest(final HttpServletRequest httpServletRequest) { final String transactionId = httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER); - return transactionId != null && isConnectedToCluster(); + return transactionId != null && isClustered(); } /** diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java index 1ead90a772..a6da0107ed 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java @@ -245,6 +245,7 @@ public class FlowSynchronizationIT extends NiFiSystemIT { getClientUtil().enableControllerService(countService); getClientUtil().enableControllerService(sleepService); getClientUtil().startReportingTask(reportingTask); + getClientUtil().waitForValidProcessor(count.getId()); // Now that service was enabled, wait for processor to become valid. getClientUtil().startProcessGroupComponents(group.getId()); getClientUtil().startProcessor(terminate); getClientUtil().startProcessor(generate); @@ -603,6 +604,7 @@ public class FlowSynchronizationIT extends NiFiSystemIT { getClientUtil().enableControllerService(countService); getClientUtil().enableControllerService(sleepService); getClientUtil().startReportingTask(reportingTask); + getClientUtil().waitForValidProcessor(count.getId()); // Now that service was enabled, wait for processor to become valid. getClientUtil().startProcessGroupComponents(group.getId()); getClientUtil().startProcessor(terminate); getClientUtil().startProcessor(generate);