diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index 04f8dfaf3e..3f8fa76bf5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -37,7 +37,7 @@ import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryForever;
+import org.apache.curator.retry.RetryNTimes;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.http.HttpResponseMerger;
import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger;
@@ -46,6 +46,7 @@ import org.apache.nifi.cluster.event.Event;
import org.apache.nifi.cluster.event.NodeEvent;
import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
import org.apache.nifi.cluster.protocol.ComponentRevision;
import org.apache.nifi.cluster.protocol.ConnectionRequest;
import org.apache.nifi.cluster.protocol.ConnectionResponse;
@@ -61,8 +62,8 @@ import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
-import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
+import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.services.FlowService;
@@ -106,7 +107,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
this.firewall = firewall;
this.revisionManager = revisionManager;
- final RetryPolicy retryPolicy = new RetryForever(5000);
+ final RetryPolicy retryPolicy = new RetryNTimes(10, 500);
final ZooKeeperClientConfig zkConfig = ZooKeeperClientConfig.createConfig(nifiProperties);
curatorClient = CuratorFrameworkFactory.newClient(zkConfig.getConnectString(),
@@ -278,6 +279,11 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
@Override
public void requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) {
+ final int numConnected = getNodeIdentifiers(NodeConnectionState.CONNECTED).size();
+ if (numConnected == 1) {
+ throw new IllegalNodeDisconnectionException("Cannot disconnect node " + nodeId + " because it is the only node currently connected");
+ }
+
logger.info("Requesting that {} disconnect due to {}", nodeId, explanation == null ? disconnectionCode : explanation);
updateNodeStatus(new NodeConnectionStatus(nodeId, disconnectionCode, explanation));
@@ -548,15 +554,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
@Override
public boolean isActiveClusterCoordinator() {
final NodeIdentifier self = getLocalNodeIdentifier();
- if (self == null) {
- return false;
- }
-
- final NodeConnectionStatus selfStatus = getConnectionStatus(self);
- if (selfStatus == null) {
- return false;
- }
- return selfStatus.getRoles().contains(ClusterRoles.CLUSTER_COORDINATOR);
+ return self != null && self.equals(getElectedActiveCoordinatorNode());
}
@Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeDisconnectionException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeDisconnectionException.java
index 1199ef7a9b..e19a338fbe 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeDisconnectionException.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeDisconnectionException.java
@@ -17,8 +17,7 @@
package org.apache.nifi.cluster.manager.exception;
/**
- * Represents the exceptional case when a disconnection request is issued to a node that cannot be disconnected (e.g., last node in cluster, node is primary node).
- *
+ * Represents the exceptional case when a disconnection request is issued to a node that cannot be disconnected (e.g., last node in cluster).
*/
public class IllegalNodeDisconnectionException extends IllegalClusterStateException {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
index 06f4252e17..25c55a06ee 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
@@ -35,6 +35,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
import org.apache.nifi.cluster.protocol.ConnectionRequest;
import org.apache.nifi.cluster.protocol.ConnectionResponse;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
@@ -48,6 +49,7 @@ import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.services.FlowService;
import org.apache.nifi.web.revision.RevisionManager;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@@ -336,8 +338,9 @@ public class TestNodeClusterCoordinator {
@Test(timeout = 5000)
public void testRequestNodeDisconnect() throws InterruptedException {
// Add a connected node
- final NodeIdentifier nodeId = createNodeId(1);
- coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED, Collections.emptySet()));
+ final NodeIdentifier nodeId1 = createNodeId(1);
+ coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, NodeConnectionState.CONNECTED, Collections.emptySet()));
+ coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.CONNECTED, Collections.emptySet()));
// wait for the status change message and clear it
while (nodeStatusChangeMessages.isEmpty()) {
@@ -345,18 +348,39 @@ public class TestNodeClusterCoordinator {
}
nodeStatusChangeMessages.clear();
- coordinator.requestNodeDisconnect(nodeId, DisconnectionCode.USER_DISCONNECTED, "Unit Test");
- assertEquals(NodeConnectionState.DISCONNECTED, coordinator.getConnectionStatus(nodeId).getState());
+ coordinator.requestNodeDisconnect(nodeId1, DisconnectionCode.USER_DISCONNECTED, "Unit Test");
+ assertEquals(NodeConnectionState.DISCONNECTED, coordinator.getConnectionStatus(nodeId1).getState());
while (nodeStatusChangeMessages.isEmpty()) {
Thread.sleep(10L);
}
final NodeStatusChangeMessage msg = nodeStatusChangeMessages.get(0);
- assertEquals(nodeId, msg.getNodeId());
+ assertEquals(nodeId1, msg.getNodeId());
assertEquals(NodeConnectionState.DISCONNECTED, msg.getNodeConnectionStatus().getState());
}
+ @Test(timeout = 5000)
+ public void testCannotDisconnectLastNode() throws InterruptedException {
+ // Add a connected node
+ final NodeIdentifier nodeId1 = createNodeId(1);
+ coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, NodeConnectionState.CONNECTED, Collections.emptySet()));
+
+ // wait for the status change message and clear it
+ while (nodeStatusChangeMessages.isEmpty()) {
+ Thread.sleep(10L);
+ }
+ nodeStatusChangeMessages.clear();
+
+ try {
+ coordinator.requestNodeDisconnect(nodeId1, DisconnectionCode.USER_DISCONNECTED, "Unit Test");
+ Assert.fail("Expected an IllegalNodeDisconnectionException when trying to disconnect last node but it wasn't thrown");
+ } catch (final IllegalNodeDisconnectionException inde) {
+ // expected
+ }
+ }
+
+
@Test(timeout = 5000)
public void testUpdateNodeStatusOutOfOrder() throws InterruptedException {
// Add a connected node
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index abe5e27883..d6e9308040 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -18,6 +18,38 @@ package org.apache.nifi.controller;
import com.sun.jersey.api.client.ClientHandlerException;
import org.apache.commons.collections4.Predicate;
+import static java.util.Objects.requireNonNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.net.ssl.SSLContext;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.action.Action;
import org.apache.nifi.admin.service.AuditService;
@@ -206,37 +238,6 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.net.ssl.SSLContext;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.LockSupport;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static java.util.Objects.requireNonNull;
public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, QueueProvider, Authorizable, ProvenanceAuthorizableFactory, NodeTypeProvider {
@@ -3333,51 +3334,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
leaderElectionManager.start();
stateManagerProvider.enableClusterProvider();
- // Start ZooKeeper State Server if necessary
- if (zooKeeperStateServer != null) {
- processScheduler.submitFrameworkTask(new Runnable() {
- @Override
- public void run() {
- try {
- zooKeeperStateServer.start();
- } catch (final Exception e) {
- LOG.error("NiFi was connected to the cluster but failed to start embedded ZooKeeper Server", e);
- final Bulletin bulletin = BulletinFactory.createBulletin("Embedded ZooKeeper Server", Severity.ERROR.name(),
- "Unable to started embedded ZooKeeper Server. See logs for more details. Will continue trying to start embedded server.");
- getBulletinRepository().addBulletin(bulletin);
-
- // We failed to start the server. Wait a bit and try again.
- try {
- Thread.sleep(TimeUnit.SECONDS.toMillis(5));
- } catch (final InterruptedException ie) {
- // If we are interrupted, stop trying.
- Thread.currentThread().interrupt();
- return;
- }
-
- processScheduler.submitFrameworkTask(this);
- }
- }
- });
-
- // Give the server just a bit to start up, so that we don't get connection
- // failures on startup if we are using the embedded ZooKeeper server. We need to launch
- // the ZooKeeper Server in the background because ZooKeeper blocks indefinitely when we start
- // the server. Unfortunately, we have no way to know when it's up & ready. So we wait 1 second.
- // We could still get connection failures if we are on a slow machine but this at least makes it far
- // less likely. If we do get connection failures, we will still reconnect, but we will get bulletins
- // showing failures. This 1-second sleep is an attempt to at least make that occurrence rare.
- LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1L));
- }
-
heartbeat();
} else {
leaderElectionManager.unregister(ClusterRoles.PRIMARY_NODE);
leaderElectionManager.unregister(ClusterRoles.CLUSTER_COORDINATOR);
-
- if (zooKeeperStateServer != null) {
- zooKeeperStateServer.shutdown();
- }
stateManagerProvider.disableClusterProvider();
setPrimary(false);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java
index 0ce9a1361f..02403184ec 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java
@@ -24,7 +24,7 @@ import java.util.Properties;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryForever;
+import org.apache.curator.retry.RetryNTimes;
import org.apache.nifi.cluster.protocol.NodeProtocolSender;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
@@ -52,7 +52,7 @@ public class ClusterProtocolHeartbeater implements Heartbeater {
public ClusterProtocolHeartbeater(final NodeProtocolSender protocolSender, final Properties properties) {
this.protocolSender = protocolSender;
- final RetryPolicy retryPolicy = new RetryForever(5000);
+ final RetryPolicy retryPolicy = new RetryNTimes(10, 500);
final ZooKeeperClientConfig zkConfig = ZooKeeperClientConfig.createConfig(properties);
curatorClient = CuratorFrameworkFactory.newClient(zkConfig.getConnectString(),
diff --git a/pom.xml b/pom.xml
index b852c8f379..24ef0a4fd0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -772,12 +772,12 @@ language governing permissions and limitations under the License. -->
org.apache.curator
curator-framework
- 2.10.0
+ 2.11.0
org.apache.curator
curator-recipes
- 2.10.0
+ 2.11.0