NIFI-2617: Instead of retrying an infinite number of times to communicate with ZooKeeper, should try only for a short period in CuratorLeaderElectionManager. This is because web requests may be blocked waiting on a determination of which node is cluster coordinator

Signed-off-by: Yolanda M. Davis <ymdavis@apache.org>

This closes #906
This commit is contained in:
Mark Payne 2016-08-22 10:27:26 -04:00 committed by Yolanda M. Davis
parent 5fab783b50
commit 1fcec3747b
1 changed files with 10 additions and 6 deletions

View File

@ -28,7 +28,7 @@ import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter; import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.framework.recipes.leader.Participant; import org.apache.curator.framework.recipes.leader.Participant;
import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.RetryForever; import org.apache.curator.retry.RetryNTimes;
import org.apache.nifi.controller.cluster.ZooKeeperClientConfig; import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.NiFiProperties;
@ -63,7 +63,7 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
stopped = false; stopped = false;
final RetryPolicy retryPolicy = new RetryForever(5000); final RetryPolicy retryPolicy = new RetryNTimes(1, 100);
curatorClient = CuratorFrameworkFactory.builder() curatorClient = CuratorFrameworkFactory.builder()
.connectString(zkConfig.getConnectString()) .connectString(zkConfig.getConnectString())
.sessionTimeoutMs(zkConfig.getSessionTimeoutMillis()) .sessionTimeoutMs(zkConfig.getSessionTimeoutMillis())
@ -177,9 +177,13 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
return "CuratorLeaderElectionManager[stopped=" + isStopped() + "]"; return "CuratorLeaderElectionManager[stopped=" + isStopped() + "]";
} }
private synchronized LeaderRole getLeaderRole(final String roleName) {
return leaderRoles.get(roleName);
}
@Override @Override
public synchronized boolean isLeader(final String roleName) { public boolean isLeader(final String roleName) {
final LeaderRole role = leaderRoles.get(roleName); final LeaderRole role = getLeaderRole(roleName);
if (role == null) { if (role == null) {
return false; return false;
} }
@ -188,8 +192,8 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
} }
@Override @Override
public synchronized String getLeader(final String roleName) { public String getLeader(final String roleName) {
final LeaderRole role = leaderRoles.get(roleName); final LeaderRole role = getLeaderRole(roleName);
if (role == null) { if (role == null) {
return null; return null;
} }