NIFI-8196: When node is reconnected to cluster, ensure that it re-registers for election of cluster coordinator / primary node. On startup, if cluster coordinator is already registered and is 'this node' then register silently as coordinator and do not join the cluster until there is no Cluster Coordinator or another node is elected. This allows the zookeeper session timeout to elapse.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Mark Payne 2021-02-04 15:06:43 -05:00 committed by Bryan Bende
parent b77dbd5030
commit 03fd59eb2f
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
14 changed files with 201 additions and 60 deletions

View File

@ -98,7 +98,9 @@ the user presses Ctrl-C. At that time, it will initiate shutdown of the applicat
To run NiFi in the background, instead run `bin/nifi.sh start`. This will initiate the application to To run NiFi in the background, instead run `bin/nifi.sh start`. This will initiate the application to
begin running. To check the status and see if NiFi is currently running, execute the command `bin/nifi.sh status`. NiFi can be shutdown by executing the command `bin/nifi.sh stop`. begin running. To check the status and see if NiFi is currently running, execute the command `bin/nifi.sh status`. NiFi can be shutdown by executing the command `bin/nifi.sh stop`.
Issuing `bin/nifi.sh start` executes the `nifi.sh` script that starts NiFi in the background and then exits. If you want `nifi.sh` to wait for NiFi to finish scheduling all components before exiting, use the `--wait-for-init` flag with an optional timeout specified in seconds: `bin/nifi.sh start --wait-for-init 120`. If the timeout is not provided, the default timeout of 360 seconds will be used. Issuing `bin/nifi.sh start` executes the `nifi.sh` script that starts NiFi in the background and then exits. If you want `nifi.sh` to wait for NiFi to finish scheduling all components before
exiting, use the `--wait-for-init` flag with an optional timeout specified in seconds: `bin/nifi.sh start --wait-for-init 120`. If the timeout is not provided, the default timeout of 15 minutes will
be used.
If NiFi was installed with Homebrew, run the commands `nifi start` or `nifi stop` from anywhere in your file system to start or stop NiFi. If NiFi was installed with Homebrew, run the commands `nifi start` or `nifi stop` from anywhere in your file system to start or stop NiFi.

View File

