NIFI-2406: Ensure that hearbeat monitor continues to run while instance is running. This way if a node sends heartbeat to this node as elected coordinator changes, we notify the node accordingly. Handle Exceptions more gracefully in leader election code. Tweaked some handling of how nodes reconnect to the cluster to ensure more stability with cluster

Signed-off-by: Yolanda M. Davis <ymdavis@apache.org>

This closes #729
This commit is contained in:
Mark Payne 2016-07-27 11:55:02 -04:00 committed by Yolanda M. Davis
parent d094130a26
commit c1c052af71
15 changed files with 328 additions and 71 deletions

View File

@ -18,6 +18,7 @@ package org.apache.nifi.cluster.protocol;
import java.util.Set;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
@ -61,4 +62,11 @@ public interface ClusterCoordinationProtocolSender {
* @param msg the message that indicates which node's status changed and what it changed to
*/
void notifyNodeStatusChange(Set<NodeIdentifier> nodesToNotify, NodeStatusChangeMessage msg);
/**
* Sends a request to the given hostname and port to request its connection status
*
* @return the connection status returned from the node at the given hostname & port
*/
NodeConnectionStatus requestNodeConnectionStatus(String hostname, int port);
}

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import java.util.Collection;
import java.util.Set;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.ClusterCoordinationProtocolSender;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.ProtocolException;
@ -105,7 +106,12 @@ public class ClusterCoordinationProtocolSenderListener implements ClusterCoordin
}
@Override
public void notifyNodeStatusChange(Set<NodeIdentifier> nodesToNotify, NodeStatusChangeMessage msg) {
public void notifyNodeStatusChange(final Set<NodeIdentifier> nodesToNotify, final NodeStatusChangeMessage msg) {
sender.notifyNodeStatusChange(nodesToNotify, msg);
}
@Override
public NodeConnectionStatus requestNodeConnectionStatus(final String hostname, final int port) {
return sender.requestNodeConnectionStatus(hostname, port);
}
}

View File

