NIFI-2999: When Cluster Coordinator changes, purge any old heartbeats so that we don't disconnect a node due to very old heartbeats

This closes #1210
This commit is contained in:
Mark Payne 2016-11-11 11:11:36 -05:00 committed by Oleg Zhurakousky
parent b9ef0fb847
commit b73ba7f8d4
4 changed files with 23 additions and 0 deletions

View File

@ -53,6 +53,11 @@ public interface HeartbeatMonitor {
*/
void removeHeartbeat(NodeIdentifier nodeId);
/**
* Clears all heartbeats that have been received
*/
void purgeHeartbeats();
/**
* @return the address that heartbeats should be sent to when this node is elected coordinator.
*/

View File

@ -132,6 +132,12 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im
heartbeatMessages.remove(nodeId);
}
@Override
public synchronized void purgeHeartbeats() {
logger.debug("Purging old heartbeats");
heartbeatMessages.clear();
}
@Override
public ProtocolMessage handle(final ProtocolMessage msg) throws ProtocolException {
switch (msg.getType()) {

View File

@ -369,6 +369,11 @@ public class TestAbstractHeartbeatMonitor {
heartbeats.remove(nodeId);
}
@Override
public synchronized void purgeHeartbeats() {
heartbeats.clear();
}
void waitForProcessed() throws InterruptedException {
synchronized (mutex) {
mutex.wait();

View File

@ -3350,6 +3350,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
@Override
public synchronized void onLeaderElection() {
LOG.info("This node elected Active Cluster Coordinator");
// Purge any heartbeats that we already have. If we don't do this, we can have a scenario where we receive heartbeats
// from a node, and then another node becomes Cluster Coordinator. As a result, we stop receiving heartbeats. Now that
// we are again the Cluster Coordinator, we will detect that there are old heartbeat messages and start disconnecting
// nodes due to a lack of heartbeat. By purging the heartbeats here, we remove any old heartbeat messages so that this
// does not happen.
FlowController.this.heartbeatMonitor.purgeHeartbeats();
}
}, participantId);
}