From c1c052af71d4cfc8a40974e1482d5b0f3c78c902 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 27 Jul 2016 11:55:02 -0400 Subject: [PATCH] NIFI-2406: Ensure that hearbeat monitor continues to run while instance is running. This way if a node sends heartbeat to this node as elected coordinator changes, we notify the node accordingly. Handle Exceptions more gracefully in leader election code. Tweaked some handling of how nodes reconnect to the cluster to ensure more stability with cluster Signed-off-by: Yolanda M. Davis This closes #729 --- .../ClusterCoordinationProtocolSender.java | 8 ++ ...terCoordinationProtocolSenderListener.java | 8 +- .../protocol/impl/SocketProtocolListener.java | 31 +++++- ...dardClusterCoordinationProtocolSender.java | 47 ++++++++ .../message/NodeConnectionStatusAdapter.java | 16 +-- .../protocol/jaxb/message/ObjectFactory.java | 11 ++ .../NodeConnectionStatusRequestMessage.java | 30 +++++ .../NodeConnectionStatusResponseMessage.java | 40 +++++++ .../protocol/message/ProtocolMessage.java | 2 + .../jaxb/message/TestJaxbProtocolUtils.java | 31 ++++++ .../heartbeat/AbstractHeartbeatMonitor.java | 25 ++++- .../node/NodeClusterCoordinator.java | 104 ++++++++++++------ .../node/TestNodeClusterCoordinator.java | 16 --- .../nifi/controller/FlowController.java | 14 ++- .../CuratorLeaderElectionManager.java | 16 ++- 15 files changed, 328 insertions(+), 71 deletions(-) create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeConnectionStatusRequestMessage.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeConnectionStatusResponseMessage.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java index b49f57ca79..986231efd4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java @@ -18,6 +18,7 @@ package org.apache.nifi.cluster.protocol; import java.util.Set; +import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; import org.apache.nifi.cluster.protocol.message.DisconnectMessage; import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage; import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; @@ -61,4 +62,11 @@ public interface ClusterCoordinationProtocolSender { * @param msg the message that indicates which node's status changed and what it changed to */ void notifyNodeStatusChange(Set nodesToNotify, NodeStatusChangeMessage msg); + + /** + * Sends a request to the given hostname and port to request its connection status + * + * @return the connection status returned from the node at the given hostname & port + */ + NodeConnectionStatus requestNodeConnectionStatus(String hostname, int port); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterCoordinationProtocolSenderListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterCoordinationProtocolSenderListener.java index e97712a726..ae3a0e5057 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterCoordinationProtocolSenderListener.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterCoordinationProtocolSenderListener.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Collection; import java.util.Set; +import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; import org.apache.nifi.cluster.protocol.ClusterCoordinationProtocolSender; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.cluster.protocol.ProtocolException; @@ -105,7 +106,12 @@ public class ClusterCoordinationProtocolSenderListener implements ClusterCoordin } @Override - public void notifyNodeStatusChange(Set nodesToNotify, NodeStatusChangeMessage msg) { + public void notifyNodeStatusChange(final Set nodesToNotify, final NodeStatusChangeMessage msg) { sender.notifyNodeStatusChange(nodesToNotify, msg); } + + @Override + public NodeConnectionStatus requestNodeConnectionStatus(final String hostname, final int port) { + return sender.requestNodeConnectionStatus(hostname, port); + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java index c6a4883804..8958988799 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java @@ -26,13 +26,19 @@ import java.util.UUID; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; +import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.cluster.protocol.ProtocolContext; import org.apache.nifi.cluster.protocol.ProtocolException; import org.apache.nifi.cluster.protocol.ProtocolHandler; import org.apache.nifi.cluster.protocol.ProtocolListener; import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller; import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller; +import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage; +import org.apache.nifi.cluster.protocol.message.DisconnectMessage; +import org.apache.nifi.cluster.protocol.message.FlowRequestMessage; +import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage; +import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; import org.apache.nifi.events.BulletinFactory; import org.apache.nifi.io.socket.ServerSocketConfiguration; import org.apache.nifi.io.socket.SocketListener; @@ -173,8 +179,10 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi } stopWatch.stop(); + final NodeIdentifier nodeId = getNodeIdentifier(request); + final String from = nodeId == null ? hostname : nodeId.toString(); logger.info("Finished processing request {} (type={}, length={} bytes) from {} in {} millis", - requestId, request.getType(), receivedMessage.length, hostname, stopWatch.getDuration(TimeUnit.MILLISECONDS)); + requestId, request.getType(), receivedMessage.length, from, stopWatch.getDuration(TimeUnit.MILLISECONDS)); } catch (final IOException | ProtocolException e) { logger.warn("Failed processing protocol message from " + hostname + " due to " + e, e); @@ -185,6 +193,27 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi } } + private NodeIdentifier getNodeIdentifier(final ProtocolMessage message) { + if (message == null) { + return null; + } + + switch (message.getType()) { + case CONNECTION_REQUEST: + return ((ConnectionRequestMessage) message).getConnectionRequest().getProposedNodeIdentifier(); + case HEARTBEAT: + return ((HeartbeatMessage) message).getHeartbeat().getNodeIdentifier(); + case DISCONNECTION_REQUEST: + return ((DisconnectMessage) message).getNodeId(); + case FLOW_REQUEST: + return ((FlowRequestMessage) message).getNodeId(); + case RECONNECTION_REQUEST: + return ((ReconnectionRequestMessage) message).getNodeId(); + default: + return null; + } + } + private String getRequestorDN(Socket socket) { try { return CertificateUtils.extractPeerDNFromSSLSocket(socket); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java index 4b58886ea6..e8099610db 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketException; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -28,6 +29,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; import org.apache.nifi.cluster.protocol.ClusterCoordinationProtocolSender; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.cluster.protocol.ProtocolContext; @@ -35,6 +37,8 @@ import org.apache.nifi.cluster.protocol.ProtocolException; import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller; import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller; import org.apache.nifi.cluster.protocol.message.DisconnectMessage; +import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusRequestMessage; +import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage; import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType; @@ -45,6 +49,8 @@ import org.apache.nifi.io.socket.SocketUtils; import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A protocol sender for sending protocol messages from the cluster manager to @@ -57,6 +63,7 @@ import org.apache.nifi.util.NiFiProperties; * */ public class StandardClusterCoordinationProtocolSender implements ClusterCoordinationProtocolSender { + private static final Logger logger = LoggerFactory.getLogger(StandardClusterCoordinationProtocolSender.class); private final ProtocolContext protocolContext; private final SocketConfiguration socketConfiguration; @@ -182,6 +189,44 @@ public class StandardClusterCoordinationProtocolSender implements ClusterCoordin } } + @Override + public NodeConnectionStatus requestNodeConnectionStatus(final String hostname, final int port) { + Objects.requireNonNull(hostname); + + final NodeConnectionStatusRequestMessage msg = new NodeConnectionStatusRequestMessage(); + + final byte[] msgBytes; + try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + final ProtocolMessageMarshaller marshaller = protocolContext.createMarshaller(); + marshaller.marshal(msg, baos); + msgBytes = baos.toByteArray(); + } catch (final IOException e) { + throw new ProtocolException("Failed to marshal NodeIdentifierRequestMessage", e); + } + + try (final Socket socket = createSocket(hostname, port, true)) { + // marshal message to output stream + socket.getOutputStream().write(msgBytes); + + final ProtocolMessage response; + try { + // unmarshall response and return + final ProtocolMessageUnmarshaller unmarshaller = protocolContext.createUnmarshaller(); + response = unmarshaller.unmarshal(socket.getInputStream()); + } catch (final IOException ioe) { + throw new ProtocolException("Failed unmarshalling '" + MessageType.RECONNECTION_RESPONSE + "' protocol message due to: " + ioe, ioe); + } + + if (MessageType.NODE_CONNECTION_STATUS_RESPONSE == response.getType()) { + return ((NodeConnectionStatusResponseMessage) response).getNodeConnectionStatus(); + } else { + throw new ProtocolException("Expected message type '" + MessageType.NODE_CONNECTION_STATUS_RESPONSE + "' but found '" + response.getType() + "'"); + } + } catch (final IOException ioe) { + throw new ProtocolException("Failed to request Node Identifer from " + hostname + ":" + port, ioe); + } + } + @Override public void notifyNodeStatusChange(final Set nodesToNotify, final NodeStatusChangeMessage msg) { if (nodesToNotify.isEmpty()) { @@ -222,6 +267,8 @@ public class StandardClusterCoordinationProtocolSender implements ClusterCoordin } catch (final IOException ioe) { throw new ProtocolException("Failed to send Node Status Change message to " + nodeId, ioe); } + + logger.debug("Notified {} of status change {}", nodeId, msg); } }); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java index 0093c3e741..21d0bdab57 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java @@ -37,13 +37,15 @@ public class NodeConnectionStatusAdapter extends XmlAdapter currentStatus.getUpdateIdentifier()) { - updated = replaceNodeStatus(nodeId, currentStatus, proposedStatus); - } else { - updated = true; - } + updated = replaceNodeStatus(nodeId, currentStatus, proposedStatus); } } } @@ -325,6 +321,9 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl case UNKNOWN: severity = Severity.ERROR; break; + case LACK_OF_HEARTBEAT: + severity = Severity.WARNING; + break; default: severity = Severity.INFO; break; @@ -569,7 +568,31 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl .orElse(null); if (electedNodeId == null && warnOnError) { - logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but there is no node with this address", electedNodeAddress); + logger.debug("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}," + + "but there is no node with this address. Will attempt to communicate with node to determine its information", electedNodeAddress); + + try { + final NodeConnectionStatus connectionStatus = senderListener.requestNodeConnectionStatus(electedNodeHostname, electedNodePort); + logger.debug("Received NodeConnectionStatus {}", connectionStatus); + + if (connectionStatus == null) { + return null; + } + + final NodeConnectionStatus existingStatus = this.nodeStatuses.putIfAbsent(connectionStatus.getNodeIdentifier(), connectionStatus); + if (existingStatus == null) { + return connectionStatus.getNodeIdentifier(); + } else { + return existingStatus.getNodeIdentifier(); + } + } catch (final Exception e) { + logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but there is no node with this address. " + + "Attempted to determine the node's information but failed to retrieve its information due to {}", electedNodeAddress, e.toString()); + + if (logger.isDebugEnabled()) { + logger.warn("", e); + } + } } return electedNodeId; @@ -639,7 +662,15 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl // to broadcast to the cluster that this node is no longer the coordinator. Otherwise, all nodes but this one will still // believe that this node is connected to the cluster. final boolean notifyAllNodes = isActiveClusterCoordinator() || (currentStatus != null && currentStatus.getRoles().contains(ClusterRoles.CLUSTER_COORDINATOR)); + if (notifyAllNodes) { + logger.debug("Notifying all nodes that status changed from {} to {}", currentStatus, status); + } else { + logger.debug("Notifying cluster coordinator that node status changed from {} to {}", currentStatus, status); + } + notifyOthersOfNodeStatusChange(status, notifyAllNodes); + } else { + logger.debug("Not notifying other nodes that status changed because previous state of {} is same as new state of {}", currentState, status.getState()); } } @@ -766,11 +797,20 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl case NODE_STATUS_CHANGE: handleNodeStatusChange((NodeStatusChangeMessage) protocolMessage); return null; + case NODE_CONNECTION_STATUS_REQUEST: + return handleNodeConnectionStatusRequest(); default: throw new ProtocolException("Cannot handle Protocol Message " + protocolMessage + " because it is not of the correct type"); } } + private NodeConnectionStatusResponseMessage handleNodeConnectionStatusRequest() { + final NodeConnectionStatus connectionStatus = nodeStatuses.get(getLocalNodeIdentifier()); + final NodeConnectionStatusResponseMessage msg = new NodeConnectionStatusResponseMessage(); + msg.setNodeConnectionStatus(connectionStatus); + return msg; + } + private String summarizeStatusChange(final NodeConnectionStatus oldStatus, final NodeConnectionStatus status) { final StringBuilder sb = new StringBuilder(); @@ -828,33 +868,28 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl boolean updated = false; while (!updated) { final NodeConnectionStatus oldStatus = nodeStatuses.get(statusChangeMessage.getNodeId()); - if (oldStatus == null || updatedStatus.getUpdateIdentifier() >= oldStatus.getUpdateIdentifier()) { - // Either remove the value from the map or update the map depending on the connection state - if (statusChangeMessage.getNodeConnectionStatus().getState() == NodeConnectionState.REMOVED) { - updated = nodeStatuses.remove(nodeId, oldStatus); - } else { - updated = replaceNodeStatus(nodeId, oldStatus, updatedStatus); - } - if (updated) { - logger.info("Status of {} changed from {} to {}", statusChangeMessage.getNodeId(), oldStatus, updatedStatus); - - final NodeConnectionStatus status = statusChangeMessage.getNodeConnectionStatus(); - final String summary = summarizeStatusChange(oldStatus, status); - if (!StringUtils.isEmpty(summary)) { - addNodeEvent(nodeId, summary); - } - - // Update our counter so that we are in-sync with the cluster on the - // most up-to-date version of the NodeConnectionStatus' Update Identifier. - // We do this so that we can accurately compare status updates that are generated - // locally against those generated from other nodes in the cluster. - NodeConnectionStatus.updateIdGenerator(updatedStatus.getUpdateIdentifier()); - } + // Either remove the value from the map or update the map depending on the connection state + if (statusChangeMessage.getNodeConnectionStatus().getState() == NodeConnectionState.REMOVED) { + updated = nodeStatuses.remove(nodeId, oldStatus); } else { - updated = true; - logger.info("Received Node Status update that indicates that {} should change to {} but disregarding because the current state of {} is newer", - nodeId, updatedStatus, oldStatus); + updated = replaceNodeStatus(nodeId, oldStatus, updatedStatus); + } + + if (updated) { + logger.info("Status of {} changed from {} to {}", statusChangeMessage.getNodeId(), oldStatus, updatedStatus); + + final NodeConnectionStatus status = statusChangeMessage.getNodeConnectionStatus(); + final String summary = summarizeStatusChange(oldStatus, status); + if (!StringUtils.isEmpty(summary)) { + addNodeEvent(nodeId, summary); + } + + // Update our counter so that we are in-sync with the cluster on the + // most up-to-date version of the NodeConnectionStatus' Update Identifier. + // We do this so that we can accurately compare status updates that are generated + // locally against those generated from other nodes in the cluster. + NodeConnectionStatus.updateIdGenerator(updatedStatus.getUpdateIdentifier()); } } @@ -952,7 +987,8 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl @Override public boolean canHandle(final ProtocolMessage msg) { - return MessageType.CONNECTION_REQUEST == msg.getType() || MessageType.NODE_STATUS_CHANGE == msg.getType(); + return MessageType.CONNECTION_REQUEST == msg.getType() || MessageType.NODE_STATUS_CHANGE == msg.getType() + || MessageType.NODE_CONNECTION_STATUS_REQUEST == msg.getType(); } private boolean isMutableRequest(final String method) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java index 6f0ad0f07e..319bd84e00 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java @@ -27,7 +27,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -396,21 +395,6 @@ public class TestNodeClusterCoordinator { // Ensure that no status change message was send Thread.sleep(1000); assertTrue(nodeStatuses.isEmpty()); - - // Status should not have changed because our status id is too small. - NodeConnectionStatus curStatus = coordinator.getConnectionStatus(nodeId1); - assertEquals(NodeConnectionState.CONNECTED, curStatus.getState()); - - // Verify that resetMap updates only the newer statuses - final NodeConnectionStatus node2Disconnecting = new NodeConnectionStatus(nodeId2, NodeConnectionState.DISCONNECTING, Collections.emptySet()); - final Map resetMap = new HashMap<>(); - resetMap.put(nodeId1, oldStatus); - resetMap.put(nodeId2, node2Disconnecting); - coordinator.resetNodeStatuses(resetMap); - - curStatus = coordinator.getConnectionStatus(nodeId1); - assertEquals(NodeConnectionState.CONNECTED, curStatus.getState()); - assertEquals(NodeConnectionState.DISCONNECTING, coordinator.getConnectionStatus(nodeId2).getState()); } @Test(timeout = 5000) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 6b952f369e..47620290a6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -605,6 +605,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R leaderElectionManager = null; heartbeater = null; } + + if (heartbeatMonitor != null) { + heartbeatMonitor.start(); + } } @Override @@ -617,10 +621,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R return ResourceFactory.getControllerResource(); } - public HeartbeatMonitor getHeartbeatMonitor() { - return heartbeatMonitor; - } - private static FlowFileRepository createFlowFileRepository(final NiFiProperties properties, final ResourceClaimManager contentClaimManager) { final String implementationClassName = properties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION, DEFAULT_FLOWFILE_REPO_IMPLEMENTATION); if (implementationClassName == null) { @@ -1297,6 +1297,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R leaderElectionManager.stop(); } + if (heartbeatMonitor != null) { + heartbeatMonitor.stop(); + } + if (kill) { this.timerDrivenEngineRef.get().shutdownNow(); this.eventDrivenEngineRef.get().shutdownNow(); @@ -3311,6 +3315,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R leaderElectionManager.register(ClusterRoles.CLUSTER_COORDINATOR, new LeaderElectionStateChangeListener() { @Override public synchronized void onLeaderRelinquish() { + LOG.info("This node is no longer the elected Active Cluster Coordinator"); heartbeatMonitor.stop(); if (clusterCoordinator != null) { @@ -3320,6 +3325,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R @Override public synchronized void onLeaderElection() { + LOG.info("This node elected Active Cluster Coordinator"); heartbeatMonitor.start(); if (clusterCoordinator != null) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java index 94c83df6ab..9d076c022c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java @@ -221,7 +221,14 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { logger.info("{} This node has been elected Leader for Role '{}'", this, roleName); if (listener != null) { - listener.onLeaderElection(); + try { + listener.onLeaderElection(); + } catch (final Exception e) { + logger.error("This node was elected Leader for Role '{}' but failed to take leadership. Will relinquish leadership role. Failure was due to: {}", roleName, e); + logger.error("", e); + leader = false; + return; + } } // Curator API states that we lose the leadership election when we return from this method, @@ -241,7 +248,12 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { logger.info("{} This node is no longer leader for role '{}'", this, roleName); if (listener != null) { - listener.onLeaderRelinquish(); + try { + listener.onLeaderRelinquish(); + } catch (final Exception e) { + logger.error("This node is no longer leader for role '{}' but failed to shutdown leadership responsibilities properly due to: {}", roleName, e); + logger.error("", e); + } } } }