diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java index a27dac332e..f6ad8454f7 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java @@ -161,6 +161,14 @@ public interface ClusterCoordinator { */ boolean isBlockedByFirewall(Set nodeIdentities); + /** + * Checks if the API of the given node is reachable. + * + * @param nodeId the node id to check + * @return true if the API is reachable, false otherwise + */ + boolean isApiReachable(NodeIdentifier nodeId); + /** * Reports that some event occurred that is relevant to the cluster * diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java index c700453a1b..d154e07e45 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java @@ -288,6 +288,11 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor { return; } + if (!clusterCoordinator.isApiReachable(nodeId)) { + logger.info("Node API Address [{}] not reachable: cluster connection request deferred pending successful network connection", nodeId); + return; + } + // connection complete clusterCoordinator.finishNodeConnection(nodeId); clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received first heartbeat from connecting node. Node connected."); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java index 18651939fb..d74b5a1a60 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java @@ -73,6 +73,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.StringWriter; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -97,6 +100,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl private static final Logger logger = LoggerFactory.getLogger(NodeClusterCoordinator.class); private static final String EVENT_CATEGORY = "Clustering"; + private static final Duration NODE_API_TIMEOUT = Duration.ofSeconds(10); private static final Pattern COUNTER_URI_PATTERN = Pattern.compile("/nifi-api/counters/[a-f0-9\\-]{36}"); @@ -676,6 +680,21 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl return true; } + @Override + public boolean isApiReachable(final NodeIdentifier nodeId) { + final String apiAddress = nodeId.getApiAddress(); + final int apiPort = nodeId.getApiPort(); + try { + try (final Socket soc = new Socket()) { + soc.connect(new InetSocketAddress(apiAddress, apiPort), (int) NODE_API_TIMEOUT.toMillis()); + } + return true; + } catch (final Exception e) { + logger.debug("Node is not reachable at API address {} and port {}", apiAddress, apiPort, e); + return false; + } + } + @Override public void reportEvent(final NodeIdentifier nodeId, final Severity severity, final String event) { eventReporter.reportEvent(severity, EVENT_CATEGORY, nodeId == null ? event : "Event Reported for " + nodeId + " -- " + event); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java index f90e039129..45cbf391ae 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java @@ -173,6 +173,39 @@ public class TestAbstractHeartbeatMonitor { assertTrue(requestedToConnect.isEmpty()); } + @Test + public void testConnectingNodeNotMarkedConnectedWhenHeartbeatReceivedAndApiUnreachable() throws InterruptedException { + final Set requestedToConnect = Collections.synchronizedSet(new HashSet<>()); + final Set connected = Collections.synchronizedSet(new HashSet<>()); + final ClusterCoordinatorAdapter adapter = new ClusterCoordinatorAdapter() { + @Override + public synchronized void requestNodeConnect(final NodeIdentifier nodeId) { + super.requestNodeConnect(nodeId); + requestedToConnect.add(nodeId); + } + + @Override + public synchronized void finishNodeConnection(final NodeIdentifier nodeId) { + super.finishNodeConnection(nodeId); + connected.add(nodeId); + } + + @Override + public boolean isApiReachable(final NodeIdentifier nodeId) { + return false; + } + }; + + final TestFriendlyHeartbeatMonitor monitor = createMonitor(adapter); + + adapter.requestNodeConnect(nodeId); // set state to 'connecting' + requestedToConnect.clear(); + + monitor.addHeartbeat(createHeartbeat(nodeId, NodeConnectionState.CONNECTED)); + monitor.waitForProcessed(); + + assertEquals(0, connected.size()); + } private NodeHeartbeat createHeartbeat(final NodeIdentifier nodeId, final NodeConnectionState state) { final NodeConnectionStatus status = new NodeConnectionStatus(nodeId, state); @@ -259,6 +292,11 @@ public class TestAbstractHeartbeatMonitor { return false; } + @Override + public boolean isApiReachable(final NodeIdentifier nodeId) { + return true; + } + @Override public synchronized void reportEvent(NodeIdentifier nodeId, Severity severity, String event) { events.add(new ReportedEvent(nodeId, severity, event)); diff --git a/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java b/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java index 55a5207d6d..d6537603d7 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java +++ b/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java @@ -210,6 +210,7 @@ public class BootstrapListener { case CLUSTER_STATUS: logger.info("Received CLUSTER_STATUS request from Bootstrap"); final String clusterStatus = getClusterStatus(); + logger.debug("Responding to CLUSTER_STATUS request from Bootstrap with {}", clusterStatus); sendAnswer(socket.getOutputStream(), clusterStatus); break; case DECOMMISSION: diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java index 6f22cd7454..dbc539014e 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java @@ -33,6 +33,8 @@ import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator; +import org.apache.nifi.cluster.coordination.node.NodeConnectionState; +import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; import org.apache.nifi.cluster.exception.NoClusterCoordinatorException; import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException; @@ -398,8 +400,8 @@ public abstract class ApplicationResource { ensureFlowInitialized(); - // If not connected to the cluster, we do not replicate - if (!isConnectedToCluster()) { + // If not connected or not connecting to the cluster, we do not replicate + if (!isConnectedToCluster() && !isConnectingToCluster()) { return false; } @@ -1054,6 +1056,16 @@ public abstract class ApplicationResource { return isClustered() && clusterCoordinator.isConnected(); } + boolean isConnectingToCluster() { + if (!isClustered()) { + return false; + } + + final NodeIdentifier nodeId = clusterCoordinator.getLocalNodeIdentifier(); + final NodeConnectionStatus nodeConnectionStatus = clusterCoordinator.getConnectionStatus(nodeId); + return nodeConnectionStatus != null && nodeConnectionStatus.getState() == NodeConnectionState.CONNECTING; + } + boolean isClustered() { return clusterCoordinator != null; }