@ -26,13 +26,19 @@ import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.ProtocolContext;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolHandler;
import org.apache.nifi.cluster.protocol.ProtocolListener;
import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.io.socket.ServerSocketConfiguration;
import org.apache.nifi.io.socket.SocketListener;
@ -173,8 +179,10 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi
}
stopWatch.stop();
final NodeIdentifier nodeId = getNodeIdentifier(request);
final String from = nodeId == null ? hostname : nodeId.toString();
logger.info("Finished processing request {} (type={}, length={} bytes) from {} in {} millis",
requestId, request.getType(), receivedMessage.length, hostname, stopWatch.getDuration(TimeUnit.MILLISECONDS));
requestId, request.getType(), receivedMessage.length, from, stopWatch.getDuration(TimeUnit.MILLISECONDS));
} catch (final IOException | ProtocolException e) {
logger.warn("Failed processing protocol message from " + hostname + " due to " + e, e);
@ -185,6 +193,27 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi
}
}
private NodeIdentifier getNodeIdentifier(final ProtocolMessage message) {
if (message == null) {
return null;
}
switch (message.getType()) {
case CONNECTION_REQUEST:
return ((ConnectionRequestMessage) message).getConnectionRequest().getProposedNodeIdentifier();
case HEARTBEAT:
return ((HeartbeatMessage) message).getHeartbeat().getNodeIdentifier();
case DISCONNECTION_REQUEST:
return ((DisconnectMessage) message).getNodeId();
case FLOW_REQUEST:
return ((FlowRequestMessage) message).getNodeId();
case RECONNECTION_REQUEST:
return ((ReconnectionRequestMessage) message).getNodeId();
default:
return null;
}
}
private String getRequestorDN(Socket socket) {
try {
return CertificateUtils.extractPeerDNFromSSLSocket(socket);

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -28,6 +29,7 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.ClusterCoordinationProtocolSender;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.ProtocolContext;
@ -35,6 +37,8 @@ import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusRequestMessage;
import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage;
import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
@ -45,6 +49,8 @@ import org.apache.nifi.io.socket.SocketUtils;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A protocol sender for sending protocol messages from the cluster manager to
@ -57,6 +63,7 @@ import org.apache.nifi.util.NiFiProperties;
*
*/
public class StandardClusterCoordinationProtocolSender implements ClusterCoordinationProtocolSender {
private static final Logger logger = LoggerFactory.getLogger(StandardClusterCoordinationProtocolSender.class);
private final ProtocolContext<ProtocolMessage> protocolContext;
private final SocketConfiguration socketConfiguration;
@ -182,6 +189,44 @@ public class StandardClusterCoordinationProtocolSender implements ClusterCoordin
}
}
@Override
public NodeConnectionStatus requestNodeConnectionStatus(final String hostname, final int port) {
Objects.requireNonNull(hostname);
final NodeConnectionStatusRequestMessage msg = new NodeConnectionStatusRequestMessage();
final byte[] msgBytes;
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
marshaller.marshal(msg, baos);
msgBytes = baos.toByteArray();
} catch (final IOException e) {
throw new ProtocolException("Failed to marshal NodeIdentifierRequestMessage", e);
}
try (final Socket socket = createSocket(hostname, port, true)) {
// marshal message to output stream
socket.getOutputStream().write(msgBytes);
final ProtocolMessage response;
try {
// unmarshall response and return
final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller();
response = unmarshaller.unmarshal(socket.getInputStream());
} catch (final IOException ioe) {
throw new ProtocolException("Failed unmarshalling '" + MessageType.RECONNECTION_RESPONSE + "' protocol message due to: " + ioe, ioe);
}
if (MessageType.NODE_CONNECTION_STATUS_RESPONSE == response.getType()) {
return ((NodeConnectionStatusResponseMessage) response).getNodeConnectionStatus();
} else {
throw new ProtocolException("Expected message type '" + MessageType.NODE_CONNECTION_STATUS_RESPONSE + "' but found '" + response.getType() + "'");
}
} catch (final IOException ioe) {
throw new ProtocolException("Failed to request Node Identifer from " + hostname + ":" + port, ioe);
}
}
@Override
public void notifyNodeStatusChange(final Set<NodeIdentifier> nodesToNotify, final NodeStatusChangeMessage msg) {
if (nodesToNotify.isEmpty()) {
@ -222,6 +267,8 @@ public class StandardClusterCoordinationProtocolSender implements ClusterCoordin
} catch (final IOException ioe) {
throw new ProtocolException("Failed to send Node Status Change message to " + nodeId, ioe);
}
logger.debug("Notified {} of status change {}", nodeId, msg);
}
});
}

View File

@ -37,13 +37,15 @@ public class NodeConnectionStatusAdapter extends XmlAdapter<AdaptedNodeConnectio
@Override
public AdaptedNodeConnectionStatus marshal(final NodeConnectionStatus toAdapt) throws Exception {
final AdaptedNodeConnectionStatus adapted = new AdaptedNodeConnectionStatus();
adapted.setUpdateId(toAdapt.getUpdateIdentifier());
adapted.setNodeId(toAdapt.getNodeIdentifier());
adapted.setConnectionRequestTime(toAdapt.getConnectionRequestTime());
adapted.setDisconnectCode(toAdapt.getDisconnectCode());
adapted.setDisconnectReason(toAdapt.getDisconnectReason());
adapted.setState(toAdapt.getState());
adapted.setRoles(toAdapt.getRoles());
if (toAdapt != null) {
adapted.setUpdateId(toAdapt.getUpdateIdentifier());
adapted.setNodeId(toAdapt.getNodeIdentifier());
adapted.setConnectionRequestTime(toAdapt.getConnectionRequestTime());
adapted.setDisconnectCode(toAdapt.getDisconnectCode());
adapted.setDisconnectReason(toAdapt.getDisconnectReason());
adapted.setState(toAdapt.getState());
adapted.setRoles(toAdapt.getRoles());
}
return adapted;
}
}

View File

