NIFI-2544: Created integration tests for clustering and addressed a few minor bugs that were found in doing so

This closes #832.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Mark Payne 2016-08-09 16:50:04 -04:00 committed by Bryan Bende
parent 916292994c
commit 76a4a2c48b
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
21 changed files with 1078 additions and 61 deletions

View File

@ -247,6 +247,12 @@ public class NiFiProperties extends Properties {
super();
}
public NiFiProperties copy() {
final NiFiProperties copy = new NiFiProperties();
copy.putAll(this);
return copy;
}
/**
* Factory method to create an instance of the {@link NiFiProperties}. This
* method employs a standard singleton pattern by caching the instance if it

View File

@ -46,16 +46,18 @@ public class NodeProtocolSenderListener implements NodeProtocolSender, ProtocolL
@Override
public void stop() throws IOException {
if (!isRunning()) {
throw new IllegalStateException("Instance is already stopped.");
return;
}
listener.stop();
}
@Override
public void start() throws IOException {
if (isRunning()) {
throw new IllegalStateException("Instance is already started.");
return;
}
listener.start();
}

View File

@ -67,9 +67,14 @@ public class StandardClusterCoordinationProtocolSender implements ClusterCoordin
private final ProtocolContext<ProtocolMessage> protocolContext;
private final SocketConfiguration socketConfiguration;
private final int maxThreadsPerRequest;
private int handshakeTimeoutSeconds;
public StandardClusterCoordinationProtocolSender(final SocketConfiguration socketConfiguration, final ProtocolContext<ProtocolMessage> protocolContext) {
this(socketConfiguration, protocolContext, NiFiProperties.getInstance().getClusterNodeProtocolThreads());
}
public StandardClusterCoordinationProtocolSender(final SocketConfiguration socketConfiguration, final ProtocolContext<ProtocolMessage> protocolContext, final int maxThreadsPerRequest) {
if (socketConfiguration == null) {
throw new IllegalArgumentException("Socket configuration may not be null.");
} else if (protocolContext == null) {
@ -78,6 +83,7 @@ public class StandardClusterCoordinationProtocolSender implements ClusterCoordin
this.socketConfiguration = socketConfiguration;
this.protocolContext = protocolContext;
this.handshakeTimeoutSeconds = -1; // less than zero denotes variable not configured
this.maxThreadsPerRequest = maxThreadsPerRequest;
}
@Override
@ -233,8 +239,7 @@ public class StandardClusterCoordinationProtocolSender implements ClusterCoordin
return;
}
final NiFiProperties properties = NiFiProperties.getInstance();
final int numThreads = Math.min(nodesToNotify.size(), properties.getClusterNodeProtocolThreads());
final int numThreads = Math.min(nodesToNotify.size(), maxThreadsPerRequest);
final byte[] msgBytes;
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {

View File

@ -141,6 +141,11 @@
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>

View File

@ -113,8 +113,13 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im
@Override
public void onStart() {
final RetryPolicy retryPolicy = new RetryForever(5000);
curatorClient = CuratorFrameworkFactory.newClient(zkClientConfig.getConnectString(),
zkClientConfig.getSessionTimeoutMillis(), zkClientConfig.getConnectionTimeoutMillis(), retryPolicy);
curatorClient = CuratorFrameworkFactory.builder()
.connectString(zkClientConfig.getConnectString())
.sessionTimeoutMs(zkClientConfig.getSessionTimeoutMillis())
.connectionTimeoutMs(zkClientConfig.getConnectionTimeoutMillis())
.retryPolicy(retryPolicy)
.defaultData(new byte[0])
.build();
curatorClient.start();
// We don't know what the heartbeats look like for the nodes, since we were just elected to monitoring
@ -149,7 +154,7 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im
}
curatorClient.create().withMode(CreateMode.EPHEMERAL).forPath(path, heartbeatAddress.getBytes(StandardCharsets.UTF_8));
logger.info("Successfully created node in ZooKeeper with path {}", path);
logger.info("Successfully published address as heartbeat monitor address at path {} with value {}", path, heartbeatAddress);
return;
}

View File

@ -20,18 +20,19 @@ package org.apache.nifi.cluster.coordination.node;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException;
import org.apache.nifi.cluster.protocol.AbstractNodeProtocolSender;
import org.apache.nifi.cluster.protocol.ProtocolContext;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
import org.apache.nifi.io.socket.SocketConfiguration;
import org.apache.nifi.util.NiFiProperties;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@ -49,7 +50,7 @@ public class CuratorNodeProtocolSender extends AbstractNodeProtocolSender {
private InetSocketAddress coordinatorAddress;
public CuratorNodeProtocolSender(final SocketConfiguration socketConfig, final ProtocolContext<ProtocolMessage> protocolContext, final NiFiProperties properties) {
public CuratorNodeProtocolSender(final SocketConfiguration socketConfig, final ProtocolContext<ProtocolMessage> protocolContext, final Properties properties) {
super(socketConfig, protocolContext);
zkConfig = ZooKeeperClientConfig.createConfig(properties);
coordinatorPath = zkConfig.resolvePath("cluster/nodes/coordinator");
@ -75,9 +76,12 @@ public class CuratorNodeProtocolSender extends AbstractNodeProtocolSender {
}
}).forPath(coordinatorPath);
if (coordinatorAddressBytes == null || coordinatorAddressBytes.length == 0) {
throw new NoClusterCoordinatorException("No node has yet been elected Cluster Coordinator. Cannot establish connection to cluster yet.");
}
final String address = new String(coordinatorAddressBytes, StandardCharsets.UTF_8);
logger.info("Determined that Cluster Coordinator is located at {}; will use this address for sending heartbeat messages", address);
final String[] splits = address.split(":");
if (splits.length != 2) {
final String message = String.format("Attempted to determine Cluster Coordinator address. Zookeeper indicates "
@ -86,6 +90,8 @@ public class CuratorNodeProtocolSender extends AbstractNodeProtocolSender {
throw new ProtocolException(message);
}
logger.info("Determined that Cluster Coordinator is located at {}; will use this address for sending heartbeat messages", address);
final String hostname = splits[0];
final int port;
try {
@ -105,7 +111,9 @@ public class CuratorNodeProtocolSender extends AbstractNodeProtocolSender {
return socketAddress;
} catch (final NoNodeException nne) {
logger.info("No node has yet been elected Cluster Coordinator. Cannot establish connection to cluster yet.");
throw new ProtocolException("No node has yet been elected Cluster Coordinator. Cannot establish connection to cluster yet.");
throw new NoClusterCoordinatorException("No node has yet been elected Cluster Coordinator. Cannot establish connection to cluster yet.");
} catch (final NoClusterCoordinatorException ncce) {
throw ncce;
} catch (Exception e) {
throw new IOException("Unable to determine Cluster Coordinator from ZooKeeper", e);
} finally {

View File

@ -49,6 +49,8 @@ import org.apache.nifi.cluster.event.NodeEvent;
import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException;
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.protocol.ComponentRevision;
import org.apache.nifi.cluster.protocol.ConnectionRequest;
import org.apache.nifi.cluster.protocol.ConnectionResponse;
@ -71,6 +73,7 @@ import org.apache.nifi.events.EventReporter;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.services.FlowService;
import org.apache.nifi.web.revision.RevisionManager;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
@ -98,6 +101,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
private volatile FlowService flowService;
private volatile boolean connected;
private volatile String coordinatorAddress;
private volatile boolean closed = false;
private final ConcurrentMap<NodeIdentifier, NodeConnectionStatus> nodeStatuses = new ConcurrentHashMap<>();
private final ConcurrentMap<NodeIdentifier, CircularFifoQueue<NodeEvent>> nodeEvents = new ConcurrentHashMap<>();
@ -123,14 +127,16 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
senderListener.addHandler(this);
}
// method is synchronized because it modifies local node state and then broadcasts the change. We synchronize any time that this
// is done so that we don't have an issue where we create a NodeConnectionStatus, then another thread creates one and sends it
// before the first one is sent (as this results in the first status having a larger id, which means that the first status is never
// seen by other nodes).
@Override
public synchronized void shutdown() {
public void shutdown() {
if (closed) {
return;
}
closed = true;
final NodeConnectionStatus shutdownStatus = new NodeConnectionStatus(getLocalNodeIdentifier(), DisconnectionCode.NODE_SHUTDOWN);
updateNodeStatus(shutdownStatus);
updateNodeStatus(shutdownStatus, false);
logger.info("Successfully notified other nodes that I am shutting down");
curatorClient.close();
@ -139,6 +145,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
@Override
public void setLocalNodeIdentifier(final NodeIdentifier nodeId) {
this.nodeId = nodeId;
nodeStatuses.computeIfAbsent(nodeId, id -> new NodeConnectionStatus(id, DisconnectionCode.NOT_YET_CONNECTED));
}
@Override
@ -159,6 +166,10 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
while (localNodeId == null) {
localNodeId = fetchNodeId.get();
if (localNodeId == null) {
if (closed) {
return null;
}
try {
Thread.sleep(100L);
} catch (final InterruptedException ie) {
@ -188,6 +199,8 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
logger.info("Determined that Cluster Coordinator is located at {}", address);
return address;
} catch (final KeeperException.NoNodeException nne) {
throw new NoClusterCoordinatorException();
} catch (Exception e) {
throw new IOException("Unable to determine Cluster Coordinator from ZooKeeper", e);
}
@ -395,7 +408,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
while (!updated) {
final NodeConnectionStatus currentStatus = nodeStatuses.get(nodeId);
if (currentStatus == null) {
throw new IllegalStateException("Cannot update roles for " + nodeId + " to " + roles + " because the node is not part of this cluster");
throw new UnknownNodeException("Cannot update roles for " + nodeId + " to " + roles + " because the node is not part of this cluster");
}
if (currentStatus.getRoles().equals(roles)) {
@ -531,9 +544,15 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
final String electedNodeAddress;
try {
electedNodeAddress = getElectedActiveCoordinatorAddress();
} catch (final NoClusterCoordinatorException ncce) {
logger.debug("There is currently no elected active Cluster Coordinator");
return null;
} catch (final IOException ioe) {
if (warnOnError) {
logger.warn("Failed to determine which node is elected active Cluster Coordinator. There may be no coordinator currently:", ioe);
logger.warn("Failed to determine which node is elected active Cluster Coordinator. There may be no coordinator currently: " + ioe);
if (logger.isDebugEnabled()) {
logger.warn("", ioe);
}
}
return null;
@ -644,6 +663,10 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
*/
// visible for testing.
void updateNodeStatus(final NodeConnectionStatus status) {
updateNodeStatus(status, true);
}
void updateNodeStatus(final NodeConnectionStatus status, final boolean waitForCoordinator) {
final NodeIdentifier nodeId = status.getNodeIdentifier();
// In this case, we are using nodeStatuses.put() instead of getting the current value and
@ -668,14 +691,14 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
logger.debug("Notifying cluster coordinator that node status changed from {} to {}", currentStatus, status);
}
notifyOthersOfNodeStatusChange(status, notifyAllNodes);
notifyOthersOfNodeStatusChange(status, notifyAllNodes, waitForCoordinator);
} else {
logger.debug("Not notifying other nodes that status changed because previous state of {} is same as new state of {}", currentState, status.getState());
}
}
void notifyOthersOfNodeStatusChange(final NodeConnectionStatus updatedStatus) {
notifyOthersOfNodeStatusChange(updatedStatus, isActiveClusterCoordinator());
notifyOthersOfNodeStatusChange(updatedStatus, isActiveClusterCoordinator(), true);
}
/**
@ -684,7 +707,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
* @param updatedStatus the updated status for a node in the cluster
* @param notifyAllNodes if <code>true</code> will notify all nodes. If <code>false</code>, will notify only the cluster coordinator
*/
void notifyOthersOfNodeStatusChange(final NodeConnectionStatus updatedStatus, final boolean notifyAllNodes) {
void notifyOthersOfNodeStatusChange(final NodeConnectionStatus updatedStatus, final boolean notifyAllNodes, final boolean waitForCoordinator) {
// If this node is the active cluster coordinator, then we are going to replicate to all nodes.
// Otherwise, get the active coordinator (or wait for one to become active) and then notify the coordinator.
final Set<NodeIdentifier> nodesToNotify;
@ -693,8 +716,14 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
// Do not notify ourselves because we already know about the status update.
nodesToNotify.remove(getLocalNodeIdentifier());
} else {
} else if (waitForCoordinator) {
nodesToNotify = Collections.singleton(waitForElectedClusterCoordinator());
} else {
final NodeIdentifier nodeId = getElectedActiveCoordinatorNode();
if (nodeId == null) {
return;
}
nodesToNotify = Collections.singleton(nodeId);
}
final NodeStatusChangeMessage message = new NodeStatusChangeMessage();
@ -878,6 +907,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
if (updated) {
logger.info("Status of {} changed from {} to {}", statusChangeMessage.getNodeId(), oldStatus, updatedStatus);
logger.debug("State of cluster nodes is now {}", nodeStatuses);
final NodeConnectionStatus status = statusChangeMessage.getNodeConnectionStatus();
final String summary = summarizeStatusChange(oldStatus, status);

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.reporting.Severity;
public class ReportedEvent {
private final NodeIdentifier nodeId;
private final Severity severity;
private final String event;
public ReportedEvent(NodeIdentifier nodeId, Severity severity, String event) {
this.nodeId = nodeId;
this.severity = severity;
this.event = event;
}
public NodeIdentifier getNodeId() {
return nodeId;
}
public Severity getSeverity() {
return severity;
}
public String getEvent() {
return event;
}
}

View File

@ -32,6 +32,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.nifi.cluster.ReportedEvent;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
@ -322,30 +323,6 @@ public class TestAbstractHeartbeatMonitor {
}
}
public static class ReportedEvent {
private final NodeIdentifier nodeId;
private final Severity severity;
private final String event;
public ReportedEvent(NodeIdentifier nodeId, Severity severity, String event) {
this.nodeId = nodeId;
this.severity = severity;
this.event = event;
}
public NodeIdentifier getNodeId() {
return nodeId;
}
public Severity getSeverity() {
return severity;
}
public String getEvent() {
return event;
}
}
private static class TestFriendlyHeartbeatMonitor extends AbstractHeartbeatMonitor {
private Map<NodeIdentifier, NodeHeartbeat> heartbeats = new HashMap<>();

View File

@ -77,7 +77,7 @@ public class TestNodeClusterCoordinator {
coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties()) {
@Override
void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes) {
void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
nodeStatuses.add(updatedStatus);
}
};
@ -132,7 +132,7 @@ public class TestNodeClusterCoordinator {
final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties()) {
@Override
void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes) {
void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
}
};
@ -170,7 +170,7 @@ public class TestNodeClusterCoordinator {
final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties()) {
@Override
void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes) {
void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
}
};

