NIFI-2289: Directly ask ZooKeeper which node is cluster coordinator and add watches on the ZNode rather than relying on Node Status Updates over the cluster protocol because cluster protocol may get the events out-of-order

This closes #665.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Mark Payne 2016-07-16 21:19:44 -04:00 committed by Bryan Bende
parent 01cae23745
commit 5c8636edf4
3 changed files with 100 additions and 11 deletions

View File

@ -18,12 +18,14 @@
package org.apache.nifi.cluster.coordination.node;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@ -32,6 +34,10 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryForever;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.http.HttpResponseMerger;
import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger;
@ -55,11 +61,14 @@ import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.services.FlowService;
import org.apache.nifi.web.revision.RevisionManager;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -76,19 +85,37 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
private final EventReporter eventReporter;
private final ClusterNodeFirewall firewall;
private final RevisionManager revisionManager;
// Curator used to determine which node is coordinator
private final CuratorFramework curatorClient;
private final String nodesPathPrefix;
private final String coordinatorPath;
private volatile FlowService flowService;
private volatile boolean connected;
private volatile String coordinatorAddress;
private final ConcurrentMap<NodeIdentifier, NodeConnectionStatus> nodeStatuses = new ConcurrentHashMap<>();
private final ConcurrentMap<NodeIdentifier, CircularFifoQueue<NodeEvent>> nodeEvents = new ConcurrentHashMap<>();
public NodeClusterCoordinator(final ClusterCoordinationProtocolSenderListener senderListener,
final EventReporter eventReporter, final ClusterNodeFirewall firewall, final RevisionManager revisionManager) {
public NodeClusterCoordinator(final ClusterCoordinationProtocolSenderListener senderListener, final EventReporter eventReporter,
final ClusterNodeFirewall firewall, final RevisionManager revisionManager, final Properties nifiProperties) {
this.senderListener = senderListener;
this.flowService = null;
this.eventReporter = eventReporter;
this.firewall = firewall;
this.revisionManager = revisionManager;
final RetryPolicy retryPolicy = new RetryForever(5000);
final ZooKeeperClientConfig zkConfig = ZooKeeperClientConfig.createConfig(nifiProperties);
curatorClient = CuratorFrameworkFactory.newClient(zkConfig.getConnectString(),
zkConfig.getSessionTimeoutMillis(), zkConfig.getConnectionTimeoutMillis(), retryPolicy);
curatorClient.start();
nodesPathPrefix = zkConfig.resolvePath("cluster/nodes");
coordinatorPath = nodesPathPrefix + "/coordinator";
senderListener.addHandler(this);
}
@ -101,6 +128,8 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
final NodeConnectionStatus shutdownStatus = new NodeConnectionStatus(getLocalNodeIdentifier(), DisconnectionCode.NODE_SHUTDOWN);
updateNodeStatus(shutdownStatus);
logger.info("Successfully notified other nodes that I am shutting down");
curatorClient.close();
}
@Override
@ -128,9 +157,33 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
return localNodeId;
}
private String getElectedActiveCoordinatorAddress() throws IOException {
final String curAddress = coordinatorAddress;
if (curAddress != null) {
return curAddress;
}
try {
// Get coordinator address and add watcher to change who we are heartbeating to if the value changes.
final byte[] coordinatorAddressBytes = curatorClient.getData().usingWatcher(new Watcher() {
@Override
public void process(final WatchedEvent event) {
coordinatorAddress = null;
}
}).forPath(coordinatorPath);
final String address = coordinatorAddress = new String(coordinatorAddressBytes, StandardCharsets.UTF_8);
logger.info("Determined that Cluster Coordinator is located at {}; will use this address for sending heartbeat messages", address);
return address;
} catch (Exception e) {
throw new IOException("Unable to determine Cluster Coordinator from ZooKeeper", e);
}
}
@Override
public void resetNodeStatuses(final Map<NodeIdentifier, NodeConnectionStatus> statusMap) {
logger.info("Resetting cluster node statuses from {} to {}", nodeStatuses, statusMap);
coordinatorAddress = null;
// For each proposed replacement, update the nodeStatuses map if and only if the replacement
// has a larger update id than the current value.
@ -455,13 +508,41 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
@Override
public NodeIdentifier getElectedActiveCoordinatorNode() {
final String electedNodeAddress;
try {
electedNodeAddress = getElectedActiveCoordinatorAddress();
} catch (final IOException ioe) {
logger.warn("Failed to determine which node is elected active Cluster Coordinator. There may be no coordinator currently:", ioe);
return null;
}
final int colonLoc = electedNodeAddress.indexOf(':');
if (colonLoc < 1) {
logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but this is not a valid address", electedNodeAddress);
return null;
}
final String electedNodeHostname = electedNodeAddress.substring(0, colonLoc);
final String portString = electedNodeAddress.substring(colonLoc + 1);
final int electedNodePort;
try {
electedNodePort = Integer.parseInt(portString);
} catch (final NumberFormatException nfe) {
logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but this is not a valid address", electedNodeAddress);
return null;
}
final Set<NodeIdentifier> connectedNodeIds = getNodeIdentifiers(NodeConnectionState.CONNECTED);
return connectedNodeIds.stream()
.map(nodeId -> getConnectionStatus(nodeId))
.filter(status -> status.getRoles().contains(ClusterRoles.CLUSTER_COORDINATOR))
final NodeIdentifier electedNodeId = connectedNodeIds.stream()
.filter(nodeId -> nodeId.getSocketAddress().equals(electedNodeHostname) && nodeId.getSocketPort() == electedNodePort)
.findFirst()
.map(status -> status.getNodeIdentifier())
.orElse(null);
if (electedNodeId == null) {
logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but there is no node with this address", electedNodeAddress);
}
return electedNodeId;
}
@Override
@ -858,6 +939,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
@Override
public void setConnected(final boolean connected) {
this.connected = connected;
this.coordinatorAddress = null; // if connection state changed, we are not sure about the coordinator. Check for address again.
}
@Override

View File

@ -43,7 +43,7 @@ public class NodeClusterCoordinatorFactoryBean implements FactoryBean<NodeCluste
final ClusterNodeFirewall clusterFirewall = applicationContext.getBean("clusterFirewall", ClusterNodeFirewall.class);
final RevisionManager revisionManager = applicationContext.getBean("revisionManager", RevisionManager.class);
nodeClusterCoordinator = new NodeClusterCoordinator(protocolSenderListener, eventReporter, clusterFirewall, revisionManager);
nodeClusterCoordinator = new NodeClusterCoordinator(protocolSenderListener, eventReporter, clusterFirewall, revisionManager, properties);
}
return nodeClusterCoordinator;