@ -25,6 +25,8 @@ import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage;
import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusRequestMessage;
import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage;
import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
import org.apache.nifi.cluster.protocol.message.PingMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
@ -86,4 +88,13 @@ public class ObjectFactory {
public NodeStatusChangeMessage createNodeStatusChangeMessage() {
return new NodeStatusChangeMessage();
}
public NodeConnectionStatusRequestMessage createNodeConnectionStatusRequestMessage() {
return new NodeConnectionStatusRequestMessage();
}
public NodeConnectionStatusResponseMessage createNodeConnectionStatusResponsetMessage() {
return new NodeConnectionStatusResponseMessage();
}
}

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.protocol.message;
import javax.xml.bind.annotation.XmlRootElement;
@XmlRootElement(name = "nodeConnectionStatusRequestMessage")
public class NodeConnectionStatusRequestMessage extends ProtocolMessage {
@Override
public MessageType getType() {
return MessageType.NODE_CONNECTION_STATUS_REQUEST;
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.protocol.message;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
@XmlRootElement(name = "nodeIdentifierResponseMessage")
public class NodeConnectionStatusResponseMessage extends ProtocolMessage {
private NodeConnectionStatus nodeConnectionStatus;
@Override
public MessageType getType() {
return MessageType.NODE_CONNECTION_STATUS_RESPONSE;
}
public NodeConnectionStatus getNodeConnectionStatus() {
return nodeConnectionStatus;
}
public void setNodeConnectionStatus(final NodeConnectionStatus connectionStatus) {
this.nodeConnectionStatus = connectionStatus;
}
}

View File

@ -32,6 +32,8 @@ public abstract class ProtocolMessage {
RECONNECTION_RESPONSE,
SERVICE_BROADCAST,
HEARTBEAT,
NODE_CONNECTION_STATUS_REQUEST,
NODE_CONNECTION_STATUS_RESPONSE,
NODE_STATUS_CHANGE;
}

View File

@ -35,6 +35,8 @@ import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.StandardDataFlow;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusRequestMessage;
import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage;
import org.apache.nifi.web.Revision;
import org.junit.Test;
@ -65,4 +67,33 @@ public class TestJaxbProtocolUtils {
assertEquals(revisions, unmarshalledMsg.getConnectionResponse().getComponentRevisions());
}
@Test
public void testRoundTripConnectionStatusRequest() throws JAXBException {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final NodeConnectionStatusRequestMessage msg = new NodeConnectionStatusRequestMessage();
JaxbProtocolUtils.JAXB_CONTEXT.createMarshaller().marshal(msg, baos);
final Object unmarshalled = JaxbProtocolUtils.JAXB_CONTEXT.createUnmarshaller().unmarshal(new ByteArrayInputStream(baos.toByteArray()));
assertTrue(unmarshalled instanceof NodeConnectionStatusRequestMessage);
}
@Test
public void testRoundTripConnectionStatusResponse() throws JAXBException {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final NodeConnectionStatusResponseMessage msg = new NodeConnectionStatusResponseMessage();
final NodeIdentifier nodeId = new NodeIdentifier("id", "localhost", 8000, "localhost", 8001, "localhost", 8002, 8003, true);
final NodeConnectionStatus nodeStatus = new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED);
msg.setNodeConnectionStatus(nodeStatus);
JaxbProtocolUtils.JAXB_CONTEXT.createMarshaller().marshal(msg, baos);
final Object unmarshalled = JaxbProtocolUtils.JAXB_CONTEXT.createUnmarshaller().unmarshal(new ByteArrayInputStream(baos.toByteArray()));
assertTrue(unmarshalled instanceof NodeConnectionStatusResponseMessage);
final NodeConnectionStatusResponseMessage unmarshalledMsg = (NodeConnectionStatusResponseMessage) unmarshalled;
final NodeConnectionStatus unmarshalledStatus = unmarshalledMsg.getNodeConnectionStatus();
assertEquals(nodeStatus, unmarshalledStatus);
}
}

View File