View File

@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.integration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.curator.test.TestingServer;
import org.apache.nifi.cluster.coordination.node.ClusterRoles;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Cluster {
private static final Logger logger = LoggerFactory.getLogger(Cluster.class);
private final Set<Node> nodes = new HashSet<>();
private final TestingServer zookeeperServer;
public Cluster() {
try {
zookeeperServer = new TestingServer();
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
public void start() {
try {
zookeeperServer.start();
} catch (final RuntimeException e) {
throw e;
} catch (final Exception e) {
throw new RuntimeException(e);
}
while (getZooKeeperConnectString() == null) {
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
}
}
logger.info("Start ZooKeeper Server on Port {}, with temporary directory {}", zookeeperServer.getPort(), zookeeperServer.getTempDirectory());
}
public void stop() {
for (final Node node : nodes) {
try {
if (node.isRunning()) {
node.stop();
}
} catch (Exception e) {
logger.error("Failed to shut down " + node, e);
}
}
try {
zookeeperServer.stop();
zookeeperServer.close();
} catch (final Exception e) {
}
}
public String getZooKeeperConnectString() {
return zookeeperServer.getConnectString();
}
public Set<Node> getNodes() {
return Collections.unmodifiableSet(nodes);
}
public Node createNode() {
NiFiProperties.getInstance().setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, getZooKeeperConnectString());
NiFiProperties.getInstance().setProperty(NiFiProperties.CLUSTER_IS_NODE, "true");
final NiFiProperties properties = NiFiProperties.getInstance().copy();
final Node node = new Node(properties);
node.start();
nodes.add(node);
return node;
}
public Node waitForClusterCoordinator(final long time, final TimeUnit timeUnit) {
return ClusterUtils.waitUntilNonNull(time, timeUnit,
() -> getNodes().stream().filter(node -> node.getRoles().contains(ClusterRoles.CLUSTER_COORDINATOR)).findFirst().orElse(null));
}
public Node waitForPrimaryNode(final long time, final TimeUnit timeUnit) {
return ClusterUtils.waitUntilNonNull(time, timeUnit,
() -> getNodes().stream().filter(node -> node.getRoles().contains(ClusterRoles.PRIMARY_NODE)).findFirst().orElse(null));
}
}

