From b73ba7f8d4f6319881c26b8faad121ceb12041ab Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 11 Nov 2016 11:11:36 -0500 Subject: [PATCH] NIFI-2999: When Cluster Coordinator changes, purge any old heartbeats so that we don't disconnect a node due to very old heartbeats This closes #1210 --- .../cluster/coordination/heartbeat/HeartbeatMonitor.java | 5 +++++ .../heartbeat/ClusterProtocolHeartbeatMonitor.java | 6 ++++++ .../heartbeat/TestAbstractHeartbeatMonitor.java | 5 +++++ .../java/org/apache/nifi/controller/FlowController.java | 7 +++++++ 4 files changed, 23 insertions(+) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java index 988ba755e1..6a0937d99c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java @@ -53,6 +53,11 @@ public interface HeartbeatMonitor { */ void removeHeartbeat(NodeIdentifier nodeId); + /** + * Clears all heartbeats that have been received + */ + void purgeHeartbeats(); + /** * @return the address that heartbeats should be sent to when this node is elected coordinator. */ diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java index 6a8e575c6c..3e98368fdf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java @@ -132,6 +132,12 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im heartbeatMessages.remove(nodeId); } + @Override + public synchronized void purgeHeartbeats() { + logger.debug("Purging old heartbeats"); + heartbeatMessages.clear(); + } + @Override public ProtocolMessage handle(final ProtocolMessage msg) throws ProtocolException { switch (msg.getType()) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java index 690cda8dea..66102311bc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java @@ -369,6 +369,11 @@ public class TestAbstractHeartbeatMonitor { heartbeats.remove(nodeId); } + @Override + public synchronized void purgeHeartbeats() { + heartbeats.clear(); + } + void waitForProcessed() throws InterruptedException { synchronized (mutex) { mutex.wait(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index ec8b288a4f..b790526ca3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -3350,6 +3350,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R @Override public synchronized void onLeaderElection() { LOG.info("This node elected Active Cluster Coordinator"); + + // Purge any heartbeats that we already have. If we don't do this, we can have a scenario where we receive heartbeats + // from a node, and then another node becomes Cluster Coordinator. As a result, we stop receiving heartbeats. Now that + // we are again the Cluster Coordinator, we will detect that there are old heartbeat messages and start disconnecting + // nodes due to a lack of heartbeat. By purging the heartbeats here, we remove any old heartbeat messages so that this + // does not happen. + FlowController.this.heartbeatMonitor.purgeHeartbeats(); } }, participantId); }