From f0401e47747e8bfc0b96ff824f23f6f0f91017f1 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 28 Jul 2016 11:26:00 -0400 Subject: [PATCH] NIFI-2419: Ensure that if a node is disconnected that we unregister for 'cluster coordinator' and 'primary node' roles by updating FlowController to know that it is disconnected. Also removed dead code that was needed in the master-worker clustering paradigm but not for zero-master-clustering Signed-off-by: Yolanda M. Davis This closes #739 --- .../protocol/AbstractNodeProtocolSender.java | 20 +++-------- .../cluster/protocol/ConnectionResponse.java | 12 ------- .../message/ConnectionResponseMessage.java | 20 ----------- .../nifi/controller/FlowController.java | 36 ++----------------- .../nifi/controller/StandardFlowService.java | 6 ++-- 5 files changed, 8 insertions(+), 86 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java index f2c3548373..4131dc5f1e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java @@ -17,6 +17,10 @@ package org.apache.nifi.cluster.protocol; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; + import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage; import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage; import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; @@ -24,12 +28,6 @@ import org.apache.nifi.cluster.protocol.message.ProtocolMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType; import org.apache.nifi.io.socket.SocketConfiguration; import org.apache.nifi.io.socket.SocketUtils; -import org.apache.nifi.security.util.CertificateUtils; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.security.cert.CertificateException; public abstract class AbstractNodeProtocolSender implements NodeProtocolSender { private final SocketConfiguration socketConfiguration; @@ -46,8 +44,6 @@ public abstract class AbstractNodeProtocolSender implements NodeProtocolSender { try { socket = createSocket(); - String coordinatorDN = getCoordinatorDN(socket); - try { // marshal message to output stream final ProtocolMessageMarshaller marshaller = protocolContext.createMarshaller(); @@ -68,7 +64,6 @@ public abstract class AbstractNodeProtocolSender implements NodeProtocolSender { if (MessageType.CONNECTION_RESPONSE == response.getType()) { final ConnectionResponseMessage connectionResponse = (ConnectionResponseMessage) response; - connectionResponse.setCoordinatorDN(coordinatorDN); return connectionResponse; } else { throw new ProtocolException("Expected message type '" + MessageType.CONNECTION_RESPONSE + "' but found '" + response.getType() + "'"); @@ -93,13 +88,6 @@ public abstract class AbstractNodeProtocolSender implements NodeProtocolSender { sendProtocolMessage(msg, hostname, port); } - private String getCoordinatorDN(Socket socket) { - try { - return CertificateUtils.extractPeerDNFromSSLSocket(socket); - } catch (CertificateException e) { - throw new ProtocolException(e); - } - } private Socket createSocket() { InetSocketAddress socketAddress = null; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java index 2a9b87c445..4e572af7d5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java @@ -43,7 +43,6 @@ public class ConnectionResponse { private final List nodeStatuses; private final List componentRevisions; - private volatile String coordinatorDN; public ConnectionResponse(final NodeIdentifier nodeIdentifier, final DataFlow dataFlow, final String instanceId, final List nodeStatuses, final List componentRevisions) { @@ -121,9 +120,6 @@ public class ConnectionResponse { return instanceId; } - public void setCoordinatorDN(final String dn) { - this.coordinatorDN = dn; - } public List getNodeConnectionStatuses() { return nodeStatuses; @@ -132,12 +128,4 @@ public class ConnectionResponse { public List getComponentRevisions() { return componentRevisions; } - - /** - * @return the DN of the Coordinator, if it is available or null - * otherwise - */ - public String getCoordinatorDN() { - return coordinatorDN; - } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ConnectionResponseMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ConnectionResponseMessage.java index b49bef4771..815427c0ef 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ConnectionResponseMessage.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ConnectionResponseMessage.java @@ -24,7 +24,6 @@ import javax.xml.bind.annotation.XmlRootElement; public class ConnectionResponseMessage extends ProtocolMessage { private ConnectionResponse connectionResponse; - private String coordinatorDN; public ConnectionResponseMessage() { } @@ -35,25 +34,6 @@ public class ConnectionResponseMessage extends ProtocolMessage { public void setConnectionResponse(final ConnectionResponse connectionResponse) { this.connectionResponse = connectionResponse; - - if (coordinatorDN != null) { - this.connectionResponse.setCoordinatorDN(coordinatorDN); - } - } - - public void setCoordinatorDN(final String dn) { - if (connectionResponse != null) { - connectionResponse.setCoordinatorDN(dn); - } - this.coordinatorDN = dn; - } - - /** - * @return the DN of the Coordinator, if it is available or null - * otherwise - */ - public String getCoordinatorDN() { - return coordinatorDN; } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index b7b32ad76e..6b952f369e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -353,7 +353,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * true if controller is connected or trying to connect to the cluster */ private boolean clustered; - private String clusterManagerDN; // guarded by rwLock private NodeConnectionStatus connectionStatus; @@ -3307,32 +3306,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R return configuredForClustering; } - /** - * @return the DN of the Cluster Manager that we are currently connected to, - * if available. This will return null if the instance is not clustered or - * if the instance is clustered but the NCM's DN is not available - for - * instance, if cluster communications are not secure - */ - public String getClusterManagerDN() { - readLock.lock(); - try { - return clusterManagerDN; - } finally { - readLock.unlock(); - } - } - - /** - * Sets whether this instance is clustered. Clustered means that a node is - * either connected or trying to connect to the cluster. - * - * @param clustered true if clustered - * @param clusterInstanceId if clustered is true, indicates the InstanceID - * of the Cluster Manager - */ - public void setClustered(final boolean clustered, final String clusterInstanceId) { - setClustered(clustered, clusterInstanceId, null); - } private void registerForClusterCoordinator() { leaderElectionManager.register(ClusterRoles.CLUSTER_COORDINATOR, new LeaderElectionStateChangeListener() { @@ -3375,11 +3348,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * either connected or trying to connect to the cluster. * * @param clustered true if clustered - * @param clusterInstanceId if clustered is true, indicates the InstanceID - * of the Cluster Manager - * @param clusterManagerDn the DN of the NCM + * @param clusterInstanceId if clustered is true, indicates the InstanceID of the Cluster Manager */ - public void setClustered(final boolean clustered, final String clusterInstanceId, final String clusterManagerDn) { + public void setClustered(final boolean clustered, final String clusterInstanceId) { writeLock.lock(); try { // verify whether the this node's clustered status is changing @@ -3396,9 +3367,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // mark the new cluster status this.clustered = clustered; - if (clusterManagerDn != null) { - this.clusterManagerDN = clusterManagerDn; - } eventDrivenWorkerQueue.setClustered(clustered); if (clusterInstanceId != null) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java index 1b26e402af..b634e74298 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java @@ -107,7 +107,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler { private final FlowConfigurationDAO dao; private final int gracefulShutdownSeconds; private final boolean autoResumeState; - private final StringEncryptor encryptor; private final Authorizer authorizer; // Lock is used to protect the flow.xml file. @@ -175,7 +174,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler { final Authorizer authorizer) throws IOException { this.controller = controller; - this.encryptor = encryptor; flowXml = Paths.get(properties.getProperty(NiFiProperties.FLOW_CONFIGURATION_FILE)); gracefulShutdownSeconds = (int) FormatUtils.getTimeDuration(properties.getProperty(NiFiProperties.FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD), TimeUnit.SECONDS); @@ -520,6 +518,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { disconnectionCode = DisconnectionCode.STARTUP_FAILURE; } clusterCoordinator.disconnectionRequestedByNode(getNodeId(), disconnectionCode, ex.toString()); + controller.setClustered(false, null); } private FlowResponseMessage handleFlowRequest(final FlowRequestMessage request) throws ProtocolException { @@ -587,7 +586,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler { final ConnectionResponse connectionResponse = new ConnectionResponse(getNodeId(), request.getDataFlow(), request.getInstanceId(), request.getNodeConnectionStatuses(), request.getComponentRevisions()); - connectionResponse.setCoordinatorDN(request.getRequestorDN()); loadFromConnectionResponse(connectionResponse); clusterCoordinator.resetNodeStatuses(connectionResponse.getNodeConnectionStatuses().stream() @@ -849,7 +847,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { revisionManager.reset(response.getComponentRevisions().stream().map(rev -> rev.toRevision()).collect(Collectors.toList())); // mark the node as clustered - controller.setClustered(true, response.getInstanceId(), response.getCoordinatorDN()); + controller.setClustered(true, response.getInstanceId()); final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeId); final Set roles = status == null ? Collections.emptySet() : status.getRoles();