View File

@ -0,0 +1,238 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.integration;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.nifi.cluster.coordination.node.ClusterRoles;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.junit.BeforeClass;
import org.junit.Test;
public class ClusterConnectionIT {
@BeforeClass
public static void setup() {
System.setProperty("nifi.properties.file.path", "src/test/resources/conf/nifi.properties");
}
@Test(timeout = 20000)
public void testSingleNode() throws InterruptedException {
final Cluster cluster = new Cluster();
cluster.start();
try {
final Node firstNode = cluster.createNode();
firstNode.waitUntilConnected(10, TimeUnit.SECONDS);
firstNode.waitUntilElectedForRole(ClusterRoles.CLUSTER_COORDINATOR, 10, TimeUnit.SECONDS);
firstNode.waitUntilElectedForRole(ClusterRoles.PRIMARY_NODE, 10, TimeUnit.SECONDS);
} finally {
cluster.stop();
}
}
@Test(timeout = 60000)
public void testThreeNodeCluster() throws InterruptedException {
final Cluster cluster = new Cluster();
cluster.start();
try {
final Node firstNode = cluster.createNode();
final Node secondNode = cluster.createNode();
final Node thirdNode = cluster.createNode();
firstNode.waitUntilConnected(10, TimeUnit.SECONDS);
System.out.println("**** Node 1 Connected ****");
secondNode.waitUntilConnected(10, TimeUnit.SECONDS);
System.out.println("**** Node 2 Connected ****");
thirdNode.waitUntilConnected(10, TimeUnit.SECONDS);
System.out.println("**** Node 3 Connected ****");
final Node clusterCoordinator = cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS);
final Node primaryNode = cluster.waitForPrimaryNode(10, TimeUnit.SECONDS);
System.out.println("\n\n");
System.out.println("Cluster Coordinator = " + clusterCoordinator);
System.out.println("Primary Node = " + primaryNode);
System.out.println("\n\n");
} finally {
cluster.stop();
}
}
@Test(timeout = 60000)
public void testNewCoordinatorElected() throws IOException {
final Cluster cluster = new Cluster();
cluster.start();
try {
final Node firstNode = cluster.createNode();
final Node secondNode = cluster.createNode();
firstNode.waitUntilConnected(10, TimeUnit.SECONDS);
System.out.println("**** Node 1 Connected ****");
secondNode.waitUntilConnected(10, TimeUnit.SECONDS);
System.out.println("**** Node 2 Connected ****");
final Node clusterCoordinator = cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS);
clusterCoordinator.stop();
final Node otherNode = firstNode == clusterCoordinator ? secondNode : firstNode;
otherNode.waitUntilElectedForRole(ClusterRoles.CLUSTER_COORDINATOR, 10, TimeUnit.SECONDS);
} finally {
cluster.stop();
}
}
@Test(timeout = 60000)
public void testReconnectGetsCorrectClusterTopology() throws IOException {
final Cluster cluster = new Cluster();
cluster.start();
try {
final Node firstNode = cluster.createNode();
final Node secondNode = cluster.createNode();
final Node thirdNode = cluster.createNode();
firstNode.waitUntilConnected(10, TimeUnit.SECONDS);
System.out.println("**** Node 1 Connected ****");
secondNode.waitUntilConnected(10, TimeUnit.SECONDS);
System.out.println("**** Node 2 Connected ****");
thirdNode.waitUntilConnected(10, TimeUnit.SECONDS);
System.out.println("**** Node 3 Connected ****");
// shutdown node
secondNode.stop();
System.out.println("\n\nNode 2 Shut Down\n\n");
// wait for node 1 and 3 to recognize that node 2 is gone
Stream.of(firstNode, thirdNode).forEach(node -> {
node.assertNodeDisconnects(secondNode.getIdentifier(), 5, TimeUnit.SECONDS);
});
// restart node
secondNode.start();
System.out.println("\n\nNode 2 Restarted\n\n");
secondNode.waitUntilConnected(20, TimeUnit.SECONDS);
System.out.println("\n\nNode 2 Reconnected\n\n");
// wait for all 3 nodes to agree that node 2 is connected
Stream.of(firstNode, secondNode, thirdNode).forEach(node -> {
ClusterUtils.waitUntilConditionMet(5, TimeUnit.SECONDS,
() -> firstNode.getClusterCoordinator().getConnectionStatus(secondNode.getIdentifier()).getState() == NodeConnectionState.CONNECTED);
});
// Ensure that all 3 nodes see a cluster of 3 connected nodes.
Stream.of(firstNode, secondNode, thirdNode).forEach(node -> {
node.assertNodeIsConnected(firstNode.getIdentifier());
node.assertNodeIsConnected(secondNode.getIdentifier());
node.assertNodeIsConnected(thirdNode.getIdentifier());
});
// Ensure that we get both a cluster coordinator and a primary node elected
cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS);
cluster.waitForPrimaryNode(10, TimeUnit.SECONDS);
} finally {
cluster.stop();
}
}
@Test(timeout = 60000)
public void testRestartAllNodes() throws IOException {
final Cluster cluster = new Cluster();
cluster.start();
try {
final Node firstNode = cluster.createNode();
final Node secondNode = cluster.createNode();
final Node thirdNode = cluster.createNode();
firstNode.waitUntilConnected(10, TimeUnit.SECONDS);
System.out.println("**** Node 1 Connected ****");
secondNode.waitUntilConnected(10, TimeUnit.SECONDS);
System.out.println("**** Node 2 Connected ****");
thirdNode.waitUntilConnected(10, TimeUnit.SECONDS);
System.out.println("**** Node 3 Connected ****");
// shutdown node
firstNode.stop();
secondNode.stop();
thirdNode.stop();
System.out.println("\n\nRestarting all nodes\n\n");
thirdNode.start();
firstNode.start();
secondNode.start();
Stream.of(firstNode, secondNode, thirdNode).forEach(node -> {
node.waitUntilConnected(10, TimeUnit.SECONDS);
});
// wait for all 3 nodes to agree that node 2 is connected
Stream.of(firstNode, secondNode, thirdNode).forEach(node -> {
ClusterUtils.waitUntilConditionMet(5, TimeUnit.SECONDS,
() -> firstNode.getClusterCoordinator().getConnectionStatus(secondNode.getIdentifier()).getState() == NodeConnectionState.CONNECTED);
});
// Ensure that all 3 nodes see a cluster of 3 connected nodes.
Stream.of(firstNode, secondNode, thirdNode).forEach(node -> {
node.assertNodeConnects(firstNode.getIdentifier(), 10, TimeUnit.SECONDS);
node.assertNodeConnects(secondNode.getIdentifier(), 10, TimeUnit.SECONDS);
node.assertNodeConnects(thirdNode.getIdentifier(), 10, TimeUnit.SECONDS);
});
// Ensure that we get both a cluster coordinator and a primary node elected
cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS);
cluster.waitForPrimaryNode(10, TimeUnit.SECONDS);
} finally {
cluster.stop();
}
}
@Test(timeout = 30000)
public void testHeartbeatsMonitored() throws IOException {
final Cluster cluster = new Cluster();
cluster.start();
try {
final Node firstNode = cluster.createNode();
final Node secondNode = cluster.createNode();
firstNode.waitUntilConnected(10, TimeUnit.SECONDS);
secondNode.waitUntilConnected(10, TimeUnit.SECONDS);
secondNode.suspendHeartbeating();
// Heartbeat interval in nifi.properties is set to 1 sec. This means that the node should be kicked out
// due to lack of heartbeat after 8 times this amount of time, or 8 seconds.
firstNode.assertNodeDisconnects(secondNode.getIdentifier(), 12, TimeUnit.SECONDS);
secondNode.resumeHeartbeating();
firstNode.assertNodeConnects(secondNode.getIdentifier(), 10, TimeUnit.SECONDS);
} finally {
cluster.stop();
}
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.integration;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
public class ClusterUtils {
public static void waitUntilConditionMet(final long time, final TimeUnit timeUnit, final BooleanSupplier test) {
final long nanosToWait = timeUnit.toNanos(time);
final long start = System.nanoTime();
final long maxTime = start + nanosToWait;
while (!test.getAsBoolean()) {
if (System.nanoTime() > maxTime) {
throw new AssertionError("Condition never occurred after waiting " + time + " " + timeUnit);
}
}
}
public static <T> T waitUntilNonNull(final long time, final TimeUnit timeUnit, final Supplier<T> test) {
final long nanosToWait = timeUnit.toNanos(time);
final long start = System.nanoTime();
final long maxTime = start + nanosToWait;
T returnVal;
while ((returnVal = test.get()) == null) {
if (System.nanoTime() > maxTime) {
throw new AssertionError("Condition never occurred after waiting " + time + " " + timeUnit);
}
}
return returnVal;
}
}

View File

@ -0,0 +1,320 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.integration;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.cluster.ReportedEvent;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.heartbeat.ClusterProtocolHeartbeatMonitor;
import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
import org.apache.nifi.cluster.coordination.node.CuratorNodeProtocolSender;
import org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.ClusterCoordinationProtocolSender;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.ProtocolContext;
import org.apache.nifi.cluster.protocol.ProtocolListener;
import org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderListener;
import org.apache.nifi.cluster.protocol.impl.NodeProtocolSenderListener;
import org.apache.nifi.cluster.protocol.impl.SocketProtocolListener;
import org.apache.nifi.cluster.protocol.impl.StandardClusterCoordinationProtocolSender;
import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.StandardFlowService;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.io.socket.ServerSocketConfiguration;
import org.apache.nifi.io.socket.SocketConfiguration;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.revision.RevisionManager;
import org.junit.Assert;
import org.mockito.Mockito;
public class Node {
private final NodeIdentifier nodeId;
private final NiFiProperties nodeProperties;
private final List<ReportedEvent> reportedEvents = Collections.synchronizedList(new ArrayList<ReportedEvent>());
private final RevisionManager revisionManager;
private NodeClusterCoordinator clusterCoordinator;
private CuratorNodeProtocolSender protocolSender;
private FlowController flowController;
private StandardFlowService flowService;
private ProtocolListener protocolListener;
private volatile boolean running = false;
private ScheduledExecutorService executor = new FlowEngine(8, "Node tasks", true);
public Node(final NiFiProperties properties) {
this(new NodeIdentifier(UUID.randomUUID().toString(), "localhost", createPort(), "localhost", createPort(), "localhost", null, null, false, null), properties);
}
public Node(final NodeIdentifier nodeId, final NiFiProperties properties) {
this.nodeId = nodeId;
this.nodeProperties = properties;
nodeProperties.setProperty(NiFiProperties.CLUSTER_NODE_PROTOCOL_PORT, String.valueOf(nodeId.getSocketPort()));
nodeProperties.setProperty(NiFiProperties.WEB_HTTP_PORT, String.valueOf(nodeId.getApiPort()));
revisionManager = Mockito.mock(RevisionManager.class);
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.<Revision> emptyList());
}
public synchronized void start() {
running = true;
protocolSender = createNodeProtocolSender();
clusterCoordinator = createClusterCoordinator();
clusterCoordinator.setLocalNodeIdentifier(nodeId);
clusterCoordinator.setConnected(true);
final HeartbeatMonitor heartbeatMonitor = createHeartbeatMonitor();
flowController = FlowController.createClusteredInstance(Mockito.mock(FlowFileEventRepository.class), nodeProperties,
null, null, StringEncryptor.createEncryptor(), protocolSender, Mockito.mock(BulletinRepository.class), clusterCoordinator, heartbeatMonitor, VariableRegistry.EMPTY_REGISTRY);
try {
flowController.initializeFlow();
} catch (IOException e) {
throw new RuntimeException(e);
}
final NodeProtocolSenderListener senderListener = new NodeProtocolSenderListener(protocolSender, protocolListener);
try {
flowController.getStateManagerProvider().getStateManager("Cluster Node Configuration").setState(Collections.singletonMap("Node UUID", nodeId.getId()), Scope.LOCAL);
flowService = StandardFlowService.createClusteredInstance(flowController, nodeProperties, senderListener, clusterCoordinator,
StringEncryptor.createEncryptor(), revisionManager, Mockito.mock(Authorizer.class));
flowService.start();
flowService.load(null);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void stop() throws IOException {
running = false;
flowController.shutdown(true);
flowService.stop(true);
clusterCoordinator.shutdown();
executor.shutdownNow();
// protocol listener is closed by flow controller
}
public void suspendHeartbeating() {
flowController.suspendHeartbeats();
}
public void resumeHeartbeating() {
flowController.resumeHeartbeats();
}
public NodeIdentifier getIdentifier() {
return nodeId;
}
@Override
public int hashCode() {
return new HashCodeBuilder().append(nodeId).build();
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj == this) {
return true;
}
if (!(obj instanceof Node)) {
return false;
}
return getIdentifier().equals(((Node) obj).getIdentifier());
}
@Override
public String toString() {
return "Node[id=" + getIdentifier() + ", started=" + isRunning() + "]";
}
public boolean isRunning() {
return running;
}
private static int createPort() {
// get an unused port
while (true) {
try (ServerSocket ss = new ServerSocket(0)) {
return ss.getLocalPort();
} catch (final IOException ioe) {
}
}
}
public Set<String> getRoles() {
final NodeConnectionStatus status = getConnectionStatus();
return status == null ? Collections.emptySet() : status.getRoles();
}
public NodeConnectionStatus getConnectionStatus() {
return clusterCoordinator.getConnectionStatus(nodeId);
}
@SuppressWarnings("unchecked")
private CuratorNodeProtocolSender createNodeProtocolSender() {
final SocketConfiguration socketConfig = new SocketConfiguration();
socketConfig.setSocketTimeout(3000);
socketConfig.setReuseAddress(true);
final ProtocolContext<ProtocolMessage> protocolContext = new JaxbProtocolContext<>(JaxbProtocolUtils.JAXB_CONTEXT);
final CuratorNodeProtocolSender protocolSender = new CuratorNodeProtocolSender(socketConfig, protocolContext, nodeProperties);
return protocolSender;
}
@SuppressWarnings("unchecked")
private ClusterCoordinationProtocolSender createCoordinatorProtocolSender() {
final SocketConfiguration socketConfig = new SocketConfiguration();
socketConfig.setSocketTimeout(3000);
socketConfig.setReuseAddress(true);
final ProtocolContext<ProtocolMessage> protocolContext = new JaxbProtocolContext<>(JaxbProtocolUtils.JAXB_CONTEXT);
return new StandardClusterCoordinationProtocolSender(socketConfig, protocolContext, 1);
}
private HeartbeatMonitor createHeartbeatMonitor() {
return new ClusterProtocolHeartbeatMonitor(clusterCoordinator, protocolListener, nodeProperties);
}
@SuppressWarnings("unchecked")
private NodeClusterCoordinator createClusterCoordinator() {
final EventReporter eventReporter = new EventReporter() {
@Override
public void reportEvent(Severity severity, String category, String message) {
reportedEvents.add(new ReportedEvent(nodeId, severity, message));
}
};
final ServerSocketConfiguration serverSocketConfiguration = new ServerSocketConfiguration();
serverSocketConfiguration.setSocketTimeout(5000);
final ProtocolContext<ProtocolMessage> protocolContext = new JaxbProtocolContext<>(JaxbProtocolUtils.JAXB_CONTEXT);
protocolListener = new SocketProtocolListener(3, Integer.parseInt(nodeProperties.getProperty(NiFiProperties.CLUSTER_NODE_PROTOCOL_PORT)), serverSocketConfiguration, protocolContext);
try {
protocolListener.start();
} catch (IOException e) {
throw new RuntimeException(e);
}
final ClusterCoordinationProtocolSenderListener protocolSenderListener = new ClusterCoordinationProtocolSenderListener(createCoordinatorProtocolSender(), protocolListener);
return new NodeClusterCoordinator(protocolSenderListener, eventReporter, null, revisionManager, nodeProperties);
}
public ClusterCoordinator getClusterCoordinator() {
return clusterCoordinator;
}
//
// Methods for checking conditions
//
public boolean isConnected() {
final NodeConnectionStatus status = getConnectionStatus();
if (status == null) {
return false;
}
return status.getState() == NodeConnectionState.CONNECTED;
}
//
// Methods to wait for conditions
//
public void waitUntilConnected(final long time, final TimeUnit timeUnit) {
ClusterUtils.waitUntilConditionMet(time, timeUnit, () -> isConnected());
}
public void waitUntilElectedForRole(final String roleName, final long time, final TimeUnit timeUnit) {
ClusterUtils.waitUntilConditionMet(time, timeUnit, () -> getRoles().contains(roleName));
}
// Assertions
/**
* Assert that the node with the given ID connects (According to this node!) within the given amount of time
*
* @param nodeId id of the node
* @param time how long to wait
* @param timeUnit unit of time provided by the 'time' argument
*/
public void assertNodeConnects(final NodeIdentifier nodeId, final long time, final TimeUnit timeUnit) {
ClusterUtils.waitUntilConditionMet(time, timeUnit,
() -> getClusterCoordinator().getConnectionStatus(nodeId).getState() == NodeConnectionState.CONNECTED);
}
/**
* Assert that the node with the given ID disconnects (According to this node!) within the given amount of time
*
* @param nodeId id of the node
* @param time how long to wait
* @param timeUnit unit of time provided by the 'time' argument
*/
public void assertNodeDisconnects(final NodeIdentifier nodeId, final long time, final TimeUnit timeUnit) {
ClusterUtils.waitUntilConditionMet(time, timeUnit,
() -> getClusterCoordinator().getConnectionStatus(nodeId).getState() == NodeConnectionState.DISCONNECTED);
}
/**
* Asserts that the node with the given ID is currently connected (According to this node!)
*
* @param nodeId id of the node
*/
public void assertNodeIsConnected(final NodeIdentifier nodeId) {
Assert.assertEquals(NodeConnectionState.CONNECTED, getClusterCoordinator().getConnectionStatus(nodeId).getState());
}
}

View File

@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.integration;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.components.state.StateProvider;
import org.apache.nifi.components.state.StateProviderInitializationContext;
import org.apache.nifi.controller.state.StandardStateMap;
public class NopStateProvider implements StateProvider {
private final String id = UUID.randomUUID().toString();
private final Map<String, Map<String, String>> componentStateMap = new HashMap<>();
@Override
public Collection<ValidationResult> validate(ValidationContext context) {
return Collections.emptyList();
}
@Override
public PropertyDescriptor getPropertyDescriptor(String name) {
return null;
}
@Override
public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
}
@Override
public List<PropertyDescriptor> getPropertyDescriptors() {
return Collections.emptyList();
}
@Override
public String getIdentifier() {
return id;
}
@Override
public void initialize(StateProviderInitializationContext context) throws IOException {
}
@Override
public void shutdown() {
}
@Override
public synchronized void setState(Map<String, String> state, String componentId) throws IOException {
final Map<String, String> stateMap = componentStateMap.computeIfAbsent(componentId, compId -> new HashMap<String, String>());
stateMap.clear();
stateMap.putAll(state);
}
@Override
public synchronized StateMap getState(String componentId) throws IOException {
return new StandardStateMap(componentStateMap.computeIfAbsent(componentId, compId -> new HashMap<String, String>()), 0L);
}
@Override
public synchronized boolean replace(StateMap oldValue, Map<String, String> newValue, String componentId) throws IOException {
return false;
}
@Override
public void clear(String componentId) throws IOException {
}
@Override
public void onComponentRemoved(String componentId) throws IOException {
}
@Override
public void enable() {
}
@Override
public void disable() {
}
@Override
public boolean isEnabled() {
return true;
}
@Override
public Scope[] getSupportedScopes() {
return new Scope[] {Scope.LOCAL};
}
}

