NIFI-2360: Leave ZooKeeper running when a node is disconnected. Do not allow the last node in the cluster to be disconnected. Change ClusterProtocoLHeartbeater to use RetryNTime retry strategy instead of RetryForever because web requests could block on this

This closes 

Signed-off-by: jpercivall <joepercivall@yahoo.com>
This commit is contained in:
Mark Payne 2016-07-22 09:21:12 -04:00 committed by jpercivall
parent 393a3925dd
commit 6932a53ec9
6 changed files with 76 additions and 95 deletions
nifi-nar-bundles/nifi-framework-bundle/nifi-framework
nifi-framework-cluster/src
main/java/org/apache/nifi/cluster
test/java/org/apache/nifi/cluster/coordination/node
nifi-framework-core/src/main/java/org/apache/nifi/controller
pom.xml

View File

@ -37,7 +37,7 @@ import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.curator.RetryPolicy; import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryForever; import org.apache.curator.retry.RetryNTimes;
import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.http.HttpResponseMerger; import org.apache.nifi.cluster.coordination.http.HttpResponseMerger;
import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger; import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger;
@ -46,6 +46,7 @@ import org.apache.nifi.cluster.event.Event;
import org.apache.nifi.cluster.event.NodeEvent; import org.apache.nifi.cluster.event.NodeEvent;
import org.apache.nifi.cluster.firewall.ClusterNodeFirewall; import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
import org.apache.nifi.cluster.protocol.ComponentRevision; import org.apache.nifi.cluster.protocol.ComponentRevision;
import org.apache.nifi.cluster.protocol.ConnectionRequest; import org.apache.nifi.cluster.protocol.ConnectionRequest;
import org.apache.nifi.cluster.protocol.ConnectionResponse; import org.apache.nifi.cluster.protocol.ConnectionResponse;
@ -61,8 +62,8 @@ import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage; import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType; import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
import org.apache.nifi.events.EventReporter; import org.apache.nifi.events.EventReporter;
import org.apache.nifi.reporting.Severity; import org.apache.nifi.reporting.Severity;
import org.apache.nifi.services.FlowService; import org.apache.nifi.services.FlowService;
@ -106,7 +107,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
this.firewall = firewall; this.firewall = firewall;
this.revisionManager = revisionManager; this.revisionManager = revisionManager;
final RetryPolicy retryPolicy = new RetryForever(5000); final RetryPolicy retryPolicy = new RetryNTimes(10, 500);
final ZooKeeperClientConfig zkConfig = ZooKeeperClientConfig.createConfig(nifiProperties); final ZooKeeperClientConfig zkConfig = ZooKeeperClientConfig.createConfig(nifiProperties);
curatorClient = CuratorFrameworkFactory.newClient(zkConfig.getConnectString(), curatorClient = CuratorFrameworkFactory.newClient(zkConfig.getConnectString(),
@ -278,6 +279,11 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
@Override @Override
public void requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) { public void requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) {
final int numConnected = getNodeIdentifiers(NodeConnectionState.CONNECTED).size();
if (numConnected == 1) {
throw new IllegalNodeDisconnectionException("Cannot disconnect node " + nodeId + " because it is the only node currently connected");
}
logger.info("Requesting that {} disconnect due to {}", nodeId, explanation == null ? disconnectionCode : explanation); logger.info("Requesting that {} disconnect due to {}", nodeId, explanation == null ? disconnectionCode : explanation);
updateNodeStatus(new NodeConnectionStatus(nodeId, disconnectionCode, explanation)); updateNodeStatus(new NodeConnectionStatus(nodeId, disconnectionCode, explanation));
@ -548,15 +554,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
@Override @Override
public boolean isActiveClusterCoordinator() { public boolean isActiveClusterCoordinator() {
final NodeIdentifier self = getLocalNodeIdentifier(); final NodeIdentifier self = getLocalNodeIdentifier();
if (self == null) { return self != null && self.equals(getElectedActiveCoordinatorNode());
return false;
}
final NodeConnectionStatus selfStatus = getConnectionStatus(self);
if (selfStatus == null) {
return false;
}
return selfStatus.getRoles().contains(ClusterRoles.CLUSTER_COORDINATOR);
} }
@Override @Override

View File

@ -17,8 +17,7 @@
package org.apache.nifi.cluster.manager.exception; package org.apache.nifi.cluster.manager.exception;
/** /**
* Represents the exceptional case when a disconnection request is issued to a node that cannot be disconnected (e.g., last node in cluster, node is primary node). * Represents the exceptional case when a disconnection request is issued to a node that cannot be disconnected (e.g., last node in cluster).
*
*/ */
public class IllegalNodeDisconnectionException extends IllegalClusterStateException { public class IllegalNodeDisconnectionException extends IllegalClusterStateException {

View File

@ -35,6 +35,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
import org.apache.nifi.cluster.protocol.ConnectionRequest; import org.apache.nifi.cluster.protocol.ConnectionRequest;
import org.apache.nifi.cluster.protocol.ConnectionResponse; import org.apache.nifi.cluster.protocol.ConnectionResponse;
import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.cluster.protocol.NodeIdentifier;
@ -48,6 +49,7 @@ import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
import org.apache.nifi.events.EventReporter; import org.apache.nifi.events.EventReporter;
import org.apache.nifi.services.FlowService; import org.apache.nifi.services.FlowService;
import org.apache.nifi.web.revision.RevisionManager; import org.apache.nifi.web.revision.RevisionManager;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -336,8 +338,9 @@ public class TestNodeClusterCoordinator {
@Test(timeout = 5000) @Test(timeout = 5000)
public void testRequestNodeDisconnect() throws InterruptedException { public void testRequestNodeDisconnect() throws InterruptedException {
// Add a connected node // Add a connected node
final NodeIdentifier nodeId = createNodeId(1); final NodeIdentifier nodeId1 = createNodeId(1);
coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED, Collections.emptySet())); coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, NodeConnectionState.CONNECTED, Collections.emptySet()));
coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.CONNECTED, Collections.emptySet()));
// wait for the status change message and clear it // wait for the status change message and clear it
while (nodeStatusChangeMessages.isEmpty()) { while (nodeStatusChangeMessages.isEmpty()) {
@ -345,18 +348,39 @@ public class TestNodeClusterCoordinator {
} }
nodeStatusChangeMessages.clear(); nodeStatusChangeMessages.clear();
coordinator.requestNodeDisconnect(nodeId, DisconnectionCode.USER_DISCONNECTED, "Unit Test"); coordinator.requestNodeDisconnect(nodeId1, DisconnectionCode.USER_DISCONNECTED, "Unit Test");
assertEquals(NodeConnectionState.DISCONNECTED, coordinator.getConnectionStatus(nodeId).getState()); assertEquals(NodeConnectionState.DISCONNECTED, coordinator.getConnectionStatus(nodeId1).getState());
while (nodeStatusChangeMessages.isEmpty()) { while (nodeStatusChangeMessages.isEmpty()) {
Thread.sleep(10L); Thread.sleep(10L);
} }
final NodeStatusChangeMessage msg = nodeStatusChangeMessages.get(0); final NodeStatusChangeMessage msg = nodeStatusChangeMessages.get(0);
assertEquals(nodeId, msg.getNodeId()); assertEquals(nodeId1, msg.getNodeId());
assertEquals(NodeConnectionState.DISCONNECTED, msg.getNodeConnectionStatus().getState()); assertEquals(NodeConnectionState.DISCONNECTED, msg.getNodeConnectionStatus().getState());
} }
@Test(timeout = 5000)
public void testCannotDisconnectLastNode() throws InterruptedException {
// Add a connected node
final NodeIdentifier nodeId1 = createNodeId(1);
coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, NodeConnectionState.CONNECTED, Collections.emptySet()));
// wait for the status change message and clear it
while (nodeStatusChangeMessages.isEmpty()) {
Thread.sleep(10L);
}
nodeStatusChangeMessages.clear();
try {
coordinator.requestNodeDisconnect(nodeId1, DisconnectionCode.USER_DISCONNECTED, "Unit Test");
Assert.fail("Expected an IllegalNodeDisconnectionException when trying to disconnect last node but it wasn't thrown");
} catch (final IllegalNodeDisconnectionException inde) {
// expected
}
}
@Test(timeout = 5000) @Test(timeout = 5000)
public void testUpdateNodeStatusOutOfOrder() throws InterruptedException { public void testUpdateNodeStatusOutOfOrder() throws InterruptedException {
// Add a connected node // Add a connected node

View File

@ -18,6 +18,38 @@ package org.apache.nifi.controller;
import com.sun.jersey.api.client.ClientHandlerException; import com.sun.jersey.api.client.ClientHandlerException;
import org.apache.commons.collections4.Predicate; import org.apache.commons.collections4.Predicate;
import static java.util.Objects.requireNonNull;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.action.Action; import org.apache.nifi.action.Action;
import org.apache.nifi.admin.service.AuditService; import org.apache.nifi.admin.service.AuditService;
@ -206,37 +238,6 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static java.util.Objects.requireNonNull;
public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, QueueProvider, Authorizable, ProvenanceAuthorizableFactory, NodeTypeProvider { public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, QueueProvider, Authorizable, ProvenanceAuthorizableFactory, NodeTypeProvider {
@ -3333,51 +3334,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
leaderElectionManager.start(); leaderElectionManager.start();
stateManagerProvider.enableClusterProvider(); stateManagerProvider.enableClusterProvider();
// Start ZooKeeper State Server if necessary
if (zooKeeperStateServer != null) {
processScheduler.submitFrameworkTask(new Runnable() {
@Override
public void run() {
try {
zooKeeperStateServer.start();
} catch (final Exception e) {
LOG.error("NiFi was connected to the cluster but failed to start embedded ZooKeeper Server", e);
final Bulletin bulletin = BulletinFactory.createBulletin("Embedded ZooKeeper Server", Severity.ERROR.name(),
"Unable to started embedded ZooKeeper Server. See logs for more details. Will continue trying to start embedded server.");
getBulletinRepository().addBulletin(bulletin);
// We failed to start the server. Wait a bit and try again.
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
} catch (final InterruptedException ie) {
// If we are interrupted, stop trying.
Thread.currentThread().interrupt();
return;
}
processScheduler.submitFrameworkTask(this);
}
}
});
// Give the server just a bit to start up, so that we don't get connection
// failures on startup if we are using the embedded ZooKeeper server. We need to launch
// the ZooKeeper Server in the background because ZooKeeper blocks indefinitely when we start
// the server. Unfortunately, we have no way to know when it's up & ready. So we wait 1 second.
// We could still get connection failures if we are on a slow machine but this at least makes it far
// less likely. If we do get connection failures, we will still reconnect, but we will get bulletins
// showing failures. This 1-second sleep is an attempt to at least make that occurrence rare.
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1L));
}
heartbeat(); heartbeat();
} else { } else {
leaderElectionManager.unregister(ClusterRoles.PRIMARY_NODE); leaderElectionManager.unregister(ClusterRoles.PRIMARY_NODE);
leaderElectionManager.unregister(ClusterRoles.CLUSTER_COORDINATOR); leaderElectionManager.unregister(ClusterRoles.CLUSTER_COORDINATOR);
if (zooKeeperStateServer != null) {
zooKeeperStateServer.shutdown();
}
stateManagerProvider.disableClusterProvider(); stateManagerProvider.disableClusterProvider();
setPrimary(false); setPrimary(false);

View File

@ -24,7 +24,7 @@ import java.util.Properties;
import org.apache.curator.RetryPolicy; import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryForever; import org.apache.curator.retry.RetryNTimes;
import org.apache.nifi.cluster.protocol.NodeProtocolSender; import org.apache.nifi.cluster.protocol.NodeProtocolSender;
import org.apache.nifi.cluster.protocol.ProtocolException; import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
@ -52,7 +52,7 @@ public class ClusterProtocolHeartbeater implements Heartbeater {
public ClusterProtocolHeartbeater(final NodeProtocolSender protocolSender, final Properties properties) { public ClusterProtocolHeartbeater(final NodeProtocolSender protocolSender, final Properties properties) {
this.protocolSender = protocolSender; this.protocolSender = protocolSender;
final RetryPolicy retryPolicy = new RetryForever(5000); final RetryPolicy retryPolicy = new RetryNTimes(10, 500);
final ZooKeeperClientConfig zkConfig = ZooKeeperClientConfig.createConfig(properties); final ZooKeeperClientConfig zkConfig = ZooKeeperClientConfig.createConfig(properties);
curatorClient = CuratorFrameworkFactory.newClient(zkConfig.getConnectString(), curatorClient = CuratorFrameworkFactory.newClient(zkConfig.getConnectString(),

View File

@ -772,12 +772,12 @@ language governing permissions and limitations under the License. -->
<dependency> <dependency>
<groupId>org.apache.curator</groupId> <groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId> <artifactId>curator-framework</artifactId>
<version>2.10.0</version> <version>2.11.0</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.curator</groupId> <groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId> <artifactId>curator-recipes</artifactId>
<version>2.10.0</version> <version>2.11.0</version>
</dependency> </dependency>