NIFI-483: Use ZooKeeper's Leader Election to determine Primary Node. This closes #301

Signed-off-by: Matt Gilman <matt.c.gilman@gmail.com>
This commit is contained in:
Mark Payne 2016-03-23 13:16:13 -04:00 committed by Matt Gilman
parent 0d3bd2c401
commit 1ac05266a5
19 changed files with 498 additions and 412 deletions

View File

@ -456,6 +456,12 @@ language governing permissions and limitations under the License. -->
<nifi.cluster.manager.protocol.threads>10</nifi.cluster.manager.protocol.threads>
<nifi.cluster.manager.safemode.duration>0 sec</nifi.cluster.manager.safemode.duration>
<!-- nifi.properties: zookeeper properties -->
<nifi.zookeeper.connect.string></nifi.zookeeper.connect.string>
<nifi.zookeeper.connect.timeout>3 secs</nifi.zookeeper.connect.timeout>
<nifi.zookeeper.session.timeout>3 secs</nifi.zookeeper.session.timeout>
<nifi.zookeeper.root.node>/nifi</nifi.zookeeper.root.node>
<!-- nifi.properties: kerberos properties -->
<nifi.kerberos.krb5.file> </nifi.kerberos.krb5.file>
<nifi.kerberos.service.principal />

View File