@ -57,7 +57,8 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor {
@Override
public synchronized final void start() {
if (!stopped) {
throw new IllegalStateException("Heartbeat Monitor cannot be started because it is already started");
logger.debug("Attempted to start Heartbeat Monitor but it is already started");
return;
}
stopped = false;
@ -150,8 +151,20 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor {
final long threshold = System.currentTimeMillis() - maxMillis;
for (final NodeHeartbeat heartbeat : latestHeartbeats.values()) {
if (heartbeat.getTimestamp() < threshold) {
clusterCoordinator.requestNodeDisconnect(heartbeat.getNodeIdentifier(), DisconnectionCode.LACK_OF_HEARTBEAT,
"Latest heartbeat from Node has expired");
final long secondsSinceLastHeartbeat = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - heartbeat.getTimestamp());
if (!clusterCoordinator.isActiveClusterCoordinator()) {
// Occasionally Curator appears to not notify us that we have lost the elected leader role, or does so
// on a very large delay. So before we kick the node out of the cluster, we want to first check what the
// ZNode in ZooKeeper says, and ensure that this is the node that is being advertised as the appropriate
// destination for heartbeats.
logger.debug("Have not received a heartbeat from node in " + secondsSinceLastHeartbeat +
" seconds but it appears that this node is no longer the actively elected cluster coordinator. Will not request that node disconnect.");
return;
}
clusterCoordinator.disconnectionRequestedByNode(heartbeat.getNodeIdentifier(), DisconnectionCode.LACK_OF_HEARTBEAT,
"Have not received a heartbeat from node in " + secondsSinceLastHeartbeat + " seconds");
try {
removeHeartbeat(heartbeat.getNodeIdentifier());
@ -206,20 +219,20 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor {
switch (disconnectionCode) {
case LACK_OF_HEARTBEAT:
case UNABLE_TO_COMMUNICATE:
case NODE_SHUTDOWN:
case NOT_YET_CONNECTED:
case STARTUP_FAILURE: {
clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received heartbeat from node previously "
+ "disconnected due to " + disconnectionCode + ". Issuing reconnection request.");
clusterCoordinator.requestNodeConnect(nodeId, null);
break;
}
default: {
// disconnected nodes should not heartbeat, so we need to issue a disconnection request.
logger.info("Ignoring received heartbeat from disconnected node " + nodeId + ". Issuing disconnection request.");
clusterCoordinator.requestNodeDisconnect(nodeId, DisconnectionCode.HEARTBEAT_RECEIVED_FROM_DISCONNECTED_NODE,
DisconnectionCode.HEARTBEAT_RECEIVED_FROM_DISCONNECTED_NODE.toString());
clusterCoordinator.requestNodeDisconnect(nodeId, disconnectionCode, connectionStatus.getDisconnectReason());
removeHeartbeat(nodeId);
break;
}
}

View File

@ -61,6 +61,7 @@ import org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderLi
import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage;
import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
@ -185,7 +186,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
}).forPath(coordinatorPath);
final String address = coordinatorAddress = new String(coordinatorAddressBytes, StandardCharsets.UTF_8);
logger.info("Determined that Cluster Coordinator is located at {}; will use this address for sending heartbeat messages", address);
logger.info("Determined that Cluster Coordinator is located at {}", address);
return address;
} catch (Exception e) {
throw new IOException("Unable to determine Cluster Coordinator from ZooKeeper", e);
@ -206,12 +207,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
boolean updated = false;
while (!updated) {
final NodeConnectionStatus currentStatus = nodeStatuses.get(nodeId);
if (currentStatus == null || proposedStatus.getUpdateIdentifier() > currentStatus.getUpdateIdentifier()) {
updated = replaceNodeStatus(nodeId, currentStatus, proposedStatus);
} else {
updated = true;
}
updated = replaceNodeStatus(nodeId, currentStatus, proposedStatus);
}
}
}
@ -325,6 +321,9 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
case UNKNOWN:
severity = Severity.ERROR;
break;
case LACK_OF_HEARTBEAT:
severity = Severity.WARNING;
break;
default:
severity = Severity.INFO;
break;
@ -569,7 +568,31 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
.orElse(null);
if (electedNodeId == null && warnOnError) {
logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but there is no node with this address", electedNodeAddress);
logger.debug("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {},"
+ "but there is no node with this address. Will attempt to communicate with node to determine its information", electedNodeAddress);
try {
final NodeConnectionStatus connectionStatus = senderListener.requestNodeConnectionStatus(electedNodeHostname, electedNodePort);
logger.debug("Received NodeConnectionStatus {}", connectionStatus);
if (connectionStatus == null) {
return null;
}
final NodeConnectionStatus existingStatus = this.nodeStatuses.putIfAbsent(connectionStatus.getNodeIdentifier(), connectionStatus);
if (existingStatus == null) {
return connectionStatus.getNodeIdentifier();
} else {
return existingStatus.getNodeIdentifier();
}
} catch (final Exception e) {
logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but there is no node with this address. "
+ "Attempted to determine the node's information but failed to retrieve its information due to {}", electedNodeAddress, e.toString());
if (logger.isDebugEnabled()) {
logger.warn("", e);
}
}
}
return electedNodeId;
@ -639,7 +662,15 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
// to broadcast to the cluster that this node is no longer the coordinator. Otherwise, all nodes but this one will still
// believe that this node is connected to the cluster.
final boolean notifyAllNodes = isActiveClusterCoordinator() || (currentStatus != null && currentStatus.getRoles().contains(ClusterRoles.CLUSTER_COORDINATOR));
if (notifyAllNodes) {
logger.debug("Notifying all nodes that status changed from {} to {}", currentStatus, status);
} else {
logger.debug("Notifying cluster coordinator that node status changed from {} to {}", currentStatus, status);
}
notifyOthersOfNodeStatusChange(status, notifyAllNodes);
} else {
logger.debug("Not notifying other nodes that status changed because previous state of {} is same as new state of {}", currentState, status.getState());
}
}
@ -766,11 +797,20 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
case NODE_STATUS_CHANGE:
handleNodeStatusChange((NodeStatusChangeMessage) protocolMessage);
return null;
case NODE_CONNECTION_STATUS_REQUEST:
return handleNodeConnectionStatusRequest();
default:
throw new ProtocolException("Cannot handle Protocol Message " + protocolMessage + " because it is not of the correct type");
}
}
private NodeConnectionStatusResponseMessage handleNodeConnectionStatusRequest() {
final NodeConnectionStatus connectionStatus = nodeStatuses.get(getLocalNodeIdentifier());
final NodeConnectionStatusResponseMessage msg = new NodeConnectionStatusResponseMessage();
msg.setNodeConnectionStatus(connectionStatus);
return msg;
}
private String summarizeStatusChange(final NodeConnectionStatus oldStatus, final NodeConnectionStatus status) {
final StringBuilder sb = new StringBuilder();
@ -828,33 +868,28 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
boolean updated = false;
while (!updated) {
final NodeConnectionStatus oldStatus = nodeStatuses.get(statusChangeMessage.getNodeId());
if (oldStatus == null || updatedStatus.getUpdateIdentifier() >= oldStatus.getUpdateIdentifier()) {
// Either remove the value from the map or update the map depending on the connection state
if (statusChangeMessage.getNodeConnectionStatus().getState() == NodeConnectionState.REMOVED) {
updated = nodeStatuses.remove(nodeId, oldStatus);
} else {
updated = replaceNodeStatus(nodeId, oldStatus, updatedStatus);
}
if (updated) {
logger.info("Status of {} changed from {} to {}", statusChangeMessage.getNodeId(), oldStatus, updatedStatus);
final NodeConnectionStatus status = statusChangeMessage.getNodeConnectionStatus();
final String summary = summarizeStatusChange(oldStatus, status);
if (!StringUtils.isEmpty(summary)) {
addNodeEvent(nodeId, summary);
}
// Update our counter so that we are in-sync with the cluster on the
// most up-to-date version of the NodeConnectionStatus' Update Identifier.
// We do this so that we can accurately compare status updates that are generated
// locally against those generated from other nodes in the cluster.
NodeConnectionStatus.updateIdGenerator(updatedStatus.getUpdateIdentifier());
}
// Either remove the value from the map or update the map depending on the connection state
if (statusChangeMessage.getNodeConnectionStatus().getState() == NodeConnectionState.REMOVED) {
updated = nodeStatuses.remove(nodeId, oldStatus);
} else {
updated = true;
logger.info("Received Node Status update that indicates that {} should change to {} but disregarding because the current state of {} is newer",
nodeId, updatedStatus, oldStatus);
updated = replaceNodeStatus(nodeId, oldStatus, updatedStatus);
}
if (updated) {
logger.info("Status of {} changed from {} to {}", statusChangeMessage.getNodeId(), oldStatus, updatedStatus);
final NodeConnectionStatus status = statusChangeMessage.getNodeConnectionStatus();
final String summary = summarizeStatusChange(oldStatus, status);
if (!StringUtils.isEmpty(summary)) {
addNodeEvent(nodeId, summary);
}
// Update our counter so that we are in-sync with the cluster on the
// most up-to-date version of the NodeConnectionStatus' Update Identifier.
// We do this so that we can accurately compare status updates that are generated
// locally against those generated from other nodes in the cluster.
NodeConnectionStatus.updateIdGenerator(updatedStatus.getUpdateIdentifier());
}
}
@ -952,7 +987,8 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
@Override
public boolean canHandle(final ProtocolMessage msg) {
return MessageType.CONNECTION_REQUEST == msg.getType() || MessageType.NODE_STATUS_CHANGE == msg.getType();
return MessageType.CONNECTION_REQUEST == msg.getType() || MessageType.NODE_STATUS_CHANGE == msg.getType()
|| MessageType.NODE_CONNECTION_STATUS_REQUEST == msg.getType();
}
private boolean isMutableRequest(final String method) {

View File

@ -27,7 +27,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@ -396,21 +395,6 @@ public class TestNodeClusterCoordinator {
// Ensure that no status change message was send
Thread.sleep(1000);
assertTrue(nodeStatuses.isEmpty());
// Status should not have changed because our status id is too small.
NodeConnectionStatus curStatus = coordinator.getConnectionStatus(nodeId1);
assertEquals(NodeConnectionState.CONNECTED, curStatus.getState());
// Verify that resetMap updates only the newer statuses
final NodeConnectionStatus node2Disconnecting = new NodeConnectionStatus(nodeId2, NodeConnectionState.DISCONNECTING, Collections.emptySet());
final Map<NodeIdentifier, NodeConnectionStatus> resetMap = new HashMap<>();
resetMap.put(nodeId1, oldStatus);
resetMap.put(nodeId2, node2Disconnecting);
coordinator.resetNodeStatuses(resetMap);
curStatus = coordinator.getConnectionStatus(nodeId1);
assertEquals(NodeConnectionState.CONNECTED, curStatus.getState());
assertEquals(NodeConnectionState.DISCONNECTING, coordinator.getConnectionStatus(nodeId2).getState());
}
@Test(timeout = 5000)

View File

@ -605,6 +605,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
leaderElectionManager = null;
heartbeater = null;
}
if (heartbeatMonitor != null) {
heartbeatMonitor.start();
}
}
@Override
@ -617,10 +621,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
return ResourceFactory.getControllerResource();
}
public HeartbeatMonitor getHeartbeatMonitor() {
return heartbeatMonitor;
}
private static FlowFileRepository createFlowFileRepository(final NiFiProperties properties, final ResourceClaimManager contentClaimManager) {
final String implementationClassName = properties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION, DEFAULT_FLOWFILE_REPO_IMPLEMENTATION);
if (implementationClassName == null) {
@ -1297,6 +1297,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
leaderElectionManager.stop();
}
if (heartbeatMonitor != null) {
heartbeatMonitor.stop();
}
if (kill) {
this.timerDrivenEngineRef.get().shutdownNow();
this.eventDrivenEngineRef.get().shutdownNow();
@ -3311,6 +3315,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
leaderElectionManager.register(ClusterRoles.CLUSTER_COORDINATOR, new LeaderElectionStateChangeListener() {
@Override
public synchronized void onLeaderRelinquish() {
LOG.info("This node is no longer the elected Active Cluster Coordinator");
heartbeatMonitor.stop();
if (clusterCoordinator != null) {
@ -3320,6 +3325,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
@Override
public synchronized void onLeaderElection() {
LOG.info("This node elected Active Cluster Coordinator");
heartbeatMonitor.start();
if (clusterCoordinator != null) {

View File

@ -221,7 +221,14 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
logger.info("{} This node has been elected Leader for Role '{}'", this, roleName);
if (listener != null) {
listener.onLeaderElection();
try {
listener.onLeaderElection();
} catch (final Exception e) {
logger.error("This node was elected Leader for Role '{}' but failed to take leadership. Will relinquish leadership role. Failure was due to: {}", roleName, e);
logger.error("", e);
leader = false;
return;
}
}
// Curator API states that we lose the leadership election when we return from this method,
@ -241,7 +248,12 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
logger.info("{} This node is no longer leader for role '{}'", this, roleName);
if (listener != null) {
listener.onLeaderRelinquish();
try {
listener.onLeaderRelinquish();
} catch (final Exception e) {
logger.error("This node is no longer leader for role '{}' but failed to shutdown leadership responsibilities properly due to: {}", roleName, e);
logger.error("", e);
}
}
}
}