NIFI-2574 merging master with cluster changes to updated NiFiProperties approach

joewitt 2016-08-17 01:23:54 -07:00
commit 6bf7e7f325
48 changed files with 1041 additions and 878 deletions


@ -107,12 +107,19 @@ public interface ClusterCoordinator {
Set<NodeIdentifier> getNodeIdentifiers(NodeConnectionState... states);
/**
* Returns a Map of NodeConnectionStatus to all Node Identifiers that have that status.
* Returns a Map of NodeConnectionStates to all Node Identifiers that have that state.
*
* @return the NodeConnectionStatus for each Node in the cluster, grouped by the Connection Status
* @return the NodeConnectionState for each Node in the cluster, grouped by the Connection State
*/
Map<NodeConnectionState, List<NodeIdentifier>> getConnectionStates();
/**
* Returns a List of the NodeConnectionStatus for each node in the cluster
*
* @return a List of the NodeConnectionStatus for each node in the cluster
*/
List<NodeConnectionStatus> getConnectionStatuses();
/**
* Checks if the given hostname is blocked by the configured firewall, returning
* <code>true</code> if the node is blocked, <code>false</code> if the node is
@ -134,14 +141,6 @@ public interface ClusterCoordinator {
*/
void reportEvent(NodeIdentifier nodeId, Severity severity, String event);
/**
* Updates the roles held by the given node
*
* @param nodeId the id of the node to update
* @param roles the new roles that the node possesses
*/
void updateNodeRoles(NodeIdentifier nodeId, Set<String> roles);
/**
* Returns the NodeIdentifier that exists that has the given UUID, or <code>null</code> if no NodeIdentifier
* exists for the given UUID
@ -196,6 +195,17 @@ public interface ClusterCoordinator {
*/
void resetNodeStatuses(Map<NodeIdentifier, NodeConnectionStatus> statusMap);
/**
* Resets the status of the node to be in accordance with the given NodeConnectionStatus if and only if the
* currently held status for this node has an Update ID equal to the given <code>qualifyingUpdateId</code>
*
* @param connectionStatus the new status of the node
* @param qualifyingUpdateId the Update ID to compare the current ID with. If the current ID for the node described by the provided
* NodeConnectionStatus is not equal to this value, the value will not be updated
* @return <code>true</code> if the node status was updated, <code>false</code> if the <code>qualifyingUpdateId</code> is out of date.
*/
boolean resetNodeStatus(NodeConnectionStatus connectionStatus, long qualifyingUpdateId);
/**
* Notifies the Cluster Coordinator of the Node Identifier that the coordinator is currently running on
*
@ -216,18 +226,4 @@ public interface ClusterCoordinator {
* @return <code>true</code> if connected, <code>false</code> otherwise
*/
boolean isConnected();
/**
* Notifies the cluster coordinator that this node has been granted the given role
*
* @param clusterRole the role that this node has been granted
*/
void addRole(String clusterRole);
/**
* Notifies the cluster coordinator that this node is no longer responsible for the given role
*
* @param clusterRole the role that this node is no longer responsible for
*/
void removeRole(String clusterRole);
}
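
The resetNodeStatus method added above gives callers compare-and-set semantics over a node's connection status. A minimal, hypothetical sketch of a caller (applyReceivedStatus is an invented name, not part of this commit) that applies a status received from another node only if the locally held status still carries the update id it last observed:

    boolean applyReceivedStatus(final ClusterCoordinator coordinator, final NodeConnectionStatus received) {
        // Look up the status currently held for this node; its update id is the value that must match.
        final NodeConnectionStatus local = coordinator.getConnectionStatus(received.getNodeIdentifier());
        final long qualifyingUpdateId = local == null ? -1L : local.getUpdateIdentifier();
        // Returns false if the status changed concurrently (its update id no longer matches),
        // in which case the newer status is kept and this stale one is discarded.
        return coordinator.resetNodeStatus(received, qualifyingUpdateId);
    }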


@ -52,4 +52,9 @@ public interface HeartbeatMonitor {
* @param nodeId the id of the node whose heartbeat should be removed
*/
void removeHeartbeat(NodeIdentifier nodeId);
/**
* @return the address that heartbeats should be sent to when this node is elected coordinator.
*/
String getHeartbeatAddress();
}


@ -17,8 +17,6 @@
package org.apache.nifi.cluster.coordination.heartbeat;
import java.util.Set;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
@ -39,11 +37,6 @@ public interface NodeHeartbeat {
*/
NodeConnectionStatus getConnectionStatus();
/**
* @return the set of Roles that the node currently possesses.
*/
Set<String> getRoles();
/**
* @return the number of FlowFiles that are queued up on the node
*/


@ -17,9 +17,19 @@
package org.apache.nifi.cluster.coordination.node;
import java.util.HashSet;
import java.util.Set;
public class ClusterRoles {
public static final String PRIMARY_NODE = "Primary Node";
public static final String CLUSTER_COORDINATOR = "Cluster Coordinator";
public static Set<String> getAllRoles() {
final Set<String> roles = new HashSet<>();
roles.add(PRIMARY_NODE);
roles.add(CLUSTER_COORDINATOR);
return roles;
}
}


@ -17,10 +17,7 @@
package org.apache.nifi.cluster.coordination.node;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
@ -41,40 +38,38 @@ public class NodeConnectionStatus {
private final DisconnectionCode disconnectCode;
private final String disconnectReason;
private final Long connectionRequestTime;
private final Set<String> roles;
public NodeConnectionStatus(final NodeIdentifier nodeId, final NodeConnectionState state, final Set<String> roles) {
this(nodeId, state, null, null, null, roles);
public NodeConnectionStatus(final NodeIdentifier nodeId, final NodeConnectionState state) {
this(nodeId, state, null, null, null);
}
public NodeConnectionStatus(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode) {
this(nodeId, NodeConnectionState.DISCONNECTED, disconnectionCode, disconnectionCode.name(), null, null);
this(nodeId, NodeConnectionState.DISCONNECTED, disconnectionCode, disconnectionCode.toString(), null);
}
public NodeConnectionStatus(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String disconnectionExplanation) {
this(nodeId, NodeConnectionState.DISCONNECTED, disconnectionCode, disconnectionExplanation, null, null);
this(nodeId, NodeConnectionState.DISCONNECTED, disconnectionCode, disconnectionExplanation, null);
}
public NodeConnectionStatus(final NodeIdentifier nodeId, final NodeConnectionState state, final DisconnectionCode disconnectionCode, final Set<String> roles) {
this(nodeId, state, disconnectionCode, disconnectionCode.name(), null, roles);
public NodeConnectionStatus(final NodeIdentifier nodeId, final NodeConnectionState state, final DisconnectionCode disconnectionCode) {
this(nodeId, state, disconnectionCode, disconnectionCode == null ? null : disconnectionCode.toString(), null);
}
public NodeConnectionStatus(final NodeConnectionStatus status, final Set<String> roles) {
this(status.getNodeIdentifier(), status.getState(), status.getDisconnectCode(), status.getDisconnectReason(), status.getConnectionRequestTime(), roles);
public NodeConnectionStatus(final NodeConnectionStatus status) {
this(status.getNodeIdentifier(), status.getState(), status.getDisconnectCode(), status.getDisconnectReason(), status.getConnectionRequestTime());
}
public NodeConnectionStatus(final NodeIdentifier nodeId, final NodeConnectionState state, final DisconnectionCode disconnectCode,
final String disconnectReason, final Long connectionRequestTime, final Set<String> roles) {
this(idGenerator.getAndIncrement(), nodeId, state, disconnectCode, disconnectReason, connectionRequestTime, roles);
final String disconnectReason, final Long connectionRequestTime) {
this(idGenerator.getAndIncrement(), nodeId, state, disconnectCode, disconnectReason, connectionRequestTime);
}
public NodeConnectionStatus(final long updateId, final NodeIdentifier nodeId, final NodeConnectionState state, final DisconnectionCode disconnectCode,
final String disconnectReason, final Long connectionRequestTime, final Set<String> roles) {
final String disconnectReason, final Long connectionRequestTime) {
this.updateId = updateId;
this.nodeId = nodeId;
this.state = state;
this.roles = roles == null ? Collections.emptySet() : Collections.unmodifiableSet(new HashSet<>(roles));
if (state == NodeConnectionState.DISCONNECTED && disconnectCode == null) {
this.disconnectCode = DisconnectionCode.UNKNOWN;
this.disconnectReason = this.disconnectCode.toString();
@ -90,10 +85,6 @@ public class NodeConnectionStatus {
return updateId;
}
public Set<String> getRoles() {
return roles;
}
public NodeIdentifier getNodeIdentifier() {
return nodeId;
}
@ -118,11 +109,10 @@ public class NodeConnectionStatus {
public String toString() {
final StringBuilder sb = new StringBuilder();
final NodeConnectionState state = getState();
sb.append("NodeConnectionStatus[state=").append(state);
sb.append("NodeConnectionStatus[nodeId=").append(nodeId).append(", state=").append(state);
if (state == NodeConnectionState.DISCONNECTED || state == NodeConnectionState.DISCONNECTING) {
sb.append(", Disconnect Code=").append(getDisconnectCode()).append(", Disconnect Reason=").append(getDisconnectReason());
}
sb.append(", roles=").append(getRoles());
sb.append(", updateId=").append(getUpdateIdentifier());
sb.append("]");
return sb.toString();
@ -142,7 +132,6 @@ public class NodeConnectionStatus {
final int prime = 31;
int result = 1;
result = prime * result + ((nodeId == null) ? 0 : nodeId.hashCode());
result = prime * result + ((roles == null) ? 0 : roles.hashCode());
result = prime * result + ((state == null) ? 0 : state.hashCode());
return result;
}
@ -163,7 +152,6 @@ public class NodeConnectionStatus {
NodeConnectionStatus other = (NodeConnectionStatus) obj;
return Objects.deepEquals(getNodeIdentifier(), other.getNodeIdentifier())
&& Objects.deepEquals(getRoles(), other.getRoles())
&& Objects.deepEquals(getState(), other.getState());
}
}
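
With roles removed from hashCode() and equals(), two NodeConnectionStatus instances now compare equal whenever they describe the same node in the same connection state, regardless of their update identifiers; the heartbeat diffing in ClusterProtocolHeartbeatMonitor (further below) leans on exactly this. A small illustrative snippet (the NodeIdentifier values are hypothetical, mirroring the ones used in TestJaxbProtocolUtils):

    final NodeIdentifier nodeId = new NodeIdentifier("id", "localhost", 8000, "localhost", 8001, "localhost", 8002, 8003, true);
    final NodeConnectionStatus first = new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED);
    final NodeConnectionStatus second = new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED);
    // Each construction draws a fresh update id, yet the two statuses are considered equal.
    assert first.getUpdateIdentifier() != second.getUpdateIdentifier();
    assert first.equals(second);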


@ -24,6 +24,7 @@ import java.net.Socket;
import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
import org.apache.nifi.io.socket.SocketConfiguration;
@ -74,7 +75,7 @@ public abstract class AbstractNodeProtocolSender implements NodeProtocolSender {
}
@Override
public void heartbeat(final HeartbeatMessage msg, final String address) throws ProtocolException {
public HeartbeatResponseMessage heartbeat(final HeartbeatMessage msg, final String address) throws ProtocolException {
final String hostname;
final int port;
try {
@ -85,7 +86,12 @@ public abstract class AbstractNodeProtocolSender implements NodeProtocolSender {
throw new IllegalArgumentException("Cannot send heartbeat to address [" + address + "]. Address must be in <hostname>:<port> format");
}
sendProtocolMessage(msg, hostname, port);
final ProtocolMessage responseMessage = sendProtocolMessage(msg, hostname, port);
if (MessageType.HEARTBEAT_RESPONSE == responseMessage.getType()) {
return (HeartbeatResponseMessage) responseMessage;
}
throw new ProtocolException("Expected message type '" + MessageType.HEARTBEAT_RESPONSE + "' but found '" + responseMessage.getType() + "'");
}
@ -108,7 +114,7 @@ public abstract class AbstractNodeProtocolSender implements NodeProtocolSender {
return socketConfiguration;
}
private void sendProtocolMessage(final ProtocolMessage msg, final String hostname, final int port) {
private ProtocolMessage sendProtocolMessage(final ProtocolMessage msg, final String hostname, final int port) {
Socket socket = null;
try {
try {
@ -124,6 +130,18 @@ public abstract class AbstractNodeProtocolSender implements NodeProtocolSender {
} catch (final IOException ioe) {
throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe);
}
final ProtocolMessage response;
try {
// unmarshall response and return
final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller();
response = unmarshaller.unmarshal(socket.getInputStream());
} catch (final IOException ioe) {
throw new ProtocolException("Failed unmarshalling '" + MessageType.CONNECTION_RESPONSE + "' protocol message from "
+ socket.getRemoteSocketAddress() + " due to: " + ioe, ioe);
}
return response;
} finally {
SocketUtils.closeQuietly(socket);
}


@ -16,10 +16,7 @@
*/
package org.apache.nifi.cluster.protocol;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import javax.xml.bind.annotation.XmlTransient;
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
@ -35,17 +32,15 @@ import org.apache.nifi.cluster.protocol.jaxb.message.HeartbeatAdapter;
public class Heartbeat {
private final NodeIdentifier nodeIdentifier;
private final Set<String> roles;
private final NodeConnectionStatus connectionStatus;
private final long createdTimestamp;
private final byte[] payload;
public Heartbeat(final NodeIdentifier nodeIdentifier, final Set<String> roles, final NodeConnectionStatus connectionStatus, final byte[] payload) {
public Heartbeat(final NodeIdentifier nodeIdentifier, final NodeConnectionStatus connectionStatus, final byte[] payload) {
if (nodeIdentifier == null) {
throw new IllegalArgumentException("Node Identifier may not be null.");
}
this.nodeIdentifier = nodeIdentifier;
this.roles = roles == null ? Collections.emptySet() : Collections.unmodifiableSet(new HashSet<>(roles));
this.connectionStatus = connectionStatus;
this.payload = payload;
this.createdTimestamp = new Date().getTime();
@ -59,10 +54,6 @@ public class Heartbeat {
return payload;
}
public Set<String> getRoles() {
return roles;
}
public NodeConnectionStatus getConnectionStatus() {
return connectionStatus;
}


@ -14,12 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster;
package org.apache.nifi.cluster.protocol;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
@ -27,7 +28,7 @@ import javax.xml.bind.Marshaller;
import javax.xml.bind.Unmarshaller;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
/**
* The payload of the heartbeat. The payload contains status to inform the cluster manager of the current workload of this node.
@ -50,6 +51,7 @@ public class HeartbeatPayload {
private long totalFlowFileCount;
private long totalFlowFileBytes;
private long systemStartTime;
private List<NodeConnectionStatus> clusterStatus;
public int getActiveThreadCount() {
return activeThreadCount;
@ -83,6 +85,14 @@ public class HeartbeatPayload {
this.systemStartTime = systemStartTime;
}
public List<NodeConnectionStatus> getClusterStatus() {
return clusterStatus;
}
public void setClusterStatus(final List<NodeConnectionStatus> clusterStatus) {
this.clusterStatus = clusterStatus;
}
public byte[] marshal() throws ProtocolException {
final ByteArrayOutputStream payloadBytes = new ByteArrayOutputStream();
marshal(this, payloadBytes);


@ -19,6 +19,7 @@ package org.apache.nifi.cluster.protocol;
import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
/**
* An interface for sending protocol messages from a node to the cluster
@ -44,6 +45,8 @@ public interface NodeProtocolSender {
* @param msg the heartbeat message to send
* @param address the address of the Cluster Coordinator in &lt;hostname&gt;:&lt;port&gt; format
* @throws ProtocolException if unable to send the heartbeat
*
* @return the response from the Cluster Coordinator
*/
void heartbeat(HeartbeatMessage msg, String address) throws ProtocolException;
HeartbeatResponseMessage heartbeat(HeartbeatMessage msg, String address) throws ProtocolException;
}
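
heartbeat() now hands back the coordinator's reply rather than returning void. A hedged sketch of the node side (protocolSender, heartbeatMessage, coordinatorAddress and handleUpdatedStatus are assumed names, not part of this excerpt):

    // Send the heartbeat to the elected coordinator and fold its corrections back into the local view.
    final HeartbeatResponseMessage response = protocolSender.heartbeat(heartbeatMessage, coordinatorAddress);
    for (final NodeConnectionStatus updated : response.getUpdatedNodeStatuses()) {
        // e.g. hand each status to the local ClusterCoordinator so that this node's view of the
        // cluster topology converges with the coordinator's; exact handling is node-specific
        handleUpdatedStatus(updated);
    }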


@ -27,6 +27,7 @@ import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
import org.apache.nifi.reporting.BulletinRepository;
public class NodeProtocolSenderListener implements NodeProtocolSender, ProtocolListener {
@ -92,7 +93,7 @@ public class NodeProtocolSenderListener implements NodeProtocolSender, ProtocolL
}
@Override
public void heartbeat(HeartbeatMessage msg, String address) throws ProtocolException {
sender.heartbeat(msg, address);
public HeartbeatResponseMessage heartbeat(final HeartbeatMessage msg, final String address) throws ProtocolException {
return sender.heartbeat(msg, address);
}
}


@ -16,8 +16,6 @@
*/
package org.apache.nifi.cluster.protocol.jaxb.message;
import java.util.Set;
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
@ -29,7 +27,6 @@ public class AdaptedHeartbeat {
private NodeIdentifier nodeIdentifier;
private byte[] payload;
private Set<String> roles;
private NodeConnectionStatus connectionStatus;
public AdaptedHeartbeat() {
@ -44,14 +41,6 @@ public class AdaptedHeartbeat {
this.nodeIdentifier = nodeIdentifier;
}
public Set<String> getRoles() {
return roles;
}
public void setRoles(Set<String> roles) {
this.roles = roles;
}
public void setConnectionStatus(NodeConnectionStatus connectionStatus) {
this.connectionStatus = connectionStatus;
}


@ -17,8 +17,6 @@
package org.apache.nifi.cluster.protocol.jaxb.message;
import java.util.Set;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
@ -30,7 +28,6 @@ public class AdaptedNodeConnectionStatus {
private DisconnectionCode disconnectCode;
private String disconnectReason;
private Long connectionRequestTime;
private Set<String> roles;
public Long getUpdateId() {
return updateId;
@ -79,12 +76,4 @@ public class AdaptedNodeConnectionStatus {
public void setConnectionRequestTime(Long connectionRequestTime) {
this.connectionRequestTime = connectionRequestTime;
}
public Set<String> getRoles() {
return roles;
}
public void setRoles(Set<String> roles) {
this.roles = roles;
}
}


@ -34,9 +34,6 @@ public class HeartbeatAdapter extends XmlAdapter<AdaptedHeartbeat, Heartbeat> {
// set payload
aHb.setPayload(hb.getPayload());
// set leader flag
aHb.setRoles(hb.getRoles());
// set connected flag
aHb.setConnectionStatus(hb.getConnectionStatus());
}
@ -46,7 +43,7 @@ public class HeartbeatAdapter extends XmlAdapter<AdaptedHeartbeat, Heartbeat> {
@Override
public Heartbeat unmarshal(final AdaptedHeartbeat aHb) {
return new Heartbeat(aHb.getNodeIdentifier(), aHb.getRoles(), aHb.getConnectionStatus(), aHb.getPayload());
return new Heartbeat(aHb.getNodeIdentifier(), aHb.getConnectionStatus(), aHb.getPayload());
}
}


@ -30,8 +30,7 @@ public class NodeConnectionStatusAdapter extends XmlAdapter<AdaptedNodeConnectio
adapted.getState(),
adapted.getDisconnectCode(),
adapted.getDisconnectReason(),
adapted.getConnectionRequestTime(),
adapted.getRoles());
adapted.getConnectionRequestTime());
}
@Override
@ -44,7 +43,6 @@ public class NodeConnectionStatusAdapter extends XmlAdapter<AdaptedNodeConnectio
adapted.setDisconnectCode(toAdapt.getDisconnectCode());
adapted.setDisconnectReason(toAdapt.getDisconnectReason());
adapted.setState(toAdapt.getState());
adapted.setRoles(toAdapt.getRoles());
}
return adapted;
}


@ -24,6 +24,7 @@ import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage;
import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusRequestMessage;
import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage;
@ -97,4 +98,7 @@ public class ObjectFactory {
return new NodeConnectionStatusResponseMessage();
}
public HeartbeatResponseMessage createHeartbeatResponse() {
return new HeartbeatResponseMessage();
}
}


@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.protocol.message;
import java.util.ArrayList;
import java.util.List;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
@XmlRootElement(name = "heartbeatResponse")
public class HeartbeatResponseMessage extends ProtocolMessage {
private List<NodeConnectionStatus> updatedNodeStatuses = new ArrayList<>();
@Override
public MessageType getType() {
return MessageType.HEARTBEAT_RESPONSE;
}
public List<NodeConnectionStatus> getUpdatedNodeStatuses() {
return updatedNodeStatuses;
}
public void setUpdatedNodeStatuses(final List<NodeConnectionStatus> nodeStatuses) {
this.updatedNodeStatuses = new ArrayList<>(nodeStatuses);
}
}


@ -32,6 +32,7 @@ public abstract class ProtocolMessage {
RECONNECTION_RESPONSE,
SERVICE_BROADCAST,
HEARTBEAT,
HEARTBEAT_RESPONSE,
NODE_CONNECTION_STATUS_REQUEST,
NODE_CONNECTION_STATUS_RESPONSE,
NODE_STATUS_CHANGE;


@ -54,11 +54,18 @@
</bean>
<!-- node protocol sender -->
<!--
<bean id="nodeProtocolSender" class="org.apache.nifi.cluster.coordination.node.CuratorNodeProtocolSender">
<constructor-arg ref="protocolSocketConfiguration"/>
<constructor-arg ref="protocolContext"/>
<constructor-arg ref="nifiProperties"/>
</bean>
-->
<bean id="nodeProtocolSender" class="org.apache.nifi.cluster.coordination.node.LeaderElectionNodeProtocolSender">
<constructor-arg ref="protocolSocketConfiguration"/>
<constructor-arg ref="protocolContext"/>
<constructor-arg ref="leaderElectionManager"/>
</bean>
<!-- protocol listener -->
<bean id="protocolListener" class="org.apache.nifi.cluster.protocol.impl.SocketProtocolListener">


@ -32,9 +32,12 @@ import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.ComponentRevision;
import org.apache.nifi.cluster.protocol.ConnectionResponse;
import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.cluster.protocol.Heartbeat;
import org.apache.nifi.cluster.protocol.HeartbeatPayload;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.StandardDataFlow;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusRequestMessage;
import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage;
import org.apache.nifi.web.Revision;
@ -96,4 +99,29 @@ public class TestJaxbProtocolUtils {
final NodeConnectionStatus unmarshalledStatus = unmarshalledMsg.getNodeConnectionStatus();
assertEquals(nodeStatus, unmarshalledStatus);
}
@Test
public void testRoundTripHeartbeat() throws JAXBException {
final NodeIdentifier nodeId = new NodeIdentifier("id", "localhost", 8000, "localhost", 8001, "localhost", 8002, 8003, true);
final NodeConnectionStatus nodeStatus = new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED);
final HeartbeatPayload payload = new HeartbeatPayload();
payload.setActiveThreadCount(1);
payload.setSystemStartTime(System.currentTimeMillis());
payload.setTotalFlowFileBytes(83L);
payload.setTotalFlowFileCount(4);
final List<NodeConnectionStatus> clusterStatus = Collections.singletonList(nodeStatus);
payload.setClusterStatus(clusterStatus);
final Heartbeat heartbeat = new Heartbeat(nodeId, nodeStatus, payload.marshal());
final HeartbeatMessage msg = new HeartbeatMessage();
msg.setHeartbeat(heartbeat);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
JaxbProtocolUtils.JAXB_CONTEXT.createMarshaller().marshal(msg, baos);
final Object unmarshalled = JaxbProtocolUtils.JAXB_CONTEXT.createUnmarshaller().unmarshal(new ByteArrayInputStream(baos.toByteArray()));
assertTrue(unmarshalled instanceof HeartbeatMessage);
}
}


@ -253,8 +253,6 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor {
clusterCoordinator.finishNodeConnection(nodeId);
clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received first heartbeat from connecting node. Node connected.");
}
clusterCoordinator.updateNodeRoles(nodeId, heartbeat.getRoles());
}
/**


@ -16,38 +16,33 @@
*/
package org.apache.nifi.cluster.coordination.heartbeat;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.Unmarshaller;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryForever;
import org.apache.nifi.cluster.HeartbeatPayload;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.Heartbeat;
import org.apache.nifi.cluster.protocol.HeartbeatPayload;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolHandler;
import org.apache.nifi.cluster.protocol.ProtocolListener;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
import org.apache.nifi.util.NiFiProperties;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -59,13 +54,6 @@ import org.slf4j.LoggerFactory;
public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor implements HeartbeatMonitor, ProtocolHandler {
protected static final Logger logger = LoggerFactory.getLogger(ClusterProtocolHeartbeatMonitor.class);
private static final String COORDINATOR_ZNODE_NAME = "coordinator";
private final ZooKeeperClientConfig zkClientConfig;
private final String clusterNodesPath;
private volatile Map<String, NodeIdentifier> clusterNodeIds = new HashMap<>();
private volatile CuratorFramework curatorClient;
private final String heartbeatAddress;
private final ConcurrentMap<NodeIdentifier, NodeHeartbeat> heartbeatMessages = new ConcurrentHashMap<>();
@ -85,8 +73,6 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im
super(clusterCoordinator, nifiProperties);
protocolListener.addHandler(this);
this.zkClientConfig = ZooKeeperClientConfig.createConfig(nifiProperties);
this.clusterNodesPath = zkClientConfig.resolvePath("cluster/nodes");
String hostname = nifiProperties.getProperty(NiFiProperties.CLUSTER_NODE_ADDRESS);
if (hostname == null || hostname.trim().isEmpty()) {
@ -110,17 +96,12 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im
}
@Override
public void onStart() {
final RetryPolicy retryPolicy = new RetryForever(5000);
curatorClient = CuratorFrameworkFactory.builder()
.connectString(zkClientConfig.getConnectString())
.sessionTimeoutMs(zkClientConfig.getSessionTimeoutMillis())
.connectionTimeoutMs(zkClientConfig.getConnectionTimeoutMillis())
.retryPolicy(retryPolicy)
.defaultData(new byte[0])
.build();
curatorClient.start();
public String getHeartbeatAddress() {
return heartbeatAddress;
}
@Override
public void onStart() {
// We don't know what the heartbeats look like for the nodes, since we were just elected to monitor
// them. However, the map may be filled with old heartbeats. So we clear the heartbeats and populate the
// map with new heartbeats set to the current time and using the currently known status. We do this so
@ -129,58 +110,13 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im
heartbeatMessages.clear();
for (final NodeIdentifier nodeId : clusterCoordinator.getNodeIdentifiers()) {
final NodeHeartbeat heartbeat = new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(),
clusterCoordinator.getConnectionStatus(nodeId), Collections.emptySet(), 0, 0L, 0, System.currentTimeMillis());
clusterCoordinator.getConnectionStatus(nodeId), 0, 0L, 0, System.currentTimeMillis());
heartbeatMessages.put(nodeId, heartbeat);
}
final Thread publishAddress = new Thread(new Runnable() {
@Override
public void run() {
while (!isStopped()) {
final String path = clusterNodesPath + "/" + COORDINATOR_ZNODE_NAME;
try {
try {
curatorClient.setData().forPath(path, heartbeatAddress.getBytes(StandardCharsets.UTF_8));
logger.info("Successfully published Cluster Heartbeat Monitor Address of {} to ZooKeeper", heartbeatAddress);
return;
} catch (final NoNodeException nne) {
// ensure that parents are created, using a wide-open ACL because the parents contain no data
// and the path is not in any way sensitive.
try {
curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
} catch (final NodeExistsException nee) {
// This is okay. Node already exists.
}
curatorClient.create().withMode(CreateMode.EPHEMERAL).forPath(path, heartbeatAddress.getBytes(StandardCharsets.UTF_8));
logger.info("Successfully published address as heartbeat monitor address at path {} with value {}", path, heartbeatAddress);
return;
}
} catch (Exception e) {
logger.warn("Failed to update ZooKeeper to notify nodes of the heartbeat address. Will continue to retry.");
try {
Thread.sleep(2000L);
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
return;
}
}
}
}
});
publishAddress.setName("Publish Heartbeat Address");
publishAddress.setDaemon(true);
publishAddress.start();
}
@Override
public void onStop() {
if (curatorClient != null) {
curatorClient.close();
}
}
@Override
@ -194,10 +130,6 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im
heartbeatMessages.remove(nodeId);
}
protected Set<NodeIdentifier> getClusterNodeIds() {
return new HashSet<>(clusterNodeIds.values());
}
@Override
public ProtocolMessage handle(final ProtocolMessage msg) throws ProtocolException {
if (msg.getType() != MessageType.HEARTBEAT) {
@ -209,7 +141,6 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im
final NodeIdentifier nodeId = heartbeat.getNodeIdentifier();
final NodeConnectionStatus connectionStatus = heartbeat.getConnectionStatus();
final Set<String> roles = heartbeat.getRoles();
final byte[] payloadBytes = heartbeat.getPayload();
final HeartbeatPayload payload = HeartbeatPayload.unmarshal(payloadBytes);
final int activeThreadCount = payload.getActiveThreadCount();
@ -218,11 +149,48 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im
final long systemStartTime = payload.getSystemStartTime();
final NodeHeartbeat nodeHeartbeat = new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(),
connectionStatus, roles, flowFileCount, flowFileBytes, activeThreadCount, systemStartTime);
connectionStatus, flowFileCount, flowFileBytes, activeThreadCount, systemStartTime);
heartbeatMessages.put(heartbeat.getNodeIdentifier(), nodeHeartbeat);
logger.debug("Received new heartbeat from {}", nodeId);
return null;
// Formulate a List of differences between our view of the cluster topology and the node's view
// and send that back to the node so that it is in-sync with us
final List<NodeConnectionStatus> nodeStatusList = payload.getClusterStatus();
final List<NodeConnectionStatus> updatedStatuses = getUpdatedStatuses(nodeStatusList);
final HeartbeatResponseMessage responseMessage = new HeartbeatResponseMessage();
responseMessage.setUpdatedNodeStatuses(updatedStatuses);
return responseMessage;
}
private List<NodeConnectionStatus> getUpdatedStatuses(final List<NodeConnectionStatus> nodeStatusList) {
// Map node's statuses by NodeIdentifier for quick & easy lookup
final Map<NodeIdentifier, NodeConnectionStatus> nodeStatusMap = nodeStatusList.stream()
.collect(Collectors.toMap(status -> status.getNodeIdentifier(), Function.identity()));
// Check if our connection status is the same for each Node Identifier and if not, add our version of the status
// to a List of updated statuses.
final List<NodeConnectionStatus> currentStatuses = clusterCoordinator.getConnectionStatuses();
final List<NodeConnectionStatus> updatedStatuses = new ArrayList<>();
for (final NodeConnectionStatus currentStatus : currentStatuses) {
final NodeConnectionStatus nodeStatus = nodeStatusMap.get(currentStatus.getNodeIdentifier());
if (!currentStatus.equals(nodeStatus)) {
updatedStatuses.add(currentStatus);
}
}
// If the node has any statuses that we do not have, add a REMOVED status to the update list
final Set<NodeIdentifier> nodeIds = currentStatuses.stream().map(status -> status.getNodeIdentifier()).collect(Collectors.toSet());
for (final NodeConnectionStatus nodeStatus : nodeStatusList) {
if (!nodeIds.contains(nodeStatus.getNodeIdentifier())) {
updatedStatuses.add(new NodeConnectionStatus(nodeStatus.getNodeIdentifier(), NodeConnectionState.REMOVED, null));
}
}
logger.debug("\n\nCalculated diff between current cluster status and node cluster status as follows:\nNode: {}\nSelf: {}\nDifference: {}\n\n",
nodeStatusList, currentStatuses, updatedStatuses);
return updatedStatuses;
}
@Override
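
For illustration (hypothetical node identifiers): if the coordinator currently holds node1=CONNECTED and node2=DISCONNECTED while the heartbeating node reports node1=CONNECTED and node3=CONNECTED, getUpdatedStatuses returns the coordinator's DISCONNECTED status for node2 (absent from the node's view) and a newly created REMOVED status for node3 (unknown to the coordinator); node1 is omitted because the two views already agree on its state.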


@ -17,13 +17,9 @@
package org.apache.nifi.cluster.coordination.heartbeat;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.nifi.cluster.HeartbeatPayload;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.Heartbeat;
import org.apache.nifi.cluster.protocol.HeartbeatPayload;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
@ -32,18 +28,16 @@ public class StandardNodeHeartbeat implements NodeHeartbeat {
private final NodeIdentifier nodeId;
private final long timestamp;
private final NodeConnectionStatus connectionStatus;
private final Set<String> roles;
private final int flowFileCount;
private final long flowFileBytes;
private final int activeThreadCount;
private final long systemStartTime;
public StandardNodeHeartbeat(final NodeIdentifier nodeId, final long timestamp, final NodeConnectionStatus connectionStatus,
final Set<String> roles, final int flowFileCount, final long flowFileBytes, final int activeThreadCount, final long systemStartTime) {
final int flowFileCount, final long flowFileBytes, final int activeThreadCount, final long systemStartTime) {
this.timestamp = timestamp;
this.nodeId = nodeId;
this.connectionStatus = connectionStatus;
this.roles = roles == null ? Collections.emptySet() : Collections.unmodifiableSet(new HashSet<>(roles));
this.flowFileCount = flowFileCount;
this.flowFileBytes = flowFileBytes;
this.activeThreadCount = activeThreadCount;
@ -65,11 +59,6 @@ public class StandardNodeHeartbeat implements NodeHeartbeat {
return connectionStatus;
}
@Override
public Set<String> getRoles() {
return roles;
}
@Override
public int getFlowFileCount() {
return flowFileCount;
@ -85,7 +74,6 @@ public class StandardNodeHeartbeat implements NodeHeartbeat {
return activeThreadCount;
}
@Override
public long getSystemStartTime() {
return systemStartTime;
@ -96,7 +84,7 @@ public class StandardNodeHeartbeat implements NodeHeartbeat {
final HeartbeatPayload payload = HeartbeatPayload.unmarshal(heartbeat.getPayload());
return new StandardNodeHeartbeat(heartbeat.getNodeIdentifier(), timestamp, heartbeat.getConnectionStatus(),
heartbeat.getRoles(), (int) payload.getTotalFlowFileCount(), payload.getTotalFlowFileBytes(),
(int) payload.getTotalFlowFileCount(), payload.getTotalFlowFileBytes(),
payload.getActiveThreadCount(), payload.getSystemStartTime());
}
}


@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.coordination.node;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException;
import org.apache.nifi.cluster.protocol.AbstractNodeProtocolSender;
import org.apache.nifi.cluster.protocol.ProtocolContext;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
import org.apache.nifi.io.socket.SocketConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LeaderElectionNodeProtocolSender extends AbstractNodeProtocolSender {
private static final Logger logger = LoggerFactory.getLogger(LeaderElectionNodeProtocolSender.class);
private final LeaderElectionManager electionManager;
public LeaderElectionNodeProtocolSender(final SocketConfiguration socketConfiguration, final ProtocolContext<ProtocolMessage> protocolContext, final LeaderElectionManager electionManager) {
super(socketConfiguration, protocolContext);
this.electionManager = electionManager;
}
@Override
protected InetSocketAddress getServiceAddress() throws IOException {
final String address = electionManager.getLeader(ClusterRoles.CLUSTER_COORDINATOR);
if (StringUtils.isEmpty(address)) {
throw new NoClusterCoordinatorException("No node has yet been elected Cluster Coordinator. Cannot establish connection to cluster yet.");
}
final String[] splits = address.split(":");
if (splits.length != 2) {
final String message = String.format("Attempted to determine Cluster Coordinator address. Zookeeper indicates "
+ "that address is %s, but this is not in the expected format of <hostname>:<port>", address);
logger.error(message);
throw new ProtocolException(message);
}
logger.info("Determined that Cluster Coordinator is located at {}; will use this address for sending heartbeat messages", address);
final String hostname = splits[0];
final int port;
try {
port = Integer.parseInt(splits[1]);
if (port < 1 || port > 65535) {
throw new NumberFormatException("Port must be in the range of 1 - 65535 but got " + port);
}
} catch (final NumberFormatException nfe) {
final String message = String.format("Attempted to determine Cluster Coordinator address. Zookeeper indicates "
+ "that address is %s, but the port is not a valid port number", address);
logger.error(message);
throw new ProtocolException(message);
}
final InetSocketAddress socketAddress = InetSocketAddress.createUnresolved(hostname, port);
return socketAddress;
}
}


@ -17,7 +17,6 @@
package org.apache.nifi.cluster.coordination.node;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -28,16 +27,13 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.http.HttpResponseMerger;
import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger;
@ -48,7 +44,6 @@ import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException;
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.protocol.ComponentRevision;
import org.apache.nifi.cluster.protocol.ConnectionRequest;
import org.apache.nifi.cluster.protocol.ConnectionResponse;
@ -66,15 +61,12 @@ import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.services.FlowService;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.revision.RevisionManager;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -92,22 +84,18 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
private final EventReporter eventReporter;
private final ClusterNodeFirewall firewall;
private final RevisionManager revisionManager;
// Curator used to determine which node is coordinator
private final CuratorFramework curatorClient;
private final String nodesPathPrefix;
private final String coordinatorPath;
private final NiFiProperties nifiProperties;
private final LeaderElectionManager leaderElectionManager;
private final AtomicLong latestUpdateId = new AtomicLong(-1);
private volatile FlowService flowService;
private volatile boolean connected;
private volatile String coordinatorAddress;
private volatile boolean closed = false;
private final ConcurrentMap<NodeIdentifier, NodeConnectionStatus> nodeStatuses = new ConcurrentHashMap<>();
private final ConcurrentMap<NodeIdentifier, CircularFifoQueue<NodeEvent>> nodeEvents = new ConcurrentHashMap<>();
public NodeClusterCoordinator(final ClusterCoordinationProtocolSenderListener senderListener, final EventReporter eventReporter,
public NodeClusterCoordinator(final ClusterCoordinationProtocolSenderListener senderListener, final EventReporter eventReporter, final LeaderElectionManager leaderElectionManager,
final ClusterNodeFirewall firewall, final RevisionManager revisionManager, final NiFiProperties nifiProperties) {
this.senderListener = senderListener;
this.flowService = null;
@ -115,16 +103,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
this.firewall = firewall;
this.revisionManager = revisionManager;
this.nifiProperties = nifiProperties;
final RetryPolicy retryPolicy = new RetryNTimes(10, 500);
final ZooKeeperClientConfig zkConfig = ZooKeeperClientConfig.createConfig(nifiProperties);
curatorClient = CuratorFrameworkFactory.newClient(zkConfig.getConnectString(),
zkConfig.getSessionTimeoutMillis(), zkConfig.getConnectionTimeoutMillis(), retryPolicy);
curatorClient.start();
nodesPathPrefix = zkConfig.resolvePath("cluster/nodes");
coordinatorPath = nodesPathPrefix + "/coordinator";
this.leaderElectionManager = leaderElectionManager;
senderListener.addHandler(this);
}
@ -140,8 +119,6 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
final NodeConnectionStatus shutdownStatus = new NodeConnectionStatus(getLocalNodeIdentifier(), DisconnectionCode.NODE_SHUTDOWN);
updateNodeStatus(shutdownStatus, false);
logger.info("Successfully notified other nodes that I am shutting down");
curatorClient.close();
}
@Override
@ -155,10 +132,6 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
return nodeId;
}
private NodeIdentifier waitForLocalNodeIdentifier() {
return waitForNodeIdentifier(() -> getLocalNodeIdentifier());
}
private NodeIdentifier waitForElectedClusterCoordinator() {
return waitForNodeIdentifier(() -> getElectedActiveCoordinatorNode(false));
}
@ -176,6 +149,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
Thread.sleep(100L);
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
return null;
}
}
}
@ -184,34 +158,12 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
}
private String getElectedActiveCoordinatorAddress() throws IOException {
final String curAddress = coordinatorAddress;
if (curAddress != null) {
return curAddress;
}
try {
// Get coordinator address and add watcher to change who we are heartbeating to if the value changes.
final byte[] coordinatorAddressBytes = curatorClient.getData().usingWatcher(new Watcher() {
@Override
public void process(final WatchedEvent event) {
coordinatorAddress = null;
}
}).forPath(coordinatorPath);
final String address = coordinatorAddress = new String(coordinatorAddressBytes, StandardCharsets.UTF_8);
logger.info("Determined that Cluster Coordinator is located at {}", address);
return address;
} catch (final KeeperException.NoNodeException nne) {
throw new NoClusterCoordinatorException();
} catch (Exception e) {
throw new IOException("Unable to determine Cluster Coordinator from ZooKeeper", e);
}
return leaderElectionManager.getLeader(ClusterRoles.CLUSTER_COORDINATOR);
}
@Override
public void resetNodeStatuses(final Map<NodeIdentifier, NodeConnectionStatus> statusMap) {
logger.info("Resetting cluster node statuses from {} to {}", nodeStatuses, statusMap);
coordinatorAddress = null;
// For each proposed replacement, update the nodeStatuses map if and only if the replacement
// has a larger update id than the current value.
@ -219,14 +171,29 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
final NodeIdentifier nodeId = entry.getKey();
final NodeConnectionStatus proposedStatus = entry.getValue();
boolean updated = false;
while (!updated) {
final NodeConnectionStatus currentStatus = nodeStatuses.get(nodeId);
updated = replaceNodeStatus(nodeId, currentStatus, proposedStatus);
if (proposedStatus.getState() == NodeConnectionState.REMOVED) {
nodeStatuses.remove(nodeId);
} else {
nodeStatuses.put(nodeId, proposedStatus);
}
}
}
@Override
public boolean resetNodeStatus(final NodeConnectionStatus connectionStatus, final long qualifyingUpdateId) {
final NodeIdentifier nodeId = connectionStatus.getNodeIdentifier();
final NodeConnectionStatus currentStatus = getConnectionStatus(nodeId);
if (currentStatus == null) {
return replaceNodeStatus(nodeId, null, connectionStatus);
} else if (currentStatus.getUpdateIdentifier() == qualifyingUpdateId) {
return replaceNodeStatus(nodeId, currentStatus, connectionStatus);
}
// The update identifier is not the same. We will not replace the value
return false;
}
/**
* Attempts to update the nodeStatuses map by changing the value for the
* given node id from the current status to the new status, as in
@ -247,11 +214,19 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
}
if (currentStatus == null) {
final NodeConnectionStatus existingValue = nodeStatuses.putIfAbsent(nodeId, newStatus);
return existingValue == null;
if (newStatus.getState() == NodeConnectionState.REMOVED) {
return nodeStatuses.remove(nodeId, currentStatus);
} else {
final NodeConnectionStatus existingValue = nodeStatuses.putIfAbsent(nodeId, newStatus);
return existingValue == null;
}
}
return nodeStatuses.replace(nodeId, currentStatus, newStatus);
if (newStatus.getState() == NodeConnectionState.REMOVED) {
return nodeStatuses.remove(nodeId, currentStatus);
} else {
return nodeStatuses.replace(nodeId, currentStatus, newStatus);
}
}
@Override
@ -262,7 +237,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
reportEvent(nodeId, Severity.INFO, "Requesting that node connect to cluster on behalf of " + userDn);
}
updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTING, null, null, System.currentTimeMillis(), getRoles(nodeId)));
updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTING, null, null, System.currentTimeMillis()));
// create the request
final ReconnectionRequestMessage request = new ReconnectionRequestMessage();
@ -272,11 +247,6 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
requestReconnectionAsynchronously(request, 10, 5);
}
private Set<String> getRoles(final NodeIdentifier nodeId) {
final NodeConnectionStatus status = getConnectionStatus(nodeId);
return status == null ? Collections.emptySet() : status.getRoles();
}
@Override
public void finishNodeConnection(final NodeIdentifier nodeId) {
final NodeConnectionState state = getConnectionState(nodeId);
@ -298,7 +268,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
}
logger.info("{} is now connected", nodeId);
updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED, getRoles(nodeId)));
updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED));
}
@Override
@ -354,7 +324,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
reportEvent(nodeId, Severity.INFO, "User " + userDn + " requested that node be removed from cluster");
nodeStatuses.remove(nodeId);
nodeEvents.remove(nodeId);
notifyOthersOfNodeStatusChange(new NodeConnectionStatus(nodeId, NodeConnectionState.REMOVED, Collections.emptySet()));
notifyOthersOfNodeStatusChange(new NodeConnectionStatus(nodeId, NodeConnectionState.REMOVED));
}
@Override
@ -367,6 +337,11 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
return status == null ? null : status.getState();
}
@Override
public List<NodeConnectionStatus> getConnectionStatuses() {
return new ArrayList<>(nodeStatuses.values());
}
@Override
public Map<NodeConnectionState, List<NodeIdentifier>> getConnectionStates() {
final Map<NodeConnectionState, List<NodeIdentifier>> connectionStates = new HashMap<>();
@ -405,58 +380,6 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
}
}
@Override
public synchronized void updateNodeRoles(final NodeIdentifier nodeId, final Set<String> roles) {
boolean updated = false;
while (!updated) {
final NodeConnectionStatus currentStatus = nodeStatuses.get(nodeId);
if (currentStatus == null) {
throw new UnknownNodeException("Cannot update roles for " + nodeId + " to " + roles + " because the node is not part of this cluster");
}
if (currentStatus.getRoles().equals(roles)) {
logger.debug("Roles for {} already up-to-date as {}", nodeId, roles);
return;
}
final NodeConnectionStatus updatedStatus = new NodeConnectionStatus(currentStatus, roles);
updated = replaceNodeStatus(nodeId, currentStatus, updatedStatus);
if (updated) {
logger.info("Updated Roles of {} from {} to {}", nodeId, currentStatus, updatedStatus);
notifyOthersOfNodeStatusChange(updatedStatus);
}
}
// If any other node contains any of the given roles, revoke the role from the other node.
for (final String role : roles) {
for (final Map.Entry<NodeIdentifier, NodeConnectionStatus> entry : nodeStatuses.entrySet()) {
if (entry.getKey().equals(nodeId)) {
continue;
}
updated = false;
while (!updated) {
final NodeConnectionStatus status = entry.getValue();
if (status.getRoles().contains(role)) {
final Set<String> newRoles = new HashSet<>(status.getRoles());
newRoles.remove(role);
final NodeConnectionStatus updatedStatus = new NodeConnectionStatus(status, newRoles);
updated = replaceNodeStatus(entry.getKey(), status, updatedStatus);
if (updated) {
logger.info("Updated Roles of {} from {} to {}", nodeId, status, updatedStatus);
notifyOthersOfNodeStatusChange(updatedStatus);
}
} else {
updated = true;
}
}
}
}
}
@Override
public NodeIdentifier getNodeIdentifier(final String uuid) {
for (final NodeIdentifier nodeId : nodeStatuses.keySet()) {
@ -468,48 +391,6 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
return null;
}
// method is synchronized because it modifies local node state and then broadcasts the change. We synchronize any time that this
// is done so that we don't have an issue where we create a NodeConnectionStatus, then another thread creates one and sends it
// before the first one is sent (as this results in the first status having a larger id, which means that the first status is never
// seen by other nodes).
@Override
public synchronized void addRole(final String clusterRole) {
final NodeIdentifier localNodeId = waitForLocalNodeIdentifier();
final NodeConnectionStatus status = getConnectionStatus(localNodeId);
final Set<String> roles = new HashSet<>();
if (status != null) {
roles.addAll(status.getRoles());
}
final boolean roleAdded = roles.add(clusterRole);
if (roleAdded) {
updateNodeRoles(localNodeId, roles);
logger.info("Cluster role {} added. This node is now responsible for the following roles: {}", clusterRole, roles);
}
}
// method is synchronized because it modifies local node state and then broadcasts the change. We synchronize any time that this
// is done so that we don't have an issue where we create a NodeConnectionStatus, then another thread creates one and sends it
// before the first one is sent (as this results in the first status having a larger id, which means that the first status is never
// seen by other nodes).
@Override
public synchronized void removeRole(final String clusterRole) {
final NodeIdentifier localNodeId = waitForLocalNodeIdentifier();
final NodeConnectionStatus status = getConnectionStatus(localNodeId);
final Set<String> roles = new HashSet<>();
if (status != null) {
roles.addAll(status.getRoles());
}
final boolean roleRemoved = roles.remove(clusterRole);
if (roleRemoved) {
updateNodeRoles(localNodeId, roles);
logger.info("Cluster role {} removed. This node is now responsible for the following roles: {}", clusterRole, roles);
}
}
@Override
public Set<NodeIdentifier> getNodeIdentifiers(final NodeConnectionState... states) {
final Set<NodeConnectionState> statesOfInterest = new HashSet<>();
@ -531,10 +412,14 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
@Override
public NodeIdentifier getPrimaryNode() {
return nodeStatuses.values().stream()
.filter(status -> status.getRoles().contains(ClusterRoles.PRIMARY_NODE))
final String primaryNodeAddress = leaderElectionManager.getLeader(ClusterRoles.PRIMARY_NODE);
if (primaryNodeAddress == null) {
return null;
}
return nodeStatuses.keySet().stream()
.filter(nodeId -> primaryNodeAddress.equals(nodeId.getSocketAddress() + ":" + nodeId.getSocketPort()))
.findFirst()
.map(status -> status.getNodeIdentifier())
.orElse(null);
}
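The new implementation resolves the Primary Node from the leader election manager rather than from per-node roles: the elected leader is identified by a "host:port" participant string, which is then matched back to a NodeIdentifier. A minimal sketch of that matching, assuming the NodeIdentifier accessors used above (an illustration, not the committed code):

    // assumes: java.util.Set and org.apache.nifi.cluster.protocol.NodeIdentifier
    static NodeIdentifier resolveElectedNode(final String leaderAddress, final Set<NodeIdentifier> knownNodes) {
        if (leaderAddress == null) {
            return null; // no leader currently elected for the role
        }
        return knownNodes.stream()
            .filter(nodeId -> leaderAddress.equals(nodeId.getSocketAddress() + ":" + nodeId.getSocketPort()))
            .findFirst()
            .orElse(null);
    }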
@ -561,6 +446,11 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
return null;
}
if (electedNodeAddress == null) {
logger.debug("There is currently no elected active Cluster Coordinator");
return null;
}
final int colonLoc = electedNodeAddress.indexOf(':');
if (colonLoc < 1) {
if (warnOnError) {
@ -683,12 +573,10 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
logger.info("Status of {} changed from {} to {}", nodeId, currentStatus, status);
logger.debug("State of cluster nodes is now {}", nodeStatuses);
latestUpdateId.updateAndGet(curVal -> Math.max(curVal, status.getUpdateIdentifier()));
if (currentState == null || currentState != status.getState()) {
// We notify all nodes of the status change if either this node is the current cluster coordinator, OR if the node was
// the cluster coordinator and no longer is. This is done because if a user disconnects the cluster coordinator, we need
// to broadcast to the cluster that this node is no longer the coordinator. Otherwise, all nodes but this one will still
// believe that this node is connected to the cluster.
final boolean notifyAllNodes = isActiveClusterCoordinator() || (currentStatus != null && currentStatus.getRoles().contains(ClusterRoles.CLUSTER_COORDINATOR));
final boolean notifyAllNodes = isActiveClusterCoordinator();
if (notifyAllNodes) {
logger.debug("Notifying all nodes that status changed from {} to {}", currentStatus, status);
} else {
@ -789,7 +677,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
}
request.setDataFlow(new StandardDataFlow(flowService.createDataFlow()));
request.setNodeConnectionStatuses(new ArrayList<>(nodeStatuses.values()));
request.setNodeConnectionStatuses(getConnectionStatuses());
request.setComponentRevisions(revisionManager.getAllRevisions().stream().map(rev -> ComponentRevision.fromRevision(rev)).collect(Collectors.toList()));
// Issue a reconnection request to the node.
@ -848,43 +736,9 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
private String summarizeStatusChange(final NodeConnectionStatus oldStatus, final NodeConnectionStatus status) {
final StringBuilder sb = new StringBuilder();
if (oldStatus != null && status.getState() == oldStatus.getState()) {
// Check if roles changed
final Set<String> oldRoles = oldStatus.getRoles();
final Set<String> newRoles = status.getRoles();
final Set<String> rolesRemoved = new HashSet<>(oldRoles);
rolesRemoved.removeAll(newRoles);
final Set<String> rolesAdded = new HashSet<>(newRoles);
rolesAdded.removeAll(oldRoles);
if (!rolesRemoved.isEmpty()) {
sb.append("Relinquished role");
if (rolesRemoved.size() != 1) {
sb.append("s");
}
sb.append(" ").append(rolesRemoved);
}
if (!rolesAdded.isEmpty()) {
if (sb.length() > 0) {
sb.append("; ");
}
sb.append("Acquired role");
if (rolesAdded.size() != 1) {
sb.append("s");
}
sb.append(" ").append(rolesAdded);
}
} else {
if (oldStatus == null || status.getState() != oldStatus.getState()) {
sb.append("Node Status changed from ").append(oldStatus == null ? "[Unknown Node]" : oldStatus.getState().toString()).append(" to ").append(status.getState().toString());
if (status.getState() == NodeConnectionState.CONNECTED) {
sb.append(" (Roles=").append(status.getRoles().toString()).append(")");
} else if (status.getDisconnectReason() != null) {
if (status.getDisconnectReason() != null) {
sb.append(" due to ").append(status.getDisconnectReason());
} else if (status.getDisconnectCode() != null) {
sb.append(" due to ").append(status.getDisconnectCode().toString());
@ -899,35 +753,30 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
final NodeIdentifier nodeId = statusChangeMessage.getNodeId();
logger.debug("Handling request {}", statusChangeMessage);
boolean updated = false;
while (!updated) {
final NodeConnectionStatus oldStatus = nodeStatuses.get(statusChangeMessage.getNodeId());
final NodeConnectionStatus oldStatus = nodeStatuses.get(statusChangeMessage.getNodeId());
// Either remove the value from the map or update the map depending on the connection state
if (statusChangeMessage.getNodeConnectionStatus().getState() == NodeConnectionState.REMOVED) {
updated = nodeStatuses.remove(nodeId, oldStatus);
} else {
updated = replaceNodeStatus(nodeId, oldStatus, updatedStatus);
}
if (updated) {
logger.info("Status of {} changed from {} to {}", statusChangeMessage.getNodeId(), oldStatus, updatedStatus);
logger.debug("State of cluster nodes is now {}", nodeStatuses);
final NodeConnectionStatus status = statusChangeMessage.getNodeConnectionStatus();
final String summary = summarizeStatusChange(oldStatus, status);
if (!StringUtils.isEmpty(summary)) {
addNodeEvent(nodeId, summary);
}
// Update our counter so that we are in-sync with the cluster on the
// most up-to-date version of the NodeConnectionStatus' Update Identifier.
// We do this so that we can accurately compare status updates that are generated
// locally against those generated from other nodes in the cluster.
NodeConnectionStatus.updateIdGenerator(updatedStatus.getUpdateIdentifier());
}
// Either remove the value from the map or update the map depending on the connection state
if (statusChangeMessage.getNodeConnectionStatus().getState() == NodeConnectionState.REMOVED) {
nodeStatuses.remove(nodeId, oldStatus);
} else {
nodeStatuses.put(nodeId, updatedStatus);
}
logger.info("Status of {} changed from {} to {}", statusChangeMessage.getNodeId(), oldStatus, updatedStatus);
logger.debug("State of cluster nodes is now {}", nodeStatuses);
final NodeConnectionStatus status = statusChangeMessage.getNodeConnectionStatus();
final String summary = summarizeStatusChange(oldStatus, status);
if (!StringUtils.isEmpty(summary)) {
addNodeEvent(nodeId, summary);
}
// Update our counter so that we are in-sync with the cluster on the
// most up-to-date version of the NodeConnectionStatus' Update Identifier.
// We do this so that we can accurately compare status updates that are generated
// locally against those generated from other nodes in the cluster.
NodeConnectionStatus.updateIdGenerator(updatedStatus.getUpdateIdentifier());
if (isActiveClusterCoordinator()) {
notifyOthersOfNodeStatusChange(statusChangeMessage.getNodeConnectionStatus());
}
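The bookkeeping above keeps the local Update Identifier counter at least as large as any identifier observed from other nodes, so that locally generated statuses always compare as newer than the statuses they supersede. A minimal sketch of such a monotonic generator, assuming an AtomicLong (an illustration, not NodeConnectionStatus's actual implementation):

    import java.util.concurrent.atomic.AtomicLong;

    public class UpdateIdGeneratorSketch {
        private static final AtomicLong ID_GENERATOR = new AtomicLong(0L);

        // next locally generated update id
        public static long nextUpdateId() {
            return ID_GENERATOR.incrementAndGet();
        }

        // ensure the generator is at least as large as an id observed from another node
        public static void updateIdGenerator(final long observedId) {
            ID_GENERATOR.accumulateAndGet(observedId, Math::max);
        }
    }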
@ -985,7 +834,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
addNodeEvent(resolvedNodeIdentifier, "Connection requested from existing node. Setting status to connecting");
}
status = new NodeConnectionStatus(resolvedNodeIdentifier, NodeConnectionState.CONNECTING, null, null, System.currentTimeMillis(), getRoles(resolvedNodeIdentifier));
status = new NodeConnectionStatus(resolvedNodeIdentifier, NodeConnectionState.CONNECTING, null, null, System.currentTimeMillis());
updateNodeStatus(status);
DataFlow dataFlow = null;
@ -1009,7 +858,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
return new ConnectionResponse(tryAgainSeconds);
}
return new ConnectionResponse(resolvedNodeIdentifier, dataFlow, instanceId, new ArrayList<>(nodeStatuses.values()),
return new ConnectionResponse(resolvedNodeIdentifier, dataFlow, instanceId, getConnectionStatuses(),
revisionManager.getAllRevisions().stream().map(rev -> ComponentRevision.fromRevision(rev)).collect(Collectors.toList()));
}
@ -1111,7 +960,6 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
@Override
public void setConnected(final boolean connected) {
this.connected = connected;
this.coordinatorAddress = null; // if connection state changed, we are not sure about the coordinator. Check for address again.
}
@Override

View File

@ -20,6 +20,7 @@ package org.apache.nifi.cluster.spring;
import org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator;
import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
import org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderListener;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.revision.RevisionManager;
@ -42,8 +43,9 @@ public class NodeClusterCoordinatorFactoryBean implements FactoryBean<NodeCluste
final EventReporter eventReporter = applicationContext.getBean("eventReporter", EventReporter.class);
final ClusterNodeFirewall clusterFirewall = applicationContext.getBean("clusterFirewall", ClusterNodeFirewall.class);
final RevisionManager revisionManager = applicationContext.getBean("revisionManager", RevisionManager.class);
final LeaderElectionManager electionManager = applicationContext.getBean("leaderElectionManager", LeaderElectionManager.class);
nodeClusterCoordinator = new NodeClusterCoordinator(protocolSenderListener, eventReporter, clusterFirewall, revisionManager, properties);
nodeClusterCoordinator = new NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, clusterFirewall, revisionManager, properties);
}
return nodeClusterCoordinator;

View File

@ -35,6 +35,12 @@
<property name="properties" ref="nifiProperties"/>
</bean>
<!-- Leader Election Manager -->
<bean id="leaderElectionManager" class="org.apache.nifi.spring.LeaderElectionManagerFactoryBean">
<property name="numThreads" value="4" />
<property name="properties" ref="nifiProperties" />
</bean>
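For context, a factory bean wired this way needs setters for the two properties above and hands back a LeaderElectionManager. A minimal sketch under those assumptions (the actual org.apache.nifi.spring.LeaderElectionManagerFactoryBean in this commit may differ):

    import org.apache.nifi.controller.leader.election.CuratorLeaderElectionManager;
    import org.apache.nifi.controller.leader.election.LeaderElectionManager;
    import org.apache.nifi.util.NiFiProperties;
    import org.springframework.beans.factory.FactoryBean;

    public class LeaderElectionManagerFactoryBeanSketch implements FactoryBean<LeaderElectionManager> {
        private int numThreads;
        private NiFiProperties properties;

        public void setNumThreads(final int numThreads) {
            this.numThreads = numThreads;
        }

        public void setProperties(final NiFiProperties properties) {
            this.properties = properties;
        }

        @Override
        public LeaderElectionManager getObject() throws Exception {
            // same constructor used by the integration test Node elsewhere in this commit
            return new CuratorLeaderElectionManager(numThreads, properties);
        }

        @Override
        public Class<?> getObjectType() {
            return LeaderElectionManager.class;
        }

        @Override
        public boolean isSingleton() {
            return true;
        }
    }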
<!-- Cluster Coordinator -->
<bean id="clusterCoordinator" class="org.apache.nifi.cluster.spring.NodeClusterCoordinatorFactoryBean">
<property name="properties" ref="nifiProperties"/>

View File

@ -169,8 +169,8 @@ public class TestAbstractHeartbeatMonitor {
private NodeHeartbeat createHeartbeat(final NodeIdentifier nodeId, final NodeConnectionState state) {
final NodeConnectionStatus status = new NodeConnectionStatus(nodeId, state, Collections.emptySet());
return new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(), status, Collections.emptySet(), 0, 0, 0, 0);
final NodeConnectionStatus status = new NodeConnectionStatus(nodeId, state);
return new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(), status, 0, 0, 0, 0);
}
private TestFriendlyHeartbeatMonitor createMonitor(final ClusterCoordinator coordinator) {
@ -195,7 +195,7 @@ public class TestAbstractHeartbeatMonitor {
@Override
public synchronized void requestNodeConnect(NodeIdentifier nodeId, String userDn) {
statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTING, Collections.emptySet()));
statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTING));
}
@Override
@ -205,17 +205,17 @@ public class TestAbstractHeartbeatMonitor {
@Override
public synchronized void finishNodeConnection(NodeIdentifier nodeId) {
statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED, Collections.emptySet()));
statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED));
}
@Override
public synchronized void requestNodeDisconnect(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation) {
statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.DISCONNECTED, Collections.emptySet()));
statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.DISCONNECTED));
}
@Override
public synchronized void disconnectionRequestedByNode(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation) {
statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.DISCONNECTED, Collections.emptySet()));
statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.DISCONNECTED));
}
@Override
@ -246,10 +246,6 @@ public class TestAbstractHeartbeatMonitor {
events.add(new ReportedEvent(nodeId, severity, event));
}
@Override
public void updateNodeRoles(NodeIdentifier nodeId, Set<String> roles) {
}
synchronized List<ReportedEvent> getEvents() {
return new ArrayList<>(events);
}
@ -309,18 +305,20 @@ public class TestAbstractHeartbeatMonitor {
return false;
}
@Override
public void addRole(String clusterRole) {
}
@Override
public void removeRole(String clusterRole) {
}
@Override
public NodeIdentifier getLocalNodeIdentifier() {
return null;
}
@Override
public List<NodeConnectionStatus> getConnectionStatuses() {
return Collections.emptyList();
}
@Override
public boolean resetNodeStatus(NodeConnectionStatus connectionStatus, long qualifyingUpdateId) {
return false;
}
}
@ -360,5 +358,10 @@ public class TestAbstractHeartbeatMonitor {
mutex.wait();
}
}
@Override
public String getHeartbeatAddress() {
return "localhost";
}
}
}

View File

@ -27,7 +27,6 @@ import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -162,7 +161,7 @@ public class TestThreadPoolRequestReplicator {
nodeIds.add(nodeId);
final ClusterCoordinator coordinator = Mockito.mock(ClusterCoordinator.class);
Mockito.when(coordinator.getConnectionStatus(Mockito.any(NodeIdentifier.class))).thenReturn(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED, Collections.emptySet()));
Mockito.when(coordinator.getConnectionStatus(Mockito.any(NodeIdentifier.class))).thenReturn(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED));
final AtomicInteger requestCount = new AtomicInteger(0);
final ThreadPoolRequestReplicator replicator
@ -209,7 +208,7 @@ public class TestThreadPoolRequestReplicator {
Mockito.when(coordinator.getConnectionStatus(Mockito.any(NodeIdentifier.class))).thenAnswer(new Answer<NodeConnectionStatus>() {
@Override
public NodeConnectionStatus answer(InvocationOnMock invocation) throws Throwable {
return new NodeConnectionStatus(invocation.getArgumentAt(0, NodeIdentifier.class), NodeConnectionState.CONNECTED, Collections.emptySet());
return new NodeConnectionStatus(invocation.getArgumentAt(0, NodeIdentifier.class), NodeConnectionState.CONNECTED);
}
});

View File

@ -78,7 +78,7 @@ public class TestNodeClusterCoordinator {
final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties()) {
coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, null, revisionManager, createProperties()) {
@Override
void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
nodeStatuses.add(updatedStatus);
@ -95,10 +95,10 @@ public class TestNodeClusterCoordinator {
public void testConnectionResponseIndicatesAllNodes() throws IOException {
// Add a disconnected node
coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(1), DisconnectionCode.LACK_OF_HEARTBEAT));
coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.DISCONNECTING, Collections.emptySet()));
coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(3), NodeConnectionState.CONNECTING, Collections.emptySet()));
coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(4), NodeConnectionState.CONNECTED, Collections.emptySet()));
coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(5), NodeConnectionState.CONNECTED, Collections.emptySet()));
coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.DISCONNECTING));
coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(3), NodeConnectionState.CONNECTING));
coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(4), NodeConnectionState.CONNECTED));
coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(5), NodeConnectionState.CONNECTED));
// Create a connection request message and send to the coordinator
final NodeIdentifier requestedNodeId = createNodeId(6);
@ -133,7 +133,7 @@ public class TestNodeClusterCoordinator {
final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties()) {
final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, null, revisionManager, createProperties()) {
@Override
void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
}
@ -171,7 +171,7 @@ public class TestNodeClusterCoordinator {
final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties()) {
final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, null, revisionManager, createProperties()) {
@Override
void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
}
@ -265,10 +265,10 @@ public class TestNodeClusterCoordinator {
public void testGetConnectionStates() throws IOException {
// Add a disconnected node
coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(1), DisconnectionCode.LACK_OF_HEARTBEAT));
coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.DISCONNECTING, Collections.emptySet()));
coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(3), NodeConnectionState.CONNECTING, Collections.emptySet()));
coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(4), NodeConnectionState.CONNECTED, Collections.emptySet()));
coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(5), NodeConnectionState.CONNECTED, Collections.emptySet()));
coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.DISCONNECTING));
coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(3), NodeConnectionState.CONNECTING));
coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(4), NodeConnectionState.CONNECTED));
coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(5), NodeConnectionState.CONNECTED));
final Map<NodeConnectionState, List<NodeIdentifier>> stateMap = coordinator.getConnectionStates();
assertEquals(4, stateMap.size());
@ -295,10 +295,10 @@ public class TestNodeClusterCoordinator {
public void testGetNodeIdentifiers() throws IOException {
// Add a disconnected node
coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(1), DisconnectionCode.LACK_OF_HEARTBEAT));
coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.DISCONNECTING, Collections.emptySet()));
coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(3), NodeConnectionState.CONNECTING, Collections.emptySet()));
coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(4), NodeConnectionState.CONNECTED, Collections.emptySet()));
coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(5), NodeConnectionState.CONNECTED, Collections.emptySet()));
coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.DISCONNECTING));
coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(3), NodeConnectionState.CONNECTING));
coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(4), NodeConnectionState.CONNECTED));
coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(5), NodeConnectionState.CONNECTED));
final Set<NodeIdentifier> connectedIds = coordinator.getNodeIdentifiers(NodeConnectionState.CONNECTED);
assertEquals(2, connectedIds.size());
@ -322,8 +322,8 @@ public class TestNodeClusterCoordinator {
public void testRequestNodeDisconnect() throws InterruptedException {
// Add a connected node
final NodeIdentifier nodeId1 = createNodeId(1);
coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, NodeConnectionState.CONNECTED, Collections.emptySet()));
coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.CONNECTED, Collections.emptySet()));
coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, NodeConnectionState.CONNECTED));
coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.CONNECTED));
// wait for the status change message and clear it
while (nodeStatuses.isEmpty()) {
@ -347,8 +347,8 @@ public class TestNodeClusterCoordinator {
// Add a connected node
final NodeIdentifier nodeId1 = createNodeId(1);
final NodeIdentifier nodeId2 = createNodeId(2);
coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, NodeConnectionState.CONNECTED, Collections.emptySet()));
coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId2, NodeConnectionState.CONNECTED, Collections.emptySet()));
coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, NodeConnectionState.CONNECTED));
coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId2, NodeConnectionState.CONNECTED));
// wait for the status change message and clear it
while (nodeStatuses.isEmpty()) {
@ -375,8 +375,8 @@ public class TestNodeClusterCoordinator {
final NodeIdentifier nodeId1 = createNodeId(1);
final NodeIdentifier nodeId2 = createNodeId(2);
coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, NodeConnectionState.CONNECTED, Collections.emptySet()));
coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId2, NodeConnectionState.CONNECTED, Collections.emptySet()));
coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, NodeConnectionState.CONNECTED));
coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId2, NodeConnectionState.CONNECTED));
// wait for the status change message and clear it
while (nodeStatuses.size() < 2) {
@ -385,7 +385,7 @@ public class TestNodeClusterCoordinator {
nodeStatuses.clear();
final NodeConnectionStatus oldStatus = new NodeConnectionStatus(-1L, nodeId1, NodeConnectionState.DISCONNECTED,
DisconnectionCode.BLOCKED_BY_FIREWALL, null, 0L, null);
DisconnectionCode.BLOCKED_BY_FIREWALL, null, 0L);
final NodeStatusChangeMessage msg = new NodeStatusChangeMessage();
msg.setNodeId(nodeId1);
msg.setNodeConnectionStatus(oldStatus);
@ -396,61 +396,6 @@ public class TestNodeClusterCoordinator {
assertTrue(nodeStatuses.isEmpty());
}
@Test(timeout = 5000)
public void testUpdateNodeRoles() throws InterruptedException {
// Add a connected node
final NodeIdentifier nodeId1 = createNodeId(1);
final NodeIdentifier nodeId2 = createNodeId(2);
coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, NodeConnectionState.CONNECTED, Collections.emptySet()));
// wait for the status change message and clear it
while (nodeStatuses.isEmpty()) {
Thread.sleep(10L);
}
nodeStatuses.clear();
coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId2, NodeConnectionState.CONNECTED, Collections.emptySet()));
// wait for the status change message and clear it
while (nodeStatuses.isEmpty()) {
Thread.sleep(10L);
}
nodeStatuses.clear();
// Update role of node 1 to primary node
coordinator.updateNodeRoles(nodeId1, Collections.singleton(ClusterRoles.PRIMARY_NODE));
// wait for the status change message
while (nodeStatuses.isEmpty()) {
Thread.sleep(10L);
}
// verify the message
final NodeConnectionStatus status = nodeStatuses.get(0);
assertNotNull(status);
assertEquals(nodeId1, status.getNodeIdentifier());
assertEquals(NodeConnectionState.CONNECTED, status.getState());
assertEquals(Collections.singleton(ClusterRoles.PRIMARY_NODE), status.getRoles());
nodeStatuses.clear();
// Update role of node 2 to primary node. This should trigger 2 status changes -
// node 1 should lose primary role & node 2 should gain it
coordinator.updateNodeRoles(nodeId2, Collections.singleton(ClusterRoles.PRIMARY_NODE));
// wait for the status change message
while (nodeStatuses.size() < 2) {
Thread.sleep(10L);
}
final NodeConnectionStatus status1 = nodeStatuses.get(0);
final NodeConnectionStatus status2 = nodeStatuses.get(1);
final NodeConnectionStatus id1Msg = (status1.getNodeIdentifier().equals(nodeId1)) ? status1 : status2;
final NodeConnectionStatus id2Msg = (status1.getNodeIdentifier().equals(nodeId2)) ? status1 : status2;
assertNotSame(id1Msg, id2Msg);
assertTrue(id1Msg.getRoles().isEmpty());
assertEquals(Collections.singleton(ClusterRoles.PRIMARY_NODE), id2Msg.getRoles());
}
@Test
public void testProposedIdentifierResolvedIfConflict() {
final NodeIdentifier id1 = new NodeIdentifier("1234", "localhost", 8000, "localhost", 9000, "localhost", 10000, 11000, false);

View File

@ -24,6 +24,10 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.test.TestingServer;
import org.apache.nifi.cluster.coordination.node.ClusterRoles;
import org.apache.nifi.util.NiFiProperties;
@ -91,6 +95,19 @@ public class Cluster {
return Collections.unmodifiableSet(nodes);
}
public CuratorFramework createCuratorClient() {
final RetryPolicy retryPolicy = new RetryNTimes(20, 500);
final CuratorFramework curatorClient = CuratorFrameworkFactory.builder()
.connectString(getZooKeeperConnectString())
.sessionTimeoutMs(3000)
.connectionTimeoutMs(3000)
.retryPolicy(retryPolicy)
.defaultData(new byte[0])
.build();
curatorClient.start();
return curatorClient;
}
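A possible usage of the helper above, e.g. from an integration test that wants to verify the embedded ZooKeeper is reachable (the znode path "/" and the cluster variable are illustrative assumptions):

    final CuratorFramework client = cluster.createCuratorClient();
    try {
        client.blockUntilConnected();
        final List<String> children = client.getChildren().forPath("/");
        System.out.println("Top-level znodes: " + children);
    } catch (final Exception e) {
        throw new RuntimeException("Could not query embedded ZooKeeper", e);
    } finally {
        client.close();
    }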
public Node createNode() {
final Map<String, String> addProps = new HashMap<>();
@ -106,11 +123,24 @@ public class Cluster {
public Node waitForClusterCoordinator(final long time, final TimeUnit timeUnit) {
return ClusterUtils.waitUntilNonNull(time, timeUnit,
() -> getNodes().stream().filter(node -> node.getRoles().contains(ClusterRoles.CLUSTER_COORDINATOR)).findFirst().orElse(null));
() -> getNodes().stream().filter(node -> node.hasRole(ClusterRoles.CLUSTER_COORDINATOR)).findFirst().orElse(null));
}
public Node waitForPrimaryNode(final long time, final TimeUnit timeUnit) {
return ClusterUtils.waitUntilNonNull(time, timeUnit,
() -> getNodes().stream().filter(node -> node.getRoles().contains(ClusterRoles.PRIMARY_NODE)).findFirst().orElse(null));
() -> getNodes().stream().filter(node -> node.hasRole(ClusterRoles.PRIMARY_NODE)).findFirst().orElse(null));
}
/**
* Waits for each node in the cluster to connect. The time given is the maximum amount of time to wait for each node to connect, not for
* the entire cluster to connect.
*
* @param time the max amount of time to wait for a node to connect
* @param timeUnit the unit of time that the given <code>time</code> value represents
*/
public void waitUntilAllNodesConnected(final long time, final TimeUnit timeUnit) {
for (final Node node : nodes) {
node.waitUntilConnected(time, timeUnit);
}
}
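A typical call, as used by the integration tests below:

    cluster.waitUntilAllNodesConnected(10, TimeUnit.SECONDS);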
}

View File

@ -17,222 +17,219 @@
package org.apache.nifi.cluster.integration;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.nifi.cluster.coordination.node.ClusterRoles;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
public class ClusterConnectionIT {
private Cluster cluster;
@BeforeClass
public static void setup() {
System.setProperty("nifi.properties.file.path", "src/test/resources/conf/nifi.properties");
}
@Test(timeout = 20000)
public void testSingleNode() throws InterruptedException {
final Cluster cluster = new Cluster();
@Before
public void createCluster() {
cluster = new Cluster();
cluster.start();
}
try {
final Node firstNode = cluster.createNode();
firstNode.waitUntilConnected(10, TimeUnit.SECONDS);
firstNode.waitUntilElectedForRole(ClusterRoles.CLUSTER_COORDINATOR, 10, TimeUnit.SECONDS);
firstNode.waitUntilElectedForRole(ClusterRoles.PRIMARY_NODE, 10, TimeUnit.SECONDS);
} finally {
@After
public void destroyCluster() {
if (cluster != null) {
cluster.stop();
}
}
@Test(timeout = 20000)
public void testSingleNode() throws InterruptedException {
final Node firstNode = cluster.createNode();
firstNode.waitUntilConnected(10, TimeUnit.SECONDS);
firstNode.waitUntilElectedForRole(ClusterRoles.CLUSTER_COORDINATOR, 10, TimeUnit.SECONDS);
firstNode.waitUntilElectedForRole(ClusterRoles.PRIMARY_NODE, 10, TimeUnit.SECONDS);
}
@Test(timeout = 60000)
public void testThreeNodeCluster() throws InterruptedException {
final Cluster cluster = new Cluster();
cluster.start();
cluster.createNode();
cluster.createNode();
cluster.createNode();
try {
final Node firstNode = cluster.createNode();
final Node secondNode = cluster.createNode();
final Node thirdNode = cluster.createNode();
cluster.waitUntilAllNodesConnected(10, TimeUnit.SECONDS);
firstNode.waitUntilConnected(10, TimeUnit.SECONDS);
System.out.println("**** Node 1 Connected ****");
secondNode.waitUntilConnected(10, TimeUnit.SECONDS);
System.out.println("**** Node 2 Connected ****");
thirdNode.waitUntilConnected(10, TimeUnit.SECONDS);
System.out.println("**** Node 3 Connected ****");
final Node clusterCoordinator = cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS);
final Node primaryNode = cluster.waitForPrimaryNode(10, TimeUnit.SECONDS);
System.out.println("\n\n");
System.out.println("Cluster Coordinator = " + clusterCoordinator);
System.out.println("Primary Node = " + primaryNode);
System.out.println("\n\n");
} finally {
cluster.stop();
}
final Node clusterCoordinator = cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS);
final Node primaryNode = cluster.waitForPrimaryNode(10, TimeUnit.SECONDS);
System.out.println("\n\n");
System.out.println("Cluster Coordinator = " + clusterCoordinator);
System.out.println("Primary Node = " + primaryNode);
System.out.println("\n\n");
}
@Test(timeout = 60000)
public void testNewCoordinatorElected() throws IOException {
final Cluster cluster = new Cluster();
cluster.start();
final Node firstNode = cluster.createNode();
final Node secondNode = cluster.createNode();
try {
final Node firstNode = cluster.createNode();
final Node secondNode = cluster.createNode();
cluster.waitUntilAllNodesConnected(10, TimeUnit.SECONDS);
firstNode.waitUntilConnected(10, TimeUnit.SECONDS);
System.out.println("**** Node 1 Connected ****");
secondNode.waitUntilConnected(10, TimeUnit.SECONDS);
System.out.println("**** Node 2 Connected ****");
final Node clusterCoordinator = cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS);
clusterCoordinator.stop();
final Node clusterCoordinator = cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS);
clusterCoordinator.stop();
final Node otherNode = firstNode == clusterCoordinator ? secondNode : firstNode;
otherNode.waitUntilElectedForRole(ClusterRoles.CLUSTER_COORDINATOR, 10, TimeUnit.SECONDS);
} finally {
cluster.stop();
}
final Node otherNode = firstNode == clusterCoordinator ? secondNode : firstNode;
otherNode.waitUntilElectedForRole(ClusterRoles.CLUSTER_COORDINATOR, 10, TimeUnit.SECONDS);
}
@Test(timeout = 60000)
public void testReconnectGetsCorrectClusterTopology() throws IOException {
final Cluster cluster = new Cluster();
cluster.start();
final Node firstNode = cluster.createNode();
final Node secondNode = cluster.createNode();
final Node thirdNode = cluster.createNode();
try {
final Node firstNode = cluster.createNode();
final Node secondNode = cluster.createNode();
final Node thirdNode = cluster.createNode();
cluster.waitUntilAllNodesConnected(10, TimeUnit.SECONDS);
firstNode.waitUntilConnected(10, TimeUnit.SECONDS);
System.out.println("**** Node 1 Connected ****");
secondNode.waitUntilConnected(10, TimeUnit.SECONDS);
System.out.println("**** Node 2 Connected ****");
thirdNode.waitUntilConnected(10, TimeUnit.SECONDS);
System.out.println("**** Node 3 Connected ****");
// shutdown node
secondNode.stop();
// shutdown node
secondNode.stop();
System.out.println("\n\nNode 2 Shut Down\n\n");
System.out.println("\n\nNode 2 Shut Down\n\n");
// wait for node 1 and 3 to recognize that node 2 is gone
Stream.of(firstNode, thirdNode).forEach(node -> {
node.assertNodeDisconnects(secondNode.getIdentifier(), 10, TimeUnit.SECONDS);
});
// wait for node 1 and 3 to recognize that node 2 is gone
Stream.of(firstNode, thirdNode).forEach(node -> {
node.assertNodeDisconnects(secondNode.getIdentifier(), 5, TimeUnit.SECONDS);
});
// restart node
secondNode.start();
System.out.println("\n\nNode 2 Restarted\n\n");
// restart node
secondNode.start();
System.out.println("\n\nNode 2 Restarted\n\n");
secondNode.waitUntilConnected(20, TimeUnit.SECONDS);
System.out.println("\n\nNode 2 Reconnected\n\n");
secondNode.waitUntilConnected(20, TimeUnit.SECONDS);
System.out.println("\n\nNode 2 Reconnected\n\n");
// wait for all 3 nodes to agree that node 2 is connected
Stream.of(firstNode, secondNode, thirdNode).forEach(node -> {
ClusterUtils.waitUntilConditionMet(5, TimeUnit.SECONDS,
() -> firstNode.getClusterCoordinator().getConnectionStatus(secondNode.getIdentifier()).getState() == NodeConnectionState.CONNECTED);
});
// wait for all 3 nodes to agree that node 2 is connected
Stream.of(firstNode, secondNode, thirdNode).forEach(node -> {
ClusterUtils.waitUntilConditionMet(5, TimeUnit.SECONDS,
() -> firstNode.getClusterCoordinator().getConnectionStatus(secondNode.getIdentifier()).getState() == NodeConnectionState.CONNECTED);
});
// Ensure that all 3 nodes see a cluster of 3 connected nodes.
Stream.of(firstNode, secondNode, thirdNode).forEach(node -> {
node.assertNodeIsConnected(firstNode.getIdentifier());
node.assertNodeIsConnected(secondNode.getIdentifier());
node.assertNodeIsConnected(thirdNode.getIdentifier());
});
// Ensure that all 3 nodes see a cluster of 3 connected nodes.
Stream.of(firstNode, secondNode, thirdNode).forEach(node -> {
node.assertNodeIsConnected(firstNode.getIdentifier());
node.assertNodeIsConnected(secondNode.getIdentifier());
node.assertNodeIsConnected(thirdNode.getIdentifier());
});
// Ensure that we get both a cluster coordinator and a primary node elected
cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS);
cluster.waitForPrimaryNode(10, TimeUnit.SECONDS);
} finally {
cluster.stop();
}
// Ensure that we get both a cluster coordinator and a primary node elected
cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS);
cluster.waitForPrimaryNode(10, TimeUnit.SECONDS);
}
@Test(timeout = 60000)
public void testRestartAllNodes() throws IOException {
final Cluster cluster = new Cluster();
cluster.start();
final Node firstNode = cluster.createNode();
final Node secondNode = cluster.createNode();
final Node thirdNode = cluster.createNode();
try {
final Node firstNode = cluster.createNode();
final Node secondNode = cluster.createNode();
final Node thirdNode = cluster.createNode();
firstNode.waitUntilConnected(10, TimeUnit.SECONDS);
System.out.println("**** Node 1 Connected ****");
secondNode.waitUntilConnected(10, TimeUnit.SECONDS);
System.out.println("**** Node 2 Connected ****");
thirdNode.waitUntilConnected(10, TimeUnit.SECONDS);
System.out.println("**** Node 3 Connected ****");
firstNode.waitUntilConnected(10, TimeUnit.SECONDS);
System.out.println("**** Node 1 Connected ****");
secondNode.waitUntilConnected(10, TimeUnit.SECONDS);
System.out.println("**** Node 2 Connected ****");
thirdNode.waitUntilConnected(10, TimeUnit.SECONDS);
System.out.println("**** Node 3 Connected ****");
// shutdown node
firstNode.stop();
secondNode.stop();
thirdNode.stop();
// shutdown node
firstNode.stop();
secondNode.stop();
thirdNode.stop();
System.out.println("\n\nRestarting all nodes\n\n");
thirdNode.start();
firstNode.start();
secondNode.start();
System.out.println("\n\nRestarting all nodes\n\n");
thirdNode.start();
firstNode.start();
secondNode.start();
cluster.waitUntilAllNodesConnected(10, TimeUnit.SECONDS);
Stream.of(firstNode, secondNode, thirdNode).forEach(node -> {
node.waitUntilConnected(10, TimeUnit.SECONDS);
});
// wait for all 3 nodes to agree that node 2 is connected
Stream.of(firstNode, secondNode, thirdNode).forEach(node -> {
ClusterUtils.waitUntilConditionMet(5, TimeUnit.SECONDS,
() -> firstNode.getClusterCoordinator().getConnectionStatus(secondNode.getIdentifier()).getState() == NodeConnectionState.CONNECTED);
});
// wait for all 3 nodes to agree that node 2 is connected
Stream.of(firstNode, secondNode, thirdNode).forEach(node -> {
ClusterUtils.waitUntilConditionMet(5, TimeUnit.SECONDS,
() -> firstNode.getClusterCoordinator().getConnectionStatus(secondNode.getIdentifier()).getState() == NodeConnectionState.CONNECTED);
});
// Ensure that all 3 nodes see a cluster of 3 connected nodes.
Stream.of(firstNode, secondNode, thirdNode).forEach(node -> {
node.assertNodeConnects(firstNode.getIdentifier(), 10, TimeUnit.SECONDS);
node.assertNodeConnects(secondNode.getIdentifier(), 10, TimeUnit.SECONDS);
node.assertNodeConnects(thirdNode.getIdentifier(), 10, TimeUnit.SECONDS);
});
// Ensure that all 3 nodes see a cluster of 3 connected nodes.
Stream.of(firstNode, secondNode, thirdNode).forEach(node -> {
node.assertNodeConnects(firstNode.getIdentifier(), 10, TimeUnit.SECONDS);
node.assertNodeConnects(secondNode.getIdentifier(), 10, TimeUnit.SECONDS);
node.assertNodeConnects(thirdNode.getIdentifier(), 10, TimeUnit.SECONDS);
});
// Ensure that we get both a cluster coordinator and a primary node elected
cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS);
cluster.waitForPrimaryNode(10, TimeUnit.SECONDS);
} finally {
cluster.stop();
}
// Ensure that we get both a cluster coordinator and a primary node elected
cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS);
cluster.waitForPrimaryNode(10, TimeUnit.SECONDS);
}
@Test(timeout = 30000)
public void testHeartbeatsMonitored() throws IOException {
final Cluster cluster = new Cluster();
cluster.start();
final Node firstNode = cluster.createNode();
final Node secondNode = cluster.createNode();
try {
final Node firstNode = cluster.createNode();
final Node secondNode = cluster.createNode();
cluster.waitUntilAllNodesConnected(10, TimeUnit.SECONDS);
firstNode.waitUntilConnected(10, TimeUnit.SECONDS);
secondNode.waitUntilConnected(10, TimeUnit.SECONDS);
final Node nodeToSuspend = firstNode;
final Node otherNode = secondNode;
secondNode.suspendHeartbeating();
nodeToSuspend.suspendHeartbeating();
// Heartbeat interval in nifi.properties is set to 1 sec. This means that the node should be kicked out
// due to lack of heartbeat after 8 times this amount of time, or 8 seconds.
firstNode.assertNodeDisconnects(secondNode.getIdentifier(), 12, TimeUnit.SECONDS);
// Heartbeat interval in nifi.properties is set to 1 sec. This means that the node should be kicked out
// due to lack of heartbeat after 8 times this amount of time, or 8 seconds.
otherNode.assertNodeDisconnects(nodeToSuspend.getIdentifier(), 12, TimeUnit.SECONDS);
secondNode.resumeHeartbeating();
firstNode.assertNodeConnects(secondNode.getIdentifier(), 10, TimeUnit.SECONDS);
} finally {
cluster.stop();
}
nodeToSuspend.resumeHeartbeating();
otherNode.assertNodeConnects(nodeToSuspend.getIdentifier(), 10, TimeUnit.SECONDS);
}
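The timing in the test above follows from the configuration described in its comments. A small worked example of that arithmetic (the interval value is assumed from the comment, not read from the real nifi.properties):

    final long heartbeatIntervalSeconds = 1;        // nifi.cluster.protocol.heartbeat.interval = 1 sec in the test config
    final long missedHeartbeatsBeforeEviction = 8;  // threshold described in the comment above
    final long evictionSeconds = heartbeatIntervalSeconds * missedHeartbeatsBeforeEviction; // 8 seconds
    // asserting within 12 seconds leaves roughly 4 seconds of slack for the heartbeat monitor's polling cadence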
@Test
public void testNodeInheritsClusterTopologyOnHeartbeat() throws InterruptedException {
final Node node1 = cluster.createNode();
final Node node2 = cluster.createNode();
final Node node3 = cluster.createNode();
cluster.waitUntilAllNodesConnected(10, TimeUnit.SECONDS);
final Node coordinator = cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS);
final NodeIdentifier node4NotReallyInCluster = new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 9283, "localhost", 9284, "localhost", 9285, null, false, null);
final Map<NodeIdentifier, NodeConnectionStatus> replacementStatuses = new HashMap<>();
replacementStatuses.put(node1.getIdentifier(), new NodeConnectionStatus(node1.getIdentifier(), DisconnectionCode.USER_DISCONNECTED));
replacementStatuses.put(node4NotReallyInCluster, new NodeConnectionStatus(node4NotReallyInCluster, NodeConnectionState.CONNECTING));
// reset the coordinator's node statuses so that other nodes will get its now-fake view of the cluster

coordinator.getClusterCoordinator().resetNodeStatuses(replacementStatuses);
final List<NodeConnectionStatus> expectedStatuses = coordinator.getClusterCoordinator().getConnectionStatuses();
// give nodes a bit to heartbeat in. We need to wait long enough that each node heartbeats.
// But we must not wait more than 8 seconds, because that is when nodes start getting kicked out.
Thread.sleep(6000L);
for (final Node node : new Node[] {node1, node2, node3}) {
assertEquals(expectedStatuses, node.getClusterCoordinator().getConnectionStatuses());
}
}
}

View File

@ -24,13 +24,21 @@ import java.util.function.Supplier;
public class ClusterUtils {
public static void waitUntilConditionMet(final long time, final TimeUnit timeUnit, final BooleanSupplier test) {
waitUntilConditionMet(time, timeUnit, test, null);
}
public static void waitUntilConditionMet(final long time, final TimeUnit timeUnit, final BooleanSupplier test, final Supplier<String> errorMessageSupplier) {
final long nanosToWait = timeUnit.toNanos(time);
final long start = System.nanoTime();
final long maxTime = start + nanosToWait;
while (!test.getAsBoolean()) {
if (System.nanoTime() > maxTime) {
throw new AssertionError("Condition never occurred after waiting " + time + " " + timeUnit);
if (errorMessageSupplier == null) {
throw new AssertionError("Condition never occurred after waiting " + time + " " + timeUnit);
} else {
throw new AssertionError("Condition never occurred after waiting " + time + " " + timeUnit + " : " + errorMessageSupplier.get());
}
}
}
}
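A hypothetical usage of the new overload, supplying a diagnostic message that is built only if the wait times out (the node variable is assumed to be one of the integration-test Nodes):

    ClusterUtils.waitUntilConditionMet(10, TimeUnit.SECONDS,
        () -> node.getConnectionStatus().getState() == NodeConnectionState.CONNECTED,
        () -> "Node never connected; current status is " + node.getConnectionStatus());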

View File

@ -18,6 +18,7 @@
package org.apache.nifi.cluster.integration;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Collections;
@ -31,15 +32,15 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.cluster.ReportedEvent;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.heartbeat.ClusterProtocolHeartbeatMonitor;
import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
import org.apache.nifi.cluster.coordination.node.CuratorNodeProtocolSender;
import org.apache.nifi.cluster.coordination.node.LeaderElectionNodeProtocolSender;
import org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.ClusterCoordinationProtocolSender;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.NodeProtocolSender;
import org.apache.nifi.cluster.protocol.ProtocolContext;
import org.apache.nifi.cluster.protocol.ProtocolListener;
import org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderListener;
@ -52,6 +53,8 @@ import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.StandardFlowService;
import org.apache.nifi.controller.leader.election.CuratorLeaderElectionManager;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.engine.FlowEngine;
@ -75,9 +78,10 @@ public class Node {
private final RevisionManager revisionManager;
private NodeClusterCoordinator clusterCoordinator;
private CuratorNodeProtocolSender protocolSender;
private NodeProtocolSender protocolSender;
private FlowController flowController;
private StandardFlowService flowService;
private LeaderElectionManager electionManager;
private ProtocolListener protocolListener;
@ -114,6 +118,8 @@ public class Node {
revisionManager = Mockito.mock(RevisionManager.class);
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.<Revision> emptyList());
electionManager = new CuratorLeaderElectionManager(4, nodeProperties);
}
@ -127,7 +133,8 @@ public class Node {
final HeartbeatMonitor heartbeatMonitor = createHeartbeatMonitor();
flowController = FlowController.createClusteredInstance(Mockito.mock(FlowFileEventRepository.class), nodeProperties,
null, null, StringEncryptor.createEncryptor(nodeProperties), protocolSender, Mockito.mock(BulletinRepository.class), clusterCoordinator, heartbeatMonitor, VariableRegistry.EMPTY_REGISTRY);
null, null, StringEncryptor.createEncryptor(nodeProperties), protocolSender, Mockito.mock(BulletinRepository.class), clusterCoordinator,
heartbeatMonitor, electionManager, VariableRegistry.EMPTY_REGISTRY);
try {
flowController.initializeFlow();
@ -212,23 +219,18 @@ public class Node {
}
}
public Set<String> getRoles() {
final NodeConnectionStatus status = getConnectionStatus();
return status == null ? Collections.emptySet() : status.getRoles();
}
public NodeConnectionStatus getConnectionStatus() {
return clusterCoordinator.getConnectionStatus(nodeId);
}
@SuppressWarnings("unchecked")
private CuratorNodeProtocolSender createNodeProtocolSender() {
private NodeProtocolSender createNodeProtocolSender() {
final SocketConfiguration socketConfig = new SocketConfiguration();
socketConfig.setSocketTimeout(3000);
socketConfig.setReuseAddress(true);
final ProtocolContext<ProtocolMessage> protocolContext = new JaxbProtocolContext<>(JaxbProtocolUtils.JAXB_CONTEXT);
final CuratorNodeProtocolSender protocolSender = new CuratorNodeProtocolSender(socketConfig, protocolContext, nodeProperties);
final NodeProtocolSender protocolSender = new LeaderElectionNodeProtocolSender(socketConfig, protocolContext, electionManager);
return protocolSender;
}
@ -267,11 +269,11 @@ public class Node {
}
final ClusterCoordinationProtocolSenderListener protocolSenderListener = new ClusterCoordinationProtocolSenderListener(createCoordinatorProtocolSender(), protocolListener);
return new NodeClusterCoordinator(protocolSenderListener, eventReporter, null, revisionManager, nodeProperties);
return new NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, null, revisionManager, nodeProperties);
}
public ClusterCoordinator getClusterCoordinator() {
public NodeClusterCoordinator getClusterCoordinator() {
return clusterCoordinator;
}
@ -295,8 +297,22 @@ public class Node {
ClusterUtils.waitUntilConditionMet(time, timeUnit, () -> isConnected());
}
private String getClusterAddress() {
final InetSocketAddress address = nodeProperties.getClusterNodeProtocolAddress();
return address.getHostName() + ":" + address.getPort();
}
public boolean hasRole(final String roleName) {
final String leaderAddress = electionManager.getLeader(roleName);
if (leaderAddress == null) {
return false;
}
return leaderAddress.equals(getClusterAddress());
}
public void waitUntilElectedForRole(final String roleName, final long time, final TimeUnit timeUnit) {
ClusterUtils.waitUntilConditionMet(time, timeUnit, () -> getRoles().contains(roleName));
ClusterUtils.waitUntilConditionMet(time, timeUnit, () -> hasRole(roleName));
}
// Assertions
@ -309,7 +325,8 @@ public class Node {
*/
public void assertNodeConnects(final NodeIdentifier nodeId, final long time, final TimeUnit timeUnit) {
ClusterUtils.waitUntilConditionMet(time, timeUnit,
() -> getClusterCoordinator().getConnectionStatus(nodeId).getState() == NodeConnectionState.CONNECTED);
() -> getClusterCoordinator().getConnectionStatus(nodeId).getState() == NodeConnectionState.CONNECTED,
() -> "Connection Status is " + getClusterCoordinator().getConnectionStatus(nodeId).toString());
}
@ -322,7 +339,8 @@ public class Node {
*/
public void assertNodeDisconnects(final NodeIdentifier nodeId, final long time, final TimeUnit timeUnit) {
ClusterUtils.waitUntilConditionMet(time, timeUnit,
() -> getClusterCoordinator().getConnectionStatus(nodeId).getState() == NodeConnectionState.DISCONNECTED);
() -> getClusterCoordinator().getConnectionStatus(nodeId).getState() == NodeConnectionState.DISCONNECTED,
() -> "Connection Status is " + getClusterCoordinator().getConnectionStatus(nodeId).toString());
}

View File

@ -65,7 +65,6 @@ import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.DataAuthorizable;
import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.cluster.HeartbeatPayload;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
import org.apache.nifi.cluster.coordination.node.ClusterRoles;
@ -74,6 +73,7 @@ import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.cluster.protocol.Heartbeat;
import org.apache.nifi.cluster.protocol.HeartbeatPayload;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.NodeProtocolSender;
import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
@ -96,7 +96,6 @@ import org.apache.nifi.controller.exception.ComponentLifeCycleException;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.label.StandardLabel;
import org.apache.nifi.controller.leader.election.CuratorLeaderElectionManager;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
import org.apache.nifi.controller.leader.election.LeaderElectionStateChangeListener;
import org.apache.nifi.controller.queue.FlowFileQueue;
@ -388,6 +387,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
bulletinRepo,
/* cluster coordinator */ null,
/* heartbeat monitor */ null,
/* leader election manager */ null,
/* variable registry */ variableRegistry);
}
@ -401,7 +401,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final BulletinRepository bulletinRepo,
final ClusterCoordinator clusterCoordinator,
final HeartbeatMonitor heartbeatMonitor,
VariableRegistry variableRegistry) {
final LeaderElectionManager leaderElectionManager,
final VariableRegistry variableRegistry) {
final FlowController flowController = new FlowController(
flowFileEventRepo,
@ -413,7 +414,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
protocolSender,
bulletinRepo,
clusterCoordinator,
heartbeatMonitor, variableRegistry);
heartbeatMonitor,
leaderElectionManager,
variableRegistry);
return flowController;
}
@ -429,6 +432,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final BulletinRepository bulletinRepo,
final ClusterCoordinator clusterCoordinator,
final HeartbeatMonitor heartbeatMonitor,
final LeaderElectionManager leaderElectionManager,
final VariableRegistry variableRegistry) {
maxTimerDrivenThreads = new AtomicInteger(10);
@ -577,10 +581,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
this.connectionStatus = new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED);
heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false));
this.leaderElectionManager = leaderElectionManager;
if (configuredForClustering) {
leaderElectionManager = new CuratorLeaderElectionManager(4, nifiProperties);
heartbeater = new ClusterProtocolHeartbeater(protocolSender, nifiProperties);
heartbeater = new ClusterProtocolHeartbeater(protocolSender, clusterCoordinator, leaderElectionManager);
// Check if there is already a cluster coordinator elected. If not, go ahead
// and register for coordinator role. If there is already one elected, do not register until
@ -600,7 +604,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
leaderElectionManager.start();
} else {
leaderElectionManager = null;
heartbeater = null;
}
}
@ -3305,6 +3308,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
private void registerForClusterCoordinator() {
final String participantId = heartbeatMonitor.getHeartbeatAddress();
leaderElectionManager.register(ClusterRoles.CLUSTER_COORDINATOR, new LeaderElectionStateChangeListener() {
@Override
public synchronized void onLeaderRelinquish() {
@ -3318,24 +3323,19 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
// call start() when we become the leader, and this will ensure that initialization is handled. The heartbeat monitor
// will then check the ZooKeeper znode to determine whether it is the cluster coordinator before kicking any nodes
// out of the cluster.
if (clusterCoordinator != null) {
clusterCoordinator.removeRole(ClusterRoles.CLUSTER_COORDINATOR);
}
}
@Override
public synchronized void onLeaderElection() {
LOG.info("This node elected Active Cluster Coordinator");
heartbeatMonitor.start(); // ensure heartbeat monitor is started
if (clusterCoordinator != null) {
clusterCoordinator.addRole(ClusterRoles.CLUSTER_COORDINATOR);
}
}
});
}, participantId);
}
private void registerForPrimaryNode() {
final String participantId = heartbeatMonitor.getHeartbeatAddress();
leaderElectionManager.register(ClusterRoles.PRIMARY_NODE, new LeaderElectionStateChangeListener() {
@Override
public void onLeaderElection() {
@ -3346,7 +3346,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
public void onLeaderRelinquish() {
setPrimary(false);
}
});
}, participantId);
}
/**
@ -3852,7 +3852,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
@Override
public void run() {
try {
try (final NarCloseable narCloseable = NarCloseable.withFrameworkNar()) {
if (heartbeatsSuspended.get()) {
return;
}
@ -3914,6 +3914,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final QueueSize queueSize = getTotalFlowFileCount(bean.getRootGroup());
hbPayload.setTotalFlowFileCount(queueSize.getObjectCount());
hbPayload.setTotalFlowFileBytes(queueSize.getByteCount());
hbPayload.setClusterStatus(clusterCoordinator.getConnectionStatuses());
// create heartbeat message
final NodeIdentifier nodeId = getNodeId();
@ -3922,15 +3923,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
return null;
}
final Set<String> roles = new HashSet<>();
if (bean.isPrimary()) {
roles.add(ClusterRoles.PRIMARY_NODE);
}
if (clusterCoordinator.isActiveClusterCoordinator()) {
roles.add(ClusterRoles.CLUSTER_COORDINATOR);
}
final Heartbeat heartbeat = new Heartbeat(nodeId, roles, connectionStatus, hbPayload.marshal());
final Heartbeat heartbeat = new Heartbeat(nodeId, connectionStatus, hbPayload.marshal());
final HeartbeatMessage message = new HeartbeatMessage();
message.setHeartbeat(heartbeat);

View File

@ -35,7 +35,6 @@ import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -855,9 +854,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
// mark the node as clustered
controller.setClustered(true, response.getInstanceId());
final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeId);
final Set<String> roles = status == null ? Collections.emptySet() : status.getRoles();
controller.setConnectionStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED, roles));
controller.setConnectionStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED));
// start the processors as indicated by the dataflow
controller.onFlowInitialized(autoResumeState);

View File

@ -16,6 +16,34 @@
*/
package org.apache.nifi.controller;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.nifi.authorization.AbstractPolicyBasedAuthorizer;
import org.apache.nifi.authorization.Authorizer;
@ -41,7 +69,6 @@ import org.apache.nifi.controller.serialization.FlowSynchronizer;
import org.apache.nifi.controller.serialization.StandardFlowSerializer;
import org.apache.nifi.controller.service.ControllerServiceLoader;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.fingerprint.FingerprintException;
@ -66,7 +93,6 @@ import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.file.FileUtils;
import org.apache.nifi.web.api.dto.ConnectableDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.FunnelDTO;
import org.apache.nifi.web.api.dto.LabelDTO;
@ -86,33 +112,6 @@ import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;
import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
/**
*/
public class StandardFlowSynchronizer implements FlowSynchronizer {
@ -362,10 +361,6 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
// enable all the original controller services
ControllerServiceLoader.enableControllerServices(controllerServices, controller, encryptor, autoResumeState);
} else {
for (final Element serviceElement : serviceElements) {
updateControllerService(controller, serviceElement, encryptor);
}
}
}
@ -507,22 +502,6 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
return baos.toByteArray();
}
private void updateControllerService(final FlowController controller, final Element controllerServiceElement, final StringEncryptor encryptor) {
final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);
final ControllerServiceState dtoState = ControllerServiceState.valueOf(dto.getState());
final boolean dtoEnabled = (dtoState == ControllerServiceState.ENABLED || dtoState == ControllerServiceState.ENABLING);
final ControllerServiceNode serviceNode = controller.getControllerServiceNode(dto.getId());
final ControllerServiceState serviceState = serviceNode.getState();
final boolean serviceEnabled = (serviceState == ControllerServiceState.ENABLED || serviceState == ControllerServiceState.ENABLING);
if (dtoEnabled && !serviceEnabled) {
controller.enableControllerService(controller.getControllerServiceNode(dto.getId()));
} else if (!dtoEnabled && serviceEnabled) {
controller.disableControllerService(controller.getControllerServiceNode(dto.getId()));
}
}
private ReportingTaskNode getOrCreateReportingTask(final FlowController controller, final ReportingTaskDTO dto, final boolean controllerInitialized, final boolean existingFlowEmpty)
throws ReportingTaskInstantiationException {
@ -667,12 +646,6 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
// get the real process group and ID
final ProcessGroup processGroup = controller.getGroup(processGroupDto.getId());
// Update Controller Services
final List<Element> serviceNodeList = getChildrenByTagName(processGroupElement, "controllerService");
for (final Element serviceNodeElement : serviceNodeList) {
updateControllerService(controller, serviceNodeElement, encryptor);
}
// processors & ports cannot be updated - they must be the same. Except for the scheduled state.
final List<Element> processorNodeList = getChildrenByTagName(processGroupElement, "processor");
for (final Element processorElement : processorNodeList) {

View File

@ -17,25 +17,28 @@
package org.apache.nifi.controller.cluster;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.ClusterRoles;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.HeartbeatPayload;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.NodeProtocolSender;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.util.NiFiProperties;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Uses ZooKeeper in order to determine which node is the elected Cluster
* Coordinator and to indicate that this node is part of the cluster. However,
* once the Cluster Coordinator is known, heartbeats are sent directly to the
* Uses Leader Election Manager in order to determine which node is the elected
* Cluster Coordinator and to indicate that this node is part of the cluster.
* Once the Cluster Coordinator is known, heartbeats are sent directly to the
* Cluster Coordinator.
*/
public class ClusterProtocolHeartbeater implements Heartbeater {
@ -43,75 +46,53 @@ public class ClusterProtocolHeartbeater implements Heartbeater {
private static final Logger logger = LoggerFactory.getLogger(ClusterProtocolHeartbeater.class);
private final NodeProtocolSender protocolSender;
private final CuratorFramework curatorClient;
private final String nodesPathPrefix;
private final LeaderElectionManager electionManager;
private final ClusterCoordinator clusterCoordinator;
private final String coordinatorPath;
private volatile String coordinatorAddress;
public ClusterProtocolHeartbeater(final NodeProtocolSender protocolSender, final NiFiProperties nifiProperties) {
public ClusterProtocolHeartbeater(final NodeProtocolSender protocolSender, final ClusterCoordinator clusterCoordinator, final LeaderElectionManager electionManager) {
this.protocolSender = protocolSender;
final RetryPolicy retryPolicy = new RetryNTimes(10, 500);
final ZooKeeperClientConfig zkConfig = ZooKeeperClientConfig.createConfig(nifiProperties);
curatorClient = CuratorFrameworkFactory.newClient(zkConfig.getConnectString(),
zkConfig.getSessionTimeoutMillis(), zkConfig.getConnectionTimeoutMillis(), retryPolicy);
curatorClient.start();
nodesPathPrefix = zkConfig.resolvePath("cluster/nodes");
coordinatorPath = nodesPathPrefix + "/coordinator";
this.clusterCoordinator = clusterCoordinator;
this.electionManager = electionManager;
}
@Override
public String getHeartbeatAddress() throws IOException {
final String curAddress = coordinatorAddress;
if (curAddress != null) {
return curAddress;
final String heartbeatAddress = electionManager.getLeader(ClusterRoles.CLUSTER_COORDINATOR);
if (heartbeatAddress == null) {
throw new ProtocolException("Cannot send heartbeat because there is no Cluster Coordinator currently elected");
}
try {
// Get coordinator address and add watcher to change who we are heartbeating to if the value changes.
final byte[] coordinatorAddressBytes = curatorClient.getData().usingWatcher(new Watcher() {
@Override
public void process(final WatchedEvent event) {
coordinatorAddress = null;
}
}).forPath(coordinatorPath);
final String address = coordinatorAddress = new String(coordinatorAddressBytes, StandardCharsets.UTF_8);
logger.info("Determined that Cluster Coordinator is located at {}; will use this address for sending heartbeat messages", address);
return address;
} catch (Exception e) {
throw new IOException("Unable to determine Cluster Coordinator from ZooKeeper", e);
}
return heartbeatAddress;
}
@Override
public synchronized void send(final HeartbeatMessage heartbeatMessage) throws IOException {
final String heartbeatAddress = getHeartbeatAddress();
final HeartbeatResponseMessage responseMessage = protocolSender.heartbeat(heartbeatMessage, heartbeatAddress);
try {
protocolSender.heartbeat(heartbeatMessage, heartbeatAddress);
} catch (final ProtocolException pe) {
// a ProtocolException is likely the result of not being able to communicate
// with the coordinator. If we do get an IOException communicating with the coordinator,
// it will be the cause of the Protocol Exception. In this case, set coordinatorAddress
// to null so that we double-check next time that the coordinator has not changed.
if (pe.getCause() instanceof IOException) {
coordinatorAddress = null;
final byte[] payloadBytes = heartbeatMessage.getHeartbeat().getPayload();
final HeartbeatPayload payload = HeartbeatPayload.unmarshal(payloadBytes);
final List<NodeConnectionStatus> nodeStatusList = payload.getClusterStatus();
final Map<NodeIdentifier, Long> updateIdMap = nodeStatusList.stream().collect(
Collectors.toMap(status -> status.getNodeIdentifier(), status -> status.getUpdateIdentifier()));
final List<NodeConnectionStatus> updatedStatuses = responseMessage.getUpdatedNodeStatuses();
if (updatedStatuses != null) {
for (final NodeConnectionStatus updatedStatus : updatedStatuses) {
final NodeIdentifier nodeId = updatedStatus.getNodeIdentifier();
final Long updateId = updateIdMap.get(nodeId);
final boolean updated = clusterCoordinator.resetNodeStatus(updatedStatus, updateId == null ? -1L : updateId);
if (updated) {
logger.info("After receiving heartbeat response, updated status of {} to {}", updatedStatus.getNodeIdentifier(), updatedStatus);
} else {
logger.debug("After receiving heartbeat response, did not update status of {} to {} because the update is out-of-date", updatedStatus.getNodeIdentifier(), updatedStatus);
}
}
throw pe;
}
}
@Override
public void close() throws IOException {
if (curatorClient != null) {
curatorClient.close();
}
logger.info("ZooKeeper heartbeater closed. Will no longer send Heartbeat messages to ZooKeeper");
}
}

View File

@ -19,12 +19,14 @@ package org.apache.nifi.controller.leader.election;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.framework.recipes.leader.Participant;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.RetryForever;
import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
@ -46,7 +48,7 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
private volatile boolean stopped = true;
private final Map<String, LeaderRole> leaderRoles = new HashMap<>();
private final Map<String, LeaderElectionStateChangeListener> registeredRoles = new HashMap<>();
private final Map<String, RegisteredRole> registeredRoles = new HashMap<>();
public CuratorLeaderElectionManager(final int threadPoolSize, final NiFiProperties properties) {
leaderElectionMonitorEngine = new FlowEngine(threadPoolSize, "Leader Election Notification", true);
@ -75,8 +77,9 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
// Call #register for each already-registered role. This will
// cause us to start listening for leader elections for that
// role again
for (final Map.Entry<String, LeaderElectionStateChangeListener> entry : registeredRoles.entrySet()) {
register(entry.getKey(), entry.getValue());
for (final Map.Entry<String, RegisteredRole> entry : registeredRoles.entrySet()) {
final RegisteredRole role = entry.getValue();
register(entry.getKey(), role.getListener(), role.getParticipantId());
}
logger.info("{} started", this);
@ -88,7 +91,12 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
}
@Override
public synchronized void register(final String roleName, final LeaderElectionStateChangeListener listener) {
public void register(String roleName, LeaderElectionStateChangeListener listener) {
register(roleName, listener, null);
}
@Override
public synchronized void register(final String roleName, final LeaderElectionStateChangeListener listener, final String participantId) {
logger.debug("{} Registering new Leader Selector for role {}", this, roleName);
if (leaderRoles.containsKey(roleName)) {
@ -105,18 +113,23 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
throw new IllegalStateException("Cannot register leader election for role '" + roleName + "' because this is not a valid role name");
}
registeredRoles.put(roleName, listener);
registeredRoles.put(roleName, new RegisteredRole(participantId, listener));
if (!isStopped()) {
final ElectionListener electionListener = new ElectionListener(roleName, listener);
final LeaderSelector leaderSelector = new LeaderSelector(curatorClient, leaderPath, leaderElectionMonitorEngine, electionListener);
leaderSelector.autoRequeue();
if (participantId != null) {
leaderSelector.setId(participantId);
}
leaderSelector.start();
final LeaderRole leaderRole = new LeaderRole(leaderSelector, electionListener);
leaderRoles.put(roleName, leaderRole);
}
logger.info("{} Registered new Leader Selector for role {}", this, roleName);
}
@ -174,6 +187,33 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
return role.isLeader();
}
@Override
public synchronized String getLeader(final String roleName) {
final LeaderRole role = leaderRoles.get(roleName);
if (role == null) {
return null;
}
Participant participant;
try {
participant = role.getLeaderSelector().getLeader();
} catch (Exception e) {
logger.debug("Unable to determine leader for role '{}'; returning null", roleName);
return null;
}
if (participant == null) {
return null;
}
final String participantId = participant.getId();
if (StringUtils.isEmpty(participantId)) {
return null;
}
return participantId;
}
private static class LeaderRole {
private final LeaderSelector leaderSelector;
@ -193,6 +233,25 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
}
}
private static class RegisteredRole {
private final LeaderElectionStateChangeListener listener;
private final String participantId;
public RegisteredRole(final String participantId, final LeaderElectionStateChangeListener listener) {
this.participantId = participantId;
this.listener = listener;
}
public LeaderElectionStateChangeListener getListener() {
return listener;
}
public String getParticipantId() {
return participantId;
}
}
private class ElectionListener extends LeaderSelectorListenerAdapter implements LeaderSelectorListener {
private final String roleName;

View File

@ -31,7 +31,7 @@ public interface LeaderElectionManager {
void register(String roleName);
/**
* Adds a new role for which a leader is required
* Adds a new role for which a leader is required, without providing a Participant ID
*
* @param roleName the name of the role
* @param listener a listener that will be called when the node gains or relinquishes
@ -39,6 +39,28 @@ public interface LeaderElectionManager {
*/
void register(String roleName, LeaderElectionStateChangeListener listener);
/**
* Adds a new role for which a leader is required, providing the given value for this node as the Participant ID
*
* @param roleName the name of the role
* @param listener a listener that will be called when the node gains or relinquishes
* the role of leader
* @param participantId the ID to register as this node's Participant ID. All nodes will see this as the identifier when
* asking to see who the leader is via the {@link #getLeader(String)} method
*/
void register(String roleName, LeaderElectionStateChangeListener listener, String participantId);
/**
* Returns the Participant ID of the node that is elected the leader, if one was provided when the node registered
* for the role via {@link #register(String, LeaderElectionStateChangeListener, String)}. If there is currently no leader
* known or if the role was registered without providing a Participant ID, this will return <code>null</code>.
*
* @param roleName the name of the role
* @return the Participant ID of the node that is elected leader, or <code>null</code> if either no leader is known or the leader
* did not register with a Participant ID.
*/
String getLeader(String roleName);
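// A minimal usage sketch of the register/getLeader contract above, assuming that
// LeaderElectionStateChangeListener lives in this same package, that ClusterRoles.CLUSTER_COORDINATOR
// is used as the role name, and that the node's heartbeat address serves as its Participant ID;
// the enclosing method is hypothetical and not part of this interface.
//
// void registerAndLookUp(final LeaderElectionManager electionManager, final String heartbeatAddress) {
//     electionManager.register(ClusterRoles.CLUSTER_COORDINATOR, new LeaderElectionStateChangeListener() {
//         @Override
//         public void onLeaderElection() {
//             // this node now holds the role
//         }
//         @Override
//         public void onLeaderRelinquish() {
//             // this node no longer holds the role
//         }
//     }, heartbeatAddress);
//
//     // Any node may then resolve the current leader; null means either that no leader is known
//     // yet or that the leader registered without providing a Participant ID.
//     final String coordinatorId = electionManager.getLeader(ClusterRoles.CLUSTER_COORDINATOR);
// }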
/**
* Removes the role with the given name from this manager. If this
* node is the elected leader for the given role, this node will relinquish

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.leader.election;
/**
* <p>
* A LeaderElectionManager to use when running a standalone (un-clustered) NiFi instance
* </p>
*/
public class StandaloneLeaderElectionManager implements LeaderElectionManager {
@Override
public void start() {
}
@Override
public void register(final String roleName) {
}
@Override
public void register(final String roleName, final LeaderElectionStateChangeListener listener) {
}
@Override
public void register(final String roleName, final LeaderElectionStateChangeListener listener, final String participantId) {
}
@Override
public String getLeader(final String roleName) {
return null;
}
@Override
public void unregister(final String roleName) {
}
@Override
public boolean isLeader(final String roleName) {
return false;
}
@Override
public boolean isStopped() {
return false;
}
@Override
public void stop() {
}
}

View File

@ -908,6 +908,7 @@ public final class FingerprintFactory {
builder.append(dto.getName());
builder.append(dto.getComments());
builder.append(dto.getAnnotationData());
builder.append(dto.getState());
final Map<String, String> properties = dto.getProperties();
if (properties == null) {

View File

@ -22,13 +22,12 @@ import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
import org.apache.nifi.cluster.protocol.NodeProtocolSender;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.context.ApplicationContext;
@ -40,8 +39,6 @@ import org.springframework.context.ApplicationContextAware;
@SuppressWarnings("rawtypes")
public class FlowControllerFactoryBean implements FactoryBean, ApplicationContextAware {
private static final Logger LOG = LoggerFactory.getLogger(FlowControllerFactoryBean.class);
private ApplicationContext applicationContext;
private FlowController flowController;
private NiFiProperties properties;
@ -51,6 +48,7 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex
private BulletinRepository bulletinRepository;
private ClusterCoordinator clusterCoordinator;
private VariableRegistry variableRegistry;
private LeaderElectionManager leaderElectionManager;
@Override
public Object getObject() throws Exception {
@ -69,7 +67,9 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex
nodeProtocolSender,
bulletinRepository,
clusterCoordinator,
heartbeatMonitor, variableRegistry);
heartbeatMonitor,
leaderElectionManager,
variableRegistry);
} else {
flowController = FlowController.createStandaloneInstance(
flowFileEventRepository,
@ -129,4 +129,8 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex
public void setClusterCoordinator(final ClusterCoordinator clusterCoordinator) {
this.clusterCoordinator = clusterCoordinator;
}
public void setLeaderElectionManager(final LeaderElectionManager leaderElectionManager) {
this.leaderElectionManager = leaderElectionManager;
}
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.spring;
import org.apache.nifi.controller.leader.election.CuratorLeaderElectionManager;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
import org.apache.nifi.controller.leader.election.StandaloneLeaderElectionManager;
import org.apache.nifi.util.NiFiProperties;
import org.springframework.beans.factory.FactoryBean;
public class LeaderElectionManagerFactoryBean implements FactoryBean<LeaderElectionManager> {
private int numThreads;
private NiFiProperties properties;
@Override
public LeaderElectionManager getObject() throws Exception {
final boolean isNode = properties.isNode();
if (isNode) {
return new CuratorLeaderElectionManager(numThreads, properties);
} else {
return new StandaloneLeaderElectionManager();
}
}
@Override
public Class<?> getObjectType() {
return LeaderElectionManager.class;
}
@Override
public boolean isSingleton() {
return true;
}
public void setNumThreads(final int numThreads) {
this.numThreads = numThreads;
}
public void setProperties(final NiFiProperties properties) {
this.properties = properties;
}
}

View File

@ -47,6 +47,7 @@
<property name="bulletinRepository" ref="bulletinRepository" />
<property name="clusterCoordinator" ref="clusterCoordinator" />
<property name="variableRegistry" ref="variableRegistry"/>
<property name="leaderElectionManager" ref="leaderElectionManager" />
</bean>
<!-- flow service -->

View File

@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import org.apache.nifi.cluster.protocol.HeartbeatPayload;
import org.apache.nifi.util.NiFiProperties;
import org.junit.Before;
import org.junit.BeforeClass;

View File

@ -18,10 +18,14 @@ package org.apache.nifi.nar;
import java.io.Closeable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*/
public class NarCloseable implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(NarCloseable.class);
public static NarCloseable withNarLoader() {
final ClassLoader current = Thread.currentThread().getContextClassLoader();
@ -29,6 +33,31 @@ public class NarCloseable implements Closeable {
return new NarCloseable(current);
}
/**
* Creates a Closeable object that can be used to switch the current class loader to the framework class loader
* and will automatically set the ClassLoader back to the previous class loader when closed
*
* @return a NarCloseable
*/
public static NarCloseable withFrameworkNar() {
final ClassLoader frameworkClassLoader;
try {
frameworkClassLoader = NarClassLoaders.getInstance().getFrameworkClassLoader();
} catch (final Exception e) {
// This should never happen in a running instance, but it will occur in unit tests
logger.error("Unable to access Framework ClassLoader due to " + e + ". Will continue without change ClassLoaders.");
if (logger.isDebugEnabled()) {
logger.error("", e);
}
return new NarCloseable(null);
}
final ClassLoader current = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(frameworkClassLoader);
return new NarCloseable(current);
}
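// A brief usage sketch of the pattern described above: the framework ClassLoader is set on the
// calling thread for the duration of the try block, and the previous ClassLoader is restored when
// the NarCloseable is closed. This helper is hypothetical and not part of this class; the throws
// clause is declared defensively because NarCloseable implements Closeable.
private static void runWithFrameworkClassLoader(final Runnable work) throws java.io.IOException {
try (final NarCloseable narCloseable = NarCloseable.withFrameworkNar()) {
// classes resolved inside this block are loaded through the framework ClassLoader
work.run();
}
}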
private final ClassLoader toSet;
private NarCloseable(final ClassLoader toSet) {

View File

@ -42,6 +42,7 @@ import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat;
import org.apache.nifi.cluster.coordination.node.ClusterRoles;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
@ -65,6 +66,7 @@ import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.Snippet;
import org.apache.nifi.controller.Template;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
import org.apache.nifi.controller.repository.claim.ContentDirection;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceReference;
@ -261,6 +263,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
private AccessPolicyDAO accessPolicyDAO;
private ClusterCoordinator clusterCoordinator;
private HeartbeatMonitor heartbeatMonitor;
private LeaderElectionManager leaderElectionManager;
// administrative services
private AuditService auditService;
@ -3116,19 +3119,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
clusterDto.setGenerated(new Date());
// create node dtos
final Collection<NodeDTO> nodeDtos = new ArrayList<>();
final List<NodeDTO> nodeDtos = clusterCoordinator.getNodeIdentifiers().stream()
.map(nodeId -> getNode(nodeId))
.collect(Collectors.toList());
clusterDto.setNodes(nodeDtos);
for (final NodeIdentifier nodeId : clusterCoordinator.getNodeIdentifiers()) {
final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeId);
if (status == null) {
continue;
}
final List<NodeEvent> events = clusterCoordinator.getNodeEvents(nodeId);
final Set<String> nodeRoles = clusterCoordinator.getConnectionStatus(nodeId).getRoles();
final NodeHeartbeat heartbeat = heartbeatMonitor.getLatestHeartbeat(nodeId);
nodeDtos.add(dtoFactory.createNodeDTO(nodeId, status, heartbeat, events, nodeRoles));
}
return clusterDto;
}
@ -3142,11 +3136,29 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
private NodeDTO getNode(final NodeIdentifier nodeId) {
final NodeConnectionStatus nodeStatus = clusterCoordinator.getConnectionStatus(nodeId);
final List<NodeEvent> events = clusterCoordinator.getNodeEvents(nodeId);
final Set<String> roles = clusterCoordinator.getConnectionStatus(nodeId).getRoles();
final Set<String> roles = getRoles(nodeId);
final NodeHeartbeat heartbeat = heartbeatMonitor.getLatestHeartbeat(nodeId);
return dtoFactory.createNodeDTO(nodeId, nodeStatus, heartbeat, events, roles);
}
private Set<String> getRoles(final NodeIdentifier nodeId) {
final Set<String> roles = new HashSet<>();
final String nodeAddress = nodeId.getSocketAddress() + ":" + nodeId.getSocketPort();
for (final String roleName : ClusterRoles.getAllRoles()) {
final String leader = leaderElectionManager.getLeader(roleName);
if (leader == null) {
continue;
}
if (leader.equals(nodeAddress)) {
roles.add(roleName);
}
}
return roles;
}
@Override
public void deleteNode(final String nodeId) {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
@ -3290,4 +3302,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
public void setBulletinRepository(final BulletinRepository bulletinRepository) {
this.bulletinRepository = bulletinRepository;
}
public void setLeaderElectionManager(final LeaderElectionManager leaderElectionManager) {
this.leaderElectionManager = leaderElectionManager;
}
}

View File

@ -163,6 +163,7 @@
<property name="clusterCoordinator" ref="clusterCoordinator"/>
<property name="heartbeatMonitor" ref="heartbeatMonitor" />
<property name="bulletinRepository" ref="bulletinRepository"/>
<property name="leaderElectionManager" ref="leaderElectionManager" />
</bean>
<!-- component ui extension configuration context -->