diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java index a873c44cfc..90115d5d79 100644 --- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -164,7 +164,6 @@ public class NiFiProperties extends Properties { public static final String CLUSTER_NODE_ADDRESS = "nifi.cluster.node.address"; public static final String CLUSTER_NODE_PROTOCOL_PORT = "nifi.cluster.node.protocol.port"; public static final String CLUSTER_NODE_PROTOCOL_THREADS = "nifi.cluster.node.protocol.threads"; - public static final String REQUEST_REPLICATION_CLAIM_TIMEOUT = "nifi.cluster.request.replication.claim.timeout"; public static final String CLUSTER_NODE_CONNECTION_TIMEOUT = "nifi.cluster.node.connection.timeout"; public static final String CLUSTER_NODE_READ_TIMEOUT = "nifi.cluster.node.read.timeout"; public static final String CLUSTER_FIREWALL_FILE = "nifi.cluster.firewall.file"; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java index 116ef3ebbc..c216ed32c2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java @@ -42,7 +42,6 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor { protected final ClusterCoordinator clusterCoordinator; protected final FlowEngine flowEngine = new FlowEngine(1, "Heartbeat Monitor", true); - protected volatile long latestHeartbeatTime; private volatile ScheduledFuture future; private volatile boolean stopped = true; @@ -57,8 +56,8 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor { @Override public synchronized final void start() { if (!stopped) { - logger.debug("Attempted to start Heartbeat Monitor but it is already started"); - return; + logger.info("Attempted to start Heartbeat Monitor but it is already started. Stopping heartbeat monitor and re-starting it."); + stop(); } stopped = false; @@ -125,6 +124,15 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor { * Visible for testing. */ protected synchronized void monitorHeartbeats() { + if (!clusterCoordinator.isActiveClusterCoordinator()) { + // Occasionally Curator appears to not notify us that we have lost the elected leader role, or does so + // on a very large delay. So before we kick the node out of the cluster, we want to first check what the + // ZNode in ZooKeeper says, and ensure that this is the node that is being advertised as the appropriate + // destination for heartbeats. + logger.debug("It appears that this node is no longer the actively elected cluster coordinator. Will not request that node disconnect."); + return; + } + final Map latestHeartbeats = getLatestHeartbeats(); if (latestHeartbeats == null || latestHeartbeats.isEmpty()) { logger.debug("Received no new heartbeats. Will not disconnect any nodes due to lack of heartbeat"); @@ -153,16 +161,6 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor { if (heartbeat.getTimestamp() < threshold) { final long secondsSinceLastHeartbeat = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - heartbeat.getTimestamp()); - if (!clusterCoordinator.isActiveClusterCoordinator()) { - // Occasionally Curator appears to not notify us that we have lost the elected leader role, or does so - // on a very large delay. So before we kick the node out of the cluster, we want to first check what the - // ZNode in ZooKeeper says, and ensure that this is the node that is being advertised as the appropriate - // destination for heartbeats. - logger.debug("Have not received a heartbeat from node in " + secondsSinceLastHeartbeat + - " seconds but it appears that this node is no longer the actively elected cluster coordinator. Will not request that node disconnect."); - return; - } - clusterCoordinator.disconnectionRequestedByNode(heartbeat.getNodeIdentifier(), DisconnectionCode.LACK_OF_HEARTBEAT, "Have not received a heartbeat from node in " + secondsSinceLastHeartbeat + " seconds"); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java index f206a077d8..09dccad75a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java @@ -65,6 +65,7 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im private final String clusterNodesPath; private volatile Map clusterNodeIds = new HashMap<>(); + private volatile CuratorFramework curatorClient; private final String heartbeatAddress; private final ConcurrentMap heartbeatMessages = new ConcurrentHashMap<>(); @@ -112,7 +113,7 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im @Override public void onStart() { final RetryPolicy retryPolicy = new RetryForever(5000); - final CuratorFramework curatorClient = CuratorFrameworkFactory.newClient(zkClientConfig.getConnectString(), + curatorClient = CuratorFrameworkFactory.newClient(zkClientConfig.getConnectString(), zkClientConfig.getSessionTimeoutMillis(), zkClientConfig.getConnectionTimeoutMillis(), retryPolicy); curatorClient.start(); @@ -136,14 +137,13 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im try { try { curatorClient.setData().forPath(path, heartbeatAddress.getBytes(StandardCharsets.UTF_8)); - curatorClient.close(); logger.info("Successfully published Cluster Heartbeat Monitor Address of {} to ZooKeeper", heartbeatAddress); return; } catch (final NoNodeException nne) { // ensure that parents are created, using a wide-open ACL because the parents contain no data // and the path is not in any way sensitive. try { - curatorClient.create().creatingParentContainersIfNeeded().forPath(path); + curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path); } catch (final NodeExistsException nee) { // This is okay. Node already exists. } @@ -174,6 +174,9 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im @Override public void onStop() { + if (curatorClient != null) { + curatorClient.close(); + } } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java index 9ef0a14c96..0f1ce20d03 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java @@ -75,6 +75,11 @@ public class TestAbstractHeartbeatMonitor { public synchronized void requestNodeConnect(final NodeIdentifier nodeId, String userDn) { requestedToConnect.add(nodeId); } + + @Override + public boolean isActiveClusterCoordinator() { + return true; + } }; final TestFriendlyHeartbeatMonitor monitor = createMonitor(coordinator); @@ -141,6 +146,11 @@ public class TestAbstractHeartbeatMonitor { super.finishNodeConnection(nodeId); connected.add(nodeId); } + + @Override + public boolean isActiveClusterCoordinator() { + return true; + } }; final TestFriendlyHeartbeatMonitor monitor = createMonitor(adapter); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 47620290a6..fdb6f58d0c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -605,10 +605,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R leaderElectionManager = null; heartbeater = null; } - - if (heartbeatMonitor != null) { - heartbeatMonitor.start(); - } } @Override @@ -3316,7 +3312,15 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R @Override public synchronized void onLeaderRelinquish() { LOG.info("This node is no longer the elected Active Cluster Coordinator"); - heartbeatMonitor.stop(); + + // We do not want to stop the heartbeat monitor. This is because even though ZooKeeper offers guarantees + // that watchers will see changes on a ZNode in the order they happened, there does not seem to be any + // guarantee that Curator will notify us that our leadership was gained or loss in the order that it happened. + // As a result, if nodes connect/disconnect from cluster quickly, we could invoke stop() then start() or + // start() then stop() in the wrong order, which can cause the cluster to behavior improperly. As a result, we simply + // call start() when we become the leader, and this will ensure that initialization is handled. The heartbeat monitor + // then will check the zookeeper znode to check if it is the cluster coordinator before kicking any nodes out of the + // cluster. if (clusterCoordinator != null) { clusterCoordinator.removeRole(ClusterRoles.CLUSTER_COORDINATOR); @@ -3326,7 +3330,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R @Override public synchronized void onLeaderElection() { LOG.info("This node elected Active Cluster Coordinator"); - heartbeatMonitor.start(); + heartbeatMonitor.start(); // ensure heartbeat monitor is started if (clusterCoordinator != null) { clusterCoordinator.addRole(ClusterRoles.CLUSTER_COORDINATOR); @@ -3885,7 +3889,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R heartbeatLogger.debug(usae.getMessage()); } } catch (final Throwable ex) { - heartbeatLogger.warn("Failed to send heartbeat due to: " + ex, ex); + heartbeatLogger.warn("Failed to send heartbeat due to: " + ex); + if (heartbeatLogger.isDebugEnabled()) { + heartbeatLogger.warn("", ex); + } } } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties index e2d3385a69..a65b26570a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties @@ -171,9 +171,6 @@ nifi.cluster.node.connection.timeout=${nifi.cluster.node.connection.timeout} nifi.cluster.node.read.timeout=${nifi.cluster.node.read.timeout} nifi.cluster.firewall.file=${nifi.cluster.firewall.file} -# How long a request should be allowed to hold a 'lock' on a component. # -nifi.cluster.request.replication.claim.timeout=${nifi.cluster.request.replication.claim.timeout} - # zookeeper properties, used for cluster management # nifi.zookeeper.connect.string=${nifi.zookeeper.connect.string} nifi.zookeeper.connect.timeout=${nifi.zookeeper.connect.timeout}