View File

@ -30,6 +30,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@ -58,6 +59,12 @@ public class TestNodeClusterCoordinator {
private ClusterCoordinationProtocolSenderListener senderListener;
private List<NodeStatusChangeMessage> nodeStatusChangeMessages;
private Properties createProperties() {
final Properties props = new Properties();
props.put("nifi.zookeeper.connect.string", "localhost:2181");
return props;
}
@Before
@SuppressWarnings("unchecked")
public void setup() throws IOException {
@ -77,7 +84,7 @@ public class TestNodeClusterCoordinator {
final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager);
coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties());
final FlowService flowService = Mockito.mock(FlowService.class);
final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], new byte[50], new byte[50]);
@ -127,7 +134,7 @@ public class TestNodeClusterCoordinator {
final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager);
final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties());
final NodeIdentifier requestedNodeId = createNodeId(6);
final ConnectionRequest request = new ConnectionRequest(requestedNodeId);
@ -161,7 +168,7 @@ public class TestNodeClusterCoordinator {
final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager);
final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties());
final FlowService flowService = Mockito.mock(FlowService.class);
final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], new byte[50], new byte[50]);
@ -225,7 +232,7 @@ public class TestNodeClusterCoordinator {
final EventReporter eventReporter = Mockito.mock(EventReporter.class);
final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager);
final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties());
final FlowService flowService = Mockito.mock(FlowService.class);
final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], new byte[50], new byte[50]);