NIFI-2419: Ensure that if a node is disconnected that we unregister for 'cluster coordinator' and 'primary node' roles by updating FlowController to know that it is disconnected. Also removed dead code that was needed in the master-worker clustering paradigm but not for zero-master-clustering

Signed-off-by: Yolanda M. Davis <ymdavis@apache.org>

This closes #739
This commit is contained in:
Mark Payne 2016-07-28 11:26:00 -04:00 committed by Yolanda M. Davis
parent c26398eaba
commit f0401e4774
5 changed files with 8 additions and 86 deletions

View File

@ -17,6 +17,10 @@
package org.apache.nifi.cluster.protocol; package org.apache.nifi.cluster.protocol;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage; import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage; import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
@ -24,12 +28,6 @@ import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType; import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
import org.apache.nifi.io.socket.SocketConfiguration; import org.apache.nifi.io.socket.SocketConfiguration;
import org.apache.nifi.io.socket.SocketUtils; import org.apache.nifi.io.socket.SocketUtils;
import org.apache.nifi.security.util.CertificateUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.security.cert.CertificateException;
public abstract class AbstractNodeProtocolSender implements NodeProtocolSender { public abstract class AbstractNodeProtocolSender implements NodeProtocolSender {
private final SocketConfiguration socketConfiguration; private final SocketConfiguration socketConfiguration;
@ -46,8 +44,6 @@ public abstract class AbstractNodeProtocolSender implements NodeProtocolSender {
try { try {
socket = createSocket(); socket = createSocket();
String coordinatorDN = getCoordinatorDN(socket);
try { try {
// marshal message to output stream // marshal message to output stream
final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller(); final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
@ -68,7 +64,6 @@ public abstract class AbstractNodeProtocolSender implements NodeProtocolSender {
if (MessageType.CONNECTION_RESPONSE == response.getType()) { if (MessageType.CONNECTION_RESPONSE == response.getType()) {
final ConnectionResponseMessage connectionResponse = (ConnectionResponseMessage) response; final ConnectionResponseMessage connectionResponse = (ConnectionResponseMessage) response;
connectionResponse.setCoordinatorDN(coordinatorDN);
return connectionResponse; return connectionResponse;
} else { } else {
throw new ProtocolException("Expected message type '" + MessageType.CONNECTION_RESPONSE + "' but found '" + response.getType() + "'"); throw new ProtocolException("Expected message type '" + MessageType.CONNECTION_RESPONSE + "' but found '" + response.getType() + "'");
@ -93,13 +88,6 @@ public abstract class AbstractNodeProtocolSender implements NodeProtocolSender {
sendProtocolMessage(msg, hostname, port); sendProtocolMessage(msg, hostname, port);
} }
private String getCoordinatorDN(Socket socket) {
try {
return CertificateUtils.extractPeerDNFromSSLSocket(socket);
} catch (CertificateException e) {
throw new ProtocolException(e);
}
}
private Socket createSocket() { private Socket createSocket() {
InetSocketAddress socketAddress = null; InetSocketAddress socketAddress = null;

View File

@ -43,7 +43,6 @@ public class ConnectionResponse {
private final List<NodeConnectionStatus> nodeStatuses; private final List<NodeConnectionStatus> nodeStatuses;
private final List<ComponentRevision> componentRevisions; private final List<ComponentRevision> componentRevisions;
private volatile String coordinatorDN;
public ConnectionResponse(final NodeIdentifier nodeIdentifier, final DataFlow dataFlow, public ConnectionResponse(final NodeIdentifier nodeIdentifier, final DataFlow dataFlow,
final String instanceId, final List<NodeConnectionStatus> nodeStatuses, final List<ComponentRevision> componentRevisions) { final String instanceId, final List<NodeConnectionStatus> nodeStatuses, final List<ComponentRevision> componentRevisions) {
@ -121,9 +120,6 @@ public class ConnectionResponse {
return instanceId; return instanceId;
} }
public void setCoordinatorDN(final String dn) {
this.coordinatorDN = dn;
}
public List<NodeConnectionStatus> getNodeConnectionStatuses() { public List<NodeConnectionStatus> getNodeConnectionStatuses() {
return nodeStatuses; return nodeStatuses;
@ -132,12 +128,4 @@ public class ConnectionResponse {
public List<ComponentRevision> getComponentRevisions() { public List<ComponentRevision> getComponentRevisions() {
return componentRevisions; return componentRevisions;
} }
/**
* @return the DN of the Coordinator, if it is available or <code>null</code>
* otherwise
*/
public String getCoordinatorDN() {
return coordinatorDN;
}
} }

View File

@ -24,7 +24,6 @@ import javax.xml.bind.annotation.XmlRootElement;
public class ConnectionResponseMessage extends ProtocolMessage { public class ConnectionResponseMessage extends ProtocolMessage {
private ConnectionResponse connectionResponse; private ConnectionResponse connectionResponse;
private String coordinatorDN;
public ConnectionResponseMessage() { public ConnectionResponseMessage() {
} }
@ -35,25 +34,6 @@ public class ConnectionResponseMessage extends ProtocolMessage {
public void setConnectionResponse(final ConnectionResponse connectionResponse) { public void setConnectionResponse(final ConnectionResponse connectionResponse) {
this.connectionResponse = connectionResponse; this.connectionResponse = connectionResponse;
if (coordinatorDN != null) {
this.connectionResponse.setCoordinatorDN(coordinatorDN);
}
}
public void setCoordinatorDN(final String dn) {
if (connectionResponse != null) {
connectionResponse.setCoordinatorDN(dn);
}
this.coordinatorDN = dn;
}
/**
* @return the DN of the Coordinator, if it is available or <code>null</code>
* otherwise
*/
public String getCoordinatorDN() {
return coordinatorDN;
} }
@Override @Override

View File

@ -353,7 +353,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* true if controller is connected or trying to connect to the cluster * true if controller is connected or trying to connect to the cluster
*/ */
private boolean clustered; private boolean clustered;
private String clusterManagerDN;
// guarded by rwLock // guarded by rwLock
private NodeConnectionStatus connectionStatus; private NodeConnectionStatus connectionStatus;
@ -3307,32 +3306,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
return configuredForClustering; return configuredForClustering;
} }
/**
* @return the DN of the Cluster Manager that we are currently connected to,
* if available. This will return null if the instance is not clustered or
* if the instance is clustered but the NCM's DN is not available - for
* instance, if cluster communications are not secure
*/
public String getClusterManagerDN() {
readLock.lock();
try {
return clusterManagerDN;
} finally {
readLock.unlock();
}
}
/**
* Sets whether this instance is clustered. Clustered means that a node is
* either connected or trying to connect to the cluster.
*
* @param clustered true if clustered
* @param clusterInstanceId if clustered is true, indicates the InstanceID
* of the Cluster Manager
*/
public void setClustered(final boolean clustered, final String clusterInstanceId) {
setClustered(clustered, clusterInstanceId, null);
}
private void registerForClusterCoordinator() { private void registerForClusterCoordinator() {
leaderElectionManager.register(ClusterRoles.CLUSTER_COORDINATOR, new LeaderElectionStateChangeListener() { leaderElectionManager.register(ClusterRoles.CLUSTER_COORDINATOR, new LeaderElectionStateChangeListener() {
@ -3375,11 +3348,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* either connected or trying to connect to the cluster. * either connected or trying to connect to the cluster.
* *
* @param clustered true if clustered * @param clustered true if clustered
* @param clusterInstanceId if clustered is true, indicates the InstanceID * @param clusterInstanceId if clustered is true, indicates the InstanceID of the Cluster Manager
* of the Cluster Manager
* @param clusterManagerDn the DN of the NCM
*/ */
public void setClustered(final boolean clustered, final String clusterInstanceId, final String clusterManagerDn) { public void setClustered(final boolean clustered, final String clusterInstanceId) {
writeLock.lock(); writeLock.lock();
try { try {
// verify whether the this node's clustered status is changing // verify whether the this node's clustered status is changing
@ -3396,9 +3367,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
// mark the new cluster status // mark the new cluster status
this.clustered = clustered; this.clustered = clustered;
if (clusterManagerDn != null) {
this.clusterManagerDN = clusterManagerDn;
}
eventDrivenWorkerQueue.setClustered(clustered); eventDrivenWorkerQueue.setClustered(clustered);
if (clusterInstanceId != null) { if (clusterInstanceId != null) {

View File

@ -107,7 +107,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
private final FlowConfigurationDAO dao; private final FlowConfigurationDAO dao;
private final int gracefulShutdownSeconds; private final int gracefulShutdownSeconds;
private final boolean autoResumeState; private final boolean autoResumeState;
private final StringEncryptor encryptor;
private final Authorizer authorizer; private final Authorizer authorizer;
// Lock is used to protect the flow.xml file. // Lock is used to protect the flow.xml file.
@ -175,7 +174,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
final Authorizer authorizer) throws IOException { final Authorizer authorizer) throws IOException {
this.controller = controller; this.controller = controller;
this.encryptor = encryptor;
flowXml = Paths.get(properties.getProperty(NiFiProperties.FLOW_CONFIGURATION_FILE)); flowXml = Paths.get(properties.getProperty(NiFiProperties.FLOW_CONFIGURATION_FILE));
gracefulShutdownSeconds = (int) FormatUtils.getTimeDuration(properties.getProperty(NiFiProperties.FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD), TimeUnit.SECONDS); gracefulShutdownSeconds = (int) FormatUtils.getTimeDuration(properties.getProperty(NiFiProperties.FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD), TimeUnit.SECONDS);
@ -520,6 +518,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
disconnectionCode = DisconnectionCode.STARTUP_FAILURE; disconnectionCode = DisconnectionCode.STARTUP_FAILURE;
} }
clusterCoordinator.disconnectionRequestedByNode(getNodeId(), disconnectionCode, ex.toString()); clusterCoordinator.disconnectionRequestedByNode(getNodeId(), disconnectionCode, ex.toString());
controller.setClustered(false, null);
} }
private FlowResponseMessage handleFlowRequest(final FlowRequestMessage request) throws ProtocolException { private FlowResponseMessage handleFlowRequest(final FlowRequestMessage request) throws ProtocolException {
@ -587,7 +586,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
final ConnectionResponse connectionResponse = new ConnectionResponse(getNodeId(), request.getDataFlow(), final ConnectionResponse connectionResponse = new ConnectionResponse(getNodeId(), request.getDataFlow(),
request.getInstanceId(), request.getNodeConnectionStatuses(), request.getComponentRevisions()); request.getInstanceId(), request.getNodeConnectionStatuses(), request.getComponentRevisions());
connectionResponse.setCoordinatorDN(request.getRequestorDN());
loadFromConnectionResponse(connectionResponse); loadFromConnectionResponse(connectionResponse);
clusterCoordinator.resetNodeStatuses(connectionResponse.getNodeConnectionStatuses().stream() clusterCoordinator.resetNodeStatuses(connectionResponse.getNodeConnectionStatuses().stream()
@ -849,7 +847,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
revisionManager.reset(response.getComponentRevisions().stream().map(rev -> rev.toRevision()).collect(Collectors.toList())); revisionManager.reset(response.getComponentRevisions().stream().map(rev -> rev.toRevision()).collect(Collectors.toList()));
// mark the node as clustered // mark the node as clustered
controller.setClustered(true, response.getInstanceId(), response.getCoordinatorDN()); controller.setClustered(true, response.getInstanceId());
final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeId); final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeId);
final Set<String> roles = status == null ? Collections.emptySet() : status.getRoles(); final Set<String> roles = status == null ? Collections.emptySet() : status.getRoles();