NIFI-5096: Periodically poll ZooKeeper to determine the leader for each registered role in Leader Election. This avoids a condition whereby a node may occasionally fail to receive notification that it is no longer the elected leader.

NIFI-5096: More proactively setting leadership to false when ZooKeeper/Curator ConnectionState changes

This closes #2646
This commit is contained in:
Mark Payne 2018-04-19 09:05:32 -04:00 committed by Matt Gilman
parent 262bf011e4
commit 54eb6bc232
No known key found for this signature in database
GPG Key ID: DF61EC19432AEE37
1 changed file with 62 additions and 5 deletions

View File

@ -16,8 +16,6 @@
*/
package org.apache.nifi.controller.leader.election;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
@ -36,6 +34,9 @@ import org.apache.zookeeper.common.PathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
public class CuratorLeaderElectionManager implements LeaderElectionManager {
private static final Logger logger = LoggerFactory.getLogger(CuratorLeaderElectionManager.class);
@ -112,7 +113,7 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
final boolean isParticipant = participantId != null && !participantId.trim().isEmpty();
if (!isStopped()) {
final ElectionListener electionListener = new ElectionListener(roleName, listener);
final ElectionListener electionListener = new ElectionListener(roleName, listener, participantId);
final LeaderSelector leaderSelector = new LeaderSelector(curatorClient, leaderPath, leaderElectionMonitorEngine, electionListener);
if (isParticipant) {
leaderSelector.autoRequeue();
@ -358,12 +359,14 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
private final String roleName;
private final LeaderElectionStateChangeListener listener;
private final String participantId;
private volatile boolean leader;
public ElectionListener(final String roleName, final LeaderElectionStateChangeListener listener) {
/**
 * Creates an election listener bound to a single role.
 *
 * @param roleName the role this listener tracks; used when querying the elected leader and in log messages
 * @param listener the state change listener associated with this role
 * @param participantId this node's participant ID, compared against the elected leader's ID when
 *            verifying that this node still holds the role
 */
public ElectionListener(final String roleName, final LeaderElectionStateChangeListener listener, final String participantId) {
    this.participantId = participantId;
    this.listener = listener;
    this.roleName = roleName;
}
public boolean isLeader() {
@ -373,9 +376,37 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
/**
 * Logs the new connection state and, when the connection has been suspended or lost,
 * immediately clears the local leader flag before delegating to the superclass implementation.
 *
 * @param client the Curator client whose connection state changed
 * @param newState the connection state that was just entered
 */
@Override
public void stateChanged(final CuratorFramework client, final ConnectionState newState) {
    logger.info("{} Connection State changed to {}", this, newState.name());

    final boolean connectionInterrupted = newState == ConnectionState.SUSPENDED || newState == ConnectionState.LOST;
    if (connectionInterrupted) {
        // Only announce that leadership is being given up if this node actually held it.
        if (leader) {
            logger.info("Because Connection State was changed to {}, will relinquish leadership for role '{}'", newState, roleName);
        }

        // The flag is cleared unconditionally; the takeLeadership loop checks it on each iteration.
        leader = false;
    }

    super.stateChanged(client, newState);
}
/**
 * Asks ZooKeeper which participant currently holds this listener's role and checks whether it is this node.
 * This check exists because a node can lose leadership without the Curator client notifying it
 * (for example, because of a network failure).
 *
 * @return <code>true</code> if the elected leader's ID equals this node's participant ID;
 *         <code>false</code> if there is no elected leader or the leader is a different participant
 */
private boolean verifyLeader() {
    final String electedLeaderId = getLeader(roleName);

    if (electedLeaderId != null) {
        final boolean electedHere = electedLeaderId.equals(participantId);
        logger.debug("Reached out to ZooKeeper to determine which node is the elected leader for Role '{}'. Elected Leader = '{}', Participant ID = '{}', This Node Elected = {}",
            roleName, electedLeaderId, participantId, electedHere);
        return electedHere;
    }

    // No leader is currently registered for the role, so this node cannot be the leader.
    logger.debug("Reached out to ZooKeeper to determine which node is the elected leader for Role '{}' but found that there is no leader.", roleName);
    return false;
}
@Override
public void takeLeadership(final CuratorFramework client) throws Exception {
leader = true;
@ -396,7 +427,9 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
// Curator API states that we lose the leadership election when we return from this method,
// so we will block as long as we are not interrupted or closed. Then, we will set leader to false.
try {
while (!isStopped()) {
int failureCount = 0;
int loopCount = 0;
while (!isStopped() && leader) {
try {
Thread.sleep(100L);
} catch (final InterruptedException ie) {
@ -404,6 +437,30 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
Thread.currentThread().interrupt();
return;
}
if (leader && ++loopCount % 50 == 0) {
// While Curator is supposed to interrupt this thread when we are no longer the leader, we have occasionally
// seen occurrences where the thread does not get interrupted. As a result, we will reach out to ZooKeeper
// periodically to determine whether or not this node is still the elected leader.
try {
final boolean stillLeader = verifyLeader();
failureCount = 0; // we got a response, so we were successful in communicating with zookeeper. Set failureCount back to 0.
if (!stillLeader) {
logger.info("According to ZooKeeper, this node is no longer the leader for Role '{}'. Will relinquish leadership.", roleName);
break;
}
} catch (final Exception e) {
failureCount++;
if (failureCount > 1) {
logger.warn("Attempted to reach out to ZooKeeper to verify that this node still is the elected leader for Role '{}' "
+ "but failed to communicate with ZooKeeper. This is the second failed attempt, so will relinquish leadership of this role.", roleName, e);
} else {
logger.warn("Attempted to reach out to ZooKeeper to verify that this node still is the elected leader for Role '{}' "
+ "but failed to communicate with ZooKeeper. Will wait a bit and attempt to communicate with ZooKeeper again before relinquishing this role.", roleName, e);
}
}
}
}
} finally {
leader = false;