diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java index d336558c8a..5a74301669 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java @@ -18,12 +18,14 @@ package org.apache.nifi.cluster.coordination.node; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -32,6 +34,10 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.commons.collections4.queue.CircularFifoQueue; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryForever; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.http.HttpResponseMerger; import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger; @@ -55,11 +61,14 @@ import org.apache.nifi.cluster.protocol.message.DisconnectMessage; import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType; +import org.apache.nifi.controller.cluster.ZooKeeperClientConfig; import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; import org.apache.nifi.events.EventReporter; import org.apache.nifi.reporting.Severity; import org.apache.nifi.services.FlowService; import org.apache.nifi.web.revision.RevisionManager; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,19 +85,37 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl private final EventReporter eventReporter; private final ClusterNodeFirewall firewall; private final RevisionManager revisionManager; + + // Curator used to determine which node is coordinator + private final CuratorFramework curatorClient; + private final String nodesPathPrefix; + private final String coordinatorPath; + private volatile FlowService flowService; private volatile boolean connected; + private volatile String coordinatorAddress; private final ConcurrentMap nodeStatuses = new ConcurrentHashMap<>(); private final ConcurrentMap> nodeEvents = new ConcurrentHashMap<>(); - public NodeClusterCoordinator(final ClusterCoordinationProtocolSenderListener senderListener, - final EventReporter eventReporter, final ClusterNodeFirewall firewall, final RevisionManager revisionManager) { + public NodeClusterCoordinator(final ClusterCoordinationProtocolSenderListener senderListener, final EventReporter eventReporter, + final ClusterNodeFirewall firewall, final RevisionManager revisionManager, final Properties nifiProperties) { this.senderListener = senderListener; this.flowService = null; this.eventReporter = eventReporter; this.firewall = firewall; this.revisionManager = revisionManager; + + final RetryPolicy retryPolicy = new RetryForever(5000); + final ZooKeeperClientConfig zkConfig = ZooKeeperClientConfig.createConfig(nifiProperties); + + curatorClient = CuratorFrameworkFactory.newClient(zkConfig.getConnectString(), + zkConfig.getSessionTimeoutMillis(), zkConfig.getConnectionTimeoutMillis(), retryPolicy); + + curatorClient.start(); + nodesPathPrefix = zkConfig.resolvePath("cluster/nodes"); + coordinatorPath = nodesPathPrefix + "/coordinator"; + senderListener.addHandler(this); } @@ -101,6 +128,8 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl final NodeConnectionStatus shutdownStatus = new NodeConnectionStatus(getLocalNodeIdentifier(), DisconnectionCode.NODE_SHUTDOWN); updateNodeStatus(shutdownStatus); logger.info("Successfully notified other nodes that I am shutting down"); + + curatorClient.close(); } @Override @@ -128,9 +157,33 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl return localNodeId; } + private String getElectedActiveCoordinatorAddress() throws IOException { + final String curAddress = coordinatorAddress; + if (curAddress != null) { + return curAddress; + } + + try { + // Get coordinator address and add watcher to change who we are heartbeating to if the value changes. + final byte[] coordinatorAddressBytes = curatorClient.getData().usingWatcher(new Watcher() { + @Override + public void process(final WatchedEvent event) { + coordinatorAddress = null; + } + }).forPath(coordinatorPath); + final String address = coordinatorAddress = new String(coordinatorAddressBytes, StandardCharsets.UTF_8); + + logger.info("Determined that Cluster Coordinator is located at {}; will use this address for sending heartbeat messages", address); + return address; + } catch (Exception e) { + throw new IOException("Unable to determine Cluster Coordinator from ZooKeeper", e); + } + } + @Override public void resetNodeStatuses(final Map statusMap) { logger.info("Resetting cluster node statuses from {} to {}", nodeStatuses, statusMap); + coordinatorAddress = null; // For each proposed replacement, update the nodeStatuses map if and only if the replacement // has a larger update id than the current value. @@ -455,13 +508,41 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl @Override public NodeIdentifier getElectedActiveCoordinatorNode() { + final String electedNodeAddress; + try { + electedNodeAddress = getElectedActiveCoordinatorAddress(); + } catch (final IOException ioe) { + logger.warn("Failed to determine which node is elected active Cluster Coordinator. There may be no coordinator currently:", ioe); + return null; + } + + final int colonLoc = electedNodeAddress.indexOf(':'); + if (colonLoc < 1) { + logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but this is not a valid address", electedNodeAddress); + return null; + } + + final String electedNodeHostname = electedNodeAddress.substring(0, colonLoc); + final String portString = electedNodeAddress.substring(colonLoc + 1); + final int electedNodePort; + try { + electedNodePort = Integer.parseInt(portString); + } catch (final NumberFormatException nfe) { + logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but this is not a valid address", electedNodeAddress); + return null; + } + final Set connectedNodeIds = getNodeIdentifiers(NodeConnectionState.CONNECTED); - return connectedNodeIds.stream() - .map(nodeId -> getConnectionStatus(nodeId)) - .filter(status -> status.getRoles().contains(ClusterRoles.CLUSTER_COORDINATOR)) + final NodeIdentifier electedNodeId = connectedNodeIds.stream() + .filter(nodeId -> nodeId.getSocketAddress().equals(electedNodeHostname) && nodeId.getSocketPort() == electedNodePort) .findFirst() - .map(status -> status.getNodeIdentifier()) .orElse(null); + + if (electedNodeId == null) { + logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but there is no node with this address", electedNodeAddress); + } + + return electedNodeId; } @Override @@ -858,6 +939,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl @Override public void setConnected(final boolean connected) { this.connected = connected; + this.coordinatorAddress = null; // if connection state changed, we are not sure about the coordinator. Check for address again. } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java index 6229ba195a..b414e0d58a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java @@ -43,7 +43,7 @@ public class NodeClusterCoordinatorFactoryBean implements FactoryBean nodeStatusChangeMessages; + private Properties createProperties() { + final Properties props = new Properties(); + props.put("nifi.zookeeper.connect.string", "localhost:2181"); + return props; + } + @Before @SuppressWarnings("unchecked") public void setup() throws IOException { @@ -77,7 +84,7 @@ public class TestNodeClusterCoordinator { final RevisionManager revisionManager = Mockito.mock(RevisionManager.class); Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList()); - coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager); + coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties()); final FlowService flowService = Mockito.mock(FlowService.class); final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], new byte[50], new byte[50]); @@ -127,7 +134,7 @@ public class TestNodeClusterCoordinator { final RevisionManager revisionManager = Mockito.mock(RevisionManager.class); Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList()); - final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager); + final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties()); final NodeIdentifier requestedNodeId = createNodeId(6); final ConnectionRequest request = new ConnectionRequest(requestedNodeId); @@ -161,7 +168,7 @@ public class TestNodeClusterCoordinator { final RevisionManager revisionManager = Mockito.mock(RevisionManager.class); Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList()); - final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager); + final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties()); final FlowService flowService = Mockito.mock(FlowService.class); final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], new byte[50], new byte[50]); @@ -225,7 +232,7 @@ public class TestNodeClusterCoordinator { final EventReporter eventReporter = Mockito.mock(EventReporter.class); final RevisionManager revisionManager = Mockito.mock(RevisionManager.class); Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList()); - final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager); + final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties()); final FlowService flowService = Mockito.mock(FlowService.class); final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], new byte[50], new byte[50]);