diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/CuratorNodeProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/CuratorNodeProtocolSender.java index 08069597f5..ee6e930da9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/CuratorNodeProtocolSender.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/CuratorNodeProtocolSender.java @@ -23,7 +23,7 @@ import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryNTimes; -import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException; +import org.apache.nifi.cluster.exception.NoClusterCoordinatorException; import org.apache.nifi.cluster.protocol.AbstractNodeProtocolSender; import org.apache.nifi.cluster.protocol.ProtocolContext; import org.apache.nifi.cluster.protocol.ProtocolException; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/LeaderElectionNodeProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/LeaderElectionNodeProtocolSender.java index 03de329092..c5ee6718cc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/LeaderElectionNodeProtocolSender.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/LeaderElectionNodeProtocolSender.java @@ -21,7 +21,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException; +import org.apache.nifi.cluster.exception.NoClusterCoordinatorException; import org.apache.nifi.cluster.protocol.AbstractNodeProtocolSender; import org.apache.nifi.cluster.protocol.ProtocolContext; import org.apache.nifi.cluster.protocol.ProtocolException; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java index b1a088edba..e50d8fa901 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java @@ -39,10 +39,10 @@ import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger; import org.apache.nifi.cluster.coordination.http.replication.RequestCompletionCallback; import org.apache.nifi.cluster.event.Event; import org.apache.nifi.cluster.event.NodeEvent; +import org.apache.nifi.cluster.exception.NoClusterCoordinatorException; import org.apache.nifi.cluster.firewall.ClusterNodeFirewall; import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException; -import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException; import org.apache.nifi.cluster.protocol.ComponentRevision; import org.apache.nifi.cluster.protocol.ConnectionRequest; import org.apache.nifi.cluster.protocol.ConnectionResponse; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/BlockedByFirewallException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/BlockedByFirewallException.java index c13e9d95c6..bf0138e9d3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/BlockedByFirewallException.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/BlockedByFirewallException.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.cluster.manager.exception; +import org.apache.nifi.cluster.exception.ClusterException; import org.apache.nifi.cluster.protocol.NodeIdentifier; /** diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalClusterStateException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalClusterStateException.java index e10e9c399b..1ba9c8f1c5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalClusterStateException.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalClusterStateException.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.cluster.manager.exception; +import org.apache.nifi.cluster.exception.ClusterException; + /** * Signals that an operation to be performed on a cluster has been invoked at an illegal or inappropriate time. * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoConnectedNodesException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoConnectedNodesException.java index 7a828e3f0b..8694878166 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoConnectedNodesException.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoConnectedNodesException.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.cluster.manager.exception; +import org.apache.nifi.cluster.exception.ClusterException; + /** * Represents the exceptional case when the cluster is unable to service a request because no nodes are connected. * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoResponseFromNodesException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoResponseFromNodesException.java index 9616ad3ca6..9198c5ede7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoResponseFromNodesException.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoResponseFromNodesException.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.cluster.manager.exception; +import org.apache.nifi.cluster.exception.ClusterException; + /** * Represents the exceptional case when the cluster is unable to service a request because no nodes returned a response. When the given request is not mutable the nodes are left in their previous * state. diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeDisconnectionException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeDisconnectionException.java index 2f59eed08d..c788f16bc4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeDisconnectionException.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeDisconnectionException.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.cluster.manager.exception; +import org.apache.nifi.cluster.exception.ClusterException; + /** * Represents the exceptional case when a disconnection request to a node failed. * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeReconnectionException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeReconnectionException.java index be3edf4572..e7c2baa176 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeReconnectionException.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeReconnectionException.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.cluster.manager.exception; +import org.apache.nifi.cluster.exception.ClusterException; + /** * Represents the exceptional case when a reconnection request to a node failed. * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UnknownNodeException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UnknownNodeException.java index 2d43e8a265..21843cd545 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UnknownNodeException.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UnknownNodeException.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.cluster.manager.exception; +import org.apache.nifi.cluster.exception.ClusterException; + /** * Represents the exceptional case when a request is made for a node that does not exist. * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java index a0c8648e04..ce2017ee87 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java @@ -17,6 +17,7 @@ package org.apache.nifi.cluster.integration; +import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -40,7 +41,7 @@ public class Cluster { private final Set nodes = new HashSet<>(); private final TestingServer zookeeperServer; - public Cluster() { + public Cluster() throws IOException { try { zookeeperServer = new TestingServer(); } catch (final Exception e) { @@ -114,7 +115,8 @@ public class Cluster { addProps.put(NiFiProperties.ZOOKEEPER_CONNECT_STRING, getZooKeeperConnectString()); addProps.put(NiFiProperties.CLUSTER_IS_NODE, "true"); - final Node node = new Node(NiFiProperties.createBasicNiFiProperties("src/test/resources/conf/nifi.properties", addProps)); + final NiFiProperties nifiProperties = NiFiProperties.createBasicNiFiProperties("src/test/resources/conf/nifi.properties", addProps); + final Node node = new Node(nifiProperties); node.start(); nodes.add(node); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java index 3439263cc1..45a2e4261c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java @@ -46,7 +46,7 @@ public class ClusterConnectionIT { } @Before - public void createCluster() { + public void createCluster() throws IOException { cluster = new Cluster(); cluster.start(); } @@ -140,9 +140,8 @@ public class ClusterConnectionIT { cluster.waitForPrimaryNode(10, TimeUnit.SECONDS); } - @Test(timeout = 60000) - public void testRestartAllNodes() throws IOException { + public void testRestartAllNodes() throws IOException, InterruptedException { final Node firstNode = cluster.createNode(); final Node secondNode = cluster.createNode(); final Node thirdNode = cluster.createNode(); @@ -164,7 +163,13 @@ public class ClusterConnectionIT { firstNode.start(); secondNode.start(); - cluster.waitUntilAllNodesConnected(10, TimeUnit.SECONDS); + + firstNode.waitUntilConnected(20, TimeUnit.SECONDS); + System.out.println("\n\n\n**** Node 1 Re-Connected ****\n\n\n"); + secondNode.waitUntilConnected(10, TimeUnit.SECONDS); + System.out.println("**** Node 2 Re-Connected ****"); + thirdNode.waitUntilConnected(10, TimeUnit.SECONDS); + System.out.println("**** Node 3 Re-Connected ****"); // wait for all 3 nodes to agree that node 2 is connected Stream.of(firstNode, secondNode, thirdNode).forEach(node -> { @@ -205,7 +210,7 @@ public class ClusterConnectionIT { otherNode.assertNodeConnects(nodeToSuspend.getIdentifier(), 10, TimeUnit.SECONDS); } - @Test + @Test(timeout = 60000) public void testNodeInheritsClusterTopologyOnHeartbeat() throws InterruptedException { final Node node1 = cluster.createNode(); final Node node2 = cluster.createNode(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java index b9372a67e4..7ba718bdac 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java @@ -87,8 +87,9 @@ public class Node { private ScheduledExecutorService executor = new FlowEngine(8, "Node tasks", true); + public Node(final NiFiProperties properties) { - this(new NodeIdentifier(UUID.randomUUID().toString(), "localhost", createPort(), "localhost", createPort(), "localhost", null, null, false, null), properties); + this(createNodeId(), properties); } public Node(final NodeIdentifier nodeId, final NiFiProperties properties) { @@ -121,6 +122,10 @@ public class Node { } + private static NodeIdentifier createNodeId() { + return new NodeIdentifier(UUID.randomUUID().toString(), "localhost", createPort(), "localhost", createPort(), "localhost", null, null, false, null); + } + public synchronized void start() { running = true; @@ -148,6 +153,7 @@ public class Node { StringEncryptor.createEncryptor(nodeProperties), revisionManager, Mockito.mock(Authorizer.class)); flowService.start(); + flowService.load(null); } catch (Exception e) { throw new RuntimeException(e); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ClusterException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/exception/ClusterException.java similarity index 95% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ClusterException.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/exception/ClusterException.java index e93acca710..22de7c6da5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ClusterException.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/exception/ClusterException.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.cluster.manager.exception; +package org.apache.nifi.cluster.exception; /** * The base exception class for cluster related exceptions. diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoClusterCoordinatorException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/exception/NoClusterCoordinatorException.java similarity index 92% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoClusterCoordinatorException.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/exception/NoClusterCoordinatorException.java index 89b6722a30..10e8457f3a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoClusterCoordinatorException.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/exception/NoClusterCoordinatorException.java @@ -15,7 +15,9 @@ * limitations under the License. */ -package org.apache.nifi.cluster.manager.exception; +package org.apache.nifi.cluster.exception; + +import org.apache.nifi.cluster.exception.ClusterException; public class NoClusterCoordinatorException extends ClusterException { private static final long serialVersionUID = -1782098541351698293L; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 4e675583f4..3acba9487c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -591,16 +591,26 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // kicking everyone out. This way, we instead inherit the cluster flow before we attempt to be // the coordinator. LOG.info("Checking if there is already a Cluster Coordinator Elected..."); - final NodeIdentifier electedCoordinatorNodeId = clusterCoordinator.getElectedActiveCoordinatorNode(); - if (electedCoordinatorNodeId == null) { + final String clusterCoordinatorAddress = leaderElectionManager.getLeader(ClusterRoles.CLUSTER_COORDINATOR); + if (StringUtils.isEmpty(clusterCoordinatorAddress)) { LOG.info("It appears that no Cluster Coordinator has been Elected yet. Registering for Cluster Coordinator Role."); - registerForClusterCoordinator(); + registerForClusterCoordinator(true); } else { - LOG.info("The Elected Cluster Coordinator is {}. Will not register to be elected for this role until after connecting " - + "to the cluster and inheriting the cluster's flow.", electedCoordinatorNodeId); + // At this point, we have determined that there is a Cluster Coordinator elected. It is important to note, though, + // that if we are running an embedded ZooKeeper, and we have just restarted the cluster (at least the nodes that run the + // embedded ZooKeeper), that we could possibly determine that the Cluster Coordinator is at an address that is not really + // valid. This is because the latest stable ZooKeeper does not support "Container ZNodes" and as a result the ZNodes that + // are created are persistent, not ephemeral. Upon restart, we can get this persisted value, even though the node that belongs + // to that address has not started. ZooKeeper/Curator will recognize this after a while and delete the ZNode. As a result, + // we may later determine that there is in fact no Cluster Coordinator. If this happens, we will automatically register for + // Cluster Coordinator through the StandardFlowService. + LOG.info("The Election for Cluster Coordinator has already begun (Leader is {}). Will not register to be elected for this role until after connecting " + + "to the cluster and inheriting the cluster's flow.", clusterCoordinatorAddress); + registerForClusterCoordinator(false); } leaderElectionManager.start(); + heartbeatMonitor.start(); } else { heartbeater = null; } @@ -3321,8 +3331,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R return configuredForClustering; } - private void registerForClusterCoordinator() { - final String participantId = heartbeatMonitor.getHeartbeatAddress(); + void registerForClusterCoordinator(final boolean participate) { + final String participantId = participate ? heartbeatMonitor.getHeartbeatAddress() : null; leaderElectionManager.register(ClusterRoles.CLUSTER_COORDINATOR, new LeaderElectionStateChangeListener() { @Override @@ -3342,12 +3352,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R @Override public synchronized void onLeaderElection() { LOG.info("This node elected Active Cluster Coordinator"); - heartbeatMonitor.start(); // ensure heartbeat monitor is started } }, participantId); } - private void registerForPrimaryNode() { + void registerForPrimaryNode() { final String participantId = heartbeatMonitor.getHeartbeatAddress(); leaderElectionManager.register(ClusterRoles.PRIMARY_NODE, new LeaderElectionStateChangeListener() { @@ -3401,7 +3410,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // Participate in Leader Election for Heartbeat Monitor. Start the heartbeat monitor // if/when we become leader and stop it when we lose leader role - registerForClusterCoordinator(); + registerForClusterCoordinator(true); leaderElectionManager.start(); stateManagerProvider.enableClusterProvider(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java index c5f9684b10..af96cfd555 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java @@ -54,6 +54,7 @@ import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.node.DisconnectionCode; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; +import org.apache.nifi.cluster.exception.NoClusterCoordinatorException; import org.apache.nifi.cluster.protocol.ConnectionRequest; import org.apache.nifi.cluster.protocol.ConnectionResponse; import org.apache.nifi.cluster.protocol.DataFlow; @@ -787,6 +788,17 @@ public class StandardFlowService implements FlowService, ProtocolHandler { // we received a successful connection response from manager break; } + } catch (final NoClusterCoordinatorException ncce) { + logger.warn("There is currently no Cluster Coordinator. This often happens upon restart of NiFi when running an embedded ZooKeeper. Will register this node " + + "to become the active Cluster Coordinator and will attempt to connect to cluster again"); + controller.registerForClusterCoordinator(true); + + try { + Thread.sleep(1000L); + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } } catch (final Exception pe) { // could not create a socket and communicate with manager logger.warn("Failed to connect to cluster due to: " + pe); @@ -798,6 +810,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { try { Thread.sleep(response == null ? 5000 : response.getTryLaterSeconds()); } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); break; } } else { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java index dfe456b30c..efc736646c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java @@ -31,6 +31,7 @@ import org.apache.curator.retry.RetryNTimes; import org.apache.nifi.controller.cluster.ZooKeeperClientConfig; import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.util.NiFiProperties; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.common.PathUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,16 +63,7 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { stopped = false; - final RetryPolicy retryPolicy = new RetryNTimes(1, 100); - curatorClient = CuratorFrameworkFactory.builder() - .connectString(zkConfig.getConnectString()) - .sessionTimeoutMs(zkConfig.getSessionTimeoutMillis()) - .connectionTimeoutMs(zkConfig.getConnectionTimeoutMillis()) - .retryPolicy(retryPolicy) - .defaultData(new byte[0]) - .build(); - - curatorClient.start(); + curatorClient = createClient(); // Call #register for each already-registered role. This will // cause us to start listening for leader elections for that @@ -84,52 +76,60 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { logger.info("{} started", this); } - @Override - public synchronized void register(final String roleName) { - register(roleName, null); - } - @Override public void register(String roleName, LeaderElectionStateChangeListener listener) { register(roleName, listener, null); } + private String getElectionPath(final String roleName) { + final String rootPath = zkConfig.getRootPath(); + final String leaderPath = rootPath + (rootPath.endsWith("/") ? "" : "/") + "leaders/" + roleName; + return leaderPath; + } + @Override public synchronized void register(final String roleName, final LeaderElectionStateChangeListener listener, final String participantId) { logger.debug("{} Registering new Leader Selector for role {}", this, roleName); - if (leaderRoles.containsKey(roleName)) { + // If we already have a Leader Role registered and either the Leader Role is participating in election, + // or the given participant id == null (don't want to participant in election) then we're done. + final LeaderRole currentRole = leaderRoles.get(roleName); + if (currentRole != null && (currentRole.isParticipant() || participantId == null)) { logger.info("{} Attempted to register Leader Election for role '{}' but this role is already registered", this, roleName); return; } - final String rootPath = zkConfig.getRootPath(); - final String leaderPath = rootPath + (rootPath.endsWith("/") ? "" : "/") + "leaders/" + roleName; + final String leaderPath = getElectionPath(roleName); try { - PathUtils.validatePath(rootPath); + PathUtils.validatePath(leaderPath); } catch (final IllegalArgumentException e) { throw new IllegalStateException("Cannot register leader election for role '" + roleName + "' because this is not a valid role name"); } registeredRoles.put(roleName, new RegisteredRole(participantId, listener)); + final boolean isParticipant = participantId != null && !participantId.trim().isEmpty(); + if (!isStopped()) { final ElectionListener electionListener = new ElectionListener(roleName, listener); final LeaderSelector leaderSelector = new LeaderSelector(curatorClient, leaderPath, leaderElectionMonitorEngine, electionListener); - leaderSelector.autoRequeue(); - if (participantId != null) { + if (isParticipant) { + leaderSelector.autoRequeue(); leaderSelector.setId(participantId); + leaderSelector.start(); } - leaderSelector.start(); - - final LeaderRole leaderRole = new LeaderRole(leaderSelector, electionListener); + final LeaderRole leaderRole = new LeaderRole(leaderSelector, electionListener, isParticipant); leaderRoles.put(roleName, leaderRole); } - logger.info("{} Registered new Leader Selector for role {}", this, roleName); + if (isParticipant) { + logger.info("{} Registered new Leader Selector for role {}; this node is an active participant in the election.", this, roleName); + } else { + logger.info("{} Registered new Leader Selector for role {}; this node is a silent observer in the election.", this, roleName); + } } @Override @@ -151,9 +151,15 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { public synchronized void stop() { stopped = true; - for (final LeaderRole role : leaderRoles.values()) { + for (final Map.Entry entry : leaderRoles.entrySet()) { + final LeaderRole role = entry.getValue(); final LeaderSelector selector = role.getLeaderSelector(); - selector.close(); + + try { + selector.close(); + } catch (final Exception e) { + logger.warn("Failed to close Leader Selector for {}", entry.getKey(), e); + } } leaderRoles.clear(); @@ -192,9 +198,13 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { @Override public String getLeader(final String roleName) { + if (isStopped()) { + return determineLeaderExternal(roleName); + } + final LeaderRole role = getLeaderRole(roleName); if (role == null) { - return null; + return determineLeaderExternal(roleName); } Participant participant; @@ -217,14 +227,92 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { return participantId; } + + /** + * Determines whether or not leader election has already begun for the role with the given name + * + * @param roleName the role of interest + * @return true if leader election has already begun, false if it has not or if unable to determine this. + */ + @Override + public boolean isLeaderElected(final String roleName) { + final String leaderAddress = determineLeaderExternal(roleName); + return !StringUtils.isEmpty(leaderAddress); + } + + + /** + * Use a new Curator client to determine which node is the elected leader for the given role. + * + * @param roleName the name of the role + * @return the id of the elected leader, or null if no leader has been selected or if unable to determine + * the leader from ZooKeeper + */ + private String determineLeaderExternal(final String roleName) { + final CuratorFramework client = createClient(); + try { + final LeaderSelectorListener electionListener = new LeaderSelectorListener() { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) { + } + + @Override + public void takeLeadership(CuratorFramework client) throws Exception { + } + }; + + final String electionPath = getElectionPath(roleName); + + // Note that we intentionally do not auto-requeue here, and we do not start the selector. We do not + // want to join the leader election. We simply want to observe. + final LeaderSelector selector = new LeaderSelector(client, electionPath, electionListener); + + try { + final Participant leader = selector.getLeader(); + return leader == null ? null : leader.getId(); + } catch (final KeeperException.NoNodeException nne) { + // If there is no ZNode, then there is no elected leader. + return null; + } catch (final Exception e) { + logger.warn("Unable to determine the Elected Leader for role '{}' due to {}; assuming no leader has been elected", roleName, e.toString()); + if (logger.isDebugEnabled()) { + logger.warn("", e); + } + + return null; + } + } finally { + client.close(); + } + } + + private CuratorFramework createClient() { + // Create a new client because we don't want to try indefinitely for this to occur. + final RetryPolicy retryPolicy = new RetryNTimes(1, 100); + + final CuratorFramework client = CuratorFrameworkFactory.builder() + .connectString(zkConfig.getConnectString()) + .sessionTimeoutMs(zkConfig.getSessionTimeoutMillis()) + .connectionTimeoutMs(zkConfig.getConnectionTimeoutMillis()) + .retryPolicy(retryPolicy) + .defaultData(new byte[0]) + .build(); + + client.start(); + return client; + } + + private static class LeaderRole { private final LeaderSelector leaderSelector; private final ElectionListener electionListener; + private final boolean participant; - public LeaderRole(final LeaderSelector leaderSelector, final ElectionListener electionListener) { + public LeaderRole(final LeaderSelector leaderSelector, final ElectionListener electionListener, final boolean participant) { this.leaderSelector = leaderSelector; this.electionListener = electionListener; + this.participant = participant; } public LeaderSelector getLeaderSelector() { @@ -234,6 +322,10 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { public boolean isLeader() { return electionListener.isLeader(); } + + public boolean isParticipant() { + return participant; + } } private static class RegisteredRole { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java index ef36528c9d..d9d4e71836 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java @@ -24,14 +24,9 @@ public interface LeaderElectionManager { void start(); /** - * Adds a new role for which a leader is required - * - * @param roleName the name of the role - */ - void register(String roleName); - - /** - * Adds a new role for which a leader is required, without providing a Participant ID + * Adds a new role for which a leader is required, without participating in the leader election. I.e., this node + * will not be elected leader but will passively observe changes to the leadership. This allows calls to {@link #isLeader(String)} + * and {@link #getLeader(String)} to know which node is currently elected the leader. * * @param roleName the name of the role * @param listener a listener that will be called when the node gains or relinquishes @@ -40,7 +35,8 @@ public interface LeaderElectionManager { void register(String roleName, LeaderElectionStateChangeListener listener); /** - * Adds a new role for which a leader is required, providing the given value for this node as the Participant ID + * Adds a new role for which a leader is required, providing the given value for this node as the Participant ID. If the Participant ID + * is null, this node will never be elected leader but will passively observe changes to the leadership. * * @param roleName the name of the role * @param listener a listener that will be called when the node gains or relinquishes @@ -90,4 +86,12 @@ public interface LeaderElectionManager { * again, all previously registered roles will still be registered. */ void stop(); + + /** + * Returns true if a leader has been elected for the given role, false otherwise. + * + * @param roleName the name of the role + * @return true if a leader has been elected, false otherwise. + */ + boolean isLeaderElected(String roleName); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/StandaloneLeaderElectionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/StandaloneLeaderElectionManager.java index a2ed86e94d..182e83a08a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/StandaloneLeaderElectionManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/StandaloneLeaderElectionManager.java @@ -28,10 +28,6 @@ public class StandaloneLeaderElectionManager implements LeaderElectionManager { public void start() { } - @Override - public void register(final String roleName) { - } - @Override public void register(final String roleName, final LeaderElectionStateChangeListener listener) { } @@ -62,4 +58,9 @@ public class StandaloneLeaderElectionManager implements LeaderElectionManager { @Override public void stop() { } + + @Override + public boolean isLeaderElected(String roleName) { + return false; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java index cbb96b105c..c65501cc9d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java @@ -45,7 +45,7 @@ public class NarCloseable implements Closeable { frameworkClassLoader = NarClassLoaders.getInstance().getFrameworkClassLoader(); } catch (final Exception e) { // This should never happen in a running instance, but it will occur in unit tests - logger.error("Unable to access Framework ClassLoader due to " + e + ". Will continue without change ClassLoaders."); + logger.error("Unable to access Framework ClassLoader due to " + e + ". Will continue without changing ClassLoaders."); if (logger.isDebugEnabled()) { logger.error("", e); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml index 871265e699..1a52cc6332 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml @@ -96,6 +96,7 @@ + diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java index e17b1474c5..c39fbc37c0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java @@ -23,9 +23,9 @@ import org.apache.commons.lang3.StringUtils; import org.apache.nifi.authorization.AccessDeniedException; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator; +import org.apache.nifi.cluster.exception.NoClusterCoordinatorException; import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException; -import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.controller.repository.claim.ContentDirection; import org.apache.nifi.util.NiFiProperties; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java index 8269d7925a..021f2165eb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java @@ -40,9 +40,9 @@ import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator; +import org.apache.nifi.cluster.exception.NoClusterCoordinatorException; import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException; -import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.reporting.ReportingTaskProvider; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java index 880186de7d..b233a35f20 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java @@ -30,8 +30,8 @@ import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator; +import org.apache.nifi.cluster.exception.NoClusterCoordinatorException; import org.apache.nifi.cluster.manager.NodeResponse; -import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException; import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.controller.Snippet; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/ClusterExceptionMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/ClusterExceptionMapper.java index 2a67cf8306..dd332f01a6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/ClusterExceptionMapper.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/ClusterExceptionMapper.java @@ -19,7 +19,8 @@ package org.apache.nifi.web.api.config; import javax.ws.rs.core.Response; import javax.ws.rs.ext.ExceptionMapper; import javax.ws.rs.ext.Provider; -import org.apache.nifi.cluster.manager.exception.ClusterException; + +import org.apache.nifi.cluster.exception.ClusterException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/NoClusterCoordinatorExceptionMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/NoClusterCoordinatorExceptionMapper.java index 4b15a70402..c052c88109 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/NoClusterCoordinatorExceptionMapper.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/NoClusterCoordinatorExceptionMapper.java @@ -20,7 +20,7 @@ package org.apache.nifi.web.api.config; import javax.ws.rs.core.Response; import javax.ws.rs.ext.ExceptionMapper; -import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException; +import org.apache.nifi.cluster.exception.NoClusterCoordinatorException; import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException; import org.apache.nifi.util.StringUtils; import org.slf4j.Logger;