View File

@ -30,11 +30,25 @@ nifi.ui.autorefresh.interval=30 sec
nifi.nar.library.directory=./target/lib
nifi.nar.working.directory=./target/work/nar/
####################
# State Management #
####################
nifi.state.management.configuration.file=src/test/resources/conf/state-management.xml
# The ID of the local state provider
nifi.state.management.provider.local=local-provider
# The ID of the cluster-wide state provider. This will be ignored if NiFi is not clustered but must be populated if running in a cluster.
nifi.state.management.provider.cluster=zk-provider
# Specifies whether or not this instance of NiFi should run an embedded ZooKeeper server
nifi.state.management.embedded.zookeeper.start=false
# Properties file that provides the ZooKeeper properties to use if <nifi.state.management.embedded.zookeeper.start> is set to true
nifi.state.management.embedded.zookeeper.properties=./conf/zookeeper.properties
# H2 Settings
nifi.database.directory=./database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.implementation=org.apache.nifi.controller.repository.VolatileFlowFileRepository
nifi.flowfile.repository.directory=./target/test-repo
nifi.flowfile.repository.partitions=1
nifi.flowfile.repository.checkpoint.interval=2 mins
@ -56,10 +70,11 @@ nifi.provenance.repository.max.storage.time=24 hours
nifi.provenance.repository.max.storage.size=1 GB
nifi.provenance.repository.rollover.time=30 secs
nifi.provenance.repository.rollover.size=100 MB
nifi.provenance.repository.implementation=org.apache.nifi.provenance.MockProvenanceRepository
# Site to Site properties
nifi.remote.input.socket.port=9990
nifi.remote.input.secure=true
nifi.remote.input.socket.port=
nifi.remote.input.secure=false
# web properties #
nifi.web.war.directory=./target/lib
@ -89,7 +104,7 @@ nifi.security.support.new.account.requests=
nifi.security.default.user.roles=
# cluster common properties (cluster manager and nodes must have same values) #
nifi.cluster.protocol.heartbeat.interval=5 sec
nifi.cluster.protocol.heartbeat.interval=1 secs
nifi.cluster.protocol.is.secure=false
nifi.cluster.protocol.socket.timeout=30 sec
nifi.cluster.protocol.connection.handshake.timeout=45 sec

