NIFI-1966: When cluster is started up, do not assume that Cluster Coordinator has the golden copy of the flow but instead wait for some period of time or until the required number of nodes have connected, and then choose which flow is correct. This closes #977

This commit is contained in:
Mark Payne 2016-08-30 16:59:50 -04:00 committed by Matt Gilman
parent 7a451935a5
commit a7e76cc00a
34 changed files with 1054 additions and 195 deletions

View File

@ -169,6 +169,8 @@ public abstract class NiFiProperties {
public static final String CLUSTER_NODE_CONNECTION_TIMEOUT = "nifi.cluster.node.connection.timeout";
public static final String CLUSTER_NODE_READ_TIMEOUT = "nifi.cluster.node.read.timeout";
public static final String CLUSTER_FIREWALL_FILE = "nifi.cluster.firewall.file";
public static final String FLOW_ELECTION_MAX_WAIT_TIME = "nifi.cluster.flow.election.max.wait.time";
public static final String FLOW_ELECTION_MAX_CANDIDATES = "nifi.cluster.flow.election.max.candidates";
// zookeeper properties
public static final String ZOOKEEPER_CONNECT_STRING = "nifi.zookeeper.connect.string";
@ -239,6 +241,7 @@ public abstract class NiFiProperties {
// cluster node defaults
public static final int DEFAULT_CLUSTER_NODE_PROTOCOL_THREADS = 2;
public static final String DEFAULT_REQUEST_REPLICATION_CLAIM_TIMEOUT = "15 secs";
public static final String DEFAULT_FLOW_ELECTION_MAX_WAIT_TIME = "5 mins";
// state management defaults
public static final String DEFAULT_STATE_MANAGEMENT_CONFIG_FILE = "conf/state-management.xml";
@ -309,12 +312,12 @@ public abstract class NiFiProperties {
public Integer getIntegerProperty(final String propertyName, final Integer defaultValue) {
final String value = getProperty(propertyName);
if (value == null) {
if (value == null || value.trim().isEmpty()) {
return defaultValue;
}
try {
return Integer.parseInt(getProperty(propertyName));
return Integer.parseInt(value.trim());
} catch (final Exception e) {
return defaultValue;
}
@ -935,6 +938,14 @@ public abstract class NiFiProperties {
return getProperty(FLOW_CONFIGURATION_ARCHIVE_DIR);
}
public String getFlowElectionMaxWaitTime() {
return getProperty(FLOW_ELECTION_MAX_WAIT_TIME, DEFAULT_FLOW_ELECTION_MAX_WAIT_TIME);
}
public Integer getFlowElectionMaxCandidates() {
return getIntegerProperty(FLOW_ELECTION_MAX_CANDIDATES, null);
}
public String getFlowConfigurationArchiveMaxTime() {
return getProperty(FLOW_CONFIGURATION_ARCHIVE_MAX_TIME, DEFAULT_FLOW_CONFIGURATION_ARCHIVE_MAX_TIME);
}

View File

@ -1171,6 +1171,20 @@ There are cases where a DFM may wish to continue making changes to the flow, eve
In this case, they DFM may elect to remove the node from the cluster entirely through the Cluster Management dialog. Once removed,
the node cannot be rejoined to the cluster until it has been restarted.
*Flow Election* +
When a cluster first starts up, NiFi must determine which of the nodes have the
"correct" version of the flow. This is done by voting on the flows that each of the nodes has. When a node
attempts to connect to a cluster, it provides a copy of its local flow to the Cluster Coordinator. If no flow
has yet been elected the "correct" flow, the node's flow is compared to each of the other Nodes' flows. If another
Node's flow matches this one, a vote is cast for this flow. If no other Node has reported the same flow yet, this
flow will be added to the pool of possibly elected flows with one vote. After
some amount of time has elapsed (configured by setting the `nifi.cluster.flow.election.max.wait.time` property) or
some number of Nodes have cast votes (configured by setting the `nifi.cluster.flow.election.max.candidates` property),
a flow is elected to be the "correct" copy of the flow. All nodes that have incompatible flows are then disconnected
from the cluster while those with compatible flows inherit the cluster's flow. Election is performed according to
the "popular vote" with the caveat that the winner will never be an "empty flow" unless all flows are empty. This
allows an administrator to remove a node's `flow.xml.gz` file and restart the node, knowing that the node's flow will
not be voted to be the "correct" flow unless no other flow is found.
*Basic Cluster Setup* +
@ -1204,8 +1218,13 @@ For each Node, the minimum properties to configure are as follows:
that should be used for storing data. The default value is _/root_. This is important to set correctly, as which cluster
the NiFi instance attempts to join is determined by which ZooKeeper instance it connects to and the ZooKeeper Root Node
that is specified.
** nifi.cluster.request.replication.claim.timeout - Specifies how long a component can be 'locked' during a request replication
before the lock expires and is automatically unlocked. See <<claim_management>> for more information.
** nifi.cluster.flow.election.max.wait.time - Specifies the amount of time to wait before electing a Flow as the "correct" Flow.
If the number of Nodes that have voted is equal to the number specified by the `nifi.cluster.flow.election.max.candidates`
property, the cluster will not wait this long. The default is 5 minutes. Note that the time starts as soon as the first vote
is cast.
** nifi.cluster.flow.election.max.candidates - Specifies the number of Nodes required in the cluster to cause early election
of Flows. This allows the Nodes in the cluster to avoid having to wait a long time before starting processing if we reach
at least this number of nodes in the cluster.
Now, it is possible to start up the cluster. It does not matter which order the instances start up. Navigate to the URL for
one of the nodes, and the User Interface should look similar to the following:

View File

@ -226,4 +226,14 @@ public interface ClusterCoordinator {
* @return <code>true</code> if connected, <code>false</code> otherwise
*/
boolean isConnected();
/**
* @return <code>true</code> if Flow Election is complete, <code>false</code> otherwise
*/
boolean isFlowElectionComplete();
/**
* @return the current status of Flow Election.
*/
String getFlowElectionStatus();
}

View File

@ -28,16 +28,22 @@ import org.apache.nifi.cluster.protocol.jaxb.message.ConnectionRequestAdapter;
public class ConnectionRequest {
private final NodeIdentifier proposedNodeIdentifier;
private final DataFlow dataFlow;
public ConnectionRequest(final NodeIdentifier proposedNodeIdentifier) {
public ConnectionRequest(final NodeIdentifier proposedNodeIdentifier, final DataFlow dataFlow) {
if (proposedNodeIdentifier == null) {
throw new IllegalArgumentException("Proposed node identifier may not be null.");
}
this.proposedNodeIdentifier = proposedNodeIdentifier;
this.dataFlow = dataFlow;
}
public NodeIdentifier getProposedNodeIdentifier() {
return proposedNodeIdentifier;
}
public DataFlow getDataFlow() {
return dataFlow;
}
}

View File

@ -49,9 +49,8 @@ public class ConnectionResponse {
if (nodeIdentifier == null) {
throw new IllegalArgumentException("Node identifier may not be empty or null.");
} else if (dataFlow == null) {
throw new IllegalArgumentException("DataFlow may not be null.");
}
this.nodeIdentifier = nodeIdentifier;
this.dataFlow = dataFlow;
this.tryLaterSeconds = 0;
@ -61,14 +60,14 @@ public class ConnectionResponse {
this.componentRevisions = componentRevisions == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(componentRevisions));
}
public ConnectionResponse(final int tryLaterSeconds) {
public ConnectionResponse(final int tryLaterSeconds, final String explanation) {
if (tryLaterSeconds <= 0) {
throw new IllegalArgumentException("Try-Later seconds may not be nonnegative: " + tryLaterSeconds);
throw new IllegalArgumentException("Try-Later seconds must be nonnegative: " + tryLaterSeconds);
}
this.dataFlow = null;
this.nodeIdentifier = null;
this.tryLaterSeconds = tryLaterSeconds;
this.rejectionReason = null;
this.rejectionReason = explanation;
this.instanceId = null;
this.nodeStatuses = null;
this.componentRevisions = null;
@ -120,7 +119,6 @@ public class ConnectionResponse {
return instanceId;
}
public List<NodeConnectionStatus> getNodeConnectionStatuses() {
return nodeStatuses;
}

View File

@ -17,6 +17,8 @@
package org.apache.nifi.cluster.protocol.jaxb.message;
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
/**
@ -24,6 +26,7 @@ import org.apache.nifi.cluster.protocol.NodeIdentifier;
public class AdaptedConnectionRequest {
private NodeIdentifier nodeIdentifier;
private DataFlow dataFlow;
public AdaptedConnectionRequest() {
}
@ -37,4 +40,12 @@ public class AdaptedConnectionRequest {
this.nodeIdentifier = nodeIdentifier;
}
@XmlJavaTypeAdapter(DataFlowAdapter.class)
public DataFlow getDataFlow() {
return dataFlow;
}
public void setDataFlow(final DataFlow dataFlow) {
this.dataFlow = dataFlow;
}
}

View File

@ -28,13 +28,14 @@ public class ConnectionRequestAdapter extends XmlAdapter<AdaptedConnectionReques
final AdaptedConnectionRequest aCr = new AdaptedConnectionRequest();
if (cr != null) {
aCr.setNodeIdentifier(cr.getProposedNodeIdentifier());
aCr.setDataFlow(cr.getDataFlow());
}
return aCr;
}
@Override
public ConnectionRequest unmarshal(final AdaptedConnectionRequest aCr) {
return new ConnectionRequest(aCr.getNodeIdentifier());
return new ConnectionRequest(aCr.getNodeIdentifier(), aCr.getDataFlow());
}
}

View File

@ -41,7 +41,7 @@ public class ConnectionResponseAdapter extends XmlAdapter<AdaptedConnectionRespo
@Override
public ConnectionResponse unmarshal(final AdaptedConnectionResponse aCr) {
if (aCr.shouldTryLater()) {
return new ConnectionResponse(aCr.getTryLaterSeconds());
return new ConnectionResponse(aCr.getTryLaterSeconds(), aCr.getRejectionReason());
} else if (aCr.getRejectionReason() != null) {
return ConnectionResponse.createRejectionResponse(aCr.getRejectionReason());
} else {

View File

@ -28,7 +28,7 @@ import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
public class HeartbeatResponseMessage extends ProtocolMessage {
private List<NodeConnectionStatus> updatedNodeStatuses = new ArrayList<>();
private String flowElectionMessage = null;
@Override
public MessageType getType() {
@ -42,4 +42,12 @@ public class HeartbeatResponseMessage extends ProtocolMessage {
public void setUpdatedNodeStatuses(final List<NodeConnectionStatus> nodeStatuses) {
this.updatedNodeStatuses = new ArrayList<>(nodeStatuses);
}
public String getFlowElectionMessage() {
return flowElectionMessage;
}
public void setFlowElectionMessage(String flowElectionMessage) {
this.flowElectionMessage = flowElectionMessage;
}
}

View File

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.coordination.flow;
import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
/**
* <p>
* A FlowElection is responsible for examining multiple versions of a dataflow and determining which of
* the versions is the "correct" version of the flow.
* </p>
*/
public interface FlowElection {
/**
* Checks if the election has completed or not.
*
* @return <code>true</code> if the election has completed, <code>false</code> otherwise.
*/
boolean isElectionComplete();
/**
* Returns <code>true</code> if a vote has already been counted for the given Node Identifier, <code>false</code> otherwise.
*
* @param nodeIdentifier the identifier of the node
* @return <code>true</code> if a vote has already been counted for the given Node Identifier, <code>false</code> otherwise.
*/
boolean isVoteCounted(NodeIdentifier nodeIdentifier);
/**
* If the election has not yet completed, adds the given DataFlow to the list of candidates
* (if it is not already in the running) and increments the number of votes for this DataFlow by 1.
* If the election has completed, the given candidate is ignored, and the already-elected DataFlow
* will be returned. If the election has not yet completed, a vote will be cast for the given
* candidate and <code>null</code> will be returned, signifying that no candidate has yet been chosen.
*
* @param candidate the DataFlow to vote for and add to the pool of candidates if not already present
* @param nodeIdentifier the identifier of the node casting the vote
*
* @return the elected {@link DataFlow}, or <code>null</code> if no DataFlow has yet been elected
*/
DataFlow castVote(DataFlow candidate, NodeIdentifier nodeIdentifier);
/**
* Returns the DataFlow that has been elected as the "correct" version of the flow, or <code>null</code>
* if the election has not yet completed.
*
* @return the DataFlow that has been elected as the "correct" version of the flow, or <code>null</code>
* if the election has not yet completed.
*/
DataFlow getElectedDataFlow();
/**
* Returns a human-readable description of the status of the election
*
* @return a human-readable description of the status of the election
*/
String getStatusDescription();
}

View File

@ -0,0 +1,240 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.coordination.flow;
import static java.util.Objects.requireNonNull;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.StandardFlowSynchronizer;
import org.apache.nifi.fingerprint.FingerprintFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>
* An implementation of {@link FlowElection} that waits until either a maximum amount of time has elapsed
* or a maximum number of Data Flows have entered the running to be elected, and then elects the 'winner'
* based on the number of 'votes' that a particular DataFlow has received. This implementation considers
* two Flows with the same fingerprint to be the same Flow. If there is a tie in the number of votes for
* a particular DataFlow, one will be chosen in a non-deterministic manner. If multiple DataFlows are
* presented with the same fingerprint but different Flows (for instance, the position of a component has
* changed), one of the Flows with that fingerprint will be chosen in a non-deterministic manner.
* </p>
*/
public class PopularVoteFlowElection implements FlowElection {
private static final Logger logger = LoggerFactory.getLogger(PopularVoteFlowElection.class);
private final long maxWaitNanos;
private final Integer maxNodes;
private final FingerprintFactory fingerprintFactory;
private volatile Long startNanos = null;
private volatile DataFlow electedDataFlow = null;
private final Map<String, FlowCandidate> candidateByFingerprint = new HashMap<>();
public PopularVoteFlowElection(final long maxWait, final TimeUnit maxWaitPeriod, final Integer maxNodes, final FingerprintFactory fingerprintFactory) {
this.maxWaitNanos = maxWaitPeriod.toNanos(maxWait);
if (maxWaitNanos < 1) {
throw new IllegalArgumentException("Maximum wait time to elect Cluster Flow cannot be less than 1 nanosecond");
}
this.maxNodes = maxNodes;
if (maxNodes != null && maxNodes < 1) {
throw new IllegalArgumentException("Maximum number of nodes to wait on before electing Cluster Flow cannot be less than 1");
}
this.fingerprintFactory = requireNonNull(fingerprintFactory);
}
@Override
public synchronized boolean isElectionComplete() {
if (electedDataFlow != null) {
return true;
}
if (startNanos == null) {
return false;
}
final long nanosSinceStart = System.nanoTime() - startNanos;
if (nanosSinceStart > maxWaitNanos) {
final FlowCandidate elected = performElection();
logger.info("Election is complete because the maximum allowed time has elapsed. "
+ "The elected dataflow is held by the following nodes: {}", elected.getNodes());
return true;
} else if (maxNodes != null) {
final int numVotes = getVoteCount();
if (numVotes >= maxNodes) {
final FlowCandidate elected = performElection();
logger.info("Election is complete because the required number of nodes ({}) have voted. "
+ "The elected dataflow is held by the following nodes: {}", maxNodes, elected.getNodes());
return true;
}
}
return false;
}
@Override
public boolean isVoteCounted(final NodeIdentifier nodeIdentifier) {
return candidateByFingerprint.values().stream()
.anyMatch(candidate -> candidate.getNodes().contains(nodeIdentifier));
}
private synchronized int getVoteCount() {
return candidateByFingerprint.values().stream().mapToInt(candidate -> candidate.getVotes()).sum();
}
@Override
public synchronized DataFlow castVote(final DataFlow candidate, final NodeIdentifier nodeId) {
if (candidate == null || isElectionComplete()) {
return getElectedDataFlow();
}
final String fingerprint = fingerprint(candidate);
final FlowCandidate flowCandidate = candidateByFingerprint.computeIfAbsent(fingerprint, key -> new FlowCandidate(candidate));
final boolean voteCast = flowCandidate.vote(nodeId);
if (startNanos == null) {
startNanos = System.nanoTime();
}
if (voteCast) {
logger.info("Vote cast by {}; this flow now has {} votes", nodeId, flowCandidate.getVotes());
}
if (isElectionComplete()) {
return getElectedDataFlow();
}
return null; // no elected candidate so return null
}
private String fingerprint(final DataFlow dataFlow) {
final String flowFingerprint = fingerprintFactory.createFingerprint(dataFlow.getFlow());
final String authFingerprint = dataFlow.getAuthorizerFingerprint() == null ? "" : new String(dataFlow.getAuthorizerFingerprint(), StandardCharsets.UTF_8);
final String candidateFingerprint = flowFingerprint + authFingerprint;
return candidateFingerprint;
}
@Override
public DataFlow getElectedDataFlow() {
return electedDataFlow;
}
private FlowCandidate performElection() {
if (candidateByFingerprint.isEmpty()) {
return null;
}
final FlowCandidate elected;
if (candidateByFingerprint.size() == 1) {
elected = candidateByFingerprint.values().iterator().next();
} else {
elected = candidateByFingerprint.values().stream()
.filter(candidate -> !candidate.isFlowEmpty()) // We have more than 1 fingerprint. Do not consider empty flows.
.max((candidate1, candidate2) -> Integer.compare(candidate1.getVotes(), candidate2.getVotes()))
.get();
}
this.electedDataFlow = elected.getDataFlow();
return elected;
}
@Override
public synchronized String getStatusDescription() {
if (startNanos == null) {
return "No votes have yet been cast.";
}
final StringBuilder descriptionBuilder = new StringBuilder("Election will complete in ");
final long nanosElapsed = System.nanoTime() - startNanos;
final long nanosLeft = maxWaitNanos - nanosElapsed;
final long secsLeft = TimeUnit.NANOSECONDS.toSeconds(nanosLeft);
if (secsLeft < 1) {
descriptionBuilder.append("less than 1 second");
} else {
descriptionBuilder.append(secsLeft).append(" seconds");
}
if (maxNodes != null) {
final int votesNeeded = maxNodes.intValue() - getVoteCount();
descriptionBuilder.append(" or after ").append(votesNeeded).append(" more vote");
descriptionBuilder.append(votesNeeded == 1 ? " is " : "s are ");
descriptionBuilder.append("cast, whichever occurs first.");
}
return descriptionBuilder.toString();
}
private static class FlowCandidate {
private final DataFlow dataFlow;
private final AtomicInteger voteCount = new AtomicInteger(0);
private final Set<NodeIdentifier> nodeIds = Collections.synchronizedSet(new HashSet<>());
public FlowCandidate(final DataFlow dataFlow) {
this.dataFlow = dataFlow;
}
/**
* Casts a vote for this candidate for the given node identifier, if a vote has not already
* been cast for this node identifier
*
* @param nodeId the node id that is casting the vote
* @return <code>true</code> if the vote was case, <code>false</code> if this node id has already cast its vote
*/
public boolean vote(final NodeIdentifier nodeId) {
if (nodeIds.add(nodeId)) {
voteCount.incrementAndGet();
return true;
}
return false;
}
public int getVotes() {
return voteCount.get();
}
public DataFlow getDataFlow() {
return dataFlow;
}
public boolean isFlowEmpty() {
return StandardFlowSynchronizer.isEmpty(dataFlow);
}
public Set<NodeIdentifier> getNodes() {
return nodeIds;
}
}
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.coordination.flow;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.fingerprint.FingerprintFactory;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.FactoryBean;
public class PopularVoteFlowElectionFactoryBean implements FactoryBean<PopularVoteFlowElection> {
private static final Logger logger = LoggerFactory.getLogger(PopularVoteFlowElectionFactoryBean.class);
private NiFiProperties properties;
@Override
public PopularVoteFlowElection getObject() throws Exception {
final String maxWaitTime = properties.getFlowElectionMaxWaitTime();
long maxWaitMillis;
try {
maxWaitMillis = FormatUtils.getTimeDuration(maxWaitTime, TimeUnit.MILLISECONDS);
} catch (final Exception e) {
logger.warn("Failed to parse value of property '{}' as a valid time period. Value was '{}'. Ignoring this value and using the default value of '{}'",
NiFiProperties.FLOW_ELECTION_MAX_WAIT_TIME, maxWaitTime, NiFiProperties.DEFAULT_FLOW_ELECTION_MAX_WAIT_TIME);
maxWaitMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_FLOW_ELECTION_MAX_WAIT_TIME, TimeUnit.MILLISECONDS);
}
final Integer maxNodes = properties.getFlowElectionMaxCandidates();
final StringEncryptor encryptor = StringEncryptor.createEncryptor(properties);
final FingerprintFactory fingerprintFactory = new FingerprintFactory(encryptor);
return new PopularVoteFlowElection(maxWaitMillis, TimeUnit.MILLISECONDS, maxNodes, fingerprintFactory);
}
@Override
public Class<?> getObjectType() {
return PopularVoteFlowElection.class;
}
@Override
public boolean isSingleton() {
return true;
}
public void setProperties(final NiFiProperties properties) {
this.properties = properties;
}
}

View File

@ -121,7 +121,8 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor {
* Visible for testing.
*/
protected synchronized void monitorHeartbeats() {
if (!clusterCoordinator.isActiveClusterCoordinator()) {
final NodeIdentifier activeCoordinator = clusterCoordinator.getElectedActiveCoordinatorNode();
if (activeCoordinator != null && !activeCoordinator.equals(clusterCoordinator.getLocalNodeIdentifier())) {
// Occasionally Curator appears to not notify us that we have lost the elected leader role, or does so
// on a very large delay. So before we kick the node out of the cluster, we want to first check what the
// ZNode in ZooKeeper says, and ensure that this is the node that is being advertised as the appropriate

View File

@ -153,11 +153,19 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im
// Formulate a List of differences between our view of the cluster topology and the node's view
// and send that back to the node so that it is in-sync with us
final List<NodeConnectionStatus> nodeStatusList = payload.getClusterStatus();
List<NodeConnectionStatus> nodeStatusList = payload.getClusterStatus();
if (nodeStatusList == null) {
nodeStatusList = Collections.emptyList();
}
final List<NodeConnectionStatus> updatedStatuses = getUpdatedStatuses(nodeStatusList);
final HeartbeatResponseMessage responseMessage = new HeartbeatResponseMessage();
responseMessage.setUpdatedNodeStatuses(updatedStatuses);
if (!getClusterCoordinator().isFlowElectionComplete()) {
responseMessage.setFlowElectionMessage(getClusterCoordinator().getFlowElectionStatus());
}
return responseMessage;
}

View File

@ -34,6 +34,7 @@ import java.util.stream.Collectors;
import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.flow.FlowElection;
import org.apache.nifi.cluster.coordination.http.HttpResponseMerger;
import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger;
import org.apache.nifi.cluster.coordination.http.replication.RequestCompletionCallback;
@ -86,16 +87,18 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
private final NiFiProperties nifiProperties;
private final LeaderElectionManager leaderElectionManager;
private final AtomicLong latestUpdateId = new AtomicLong(-1);
private final FlowElection flowElection;
private volatile FlowService flowService;
private volatile boolean connected;
private volatile boolean closed = false;
private volatile boolean requireElection = true;
private final ConcurrentMap<NodeIdentifier, NodeConnectionStatus> nodeStatuses = new ConcurrentHashMap<>();
private final ConcurrentMap<NodeIdentifier, CircularFifoQueue<NodeEvent>> nodeEvents = new ConcurrentHashMap<>();
public NodeClusterCoordinator(final ClusterCoordinationProtocolSenderListener senderListener, final EventReporter eventReporter, final LeaderElectionManager leaderElectionManager,
final ClusterNodeFirewall firewall, final RevisionManager revisionManager, final NiFiProperties nifiProperties) {
final FlowElection flowElection, final ClusterNodeFirewall firewall, final RevisionManager revisionManager, final NiFiProperties nifiProperties) {
this.senderListener = senderListener;
this.flowService = null;
this.eventReporter = eventReporter;
@ -103,6 +106,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
this.revisionManager = revisionManager;
this.nifiProperties = nifiProperties;
this.leaderElectionManager = leaderElectionManager;
this.flowElection = flowElection;
senderListener.addHandler(this);
}
@ -115,9 +119,12 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
closed = true;
final NodeConnectionStatus shutdownStatus = new NodeConnectionStatus(getLocalNodeIdentifier(), DisconnectionCode.NODE_SHUTDOWN);
updateNodeStatus(shutdownStatus, false);
logger.info("Successfully notified other nodes that I am shutting down");
final NodeIdentifier localId = getLocalNodeIdentifier();
if (localId != null) {
final NodeConnectionStatus shutdownStatus = new NodeConnectionStatus(localId, DisconnectionCode.NODE_SHUTDOWN);
updateNodeStatus(shutdownStatus, false);
logger.info("Successfully notified other nodes that I am shutting down");
}
}
@Override
@ -230,6 +237,15 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
@Override
public void requestNodeConnect(final NodeIdentifier nodeId, final String userDn) {
if (requireElection && !flowElection.isElectionComplete() && flowElection.isVoteCounted(nodeId)) {
// If we receive a heartbeat from a node that we already know, we don't want to request that it reconnect
// to the cluster because no flow has yet been elected. However, if the node has not yet voted, we want to send
// a reconnect request because we want this node to cast its vote for the flow, and this happens on connection
logger.debug("Received heartbeat for {} and node is not connected. Will not request node connect to cluster, "
+ "though, because the Flow Election is still in progress", nodeId);
return;
}
if (userDn == null) {
reportEvent(nodeId, Severity.INFO, "Requesting that node connect to cluster");
} else {
@ -243,7 +259,11 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
request.setNodeId(nodeId);
request.setInstanceId(instanceId);
requestReconnectionAsynchronously(request, 10, 5);
// If we still are requiring that an election take place, we do not want to include our local dataflow, because we don't
// yet know what the cluster's dataflow looks like. However, if we don't require election, then we've connected to the
// cluster, which means that our flow is correct.
final boolean includeDataFlow = !requireElection;
requestReconnectionAsynchronously(request, 10, 5, includeDataFlow);
}
@Override
@ -652,7 +672,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
disconnectThread.start();
}
private void requestReconnectionAsynchronously(final ReconnectionRequestMessage request, final int reconnectionAttempts, final int retrySeconds) {
private void requestReconnectionAsynchronously(final ReconnectionRequestMessage request, final int reconnectionAttempts, final int retrySeconds, final boolean includeDataFlow) {
final Thread reconnectionThread = new Thread(new Runnable() {
@Override
public void run() {
@ -675,7 +695,10 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
return;
}
request.setDataFlow(new StandardDataFlow(flowService.createDataFlow()));
if (includeDataFlow) {
request.setDataFlow(new StandardDataFlow(flowService.createDataFlow()));
}
request.setNodeConnectionStatuses(getConnectionStatuses());
request.setComponentRevisions(revisionManager.getAllRevisions().stream().map(rev -> ComponentRevision.fromRevision(rev)).collect(Collectors.toList()));
@ -726,9 +749,13 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
}
private NodeConnectionStatusResponseMessage handleNodeConnectionStatusRequest() {
final NodeConnectionStatus connectionStatus = nodeStatuses.get(getLocalNodeIdentifier());
final NodeConnectionStatusResponseMessage msg = new NodeConnectionStatusResponseMessage();
msg.setNodeConnectionStatus(connectionStatus);
final NodeIdentifier self = getLocalNodeIdentifier();
if (self != null) {
final NodeConnectionStatus connectionStatus = nodeStatuses.get(self);
msg.setNodeConnectionStatus(connectionStatus);
}
return msg;
}
@ -781,6 +808,20 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
}
}
@Override
public String getFlowElectionStatus() {
if (!requireElection) {
return null;
}
return flowElection.getStatusDescription();
}
@Override
public boolean isFlowElectionComplete() {
return !requireElection || flowElection.isElectionComplete();
}
private NodeIdentifier resolveNodeId(final NodeIdentifier proposedIdentifier) {
final NodeConnectionStatus proposedConnectionStatus = new NodeConnectionStatus(proposedIdentifier, DisconnectionCode.NOT_YET_CONNECTED);
final NodeConnectionStatus existingStatus = nodeStatuses.putIfAbsent(proposedIdentifier, proposedConnectionStatus);
@ -808,59 +849,86 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
private ConnectionResponseMessage handleConnectionRequest(final ConnectionRequestMessage requestMessage) {
final NodeIdentifier proposedIdentifier = requestMessage.getConnectionRequest().getProposedNodeIdentifier();
final ConnectionRequest requestWithDn = new ConnectionRequest(addRequestorDn(proposedIdentifier, requestMessage.getRequestorDN()));
final NodeIdentifier withRequestorDn = addRequestorDn(proposedIdentifier, requestMessage.getRequestorDN());
final DataFlow dataFlow = requestMessage.getConnectionRequest().getDataFlow();
final ConnectionRequest requestWithDn = new ConnectionRequest(withRequestorDn, dataFlow);
// Resolve Node identifier.
final NodeIdentifier resolvedNodeId = resolveNodeId(proposedIdentifier);
final ConnectionResponse response = createConnectionResponse(requestWithDn, resolvedNodeId);
if (requireElection) {
final DataFlow electedDataFlow = flowElection.castVote(dataFlow, withRequestorDn);
if (electedDataFlow == null) {
logger.info("Received Connection Request from {}; responding with Flow Election In Progress message", withRequestorDn);
return createFlowElectionInProgressResponse();
} else {
logger.info("Received Connection Request from {}; responding with DataFlow that was elected", withRequestorDn);
return createConnectionResponse(requestWithDn, resolvedNodeId, electedDataFlow);
}
}
logger.info("Received Connection Request from {}; responding with my DataFlow", withRequestorDn);
return createConnectionResponse(requestWithDn, resolvedNodeId);
}
private ConnectionResponseMessage createFlowElectionInProgressResponse() {
final ConnectionResponseMessage responseMessage = new ConnectionResponseMessage();
responseMessage.setConnectionResponse(response);
final String statusDescription = flowElection.getStatusDescription();
responseMessage.setConnectionResponse(new ConnectionResponse(5, "Cluster is still voting on which Flow is the correct flow for the cluster. " + statusDescription));
return responseMessage;
}
private ConnectionResponse createConnectionResponse(final ConnectionRequest request, final NodeIdentifier resolvedNodeIdentifier) {
if (isBlockedByFirewall(resolvedNodeIdentifier.getSocketAddress())) {
// if the socket address is not listed in the firewall, then return a null response
logger.info("Firewall blocked connection request from node " + resolvedNodeIdentifier);
return ConnectionResponse.createBlockedByFirewallResponse();
}
// Set node's status to 'CONNECTING'
NodeConnectionStatus status = getConnectionStatus(resolvedNodeIdentifier);
if (status == null) {
addNodeEvent(resolvedNodeIdentifier, "Connection requested from new node. Setting status to connecting.");
} else {
addNodeEvent(resolvedNodeIdentifier, "Connection requested from existing node. Setting status to connecting");
}
status = new NodeConnectionStatus(resolvedNodeIdentifier, NodeConnectionState.CONNECTING, null, null, System.currentTimeMillis());
updateNodeStatus(status);
private ConnectionResponseMessage createConnectionResponse(final ConnectionRequest request, final NodeIdentifier resolvedNodeIdentifier) {
DataFlow dataFlow = null;
if (flowService != null) {
try {
dataFlow = flowService.createDataFlow();
} catch (final IOException ioe) {
logger.error("Unable to obtain current dataflow from FlowService in order to provide the flow to "
+ resolvedNodeIdentifier + ". Will tell node to try again later", ioe);
+ resolvedNodeIdentifier + ". Will tell node to try again later", ioe);
}
}
if (dataFlow == null) {
// Create try-later response based on flow retrieval delay to give
// the flow management service a chance to retrieve a current flow
final int tryAgainSeconds = 5;
addNodeEvent(resolvedNodeIdentifier, Severity.WARNING, "Connection requested from node, but manager was unable to obtain current flow. "
+ "Instructing node to try again in " + tryAgainSeconds + " seconds.");
return createConnectionResponse(request, resolvedNodeIdentifier, dataFlow);
}
// return try later response
return new ConnectionResponse(tryAgainSeconds);
private ConnectionResponseMessage createConnectionResponse(final ConnectionRequest request, final NodeIdentifier resolvedNodeIdentifier, final DataFlow clusterDataFlow) {
if (isBlockedByFirewall(resolvedNodeIdentifier.getSocketAddress())) {
// if the socket address is not listed in the firewall, then return a null response
logger.info("Firewall blocked connection request from node " + resolvedNodeIdentifier);
final ConnectionResponse response = ConnectionResponse.createBlockedByFirewallResponse();
final ConnectionResponseMessage responseMessage = new ConnectionResponseMessage();
responseMessage.setConnectionResponse(response);
return responseMessage;
}
return new ConnectionResponse(resolvedNodeIdentifier, dataFlow, instanceId, getConnectionStatuses(),
if (clusterDataFlow == null) {
final ConnectionResponseMessage responseMessage = new ConnectionResponseMessage();
responseMessage.setConnectionResponse(new ConnectionResponse(5, "The cluster dataflow is not yet available"));
return responseMessage;
}
// Set node's status to 'CONNECTING'
NodeConnectionStatus status = getConnectionStatus(resolvedNodeIdentifier);
if (status == null) {
addNodeEvent(resolvedNodeIdentifier, "Connection requested from new node. Setting status to connecting.");
} else {
addNodeEvent(resolvedNodeIdentifier, "Connection requested from existing node. Setting status to connecting.");
}
status = new NodeConnectionStatus(resolvedNodeIdentifier, NodeConnectionState.CONNECTING, null, null, System.currentTimeMillis());
updateNodeStatus(status);
final ConnectionResponse response = new ConnectionResponse(resolvedNodeIdentifier, clusterDataFlow, instanceId, getConnectionStatuses(),
revisionManager.getAllRevisions().stream().map(rev -> ComponentRevision.fromRevision(rev)).collect(Collectors.toList()));
final ConnectionResponseMessage responseMessage = new ConnectionResponseMessage();
responseMessage.setConnectionResponse(response);
return responseMessage;
}
private NodeIdentifier addRequestorDn(final NodeIdentifier nodeId, final String dn) {
return new NodeIdentifier(nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort(),
nodeId.getSocketAddress(), nodeId.getSocketPort(),
@ -959,6 +1027,16 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
@Override
public void setConnected(final boolean connected) {
this.connected = connected;
// Once we have connected to the cluster, election is no longer required.
// It is required only upon startup so that if multiple nodes are started up
// at the same time, and they have different flows, that we don't choose the
// wrong flow as the 'golden copy' by electing that node as the elected
// active Cluster Coordinator.
if (connected) {
logger.info("This node is now connected to the cluster. Will no longer require election of DataFlow.");
requireElection = false;
}
}
@Override

View File

@ -17,6 +17,7 @@
package org.apache.nifi.cluster.spring;
import org.apache.nifi.cluster.coordination.flow.FlowElection;
import org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator;
import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
import org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderListener;
@ -44,8 +45,9 @@ public class NodeClusterCoordinatorFactoryBean implements FactoryBean<NodeCluste
final ClusterNodeFirewall clusterFirewall = applicationContext.getBean("clusterFirewall", ClusterNodeFirewall.class);
final RevisionManager revisionManager = applicationContext.getBean("revisionManager", RevisionManager.class);
final LeaderElectionManager electionManager = applicationContext.getBean("leaderElectionManager", LeaderElectionManager.class);
final FlowElection flowElection = applicationContext.getBean("flowElection", FlowElection.class);
nodeClusterCoordinator = new NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, clusterFirewall, revisionManager, properties);
nodeClusterCoordinator = new NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, flowElection, clusterFirewall, revisionManager, properties);
}
return nodeClusterCoordinator;

View File

@ -41,6 +41,10 @@
<property name="properties" ref="nifiProperties" />
</bean>
<bean id="flowElection" class="org.apache.nifi.cluster.coordination.flow.PopularVoteFlowElectionFactoryBean">
<property name="properties" ref="nifiProperties" />
</bean>
<!-- Cluster Coordinator -->
<bean id="clusterCoordinator" class="org.apache.nifi.cluster.spring.NodeClusterCoordinatorFactoryBean">
<property name="properties" ref="nifiProperties"/>

View File

@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.coordination.flow;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.StandardDataFlow;
import org.apache.nifi.fingerprint.FingerprintFactory;
import org.junit.Test;
import org.mockito.Mockito;
public class TestPopularVoteFlowElection {
@Test
public void testOnlyEmptyFlows() throws IOException {
final FingerprintFactory fingerprintFactory = Mockito.mock(FingerprintFactory.class);
Mockito.when(fingerprintFactory.createFingerprint(Mockito.any(byte[].class))).thenReturn("fingerprint");
final PopularVoteFlowElection election = new PopularVoteFlowElection(1, TimeUnit.MINUTES, 3, fingerprintFactory);
final byte[] flow = Files.readAllBytes(Paths.get("src/test/resources/conf/empty-flow.xml"));
assertFalse(election.isElectionComplete());
assertNull(election.getElectedDataFlow());
assertNull(election.castVote(createDataFlow(flow), createNodeId(1)));
assertFalse(election.isElectionComplete());
assertNull(election.getElectedDataFlow());
assertNull(election.castVote(createDataFlow(flow), createNodeId(2)));
assertFalse(election.isElectionComplete());
assertNull(election.getElectedDataFlow());
final DataFlow electedDataFlow = election.castVote(createDataFlow(flow), createNodeId(3));
assertNotNull(electedDataFlow);
assertEquals(new String(flow), new String(electedDataFlow.getFlow()));
}
@Test
public void testEmptyFlowIgnoredIfNonEmptyFlowExists() throws IOException {
final FingerprintFactory fingerprintFactory = Mockito.mock(FingerprintFactory.class);
Mockito.when(fingerprintFactory.createFingerprint(Mockito.any(byte[].class))).thenReturn("fingerprint");
final PopularVoteFlowElection election = new PopularVoteFlowElection(1, TimeUnit.MINUTES, 8, fingerprintFactory);
final byte[] emptyFlow = Files.readAllBytes(Paths.get("src/test/resources/conf/empty-flow.xml"));
final byte[] nonEmptyFlow = Files.readAllBytes(Paths.get("src/test/resources/conf/non-empty-flow.xml"));
for (int i = 0; i < 8; i++) {
assertFalse(election.isElectionComplete());
assertNull(election.getElectedDataFlow());
final DataFlow dataFlow;
if (i % 4 == 0) {
dataFlow = createDataFlow(nonEmptyFlow);
} else {
dataFlow = createDataFlow(emptyFlow);
}
final DataFlow electedDataFlow = election.castVote(dataFlow, createNodeId(i));
if (i == 7) {
assertNotNull(electedDataFlow);
assertEquals(new String(nonEmptyFlow), new String(electedDataFlow.getFlow()));
} else {
assertNull(electedDataFlow);
}
}
}
private NodeIdentifier createNodeId(final int index) {
return new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 9000 + index, "localhost", 9000 + index, "localhost", 9000 + index, 9000 + index, true);
}
private DataFlow createDataFlow(final byte[] flow) {
return new StandardDataFlow(flow, new byte[0], new byte[0]);
}
}

View File

@ -319,6 +319,16 @@ public class TestAbstractHeartbeatMonitor {
public boolean resetNodeStatus(NodeConnectionStatus connectionStatus, long qualifyingUpdateId) {
return false;
}
@Override
public boolean isFlowElectionComplete() {
return true;
}
@Override
public String getFlowElectionStatus() {
return null;
}
}

View File

@ -32,9 +32,12 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.nifi.cluster.coordination.flow.FlowElection;
import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
import org.apache.nifi.cluster.protocol.ConnectionRequest;
import org.apache.nifi.cluster.protocol.ConnectionResponse;
import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.StandardDataFlow;
import org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderListener;
@ -77,7 +80,7 @@ public class TestNodeClusterCoordinator {
final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, null, revisionManager, createProperties()) {
coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(), null, revisionManager, createProperties()) {
@Override
void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
nodeStatuses.add(updatedStatus);
@ -132,17 +135,19 @@ public class TestNodeClusterCoordinator {
final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, null, revisionManager, createProperties()) {
final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(), null, revisionManager, createProperties()) {
@Override
void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
}
};
final NodeIdentifier requestedNodeId = createNodeId(6);
final ConnectionRequest request = new ConnectionRequest(requestedNodeId);
final ConnectionRequest request = new ConnectionRequest(requestedNodeId, new StandardDataFlow(new byte[0], new byte[0], new byte[0]));
final ConnectionRequestMessage requestMsg = new ConnectionRequestMessage();
requestMsg.setConnectionRequest(request);
coordinator.setConnected(true);
final ProtocolMessage protocolResponse = coordinator.handle(requestMsg);
assertNotNull(protocolResponse);
assertTrue(protocolResponse instanceof ConnectionResponseMessage);
@ -170,7 +175,7 @@ public class TestNodeClusterCoordinator {
final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, null, revisionManager, createProperties()) {
final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(), null, revisionManager, createProperties()) {
@Override
void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
}
@ -180,6 +185,7 @@ public class TestNodeClusterCoordinator {
final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], new byte[50], new byte[50]);
Mockito.when(flowService.createDataFlow()).thenReturn(dataFlow);
coordinator.setFlowService(flowService);
coordinator.setConnected(true);
final NodeIdentifier nodeId = createNodeId(1);
coordinator.finishNodeConnection(nodeId);
@ -400,7 +406,7 @@ public class TestNodeClusterCoordinator {
final NodeIdentifier id1 = new NodeIdentifier("1234", "localhost", 8000, "localhost", 9000, "localhost", 10000, 11000, false);
final NodeIdentifier conflictingId = new NodeIdentifier("1234", "localhost", 8001, "localhost", 9000, "localhost", 10000, 11000, false);
final ConnectionRequest connectionRequest = new ConnectionRequest(id1);
final ConnectionRequest connectionRequest = new ConnectionRequest(id1, new StandardDataFlow(new byte[0], new byte[0], new byte[0]));
final ConnectionRequestMessage crm = new ConnectionRequestMessage();
crm.setConnectionRequest(connectionRequest);
@ -411,7 +417,7 @@ public class TestNodeClusterCoordinator {
final NodeIdentifier resolvedNodeId = responseMessage.getConnectionResponse().getNodeIdentifier();
assertEquals(id1, resolvedNodeId);
final ConnectionRequest conRequest2 = new ConnectionRequest(conflictingId);
final ConnectionRequest conRequest2 = new ConnectionRequest(conflictingId, new StandardDataFlow(new byte[0], new byte[0], new byte[0]));
final ConnectionRequestMessage crm2 = new ConnectionRequestMessage();
crm2.setConnectionRequest(conRequest2);
@ -434,10 +440,45 @@ public class TestNodeClusterCoordinator {
}
private ProtocolMessage requestConnection(final NodeIdentifier requestedNodeId, final NodeClusterCoordinator coordinator) {
final ConnectionRequest request = new ConnectionRequest(requestedNodeId);
final ConnectionRequest request = new ConnectionRequest(requestedNodeId, new StandardDataFlow(new byte[0], new byte[0], new byte[0]));
final ConnectionRequestMessage requestMsg = new ConnectionRequestMessage();
requestMsg.setConnectionRequest(request);
return coordinator.handle(requestMsg);
}
private static class FirstVoteWinsFlowElection implements FlowElection {
private DataFlow dataFlow;
private NodeIdentifier voter;
@Override
public boolean isElectionComplete() {
return dataFlow != null;
}
@Override
public synchronized DataFlow castVote(DataFlow candidate, NodeIdentifier nodeIdentifier) {
if (dataFlow == null) {
dataFlow = candidate;
voter = nodeIdentifier;
}
return dataFlow;
}
@Override
public DataFlow getElectedDataFlow() {
return dataFlow;
}
@Override
public String getStatusDescription() {
return "First Vote Wins";
}
@Override
public boolean isVoteCounted(NodeIdentifier nodeIdentifier) {
return voter != null && voter.equals(nodeIdentifier);
}
}
}

View File

@ -30,7 +30,11 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.test.TestingServer;
import org.apache.nifi.cluster.coordination.flow.FlowElection;
import org.apache.nifi.cluster.coordination.flow.PopularVoteFlowElection;
import org.apache.nifi.cluster.coordination.node.ClusterRoles;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.fingerprint.FingerprintFactory;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -41,12 +45,22 @@ public class Cluster {
private final Set<Node> nodes = new HashSet<>();
private final TestingServer zookeeperServer;
private final long flowElectionTimeoutMillis;
private final Integer flowElectionMaxNodes;
public Cluster() throws IOException {
this(3, TimeUnit.SECONDS, 3);
}
public Cluster(final long flowElectionTimeout, final TimeUnit flowElectionTimeUnit, final Integer flowElectionMaxNodes) throws IOException {
try {
zookeeperServer = new TestingServer();
} catch (final Exception e) {
throw new RuntimeException(e);
}
this.flowElectionTimeoutMillis = flowElectionTimeUnit.toMillis(flowElectionTimeout);
this.flowElectionMaxNodes = flowElectionMaxNodes;
}
@ -116,7 +130,11 @@ public class Cluster {
addProps.put(NiFiProperties.CLUSTER_IS_NODE, "true");
final NiFiProperties nifiProperties = NiFiProperties.createBasicNiFiProperties("src/test/resources/conf/nifi.properties", addProps);
final Node node = new Node(nifiProperties);
final FingerprintFactory fingerprintFactory = new FingerprintFactory(StringEncryptor.createEncryptor(nifiProperties));
final FlowElection flowElection = new PopularVoteFlowElection(flowElectionTimeoutMillis, TimeUnit.MILLISECONDS, flowElectionMaxNodes, fingerprintFactory);
final Node node = new Node(nifiProperties, flowElection);
node.start();
nodes.add(node);

View File

@ -28,9 +28,11 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.cluster.ReportedEvent;
import org.apache.nifi.cluster.coordination.flow.FlowElection;
import org.apache.nifi.cluster.coordination.heartbeat.ClusterProtocolHeartbeatMonitor;
import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
import org.apache.nifi.cluster.coordination.node.LeaderElectionNodeProtocolSender;
@ -74,6 +76,7 @@ public class Node {
private final List<ReportedEvent> reportedEvents = Collections.synchronizedList(new ArrayList<ReportedEvent>());
private final RevisionManager revisionManager;
private final FlowElection flowElection;
private NodeClusterCoordinator clusterCoordinator;
private NodeProtocolSender protocolSender;
@ -88,11 +91,11 @@ public class Node {
private ScheduledExecutorService executor = new FlowEngine(8, "Node tasks", true);
public Node(final NiFiProperties properties) {
this(createNodeId(), properties);
public Node(final NiFiProperties properties, final FlowElection flowElection) {
this(createNodeId(), properties, flowElection);
}
public Node(final NodeIdentifier nodeId, final NiFiProperties properties) {
public Node(final NodeIdentifier nodeId, final NiFiProperties properties, final FlowElection flowElection) {
this.nodeId = nodeId;
this.nodeProperties = new NiFiProperties() {
@Override
@ -119,6 +122,7 @@ public class Node {
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
electionManager = new CuratorLeaderElectionManager(4, nodeProperties);
this.flowElection = flowElection;
}
@ -132,7 +136,7 @@ public class Node {
protocolSender = createNodeProtocolSender();
clusterCoordinator = createClusterCoordinator();
clusterCoordinator.setLocalNodeIdentifier(nodeId);
clusterCoordinator.setConnected(true);
// clusterCoordinator.setConnected(true);
final HeartbeatMonitor heartbeatMonitor = createHeartbeatMonitor();
flowController = FlowController.createClusteredInstance(Mockito.mock(FlowFileEventRepository.class), nodeProperties,
@ -273,7 +277,7 @@ public class Node {
}
final ClusterCoordinationProtocolSenderListener protocolSenderListener = new ClusterCoordinationProtocolSenderListener(createCoordinatorProtocolSender(), protocolListener);
return new NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, null, revisionManager, nodeProperties);
return new NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, flowElection, null, revisionManager, nodeProperties);
}

View File

@ -0,0 +1,27 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<flowController encoding-version="1.0">
<maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount>
<maxEventDrivenThreadCount>5</maxEventDrivenThreadCount>
<rootGroup>
<id>00000000-0000-0000-0000-000000000000</id>
<name>Empty NiFi Flow</name>
<position x="0.0" y="0.0"/>
<comment/>
</rootGroup>
<controllerServices/>
<reportingTasks/>
</flowController>

View File

@ -0,0 +1,25 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<flowController>
<maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount>
<maxEventDrivenThreadCount>5</maxEventDrivenThreadCount>
<rootGroup>
<id>00000000-0000-0000-0000-000000000000</id>
<name>Integration Test Flow</name>
<position x="0.0" y="0.0"/>
<comment/>
</rootGroup>
</flowController>

View File

@ -16,7 +16,35 @@
*/
package org.apache.nifi.controller;
import com.sun.jersey.api.client.ClientHandlerException;
import static java.util.Objects.requireNonNull;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.net.ssl.SSLContext;
import org.apache.commons.collections4.Predicate;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.action.Action;
@ -206,36 +234,7 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static java.util.Objects.requireNonNull;
import com.sun.jersey.api.client.ClientHandlerException;
public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, QueueProvider, Authorizable, ProvenanceAuthorizableFactory, NodeTypeProvider {
@ -363,7 +362,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private final Lock writeLock = rwLock.writeLock();
private static final Logger LOG = LoggerFactory.getLogger(FlowController.class);
private static final Logger heartbeatLogger = LoggerFactory.getLogger("org.apache.nifi.cluster.heartbeat");
public static FlowController createStandaloneInstance(
final FlowFileEventRepository flowFileEventRepo,
@ -3417,8 +3415,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
heartbeat();
} else {
leaderElectionManager.unregister(ClusterRoles.PRIMARY_NODE);
leaderElectionManager.unregister(ClusterRoles.CLUSTER_COORDINATOR);
stateManagerProvider.disableClusterProvider();
setPrimary(false);
@ -3430,6 +3426,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
}
if (!clustered) {
leaderElectionManager.unregister(ClusterRoles.PRIMARY_NODE);
leaderElectionManager.unregister(ClusterRoles.CLUSTER_COORDINATOR);
}
// update the heartbeat bean
this.heartbeatBeanRef.set(new HeartbeatBean(getRootGroup(), isPrimary()));
} finally {
@ -3870,9 +3871,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
private class HeartbeatSendTask implements Runnable {
private final DateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS", Locale.US);
@Override
public void run() {
try (final NarCloseable narCloseable = NarCloseable.withFrameworkNar()) {
@ -3882,36 +3880,19 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final HeartbeatMessage message = createHeartbeatMessage();
if (message == null) {
heartbeatLogger.debug("No heartbeat to send");
LOG.debug("No heartbeat to send");
return;
}
final long sendStart = System.nanoTime();
heartbeater.send(message);
final long sendNanos = System.nanoTime() - sendStart;
final long sendMillis = TimeUnit.NANOSECONDS.toMillis(sendNanos);
String heartbeatAddress;
try {
heartbeatAddress = heartbeater.getHeartbeatAddress();
} catch (final IOException ioe) {
heartbeatAddress = "Cluster Coordinator (could not determine socket address)";
}
heartbeatLogger.info("Heartbeat created at {} and sent to {} at {}; send took {} millis",
dateFormatter.format(new Date(message.getHeartbeat().getCreatedTimestamp())),
heartbeatAddress,
dateFormatter.format(new Date()),
sendMillis);
} catch (final UnknownServiceAddressException usae) {
if (heartbeatLogger.isDebugEnabled()) {
heartbeatLogger.debug(usae.getMessage());
if (LOG.isDebugEnabled()) {
LOG.debug(usae.getMessage());
}
} catch (final Throwable ex) {
heartbeatLogger.warn("Failed to send heartbeat due to: " + ex);
if (heartbeatLogger.isDebugEnabled()) {
heartbeatLogger.warn("", ex);
LOG.warn("Failed to send heartbeat due to: " + ex);
if (LOG.isDebugEnabled()) {
LOG.warn("", ex);
}
}
}
@ -3950,7 +3931,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final HeartbeatMessage message = new HeartbeatMessage();
message.setHeartbeat(heartbeat);
heartbeatLogger.debug("Generated heartbeat");
LOG.debug("Generated heartbeat");
return message;
} catch (final Throwable ex) {

View File

@ -446,7 +446,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
* and heartbeat until a manager is located.
*/
final boolean localFlowEmpty = StandardFlowSynchronizer.isEmpty(proposedFlow);
final ConnectionResponse response = connect(localFlowEmpty, localFlowEmpty);
final ConnectionResponse response = connect(true, localFlowEmpty, proposedFlow);
// obtain write lock while we are updating the controller. We need to ensure that we don't
// obtain the lock before calling connect(), though, or we will end up getting a deadlock
@ -454,7 +454,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
// flow, as that requires a read lock.
writeLock.lock();
try {
if (response == null) {
if (response == null || response.shouldTryLater()) {
logger.info("Flow controller will load local dataflow and suspend connection handshake until a cluster connection response is received.");
// load local proposed flow
@ -523,6 +523,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
}
clusterCoordinator.disconnectionRequestedByNode(getNodeId(), disconnectionCode, ex.toString());
controller.setClustered(false, null);
clusterCoordinator.setConnected(false);
}
private FlowResponseMessage handleFlowRequest(final FlowRequestMessage request) throws ProtocolException {
@ -587,9 +588,14 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
logger.info("Processing reconnection request from manager.");
// reconnect
final ConnectionResponse connectionResponse = new ConnectionResponse(getNodeId(), request.getDataFlow(),
ConnectionResponse connectionResponse = new ConnectionResponse(getNodeId(), request.getDataFlow(),
request.getInstanceId(), request.getNodeConnectionStatuses(), request.getComponentRevisions());
if (connectionResponse.getDataFlow() == null) {
logger.info("Received a Reconnection Request that contained no DataFlow. Will attempt to connect to cluster using local flow.");
connectionResponse = connect(false, false, createDataFlow());
}
loadFromConnectionResponse(connectionResponse);
clusterCoordinator.resetNodeStatuses(connectionResponse.getNodeConnectionStatuses().stream()
@ -747,13 +753,13 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
return templates;
}
private ConnectionResponse connect(final boolean retryOnCommsFailure, final boolean retryIndefinitely) throws ConnectionException {
private ConnectionResponse connect(final boolean retryOnCommsFailure, final boolean retryIndefinitely, final DataFlow dataFlow) throws ConnectionException {
readLock.lock();
try {
logger.info("Connecting Node: " + nodeId);
// create connection request message
final ConnectionRequest request = new ConnectionRequest(nodeId);
final ConnectionRequest request = new ConnectionRequest(nodeId, dataFlow);
final ConnectionRequestMessage requestMsg = new ConnectionRequestMessage();
requestMsg.setConnectionRequest(request);
@ -772,19 +778,21 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
for (int i = 0; i < maxAttempts || retryIndefinitely; i++) {
try {
response = senderListener.requestConnection(requestMsg).getConnectionResponse();
if (response.getRejectionReason() != null) {
logger.warn("Connection request was blocked by cluster coordinator with the explanation: " + response.getRejectionReason());
// set response to null and treat a firewall blockage the same as getting no response from manager
response = null;
break;
} else if (response.shouldTryLater()) {
logger.info("Flow controller requested by cluster coordinator to retry connection in " + response.getTryLaterSeconds() + " seconds.");
if (response.shouldTryLater()) {
logger.info("Requested by cluster coordinator to retry connection in " + response.getTryLaterSeconds() + " seconds with explanation: " + response.getRejectionReason());
try {
Thread.sleep(response.getTryLaterSeconds() * 1000);
} catch (final InterruptedException ie) {
// we were interrupted, so finish quickly
Thread.currentThread().interrupt();
break;
}
} else if (response.getRejectionReason() != null) {
logger.warn("Connection request was blocked by cluster coordinator with the explanation: " + response.getRejectionReason());
// set response to null and treat a firewall blockage the same as getting no response from manager
response = null;
break;
} else {
// we received a successful connection response from manager
break;
@ -824,7 +832,11 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
// if response is null, then either we had IO problems or we were blocked by firewall or we couldn't determine manager's address
return response;
} else if (response.shouldTryLater()) {
// if response indicates we should try later, then manager was unable to service our request. Just load local flow and move on.
// if response indicates we should try later, then coordinator was unable to service our request. Just load local flow and move on.
// when the cluster coordinator is able to service requests, this node's heartbeat will trigger the cluster coordinator to reach
// out to this node and re-connect to the cluster.
logger.info("Received a 'try again' response from Cluster Coordinator when attempting to connect to cluster with explanation '"
+ response.getRejectionReason() + "'. However, the maximum number of retries have already completed. Will load local flow and connect to the cluster when able.");
return null;
} else {
// cluster manager provided a successful response with a current dataflow
@ -848,8 +860,10 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
private void loadFromConnectionResponse(final ConnectionResponse response) throws ConnectionException {
writeLock.lock();
try {
clusterCoordinator.resetNodeStatuses(response.getNodeConnectionStatuses().stream()
if (response.getNodeConnectionStatuses() != null) {
clusterCoordinator.resetNodeStatuses(response.getNodeConnectionStatuses().stream()
.collect(Collectors.toMap(status -> status.getNodeIdentifier(), status -> status)));
}
// get the dataflow from the response
final DataFlow dataFlow = response.getDataFlow();

View File

@ -17,8 +17,13 @@
package org.apache.nifi.controller.cluster;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.ClusterRoles;
@ -54,7 +59,7 @@ public class ClusterProtocolHeartbeater implements Heartbeater {
}
@Override
public String getHeartbeatAddress() throws IOException {
public String getHeartbeatAddress() {
final String heartbeatAddress = electionManager.getLeader(ClusterRoles.CLUSTER_COORDINATOR);
if (heartbeatAddress == null) {
throw new ProtocolException("Cannot send heartbeat because there is no Cluster Coordinator currently elected");
@ -65,6 +70,8 @@ public class ClusterProtocolHeartbeater implements Heartbeater {
@Override
public synchronized void send(final HeartbeatMessage heartbeatMessage) throws IOException {
final long sendStart = System.nanoTime();
final String heartbeatAddress = getHeartbeatAddress();
final HeartbeatResponseMessage responseMessage = protocolSender.heartbeat(heartbeatMessage, heartbeatAddress);
@ -88,6 +95,21 @@ public class ClusterProtocolHeartbeater implements Heartbeater {
}
}
}
final long sendNanos = System.nanoTime() - sendStart;
final long sendMillis = TimeUnit.NANOSECONDS.toMillis(sendNanos);
final DateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS", Locale.US);
final String flowElectionMessage = responseMessage.getFlowElectionMessage();
final String formattedElectionMessage = flowElectionMessage == null ? "" : "; " + flowElectionMessage;
logger.info("Heartbeat created at {} and sent to {} at {}; send took {} millis{}",
dateFormatter.format(new Date(heartbeatMessage.getHeartbeat().getCreatedTimestamp())),
heartbeatAddress,
dateFormatter.format(new Date()),
sendMillis,
formattedElectionMessage);
}
@Override

View File

@ -137,9 +137,14 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
registeredRoles.remove(roleName);
final LeaderRole leaderRole = leaderRoles.remove(roleName);
if (leaderRole == null) {
logger.info("Cannot unregister Leader Election Role '{}' becuase that role is not registered", roleName);
return;
}
final LeaderSelector leaderSelector = leaderRole.getLeaderSelector();
if (leaderSelector == null) {
logger.warn("Cannot unregister Leader Election Role '{}' becuase that role is not registered", roleName);
logger.info("Cannot unregister Leader Election Role '{}' becuase that role is not registered", roleName);
return;
}

View File

@ -75,7 +75,7 @@ import java.util.UUID;
* and processor properties. Examples of items not involved in the fingerprint are: items in the processor "settings" or "comments" tabs, position information, flow controller settings, and counters.
*
*/
public final class FingerprintFactory {
public class FingerprintFactory {
/*
* Developer Note: This class should be changed with care and coordinated
@ -87,33 +87,47 @@ public final class FingerprintFactory {
public static final String NO_VALUE = "NO_VALUE";
private static final String FLOW_CONFIG_XSD = "/FlowConfiguration.xsd";
private static final Schema FLOW_CONFIG_SCHEMA;
private static final DocumentBuilder FLOW_CONFIG_DOC_BUILDER;
private static final String ENCRYPTED_VALUE_PREFIX = "enc{";
private static final String ENCRYPTED_VALUE_SUFFIX = "}";
private final StringEncryptor encryptor;
private final DocumentBuilder flowConfigDocBuilder;
private static final Logger logger = LoggerFactory.getLogger(FingerprintFactory.class);
static {
public FingerprintFactory(final StringEncryptor encryptor) {
this.encryptor = encryptor;
final DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance();
documentBuilderFactory.setNamespaceAware(true);
final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
final Schema schema;
try {
FLOW_CONFIG_SCHEMA = schemaFactory.newSchema(FingerprintFactory.class.getResource(FLOW_CONFIG_XSD));
schema = schemaFactory.newSchema(FingerprintFactory.class.getResource(FLOW_CONFIG_XSD));
} catch (final Exception e) {
throw new RuntimeException("Failed to parse schema for file flow configuration.", e);
}
try {
documentBuilderFactory.setSchema(FLOW_CONFIG_SCHEMA);
FLOW_CONFIG_DOC_BUILDER = documentBuilderFactory.newDocumentBuilder();
documentBuilderFactory.setSchema(schema);
flowConfigDocBuilder = documentBuilderFactory.newDocumentBuilder();
} catch (final Exception e) {
throw new RuntimeException("Failed to create document builder for flow configuration.", e);
}
}
public FingerprintFactory(final StringEncryptor encryptor) {
this.encryptor = encryptor;
/**
* Creates a fingerprint of a flow. The order of elements or attributes in the flow does not influence the fingerprint generation.
* This method does not accept a FlowController, which means that Processors cannot be created in order to verify default property
* values, etc. As a result, if Flow A and Flow B are fingerprinted and Flow B, for instance, contains a property with a default value
* that is not present in Flow A, then the two will have different fingerprints.
*
* @param flowBytes the flow represented as bytes
*
* @return a generated fingerprint
*
* @throws FingerprintException if the fingerprint failed to be generated
*/
public synchronized String createFingerprint(final byte[] flowBytes) throws FingerprintException {
return createFingerprint(flowBytes, null);
}
/**
@ -126,7 +140,7 @@ public final class FingerprintFactory {
*
* @throws FingerprintException if the fingerprint failed to be generated
*/
public String createFingerprint(final byte[] flowBytes, final FlowController controller) throws FingerprintException {
public synchronized String createFingerprint(final byte[] flowBytes, final FlowController controller) throws FingerprintException {
try {
return createFingerprint(parseFlow(flowBytes), controller);
} catch (final NoSuchAlgorithmException e) {
@ -178,7 +192,7 @@ public final class FingerprintFactory {
}
try {
return FLOW_CONFIG_DOC_BUILDER.parse(new ByteArrayInputStream(flow));
return flowConfigDocBuilder.parse(new ByteArrayInputStream(flow));
} catch (final SAXException | IOException ex) {
throw new FingerprintException(ex);
}

View File

@ -152,6 +152,8 @@
<nifi.cluster.node.connection.timeout>5 sec</nifi.cluster.node.connection.timeout>
<nifi.cluster.node.read.timeout>5 sec</nifi.cluster.node.read.timeout>
<nifi.cluster.firewall.file />
<nifi.cluster.flow.election.max.wait.time>5 mins</nifi.cluster.flow.election.max.wait.time>
<nifi.cluster.flow.election.max.candidates></nifi.cluster.flow.election.max.candidates>
<nifi.cluster.request.replication.claim.timeout>15 secs</nifi.cluster.request.replication.claim.timeout>

View File

@ -172,6 +172,8 @@ nifi.cluster.node.event.history.size=${nifi.cluster.node.event.history.size}
nifi.cluster.node.connection.timeout=${nifi.cluster.node.connection.timeout}
nifi.cluster.node.read.timeout=${nifi.cluster.node.read.timeout}
nifi.cluster.firewall.file=${nifi.cluster.firewall.file}
nifi.cluster.flow.election.max.wait.time=${nifi.cluster.flow.election.max.wait.time}
nifi.cluster.flow.election.max.candidates=${nifi.cluster.flow.election.max.candidates}
# zookeeper properties, used for cluster management #
nifi.zookeeper.connect.string=${nifi.zookeeper.connect.string}

View File

@ -16,12 +16,39 @@
*/
package org.apache.nifi.web.api;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.sun.jersey.api.core.HttpContext;
import com.sun.jersey.api.representation.Form;
import com.sun.jersey.core.util.MultivaluedMapImpl;
import com.sun.jersey.server.impl.model.method.dispatch.FormDispatchProvider;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_NAME;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_VALUE;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.CacheControl;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriBuilderException;
import javax.ws.rs.core.UriInfo;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AuthorizableLookup;
import org.apache.nifi.authorization.AuthorizeAccess;
@ -34,8 +61,10 @@ import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.Snippet;
import org.apache.nifi.remote.HttpRemoteSiteListener;
import org.apache.nifi.remote.VersionNegotiator;
@ -58,37 +87,12 @@ import org.apache.nifi.web.security.util.CacheKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.CacheControl;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriBuilderException;
import javax.ws.rs.core.UriInfo;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_NAME;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_VALUE;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.sun.jersey.api.core.HttpContext;
import com.sun.jersey.api.representation.Form;
import com.sun.jersey.core.util.MultivaluedMapImpl;
import com.sun.jersey.server.impl.model.method.dispatch.FormDispatchProvider;
/**
* Base class for controllers.
@ -120,6 +124,7 @@ public abstract class ApplicationResource {
protected NiFiProperties properties;
private RequestReplicator requestReplicator;
private ClusterCoordinator clusterCoordinator;
private FlowController flowController;
private static final int MAX_CACHE_SOFT_LIMIT = 500;
private final Cache<CacheKey, Request<? extends Entity>> twoPhaseCommitCache = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES).build();
@ -394,6 +399,8 @@ public abstract class ApplicationResource {
return false;
}
ensureFlowInitialized();
// If not connected to the cluster, we do not replicate
if (!isConnectedToCluster()) {
return false;
@ -753,6 +760,12 @@ public abstract class ApplicationResource {
return replicate(method, entity, nodeUuid, null);
}
private void ensureFlowInitialized() {
if (!flowController.isInitialized()) {
throw new IllegalClusterStateException("Cluster is still in the process of voting on the appropriate Data Flow.");
}
}
/**
* Replicates the request to the given node
*
@ -773,6 +786,8 @@ public abstract class ApplicationResource {
throw new UnknownNodeException("Cannot replicate request " + method + " " + getAbsolutePath() + " to node with ID " + nodeUuid + " because the specified node does not exist.");
}
ensureFlowInitialized();
final URI path = getAbsolutePath();
try {
final Map<String, String> headers = headersToOverride == null ? getHeaders() : getHeaders(headersToOverride);
@ -812,6 +827,8 @@ public abstract class ApplicationResource {
}
protected Response replicate(final String method, final NodeIdentifier targetNode, final Object entity) {
ensureFlowInitialized();
try {
// Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly
// to the cluster nodes themselves.
@ -828,6 +845,8 @@ public abstract class ApplicationResource {
}
protected Response replicateToCoordinator(final String method, final Object entity) {
ensureFlowInitialized();
try {
final NodeIdentifier coordinatorNode = getClusterCoordinatorNode();
final Set<NodeIdentifier> coordinatorNodes = Collections.singleton(coordinatorNode);
@ -906,6 +925,8 @@ public abstract class ApplicationResource {
* @see #replicate(String, Object, Map)
*/
protected NodeResponse replicateNodeResponse(final String method, final Object entity, final Map<String, String> headersToOverride) throws InterruptedException {
ensureFlowInitialized();
final URI path = getAbsolutePath();
final Map<String, String> headers = headersToOverride == null ? getHeaders() : getHeaders(headersToOverride);
@ -935,6 +956,8 @@ public abstract class ApplicationResource {
}
protected RequestReplicator getRequestReplicator() {
ensureFlowInitialized();
return requestReplicator;
}
@ -950,6 +973,10 @@ public abstract class ApplicationResource {
return clusterCoordinator;
}
public void setFlowController(final FlowController flowController) {
this.flowController = flowController;
}
protected NiFiProperties getProperties() {
return properties;
}

View File

@ -197,6 +197,7 @@
<property name="properties" ref="nifiProperties"/>
<property name="clusterCoordinator" ref="clusterCoordinator"/>
<property name="requestReplicator" ref="requestReplicator" />
<property name="flowController" ref="flowController" />
</bean>
<bean id="resourceResource" class="org.apache.nifi.web.api.ResourceResource" scope="singleton">
<property name="serviceFacade" ref="serviceFacade"/>
@ -204,6 +205,7 @@
<property name="clusterCoordinator" ref="clusterCoordinator"/>
<property name="requestReplicator" ref="requestReplicator" />
<property name="authorizer" ref="authorizer"/>
<property name="flowController" ref="flowController" />
</bean>
<bean id="controllerResource" class="org.apache.nifi.web.api.ControllerResource" scope="singleton">
<property name="serviceFacade" ref="serviceFacade"/>
@ -213,6 +215,7 @@
<property name="reportingTaskResource" ref="reportingTaskResource"/>
<property name="controllerServiceResource" ref="controllerServiceResource"/>
<property name="authorizer" ref="authorizer"/>
<property name="flowController" ref="flowController" />
</bean>
<bean id="siteToSiteResource" class="org.apache.nifi.web.api.SiteToSiteResource" scope="singleton">
<constructor-arg ref="nifiProperties"/>
@ -221,6 +224,7 @@
<property name="clusterCoordinator" ref="clusterCoordinator"/>
<property name="requestReplicator" ref="requestReplicator" />
<property name="authorizer" ref="authorizer"/>
<property name="flowController" ref="flowController" />
</bean>
<bean id="dataTransferResource" class="org.apache.nifi.web.api.DataTransferResource" scope="singleton">
<constructor-arg ref="nifiProperties"/>
@ -229,6 +233,7 @@
<property name="requestReplicator" ref="requestReplicator" />
<property name="authorizer" ref="authorizer"/>
<property name="serviceFacade" ref="serviceFacade"/>
<property name="flowController" ref="flowController" />
</bean>
<bean id="snippetResource" class="org.apache.nifi.web.api.SnippetResource" scope="singleton">
<property name="serviceFacade" ref="serviceFacade"/>
@ -236,6 +241,7 @@
<property name="requestReplicator" ref="requestReplicator" />
<property name="clusterCoordinator" ref="clusterCoordinator"/>
<property name="authorizer" ref="authorizer"/>
<property name="flowController" ref="flowController" />
</bean>
<bean id="templateResource" class="org.apache.nifi.web.api.TemplateResource" scope="singleton">
<property name="serviceFacade" ref="serviceFacade"/>
@ -243,6 +249,7 @@
<property name="clusterCoordinator" ref="clusterCoordinator"/>
<property name="requestReplicator" ref="requestReplicator" />
<property name="authorizer" ref="authorizer"/>
<property name="flowController" ref="flowController" />
</bean>
<bean id="controllerServiceResource" class="org.apache.nifi.web.api.ControllerServiceResource" scope="singleton">
<property name="serviceFacade" ref="serviceFacade"/>
@ -250,6 +257,7 @@
<property name="requestReplicator" ref="requestReplicator"/>
<property name="clusterCoordinator" ref="clusterCoordinator"/>
<property name="authorizer" ref="authorizer"/>
<property name="flowController" ref="flowController" />
</bean>
<bean id="reportingTaskResource" class="org.apache.nifi.web.api.ReportingTaskResource" scope="singleton">
<property name="serviceFacade" ref="serviceFacade"/>
@ -257,6 +265,7 @@
<property name="clusterCoordinator" ref="clusterCoordinator"/>
<property name="requestReplicator" ref="requestReplicator" />
<property name="authorizer" ref="authorizer"/>
<property name="flowController" ref="flowController" />
</bean>
<bean id="processGroupResource" class="org.apache.nifi.web.api.ProcessGroupResource" scope="singleton">
<property name="serviceFacade" ref="serviceFacade"/>
@ -273,6 +282,7 @@
<property name="clusterCoordinator" ref="clusterCoordinator"/>
<property name="requestReplicator" ref="requestReplicator" />
<property name="authorizer" ref="authorizer"/>
<property name="flowController" ref="flowController" />
</bean>
<bean id="processorResource" class="org.apache.nifi.web.api.ProcessorResource" scope="singleton">
<property name="serviceFacade" ref="serviceFacade"/>
@ -280,6 +290,7 @@
<property name="requestReplicator" ref="requestReplicator" />
<property name="clusterCoordinator" ref="clusterCoordinator"/>
<property name="authorizer" ref="authorizer"/>
<property name="flowController" ref="flowController" />
</bean>
<bean id="connectionResource" class="org.apache.nifi.web.api.ConnectionResource" scope="singleton">
<property name="serviceFacade" ref="serviceFacade"/>
@ -287,6 +298,7 @@
<property name="clusterCoordinator" ref="clusterCoordinator"/>
<property name="requestReplicator" ref="requestReplicator" />
<property name="authorizer" ref="authorizer"/>
<property name="flowController" ref="flowController" />
</bean>
<bean id="flowfileQueueResource" class="org.apache.nifi.web.api.FlowFileQueueResource" scope="singleton">
<property name="serviceFacade" ref="serviceFacade"/>
@ -294,6 +306,7 @@
<property name="clusterCoordinator" ref="clusterCoordinator"/>
<property name="requestReplicator" ref="requestReplicator" />
<property name="authorizer" ref="authorizer"/>
<property name="flowController" ref="flowController" />
</bean>
<bean id="remoteProcessGroupResource" class="org.apache.nifi.web.api.RemoteProcessGroupResource" scope="singleton">
<property name="serviceFacade" ref="serviceFacade"/>
@ -301,6 +314,7 @@
<property name="clusterCoordinator" ref="clusterCoordinator"/>
<property name="requestReplicator" ref="requestReplicator" />
<property name="authorizer" ref="authorizer"/>
<property name="flowController" ref="flowController" />
</bean>
<bean id="inputPortResource" class="org.apache.nifi.web.api.InputPortResource" scope="singleton">
<property name="serviceFacade" ref="serviceFacade"/>
@ -308,6 +322,7 @@
<property name="clusterCoordinator" ref="clusterCoordinator"/>
<property name="requestReplicator" ref="requestReplicator" />
<property name="authorizer" ref="authorizer"/>
<property name="flowController" ref="flowController" />
</bean>
<bean id="outputPortResource" class="org.apache.nifi.web.api.OutputPortResource" scope="singleton">
<property name="serviceFacade" ref="serviceFacade"/>
@ -315,6 +330,7 @@
<property name="clusterCoordinator" ref="clusterCoordinator"/>
<property name="requestReplicator" ref="requestReplicator" />
<property name="authorizer" ref="authorizer"/>
<property name="flowController" ref="flowController" />
</bean>
<bean id="labelResource" class="org.apache.nifi.web.api.LabelResource" scope="singleton">
<property name="serviceFacade" ref="serviceFacade"/>
@ -322,6 +338,7 @@
<property name="clusterCoordinator" ref="clusterCoordinator"/>
<property name="requestReplicator" ref="requestReplicator" />
<property name="authorizer" ref="authorizer"/>
<property name="flowController" ref="flowController" />
</bean>
<bean id="funnelResource" class="org.apache.nifi.web.api.FunnelResource" scope="singleton">
<property name="serviceFacade" ref="serviceFacade"/>
@ -329,6 +346,7 @@
<property name="clusterCoordinator" ref="clusterCoordinator"/>
<property name="requestReplicator" ref="requestReplicator" />
<property name="authorizer" ref="authorizer" />
<property name="flowController" ref="flowController" />
</bean>
<bean id="provenanceResource" class="org.apache.nifi.web.api.ProvenanceResource" scope="singleton">
<property name="serviceFacade" ref="serviceFacade"/>
@ -336,12 +354,14 @@
<property name="clusterCoordinator" ref="clusterCoordinator"/>
<property name="requestReplicator" ref="requestReplicator" />
<property name="authorizer" ref="authorizer"/>
<property name="flowController" ref="flowController" />
</bean>
<bean id="provenanceEventResource" class="org.apache.nifi.web.api.ProvenanceEventResource" scope="singleton">
<property name="serviceFacade" ref="serviceFacade"/>
<property name="properties" ref="nifiProperties"/>
<property name="clusterCoordinator" ref="clusterCoordinator"/>
<property name="requestReplicator" ref="requestReplicator" />
<property name="flowController" ref="flowController" />
</bean>
<bean id="countersResource" class="org.apache.nifi.web.api.CountersResource" scope="singleton">
<property name="serviceFacade" ref="serviceFacade"/>
@ -349,6 +369,7 @@
<property name="clusterCoordinator" ref="clusterCoordinator"/>
<property name="requestReplicator" ref="requestReplicator" />
<property name="authorizer" ref="authorizer"/>
<property name="flowController" ref="flowController" />
</bean>
<bean id="systemDiagnosticsResource" class="org.apache.nifi.web.api.SystemDiagnosticsResource" scope="singleton">
<property name="serviceFacade" ref="serviceFacade"/>
@ -356,6 +377,7 @@
<property name="clusterCoordinator" ref="clusterCoordinator"/>
<property name="requestReplicator" ref="requestReplicator" />
<property name="authorizer" ref="authorizer"/>
<property name="flowController" ref="flowController" />
</bean>
<bean id="accessResource" class="org.apache.nifi.web.api.AccessResource" scope="singleton">
<property name="loginIdentityProvider" ref="loginIdentityProvider"/>
@ -369,6 +391,7 @@
<property name="properties" ref="nifiProperties"/>
<property name="clusterCoordinator" ref="clusterCoordinator"/>
<property name="requestReplicator" ref="requestReplicator" />
<property name="flowController" ref="flowController" />
</bean>
<bean id="accessPolicyResource" class="org.apache.nifi.web.api.AccessPolicyResource" scope="singleton">
<constructor-arg ref="serviceFacade"/>

View File

@ -407,6 +407,8 @@ nf.Common = (function () {
$('#message-title').text('Unauthorized');
} else if (xhr.status === 403) {
$('#message-title').text('Access Denied');
} else if (xhr.status === 409) {
$('#message-title').text('Invalid State');
} else {
$('#message-title').text('An unexpected error has occurred');
}