NIFI-13649 Check Cluster Node Address Status before marking CONNECTED (#9168)

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Bryan Bende 2024-08-12 17:27:52 -04:00 committed by GitHub
parent b4266a559c
commit e349b7e7b4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 85 additions and 2 deletions

View File

@ -161,6 +161,14 @@ public interface ClusterCoordinator {
*/ */
boolean isBlockedByFirewall(Set<String> nodeIdentities); boolean isBlockedByFirewall(Set<String> nodeIdentities);
/**
* Checks if the API of the given node is reachable.
*
* @param nodeId the node id to check
* @return true if the API is reachable, false otherwise
*/
boolean isApiReachable(NodeIdentifier nodeId);
/** /**
* Reports that some event occurred that is relevant to the cluster * Reports that some event occurred that is relevant to the cluster
* *

View File

@ -288,6 +288,11 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor {
return; return;
} }
if (!clusterCoordinator.isApiReachable(nodeId)) {
logger.info("Node API Address [{}] not reachable: cluster connection request deferred pending successful network connection", nodeId);
return;
}
// connection complete // connection complete
clusterCoordinator.finishNodeConnection(nodeId); clusterCoordinator.finishNodeConnection(nodeId);
clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received first heartbeat from connecting node. Node connected."); clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received first heartbeat from connecting node. Node connected.");

View File

@ -73,6 +73,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.io.StringWriter; import java.io.StringWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -97,6 +100,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
private static final Logger logger = LoggerFactory.getLogger(NodeClusterCoordinator.class); private static final Logger logger = LoggerFactory.getLogger(NodeClusterCoordinator.class);
private static final String EVENT_CATEGORY = "Clustering"; private static final String EVENT_CATEGORY = "Clustering";
private static final Duration NODE_API_TIMEOUT = Duration.ofSeconds(10);
private static final Pattern COUNTER_URI_PATTERN = Pattern.compile("/nifi-api/counters/[a-f0-9\\-]{36}"); private static final Pattern COUNTER_URI_PATTERN = Pattern.compile("/nifi-api/counters/[a-f0-9\\-]{36}");
@ -676,6 +680,21 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
return true; return true;
} }
@Override
public boolean isApiReachable(final NodeIdentifier nodeId) {
final String apiAddress = nodeId.getApiAddress();
final int apiPort = nodeId.getApiPort();
try {
try (final Socket soc = new Socket()) {
soc.connect(new InetSocketAddress(apiAddress, apiPort), (int) NODE_API_TIMEOUT.toMillis());
}
return true;
} catch (final Exception e) {
logger.debug("Node is not reachable at API address {} and port {}", apiAddress, apiPort, e);
return false;
}
}
@Override @Override
public void reportEvent(final NodeIdentifier nodeId, final Severity severity, final String event) { public void reportEvent(final NodeIdentifier nodeId, final Severity severity, final String event) {
eventReporter.reportEvent(severity, EVENT_CATEGORY, nodeId == null ? event : "Event Reported for " + nodeId + " -- " + event); eventReporter.reportEvent(severity, EVENT_CATEGORY, nodeId == null ? event : "Event Reported for " + nodeId + " -- " + event);

View File

@ -173,6 +173,39 @@ public class TestAbstractHeartbeatMonitor {
assertTrue(requestedToConnect.isEmpty()); assertTrue(requestedToConnect.isEmpty());
} }
@Test
public void testConnectingNodeNotMarkedConnectedWhenHeartbeatReceivedAndApiUnreachable() throws InterruptedException {
final Set<NodeIdentifier> requestedToConnect = Collections.synchronizedSet(new HashSet<>());
final Set<NodeIdentifier> connected = Collections.synchronizedSet(new HashSet<>());
final ClusterCoordinatorAdapter adapter = new ClusterCoordinatorAdapter() {
@Override
public synchronized void requestNodeConnect(final NodeIdentifier nodeId) {
super.requestNodeConnect(nodeId);
requestedToConnect.add(nodeId);
}
@Override
public synchronized void finishNodeConnection(final NodeIdentifier nodeId) {
super.finishNodeConnection(nodeId);
connected.add(nodeId);
}
@Override
public boolean isApiReachable(final NodeIdentifier nodeId) {
return false;
}
};
final TestFriendlyHeartbeatMonitor monitor = createMonitor(adapter);
adapter.requestNodeConnect(nodeId); // set state to 'connecting'
requestedToConnect.clear();
monitor.addHeartbeat(createHeartbeat(nodeId, NodeConnectionState.CONNECTED));
monitor.waitForProcessed();
assertEquals(0, connected.size());
}
private NodeHeartbeat createHeartbeat(final NodeIdentifier nodeId, final NodeConnectionState state) { private NodeHeartbeat createHeartbeat(final NodeIdentifier nodeId, final NodeConnectionState state) {
final NodeConnectionStatus status = new NodeConnectionStatus(nodeId, state); final NodeConnectionStatus status = new NodeConnectionStatus(nodeId, state);
@ -259,6 +292,11 @@ public class TestAbstractHeartbeatMonitor {
return false; return false;
} }
@Override
public boolean isApiReachable(final NodeIdentifier nodeId) {
return true;
}
@Override @Override
public synchronized void reportEvent(NodeIdentifier nodeId, Severity severity, String event) { public synchronized void reportEvent(NodeIdentifier nodeId, Severity severity, String event) {
events.add(new ReportedEvent(nodeId, severity, event)); events.add(new ReportedEvent(nodeId, severity, event));

View File

@ -210,6 +210,7 @@ public class BootstrapListener {
case CLUSTER_STATUS: case CLUSTER_STATUS:
logger.info("Received CLUSTER_STATUS request from Bootstrap"); logger.info("Received CLUSTER_STATUS request from Bootstrap");
final String clusterStatus = getClusterStatus(); final String clusterStatus = getClusterStatus();
logger.debug("Responding to CLUSTER_STATUS request from Bootstrap with {}", clusterStatus);
sendAnswer(socket.getOutputStream(), clusterStatus); sendAnswer(socket.getOutputStream(), clusterStatus);
break; break;
case DECOMMISSION: case DECOMMISSION:

View File

@ -33,6 +33,8 @@ import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator; import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.exception.NoClusterCoordinatorException; import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException; import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
@ -398,8 +400,8 @@ public abstract class ApplicationResource {
ensureFlowInitialized(); ensureFlowInitialized();
// If not connected to the cluster, we do not replicate // If not connected or not connecting to the cluster, we do not replicate
if (!isConnectedToCluster()) { if (!isConnectedToCluster() && !isConnectingToCluster()) {
return false; return false;
} }
@ -1054,6 +1056,16 @@ public abstract class ApplicationResource {
return isClustered() && clusterCoordinator.isConnected(); return isClustered() && clusterCoordinator.isConnected();
} }
boolean isConnectingToCluster() {
if (!isClustered()) {
return false;
}
final NodeIdentifier nodeId = clusterCoordinator.getLocalNodeIdentifier();
final NodeConnectionStatus nodeConnectionStatus = clusterCoordinator.getConnectionStatus(nodeId);
return nodeConnectionStatus != null && nodeConnectionStatus.getState() == NodeConnectionState.CONNECTING;
}
boolean isClustered() { boolean isClustered() {
return clusterCoordinator != null; return clusterCoordinator != null;
} }