NIFI-2406 This closes #820. Addressed regression introduced in NIFI-2406 where the cluster does not recognize a new Cluster Coordinator when the coordinator is shutdown

This commit is contained in:
Mark Payne 2016-08-09 11:11:08 -04:00 committed by joewitt
parent 18f4150015
commit 42df02f014
6 changed files with 41 additions and 27 deletions

View File

@ -164,7 +164,6 @@ public class NiFiProperties extends Properties {
public static final String CLUSTER_NODE_ADDRESS = "nifi.cluster.node.address"; public static final String CLUSTER_NODE_ADDRESS = "nifi.cluster.node.address";
public static final String CLUSTER_NODE_PROTOCOL_PORT = "nifi.cluster.node.protocol.port"; public static final String CLUSTER_NODE_PROTOCOL_PORT = "nifi.cluster.node.protocol.port";
public static final String CLUSTER_NODE_PROTOCOL_THREADS = "nifi.cluster.node.protocol.threads"; public static final String CLUSTER_NODE_PROTOCOL_THREADS = "nifi.cluster.node.protocol.threads";
public static final String REQUEST_REPLICATION_CLAIM_TIMEOUT = "nifi.cluster.request.replication.claim.timeout";
public static final String CLUSTER_NODE_CONNECTION_TIMEOUT = "nifi.cluster.node.connection.timeout"; public static final String CLUSTER_NODE_CONNECTION_TIMEOUT = "nifi.cluster.node.connection.timeout";
public static final String CLUSTER_NODE_READ_TIMEOUT = "nifi.cluster.node.read.timeout"; public static final String CLUSTER_NODE_READ_TIMEOUT = "nifi.cluster.node.read.timeout";
public static final String CLUSTER_FIREWALL_FILE = "nifi.cluster.firewall.file"; public static final String CLUSTER_FIREWALL_FILE = "nifi.cluster.firewall.file";

View File

@ -42,7 +42,6 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor {
protected final ClusterCoordinator clusterCoordinator; protected final ClusterCoordinator clusterCoordinator;
protected final FlowEngine flowEngine = new FlowEngine(1, "Heartbeat Monitor", true); protected final FlowEngine flowEngine = new FlowEngine(1, "Heartbeat Monitor", true);
protected volatile long latestHeartbeatTime;
private volatile ScheduledFuture<?> future; private volatile ScheduledFuture<?> future;
private volatile boolean stopped = true; private volatile boolean stopped = true;
@ -57,8 +56,8 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor {
@Override @Override
public synchronized final void start() { public synchronized final void start() {
if (!stopped) { if (!stopped) {
logger.debug("Attempted to start Heartbeat Monitor but it is already started"); logger.info("Attempted to start Heartbeat Monitor but it is already started. Stopping heartbeat monitor and re-starting it.");
return; stop();
} }
stopped = false; stopped = false;
@ -125,6 +124,15 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor {
* Visible for testing. * Visible for testing.
*/ */
protected synchronized void monitorHeartbeats() { protected synchronized void monitorHeartbeats() {
if (!clusterCoordinator.isActiveClusterCoordinator()) {
// Occasionally Curator appears to not notify us that we have lost the elected leader role, or does so
// on a very large delay. So before we kick the node out of the cluster, we want to first check what the
// ZNode in ZooKeeper says, and ensure that this is the node that is being advertised as the appropriate
// destination for heartbeats.
logger.debug("It appears that this node is no longer the actively elected cluster coordinator. Will not request that node disconnect.");
return;
}
final Map<NodeIdentifier, NodeHeartbeat> latestHeartbeats = getLatestHeartbeats(); final Map<NodeIdentifier, NodeHeartbeat> latestHeartbeats = getLatestHeartbeats();
if (latestHeartbeats == null || latestHeartbeats.isEmpty()) { if (latestHeartbeats == null || latestHeartbeats.isEmpty()) {
logger.debug("Received no new heartbeats. Will not disconnect any nodes due to lack of heartbeat"); logger.debug("Received no new heartbeats. Will not disconnect any nodes due to lack of heartbeat");
@ -153,16 +161,6 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor {
if (heartbeat.getTimestamp() < threshold) { if (heartbeat.getTimestamp() < threshold) {
final long secondsSinceLastHeartbeat = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - heartbeat.getTimestamp()); final long secondsSinceLastHeartbeat = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - heartbeat.getTimestamp());
if (!clusterCoordinator.isActiveClusterCoordinator()) {
// Occasionally Curator appears to not notify us that we have lost the elected leader role, or does so
// on a very large delay. So before we kick the node out of the cluster, we want to first check what the
// ZNode in ZooKeeper says, and ensure that this is the node that is being advertised as the appropriate
// destination for heartbeats.
logger.debug("Have not received a heartbeat from node in " + secondsSinceLastHeartbeat +
" seconds but it appears that this node is no longer the actively elected cluster coordinator. Will not request that node disconnect.");
return;
}
clusterCoordinator.disconnectionRequestedByNode(heartbeat.getNodeIdentifier(), DisconnectionCode.LACK_OF_HEARTBEAT, clusterCoordinator.disconnectionRequestedByNode(heartbeat.getNodeIdentifier(), DisconnectionCode.LACK_OF_HEARTBEAT,
"Have not received a heartbeat from node in " + secondsSinceLastHeartbeat + " seconds"); "Have not received a heartbeat from node in " + secondsSinceLastHeartbeat + " seconds");

View File

@ -65,6 +65,7 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im
private final String clusterNodesPath; private final String clusterNodesPath;
private volatile Map<String, NodeIdentifier> clusterNodeIds = new HashMap<>(); private volatile Map<String, NodeIdentifier> clusterNodeIds = new HashMap<>();
private volatile CuratorFramework curatorClient;
private final String heartbeatAddress; private final String heartbeatAddress;
private final ConcurrentMap<NodeIdentifier, NodeHeartbeat> heartbeatMessages = new ConcurrentHashMap<>(); private final ConcurrentMap<NodeIdentifier, NodeHeartbeat> heartbeatMessages = new ConcurrentHashMap<>();
@ -112,7 +113,7 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im
@Override @Override
public void onStart() { public void onStart() {
final RetryPolicy retryPolicy = new RetryForever(5000); final RetryPolicy retryPolicy = new RetryForever(5000);
final CuratorFramework curatorClient = CuratorFrameworkFactory.newClient(zkClientConfig.getConnectString(), curatorClient = CuratorFrameworkFactory.newClient(zkClientConfig.getConnectString(),
zkClientConfig.getSessionTimeoutMillis(), zkClientConfig.getConnectionTimeoutMillis(), retryPolicy); zkClientConfig.getSessionTimeoutMillis(), zkClientConfig.getConnectionTimeoutMillis(), retryPolicy);
curatorClient.start(); curatorClient.start();
@ -136,14 +137,13 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im
try { try {
try { try {
curatorClient.setData().forPath(path, heartbeatAddress.getBytes(StandardCharsets.UTF_8)); curatorClient.setData().forPath(path, heartbeatAddress.getBytes(StandardCharsets.UTF_8));
curatorClient.close();
logger.info("Successfully published Cluster Heartbeat Monitor Address of {} to ZooKeeper", heartbeatAddress); logger.info("Successfully published Cluster Heartbeat Monitor Address of {} to ZooKeeper", heartbeatAddress);
return; return;
} catch (final NoNodeException nne) { } catch (final NoNodeException nne) {
// ensure that parents are created, using a wide-open ACL because the parents contain no data // ensure that parents are created, using a wide-open ACL because the parents contain no data
// and the path is not in any way sensitive. // and the path is not in any way sensitive.
try { try {
curatorClient.create().creatingParentContainersIfNeeded().forPath(path); curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
} catch (final NodeExistsException nee) { } catch (final NodeExistsException nee) {
// This is okay. Node already exists. // This is okay. Node already exists.
} }
@ -174,6 +174,9 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im
@Override @Override
public void onStop() { public void onStop() {
if (curatorClient != null) {
curatorClient.close();
}
} }
@Override @Override

View File

@ -75,6 +75,11 @@ public class TestAbstractHeartbeatMonitor {
public synchronized void requestNodeConnect(final NodeIdentifier nodeId, String userDn) { public synchronized void requestNodeConnect(final NodeIdentifier nodeId, String userDn) {
requestedToConnect.add(nodeId); requestedToConnect.add(nodeId);
} }
@Override
public boolean isActiveClusterCoordinator() {
return true;
}
}; };
final TestFriendlyHeartbeatMonitor monitor = createMonitor(coordinator); final TestFriendlyHeartbeatMonitor monitor = createMonitor(coordinator);
@ -141,6 +146,11 @@ public class TestAbstractHeartbeatMonitor {
super.finishNodeConnection(nodeId); super.finishNodeConnection(nodeId);
connected.add(nodeId); connected.add(nodeId);
} }
@Override
public boolean isActiveClusterCoordinator() {
return true;
}
}; };
final TestFriendlyHeartbeatMonitor monitor = createMonitor(adapter); final TestFriendlyHeartbeatMonitor monitor = createMonitor(adapter);

View File

@ -605,10 +605,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
leaderElectionManager = null; leaderElectionManager = null;
heartbeater = null; heartbeater = null;
} }
if (heartbeatMonitor != null) {
heartbeatMonitor.start();
}
} }
@Override @Override
@ -3316,7 +3312,15 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
@Override @Override
public synchronized void onLeaderRelinquish() { public synchronized void onLeaderRelinquish() {
LOG.info("This node is no longer the elected Active Cluster Coordinator"); LOG.info("This node is no longer the elected Active Cluster Coordinator");
heartbeatMonitor.stop();
// We do not want to stop the heartbeat monitor. This is because even though ZooKeeper offers guarantees
// that watchers will see changes on a ZNode in the order they happened, there does not seem to be any
// guarantee that Curator will notify us that our leadership was gained or loss in the order that it happened.
// As a result, if nodes connect/disconnect from cluster quickly, we could invoke stop() then start() or
// start() then stop() in the wrong order, which can cause the cluster to behavior improperly. As a result, we simply
// call start() when we become the leader, and this will ensure that initialization is handled. The heartbeat monitor
// then will check the zookeeper znode to check if it is the cluster coordinator before kicking any nodes out of the
// cluster.
if (clusterCoordinator != null) { if (clusterCoordinator != null) {
clusterCoordinator.removeRole(ClusterRoles.CLUSTER_COORDINATOR); clusterCoordinator.removeRole(ClusterRoles.CLUSTER_COORDINATOR);
@ -3326,7 +3330,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
@Override @Override
public synchronized void onLeaderElection() { public synchronized void onLeaderElection() {
LOG.info("This node elected Active Cluster Coordinator"); LOG.info("This node elected Active Cluster Coordinator");
heartbeatMonitor.start(); heartbeatMonitor.start(); // ensure heartbeat monitor is started
if (clusterCoordinator != null) { if (clusterCoordinator != null) {
clusterCoordinator.addRole(ClusterRoles.CLUSTER_COORDINATOR); clusterCoordinator.addRole(ClusterRoles.CLUSTER_COORDINATOR);
@ -3885,7 +3889,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
heartbeatLogger.debug(usae.getMessage()); heartbeatLogger.debug(usae.getMessage());
} }
} catch (final Throwable ex) { } catch (final Throwable ex) {
heartbeatLogger.warn("Failed to send heartbeat due to: " + ex, ex); heartbeatLogger.warn("Failed to send heartbeat due to: " + ex);
if (heartbeatLogger.isDebugEnabled()) {
heartbeatLogger.warn("", ex);
}
} }
} }
} }

View File

@ -171,9 +171,6 @@ nifi.cluster.node.connection.timeout=${nifi.cluster.node.connection.timeout}
nifi.cluster.node.read.timeout=${nifi.cluster.node.read.timeout} nifi.cluster.node.read.timeout=${nifi.cluster.node.read.timeout}
nifi.cluster.firewall.file=${nifi.cluster.firewall.file} nifi.cluster.firewall.file=${nifi.cluster.firewall.file}
# How long a request should be allowed to hold a 'lock' on a component. #
nifi.cluster.request.replication.claim.timeout=${nifi.cluster.request.replication.claim.timeout}
# zookeeper properties, used for cluster management # # zookeeper properties, used for cluster management #
nifi.zookeeper.connect.string=${nifi.zookeeper.connect.string} nifi.zookeeper.connect.string=${nifi.zookeeper.connect.string}
nifi.zookeeper.connect.timeout=${nifi.zookeeper.connect.timeout} nifi.zookeeper.connect.timeout=${nifi.zookeeper.connect.timeout}