View File

@ -0,0 +1,57 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!--
This file provides a mechanism for defining and configuring the State Providers
that should be used for storing state locally and across a NiFi cluster. In order
to use a specific provider, it must be configured here and its identifier
must be specified in the nifi.properties file.
-->
<stateManagement>
<!--
State Provider that stores state locally in a configurable directory. This Provider requires the following properties:
Directory - the directory to store components' state in. If the directory being used is a sub-directory of the NiFi installation, it
is important that the directory be copied over to the new version when upgrading NiFi.
-->
<local-provider>
<id>local-provider</id>
<class>org.apache.nifi.cluster.integration.NopStateProvider</class>
</local-provider>
<!--
State Provider that is used to store state in ZooKeeper. This Provider requires the following properties:
Root Node - the root node in ZooKeeper where state should be stored. The default is '/nifi', but it is advisable to change this to a different value if not using
the embedded ZooKeeper server and if multiple NiFi instances may all be using the same ZooKeeper Server.
Connect String - A comma-separated list of host:port pairs to connect to ZooKeeper. For example, myhost.mydomain:2181,host2.mydomain:5555,host3:6666
Session Timeout - Specifies how long this instance of NiFi is allowed to be disconnected from ZooKeeper before creating a new ZooKeeper Session. Default value is "30 seconds"
Access Control - Specifies which Access Controls will be applied to the ZooKeeper ZNodes that are created by this State Provider. This value must be set to one of:
- Open : ZNodes will be open to any ZooKeeper client.
- CreatorOnly : ZNodes will be accessible only by the creator. The creator will have full access to create children, read, write, delete, and administer the ZNodes.
This option is available only if access to ZooKeeper is secured via Kerberos or if a Username and Password are set.
-->
<cluster-provider>
<id>zk-provider</id>
<class>org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider</class>
<property name="Connect String">localhost:8320</property>
<property name="Root Node">/nifi</property>
<property name="Session Timeout">10 seconds</property>
<property name="Access Control">Open</property>
</cluster-provider>
</stateManagement>