@ -174,6 +174,12 @@ public class NiFiProperties extends Properties {
public static final String CLUSTER_NODE_UNICAST_MANAGER_ADDRESS = "nifi.cluster.node.unicast.manager.address";
public static final String CLUSTER_NODE_UNICAST_MANAGER_PROTOCOL_PORT = "nifi.cluster.node.unicast.manager.protocol.port";
// zookeeper properties
public static final String ZOOKEEPER_CONNECT_STRING = "nifi.zookeeper.connect.string";
public static final String ZOOKEEPER_CONNECT_TIMEOUT = "nifi.zookeeper.connect.timeout";
public static final String ZOOKEEPER_SESSION_TIMEOUT = "nifi.zookeeper.session.timeout";
public static final String ZOOKEEPER_ROOT_NODE = "nifi.zookeeper.root.node";
// cluster manager properties
public static final String CLUSTER_IS_MANAGER = "nifi.cluster.is.manager";
public static final String CLUSTER_MANAGER_ADDRESS = "nifi.cluster.manager.address";
@ -226,6 +232,9 @@ public class NiFiProperties extends Properties {
public static final String DEFAULT_PERSISTENT_STATE_DIRECTORY = "./conf/state";
public static final String DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY = "5 mins";
public static final String DEFAULT_BORED_YIELD_DURATION = "10 millis";
public static final String DEFAULT_ZOOKEEPER_CONNECT_TIMEOUT = "3 secs";
public static final String DEFAULT_ZOOKEEPER_SESSION_TIMEOUT = "3 secs";
public static final String DEFAULT_ZOOKEEPER_ROOT_NODE = "/nifi";
// cluster common defaults
public static final String DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL = "5 sec";

View File

@ -19,7 +19,6 @@ package org.apache.nifi.cluster.protocol;
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
import org.apache.nifi.reporting.BulletinRepository;
@ -56,14 +55,6 @@ public interface ClusterManagerProtocolSender {
*/
void disconnect(DisconnectMessage msg) throws ProtocolException;
/**
* Sends an "assign primary role" message to a node.
*
* @param msg a message
* @throws ProtocolException if communication failed
*/
void assignPrimaryRole(PrimaryRoleAssignmentMessage msg) throws ProtocolException;
/**
* Sets the {@link BulletinRepository} that can be used to report bulletins
*

View File

@ -31,7 +31,6 @@ import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
@ -56,7 +55,6 @@ public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolS
private final ProtocolContext<ProtocolMessage> protocolContext;
private final SocketConfiguration socketConfiguration;
private int handshakeTimeoutSeconds;
private volatile BulletinRepository bulletinRepository;
public ClusterManagerProtocolSenderImpl(final SocketConfiguration socketConfiguration, final ProtocolContext<ProtocolMessage> protocolContext) {
if (socketConfiguration == null) {
@ -71,7 +69,6 @@ public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolS
@Override
public void setBulletinRepository(final BulletinRepository bulletinRepository) {
this.bulletinRepository = bulletinRepository;
}
/**
@ -183,30 +180,6 @@ public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolS
}
}
/**
* Assigns the primary role to a node.
*
* @param msg a message
*
* @throws ProtocolException if the message failed to be sent
*/
@Override
public void assignPrimaryRole(final PrimaryRoleAssignmentMessage msg) throws ProtocolException {
Socket socket = null;
try {
socket = createSocket(msg.getNodeId(), true);
try {
// marshal message to output stream
final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
marshaller.marshal(msg, socket.getOutputStream());
} catch (final IOException ioe) {
throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe);
}
} finally {
SocketUtils.closeQuietly(socket);
}
}
private void setConnectionHandshakeTimeoutOnSocket(final Socket socket) throws SocketException {
// update socket timeout, if handshake timeout was set; otherwise use socket's current timeout

View File

@ -26,7 +26,6 @@ import org.apache.nifi.cluster.protocol.ProtocolListener;
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
import org.apache.nifi.reporting.BulletinRepository;
@ -108,10 +107,4 @@ public class ClusterManagerProtocolSenderListener implements ClusterManagerProto
public void disconnect(DisconnectMessage msg) throws ProtocolException {
sender.disconnect(msg);
}
@Override
public void assignPrimaryRole(PrimaryRoleAssignmentMessage msg) throws ProtocolException {
sender.assignPrimaryRole(msg);
}
}

View File

@ -27,7 +27,6 @@ import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage;
import org.apache.nifi.cluster.protocol.message.PingMessage;
import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
@ -92,8 +91,4 @@ public class ObjectFactory {
public ControllerStartupFailureMessage createControllerStartupFailureMessage() {
return new ControllerStartupFailureMessage();
}
public PrimaryRoleAssignmentMessage createPrimaryRoleAssignmentMessage() {
return new PrimaryRoleAssignmentMessage();
}
}

View File

@ -1,55 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.protocol.message;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter;
/**
*/
@XmlRootElement(name = "primaryRoleAssignmentMessage")
public class PrimaryRoleAssignmentMessage extends ProtocolMessage {
private NodeIdentifier nodeId;
private boolean primary;
@XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
public NodeIdentifier getNodeId() {
return nodeId;
}
public void setNodeId(NodeIdentifier nodeId) {
this.nodeId = nodeId;
}
public boolean isPrimary() {
return primary;
}
public void setPrimary(boolean primary) {
this.primary = primary;
}
@Override
public MessageType getType() {
return MessageType.PRIMARY_ROLE;
}
}

View File

@ -31,7 +31,6 @@ public abstract class ProtocolMessage {
FLOW_RESPONSE,
HEARTBEAT,
PING,
PRIMARY_ROLE,
RECONNECTION_REQUEST,
RECONNECTION_RESPONSE,
SERVICE_BROADCAST,

View File

@ -23,9 +23,7 @@ import org.apache.nifi.cluster.event.Event;
import org.apache.nifi.cluster.manager.exception.IllegalNodeDeletionException;
import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
import org.apache.nifi.cluster.manager.exception.IllegalNodeReconnectionException;
import org.apache.nifi.cluster.manager.exception.IneligiblePrimaryNodeException;
import org.apache.nifi.cluster.manager.exception.NodeDisconnectionException;
import org.apache.nifi.cluster.manager.exception.PrimaryRoleAssignmentException;
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.node.Node;
import org.apache.nifi.cluster.node.Node.Status;
@ -141,22 +139,6 @@ public interface ClusterManager extends NodeInformant {
*/
List<Event> getNodeEvents(final String nodeId);
/**
* Revokes the primary role from the current primary node and assigns the primary role to given given node ID.
*
* If role revocation fails, then the current primary node is set to disconnected while retaining the primary role and no role assignment is performed.
*
* If role assignment fails, then the given node is set to disconnected and is given the primary role.
*
* @param nodeId the node identifier
* @param userDn the Distinguished Name of the user requesting that the Primary Node be assigned
*
* @throws UnknownNodeException if the node with the given identifier does not exist
* @throws IneligiblePrimaryNodeException if the node with the given identifier is not eligible to be the primary node
* @throws PrimaryRoleAssignmentException if the cluster was unable to change the primary role to the requested node
*/
void setPrimaryNode(String nodeId, String userDn) throws UnknownNodeException, IneligiblePrimaryNodeException, PrimaryRoleAssignmentException;
/**
* @return the primary node of the cluster or null if no primary node exists
*/

View File

@ -93,12 +93,10 @@ import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
import org.apache.nifi.cluster.manager.exception.IllegalNodeDeletionException;
import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
import org.apache.nifi.cluster.manager.exception.IllegalNodeReconnectionException;
import org.apache.nifi.cluster.manager.exception.IneligiblePrimaryNodeException;
import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
import org.apache.nifi.cluster.manager.exception.NoResponseFromNodesException;
import org.apache.nifi.cluster.manager.exception.NodeDisconnectionException;
import org.apache.nifi.cluster.manager.exception.NodeReconnectionException;
import org.apache.nifi.cluster.manager.exception.PrimaryRoleAssignmentException;
import org.apache.nifi.cluster.manager.exception.SafeModeMutableRequestException;
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.manager.exception.UriConstructionException;
@ -118,7 +116,6 @@ import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
@ -551,9 +548,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
servicesBroadcaster.start();
}
// start in safe mode
executeSafeModeTask();
// Load and start running Reporting Tasks
final byte[] serializedReportingTasks = clusterDataFlow.getReportingTasks();
if (serializedReportingTasks != null && serializedReportingTasks.length > 0) {
@ -1312,88 +1306,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
}
/**
* Messages the node to have the primary role. If the messaging fails, then the node is marked as disconnected.
*
* @param nodeId the node ID to assign primary role
*
* @return true if primary role assigned; false otherwise
*/
private boolean assignPrimaryRole(final NodeIdentifier nodeId) {
writeLock.lock();
try {
// create primary role message
final PrimaryRoleAssignmentMessage msg = new PrimaryRoleAssignmentMessage();
msg.setNodeId(nodeId);
msg.setPrimary(true);
logger.info("Attempting to assign primary role to node: " + nodeId);
// message
senderListener.assignPrimaryRole(msg);
logger.info("Assigned primary role to node: " + nodeId);
addBulletin(nodeId, Severity.INFO, "Node assigned primary role");
// true indicates primary role assigned
return true;
} catch (final ProtocolException ex) {
logger.warn("Failed attempt to assign primary role to node " + nodeId + " due to " + ex);
addBulletin(nodeId, Severity.ERROR, "Failed to assign primary role to node due to: " + ex);
// mark node as disconnected and log/record the event
final Node node = getRawNode(nodeId.getId());
node.setStatus(Status.DISCONNECTED);
addEvent(node.getNodeId(), "Disconnected because of failed attempt to assign primary role.");
addBulletin(nodeId, Severity.WARNING, "Node disconnected because of failed attempt to assign primary role");
// false indicates primary role failed to be assigned
return false;
} finally {
writeLock.unlock("assignPrimaryRole");
}
}
/**
* Messages the node with the given node ID to no longer have the primary role. If the messaging fails, then the node is marked as disconnected.
*
* @return true if the primary role was revoked from the node; false otherwise
*/
private boolean revokePrimaryRole(final NodeIdentifier nodeId) {
writeLock.lock();
try {
// create primary role message
final PrimaryRoleAssignmentMessage msg = new PrimaryRoleAssignmentMessage();
msg.setNodeId(nodeId);
msg.setPrimary(false);
logger.info("Attempting to revoke primary role from node: " + nodeId);
// send message
senderListener.assignPrimaryRole(msg);
logger.info("Revoked primary role from node: " + nodeId);
addBulletin(nodeId, Severity.INFO, "Primary Role revoked from node");
// true indicates primary role was revoked
return true;
} catch (final ProtocolException ex) {
logger.warn("Failed attempt to revoke primary role from node " + nodeId + " due to " + ex);
// mark node as disconnected and log/record the event
final Node node = getRawNode(nodeId.getId());
node.setStatus(Status.DISCONNECTED);
addEvent(node.getNodeId(), "Disconnected because of failed attempt to revoke primary role.");
addBulletin(node, Severity.ERROR, "Node disconnected because of failed attempt to revoke primary role");
// false indicates primary role failed to be revoked
return false;
} finally {
writeLock.unlock("revokePrimaryRole");
}
}
private NodeIdentifier addRequestorDn(final NodeIdentifier nodeId, final String dn) {
return new NodeIdentifier(nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort(),
@ -1778,12 +1690,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
// get a raw reference to the node (if it doesn't exist, node will be null)
node = getRawNode(resolvedNodeIdentifier.getId());
// if the node thinks it has the primary role, but the manager has assigned the role to a different node, then revoke the role
if (mostRecentHeartbeat.isPrimary() && !isPrimaryNode(resolvedNodeIdentifier)) {
addEvent(resolvedNodeIdentifier, "Heartbeat indicates node is running as primary node. Revoking primary role because primary role is assigned to a different node.");
revokePrimaryRole(resolvedNodeIdentifier);
}
final boolean heartbeatIndicatesNotYetConnected = !mostRecentHeartbeat.isConnected();
if (isBlockedByFirewall(resolvedNodeIdentifier.getSocketAddress())) {
@ -1871,6 +1777,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
// record heartbeat
node.setHeartbeat(mostRecentHeartbeat);
if (mostRecentHeartbeat.isPrimary()) {
setPrimaryNodeId(node.getNodeId());
}
}
} catch (final Exception e) {
logger.error("Failed to process heartbeat from {}:{} due to {}",
@ -1984,47 +1894,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
}
@Override
public void setPrimaryNode(final String nodeId, final String userDn) throws UnknownNodeException, IneligiblePrimaryNodeException, PrimaryRoleAssignmentException {
writeLock.lock();
try {
final Node node = getNode(nodeId);
if (node == null) {
throw new UnknownNodeException("Node does not exist.");
} else if (Status.CONNECTED != node.getStatus()) {
throw new IneligiblePrimaryNodeException("Node must be connected before it can be assigned as the primary node.");
}
// revoke primary role
final Node primaryNode;
if ((primaryNode = getPrimaryNode()) != null) {
if (primaryNode.getStatus() == Status.DISCONNECTED) {
throw new PrimaryRoleAssignmentException("A disconnected, primary node exists. Delete the node before assigning the primary role to a different node.");
} else if (revokePrimaryRole(primaryNode.getNodeId())) {
addEvent(primaryNode.getNodeId(), "Role revoked from this node as part of primary role reassignment.");
} else {
throw new PrimaryRoleAssignmentException(
"Failed to revoke primary role from node. Primary node is now disconnected. Delete the node before assigning the primary role to a different node.");
}
}
// change the primary node ID to the given node
setPrimaryNodeId(node.getNodeId());
// assign primary role
if (assignPrimaryRole(node.getNodeId())) {
addEvent(node.getNodeId(), "Role assigned to this node as part of primary role reassignment. Action performed by " + userDn);
addBulletin(node, Severity.INFO, "Primary Role assigned to node by " + userDn);
} else {
throw new PrimaryRoleAssignmentException(
"Cluster manager assigned primary role to node, but the node failed to accept the assignment. Cluster manager disconnected node.");
}
} finally {
writeLock.unlock("setPrimaryNode");
}
}
private int getClusterProtocolHeartbeatSeconds() {
return (int) FormatUtils.getTimeDuration(properties.getClusterProtocolHeartbeatInterval(), TimeUnit.SECONDS);
}
@ -4508,66 +4377,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
}
private void executeSafeModeTask() {
new Thread(new Runnable() {
private final long threadStartTime = System.currentTimeMillis();
@Override
public void run() {
logger.info("Entering safe mode...");
final int safeModeSeconds = (int) FormatUtils.getTimeDuration(properties.getClusterManagerSafeModeDuration(), TimeUnit.SECONDS);
final long timeToElect = safeModeSeconds <= 0 ? Long.MAX_VALUE : threadStartTime + TimeUnit.MILLISECONDS.convert(safeModeSeconds, TimeUnit.SECONDS);
boolean exitSafeMode = false;
while (isRunning()) {
writeLock.lock();
try {
final long currentTime = System.currentTimeMillis();
if (timeToElect < currentTime) {
final Set<NodeIdentifier> connectedNodeIds = getNodeIds(Status.CONNECTED);
if (!connectedNodeIds.isEmpty()) {
// get first connected node ID
final NodeIdentifier connectedNodeId = connectedNodeIds.iterator().next();
if (assignPrimaryRole(connectedNodeId)) {
try {
setPrimaryNodeId(connectedNodeId);
exitSafeMode = true;
} catch (final DaoException de) {
final String message = String.format("Failed to persist primary node ID '%s' in cluster dataflow.", connectedNodeId);
logger.warn(message);
addBulletin(connectedNodeId, Severity.WARNING, message);
revokePrimaryRole(connectedNodeId);
}
}
}
}
if (!isInSafeMode()) {
// a primary node has been selected outside of this thread
exitSafeMode = true;
logger.info("Exiting safe mode because " + primaryNodeId + " has been assigned the primary role.");
break;
}
} finally {
writeLock.unlock("executeSafeModeTask");
}
if (!exitSafeMode) {
// sleep for a bit
try {
Thread.sleep(1000);
} catch (final InterruptedException ie) {
return;
}
}
}
}
}).start();
}
/**
* This timer task simply processes any pending heartbeats. This timer task is not strictly needed, as HeartbeatMonitoringTimerTask will do this. However, this task is scheduled much more

View File

@ -129,6 +129,15 @@
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>

View File

@ -83,6 +83,9 @@ import org.apache.nifi.controller.exception.ComponentLifeCycleException;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.label.StandardLabel;
import org.apache.nifi.controller.leader.election.CuratorLeaderElectionManager;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
import org.apache.nifi.controller.leader.election.LeaderElectionStateChangeListener;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
@ -231,6 +234,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
public static final String ROOT_GROUP_ID_ALIAS = "root";
public static final String DEFAULT_ROOT_GROUP_NAME = "NiFi Flow";
public static final String PRIMARY_NODE_ROLE_NAME = "primary-node";
private final AtomicInteger maxTimerDrivenThreads;
private final AtomicInteger maxEventDrivenThreads;
@ -277,6 +281,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private ProcessGroup rootGroup;
private final List<Connectable> startConnectablesAfterInitialization;
private final List<RemoteGroupPort> startRemoteGroupPortsAfterInitialization;
private final LeaderElectionManager leaderElectionManager;
/**
* true if controller is configured to operate in a clustered environment
@ -327,12 +333,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private boolean clustered;
private String clusterManagerDN;
// guarded by rwLock
/**
* true if controller is the primary of the cluster
*/
private boolean primary;
// guarded by rwLock
/**
* true if connected to a cluster
@ -527,6 +527,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS);
heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false, false));
if (configuredForClustering) {
leaderElectionManager = new CuratorLeaderElectionManager(4);
} else {
leaderElectionManager = null;
}
}
private static FlowFileRepository createFlowFileRepository(final NiFiProperties properties, final ResourceClaimManager contentClaimManager) {
@ -1159,6 +1164,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
throw new IllegalStateException("Controller already stopped or still stopping...");
}
if (leaderElectionManager != null) {
leaderElectionManager.stop();
}
if (kill) {
this.timerDrivenEngineRef.get().shutdownNow();
this.eventDrivenEngineRef.get().shutdownNow();
@ -1365,7 +1374,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
// update the heartbeat bean
this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, primary, connected));
this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connected));
} finally {
writeLock.unlock();
}
@ -3119,6 +3128,19 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
// update the bulletin repository
if (isChanging) {
if (clustered) {
leaderElectionManager.register(PRIMARY_NODE_ROLE_NAME, new LeaderElectionStateChangeListener() {
@Override
public void onLeaderElection() {
setPrimary(true);
}
@Override
public void onLeaderRelinquish() {
setPrimary(false);
}
});
leaderElectionManager.start();
stateManagerProvider.enableClusterProvider();
if (zooKeeperStateServer != null) {
@ -3157,6 +3179,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1L));
}
} else {
leaderElectionManager.unregister(PRIMARY_NODE_ROLE_NAME);
if (zooKeeperStateServer != null) {
zooKeeperStateServer.shutdown();
}
@ -3170,7 +3194,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
// update the heartbeat bean
this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, primary, connected));
this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connected));
} finally {
writeLock.unlock();
}
@ -3180,51 +3204,38 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* @return true if this instance is the primary node in the cluster; false otherwise
*/
public boolean isPrimary() {
rwLock.readLock().lock();
try {
return primary;
} finally {
rwLock.readLock().unlock();
}
return leaderElectionManager != null && leaderElectionManager.isLeader(PRIMARY_NODE_ROLE_NAME);
}
public void setPrimary(final boolean primary) {
rwLock.writeLock().lock();
try {
// no update, so return
if (this.primary == primary) {
return;
final PrimaryNodeState nodeState = primary ? PrimaryNodeState.ELECTED_PRIMARY_NODE : PrimaryNodeState.PRIMARY_NODE_REVOKED;
final ProcessGroup rootGroup = getGroup(getRootGroupId());
for (final ProcessorNode procNode : rootGroup.findAllProcessors()) {
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, procNode.getProcessor(), nodeState);
}
LOG.info("Setting primary flag from '" + this.primary + "' to '" + primary + "'");
final PrimaryNodeState nodeState = primary ? PrimaryNodeState.ELECTED_PRIMARY_NODE : PrimaryNodeState.PRIMARY_NODE_REVOKED;
final ProcessGroup rootGroup = getGroup(getRootGroupId());
for (final ProcessorNode procNode : rootGroup.findAllProcessors()) {
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, procNode.getProcessor(), nodeState);
}
}
for (final ControllerServiceNode serviceNode : getAllControllerServices()) {
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, serviceNode.getControllerServiceImplementation(), nodeState);
}
}
for (final ReportingTaskNode reportingTaskNode : getAllReportingTasks()) {
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, reportingTaskNode.getReportingTask(), nodeState);
}
}
// update primary
this.primary = primary;
eventDrivenWorkerQueue.setPrimary(primary);
// update the heartbeat bean
this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, primary, connected));
} finally {
rwLock.writeLock().unlock();
}
for (final ControllerServiceNode serviceNode : getAllControllerServices()) {
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, serviceNode.getControllerServiceImplementation(), nodeState);
}
}
for (final ReportingTaskNode reportingTaskNode : getAllReportingTasks()) {
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, reportingTaskNode.getReportingTask(), nodeState);
}
}
// update primary
eventDrivenWorkerQueue.setPrimary(primary);
// update the heartbeat bean
this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, primary, connected));
// Emit a bulletin detailing the fact that the primary node state has changed
final String message = primary ? "This node has been elected Primary Node" : "This node is no longer Primary Node";
final Bulletin bulletin = BulletinFactory.createBulletin("Primary Node", Severity.INFO.name(), message);
bulletinRepository.addBulletin(bulletin);
}
static boolean areEqual(final String a, final String b) {
@ -3603,7 +3614,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
this.connected = connected;
// update the heartbeat bean
this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, primary, connected));
this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connected));
} finally {
rwLock.writeLock().unlock();
}

