NIFI-7389 Makes Missable heartbeat counts configurable

This closes #4236.

Signed-off-by: Andy LoPresto <alopresto@apache.org>
This commit is contained in:
Sushil Kumar 2020-04-24 14:50:01 -07:00 committed by Andy LoPresto
parent 6c2701abef
commit 996688b419
No known key found for this signature in database
GPG Key ID: 6EC293152D90B61D
7 changed files with 15 additions and 4 deletions

View File

@ -203,6 +203,7 @@ public abstract class NiFiProperties {
// cluster common properties // cluster common properties
public static final String CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL = "nifi.cluster.protocol.heartbeat.interval"; public static final String CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL = "nifi.cluster.protocol.heartbeat.interval";
public static final String CLUSTER_PROTOCOL_HEARTBEAT_MISSABLE_MAX = "nifi.cluster.protocol.heartbeat.missable.max";
public static final String CLUSTER_PROTOCOL_IS_SECURE = "nifi.cluster.protocol.is.secure"; public static final String CLUSTER_PROTOCOL_IS_SECURE = "nifi.cluster.protocol.is.secure";
// cluster node properties // cluster node properties
@ -305,6 +306,7 @@ public abstract class NiFiProperties {
// cluster common defaults // cluster common defaults
public static final String DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL = "5 sec"; public static final String DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL = "5 sec";
public static final int DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_MISSABLE_MAX = 8;
public static final String DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_BROADCAST_DELAY = "500 ms"; public static final String DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_BROADCAST_DELAY = "500 ms";
public static final int DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS = 3; public static final int DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS = 3;
public static final String DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS_DELAY = "1 sec"; public static final String DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS_DELAY = "1 sec";

View File

@ -1629,8 +1629,8 @@ It just depends on the resources available and how the Administrator decides to
*Heartbeats*: The nodes communicate their health and status to the currently elected Cluster Coordinator via "heartbeats", *Heartbeats*: The nodes communicate their health and status to the currently elected Cluster Coordinator via "heartbeats",
which let the Coordinator know they are still connected to the cluster and working properly. By default, the nodes emit which let the Coordinator know they are still connected to the cluster and working properly. By default, the nodes emit
heartbeats every 5 seconds, and if the Cluster Coordinator does not receive a heartbeat from a node within 40 seconds, it heartbeats every 5 seconds, and if the Cluster Coordinator does not receive a heartbeat from a node within 40 seconds (= 5 seconds * 8), it
disconnects the node due to "lack of heartbeat". The 5-second setting is configurable in the _nifi.properties_ file (see disconnects the node due to "lack of heartbeat". The 5-second and 8 times settings are configurable in the _nifi.properties_ file (see
the <<cluster_common_properties>> section for more information). The reason that the Cluster Coordinator the <<cluster_common_properties>> section for more information). The reason that the Cluster Coordinator
disconnects the node is because the Coordinator needs to ensure that every node in the cluster is in sync, and if a node disconnects the node is because the Coordinator needs to ensure that every node in the cluster is in sync, and if a node
is not heard from regularly, the Coordinator cannot be sure it is still in sync with the rest of the cluster. If, after is not heard from regularly, the Coordinator cannot be sure it is still in sync with the rest of the cluster. If, after
@ -3334,6 +3334,7 @@ When setting up a NiFi cluster, these properties should be configured the same w
|==== |====
|*Property*|*Description* |*Property*|*Description*
|`nifi.cluster.protocol.heartbeat.interval`|The interval at which nodes should emit heartbeats to the Cluster Coordinator. The default value is `5 sec`. |`nifi.cluster.protocol.heartbeat.interval`|The interval at which nodes should emit heartbeats to the Cluster Coordinator. The default value is `5 sec`.
|`nifi.cluster.protocol.heartbeat.missable.max`|Maximum number of heartbeats a Cluster Coordinator can miss for a node in the cluster before the Cluster Coordinator updates the node status to Disconnected. The default value is `8`.
|`nifi.cluster.protocol.is.secure`|This indicates whether cluster communications are secure. The default value is `false`. |`nifi.cluster.protocol.is.secure`|This indicates whether cluster communications are secure. The default value is `false`.
|==== |====

View File

@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit;
public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor { public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor {
private final int heartbeatIntervalMillis; private final int heartbeatIntervalMillis;
private final int missableHeartbeatCount;
private static final Logger logger = LoggerFactory.getLogger(AbstractHeartbeatMonitor.class); private static final Logger logger = LoggerFactory.getLogger(AbstractHeartbeatMonitor.class);
protected final ClusterCoordinator clusterCoordinator; protected final ClusterCoordinator clusterCoordinator;
protected final FlowEngine flowEngine = new FlowEngine(1, "Heartbeat Monitor", true); protected final FlowEngine flowEngine = new FlowEngine(1, "Heartbeat Monitor", true);
@ -51,6 +52,9 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor {
NiFiProperties.DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL); NiFiProperties.DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL);
this.heartbeatIntervalMillis = (int) FormatUtils.getTimeDuration(heartbeatInterval, TimeUnit.MILLISECONDS); this.heartbeatIntervalMillis = (int) FormatUtils.getTimeDuration(heartbeatInterval, TimeUnit.MILLISECONDS);
this.missableHeartbeatCount = nifiProperties.getIntegerProperty(NiFiProperties.CLUSTER_PROTOCOL_HEARTBEAT_MISSABLE_MAX,
NiFiProperties.DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_MISSABLE_MAX);
// Register an event listener so that if any nodes are removed, we also remove the heartbeat. // Register an event listener so that if any nodes are removed, we also remove the heartbeat.
// Otherwise, we'll have a condition where a node is removed from the Cluster Coordinator, but its heartbeat has already been received. // Otherwise, we'll have a condition where a node is removed from the Cluster Coordinator, but its heartbeat has already been received.
// As a result, when it is processed, we will ask the node to reconnect, adding it back to the cluster. // As a result, when it is processed, we will ask the node to reconnect, adding it back to the cluster.
@ -158,8 +162,8 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor {
procStopWatch.stop(); procStopWatch.stop();
logger.info("Finished processing {} heartbeats in {}", latestHeartbeats.size(), procStopWatch.getDuration()); logger.info("Finished processing {} heartbeats in {}", latestHeartbeats.size(), procStopWatch.getDuration());
// Disconnect any node that hasn't sent a heartbeat in a long time (8 times the heartbeat interval) // Disconnect any node that hasn't sent a heartbeat in a long time (CLUSTER_PROTOCOL_HEARTBEAT_MISSABLE_MAX times the heartbeat interval)
final long maxMillis = heartbeatIntervalMillis * 8; final long maxMillis = heartbeatIntervalMillis * missableHeartbeatCount;
final long currentTimestamp = System.currentTimeMillis(); final long currentTimestamp = System.currentTimeMillis();
final long threshold = currentTimestamp - maxMillis; final long threshold = currentTimestamp - maxMillis;

View File

@ -200,6 +200,7 @@ nifi.security.user.knox.audiences=
# cluster common properties (all nodes must have same values) # # cluster common properties (all nodes must have same values) #
nifi.cluster.protocol.heartbeat.interval=5 sec nifi.cluster.protocol.heartbeat.interval=5 sec
nifi.cluster.protocol.heartbeat.missable.max=8
nifi.cluster.protocol.is.secure=false nifi.cluster.protocol.is.secure=false
# cluster node properties (only configure for cluster nodes) # # cluster node properties (only configure for cluster nodes) #

View File

@ -200,6 +200,7 @@ nifi.security.user.knox.audiences=
# cluster common properties (all nodes must have same values) # # cluster common properties (all nodes must have same values) #
nifi.cluster.protocol.heartbeat.interval=5 sec nifi.cluster.protocol.heartbeat.interval=5 sec
nifi.cluster.protocol.heartbeat.missable.max=8
nifi.cluster.protocol.is.secure=false nifi.cluster.protocol.is.secure=false
# cluster node properties (only configure for cluster nodes) # # cluster node properties (only configure for cluster nodes) #

View File

@ -178,6 +178,7 @@
<!-- nifi.properties: cluster common properties (cluster manager and nodes must have same values) --> <!-- nifi.properties: cluster common properties (cluster manager and nodes must have same values) -->
<nifi.cluster.protocol.heartbeat.interval>5 sec</nifi.cluster.protocol.heartbeat.interval> <nifi.cluster.protocol.heartbeat.interval>5 sec</nifi.cluster.protocol.heartbeat.interval>
<nifi.cluster.protocol.heartbeat.missable.max>8</nifi.cluster.protocol.heartbeat.missable.max>
<nifi.cluster.protocol.is.secure>false</nifi.cluster.protocol.is.secure> <nifi.cluster.protocol.is.secure>false</nifi.cluster.protocol.is.secure>
<!-- nifi.properties: cluster node properties (only configure for cluster nodes) --> <!-- nifi.properties: cluster node properties (only configure for cluster nodes) -->

View File

@ -212,6 +212,7 @@ nifi.security.user.knox.audiences=${nifi.security.user.knox.audiences}
# cluster common properties (all nodes must have same values) # # cluster common properties (all nodes must have same values) #
nifi.cluster.protocol.heartbeat.interval=${nifi.cluster.protocol.heartbeat.interval} nifi.cluster.protocol.heartbeat.interval=${nifi.cluster.protocol.heartbeat.interval}
nifi.cluster.protocol.heartbeat.missable.max=${nifi.cluster.protocol.heartbeat.missable.max}
nifi.cluster.protocol.is.secure=${nifi.cluster.protocol.is.secure} nifi.cluster.protocol.is.secure=${nifi.cluster.protocol.is.secure}
# cluster node properties (only configure for cluster nodes) # # cluster node properties (only configure for cluster nodes) #