View File

@ -23,6 +23,11 @@
<!-- valid logging levels: TRACE, DEBUG, INFO, WARN, ERROR -->
<logger name="org.apache.nifi" level="INFO"/>
<logger name="org.apache.nifi.engine.FlowEngine" level="OFF" />
<logger name="org.apache.nifi.cluster.coordination.node" level="DEBUG" />
<logger name="org.apache.curator.framework.recipes.leader.LeaderSelector" level="OFF" />
<logger name="org.apache.zookeeper.ClientCnxn" level="ERROR" />
<logger name="org.apache.curator.framework.imps.CuratorFrameworkImpl" level="OFF" />
<!-- Logger for managing logging statements for nifi clusters. -->
<logger name="org.apache.nifi.cluster" level="INFO"/>

View File

@ -581,7 +581,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false));
if (configuredForClustering) {
leaderElectionManager = new CuratorLeaderElectionManager(4);
leaderElectionManager = new CuratorLeaderElectionManager(4, properties);
heartbeater = new ClusterProtocolHeartbeater(protocolSender, properties);
// Check if there is already a cluster coordinator elected. If not, go ahead

View File

@ -788,7 +788,11 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
}
} catch (final Exception pe) {
// could not create a socket and communicate with manager
logger.warn("Failed to connect to cluster due to: " + pe, pe);
logger.warn("Failed to connect to cluster due to: " + pe);
if (logger.isDebugEnabled()) {
logger.warn("", pe);
}
if (retryOnCommsFailure) {
try {
Thread.sleep(response == null ? 5000 : response.getTryLaterSeconds());

View File

@ -19,6 +19,7 @@ package org.apache.nifi.controller.leader.election;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
@ -48,10 +49,13 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
private final Map<String, LeaderRole> leaderRoles = new HashMap<>();
private final Map<String, LeaderElectionStateChangeListener> registeredRoles = new HashMap<>();
public CuratorLeaderElectionManager(final int threadPoolSize) {
leaderElectionMonitorEngine = new FlowEngine(threadPoolSize, "Leader Election Notification", true);
final NiFiProperties properties = NiFiProperties.getInstance();
public CuratorLeaderElectionManager(final int threadPoolSize) {
this(threadPoolSize, NiFiProperties.getInstance());
}
public CuratorLeaderElectionManager(final int threadPoolSize, final Properties properties) {
leaderElectionMonitorEngine = new FlowEngine(threadPoolSize, "Leader Election Notification", true);
zkConfig = ZooKeeperClientConfig.createConfig(properties);
}
@ -65,7 +69,14 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
stopped = false;
final RetryPolicy retryPolicy = new RetryForever(5000);
curatorClient = CuratorFrameworkFactory.newClient(zkConfig.getConnectString(), zkConfig.getSessionTimeoutMillis(), zkConfig.getConnectionTimeoutMillis(), retryPolicy);
curatorClient = CuratorFrameworkFactory.builder()
.connectString(zkConfig.getConnectString())
.sessionTimeoutMs(zkConfig.getSessionTimeoutMillis())
.connectionTimeoutMs(zkConfig.getConnectionTimeoutMillis())
.retryPolicy(retryPolicy)
.defaultData(new byte[0])
.build();
curatorClient.start();
// Call #register for each already-registered role. This will
@ -227,6 +238,7 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
logger.error("This node was elected Leader for Role '{}' but failed to take leadership. Will relinquish leadership role. Failure was due to: {}", roleName, e);
logger.error("", e);
leader = false;
Thread.sleep(1000L);
return;
}
}
@ -251,7 +263,8 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
try {
listener.onLeaderRelinquish();
} catch (final Exception e) {
logger.error("This node is no longer leader for role '{}' but failed to shutdown leadership responsibilities properly due to: {}", roleName, e);
logger.error("This node is no longer leader for role '{}' but failed to shutdown leadership responsibilities properly due to: {}", roleName, e.toString());
if (logger.isDebugEnabled()) {
logger.error("", e);
}
}
@ -259,3 +272,4 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
}
}
}
}