From 76a4a2c48b154719c3ea750b14d568f9998069ba Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 9 Aug 2016 16:50:04 -0400 Subject: [PATCH] NIFI-2544: Created integration tests for clustering and addressed a few minor bugs that were found in doing so This closes #832. Signed-off-by: Bryan Bende --- .../org/apache/nifi/util/NiFiProperties.java | 6 + .../impl/NodeProtocolSenderListener.java | 6 +- ...dardClusterCoordinationProtocolSender.java | 9 +- .../nifi-framework-cluster/pom.xml | 5 + .../ClusterProtocolHeartbeatMonitor.java | 11 +- .../node/CuratorNodeProtocolSender.java | 16 +- .../node/NodeClusterCoordinator.java | 54 ++- .../apache/nifi/cluster/ReportedEvent.java | 45 +++ .../TestAbstractHeartbeatMonitor.java | 25 +- .../node/TestNodeClusterCoordinator.java | 6 +- .../nifi/cluster/integration/Cluster.java | 114 +++++++ .../integration/ClusterConnectionIT.java | 238 +++++++++++++ .../cluster/integration/ClusterUtils.java | 52 +++ .../apache/nifi/cluster/integration/Node.java | 320 ++++++++++++++++++ .../cluster/integration/NopStateProvider.java | 115 +++++++ .../src/test/resources/conf/nifi.properties | 21 +- .../test/resources/conf/state-management.xml | 57 ++++ .../src/test/resources/logback-test.xml | 5 + .../nifi/controller/FlowController.java | 2 +- .../nifi/controller/StandardFlowService.java | 6 +- .../CuratorLeaderElectionManager.java | 26 +- 21 files changed, 1078 insertions(+), 61 deletions(-) create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/ReportedEvent.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterUtils.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/NopStateProvider.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/state-management.xml diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java index 90115d5d79..bbb3998a62 100644 --- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -247,6 +247,12 @@ public class NiFiProperties extends Properties { super(); } + public NiFiProperties copy() { + final NiFiProperties copy = new NiFiProperties(); + copy.putAll(this); + return copy; + } + /** * Factory method to create an instance of the {@link NiFiProperties}. This * method employs a standard singleton pattern by caching the instance if it diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java index cc403310a9..0fd2517aa1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java @@ -46,16 +46,18 @@ public class NodeProtocolSenderListener implements NodeProtocolSender, ProtocolL @Override public void stop() throws IOException { if (!isRunning()) { - throw new IllegalStateException("Instance is already stopped."); + return; } + listener.stop(); } @Override public void start() throws IOException { if (isRunning()) { - throw new IllegalStateException("Instance is already started."); + return; } + listener.start(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java index e8099610db..b9ff0829f0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java @@ -67,9 +67,14 @@ public class StandardClusterCoordinationProtocolSender implements ClusterCoordin private final ProtocolContext protocolContext; private final SocketConfiguration socketConfiguration; + private final int maxThreadsPerRequest; private int handshakeTimeoutSeconds; public StandardClusterCoordinationProtocolSender(final SocketConfiguration socketConfiguration, final ProtocolContext protocolContext) { + this(socketConfiguration, protocolContext, NiFiProperties.getInstance().getClusterNodeProtocolThreads()); + } + + public StandardClusterCoordinationProtocolSender(final SocketConfiguration socketConfiguration, final ProtocolContext protocolContext, final int maxThreadsPerRequest) { if (socketConfiguration == null) { throw new IllegalArgumentException("Socket configuration may not be null."); } else if (protocolContext == null) { @@ -78,6 +83,7 @@ public class StandardClusterCoordinationProtocolSender implements ClusterCoordin this.socketConfiguration = socketConfiguration; this.protocolContext = protocolContext; this.handshakeTimeoutSeconds = -1; // less than zero denotes variable not configured + this.maxThreadsPerRequest = maxThreadsPerRequest; } @Override @@ -233,8 +239,7 @@ public class StandardClusterCoordinationProtocolSender implements ClusterCoordin return; } - final NiFiProperties properties = NiFiProperties.getInstance(); - final int numThreads = Math.min(nodesToNotify.size(), properties.getClusterNodeProtocolThreads()); + final int numThreads = Math.min(nodesToNotify.size(), maxThreadsPerRequest); final byte[] msgBytes; try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml index f74d1ae5d8..79b8d6d20f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml @@ -141,6 +141,11 @@ curator-test test + + org.apache.nifi + nifi-mock + test + org.testng testng diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java index 09dccad75a..d2d81d1da4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java @@ -113,8 +113,13 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im @Override public void onStart() { final RetryPolicy retryPolicy = new RetryForever(5000); - curatorClient = CuratorFrameworkFactory.newClient(zkClientConfig.getConnectString(), - zkClientConfig.getSessionTimeoutMillis(), zkClientConfig.getConnectionTimeoutMillis(), retryPolicy); + curatorClient = CuratorFrameworkFactory.builder() + .connectString(zkClientConfig.getConnectString()) + .sessionTimeoutMs(zkClientConfig.getSessionTimeoutMillis()) + .connectionTimeoutMs(zkClientConfig.getConnectionTimeoutMillis()) + .retryPolicy(retryPolicy) + .defaultData(new byte[0]) + .build(); curatorClient.start(); // We don't know what the heartbeats look like for the nodes, since we were just elected to monitoring @@ -149,7 +154,7 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im } curatorClient.create().withMode(CreateMode.EPHEMERAL).forPath(path, heartbeatAddress.getBytes(StandardCharsets.UTF_8)); - logger.info("Successfully created node in ZooKeeper with path {}", path); + logger.info("Successfully published address as heartbeat monitor address at path {} with value {}", path, heartbeatAddress); return; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/CuratorNodeProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/CuratorNodeProtocolSender.java index e893c3aacc..daa3e5c812 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/CuratorNodeProtocolSender.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/CuratorNodeProtocolSender.java @@ -20,18 +20,19 @@ package org.apache.nifi.cluster.coordination.node; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; +import java.util.Properties; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryNTimes; +import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException; import org.apache.nifi.cluster.protocol.AbstractNodeProtocolSender; import org.apache.nifi.cluster.protocol.ProtocolContext; import org.apache.nifi.cluster.protocol.ProtocolException; import org.apache.nifi.cluster.protocol.message.ProtocolMessage; import org.apache.nifi.controller.cluster.ZooKeeperClientConfig; import org.apache.nifi.io.socket.SocketConfiguration; -import org.apache.nifi.util.NiFiProperties; import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; @@ -49,7 +50,7 @@ public class CuratorNodeProtocolSender extends AbstractNodeProtocolSender { private InetSocketAddress coordinatorAddress; - public CuratorNodeProtocolSender(final SocketConfiguration socketConfig, final ProtocolContext protocolContext, final NiFiProperties properties) { + public CuratorNodeProtocolSender(final SocketConfiguration socketConfig, final ProtocolContext protocolContext, final Properties properties) { super(socketConfig, protocolContext); zkConfig = ZooKeeperClientConfig.createConfig(properties); coordinatorPath = zkConfig.resolvePath("cluster/nodes/coordinator"); @@ -75,9 +76,12 @@ public class CuratorNodeProtocolSender extends AbstractNodeProtocolSender { } }).forPath(coordinatorPath); + if (coordinatorAddressBytes == null || coordinatorAddressBytes.length == 0) { + throw new NoClusterCoordinatorException("No node has yet been elected Cluster Coordinator. Cannot establish connection to cluster yet."); + } + final String address = new String(coordinatorAddressBytes, StandardCharsets.UTF_8); - logger.info("Determined that Cluster Coordinator is located at {}; will use this address for sending heartbeat messages", address); final String[] splits = address.split(":"); if (splits.length != 2) { final String message = String.format("Attempted to determine Cluster Coordinator address. Zookeeper indicates " @@ -86,6 +90,8 @@ public class CuratorNodeProtocolSender extends AbstractNodeProtocolSender { throw new ProtocolException(message); } + logger.info("Determined that Cluster Coordinator is located at {}; will use this address for sending heartbeat messages", address); + final String hostname = splits[0]; final int port; try { @@ -105,7 +111,9 @@ public class CuratorNodeProtocolSender extends AbstractNodeProtocolSender { return socketAddress; } catch (final NoNodeException nne) { logger.info("No node has yet been elected Cluster Coordinator. Cannot establish connection to cluster yet."); - throw new ProtocolException("No node has yet been elected Cluster Coordinator. Cannot establish connection to cluster yet."); + throw new NoClusterCoordinatorException("No node has yet been elected Cluster Coordinator. Cannot establish connection to cluster yet."); + } catch (final NoClusterCoordinatorException ncce) { + throw ncce; } catch (Exception e) { throw new IOException("Unable to determine Cluster Coordinator from ZooKeeper", e); } finally { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java index 0c94a6604a..68c0a3a412 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java @@ -49,6 +49,8 @@ import org.apache.nifi.cluster.event.NodeEvent; import org.apache.nifi.cluster.firewall.ClusterNodeFirewall; import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException; +import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException; +import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.protocol.ComponentRevision; import org.apache.nifi.cluster.protocol.ConnectionRequest; import org.apache.nifi.cluster.protocol.ConnectionResponse; @@ -71,6 +73,7 @@ import org.apache.nifi.events.EventReporter; import org.apache.nifi.reporting.Severity; import org.apache.nifi.services.FlowService; import org.apache.nifi.web.revision.RevisionManager; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.slf4j.Logger; @@ -98,6 +101,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl private volatile FlowService flowService; private volatile boolean connected; private volatile String coordinatorAddress; + private volatile boolean closed = false; private final ConcurrentMap nodeStatuses = new ConcurrentHashMap<>(); private final ConcurrentMap> nodeEvents = new ConcurrentHashMap<>(); @@ -123,14 +127,16 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl senderListener.addHandler(this); } - // method is synchronized because it modifies local node state and then broadcasts the change. We synchronize any time that this - // is done so that we don't have an issue where we create a NodeConnectionStatus, then another thread creates one and sends it - // before the first one is sent (as this results in the first status having a larger id, which means that the first status is never - // seen by other nodes). @Override - public synchronized void shutdown() { + public void shutdown() { + if (closed) { + return; + } + + closed = true; + final NodeConnectionStatus shutdownStatus = new NodeConnectionStatus(getLocalNodeIdentifier(), DisconnectionCode.NODE_SHUTDOWN); - updateNodeStatus(shutdownStatus); + updateNodeStatus(shutdownStatus, false); logger.info("Successfully notified other nodes that I am shutting down"); curatorClient.close(); @@ -139,6 +145,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl @Override public void setLocalNodeIdentifier(final NodeIdentifier nodeId) { this.nodeId = nodeId; + nodeStatuses.computeIfAbsent(nodeId, id -> new NodeConnectionStatus(id, DisconnectionCode.NOT_YET_CONNECTED)); } @Override @@ -159,6 +166,10 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl while (localNodeId == null) { localNodeId = fetchNodeId.get(); if (localNodeId == null) { + if (closed) { + return null; + } + try { Thread.sleep(100L); } catch (final InterruptedException ie) { @@ -188,6 +199,8 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl logger.info("Determined that Cluster Coordinator is located at {}", address); return address; + } catch (final KeeperException.NoNodeException nne) { + throw new NoClusterCoordinatorException(); } catch (Exception e) { throw new IOException("Unable to determine Cluster Coordinator from ZooKeeper", e); } @@ -395,7 +408,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl while (!updated) { final NodeConnectionStatus currentStatus = nodeStatuses.get(nodeId); if (currentStatus == null) { - throw new IllegalStateException("Cannot update roles for " + nodeId + " to " + roles + " because the node is not part of this cluster"); + throw new UnknownNodeException("Cannot update roles for " + nodeId + " to " + roles + " because the node is not part of this cluster"); } if (currentStatus.getRoles().equals(roles)) { @@ -531,9 +544,15 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl final String electedNodeAddress; try { electedNodeAddress = getElectedActiveCoordinatorAddress(); + } catch (final NoClusterCoordinatorException ncce) { + logger.debug("There is currently no elected active Cluster Coordinator"); + return null; } catch (final IOException ioe) { if (warnOnError) { - logger.warn("Failed to determine which node is elected active Cluster Coordinator. There may be no coordinator currently:", ioe); + logger.warn("Failed to determine which node is elected active Cluster Coordinator. There may be no coordinator currently: " + ioe); + if (logger.isDebugEnabled()) { + logger.warn("", ioe); + } } return null; @@ -644,6 +663,10 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl */ // visible for testing. void updateNodeStatus(final NodeConnectionStatus status) { + updateNodeStatus(status, true); + } + + void updateNodeStatus(final NodeConnectionStatus status, final boolean waitForCoordinator) { final NodeIdentifier nodeId = status.getNodeIdentifier(); // In this case, we are using nodeStatuses.put() instead of getting the current value and @@ -668,14 +691,14 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl logger.debug("Notifying cluster coordinator that node status changed from {} to {}", currentStatus, status); } - notifyOthersOfNodeStatusChange(status, notifyAllNodes); + notifyOthersOfNodeStatusChange(status, notifyAllNodes, waitForCoordinator); } else { logger.debug("Not notifying other nodes that status changed because previous state of {} is same as new state of {}", currentState, status.getState()); } } void notifyOthersOfNodeStatusChange(final NodeConnectionStatus updatedStatus) { - notifyOthersOfNodeStatusChange(updatedStatus, isActiveClusterCoordinator()); + notifyOthersOfNodeStatusChange(updatedStatus, isActiveClusterCoordinator(), true); } /** @@ -684,7 +707,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl * @param updatedStatus the updated status for a node in the cluster * @param notifyAllNodes if true will notify all nodes. If false, will notify only the cluster coordinator */ - void notifyOthersOfNodeStatusChange(final NodeConnectionStatus updatedStatus, final boolean notifyAllNodes) { + void notifyOthersOfNodeStatusChange(final NodeConnectionStatus updatedStatus, final boolean notifyAllNodes, final boolean waitForCoordinator) { // If this node is the active cluster coordinator, then we are going to replicate to all nodes. // Otherwise, get the active coordinator (or wait for one to become active) and then notify the coordinator. final Set nodesToNotify; @@ -693,8 +716,14 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl // Do not notify ourselves because we already know about the status update. nodesToNotify.remove(getLocalNodeIdentifier()); - } else { + } else if (waitForCoordinator) { nodesToNotify = Collections.singleton(waitForElectedClusterCoordinator()); + } else { + final NodeIdentifier nodeId = getElectedActiveCoordinatorNode(); + if (nodeId == null) { + return; + } + nodesToNotify = Collections.singleton(nodeId); } final NodeStatusChangeMessage message = new NodeStatusChangeMessage(); @@ -878,6 +907,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl if (updated) { logger.info("Status of {} changed from {} to {}", statusChangeMessage.getNodeId(), oldStatus, updatedStatus); + logger.debug("State of cluster nodes is now {}", nodeStatuses); final NodeConnectionStatus status = statusChangeMessage.getNodeConnectionStatus(); final String summary = summarizeStatusChange(oldStatus, status); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/ReportedEvent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/ReportedEvent.java new file mode 100644 index 0000000000..8fc808d49b --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/ReportedEvent.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.reporting.Severity; + +public class ReportedEvent { + private final NodeIdentifier nodeId; + private final Severity severity; + private final String event; + + public ReportedEvent(NodeIdentifier nodeId, Severity severity, String event) { + this.nodeId = nodeId; + this.severity = severity; + this.event = event; + } + + public NodeIdentifier getNodeId() { + return nodeId; + } + + public Severity getSeverity() { + return severity; + } + + public String getEvent() { + return event; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java index 0f1ce20d03..5086dc004d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java @@ -32,6 +32,7 @@ import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; +import org.apache.nifi.cluster.ReportedEvent; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.node.DisconnectionCode; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; @@ -322,30 +323,6 @@ public class TestAbstractHeartbeatMonitor { } } - public static class ReportedEvent { - private final NodeIdentifier nodeId; - private final Severity severity; - private final String event; - - public ReportedEvent(NodeIdentifier nodeId, Severity severity, String event) { - this.nodeId = nodeId; - this.severity = severity; - this.event = event; - } - - public NodeIdentifier getNodeId() { - return nodeId; - } - - public Severity getSeverity() { - return severity; - } - - public String getEvent() { - return event; - } - } - private static class TestFriendlyHeartbeatMonitor extends AbstractHeartbeatMonitor { private Map heartbeats = new HashMap<>(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java index 319bd84e00..2f034b34b8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java @@ -77,7 +77,7 @@ public class TestNodeClusterCoordinator { coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties()) { @Override - void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes) { + void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) { nodeStatuses.add(updatedStatus); } }; @@ -132,7 +132,7 @@ public class TestNodeClusterCoordinator { final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties()) { @Override - void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes) { + void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) { } }; @@ -170,7 +170,7 @@ public class TestNodeClusterCoordinator { final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties()) { @Override - void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes) { + void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) { } }; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java new file mode 100644 index 0000000000..dbd8c00411 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.integration; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.curator.test.TestingServer; +import org.apache.nifi.cluster.coordination.node.ClusterRoles; +import org.apache.nifi.util.NiFiProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Cluster { + private static final Logger logger = LoggerFactory.getLogger(Cluster.class); + + private final Set nodes = new HashSet<>(); + private final TestingServer zookeeperServer; + + public Cluster() { + try { + zookeeperServer = new TestingServer(); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + + public void start() { + try { + zookeeperServer.start(); + } catch (final RuntimeException e) { + throw e; + } catch (final Exception e) { + throw new RuntimeException(e); + } + + while (getZooKeeperConnectString() == null) { + try { + Thread.sleep(100L); + } catch (InterruptedException e) { + } + } + + logger.info("Start ZooKeeper Server on Port {}, with temporary directory {}", zookeeperServer.getPort(), zookeeperServer.getTempDirectory()); + } + + public void stop() { + for (final Node node : nodes) { + try { + if (node.isRunning()) { + node.stop(); + } + } catch (Exception e) { + logger.error("Failed to shut down " + node, e); + } + } + + try { + zookeeperServer.stop(); + zookeeperServer.close(); + } catch (final Exception e) { + } + } + + + public String getZooKeeperConnectString() { + return zookeeperServer.getConnectString(); + } + + public Set getNodes() { + return Collections.unmodifiableSet(nodes); + } + + + public Node createNode() { + NiFiProperties.getInstance().setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, getZooKeeperConnectString()); + NiFiProperties.getInstance().setProperty(NiFiProperties.CLUSTER_IS_NODE, "true"); + + final NiFiProperties properties = NiFiProperties.getInstance().copy(); + final Node node = new Node(properties); + node.start(); + nodes.add(node); + + return node; + } + + public Node waitForClusterCoordinator(final long time, final TimeUnit timeUnit) { + return ClusterUtils.waitUntilNonNull(time, timeUnit, + () -> getNodes().stream().filter(node -> node.getRoles().contains(ClusterRoles.CLUSTER_COORDINATOR)).findFirst().orElse(null)); + } + + public Node waitForPrimaryNode(final long time, final TimeUnit timeUnit) { + return ClusterUtils.waitUntilNonNull(time, timeUnit, + () -> getNodes().stream().filter(node -> node.getRoles().contains(ClusterRoles.PRIMARY_NODE)).findFirst().orElse(null)); + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java new file mode 100644 index 0000000000..6881ca233c --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.integration; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import org.apache.nifi.cluster.coordination.node.ClusterRoles; +import org.apache.nifi.cluster.coordination.node.NodeConnectionState; +import org.junit.BeforeClass; +import org.junit.Test; + +public class ClusterConnectionIT { + + @BeforeClass + public static void setup() { + System.setProperty("nifi.properties.file.path", "src/test/resources/conf/nifi.properties"); + } + + @Test(timeout = 20000) + public void testSingleNode() throws InterruptedException { + final Cluster cluster = new Cluster(); + cluster.start(); + + try { + final Node firstNode = cluster.createNode(); + firstNode.waitUntilConnected(10, TimeUnit.SECONDS); + + firstNode.waitUntilElectedForRole(ClusterRoles.CLUSTER_COORDINATOR, 10, TimeUnit.SECONDS); + firstNode.waitUntilElectedForRole(ClusterRoles.PRIMARY_NODE, 10, TimeUnit.SECONDS); + } finally { + cluster.stop(); + } + } + + @Test(timeout = 60000) + public void testThreeNodeCluster() throws InterruptedException { + final Cluster cluster = new Cluster(); + cluster.start(); + + try { + final Node firstNode = cluster.createNode(); + final Node secondNode = cluster.createNode(); + final Node thirdNode = cluster.createNode(); + + firstNode.waitUntilConnected(10, TimeUnit.SECONDS); + System.out.println("**** Node 1 Connected ****"); + secondNode.waitUntilConnected(10, TimeUnit.SECONDS); + System.out.println("**** Node 2 Connected ****"); + thirdNode.waitUntilConnected(10, TimeUnit.SECONDS); + System.out.println("**** Node 3 Connected ****"); + + final Node clusterCoordinator = cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS); + final Node primaryNode = cluster.waitForPrimaryNode(10, TimeUnit.SECONDS); + System.out.println("\n\n"); + System.out.println("Cluster Coordinator = " + clusterCoordinator); + System.out.println("Primary Node = " + primaryNode); + System.out.println("\n\n"); + } finally { + cluster.stop(); + } + } + + @Test(timeout = 60000) + public void testNewCoordinatorElected() throws IOException { + final Cluster cluster = new Cluster(); + cluster.start(); + + try { + final Node firstNode = cluster.createNode(); + final Node secondNode = cluster.createNode(); + + firstNode.waitUntilConnected(10, TimeUnit.SECONDS); + System.out.println("**** Node 1 Connected ****"); + secondNode.waitUntilConnected(10, TimeUnit.SECONDS); + System.out.println("**** Node 2 Connected ****"); + + final Node clusterCoordinator = cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS); + clusterCoordinator.stop(); + + final Node otherNode = firstNode == clusterCoordinator ? secondNode : firstNode; + otherNode.waitUntilElectedForRole(ClusterRoles.CLUSTER_COORDINATOR, 10, TimeUnit.SECONDS); + } finally { + cluster.stop(); + } + } + + @Test(timeout = 60000) + public void testReconnectGetsCorrectClusterTopology() throws IOException { + final Cluster cluster = new Cluster(); + cluster.start(); + + try { + final Node firstNode = cluster.createNode(); + final Node secondNode = cluster.createNode(); + final Node thirdNode = cluster.createNode(); + + firstNode.waitUntilConnected(10, TimeUnit.SECONDS); + System.out.println("**** Node 1 Connected ****"); + secondNode.waitUntilConnected(10, TimeUnit.SECONDS); + System.out.println("**** Node 2 Connected ****"); + thirdNode.waitUntilConnected(10, TimeUnit.SECONDS); + System.out.println("**** Node 3 Connected ****"); + + // shutdown node + secondNode.stop(); + + System.out.println("\n\nNode 2 Shut Down\n\n"); + + // wait for node 1 and 3 to recognize that node 2 is gone + Stream.of(firstNode, thirdNode).forEach(node -> { + node.assertNodeDisconnects(secondNode.getIdentifier(), 5, TimeUnit.SECONDS); + }); + + // restart node + secondNode.start(); + System.out.println("\n\nNode 2 Restarted\n\n"); + + secondNode.waitUntilConnected(20, TimeUnit.SECONDS); + System.out.println("\n\nNode 2 Reconnected\n\n"); + + // wait for all 3 nodes to agree that node 2 is connected + Stream.of(firstNode, secondNode, thirdNode).forEach(node -> { + ClusterUtils.waitUntilConditionMet(5, TimeUnit.SECONDS, + () -> firstNode.getClusterCoordinator().getConnectionStatus(secondNode.getIdentifier()).getState() == NodeConnectionState.CONNECTED); + }); + + // Ensure that all 3 nodes see a cluster of 3 connected nodes. + Stream.of(firstNode, secondNode, thirdNode).forEach(node -> { + node.assertNodeIsConnected(firstNode.getIdentifier()); + node.assertNodeIsConnected(secondNode.getIdentifier()); + node.assertNodeIsConnected(thirdNode.getIdentifier()); + }); + + // Ensure that we get both a cluster coordinator and a primary node elected + cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS); + cluster.waitForPrimaryNode(10, TimeUnit.SECONDS); + } finally { + cluster.stop(); + } + } + + + @Test(timeout = 60000) + public void testRestartAllNodes() throws IOException { + final Cluster cluster = new Cluster(); + cluster.start(); + + try { + final Node firstNode = cluster.createNode(); + final Node secondNode = cluster.createNode(); + final Node thirdNode = cluster.createNode(); + + firstNode.waitUntilConnected(10, TimeUnit.SECONDS); + System.out.println("**** Node 1 Connected ****"); + secondNode.waitUntilConnected(10, TimeUnit.SECONDS); + System.out.println("**** Node 2 Connected ****"); + thirdNode.waitUntilConnected(10, TimeUnit.SECONDS); + System.out.println("**** Node 3 Connected ****"); + + // shutdown node + firstNode.stop(); + secondNode.stop(); + thirdNode.stop(); + + System.out.println("\n\nRestarting all nodes\n\n"); + thirdNode.start(); + firstNode.start(); + secondNode.start(); + + Stream.of(firstNode, secondNode, thirdNode).forEach(node -> { + node.waitUntilConnected(10, TimeUnit.SECONDS); + }); + + // wait for all 3 nodes to agree that node 2 is connected + Stream.of(firstNode, secondNode, thirdNode).forEach(node -> { + ClusterUtils.waitUntilConditionMet(5, TimeUnit.SECONDS, + () -> firstNode.getClusterCoordinator().getConnectionStatus(secondNode.getIdentifier()).getState() == NodeConnectionState.CONNECTED); + }); + + // Ensure that all 3 nodes see a cluster of 3 connected nodes. + Stream.of(firstNode, secondNode, thirdNode).forEach(node -> { + node.assertNodeConnects(firstNode.getIdentifier(), 10, TimeUnit.SECONDS); + node.assertNodeConnects(secondNode.getIdentifier(), 10, TimeUnit.SECONDS); + node.assertNodeConnects(thirdNode.getIdentifier(), 10, TimeUnit.SECONDS); + }); + + // Ensure that we get both a cluster coordinator and a primary node elected + cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS); + cluster.waitForPrimaryNode(10, TimeUnit.SECONDS); + } finally { + cluster.stop(); + } + } + + + @Test(timeout = 30000) + public void testHeartbeatsMonitored() throws IOException { + final Cluster cluster = new Cluster(); + cluster.start(); + + try { + final Node firstNode = cluster.createNode(); + final Node secondNode = cluster.createNode(); + + firstNode.waitUntilConnected(10, TimeUnit.SECONDS); + secondNode.waitUntilConnected(10, TimeUnit.SECONDS); + + secondNode.suspendHeartbeating(); + + // Heartbeat interval in nifi.properties is set to 1 sec. This means that the node should be kicked out + // due to lack of heartbeat after 8 times this amount of time, or 8 seconds. + firstNode.assertNodeDisconnects(secondNode.getIdentifier(), 12, TimeUnit.SECONDS); + + secondNode.resumeHeartbeating(); + firstNode.assertNodeConnects(secondNode.getIdentifier(), 10, TimeUnit.SECONDS); + } finally { + cluster.stop(); + } + } + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterUtils.java new file mode 100644 index 0000000000..972d2c7a5a --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterUtils.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.integration; + +import java.util.concurrent.TimeUnit; +import java.util.function.BooleanSupplier; +import java.util.function.Supplier; + +public class ClusterUtils { + + public static void waitUntilConditionMet(final long time, final TimeUnit timeUnit, final BooleanSupplier test) { + final long nanosToWait = timeUnit.toNanos(time); + final long start = System.nanoTime(); + final long maxTime = start + nanosToWait; + + while (!test.getAsBoolean()) { + if (System.nanoTime() > maxTime) { + throw new AssertionError("Condition never occurred after waiting " + time + " " + timeUnit); + } + } + } + + public static T waitUntilNonNull(final long time, final TimeUnit timeUnit, final Supplier test) { + final long nanosToWait = timeUnit.toNanos(time); + final long start = System.nanoTime(); + final long maxTime = start + nanosToWait; + + T returnVal; + while ((returnVal = test.get()) == null) { + if (System.nanoTime() > maxTime) { + throw new AssertionError("Condition never occurred after waiting " + time + " " + timeUnit); + } + } + + return returnVal; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java new file mode 100644 index 0000000000..5bfe83c638 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.integration; + +import java.io.IOException; +import java.net.ServerSocket; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.cluster.ReportedEvent; +import org.apache.nifi.cluster.coordination.ClusterCoordinator; +import org.apache.nifi.cluster.coordination.heartbeat.ClusterProtocolHeartbeatMonitor; +import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor; +import org.apache.nifi.cluster.coordination.node.CuratorNodeProtocolSender; +import org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator; +import org.apache.nifi.cluster.coordination.node.NodeConnectionState; +import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; +import org.apache.nifi.cluster.protocol.ClusterCoordinationProtocolSender; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.cluster.protocol.ProtocolContext; +import org.apache.nifi.cluster.protocol.ProtocolListener; +import org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderListener; +import org.apache.nifi.cluster.protocol.impl.NodeProtocolSenderListener; +import org.apache.nifi.cluster.protocol.impl.SocketProtocolListener; +import org.apache.nifi.cluster.protocol.impl.StandardClusterCoordinationProtocolSender; +import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext; +import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils; +import org.apache.nifi.cluster.protocol.message.ProtocolMessage; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.StandardFlowService; +import org.apache.nifi.controller.repository.FlowFileEventRepository; +import org.apache.nifi.encrypt.StringEncryptor; +import org.apache.nifi.engine.FlowEngine; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.io.socket.ServerSocketConfiguration; +import org.apache.nifi.io.socket.SocketConfiguration; +import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.reporting.BulletinRepository; +import org.apache.nifi.reporting.Severity; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.web.Revision; +import org.apache.nifi.web.revision.RevisionManager; +import org.junit.Assert; +import org.mockito.Mockito; + +public class Node { + private final NodeIdentifier nodeId; + private final NiFiProperties nodeProperties; + + private final List reportedEvents = Collections.synchronizedList(new ArrayList()); + private final RevisionManager revisionManager; + + private NodeClusterCoordinator clusterCoordinator; + private CuratorNodeProtocolSender protocolSender; + private FlowController flowController; + private StandardFlowService flowService; + + private ProtocolListener protocolListener; + + private volatile boolean running = false; + + private ScheduledExecutorService executor = new FlowEngine(8, "Node tasks", true); + + public Node(final NiFiProperties properties) { + this(new NodeIdentifier(UUID.randomUUID().toString(), "localhost", createPort(), "localhost", createPort(), "localhost", null, null, false, null), properties); + } + + public Node(final NodeIdentifier nodeId, final NiFiProperties properties) { + this.nodeId = nodeId; + this.nodeProperties = properties; + + nodeProperties.setProperty(NiFiProperties.CLUSTER_NODE_PROTOCOL_PORT, String.valueOf(nodeId.getSocketPort())); + nodeProperties.setProperty(NiFiProperties.WEB_HTTP_PORT, String.valueOf(nodeId.getApiPort())); + + revisionManager = Mockito.mock(RevisionManager.class); + Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections. emptyList()); + } + + + public synchronized void start() { + running = true; + + protocolSender = createNodeProtocolSender(); + clusterCoordinator = createClusterCoordinator(); + clusterCoordinator.setLocalNodeIdentifier(nodeId); + clusterCoordinator.setConnected(true); + + final HeartbeatMonitor heartbeatMonitor = createHeartbeatMonitor(); + flowController = FlowController.createClusteredInstance(Mockito.mock(FlowFileEventRepository.class), nodeProperties, + null, null, StringEncryptor.createEncryptor(), protocolSender, Mockito.mock(BulletinRepository.class), clusterCoordinator, heartbeatMonitor, VariableRegistry.EMPTY_REGISTRY); + + try { + flowController.initializeFlow(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + final NodeProtocolSenderListener senderListener = new NodeProtocolSenderListener(protocolSender, protocolListener); + try { + flowController.getStateManagerProvider().getStateManager("Cluster Node Configuration").setState(Collections.singletonMap("Node UUID", nodeId.getId()), Scope.LOCAL); + + flowService = StandardFlowService.createClusteredInstance(flowController, nodeProperties, senderListener, clusterCoordinator, + StringEncryptor.createEncryptor(), revisionManager, Mockito.mock(Authorizer.class)); + + flowService.start(); + flowService.load(null); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void stop() throws IOException { + running = false; + + flowController.shutdown(true); + flowService.stop(true); + + clusterCoordinator.shutdown(); + executor.shutdownNow(); + + // protocol listener is closed by flow controller + } + + public void suspendHeartbeating() { + flowController.suspendHeartbeats(); + } + + public void resumeHeartbeating() { + flowController.resumeHeartbeats(); + } + + public NodeIdentifier getIdentifier() { + return nodeId; + } + + @Override + public int hashCode() { + return new HashCodeBuilder().append(nodeId).build(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (obj == this) { + return true; + } + if (!(obj instanceof Node)) { + return false; + } + + return getIdentifier().equals(((Node) obj).getIdentifier()); + } + + @Override + public String toString() { + return "Node[id=" + getIdentifier() + ", started=" + isRunning() + "]"; + } + + public boolean isRunning() { + return running; + } + + private static int createPort() { + // get an unused port + while (true) { + try (ServerSocket ss = new ServerSocket(0)) { + return ss.getLocalPort(); + } catch (final IOException ioe) { + } + } + } + + public Set getRoles() { + final NodeConnectionStatus status = getConnectionStatus(); + return status == null ? Collections.emptySet() : status.getRoles(); + } + + public NodeConnectionStatus getConnectionStatus() { + return clusterCoordinator.getConnectionStatus(nodeId); + } + + @SuppressWarnings("unchecked") + private CuratorNodeProtocolSender createNodeProtocolSender() { + final SocketConfiguration socketConfig = new SocketConfiguration(); + socketConfig.setSocketTimeout(3000); + socketConfig.setReuseAddress(true); + + final ProtocolContext protocolContext = new JaxbProtocolContext<>(JaxbProtocolUtils.JAXB_CONTEXT); + final CuratorNodeProtocolSender protocolSender = new CuratorNodeProtocolSender(socketConfig, protocolContext, nodeProperties); + return protocolSender; + } + + @SuppressWarnings("unchecked") + private ClusterCoordinationProtocolSender createCoordinatorProtocolSender() { + final SocketConfiguration socketConfig = new SocketConfiguration(); + socketConfig.setSocketTimeout(3000); + socketConfig.setReuseAddress(true); + + final ProtocolContext protocolContext = new JaxbProtocolContext<>(JaxbProtocolUtils.JAXB_CONTEXT); + return new StandardClusterCoordinationProtocolSender(socketConfig, protocolContext, 1); + } + + private HeartbeatMonitor createHeartbeatMonitor() { + return new ClusterProtocolHeartbeatMonitor(clusterCoordinator, protocolListener, nodeProperties); + } + + @SuppressWarnings("unchecked") + private NodeClusterCoordinator createClusterCoordinator() { + final EventReporter eventReporter = new EventReporter() { + @Override + public void reportEvent(Severity severity, String category, String message) { + reportedEvents.add(new ReportedEvent(nodeId, severity, message)); + } + }; + + final ServerSocketConfiguration serverSocketConfiguration = new ServerSocketConfiguration(); + serverSocketConfiguration.setSocketTimeout(5000); + final ProtocolContext protocolContext = new JaxbProtocolContext<>(JaxbProtocolUtils.JAXB_CONTEXT); + + protocolListener = new SocketProtocolListener(3, Integer.parseInt(nodeProperties.getProperty(NiFiProperties.CLUSTER_NODE_PROTOCOL_PORT)), serverSocketConfiguration, protocolContext); + try { + protocolListener.start(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + final ClusterCoordinationProtocolSenderListener protocolSenderListener = new ClusterCoordinationProtocolSenderListener(createCoordinatorProtocolSender(), protocolListener); + return new NodeClusterCoordinator(protocolSenderListener, eventReporter, null, revisionManager, nodeProperties); + } + + + public ClusterCoordinator getClusterCoordinator() { + return clusterCoordinator; + } + + + // + // Methods for checking conditions + // + public boolean isConnected() { + final NodeConnectionStatus status = getConnectionStatus(); + if (status == null) { + return false; + } + + return status.getState() == NodeConnectionState.CONNECTED; + } + + // + // Methods to wait for conditions + // + public void waitUntilConnected(final long time, final TimeUnit timeUnit) { + ClusterUtils.waitUntilConditionMet(time, timeUnit, () -> isConnected()); + } + + public void waitUntilElectedForRole(final String roleName, final long time, final TimeUnit timeUnit) { + ClusterUtils.waitUntilConditionMet(time, timeUnit, () -> getRoles().contains(roleName)); + } + + // Assertions + /** + * Assert that the node with the given ID connects (According to this node!) within the given amount of time + * + * @param nodeId id of the node + * @param time how long to wait + * @param timeUnit unit of time provided by the 'time' argument + */ + public void assertNodeConnects(final NodeIdentifier nodeId, final long time, final TimeUnit timeUnit) { + ClusterUtils.waitUntilConditionMet(time, timeUnit, + () -> getClusterCoordinator().getConnectionStatus(nodeId).getState() == NodeConnectionState.CONNECTED); + } + + + /** + * Assert that the node with the given ID disconnects (According to this node!) within the given amount of time + * + * @param nodeId id of the node + * @param time how long to wait + * @param timeUnit unit of time provided by the 'time' argument + */ + public void assertNodeDisconnects(final NodeIdentifier nodeId, final long time, final TimeUnit timeUnit) { + ClusterUtils.waitUntilConditionMet(time, timeUnit, + () -> getClusterCoordinator().getConnectionStatus(nodeId).getState() == NodeConnectionState.DISCONNECTED); + } + + + /** + * Asserts that the node with the given ID is currently connected (According to this node!) + * + * @param nodeId id of the node + */ + public void assertNodeIsConnected(final NodeIdentifier nodeId) { + Assert.assertEquals(NodeConnectionState.CONNECTED, getClusterCoordinator().getConnectionStatus(nodeId).getState()); + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/NopStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/NopStateProvider.java new file mode 100644 index 0000000000..fb80ab2db9 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/NopStateProvider.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.integration; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.components.state.StateProvider; +import org.apache.nifi.components.state.StateProviderInitializationContext; +import org.apache.nifi.controller.state.StandardStateMap; + +public class NopStateProvider implements StateProvider { + private final String id = UUID.randomUUID().toString(); + private final Map> componentStateMap = new HashMap<>(); + + @Override + public Collection validate(ValidationContext context) { + return Collections.emptyList(); + } + + @Override + public PropertyDescriptor getPropertyDescriptor(String name) { + return null; + } + + @Override + public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { + } + + @Override + public List getPropertyDescriptors() { + return Collections.emptyList(); + } + + @Override + public String getIdentifier() { + return id; + } + + @Override + public void initialize(StateProviderInitializationContext context) throws IOException { + } + + @Override + public void shutdown() { + } + + @Override + public synchronized void setState(Map state, String componentId) throws IOException { + final Map stateMap = componentStateMap.computeIfAbsent(componentId, compId -> new HashMap()); + stateMap.clear(); + stateMap.putAll(state); + } + + @Override + public synchronized StateMap getState(String componentId) throws IOException { + return new StandardStateMap(componentStateMap.computeIfAbsent(componentId, compId -> new HashMap()), 0L); + } + + @Override + public synchronized boolean replace(StateMap oldValue, Map newValue, String componentId) throws IOException { + return false; + } + + @Override + public void clear(String componentId) throws IOException { + } + + @Override + public void onComponentRemoved(String componentId) throws IOException { + } + + @Override + public void enable() { + } + + @Override + public void disable() { + } + + @Override + public boolean isEnabled() { + return true; + } + + @Override + public Scope[] getSupportedScopes() { + return new Scope[] {Scope.LOCAL}; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/nifi.properties index 78a649b06b..44b2a4e84e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/nifi.properties +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/nifi.properties @@ -30,11 +30,25 @@ nifi.ui.autorefresh.interval=30 sec nifi.nar.library.directory=./target/lib nifi.nar.working.directory=./target/work/nar/ +#################### +# State Management # +#################### +nifi.state.management.configuration.file=src/test/resources/conf/state-management.xml +# The ID of the local state provider +nifi.state.management.provider.local=local-provider +# The ID of the cluster-wide state provider. This will be ignored if NiFi is not clustered but must be populated if running in a cluster. +nifi.state.management.provider.cluster=zk-provider +# Specifies whether or not this instance of NiFi should run an embedded ZooKeeper server +nifi.state.management.embedded.zookeeper.start=false +# Properties file that provides the ZooKeeper properties to use if is set to true +nifi.state.management.embedded.zookeeper.properties=./conf/zookeeper.properties + # H2 Settings nifi.database.directory=./database_repository nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE # FlowFile Repository +nifi.flowfile.repository.implementation=org.apache.nifi.controller.repository.VolatileFlowFileRepository nifi.flowfile.repository.directory=./target/test-repo nifi.flowfile.repository.partitions=1 nifi.flowfile.repository.checkpoint.interval=2 mins @@ -56,10 +70,11 @@ nifi.provenance.repository.max.storage.time=24 hours nifi.provenance.repository.max.storage.size=1 GB nifi.provenance.repository.rollover.time=30 secs nifi.provenance.repository.rollover.size=100 MB +nifi.provenance.repository.implementation=org.apache.nifi.provenance.MockProvenanceRepository # Site to Site properties -nifi.remote.input.socket.port=9990 -nifi.remote.input.secure=true +nifi.remote.input.socket.port= +nifi.remote.input.secure=false # web properties # nifi.web.war.directory=./target/lib @@ -89,7 +104,7 @@ nifi.security.support.new.account.requests= nifi.security.default.user.roles= # cluster common properties (cluster manager and nodes must have same values) # -nifi.cluster.protocol.heartbeat.interval=5 sec +nifi.cluster.protocol.heartbeat.interval=1 secs nifi.cluster.protocol.is.secure=false nifi.cluster.protocol.socket.timeout=30 sec nifi.cluster.protocol.connection.handshake.timeout=45 sec diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/state-management.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/state-management.xml new file mode 100644 index 0000000000..6e95b156de --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/state-management.xml @@ -0,0 +1,57 @@ + + + + + + + local-provider + org.apache.nifi.cluster.integration.NopStateProvider + + + + + zk-provider + org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider + localhost:8320 + /nifi + 10 seconds + Open + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/logback-test.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/logback-test.xml index 92eb78cbbf..0f5adc83c8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/logback-test.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/logback-test.xml @@ -23,6 +23,11 @@ + + + + + diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index fdb6f58d0c..2635fc472b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -581,7 +581,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false)); if (configuredForClustering) { - leaderElectionManager = new CuratorLeaderElectionManager(4); + leaderElectionManager = new CuratorLeaderElectionManager(4, properties); heartbeater = new ClusterProtocolHeartbeater(protocolSender, properties); // Check if there is already a cluster coordinator elected. If not, go ahead diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java index b634e74298..091e59ca09 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java @@ -788,7 +788,11 @@ public class StandardFlowService implements FlowService, ProtocolHandler { } } catch (final Exception pe) { // could not create a socket and communicate with manager - logger.warn("Failed to connect to cluster due to: " + pe, pe); + logger.warn("Failed to connect to cluster due to: " + pe); + if (logger.isDebugEnabled()) { + logger.warn("", pe); + } + if (retryOnCommsFailure) { try { Thread.sleep(response == null ? 5000 : response.getTryLaterSeconds()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java index 9d076c022c..7bf749410d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java @@ -19,6 +19,7 @@ package org.apache.nifi.controller.leader.election; import java.util.HashMap; import java.util.Map; +import java.util.Properties; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; @@ -48,10 +49,13 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { private final Map leaderRoles = new HashMap<>(); private final Map registeredRoles = new HashMap<>(); - public CuratorLeaderElectionManager(final int threadPoolSize) { - leaderElectionMonitorEngine = new FlowEngine(threadPoolSize, "Leader Election Notification", true); - final NiFiProperties properties = NiFiProperties.getInstance(); + public CuratorLeaderElectionManager(final int threadPoolSize) { + this(threadPoolSize, NiFiProperties.getInstance()); + } + + public CuratorLeaderElectionManager(final int threadPoolSize, final Properties properties) { + leaderElectionMonitorEngine = new FlowEngine(threadPoolSize, "Leader Election Notification", true); zkConfig = ZooKeeperClientConfig.createConfig(properties); } @@ -65,7 +69,14 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { stopped = false; final RetryPolicy retryPolicy = new RetryForever(5000); - curatorClient = CuratorFrameworkFactory.newClient(zkConfig.getConnectString(), zkConfig.getSessionTimeoutMillis(), zkConfig.getConnectionTimeoutMillis(), retryPolicy); + curatorClient = CuratorFrameworkFactory.builder() + .connectString(zkConfig.getConnectString()) + .sessionTimeoutMs(zkConfig.getSessionTimeoutMillis()) + .connectionTimeoutMs(zkConfig.getConnectionTimeoutMillis()) + .retryPolicy(retryPolicy) + .defaultData(new byte[0]) + .build(); + curatorClient.start(); // Call #register for each already-registered role. This will @@ -227,6 +238,7 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { logger.error("This node was elected Leader for Role '{}' but failed to take leadership. Will relinquish leadership role. Failure was due to: {}", roleName, e); logger.error("", e); leader = false; + Thread.sleep(1000L); return; } } @@ -251,8 +263,10 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { try { listener.onLeaderRelinquish(); } catch (final Exception e) { - logger.error("This node is no longer leader for role '{}' but failed to shutdown leadership responsibilities properly due to: {}", roleName, e); - logger.error("", e); + logger.error("This node is no longer leader for role '{}' but failed to shutdown leadership responsibilities properly due to: {}", roleName, e.toString()); + if (logger.isDebugEnabled()) { + logger.error("", e); + } } } }