diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java index b49f57ca79..986231efd4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java @@ -18,6 +18,7 @@ package org.apache.nifi.cluster.protocol; import java.util.Set; +import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; import org.apache.nifi.cluster.protocol.message.DisconnectMessage; import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage; import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; @@ -61,4 +62,11 @@ public interface ClusterCoordinationProtocolSender { * @param msg the message that indicates which node's status changed and what it changed to */ void notifyNodeStatusChange(Set nodesToNotify, NodeStatusChangeMessage msg); + + /** + * Sends a request to the given hostname and port to request its connection status + * + * @return the connection status returned from the node at the given hostname & port + */ + NodeConnectionStatus requestNodeConnectionStatus(String hostname, int port); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterCoordinationProtocolSenderListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterCoordinationProtocolSenderListener.java index e97712a726..ae3a0e5057 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterCoordinationProtocolSenderListener.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterCoordinationProtocolSenderListener.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Collection; import java.util.Set; +import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; import org.apache.nifi.cluster.protocol.ClusterCoordinationProtocolSender; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.cluster.protocol.ProtocolException; @@ -105,7 +106,12 @@ public class ClusterCoordinationProtocolSenderListener implements ClusterCoordin } @Override - public void notifyNodeStatusChange(Set nodesToNotify, NodeStatusChangeMessage msg) { + public void notifyNodeStatusChange(final Set nodesToNotify, final NodeStatusChangeMessage msg) { sender.notifyNodeStatusChange(nodesToNotify, msg); } + + @Override + public NodeConnectionStatus requestNodeConnectionStatus(final String hostname, final int port) { + return sender.requestNodeConnectionStatus(hostname, port); + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java index c6a4883804..8958988799 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java @@ -26,13 +26,19 @@ import java.util.UUID; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; +import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.cluster.protocol.ProtocolContext; import org.apache.nifi.cluster.protocol.ProtocolException; import org.apache.nifi.cluster.protocol.ProtocolHandler; import org.apache.nifi.cluster.protocol.ProtocolListener; import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller; import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller; +import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage; +import org.apache.nifi.cluster.protocol.message.DisconnectMessage; +import org.apache.nifi.cluster.protocol.message.FlowRequestMessage; +import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage; +import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; import org.apache.nifi.events.BulletinFactory; import org.apache.nifi.io.socket.ServerSocketConfiguration; import org.apache.nifi.io.socket.SocketListener; @@ -173,8 +179,10 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi } stopWatch.stop(); + final NodeIdentifier nodeId = getNodeIdentifier(request); + final String from = nodeId == null ? hostname : nodeId.toString(); logger.info("Finished processing request {} (type={}, length={} bytes) from {} in {} millis", - requestId, request.getType(), receivedMessage.length, hostname, stopWatch.getDuration(TimeUnit.MILLISECONDS)); + requestId, request.getType(), receivedMessage.length, from, stopWatch.getDuration(TimeUnit.MILLISECONDS)); } catch (final IOException | ProtocolException e) { logger.warn("Failed processing protocol message from " + hostname + " due to " + e, e); @@ -185,6 +193,27 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi } } + private NodeIdentifier getNodeIdentifier(final ProtocolMessage message) { + if (message == null) { + return null; + } + + switch (message.getType()) { + case CONNECTION_REQUEST: + return ((ConnectionRequestMessage) message).getConnectionRequest().getProposedNodeIdentifier(); + case HEARTBEAT: + return ((HeartbeatMessage) message).getHeartbeat().getNodeIdentifier(); + case DISCONNECTION_REQUEST: + return ((DisconnectMessage) message).getNodeId(); + case FLOW_REQUEST: + return ((FlowRequestMessage) message).getNodeId(); + case RECONNECTION_REQUEST: + return ((ReconnectionRequestMessage) message).getNodeId(); + default: + return null; + } + } + private String getRequestorDN(Socket socket) { try { return CertificateUtils.extractPeerDNFromSSLSocket(socket); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java index 4b58886ea6..e8099610db 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketException; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -28,6 +29,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; import org.apache.nifi.cluster.protocol.ClusterCoordinationProtocolSender; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.cluster.protocol.ProtocolContext; @@ -35,6 +37,8 @@ import org.apache.nifi.cluster.protocol.ProtocolException; import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller; import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller; import org.apache.nifi.cluster.protocol.message.DisconnectMessage; +import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusRequestMessage; +import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage; import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType; @@ -45,6 +49,8 @@ import org.apache.nifi.io.socket.SocketUtils; import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A protocol sender for sending protocol messages from the cluster manager to @@ -57,6 +63,7 @@ import org.apache.nifi.util.NiFiProperties; * */ public class StandardClusterCoordinationProtocolSender implements ClusterCoordinationProtocolSender { + private static final Logger logger = LoggerFactory.getLogger(StandardClusterCoordinationProtocolSender.class); private final ProtocolContext protocolContext; private final SocketConfiguration socketConfiguration; @@ -182,6 +189,44 @@ public class StandardClusterCoordinationProtocolSender implements ClusterCoordin } } + @Override + public NodeConnectionStatus requestNodeConnectionStatus(final String hostname, final int port) { + Objects.requireNonNull(hostname); + + final NodeConnectionStatusRequestMessage msg = new NodeConnectionStatusRequestMessage(); + + final byte[] msgBytes; + try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + final ProtocolMessageMarshaller marshaller = protocolContext.createMarshaller(); + marshaller.marshal(msg, baos); + msgBytes = baos.toByteArray(); + } catch (final IOException e) { + throw new ProtocolException("Failed to marshal NodeIdentifierRequestMessage", e); + } + + try (final Socket socket = createSocket(hostname, port, true)) { + // marshal message to output stream + socket.getOutputStream().write(msgBytes); + + final ProtocolMessage response; + try { + // unmarshall response and return + final ProtocolMessageUnmarshaller unmarshaller = protocolContext.createUnmarshaller(); + response = unmarshaller.unmarshal(socket.getInputStream()); + } catch (final IOException ioe) { + throw new ProtocolException("Failed unmarshalling '" + MessageType.RECONNECTION_RESPONSE + "' protocol message due to: " + ioe, ioe); + } + + if (MessageType.NODE_CONNECTION_STATUS_RESPONSE == response.getType()) { + return ((NodeConnectionStatusResponseMessage) response).getNodeConnectionStatus(); + } else { + throw new ProtocolException("Expected message type '" + MessageType.NODE_CONNECTION_STATUS_RESPONSE + "' but found '" + response.getType() + "'"); + } + } catch (final IOException ioe) { + throw new ProtocolException("Failed to request Node Identifer from " + hostname + ":" + port, ioe); + } + } + @Override public void notifyNodeStatusChange(final Set nodesToNotify, final NodeStatusChangeMessage msg) { if (nodesToNotify.isEmpty()) { @@ -222,6 +267,8 @@ public class StandardClusterCoordinationProtocolSender implements ClusterCoordin } catch (final IOException ioe) { throw new ProtocolException("Failed to send Node Status Change message to " + nodeId, ioe); } + + logger.debug("Notified {} of status change {}", nodeId, msg); } }); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java index 0093c3e741..21d0bdab57 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java @@ -37,13 +37,15 @@ public class NodeConnectionStatusAdapter extends XmlAdapter currentStatus.getUpdateIdentifier()) { - updated = replaceNodeStatus(nodeId, currentStatus, proposedStatus); - } else { - updated = true; - } + updated = replaceNodeStatus(nodeId, currentStatus, proposedStatus); } } } @@ -325,6 +321,9 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl case UNKNOWN: severity = Severity.ERROR; break; + case LACK_OF_HEARTBEAT: + severity = Severity.WARNING; + break; default: severity = Severity.INFO; break; @@ -569,7 +568,31 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl .orElse(null); if (electedNodeId == null && warnOnError) { - logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but there is no node with this address", electedNodeAddress); + logger.debug("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}," + + "but there is no node with this address. Will attempt to communicate with node to determine its information", electedNodeAddress); + + try { + final NodeConnectionStatus connectionStatus = senderListener.requestNodeConnectionStatus(electedNodeHostname, electedNodePort); + logger.debug("Received NodeConnectionStatus {}", connectionStatus); + + if (connectionStatus == null) { + return null; + } + + final NodeConnectionStatus existingStatus = this.nodeStatuses.putIfAbsent(connectionStatus.getNodeIdentifier(), connectionStatus); + if (existingStatus == null) { + return connectionStatus.getNodeIdentifier(); + } else { + return existingStatus.getNodeIdentifier(); + } + } catch (final Exception e) { + logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but there is no node with this address. " + + "Attempted to determine the node's information but failed to retrieve its information due to {}", electedNodeAddress, e.toString()); + + if (logger.isDebugEnabled()) { + logger.warn("", e); + } + } } return electedNodeId; @@ -639,7 +662,15 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl // to broadcast to the cluster that this node is no longer the coordinator. Otherwise, all nodes but this one will still // believe that this node is connected to the cluster. final boolean notifyAllNodes = isActiveClusterCoordinator() || (currentStatus != null && currentStatus.getRoles().contains(ClusterRoles.CLUSTER_COORDINATOR)); + if (notifyAllNodes) { + logger.debug("Notifying all nodes that status changed from {} to {}", currentStatus, status); + } else { + logger.debug("Notifying cluster coordinator that node status changed from {} to {}", currentStatus, status); + } + notifyOthersOfNodeStatusChange(status, notifyAllNodes); + } else { + logger.debug("Not notifying other nodes that status changed because previous state of {} is same as new state of {}", currentState, status.getState()); } } @@ -766,11 +797,20 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl case NODE_STATUS_CHANGE: handleNodeStatusChange((NodeStatusChangeMessage) protocolMessage); return null; + case NODE_CONNECTION_STATUS_REQUEST: + return handleNodeConnectionStatusRequest(); default: throw new ProtocolException("Cannot handle Protocol Message " + protocolMessage + " because it is not of the correct type"); } } + private NodeConnectionStatusResponseMessage handleNodeConnectionStatusRequest() { + final NodeConnectionStatus connectionStatus = nodeStatuses.get(getLocalNodeIdentifier()); + final NodeConnectionStatusResponseMessage msg = new NodeConnectionStatusResponseMessage(); + msg.setNodeConnectionStatus(connectionStatus); + return msg; + } + private String summarizeStatusChange(final NodeConnectionStatus oldStatus, final NodeConnectionStatus status) { final StringBuilder sb = new StringBuilder(); @@ -828,33 +868,28 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl boolean updated = false; while (!updated) { final NodeConnectionStatus oldStatus = nodeStatuses.get(statusChangeMessage.getNodeId()); - if (oldStatus == null || updatedStatus.getUpdateIdentifier() >= oldStatus.getUpdateIdentifier()) { - // Either remove the value from the map or update the map depending on the connection state - if (statusChangeMessage.getNodeConnectionStatus().getState() == NodeConnectionState.REMOVED) { - updated = nodeStatuses.remove(nodeId, oldStatus); - } else { - updated = replaceNodeStatus(nodeId, oldStatus, updatedStatus); - } - if (updated) { - logger.info("Status of {} changed from {} to {}", statusChangeMessage.getNodeId(), oldStatus, updatedStatus); - - final NodeConnectionStatus status = statusChangeMessage.getNodeConnectionStatus(); - final String summary = summarizeStatusChange(oldStatus, status); - if (!StringUtils.isEmpty(summary)) { - addNodeEvent(nodeId, summary); - } - - // Update our counter so that we are in-sync with the cluster on the - // most up-to-date version of the NodeConnectionStatus' Update Identifier. - // We do this so that we can accurately compare status updates that are generated - // locally against those generated from other nodes in the cluster. - NodeConnectionStatus.updateIdGenerator(updatedStatus.getUpdateIdentifier()); - } + // Either remove the value from the map or update the map depending on the connection state + if (statusChangeMessage.getNodeConnectionStatus().getState() == NodeConnectionState.REMOVED) { + updated = nodeStatuses.remove(nodeId, oldStatus); } else { - updated = true; - logger.info("Received Node Status update that indicates that {} should change to {} but disregarding because the current state of {} is newer", - nodeId, updatedStatus, oldStatus); + updated = replaceNodeStatus(nodeId, oldStatus, updatedStatus); + } + + if (updated) { + logger.info("Status of {} changed from {} to {}", statusChangeMessage.getNodeId(), oldStatus, updatedStatus); + + final NodeConnectionStatus status = statusChangeMessage.getNodeConnectionStatus(); + final String summary = summarizeStatusChange(oldStatus, status); + if (!StringUtils.isEmpty(summary)) { + addNodeEvent(nodeId, summary); + } + + // Update our counter so that we are in-sync with the cluster on the + // most up-to-date version of the NodeConnectionStatus' Update Identifier. + // We do this so that we can accurately compare status updates that are generated + // locally against those generated from other nodes in the cluster. + NodeConnectionStatus.updateIdGenerator(updatedStatus.getUpdateIdentifier()); } } @@ -952,7 +987,8 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl @Override public boolean canHandle(final ProtocolMessage msg) { - return MessageType.CONNECTION_REQUEST == msg.getType() || MessageType.NODE_STATUS_CHANGE == msg.getType(); + return MessageType.CONNECTION_REQUEST == msg.getType() || MessageType.NODE_STATUS_CHANGE == msg.getType() + || MessageType.NODE_CONNECTION_STATUS_REQUEST == msg.getType(); } private boolean isMutableRequest(final String method) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java index 6f0ad0f07e..319bd84e00 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java @@ -27,7 +27,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -396,21 +395,6 @@ public class TestNodeClusterCoordinator { // Ensure that no status change message was send Thread.sleep(1000); assertTrue(nodeStatuses.isEmpty()); - - // Status should not have changed because our status id is too small. - NodeConnectionStatus curStatus = coordinator.getConnectionStatus(nodeId1); - assertEquals(NodeConnectionState.CONNECTED, curStatus.getState()); - - // Verify that resetMap updates only the newer statuses - final NodeConnectionStatus node2Disconnecting = new NodeConnectionStatus(nodeId2, NodeConnectionState.DISCONNECTING, Collections.emptySet()); - final Map resetMap = new HashMap<>(); - resetMap.put(nodeId1, oldStatus); - resetMap.put(nodeId2, node2Disconnecting); - coordinator.resetNodeStatuses(resetMap); - - curStatus = coordinator.getConnectionStatus(nodeId1); - assertEquals(NodeConnectionState.CONNECTED, curStatus.getState()); - assertEquals(NodeConnectionState.DISCONNECTING, coordinator.getConnectionStatus(nodeId2).getState()); } @Test(timeout = 5000) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 6b952f369e..47620290a6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -605,6 +605,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R leaderElectionManager = null; heartbeater = null; } + + if (heartbeatMonitor != null) { + heartbeatMonitor.start(); + } } @Override @@ -617,10 +621,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R return ResourceFactory.getControllerResource(); } - public HeartbeatMonitor getHeartbeatMonitor() { - return heartbeatMonitor; - } - private static FlowFileRepository createFlowFileRepository(final NiFiProperties properties, final ResourceClaimManager contentClaimManager) { final String implementationClassName = properties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION, DEFAULT_FLOWFILE_REPO_IMPLEMENTATION); if (implementationClassName == null) { @@ -1297,6 +1297,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R leaderElectionManager.stop(); } + if (heartbeatMonitor != null) { + heartbeatMonitor.stop(); + } + if (kill) { this.timerDrivenEngineRef.get().shutdownNow(); this.eventDrivenEngineRef.get().shutdownNow(); @@ -3311,6 +3315,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R leaderElectionManager.register(ClusterRoles.CLUSTER_COORDINATOR, new LeaderElectionStateChangeListener() { @Override public synchronized void onLeaderRelinquish() { + LOG.info("This node is no longer the elected Active Cluster Coordinator"); heartbeatMonitor.stop(); if (clusterCoordinator != null) { @@ -3320,6 +3325,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R @Override public synchronized void onLeaderElection() { + LOG.info("This node elected Active Cluster Coordinator"); heartbeatMonitor.start(); if (clusterCoordinator != null) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java index 94c83df6ab..9d076c022c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java @@ -221,7 +221,14 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { logger.info("{} This node has been elected Leader for Role '{}'", this, roleName); if (listener != null) { - listener.onLeaderElection(); + try { + listener.onLeaderElection(); + } catch (final Exception e) { + logger.error("This node was elected Leader for Role '{}' but failed to take leadership. Will relinquish leadership role. Failure was due to: {}", roleName, e); + logger.error("", e); + leader = false; + return; + } } // Curator API states that we lose the leadership election when we return from this method, @@ -241,7 +248,12 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { logger.info("{} This node is no longer leader for role '{}'", this, roleName); if (listener != null) { - listener.onLeaderRelinquish(); + try { + listener.onLeaderRelinquish(); + } catch (final Exception e) { + logger.error("This node is no longer leader for role '{}' but failed to shutdown leadership responsibilities properly due to: {}", roleName, e); + logger.error("", e); + } } } }