View File

@ -55,7 +55,6 @@ import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
@ -342,7 +341,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
case RECONNECTION_REQUEST:
case DISCONNECTION_REQUEST:
case FLOW_REQUEST:
case PRIMARY_ROLE:
return true;
default:
return false;
@ -380,14 +378,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
}
}, "Disconnect from Cluster").start();
return null;
case PRIMARY_ROLE:
new Thread(new Runnable() {
@Override
public void run() {
handlePrimaryRoleAssignment((PrimaryRoleAssignmentMessage) request);
}
}, "Set Primary Role Status").start();
return null;
default:
throw new ProtocolException("Handler cannot handle message type: " + request.getType());
@ -512,14 +502,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
}
}
private void handlePrimaryRoleAssignment(final PrimaryRoleAssignmentMessage msg) {
writeLock.lock();
try {
controller.setPrimary(msg.isPrimary());
} finally {
writeLock.unlock();
}
}
private void handleReconnectionRequest(final ReconnectionRequestMessage request) {
writeLock.lock();
@ -747,9 +729,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
controller.setConnected(true);
// set primary
controller.setPrimary(response.isPrimary());
// start the processors as indicated by the dataflow
controller.onFlowInitialized(dataFlow.isAutoStartProcessors());
@ -862,7 +841,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
}
private class SaveHolder {
private final Calendar saveTime;
private final boolean shouldArchive;
@ -871,22 +849,4 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
shouldArchive = archive;
}
}
public boolean isPrimary() {
readLock.lock();
try {
return controller.isPrimary();
} finally {
readLock.unlock();
}
}
public void setPrimary(boolean primary) {
writeLock.lock();
try {
controller.setPrimary(primary);
} finally {
writeLock.unlock();
}
}
}