@ -179,7 +179,9 @@ drwxr-xr-x 116 alopresto staff 3.6K Apr 6 15:50 lib/
+ +
image::nifi-home-dir-listing.png["NiFi Home directory contents"] image::nifi-home-dir-listing.png["NiFi Home directory contents"]
. At this point, NiFi can be started with default configuration values (available at link:http://localhost:8080/nifi[`http://localhost:8080/nifi`^]). . At this point, NiFi can be started with default configuration values (available at link:http://localhost:8080/nifi[`http://localhost:8080/nifi`^]).
* `./bin/nifi.sh start` -- Starts NiFi. (It takes some time for NiFi to finish scheduling all components. Issuing `bin/nifi.sh start` executes the `nifi.sh` script that starts NiFi in the background and then exits. If you want `nifi.sh` to wait for NiFi to finish scheduling all components before exiting, use the `--wait-for-init` flag with an optional timeout specified in seconds: `bin/nifi.sh start --wait-for-init 120`. If the timeout is not provided, the default timeout of 360 seconds will be used.) * `./bin/nifi.sh start` -- Starts NiFi. (It takes some time for NiFi to finish scheduling all components. Issuing `bin/nifi.sh start` executes the `nifi.sh` script that starts NiFi in the
background and then exits. If you want `nifi.sh` to wait for NiFi to finish scheduling all components before exiting, use the `--wait-for-init` flag with an optional timeout specified in seconds:
`bin/nifi.sh start --wait-for-init 120`. If the timeout is not provided, the default timeout of 15 minutes will be used.)
* `tail -f logs/nifi-app.log` -- Tails the application log which will indicate when the application has started * `tail -f logs/nifi-app.log` -- Tails the application log which will indicate when the application has started
+ +
image::nifi-app-log-ui-available.png["NiFi application log listing available URLS"] image::nifi-app-log-ui-available.png["NiFi application log listing available URLS"]

View File

@ -17,10 +17,6 @@
package org.apache.nifi.cluster.protocol; package org.apache.nifi.cluster.protocol;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage; import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage;
import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage; import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage; import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
@ -31,8 +27,16 @@ import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType; import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
import org.apache.nifi.io.socket.SocketConfiguration; import org.apache.nifi.io.socket.SocketConfiguration;
import org.apache.nifi.io.socket.SocketUtils; import org.apache.nifi.io.socket.SocketUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Objects;
public abstract class AbstractNodeProtocolSender implements NodeProtocolSender { public abstract class AbstractNodeProtocolSender implements NodeProtocolSender {
private static final Logger logger = LoggerFactory.getLogger(AbstractNodeProtocolSender.class);
private final SocketConfiguration socketConfiguration; private final SocketConfiguration socketConfiguration;
private final ProtocolContext<ProtocolMessage> protocolContext; private final ProtocolContext<ProtocolMessage> protocolContext;
@ -42,10 +46,24 @@ public abstract class AbstractNodeProtocolSender implements NodeProtocolSender {
} }
@Override @Override
public ConnectionResponseMessage requestConnection(final ConnectionRequestMessage msg) throws ProtocolException, UnknownServiceAddressException { public ConnectionResponseMessage requestConnection(final ConnectionRequestMessage msg, final boolean allowConnectToSelf) throws ProtocolException, UnknownServiceAddressException {
Socket socket = null; Socket socket = null;
try { try {
socket = createSocket(); final InetSocketAddress socketAddress;
try {
socketAddress = getServiceAddress();
} catch (final IOException e) {
throw new ProtocolException("Could not determined address of Cluster Coordinator", e);
}
// If node is not allowed to connect to itself, then we need to check the address of the Cluster Coordinator.
// If the Cluster Coordinator is currently set to this node, then we will throw an UnknownServiceAddressException
if (!allowConnectToSelf) {
validateNotConnectingToSelf(msg, socketAddress);
}
logger.info("Cluster Coordinator is located at {}. Will send Cluster Connection Request to this address", socketAddress);
socket = createSocket(socketAddress);
try { try {
// marshal message to output stream // marshal message to output stream
@ -76,6 +94,21 @@ public abstract class AbstractNodeProtocolSender implements NodeProtocolSender {
} }
} }
private void validateNotConnectingToSelf(final ConnectionRequestMessage msg, final InetSocketAddress socketAddress) {
final NodeIdentifier localNodeIdentifier = msg.getConnectionRequest().getProposedNodeIdentifier();
if (localNodeIdentifier == null) {
return;
}
final String localAddress = localNodeIdentifier.getSocketAddress();
final int localPort = localNodeIdentifier.getSocketPort();
if (Objects.equals(localAddress, socketAddress.getHostString()) && localPort == socketAddress.getPort()) {
throw new UnknownServiceAddressException("Cluster Coordinator is currently " + socketAddress.getHostString() + ":" + socketAddress.getPort() + ", which is this node, but " +
"connecting to self is not allowed at this phase of the lifecycle. This node must wait for a new Cluster Coordinator to be elected before connecting to the cluster.");
}
}
@Override @Override
public HeartbeatResponseMessage heartbeat(final HeartbeatMessage msg, final String address) throws ProtocolException { public HeartbeatResponseMessage heartbeat(final HeartbeatMessage msg, final String address) throws ProtocolException {
final String hostname; final String hostname;
@ -112,11 +145,9 @@ public abstract class AbstractNodeProtocolSender implements NodeProtocolSender {
throw new ProtocolException("Expected message type '" + MessageType.CLUSTER_WORKLOAD_RESPONSE + "' but found '" + responseMessage.getType() + "'"); throw new ProtocolException("Expected message type '" + MessageType.CLUSTER_WORKLOAD_RESPONSE + "' but found '" + responseMessage.getType() + "'");
} }
private Socket createSocket() { private Socket createSocket(final InetSocketAddress socketAddress) {
InetSocketAddress socketAddress = null;
try { try {
// create a socket // create a socket
socketAddress = getServiceAddress();
return SocketUtils.createSocket(socketAddress, socketConfiguration); return SocketUtils.createSocket(socketAddress, socketConfiguration);
} catch (final IOException ioe) { } catch (final IOException ioe) {
if (socketAddress == null) { if (socketAddress == null) {

View File

@ -34,12 +34,13 @@ public interface NodeProtocolSender {
* Sends a "connection request" message to the cluster manager. * Sends a "connection request" message to the cluster manager.
* *
* @param msg a message * @param msg a message
* @param allowConnectionToSelf whether or not the node should be allowed to connect to the cluster if the Cluster Coordinator is this node
* @return the response * @return the response
* @throws UnknownServiceAddressException if the cluster manager's address * @throws UnknownServiceAddressException if the cluster manager's address
* is not known * is not known
* @throws ProtocolException if communication failed * @throws ProtocolException if communication failed
*/ */
ConnectionResponseMessage requestConnection(ConnectionRequestMessage msg) throws ProtocolException, UnknownServiceAddressException; ConnectionResponseMessage requestConnection(ConnectionRequestMessage msg, boolean allowConnectionToSelf) throws ProtocolException, UnknownServiceAddressException;
/** /**
* Sends a heartbeat to the address given * Sends a heartbeat to the address given

View File

@ -16,9 +16,6 @@
*/ */
package org.apache.nifi.cluster.protocol.impl; package org.apache.nifi.cluster.protocol.impl;
import java.io.IOException;
import java.util.Collection;
import org.apache.nifi.cluster.protocol.NodeProtocolSender; import org.apache.nifi.cluster.protocol.NodeProtocolSender;
import org.apache.nifi.cluster.protocol.ProtocolException; import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolHandler; import org.apache.nifi.cluster.protocol.ProtocolHandler;
@ -32,6 +29,9 @@ import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage; import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.reporting.BulletinRepository;
import java.io.IOException;
import java.util.Collection;
public class NodeProtocolSenderListener implements NodeProtocolSender, ProtocolListener { public class NodeProtocolSenderListener implements NodeProtocolSender, ProtocolListener {
private final NodeProtocolSender sender; private final NodeProtocolSender sender;
private final ProtocolListener listener; private final ProtocolListener listener;
@ -85,8 +85,8 @@ public class NodeProtocolSenderListener implements NodeProtocolSender, ProtocolL
} }
@Override @Override
public ConnectionResponseMessage requestConnection(final ConnectionRequestMessage msg) throws ProtocolException, UnknownServiceAddressException { public ConnectionResponseMessage requestConnection(final ConnectionRequestMessage msg, boolean allowConnectToSelf) throws ProtocolException, UnknownServiceAddressException {
return sender.requestConnection(msg); return sender.requestConnection(msg, allowConnectToSelf);
} }
@Override @Override

View File

@ -82,6 +82,7 @@ import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -553,7 +554,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
@Override @Override
public void disconnectionRequestedByNode(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) { public void disconnectionRequestedByNode(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) {
logger.info("{} requested disconnection from cluster due to {}", nodeId, explanation == null ? disconnectionCode : explanation); logger.info("{} requested disconnection from cluster due to {}", nodeId, explanation == null ? disconnectionCode : explanation);
updateNodeStatus(new NodeConnectionStatus(nodeId, disconnectionCode, explanation)); updateNodeStatus(new NodeConnectionStatus(nodeId, disconnectionCode, explanation), false);
final Severity severity; final Severity severity;
switch (disconnectionCode) { switch (disconnectionCode) {
@ -839,7 +840,12 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
// about a node status from a different node, since those may be received out-of-order. // about a node status from a different node, since those may be received out-of-order.
final NodeConnectionStatus currentStatus = updateNodeStatus(nodeId, status); final NodeConnectionStatus currentStatus = updateNodeStatus(nodeId, status);
final NodeConnectionState currentState = currentStatus == null ? null : currentStatus.getState(); final NodeConnectionState currentState = currentStatus == null ? null : currentStatus.getState();
if (Objects.equals(status, currentStatus)) {
logger.debug("Received notification of Node Status Change for {} but the status remained the same: {}", nodeId, status);
} else {
logger.info("Status of {} changed from {} to {}", nodeId, currentStatus, status); logger.info("Status of {} changed from {} to {}", nodeId, currentStatus, status);
}
logger.debug("State of cluster nodes is now {}", nodeStatuses); logger.debug("State of cluster nodes is now {}", nodeStatuses);
latestUpdateId.updateAndGet(curVal -> Math.max(curVal, status.getUpdateIdentifier())); latestUpdateId.updateAndGet(curVal -> Math.max(curVal, status.getUpdateIdentifier()));

View File

@ -2169,6 +2169,8 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
throw new IllegalStateException("Unable to stop heartbeating because heartbeating is not configured."); throw new IllegalStateException("Unable to stop heartbeating because heartbeating is not configured.");
} }
LOG.info("Will no longer send heartbeats");
writeLock.lock(); writeLock.lock();
try { try {
if (!isHeartbeating()) { if (!isHeartbeating()) {
@ -2348,12 +2350,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
// update the bulletin repository // update the bulletin repository
if (isChanging) { if (isChanging) {
if (clustered) { if (clustered) {
registerForPrimaryNode(); onClusterConnect();
// Participate in Leader Election for Heartbeat Monitor. Start the heartbeat monitor
// if/when we become leader and stop it when we lose leader role
registerForClusterCoordinator(true);
leaderElectionManager.start(); leaderElectionManager.start();
stateManagerProvider.enableClusterProvider(); stateManagerProvider.enableClusterProvider();
@ -2382,6 +2379,16 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
} }
} }
public void onClusterConnect() {
registerForPrimaryNode();
// Participate in Leader Election for Heartbeat Monitor. Start the heartbeat monitor
// if/when we become leader and stop it when we lose leader role
registerForClusterCoordinator(true);
resumeHeartbeats();
}
public void onClusterDisconnect() { public void onClusterDisconnect() {
leaderElectionManager.unregister(ClusterRoles.PRIMARY_NODE); leaderElectionManager.unregister(ClusterRoles.PRIMARY_NODE);
leaderElectionManager.unregister(ClusterRoles.CLUSTER_COORDINATOR); leaderElectionManager.unregister(ClusterRoles.CLUSTER_COORDINATOR);

View File

@ -23,6 +23,7 @@ import org.apache.nifi.authorization.ManagedAuthorizer;
import org.apache.nifi.bundle.Bundle; import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.cluster.ConnectionException; import org.apache.nifi.cluster.ConnectionException;
import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.ClusterRoles;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode; import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
@ -652,13 +653,22 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
connectionResponse = connect(false, false, createDataFlowFromController()); connectionResponse = connect(false, false, createDataFlowFromController());
} }
if (connectionResponse == null) {
// If we could not communicate with the cluster, just log a warning and return.
// If the node is currently in a CONNECTING state, it will continue to heartbeat, and that will continue to
// result in attempting to connect to the cluster.
logger.warn("Received a Reconnection Request that contained no DataFlow, and was unable to communicate with an active Cluster Coordinator. Cannot connect to cluster at this time.");
controller.resumeHeartbeats();
return;
}
loadFromConnectionResponse(connectionResponse); loadFromConnectionResponse(connectionResponse);
clusterCoordinator.resetNodeStatuses(connectionResponse.getNodeConnectionStatuses().stream() clusterCoordinator.resetNodeStatuses(connectionResponse.getNodeConnectionStatuses().stream()
.collect(Collectors.toMap(NodeConnectionStatus::getNodeIdentifier, status -> status))); .collect(Collectors.toMap(NodeConnectionStatus::getNodeIdentifier, status -> status)));
// reconnected, this node needs to explicitly write the inherited flow to disk, and resume heartbeats // reconnected, this node needs to explicitly write the inherited flow to disk, and resume heartbeats
saveFlowChanges(); saveFlowChanges();
controller.resumeHeartbeats(); controller.onClusterConnect();
logger.info("Node reconnected."); logger.info("Node reconnected.");
} catch (final Exception ex) { } catch (final Exception ex) {
@ -871,8 +881,25 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
try { try {
logger.info("Connecting Node: " + nodeId); logger.info("Connecting Node: " + nodeId);
// Upon NiFi startup, the node will register for the Cluster Coordinator role with the Leader Election Manager.
// Sometimes the node will register as an active participant, meaning that it wants to be elected. This happens when the entire cluster starts up,
// for example. (This is determined by checking whether or not there already is a Cluster Coordinator registered).
// Other times, it registers as a 'silent' member, meaning that it will not be elected.
// If the leader election timeout is long (say 30 or 60 seconds), it is possible that this node was the Leader and was then restarted,
// and upon restart found that itself was already registered as the Cluster Coordinator. As a result, it registers as a Silent member of the
// election, and then connects to itself as the Cluster Coordinator. At this point, since the node has just restarted, it doesn't know about
// any of the nodes in the cluster. As a result, it will get the Cluster Topology from itself, and think there are no other nodes in the cluster.
// This causes all other nodes to send in their heartbeats, which then results in them being disconnected because they were previously unknown and
// as a result asked to reconnect to the cluster.
//
// To avoid this, we do not allow the node to connect to itself if it's not an active participant. This means that when the entire cluster is started
// up, the node can still connect to itself because it will be an active participant. But if it is then restarted, it won't be allowed to connect
// to itself. It will instead have to wait until another node is elected Cluster Coordinator.
final boolean activeCoordinatorParticipant = controller.getLeaderElectionManager().isActiveParticipant(ClusterRoles.CLUSTER_COORDINATOR);
// create connection request message // create connection request message
final ConnectionRequest request = new ConnectionRequest(nodeId, dataFlow); final ConnectionRequest request = new ConnectionRequest(nodeId, dataFlow);
final ConnectionRequestMessage requestMsg = new ConnectionRequestMessage(); final ConnectionRequestMessage requestMsg = new ConnectionRequestMessage();
requestMsg.setConnectionRequest(request); requestMsg.setConnectionRequest(request);
@ -890,7 +917,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
ConnectionResponse response = null; ConnectionResponse response = null;
for (int i = 0; i < maxAttempts || retryIndefinitely; i++) { for (int i = 0; i < maxAttempts || retryIndefinitely; i++) {
try { try {
response = senderListener.requestConnection(requestMsg).getConnectionResponse(); response = senderListener.requestConnection(requestMsg, activeCoordinatorParticipant).getConnectionResponse();
if (response.shouldTryLater()) { if (response.shouldTryLater()) {
logger.info("Requested by cluster coordinator to retry connection in " + response.getTryLaterSeconds() + " seconds with explanation: " + response.getRejectionReason()); logger.info("Requested by cluster coordinator to retry connection in " + response.getTryLaterSeconds() + " seconds with explanation: " + response.getRejectionReason());
@ -907,6 +934,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
response = null; response = null;
break; break;
} else { } else {
logger.info("Received successful response from Cluster Coordinator to Connection Request");
// we received a successful connection response from cluster coordinator // we received a successful connection response from cluster coordinator
break; break;
} }

View File

@ -96,6 +96,17 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
logger.info("{} started", this); logger.info("{} started", this);
} }
@Override
public boolean isActiveParticipant(final String roleName) {
final RegisteredRole role = registeredRoles.get(roleName);
if (role == null) {
return false;
}
final String participantId = role.getParticipantId();
return participantId != null;
}
@Override @Override
public void register(String roleName, LeaderElectionStateChangeListener listener) { public void register(String roleName, LeaderElectionStateChangeListener listener) {
register(roleName, listener, null); register(roleName, listener, null);
@ -158,16 +169,18 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
final LeaderRole leaderRole = leaderRoles.remove(roleName); final LeaderRole leaderRole = leaderRoles.remove(roleName);
if (leaderRole == null) { if (leaderRole == null) {
logger.info("Cannot unregister Leader Election Role '{}' becuase that role is not registered", roleName); logger.info("Cannot unregister Leader Election Role '{}' because that role is not registered", roleName);
return; return;
} }
final LeaderSelector leaderSelector = leaderRole.getLeaderSelector(); final LeaderSelector leaderSelector = leaderRole.getLeaderSelector();
if (leaderSelector == null) { if (leaderSelector == null) {
logger.info("Cannot unregister Leader Election Role '{}' becuase that role is not registered", roleName); logger.info("Cannot unregister Leader Election Role '{}' because that role is not registered", roleName);
return; return;
} }
leaderRole.getElectionListener().disable();
leaderSelector.close(); leaderSelector.close();
logger.info("This node is no longer registered to be elected as the Leader for Role '{}'", roleName); logger.info("This node is no longer registered to be elected as the Leader for Role '{}'", roleName);
} }
@ -233,8 +246,15 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
@Override @Override
public boolean isLeader(final String roleName) { public boolean isLeader(final String roleName) {
final boolean activeParticipant = isActiveParticipant(roleName);
if (!activeParticipant) {
logger.debug("Node is not an active participant in election for role {} so cannot be leader", roleName);
return false;
}
final LeaderRole role = getLeaderRole(roleName); final LeaderRole role = getLeaderRole(roleName);
if (role == null) { if (role == null) {
logger.debug("Node is an active participant in election for role {} but there is no LeaderRole registered so this node cannot be leader", roleName);
return false; return false;
} }
@ -423,6 +443,10 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
return leaderSelector; return leaderSelector;
} }
public ElectionListener getElectionListener() {
return electionListener;
}
public boolean isLeader() { public boolean isLeader() {
return electionListener.isLeader(); return electionListener.isLeader();
} }
@ -458,6 +482,7 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
private final String participantId; private final String participantId;
private volatile boolean leader; private volatile boolean leader;
private volatile Thread leaderThread;
private long leaderUpdateTimestamp = 0L; private long leaderUpdateTimestamp = 0L;
private final long MAX_CACHE_MILLIS = TimeUnit.SECONDS.toMillis(5L); private final long MAX_CACHE_MILLIS = TimeUnit.SECONDS.toMillis(5L);
@ -467,6 +492,17 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
this.participantId = participantId; this.participantId = participantId;
} }
public void disable() {
logger.info("Election Listener for Role {} disabled", roleName);
setLeader(false);
if (leaderThread == null) {
logger.debug("Election Listener for Role {} disabled but there is no leader thread. Will not interrupt any threads.", roleName);
} else {
leaderThread.interrupt();
}
}
public synchronized boolean isLeader() { public synchronized boolean isLeader() {
if (leaderUpdateTimestamp < System.currentTimeMillis() - MAX_CACHE_MILLIS) { if (leaderUpdateTimestamp < System.currentTimeMillis() - MAX_CACHE_MILLIS) {
try { try {
@ -483,6 +519,8 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
return false; return false;
} }
} else {
logger.debug("Checking if this node is leader for role {}: using cached response, returning {}", roleName, leader);
} }
return leader; return leader;
@ -531,6 +569,7 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
@Override @Override
public void takeLeadership(final CuratorFramework client) throws Exception { public void takeLeadership(final CuratorFramework client) throws Exception {
leaderThread = Thread.currentThread();
setLeader(true); setLeader(true);
logger.info("{} This node has been elected Leader for Role '{}'", this, roleName); logger.info("{} This node has been elected Leader for Role '{}'", this, roleName);

View File

@ -49,6 +49,13 @@ public interface LeaderElectionManager {
*/ */
void register(String roleName, LeaderElectionStateChangeListener listener, String participantId); void register(String roleName, LeaderElectionStateChangeListener listener, String participantId);
/**
* Indicates whether or not this node is an active participant in the election for the given role
* @param roleName the name of the role
* @return <code>true</code> if this node is an active participant in the election (is allowed to be elected) or <code>false</code> otherwise
*/
boolean isActiveParticipant(String roleName);
/** /**
* Returns the Participant ID of the node that is elected the leader, if one was provided when the node registered * Returns the Participant ID of the node that is elected the leader, if one was provided when the node registered
* for the role via {@link #register(String, LeaderElectionStateChangeListener, String)}. If there is currently no leader * for the role via {@link #register(String, LeaderElectionStateChangeListener, String)}. If there is currently no leader

View File

@ -40,6 +40,11 @@ public class StandaloneLeaderElectionManager implements LeaderElectionManager {
public void register(final String roleName, final LeaderElectionStateChangeListener listener, final String participantId) { public void register(final String roleName, final LeaderElectionStateChangeListener listener, final String participantId) {
} }
@Override
public boolean isActiveParticipant(final String roleName) {
return true;
}
@Override @Override
public String getLeader(final String roleName) { public String getLeader(final String roleName) {
return null; return null;

View File

@ -236,8 +236,8 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
return zooKeeperClientConfig; return zooKeeperClientConfig;
} else { } else {
Properties stateProviderProperties = new Properties(); Properties stateProviderProperties = new Properties();
stateProviderProperties.setProperty(NiFiProperties.ZOOKEEPER_SESSION_TIMEOUT, String.valueOf(timeoutMillis)); stateProviderProperties.setProperty(NiFiProperties.ZOOKEEPER_SESSION_TIMEOUT, timeoutMillis + " millis");
stateProviderProperties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_TIMEOUT, String.valueOf(timeoutMillis)); stateProviderProperties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_TIMEOUT, timeoutMillis + " millis");
stateProviderProperties.setProperty(NiFiProperties.ZOOKEEPER_ROOT_NODE, rootNode); stateProviderProperties.setProperty(NiFiProperties.ZOOKEEPER_ROOT_NODE, rootNode);
stateProviderProperties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, connectionString); stateProviderProperties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, connectionString);
zooKeeperClientConfig = ZooKeeperClientConfig.createConfig(combineProperties(nifiProperties, stateProviderProperties)); zooKeeperClientConfig = ZooKeeperClientConfig.createConfig(combineProperties(nifiProperties, stateProviderProperties));

View File

@ -16,25 +16,6 @@
*/ */
package org.apache.nifi.tests.system.clustering; package org.apache.nifi.tests.system.clustering;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.zip.GZIPInputStream;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.nifi.controller.serialization.FlowEncodingVersion; import org.apache.nifi.controller.serialization.FlowEncodingVersion;
import org.apache.nifi.controller.serialization.FlowFromDOMFactory; import org.apache.nifi.controller.serialization.FlowFromDOMFactory;
import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.encrypt.StringEncryptor;
@ -69,6 +50,27 @@ import org.w3c.dom.Element;
import org.xml.sax.InputSource; import org.xml.sax.InputSource;
import org.xml.sax.SAXException; import org.xml.sax.SAXException;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.ParserConfigurationException;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.zip.GZIPInputStream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
public class JoinClusterWithDifferentFlow extends NiFiSystemIT { public class JoinClusterWithDifferentFlow extends NiFiSystemIT {
@Override @Override
protected NiFiInstanceFactory getInstanceFactory() { protected NiFiInstanceFactory getInstanceFactory() {
@ -104,10 +106,18 @@ public class JoinClusterWithDifferentFlow extends NiFiSystemIT {
} }
private File getBackupFile(final File confDir) { private File getBackupFile(final File confDir) throws InterruptedException {
final File[] flowXmlFileArray = confDir.listFiles(file -> file.getName().startsWith("flow") && file.getName().endsWith(".xml.gz")); final FileFilter fileFilter = file -> file.getName().startsWith("flow") && file.getName().endsWith(".xml.gz");
waitFor(() -> {
final File[] flowXmlFileArray = confDir.listFiles(fileFilter);
return flowXmlFileArray != null && flowXmlFileArray.length == 2;
});
final File[] flowXmlFileArray = confDir.listFiles(fileFilter);
final List<File> flowXmlFiles = new ArrayList<>(Arrays.asList(flowXmlFileArray)); final List<File> flowXmlFiles = new ArrayList<>(Arrays.asList(flowXmlFileArray));
assertEquals(2, flowXmlFiles.size()); assertEquals(2, flowXmlFiles.size());
flowXmlFiles.removeIf(file -> file.getName().equals("flow.xml.gz")); flowXmlFiles.removeIf(file -> file.getName().equals("flow.xml.gz"));
assertEquals(1, flowXmlFiles.size()); assertEquals(1, flowXmlFiles.size());

View File

@ -50,7 +50,6 @@ import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
public class ParameterContextIT extends NiFiSystemIT { public class ParameterContextIT extends NiFiSystemIT {
@Test @Test
public void testCreateParameterContext() throws NiFiClientException, IOException { public void testCreateParameterContext() throws NiFiClientException, IOException {
final Set<ParameterEntity> parameterEntities = new HashSet<>(); final Set<ParameterEntity> parameterEntities = new HashSet<>();
@ -175,13 +174,17 @@ public class ParameterContextIT extends NiFiSystemIT {
setParameterContext("root", createdContextEntity); setParameterContext("root", createdContextEntity);
waitForValidProcessor(processorId); waitForValidProcessor(processorId);
// Create a Parameter that sets the 'foo' value to null and denote that the parameter's value should be explicitly removed.
final ParameterEntity fooNull = createParameterEntity("foo", null, false, null); final ParameterEntity fooNull = createParameterEntity("foo", null, false, null);
fooNull.getParameter().setValueRemoved(true);
createdContextEntity.getComponent().setParameters(Collections.singleton(fooNull)); createdContextEntity.getComponent().setParameters(Collections.singleton(fooNull));
getNifiClient().getParamContextClient().updateParamContext(createdContextEntity); getNifiClient().getParamContextClient().updateParamContext(createdContextEntity);
// Should become invalid because property is required and has no default // Should become invalid because property is required and has no default
waitForInvalidProcessor(processorId); waitForInvalidProcessor(processorId);
} }
@Test @Test
public void testParametersReferencingEL() throws NiFiClientException, IOException, InterruptedException { public void testParametersReferencingEL() throws NiFiClientException, IOException, InterruptedException {
final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile"); final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");