mirror of https://github.com/apache/nifi.git
NIFI-2605: - Fixing a regression bug where nodes would potentially be elected leader for Cluster Coordinator role when they do not have the correct flow
- Ensure that we log which node is the cluster coordinator on startup instead of just indicating that there is one. If we later determine that there is none, ensure that we register for the role. This closes #900. Signed-off-by: jpercivall <joepercivall@yahoo.com>
This commit is contained in:
parent
a2ed0f0610
commit
c2ae7a6d7c
|
@ -23,7 +23,7 @@ import org.apache.curator.RetryPolicy;
|
|||
import org.apache.curator.framework.CuratorFramework;
|
||||
import org.apache.curator.framework.CuratorFrameworkFactory;
|
||||
import org.apache.curator.retry.RetryNTimes;
|
||||
import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException;
|
||||
import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
|
||||
import org.apache.nifi.cluster.protocol.AbstractNodeProtocolSender;
|
||||
import org.apache.nifi.cluster.protocol.ProtocolContext;
|
||||
import org.apache.nifi.cluster.protocol.ProtocolException;
|
||||
|
|
|
@ -21,7 +21,7 @@ import java.io.IOException;
|
|||
import java.net.InetSocketAddress;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException;
|
||||
import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
|
||||
import org.apache.nifi.cluster.protocol.AbstractNodeProtocolSender;
|
||||
import org.apache.nifi.cluster.protocol.ProtocolContext;
|
||||
import org.apache.nifi.cluster.protocol.ProtocolException;
|
||||
|
|
|
@ -39,10 +39,10 @@ import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger;
|
|||
import org.apache.nifi.cluster.coordination.http.replication.RequestCompletionCallback;
|
||||
import org.apache.nifi.cluster.event.Event;
|
||||
import org.apache.nifi.cluster.event.NodeEvent;
|
||||
import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
|
||||
import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
|
||||
import org.apache.nifi.cluster.manager.NodeResponse;
|
||||
import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
|
||||
import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException;
|
||||
import org.apache.nifi.cluster.protocol.ComponentRevision;
|
||||
import org.apache.nifi.cluster.protocol.ConnectionRequest;
|
||||
import org.apache.nifi.cluster.protocol.ConnectionResponse;
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.nifi.cluster.manager.exception;
|
||||
|
||||
import org.apache.nifi.cluster.exception.ClusterException;
|
||||
import org.apache.nifi.cluster.protocol.NodeIdentifier;
|
||||
|
||||
/**
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
*/
|
||||
package org.apache.nifi.cluster.manager.exception;
|
||||
|
||||
import org.apache.nifi.cluster.exception.ClusterException;
|
||||
|
||||
/**
|
||||
* Signals that an operation to be performed on a cluster has been invoked at an illegal or inappropriate time.
|
||||
*
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
*/
|
||||
package org.apache.nifi.cluster.manager.exception;
|
||||
|
||||
import org.apache.nifi.cluster.exception.ClusterException;
|
||||
|
||||
/**
|
||||
* Represents the exceptional case when the cluster is unable to service a request because no nodes are connected.
|
||||
*
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
*/
|
||||
package org.apache.nifi.cluster.manager.exception;
|
||||
|
||||
import org.apache.nifi.cluster.exception.ClusterException;
|
||||
|
||||
/**
|
||||
* Represents the exceptional case when the cluster is unable to service a request because no nodes returned a response. When the given request is not mutable the nodes are left in their previous
|
||||
* state.
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
*/
|
||||
package org.apache.nifi.cluster.manager.exception;
|
||||
|
||||
import org.apache.nifi.cluster.exception.ClusterException;
|
||||
|
||||
/**
|
||||
* Represents the exceptional case when a disconnection request to a node failed.
|
||||
*
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
*/
|
||||
package org.apache.nifi.cluster.manager.exception;
|
||||
|
||||
import org.apache.nifi.cluster.exception.ClusterException;
|
||||
|
||||
/**
|
||||
* Represents the exceptional case when a reconnection request to a node failed.
|
||||
*
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
*/
|
||||
package org.apache.nifi.cluster.manager.exception;
|
||||
|
||||
import org.apache.nifi.cluster.exception.ClusterException;
|
||||
|
||||
/**
|
||||
* Represents the exceptional case when a request is made for a node that does not exist.
|
||||
*
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.apache.nifi.cluster.integration;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
|
@ -40,7 +41,7 @@ public class Cluster {
|
|||
private final Set<Node> nodes = new HashSet<>();
|
||||
private final TestingServer zookeeperServer;
|
||||
|
||||
public Cluster() {
|
||||
public Cluster() throws IOException {
|
||||
try {
|
||||
zookeeperServer = new TestingServer();
|
||||
} catch (final Exception e) {
|
||||
|
@ -114,7 +115,8 @@ public class Cluster {
|
|||
addProps.put(NiFiProperties.ZOOKEEPER_CONNECT_STRING, getZooKeeperConnectString());
|
||||
addProps.put(NiFiProperties.CLUSTER_IS_NODE, "true");
|
||||
|
||||
final Node node = new Node(NiFiProperties.createBasicNiFiProperties("src/test/resources/conf/nifi.properties", addProps));
|
||||
final NiFiProperties nifiProperties = NiFiProperties.createBasicNiFiProperties("src/test/resources/conf/nifi.properties", addProps);
|
||||
final Node node = new Node(nifiProperties);
|
||||
node.start();
|
||||
nodes.add(node);
|
||||
|
||||
|
|
|
@ -46,7 +46,7 @@ public class ClusterConnectionIT {
|
|||
}
|
||||
|
||||
@Before
|
||||
public void createCluster() {
|
||||
public void createCluster() throws IOException {
|
||||
cluster = new Cluster();
|
||||
cluster.start();
|
||||
}
|
||||
|
@ -140,9 +140,8 @@ public class ClusterConnectionIT {
|
|||
cluster.waitForPrimaryNode(10, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testRestartAllNodes() throws IOException {
|
||||
public void testRestartAllNodes() throws IOException, InterruptedException {
|
||||
final Node firstNode = cluster.createNode();
|
||||
final Node secondNode = cluster.createNode();
|
||||
final Node thirdNode = cluster.createNode();
|
||||
|
@ -164,7 +163,13 @@ public class ClusterConnectionIT {
|
|||
firstNode.start();
|
||||
secondNode.start();
|
||||
|
||||
cluster.waitUntilAllNodesConnected(10, TimeUnit.SECONDS);
|
||||
|
||||
firstNode.waitUntilConnected(20, TimeUnit.SECONDS);
|
||||
System.out.println("\n\n\n**** Node 1 Re-Connected ****\n\n\n");
|
||||
secondNode.waitUntilConnected(10, TimeUnit.SECONDS);
|
||||
System.out.println("**** Node 2 Re-Connected ****");
|
||||
thirdNode.waitUntilConnected(10, TimeUnit.SECONDS);
|
||||
System.out.println("**** Node 3 Re-Connected ****");
|
||||
|
||||
// wait for all 3 nodes to agree that node 2 is connected
|
||||
Stream.of(firstNode, secondNode, thirdNode).forEach(node -> {
|
||||
|
@ -205,7 +210,7 @@ public class ClusterConnectionIT {
|
|||
otherNode.assertNodeConnects(nodeToSuspend.getIdentifier(), 10, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(timeout = 60000)
|
||||
public void testNodeInheritsClusterTopologyOnHeartbeat() throws InterruptedException {
|
||||
final Node node1 = cluster.createNode();
|
||||
final Node node2 = cluster.createNode();
|
||||
|
|
|
@ -87,8 +87,9 @@ public class Node {
|
|||
|
||||
private ScheduledExecutorService executor = new FlowEngine(8, "Node tasks", true);
|
||||
|
||||
|
||||
public Node(final NiFiProperties properties) {
|
||||
this(new NodeIdentifier(UUID.randomUUID().toString(), "localhost", createPort(), "localhost", createPort(), "localhost", null, null, false, null), properties);
|
||||
this(createNodeId(), properties);
|
||||
}
|
||||
|
||||
public Node(final NodeIdentifier nodeId, final NiFiProperties properties) {
|
||||
|
@ -121,6 +122,10 @@ public class Node {
|
|||
}
|
||||
|
||||
|
||||
private static NodeIdentifier createNodeId() {
|
||||
return new NodeIdentifier(UUID.randomUUID().toString(), "localhost", createPort(), "localhost", createPort(), "localhost", null, null, false, null);
|
||||
}
|
||||
|
||||
public synchronized void start() {
|
||||
running = true;
|
||||
|
||||
|
@ -148,6 +153,7 @@ public class Node {
|
|||
StringEncryptor.createEncryptor(nodeProperties), revisionManager, Mockito.mock(Authorizer.class));
|
||||
|
||||
flowService.start();
|
||||
|
||||
flowService.load(null);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.cluster.manager.exception;
|
||||
package org.apache.nifi.cluster.exception;
|
||||
|
||||
/**
|
||||
* The base exception class for cluster related exceptions.
|
|
@ -15,7 +15,9 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.cluster.manager.exception;
|
||||
package org.apache.nifi.cluster.exception;
|
||||
|
||||
import org.apache.nifi.cluster.exception.ClusterException;
|
||||
|
||||
public class NoClusterCoordinatorException extends ClusterException {
|
||||
private static final long serialVersionUID = -1782098541351698293L;
|
|
@ -591,16 +591,26 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
|||
// kicking everyone out. This way, we instead inherit the cluster flow before we attempt to be
|
||||
// the coordinator.
|
||||
LOG.info("Checking if there is already a Cluster Coordinator Elected...");
|
||||
final NodeIdentifier electedCoordinatorNodeId = clusterCoordinator.getElectedActiveCoordinatorNode();
|
||||
if (electedCoordinatorNodeId == null) {
|
||||
final String clusterCoordinatorAddress = leaderElectionManager.getLeader(ClusterRoles.CLUSTER_COORDINATOR);
|
||||
if (StringUtils.isEmpty(clusterCoordinatorAddress)) {
|
||||
LOG.info("It appears that no Cluster Coordinator has been Elected yet. Registering for Cluster Coordinator Role.");
|
||||
registerForClusterCoordinator();
|
||||
registerForClusterCoordinator(true);
|
||||
} else {
|
||||
LOG.info("The Elected Cluster Coordinator is {}. Will not register to be elected for this role until after connecting "
|
||||
+ "to the cluster and inheriting the cluster's flow.", electedCoordinatorNodeId);
|
||||
// At this point, we have determined that there is a Cluster Coordinator elected. It is important to note, though,
|
||||
// that if we are running an embedded ZooKeeper, and we have just restarted the cluster (at least the nodes that run the
|
||||
// embedded ZooKeeper), that we could possibly determine that the Cluster Coordinator is at an address that is not really
|
||||
// valid. This is because the latest stable ZooKeeper does not support "Container ZNodes" and as a result the ZNodes that
|
||||
// are created are persistent, not ephemeral. Upon restart, we can get this persisted value, even though the node that belongs
|
||||
// to that address has not started. ZooKeeper/Curator will recognize this after a while and delete the ZNode. As a result,
|
||||
// we may later determine that there is in fact no Cluster Coordinator. If this happens, we will automatically register for
|
||||
// Cluster Coordinator through the StandardFlowService.
|
||||
LOG.info("The Election for Cluster Coordinator has already begun (Leader is {}). Will not register to be elected for this role until after connecting "
|
||||
+ "to the cluster and inheriting the cluster's flow.", clusterCoordinatorAddress);
|
||||
registerForClusterCoordinator(false);
|
||||
}
|
||||
|
||||
leaderElectionManager.start();
|
||||
heartbeatMonitor.start();
|
||||
} else {
|
||||
heartbeater = null;
|
||||
}
|
||||
|
@ -3321,8 +3331,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
|||
return configuredForClustering;
|
||||
}
|
||||
|
||||
private void registerForClusterCoordinator() {
|
||||
final String participantId = heartbeatMonitor.getHeartbeatAddress();
|
||||
void registerForClusterCoordinator(final boolean participate) {
|
||||
final String participantId = participate ? heartbeatMonitor.getHeartbeatAddress() : null;
|
||||
|
||||
leaderElectionManager.register(ClusterRoles.CLUSTER_COORDINATOR, new LeaderElectionStateChangeListener() {
|
||||
@Override
|
||||
|
@ -3342,12 +3352,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
|||
@Override
|
||||
public synchronized void onLeaderElection() {
|
||||
LOG.info("This node elected Active Cluster Coordinator");
|
||||
heartbeatMonitor.start(); // ensure heartbeat monitor is started
|
||||
}
|
||||
}, participantId);
|
||||
}
|
||||
|
||||
private void registerForPrimaryNode() {
|
||||
void registerForPrimaryNode() {
|
||||
final String participantId = heartbeatMonitor.getHeartbeatAddress();
|
||||
|
||||
leaderElectionManager.register(ClusterRoles.PRIMARY_NODE, new LeaderElectionStateChangeListener() {
|
||||
|
@ -3401,7 +3410,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
|||
|
||||
// Participate in Leader Election for Heartbeat Monitor. Start the heartbeat monitor
|
||||
// if/when we become leader and stop it when we lose leader role
|
||||
registerForClusterCoordinator();
|
||||
registerForClusterCoordinator(true);
|
||||
|
||||
leaderElectionManager.start();
|
||||
stateManagerProvider.enableClusterProvider();
|
||||
|
|
|
@ -54,6 +54,7 @@ import org.apache.nifi.cluster.coordination.ClusterCoordinator;
|
|||
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
|
||||
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
|
||||
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
|
||||
import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
|
||||
import org.apache.nifi.cluster.protocol.ConnectionRequest;
|
||||
import org.apache.nifi.cluster.protocol.ConnectionResponse;
|
||||
import org.apache.nifi.cluster.protocol.DataFlow;
|
||||
|
@ -787,6 +788,17 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
|
|||
// we received a successful connection response from manager
|
||||
break;
|
||||
}
|
||||
} catch (final NoClusterCoordinatorException ncce) {
|
||||
logger.warn("There is currently no Cluster Coordinator. This often happens upon restart of NiFi when running an embedded ZooKeeper. Will register this node "
|
||||
+ "to become the active Cluster Coordinator and will attempt to connect to cluster again");
|
||||
controller.registerForClusterCoordinator(true);
|
||||
|
||||
try {
|
||||
Thread.sleep(1000L);
|
||||
} catch (final InterruptedException ie) {
|
||||
Thread.currentThread().interrupt();
|
||||
break;
|
||||
}
|
||||
} catch (final Exception pe) {
|
||||
// could not create a socket and communicate with manager
|
||||
logger.warn("Failed to connect to cluster due to: " + pe);
|
||||
|
@ -798,6 +810,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
|
|||
try {
|
||||
Thread.sleep(response == null ? 5000 : response.getTryLaterSeconds());
|
||||
} catch (final InterruptedException ie) {
|
||||
Thread.currentThread().interrupt();
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.curator.retry.RetryNTimes;
|
|||
import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
|
||||
import org.apache.nifi.engine.FlowEngine;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.common.PathUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -62,16 +63,7 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
|
|||
|
||||
stopped = false;
|
||||
|
||||
final RetryPolicy retryPolicy = new RetryNTimes(1, 100);
|
||||
curatorClient = CuratorFrameworkFactory.builder()
|
||||
.connectString(zkConfig.getConnectString())
|
||||
.sessionTimeoutMs(zkConfig.getSessionTimeoutMillis())
|
||||
.connectionTimeoutMs(zkConfig.getConnectionTimeoutMillis())
|
||||
.retryPolicy(retryPolicy)
|
||||
.defaultData(new byte[0])
|
||||
.build();
|
||||
|
||||
curatorClient.start();
|
||||
curatorClient = createClient();
|
||||
|
||||
// Call #register for each already-registered role. This will
|
||||
// cause us to start listening for leader elections for that
|
||||
|
@ -84,52 +76,60 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
|
|||
logger.info("{} started", this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void register(final String roleName) {
|
||||
register(roleName, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void register(String roleName, LeaderElectionStateChangeListener listener) {
|
||||
register(roleName, listener, null);
|
||||
}
|
||||
|
||||
private String getElectionPath(final String roleName) {
|
||||
final String rootPath = zkConfig.getRootPath();
|
||||
final String leaderPath = rootPath + (rootPath.endsWith("/") ? "" : "/") + "leaders/" + roleName;
|
||||
return leaderPath;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void register(final String roleName, final LeaderElectionStateChangeListener listener, final String participantId) {
|
||||
logger.debug("{} Registering new Leader Selector for role {}", this, roleName);
|
||||
|
||||
if (leaderRoles.containsKey(roleName)) {
|
||||
// If we already have a Leader Role registered and either the Leader Role is participating in election,
|
||||
// or the given participant id == null (don't want to participate in election) then we're done.
|
||||
final LeaderRole currentRole = leaderRoles.get(roleName);
|
||||
if (currentRole != null && (currentRole.isParticipant() || participantId == null)) {
|
||||
logger.info("{} Attempted to register Leader Election for role '{}' but this role is already registered", this, roleName);
|
||||
return;
|
||||
}
|
||||
|
||||
final String rootPath = zkConfig.getRootPath();
|
||||
final String leaderPath = rootPath + (rootPath.endsWith("/") ? "" : "/") + "leaders/" + roleName;
|
||||
final String leaderPath = getElectionPath(roleName);
|
||||
|
||||
try {
|
||||
PathUtils.validatePath(rootPath);
|
||||
PathUtils.validatePath(leaderPath);
|
||||
} catch (final IllegalArgumentException e) {
|
||||
throw new IllegalStateException("Cannot register leader election for role '" + roleName + "' because this is not a valid role name");
|
||||
}
|
||||
|
||||
registeredRoles.put(roleName, new RegisteredRole(participantId, listener));
|
||||
|
||||
final boolean isParticipant = participantId != null && !participantId.trim().isEmpty();
|
||||
|
||||
if (!isStopped()) {
|
||||
final ElectionListener electionListener = new ElectionListener(roleName, listener);
|
||||
final LeaderSelector leaderSelector = new LeaderSelector(curatorClient, leaderPath, leaderElectionMonitorEngine, electionListener);
|
||||
if (isParticipant) {
|
||||
leaderSelector.autoRequeue();
|
||||
if (participantId != null) {
|
||||
leaderSelector.setId(participantId);
|
||||
leaderSelector.start();
|
||||
}
|
||||
|
||||
leaderSelector.start();
|
||||
|
||||
final LeaderRole leaderRole = new LeaderRole(leaderSelector, electionListener);
|
||||
final LeaderRole leaderRole = new LeaderRole(leaderSelector, electionListener, isParticipant);
|
||||
|
||||
leaderRoles.put(roleName, leaderRole);
|
||||
}
|
||||
|
||||
logger.info("{} Registered new Leader Selector for role {}", this, roleName);
|
||||
if (isParticipant) {
|
||||
logger.info("{} Registered new Leader Selector for role {}; this node is an active participant in the election.", this, roleName);
|
||||
} else {
|
||||
logger.info("{} Registered new Leader Selector for role {}; this node is a silent observer in the election.", this, roleName);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -151,9 +151,15 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
|
|||
public synchronized void stop() {
|
||||
stopped = true;
|
||||
|
||||
for (final LeaderRole role : leaderRoles.values()) {
|
||||
for (final Map.Entry<String, LeaderRole> entry : leaderRoles.entrySet()) {
|
||||
final LeaderRole role = entry.getValue();
|
||||
final LeaderSelector selector = role.getLeaderSelector();
|
||||
|
||||
try {
|
||||
selector.close();
|
||||
} catch (final Exception e) {
|
||||
logger.warn("Failed to close Leader Selector for {}", entry.getKey(), e);
|
||||
}
|
||||
}
|
||||
|
||||
leaderRoles.clear();
|
||||
|
@ -192,9 +198,13 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
|
|||
|
||||
@Override
|
||||
public String getLeader(final String roleName) {
|
||||
if (isStopped()) {
|
||||
return determineLeaderExternal(roleName);
|
||||
}
|
||||
|
||||
final LeaderRole role = getLeaderRole(roleName);
|
||||
if (role == null) {
|
||||
return null;
|
||||
return determineLeaderExternal(roleName);
|
||||
}
|
||||
|
||||
Participant participant;
|
||||
|
@ -217,14 +227,92 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
|
|||
return participantId;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Determines whether or not leader election has already begun for the role with the given name
|
||||
*
|
||||
* @param roleName the role of interest
|
||||
* @return <code>true</code> if leader election has already begun, <code>false</code> if it has not or if unable to determine this.
|
||||
*/
|
||||
@Override
|
||||
public boolean isLeaderElected(final String roleName) {
|
||||
final String leaderAddress = determineLeaderExternal(roleName);
|
||||
return !StringUtils.isEmpty(leaderAddress);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Use a new Curator client to determine which node is the elected leader for the given role.
|
||||
*
|
||||
* @param roleName the name of the role
|
||||
* @return the id of the elected leader, or <code>null</code> if no leader has been selected or if unable to determine
|
||||
* the leader from ZooKeeper
|
||||
*/
|
||||
private String determineLeaderExternal(final String roleName) {
|
||||
final CuratorFramework client = createClient();
|
||||
try {
|
||||
final LeaderSelectorListener electionListener = new LeaderSelectorListener() {
|
||||
@Override
|
||||
public void stateChanged(CuratorFramework client, ConnectionState newState) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void takeLeadership(CuratorFramework client) throws Exception {
|
||||
}
|
||||
};
|
||||
|
||||
final String electionPath = getElectionPath(roleName);
|
||||
|
||||
// Note that we intentionally do not auto-requeue here, and we do not start the selector. We do not
|
||||
// want to join the leader election. We simply want to observe.
|
||||
final LeaderSelector selector = new LeaderSelector(client, electionPath, electionListener);
|
||||
|
||||
try {
|
||||
final Participant leader = selector.getLeader();
|
||||
return leader == null ? null : leader.getId();
|
||||
} catch (final KeeperException.NoNodeException nne) {
|
||||
// If there is no ZNode, then there is no elected leader.
|
||||
return null;
|
||||
} catch (final Exception e) {
|
||||
logger.warn("Unable to determine the Elected Leader for role '{}' due to {}; assuming no leader has been elected", roleName, e.toString());
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.warn("", e);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
} finally {
|
||||
client.close();
|
||||
}
|
||||
}
|
||||
|
||||
private CuratorFramework createClient() {
|
||||
// Create a new client because we don't want to try indefinitely for this to occur.
|
||||
final RetryPolicy retryPolicy = new RetryNTimes(1, 100);
|
||||
|
||||
final CuratorFramework client = CuratorFrameworkFactory.builder()
|
||||
.connectString(zkConfig.getConnectString())
|
||||
.sessionTimeoutMs(zkConfig.getSessionTimeoutMillis())
|
||||
.connectionTimeoutMs(zkConfig.getConnectionTimeoutMillis())
|
||||
.retryPolicy(retryPolicy)
|
||||
.defaultData(new byte[0])
|
||||
.build();
|
||||
|
||||
client.start();
|
||||
return client;
|
||||
}
|
||||
|
||||
|
||||
private static class LeaderRole {
|
||||
|
||||
private final LeaderSelector leaderSelector;
|
||||
private final ElectionListener electionListener;
|
||||
private final boolean participant;
|
||||
|
||||
public LeaderRole(final LeaderSelector leaderSelector, final ElectionListener electionListener) {
|
||||
public LeaderRole(final LeaderSelector leaderSelector, final ElectionListener electionListener, final boolean participant) {
|
||||
this.leaderSelector = leaderSelector;
|
||||
this.electionListener = electionListener;
|
||||
this.participant = participant;
|
||||
}
|
||||
|
||||
public LeaderSelector getLeaderSelector() {
|
||||
|
@ -234,6 +322,10 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
|
|||
public boolean isLeader() {
|
||||
return electionListener.isLeader();
|
||||
}
|
||||
|
||||
public boolean isParticipant() {
|
||||
return participant;
|
||||
}
|
||||
}
|
||||
|
||||
private static class RegisteredRole {
|
||||
|
|
|
@ -24,14 +24,9 @@ public interface LeaderElectionManager {
|
|||
void start();
|
||||
|
||||
/**
|
||||
* Adds a new role for which a leader is required
|
||||
*
|
||||
* @param roleName the name of the role
|
||||
*/
|
||||
void register(String roleName);
|
||||
|
||||
/**
|
||||
* Adds a new role for which a leader is required, without providing a Participant ID
|
||||
* Adds a new role for which a leader is required, without participating in the leader election. I.e., this node
|
||||
* will not be elected leader but will passively observe changes to the leadership. This allows calls to {@link #isLeader(String)}
|
||||
* and {@link #getLeader(String)} to know which node is currently elected the leader.
|
||||
*
|
||||
* @param roleName the name of the role
|
||||
* @param listener a listener that will be called when the node gains or relinquishes
|
||||
|
@ -40,7 +35,8 @@ public interface LeaderElectionManager {
|
|||
void register(String roleName, LeaderElectionStateChangeListener listener);
|
||||
|
||||
/**
|
||||
* Adds a new role for which a leader is required, providing the given value for this node as the Participant ID
|
||||
* Adds a new role for which a leader is required, providing the given value for this node as the Participant ID. If the Participant ID
|
||||
* is <code>null</code>, this node will never be elected leader but will passively observe changes to the leadership.
|
||||
*
|
||||
* @param roleName the name of the role
|
||||
* @param listener a listener that will be called when the node gains or relinquishes
|
||||
|
@ -90,4 +86,12 @@ public interface LeaderElectionManager {
|
|||
* again, all previously registered roles will still be registered.
|
||||
*/
|
||||
void stop();
|
||||
|
||||
/**
|
||||
* Returns <code>true</code> if a leader has been elected for the given role, <code>false</code> otherwise.
|
||||
*
|
||||
* @param roleName the name of the role
|
||||
* @return <code>true</code> if a leader has been elected, <code>false</code> otherwise.
|
||||
*/
|
||||
boolean isLeaderElected(String roleName);
|
||||
}
|
||||
|
|
|
@ -28,10 +28,6 @@ public class StandaloneLeaderElectionManager implements LeaderElectionManager {
|
|||
public void start() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void register(final String roleName) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void register(final String roleName, final LeaderElectionStateChangeListener listener) {
|
||||
}
|
||||
|
@ -62,4 +58,9 @@ public class StandaloneLeaderElectionManager implements LeaderElectionManager {
|
|||
@Override
|
||||
public void stop() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isLeaderElected(String roleName) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -45,7 +45,7 @@ public class NarCloseable implements Closeable {
|
|||
frameworkClassLoader = NarClassLoaders.getInstance().getFrameworkClassLoader();
|
||||
} catch (final Exception e) {
|
||||
// This should never happen in a running instance, but it will occur in unit tests
|
||||
logger.error("Unable to access Framework ClassLoader due to " + e + ". Will continue without change ClassLoaders.");
|
||||
logger.error("Unable to access Framework ClassLoader due to " + e + ". Will continue without changing ClassLoaders.");
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.error("", e);
|
||||
}
|
||||
|
|
|
@ -96,6 +96,7 @@
|
|||
<logger name="org.apache.zookeeper.server.NIOServerCnxnFactory" level="ERROR" />
|
||||
<logger name="org.apache.zookeeper.server.quorum" level="ERROR" />
|
||||
<logger name="org.apache.zookeeper.ZooKeeper" level="ERROR" />
|
||||
<logger name="org.apache.zookeeper.server.PrepRequestProcessor" level="ERROR" />
|
||||
|
||||
<logger name="org.apache.curator.framework.recipes.leader.LeaderSelector" level="OFF" />
|
||||
<logger name="org.apache.curator.ConnectionState" level="OFF" />
|
||||
|
|
|
@ -23,9 +23,9 @@ import org.apache.commons.lang3.StringUtils;
|
|||
import org.apache.nifi.authorization.AccessDeniedException;
|
||||
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
|
||||
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
|
||||
import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
|
||||
import org.apache.nifi.cluster.manager.NodeResponse;
|
||||
import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
|
||||
import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException;
|
||||
import org.apache.nifi.cluster.protocol.NodeIdentifier;
|
||||
import org.apache.nifi.controller.repository.claim.ContentDirection;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
|
|
|
@ -40,9 +40,9 @@ import org.apache.nifi.authorization.user.NiFiUser;
|
|||
import org.apache.nifi.authorization.user.NiFiUserUtils;
|
||||
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
|
||||
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
|
||||
import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
|
||||
import org.apache.nifi.cluster.manager.NodeResponse;
|
||||
import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
|
||||
import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException;
|
||||
import org.apache.nifi.cluster.protocol.NodeIdentifier;
|
||||
import org.apache.nifi.controller.ControllerService;
|
||||
import org.apache.nifi.controller.reporting.ReportingTaskProvider;
|
||||
|
|
|
@ -30,8 +30,8 @@ import org.apache.nifi.authorization.user.NiFiUser;
|
|||
import org.apache.nifi.authorization.user.NiFiUserUtils;
|
||||
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
|
||||
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
|
||||
import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
|
||||
import org.apache.nifi.cluster.manager.NodeResponse;
|
||||
import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException;
|
||||
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
|
||||
import org.apache.nifi.cluster.protocol.NodeIdentifier;
|
||||
import org.apache.nifi.controller.Snippet;
|
||||
|
|
|
@ -19,7 +19,8 @@ package org.apache.nifi.web.api.config;
|
|||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.ext.ExceptionMapper;
|
||||
import javax.ws.rs.ext.Provider;
|
||||
import org.apache.nifi.cluster.manager.exception.ClusterException;
|
||||
|
||||
import org.apache.nifi.cluster.exception.ClusterException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@ package org.apache.nifi.web.api.config;
|
|||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.ext.ExceptionMapper;
|
||||
|
||||
import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException;
|
||||
import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
|
||||
import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
|
||||
import org.apache.nifi.util.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
|
|
Loading…
Reference in New Issue