View File

@ -0,0 +1,285 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.leader.election;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.RetryForever;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.zookeeper.common.PathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CuratorLeaderElectionManager implements LeaderElectionManager {
private static final Logger logger = LoggerFactory.getLogger(CuratorLeaderElectionManager.class);
private final FlowEngine leaderElectionMonitorEngine;
private final int sessionTimeoutMs;
private final int connectionTimeoutMs;
private final String rootPath;
private final String connectString;
private CuratorFramework curatorClient;
private volatile boolean stopped = true;
private final Map<String, LeaderRole> leaderRoles = new HashMap<>();
private final Map<String, LeaderElectionStateChangeListener> registeredRoles = new HashMap<>();
public CuratorLeaderElectionManager(final int threadPoolSize) {
leaderElectionMonitorEngine = new FlowEngine(threadPoolSize, "Leader Election Notification", true);
final NiFiProperties properties = NiFiProperties.getInstance();
connectString = properties.getProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING);
if (connectString == null || connectString.trim().isEmpty()) {
throw new IllegalStateException("The '" + NiFiProperties.ZOOKEEPER_CONNECT_STRING + "' property is not set in nifi.properties");
}
sessionTimeoutMs = getTimePeriod(properties, NiFiProperties.ZOOKEEPER_SESSION_TIMEOUT, NiFiProperties.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT);
connectionTimeoutMs = getTimePeriod(properties, NiFiProperties.ZOOKEEPER_CONNECT_TIMEOUT, NiFiProperties.DEFAULT_ZOOKEEPER_CONNECT_TIMEOUT);
rootPath = properties.getProperty(NiFiProperties.ZOOKEEPER_ROOT_NODE, NiFiProperties.DEFAULT_ZOOKEEPER_ROOT_NODE);
try {
PathUtils.validatePath(rootPath);
} catch (final IllegalArgumentException e) {
throw new IllegalStateException("The '" + NiFiProperties.ZOOKEEPER_ROOT_NODE + "' property in nifi.properties is set to an illegal value: " + rootPath);
}
}
@Override
public synchronized void start() {
if (!stopped) {
return;
}
stopped = false;
final RetryPolicy retryPolicy = new RetryForever(5000);
curatorClient = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
curatorClient.start();
// Call #register for each already-registered role. This will
// cause us to start listening for leader elections for that
// role again
for (final Map.Entry<String, LeaderElectionStateChangeListener> entry : registeredRoles.entrySet()) {
register(entry.getKey(), entry.getValue());
}
logger.info("{} started", this);
}
private int getTimePeriod(final NiFiProperties properties, final String propertyName, final String defaultValue) {
final String timeout = properties.getProperty(propertyName, defaultValue);
try {
return (int) FormatUtils.getTimeDuration(timeout, TimeUnit.MILLISECONDS);
} catch (final Exception e) {
logger.warn("Value of '" + propertyName + "' property is set to '" + timeout + "', which is not a valid time period. Using default of " + defaultValue);
return (int) FormatUtils.getTimeDuration(defaultValue, TimeUnit.MILLISECONDS);
}
}
@Override
public synchronized void register(final String roleName) {
register(roleName, null);
}
@Override
public synchronized void register(final String roleName, final LeaderElectionStateChangeListener listener) {
logger.debug("{} Registering new Leader Selector for role {}", this, roleName);
if (leaderRoles.containsKey(roleName)) {
logger.warn("{} Attempted to register Leader Election for role '{}' but this role is already registered", this, roleName);
return;
}
final String leaderPath = (rootPath.endsWith("/") ? "" : "/") + "leaders/" + roleName;
try {
PathUtils.validatePath(rootPath);
} catch (final IllegalArgumentException e) {
throw new IllegalStateException("Cannot register leader election for role '" + roleName + "' because this is not a valid role name");
}
registeredRoles.put(roleName, listener);
if (!isStopped()) {
final ElectionListener electionListener = new ElectionListener(roleName, listener);
final LeaderSelector leaderSelector = new LeaderSelector(curatorClient, leaderPath, leaderElectionMonitorEngine, electionListener);
leaderSelector.autoRequeue();
leaderSelector.start();
final LeaderRole leaderRole = new LeaderRole(leaderSelector, electionListener);
leaderRoles.put(roleName, leaderRole);
}
logger.info("{} Registered new Leader Selector for role {}", this, roleName);
}
@Override
public synchronized void unregister(final String roleName) {
registeredRoles.remove(roleName);
final LeaderRole leaderRole = leaderRoles.remove(roleName);
final LeaderSelector leaderSelector = leaderRole.getLeaderSelector();
if (leaderSelector == null) {
logger.warn("Cannot unregister Leader Election Role '{}' becuase that role is not registered", roleName);
return;
}
leaderSelector.close();
}
@Override
public synchronized void stop() {
stopped = true;
for (final LeaderRole role : leaderRoles.values()) {
final LeaderSelector selector = role.getLeaderSelector();
selector.close();
}
leaderRoles.clear();
if (curatorClient != null) {
curatorClient.close();
curatorClient = null;
}
logger.info("{} stopped and closed", this);
}
@Override
public boolean isStopped() {
return stopped;
}
@Override
public String toString() {
return "CuratorLeaderElectionManager[stopped=" + isStopped() + "]";
}
@Override
public synchronized boolean isLeader(final String roleName) {
final LeaderRole role = leaderRoles.get(roleName);
if (role == null) {
return false;
}
return role.isLeader();
}
private static class LeaderRole {
private final LeaderSelector leaderSelector;
private final ElectionListener electionListener;
public LeaderRole(final LeaderSelector leaderSelector, final ElectionListener electionListener) {
this.leaderSelector = leaderSelector;
this.electionListener = electionListener;
}
public LeaderSelector getLeaderSelector() {
return leaderSelector;
}
public boolean isLeader() {
return electionListener.isLeader();
}
}
private class ElectionListener extends LeaderSelectorListenerAdapter implements LeaderSelectorListener {
private final String roleName;
private final LeaderElectionStateChangeListener listener;
private volatile boolean leader;
public ElectionListener(final String roleName, final LeaderElectionStateChangeListener listener) {
this.roleName = roleName;
this.listener = listener;
}
public boolean isLeader() {
return leader;
}
@Override
public void stateChanged(final CuratorFramework client, final ConnectionState newState) {
logger.info("{} Connection State changed to {}", this, newState.name());
super.stateChanged(client, newState);
}
@Override
public void takeLeadership(final CuratorFramework client) throws Exception {
leader = true;
logger.info("{} This node has been elected Leader for Role '{}'", this, roleName);
if (listener != null) {
leaderElectionMonitorEngine.submit(new Runnable() {
@Override
public void run() {
listener.onLeaderElection();
}
});
}
// Curator API states that we lose the leadership election when we return from this method,
// so we will block as long as we are not interrupted or closed. Then, we will set leader to false.
try {
while (!isStopped()) {
try {
Thread.sleep(1000L);
} catch (final InterruptedException ie) {
logger.info("{} has been interrupted; no longer leader for role '{}'", this, roleName);
Thread.currentThread().interrupt();
return;
}
}
} finally {
leader = false;
logger.info("{} This node is no longer leader for role '{}'", this, roleName);
if (listener != null) {
leaderElectionMonitorEngine.submit(new Runnable() {
@Override
public void run() {
listener.onLeaderRelinquish();
}
});
}
}
}
}
}

