NIFI-3933:

- When monitoring heartbeats use the connected nodes as the basis for the check. This addresses the case when a node is terminated and no corresponding heartbeats exist.
This commit is contained in:
Matt Gilman 2017-05-22 15:28:30 -04:00 committed by Mark Payne
parent a1b07b1e9c
commit d33c4c72d4
4 changed files with 119 additions and 41 deletions

View File

@ -58,6 +58,13 @@ public interface HeartbeatMonitor {
*/
void purgeHeartbeats();
/**
* Returns when the heartbeats were purged last.
*
* @return when the heartbeats were purged last
*/
long getPurgeTimestamp();
/**
* @return the address that heartbeats should be sent to when this node is elected coordinator.
*/

View File

@ -154,19 +154,38 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor {
// Disconnect any node that hasn't sent a heartbeat in a long time (8 times the heartbeat interval)
final long maxMillis = heartbeatIntervalMillis * 8;
final long threshold = System.currentTimeMillis() - maxMillis;
for (final NodeHeartbeat heartbeat : latestHeartbeats.values()) {
if (heartbeat.getTimestamp() < threshold) {
final long secondsSinceLastHeartbeat = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - heartbeat.getTimestamp());
final long currentTimestamp = System.currentTimeMillis();
final long threshold = currentTimestamp - maxMillis;
clusterCoordinator.disconnectionRequestedByNode(heartbeat.getNodeIdentifier(), DisconnectionCode.LACK_OF_HEARTBEAT,
"Have not received a heartbeat from node in " + secondsSinceLastHeartbeat + " seconds");
// consider all connected nodes
for (final NodeIdentifier nodeIdentifier : clusterCoordinator.getNodeIdentifiers(NodeConnectionState.CONNECTED)) {
final NodeHeartbeat heartbeat = latestHeartbeats.get(nodeIdentifier);
try {
removeHeartbeat(heartbeat.getNodeIdentifier());
} catch (final Exception e) {
logger.warn("Failed to remove heartbeat for {} due to {}", heartbeat.getNodeIdentifier(), e.toString());
logger.warn("", e);
// consider the most recent heartbeat for this node
if (heartbeat == null) {
final long purgeTimestamp = getPurgeTimestamp();
// if there is no heartbeat for this node, see if we purged the heartbeats beyond the allowed heartbeat threshold
if (purgeTimestamp < threshold) {
final long secondsSinceLastPurge = TimeUnit.MILLISECONDS.toSeconds(currentTimestamp - purgeTimestamp);
clusterCoordinator.disconnectionRequestedByNode(nodeIdentifier, DisconnectionCode.LACK_OF_HEARTBEAT,
"Have not received a heartbeat from node in " + secondsSinceLastPurge + " seconds");
}
} else {
// see if the heartbeat occurred before the allowed heartbeat threshold
if (heartbeat.getTimestamp() < threshold) {
final long secondsSinceLastHeartbeat = TimeUnit.MILLISECONDS.toSeconds(currentTimestamp - heartbeat.getTimestamp());
clusterCoordinator.disconnectionRequestedByNode(nodeIdentifier, DisconnectionCode.LACK_OF_HEARTBEAT,
"Have not received a heartbeat from node in " + secondsSinceLastHeartbeat + " seconds");
try {
removeHeartbeat(nodeIdentifier);
} catch (final Exception e) {
logger.warn("Failed to remove heartbeat for {} due to {}", nodeIdentifier, e.toString());
logger.warn("", e);
}
}
}
}

View File

@ -16,18 +16,6 @@
*/
package org.apache.nifi.cluster.coordination.heartbeat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.Unmarshaller;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
@ -38,16 +26,29 @@ import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolHandler;
import org.apache.nifi.cluster.protocol.ProtocolListener;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage;
import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.Unmarshaller;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* Uses Apache ZooKeeper to advertise the address to send heartbeats to, and
* then relies on the NiFi Cluster Protocol to receive heartbeat messages from
@ -60,6 +61,8 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im
private final String heartbeatAddress;
private final ConcurrentMap<NodeIdentifier, NodeHeartbeat> heartbeatMessages = new ConcurrentHashMap<>();
private volatile long purgeTimestamp = System.currentTimeMillis();
protected static final Unmarshaller nodeIdentifierUnmarshaller;
static {
@ -136,6 +139,12 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im
public synchronized void purgeHeartbeats() {
logger.debug("Purging old heartbeats");
heartbeatMessages.clear();
purgeTimestamp = System.currentTimeMillis();
}
@Override
public synchronized long getPurgeTimestamp() {
return purgeTimestamp;
}
@Override

View File

@ -17,20 +17,6 @@
package org.apache.nifi.cluster.coordination.heartbeat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.nifi.cluster.ReportedEvent;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
@ -46,6 +32,22 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestAbstractHeartbeatMonitor {
private NodeIdentifier nodeId;
private TestFriendlyHeartbeatMonitor monitor;
@ -131,6 +133,38 @@ public class TestAbstractHeartbeatMonitor {
assertTrue(requestedToConnect.isEmpty());
}
@Test
public void testDisconnectionOfTerminatedNodeDueToLackOfHeartbeat() throws Exception {
final NodeIdentifier nodeId1 = nodeId;
final NodeIdentifier nodeId2 = new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 7777, "localhost", 6666, "localhost", null, null, false);
final ClusterCoordinatorAdapter adapter = new ClusterCoordinatorAdapter();
final TestFriendlyHeartbeatMonitor monitor = createMonitor(adapter);
// set state to connecting
adapter.requestNodeConnect(nodeId1);
adapter.requestNodeConnect(nodeId2);
// ensure each node is connected
assertTrue(adapter.getNodeIdentifiers(NodeConnectionState.CONNECTING).containsAll(Arrays.asList(nodeId1, nodeId2)));
// let each node heartbeat in
monitor.addHeartbeat(createHeartbeat(nodeId1, NodeConnectionState.CONNECTED));
monitor.addHeartbeat(createHeartbeat(nodeId2, NodeConnectionState.CONNECTED));
monitor.waitForProcessed();
// ensure each node is now connected
assertTrue(adapter.getNodeIdentifiers(NodeConnectionState.CONNECTED).containsAll(Arrays.asList(nodeId1, nodeId2)));
// purge the heartbeats, simulate nodeId2 termination by only having a nodeId1 heartbeat be present
monitor.purgeHeartbeats();
monitor.addHeartbeat(createHeartbeat(nodeId1, NodeConnectionState.CONNECTED));
monitor.waitForProcessed();
// the node that did not heartbeat in should be disconnected
assertTrue(adapter.getNodeIdentifiers(NodeConnectionState.CONNECTED).contains(nodeId1));
assertTrue(adapter.getNodeIdentifiers(NodeConnectionState.DISCONNECTED).contains(nodeId2));
}
@Test
public void testConnectingNodeMarkedConnectedWhenHeartbeatReceived() throws InterruptedException {
@ -339,8 +373,9 @@ public class TestAbstractHeartbeatMonitor {
private static class TestFriendlyHeartbeatMonitor extends AbstractHeartbeatMonitor {
private Map<NodeIdentifier, NodeHeartbeat> heartbeats = new HashMap<>();
private Map<NodeIdentifier, NodeHeartbeat> heartbeats = new ConcurrentHashMap<>();
private final Object mutex = new Object();
private long purgeTimestamp = System.currentTimeMillis();
public TestFriendlyHeartbeatMonitor(ClusterCoordinator clusterCoordinator, NiFiProperties nifiProperties) {
super(clusterCoordinator, nifiProperties);
@ -348,7 +383,7 @@ public class TestAbstractHeartbeatMonitor {
@Override
protected synchronized Map<NodeIdentifier, NodeHeartbeat> getLatestHeartbeats() {
return heartbeats;
return Collections.unmodifiableMap(heartbeats);
}
@Override
@ -372,6 +407,14 @@ public class TestAbstractHeartbeatMonitor {
@Override
public synchronized void purgeHeartbeats() {
heartbeats.clear();
purgeTimestamp = System.currentTimeMillis();
}
@Override
public long getPurgeTimestamp() {
// deduct 90 because it is greater than 10 ms (interval defined in this test) * 8 (defined by the threshold in the heartbeat monitor)...
// it will ensure that the amount of time since the last heartbeat is outside the allowed threshold leading to the disconnection of the node
return purgeTimestamp - 90;
}
void waitForProcessed() throws InterruptedException {