View File

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.leader.election;
public interface LeaderElectionManager {
/**
* Starts managing leader elections for all registered roles
*/
void start();
/**
* Adds a new role for which a leader is required
*
* @param roleName the name of the role
*/
void register(String roleName);
/**
* Adds a new role for which a leader is required
*
* @param roleName the name of the role
* @param listener a listener that will be called when the node gains or relinquishes
* the role of leader
*/
void register(String roleName, LeaderElectionStateChangeListener listener);
/**
* Removes the role with the given name from this manager. If this
* node is the elected leader for the given role, this node will relinquish
* the leadership role
*
* @param roleName the name of the role to unregister
*/
void unregister(String roleName);
/**
* Returns a boolean value indicating whether or not this node
* is the elected leader for the given role
*
* @param roleName the name of the role
* @return <code>true</code> if the node is the elected leader, <code>false</code> otherwise.
*/
boolean isLeader(String roleName);
/**
* @return <code>true</code> if the manager is stopped, false otherwise.
*/
boolean isStopped();
/**
* Stops managing leader elections and relinquishes the role as leader
* for all registered roles. If the LeaderElectionManager is later started
* again, all previously registered roles will still be registered.
*/
void stop();
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.leader.election;
/**
* Callback interface that can be used to listen for state changes so that the node
* can be notified when it becomes the Elected Leader for a role or is no longer the
* Elected Leader
*/
public interface LeaderElectionStateChangeListener {
/**
* This method is invoked whenever this node is elected leader
*/
void onLeaderElection();
/**
* This method is invoked whenever this node no longer is the elected leader.
*/
void onLeaderRelinquish();
}

View File

@ -168,6 +168,12 @@ nifi.cluster.node.protocol.threads=${nifi.cluster.node.protocol.threads}
nifi.cluster.node.unicast.manager.address=${nifi.cluster.node.unicast.manager.address}
nifi.cluster.node.unicast.manager.protocol.port=${nifi.cluster.node.unicast.manager.protocol.port}
# zookeeper properties, used for cluster management #
nifi.zookeeper.connect.string=${nifi.zookeeper.connect.string}
nifi.zookeeper.connect.timeout=${nifi.zookeeper.connect.timeout}
nifi.zookeeper.session.timeout=${nifi.zookeeper.session.timeout}
nifi.zookeeper.root.node=${nifi.zookeeper.root.node}
# cluster manager properties (only configure for cluster manager) #
nifi.cluster.is.manager=${nifi.cluster.is.manager}
nifi.cluster.manager.address=${nifi.cluster.manager.address}

View File

@ -760,12 +760,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
clusterManager.requestReconnection(nodeDTO.getNodeId(), userDn);
} else if (Node.Status.DISCONNECTING.name().equalsIgnoreCase(nodeDTO.getStatus())) {
clusterManager.requestDisconnection(nodeDTO.getNodeId(), userDn);
} else {
// handle primary
final Boolean primary = nodeDTO.isPrimary();
if (primary != null && primary) {
clusterManager.setPrimaryNode(nodeDTO.getNodeId(), userDn);
}
}
final String nodeId = nodeDTO.getNodeId();

13
pom.xml
View File

@ -765,6 +765,17 @@ language governing permissions and limitations under the License. -->
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.10.0</version>
</dependency>
<!-- Test Dependencies for testing interactions with ZooKeeper -->
<dependency>
@ -779,6 +790,8 @@ language governing permissions and limitations under the License. -->
<version>6.8.8</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jsoup</groupId>
<artifactId>jsoup</artifactId>