NIFI-8433 Added ability to decommission a node in a cluster

This closes #5004

Signed-off-by: Joey Frazee <jfrazee@apache.org>
This commit is contained in:
Mark Payne 2021-04-14 16:25:59 -04:00 committed by Joey Frazee
parent 1090a9748a
commit 935566ba23
16 changed files with 511 additions and 98 deletions

View File

@ -111,8 +111,10 @@ public class RunNiFi {
public static final String PID_KEY = "pid";
public static final int STARTUP_WAIT_SECONDS = 60;
public static final long GRACEFUL_SHUTDOWN_RETRY_MILLIS = 2000L;
public static final String SHUTDOWN_CMD = "SHUTDOWN";
public static final String DECOMMISSION_CMD = "DECOMMISSION";
public static final String PING_CMD = "PING";
public static final String DUMP_CMD = "DUMP";
public static final String DIAGNOSTICS_CMD = "DIAGNOSTICS";
@ -169,6 +171,7 @@ public class RunNiFi {
System.out.println("Start : Start a new instance of Apache NiFi");
System.out.println("Stop : Stop a running instance of Apache NiFi");
System.out.println("Restart : Stop Apache NiFi, if it is running, and then start a new instance");
System.out.println("Decommission : Disconnects Apache NiFi from its cluster, offloads its data to other nodes in the cluster, removes itself from the cluster, and shuts down the instance");
System.out.println("Status : Determine if there is a running instance of Apache NiFi");
System.out.println("Dump : Write a Thread Dump to the file specified by [options], or to the log if no file is given");
System.out.println("Diagnostics : Write diagnostic information to the file specified by [options], or to the log if no file is given. The --verbose flag may be provided as an option before " +
@ -219,6 +222,7 @@ public class RunNiFi {
case "start":
case "run":
case "stop":
case "decommission":
case "status":
case "is_loaded":
case "dump":
@ -245,6 +249,9 @@ public class RunNiFi {
case "stop":
runNiFi.stop();
break;
case "decommission":
exitStatus = runNiFi.decommission();
break;
case "status":
exitStatus = runNiFi.status();
break;
@ -810,6 +817,63 @@ public class RunNiFi {
"Hello,\n\nApache NiFi has been told to initiate a shutdown on host " + hostname + " at " + now + " by user " + user);
}
public Integer decommission() throws IOException {
final Logger logger = cmdLogger;
final Integer port = getCurrentPort(logger);
if (port == null) {
logger.info("Apache NiFi is not currently running");
return 15;
}
// indicate that a stop command is in progress
final File lockFile = getLockFile(logger);
if (!lockFile.exists()) {
lockFile.createNewFile();
}
final Properties nifiProps = loadProperties(logger);
final String secretKey = nifiProps.getProperty("secret.key");
final String pid = nifiProps.getProperty(PID_KEY);
final File statusFile = getStatusFile(logger);
final File pidFile = getPidFile(logger);
try (final Socket socket = new Socket()) {
logger.debug("Connecting to NiFi instance");
socket.setSoTimeout(10000);
socket.connect(new InetSocketAddress("localhost", port));
logger.debug("Established connection to NiFi instance.");
// We don't know how long it will take for the offloading to complete. It could be a while. So don't timeout.
// User can press Ctrl+C to terminate if they don't want to wait
socket.setSoTimeout(0);
logger.debug("Sending DECOMMISSION Command to port {}", port);
final OutputStream out = socket.getOutputStream();
out.write((DECOMMISSION_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8));
out.flush();
socket.shutdownOutput();
final String response = readResponse(socket.getInputStream());
if (DECOMMISSION_CMD.equals(response)) {
logger.debug("Received response to DECOMMISSION command: {}", response);
if (pid != null) {
waitForShutdown(pid, logger, statusFile, pidFile);
}
return null;
} else {
logger.error("When sending DECOMMISSION command to NiFi, got unexpected response {}", response);
return 18;
}
} finally {
if (lockFile.exists() && !lockFile.delete()) {
logger.error("Failed to delete lock file {}; this file should be cleaned up manually", lockFile);
}
}
}
public void stop() throws IOException {
final Logger logger = cmdLogger;
final Integer port = getCurrentPort(logger);
@ -843,20 +907,48 @@ public class RunNiFi {
out.flush();
socket.shutdownOutput();
final InputStream in = socket.getInputStream();
int lastChar;
final StringBuilder sb = new StringBuilder();
while ((lastChar = in.read()) > -1) {
sb.append((char) lastChar);
}
final String response = sb.toString().trim();
final String response = readResponse(socket.getInputStream());
logger.debug("Received response to SHUTDOWN command: {}", response);
if (SHUTDOWN_CMD.equals(response)) {
logger.info("Apache NiFi has accepted the Shutdown Command and is shutting down now");
if (pid != null) {
waitForShutdown(pid, logger, statusFile, pidFile);
}
} else {
logger.error("When sending SHUTDOWN command to NiFi, got unexpected response: {}", response);
}
} catch (final IOException ioe) {
if (pid == null) {
logger.error("Failed to send shutdown command to port {} due to {}. No PID found for the NiFi process, so unable to kill process; "
+ "the process should be killed manually.", new Object[]{port, ioe.toString()});
} else {
logger.error("Failed to send shutdown command to port {} due to {}. Will kill the NiFi Process with PID {}.", port, ioe.toString(), pid);
notifyStop();
killProcessTree(pid, logger);
if (statusFile.exists() && !statusFile.delete()) {
logger.error("Failed to delete status file {}; this file should be cleaned up manually", statusFile);
}
}
} finally {
if (lockFile.exists() && !lockFile.delete()) {
logger.error("Failed to delete lock file {}; this file should be cleaned up manually", lockFile);
}
}
}
private String readResponse(final InputStream in) throws IOException {
int lastChar;
final StringBuilder sb = new StringBuilder();
while ((lastChar = in.read()) > -1) {
sb.append((char) lastChar);
}
return sb.toString().trim();
}
private void waitForShutdown(final String pid, final Logger logger, final File statusFile, final File pidFile) throws IOException {
final Properties bootstrapProperties = new Properties();
try (final FileInputStream fis = new FileInputStream(bootstrapConfigFile)) {
bootstrapProperties.load(fis);
@ -888,7 +980,7 @@ public class RunNiFi {
break;
} else {
try {
Thread.sleep(2000L);
Thread.sleep(GRACEFUL_SHUTDOWN_RETRY_MILLIS);
} catch (final InterruptedException ie) {
}
}
@ -904,27 +996,6 @@ public class RunNiFi {
logger.info("NiFi has finished shutting down.");
}
} else {
logger.error("When sending SHUTDOWN command to NiFi, got unexpected response {}", response);
}
} catch (final IOException ioe) {
if (pid == null) {
logger.error("Failed to send shutdown command to port {} due to {}. No PID found for the NiFi process, so unable to kill process; "
+ "the process should be killed manually.", new Object[]{port, ioe.toString()});
} else {
logger.error("Failed to send shutdown command to port {} due to {}. Will kill the NiFi Process with PID {}.", port, ioe.toString(), pid);
notifyStop();
killProcessTree(pid, logger);
if (statusFile.exists() && !statusFile.delete()) {
logger.error("Failed to delete status file {}; this file should be cleaned up manually", statusFile);
}
}
} finally {
if (lockFile.exists() && !lockFile.delete()) {
logger.error("Failed to delete lock file {}; this file should be cleaned up manually", lockFile);
}
}
}
private static List<String> getChildProcesses(final String ppid) throws IOException {
final Process proc = Runtime.getRuntime().exec(new String[]{"ps", "-o", "pid", "--no-headers", "--ppid", ppid});

View File

@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
public interface DecommissionTask {
void decommission() throws InterruptedException;
}

View File

@ -18,6 +18,7 @@ package org.apache.nifi.documentation.example;
import org.apache.nifi.NiFiServer;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.controller.DecommissionTask;
import org.apache.nifi.diagnostics.DiagnosticsFactory;
import org.apache.nifi.nar.ExtensionMapping;
import org.apache.nifi.util.NiFiProperties;
@ -54,4 +55,9 @@ public class NiFiServerStub implements NiFiServer {
public DiagnosticsFactory getThreadDumpFactory() {
return null;
}
@Override
public DecommissionTask getDecommissionTask() {
return null;
}
}

View File

@ -18,11 +18,11 @@
package org.apache.nifi.cluster.coordination;
import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat;
import org.apache.nifi.cluster.coordination.node.OffloadCode;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.coordination.node.NodeWorkload;
import org.apache.nifi.cluster.coordination.node.OffloadCode;
import org.apache.nifi.cluster.event.NodeEvent;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.reporting.Severity;
@ -32,6 +32,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
/**
* <p>
@ -85,7 +86,7 @@ public interface ClusterCoordinator {
* @param offloadCode the code that represents why this node is being asked to be offloaded
* @param explanation an explanation as to why the node is being asked to be offloaded
*/
void requestNodeOffload(NodeIdentifier nodeId, OffloadCode offloadCode, String explanation);
Future<Void> requestNodeOffload(NodeIdentifier nodeId, OffloadCode offloadCode, String explanation);
/**
* Sends a request to the node to disconnect from the cluster.
@ -96,7 +97,7 @@ public interface ClusterCoordinator {
* @param explanation an explanation as to why the node is being asked to disconnect
* from the cluster
*/
void requestNodeDisconnect(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation);
Future<Void> requestNodeDisconnect(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation);
/**
* Notifies the Cluster Coordinator that the node with the given ID has requested to disconnect

View File

@ -16,19 +16,6 @@
*/
package org.apache.nifi.cluster.protocol.impl;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.ClusterCoordinationProtocolSender;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
@ -36,11 +23,11 @@ import org.apache.nifi.cluster.protocol.ProtocolContext;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
import org.apache.nifi.cluster.protocol.message.OffloadMessage;
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusRequestMessage;
import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage;
import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
import org.apache.nifi.cluster.protocol.message.OffloadMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
@ -52,6 +39,20 @@ import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* A protocol sender for sending protocol messages from the cluster manager to
* nodes.
@ -286,14 +287,35 @@ public class StandardClusterCoordinationProtocolSender implements ClusterCoordin
executor.submit(new Runnable() {
@Override
public void run() {
final int attempts = 10;
final int retrySeconds = 6;
Exception lastException = null;
for (int i = 0; i < attempts; i++) {
try (final Socket socket = createSocket(nodeId, true)) {
// marshal message to output stream
socket.getOutputStream().write(msgBytes);
} catch (final IOException ioe) {
throw new ProtocolException("Failed to send Node Status Change message to " + nodeId, ioe);
final OutputStream out = socket.getOutputStream();
out.write(msgBytes);
} catch (final Exception e) {
logger.warn("Failed to send Node Status Change message to {}", nodeId, e);
lastException = e;
try {
Thread.sleep(retrySeconds * 1000L);
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
return;
}
continue;
}
logger.debug("Notified {} of status change {}", nodeId, msg);
return;
}
throw new ProtocolException("Failed to send Node Status Change message to " + nodeId, lastException);
}
});
}

View File

@ -343,4 +343,5 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor {
public void onNodeStateChange(final NodeIdentifier nodeId, final NodeConnectionState newState) {
}
}
}

View File

@ -86,9 +86,11 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.regex.Pattern;
@ -270,7 +272,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
return nodeId;
}
private NodeIdentifier waitForElectedClusterCoordinator() {
public NodeIdentifier waitForElectedClusterCoordinator() {
return waitForNodeIdentifier(() -> getElectedActiveCoordinatorNode(false));
}
@ -312,7 +314,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
if (proposedStatus.getState() == NodeConnectionState.REMOVED) {
removeNode(nodeId);
} else {
updateNodeStatus(nodeId, proposedStatus, false);
forcefullyUpdateNodeStatus(nodeId, proposedStatus, false);
}
}
@ -339,11 +341,45 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
return removed;
}
/**
* Updates the status of the node with the given ID to the given status if and only if the updated status's Update ID is >= the Update ID of the current
* Node status or if there is currently no node with the given identifier
* @param nodeId the NodeIdentifier for the node whose ID is to be updated
* @param updatedStatus the new status for the node
* @return the previous status for the node
*/
private NodeConnectionStatus updateNodeStatus(final NodeIdentifier nodeId, final NodeConnectionStatus updatedStatus) {
return updateNodeStatus(nodeId, updatedStatus, true);
}
private NodeConnectionStatus updateNodeStatus(final NodeIdentifier nodeId, final NodeConnectionStatus updatedStatus, final boolean storeState) {
final String nodeUuid = nodeId.getId();
while (true) {
final NodeConnectionStatus currentStatus = nodeStatuses.get(nodeUuid);
if (currentStatus == null) {
onNodeAdded(nodeId, storeState);
// Return null because that was the previous state
return null;
}
if (currentStatus.getUpdateIdentifier() > updatedStatus.getUpdateIdentifier()) {
logger.debug("Received status update {} but ignoring it because it has an Update ID of {} and the current status has an Update ID of {}",
updatedStatus, updatedStatus.getUpdateIdentifier(), currentStatus.getUpdateIdentifier());
return currentStatus;
}
final boolean updated = nodeStatuses.replace(nodeUuid, currentStatus, updatedStatus);
if (updated) {
onNodeStateChange(nodeId, updatedStatus.getState());
logger.info("Status of {} changed from {} to {}", nodeId, currentStatus, updatedStatus);
return currentStatus;
}
}
}
private NodeConnectionStatus forcefullyUpdateNodeStatus(final NodeIdentifier nodeId, final NodeConnectionStatus updatedStatus, final boolean storeState) {
final NodeConnectionStatus evictedStatus = nodeStatuses.put(nodeId.getId(), updatedStatus);
if (evictedStatus == null) {
onNodeAdded(nodeId, storeState);
@ -502,12 +538,12 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
}
@Override
public void requestNodeOffload(final NodeIdentifier nodeId, final OffloadCode offloadCode, final String explanation) {
public Future<Void> requestNodeOffload(final NodeIdentifier nodeId, final OffloadCode offloadCode, final String explanation) {
final Set<NodeIdentifier> offloadNodeIds = getNodeIdentifiers(NodeConnectionState.OFFLOADING, NodeConnectionState.OFFLOADED);
if (offloadNodeIds.contains(nodeId)) {
logger.debug("Attempted to offload node but the node is already offloading or offloaded");
// no need to do anything here, the node is currently offloading or already offloaded
return;
return CompletableFuture.completedFuture(null);
}
final Set<NodeIdentifier> disconnectedNodeIds = getNodeIdentifiers(NodeConnectionState.DISCONNECTED);
@ -524,11 +560,11 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
request.setExplanation(explanation);
addNodeEvent(nodeId, "Offload requested due to " + explanation);
offloadAsynchronously(request, 10, 5);
return offloadAsynchronously(request, 10, 5);
}
@Override
public void requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) {
public Future<Void> requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) {
final Set<NodeIdentifier> connectedNodeIds = getNodeIdentifiers(NodeConnectionState.CONNECTED);
if (connectedNodeIds.size() == 1 && connectedNodeIds.contains(nodeId)) {
throw new IllegalNodeDisconnectionException("Cannot disconnect node " + nodeId + " because it is the only node currently connected");
@ -541,7 +577,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
// There is no need to tell the node that it's disconnected if it is due to being
// shutdown, as we will not be able to connect to the node anyway.
if (disconnectionCode == DisconnectionCode.NODE_SHUTDOWN) {
return;
return CompletableFuture.completedFuture(null);
}
final DisconnectMessage request = new DisconnectMessage();
@ -549,7 +585,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
request.setExplanation(explanation);
addNodeEvent(nodeId, "Disconnection requested due to " + explanation);
disconnectAsynchronously(request, 10, 5);
return disconnectAsynchronously(request, 10, 5);
}
@Override
@ -834,12 +870,12 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
void updateNodeStatus(final NodeConnectionStatus status, final boolean waitForCoordinator) {
final NodeIdentifier nodeId = status.getNodeIdentifier();
// In this case, we are using nodeStatuses.put() instead of getting the current value and
// In this case, we are using nodeStatuses.put() (i.e., forcefully updating node status) instead of getting the current value and
// comparing that to the new value and using the one with the largest update id. This is because
// this method is called when something occurs that causes this node to change the status of the
// node in question. We only use comparisons against the current value when we receive an update
// about a node status from a different node, since those may be received out-of-order.
final NodeConnectionStatus currentStatus = updateNodeStatus(nodeId, status);
final NodeConnectionStatus currentStatus = forcefullyUpdateNodeStatus(nodeId, status, true);
final NodeConnectionState currentState = currentStatus == null ? null : currentStatus.getState();
if (Objects.equals(status, currentStatus)) {
logger.debug("Received notification of Node Status Change for {} but the status remained the same: {}", nodeId, status);
@ -901,19 +937,24 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
senderListener.notifyNodeStatusChange(nodesToNotify, message);
}
private void offloadAsynchronously(final OffloadMessage request, final int attempts, final int retrySeconds) {
private Future<Void> offloadAsynchronously(final OffloadMessage request, final int attempts, final int retrySeconds) {
final CompletableFuture<Void> future = new CompletableFuture<>();
final Thread offloadThread = new Thread(new Runnable() {
@Override
public void run() {
final NodeIdentifier nodeId = request.getNodeId();
Exception lastException = null;
for (int i = 0; i < attempts; i++) {
try {
senderListener.offload(request);
reportEvent(nodeId, Severity.INFO, "Node was offloaded due to " + request.getExplanation());
future.complete(null);
return;
} catch (final Exception e) {
logger.error("Failed to notify {} that it has been offloaded due to {}", request.getNodeId(), request.getExplanation(), e);
lastException = e;
try {
Thread.sleep(retrySeconds * 1000L);
@ -923,38 +964,50 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
}
}
}
future.completeExceptionally(lastException);
}
}, "Offload " + request.getNodeId());
offloadThread.start();
return future;
}
private void disconnectAsynchronously(final DisconnectMessage request, final int attempts, final int retrySeconds) {
private Future<Void> disconnectAsynchronously(final DisconnectMessage request, final int attempts, final int retrySeconds) {
final CompletableFuture<Void> future = new CompletableFuture<>();
final Thread disconnectThread = new Thread(new Runnable() {
@Override
public void run() {
final NodeIdentifier nodeId = request.getNodeId();
Exception lastException = null;
for (int i = 0; i < attempts; i++) {
try {
senderListener.disconnect(request);
reportEvent(nodeId, Severity.INFO, "Node disconnected due to " + request.getExplanation());
future.complete(null);
return;
} catch (final Exception e) {
logger.error("Failed to notify {} that it has been disconnected from the cluster due to {}", request.getNodeId(), request.getExplanation());
lastException = e;
try {
Thread.sleep(retrySeconds * 1000L);
} catch (final InterruptedException ie) {
future.completeExceptionally(ie);
Thread.currentThread().interrupt();
return;
}
}
}
future.completeExceptionally(lastException);
}
}, "Disconnect " + request.getNodeId());
disconnectThread.start();
return future;
}
public void validateHeartbeat(final NodeHeartbeat heartbeat) {
@ -1092,12 +1145,12 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
if (statusChangeMessage.getNodeConnectionStatus().getState() == NodeConnectionState.REMOVED) {
if (removeNodeConditionally(nodeId, oldStatus)) {
storeState();
logger.info("Status of {} changed from {} to {}", statusChangeMessage.getNodeId(), oldStatus, updatedStatus);
}
} else {
updateNodeStatus(nodeId, updatedStatus);
}
logger.info("Status of {} changed from {} to {}", statusChangeMessage.getNodeId(), oldStatus, updatedStatus);
logger.debug("State of cluster nodes is now {}", nodeStatuses);
final NodeConnectionStatus status = statusChangeMessage.getNodeConnectionStatus();

View File

@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.lifecycle;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.coordination.node.OffloadCode;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.DecommissionTask;
import org.apache.nifi.controller.FlowController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class ClusterDecommissionTask implements DecommissionTask {
private static final Logger logger = LoggerFactory.getLogger(ClusterDecommissionTask.class);
private static final int delaySeconds = 3;
private final ClusterCoordinator clusterCoordinator;
private final FlowController flowController;
private NodeIdentifier localNodeIdentifier;
public ClusterDecommissionTask(final ClusterCoordinator clusterCoordinator, final FlowController flowController) {
this.clusterCoordinator = clusterCoordinator;
this.flowController = flowController;
}
@Override
public synchronized void decommission() throws InterruptedException {
if (clusterCoordinator == null) {
throw new IllegalStateException("Cannot decommission Node because it is not part of a cluster");
}
logger.info("Decommissioning Node...");
localNodeIdentifier = clusterCoordinator.getLocalNodeIdentifier();
if (localNodeIdentifier == null) {
throw new IllegalStateException("Node has not yet connected to the cluster");
}
flowController.stopHeartbeating();
flowController.setClustered(false, null);
logger.info("Instructed FlowController to stop sending heartbeats to Cluster Coordinator and take Cluster Disconnect actions");
disconnectNode();
logger.info("Requested that node be disconnected from cluster");
waitForDisconnection();
logger.info("Successfully disconnected node from cluster");
offloadNode();
logger.info("Successfully triggered Node Offload. Will wait for offload to complete");
waitForOffloadToFinish();
logger.info("Offload has successfully completed.");
removeFromCluster();
logger.info("Requested that node be removed from cluster.");
waitForRemoval();
logger.info("Node successfully removed from cluster. Decommission is complete.");
}
private void disconnectNode() throws InterruptedException {
logger.info("Requesting that Node disconnect from cluster");
while (true) {
final Future<Void> future = clusterCoordinator.requestNodeDisconnect(localNodeIdentifier, DisconnectionCode.USER_DISCONNECTED, "Node is being decommissioned");
try {
future.get();
return;
} catch (final ExecutionException e) {
final Throwable cause = e.getCause();
logger.error("Failed when attempting to disconnect node from cluster", cause);
}
}
}
private void waitForDisconnection() throws InterruptedException {
logger.info("Waiting for Node to be completely disconnected from cluster");
waitForState(Collections.singleton(NodeConnectionState.DISCONNECTED));
}
private void offloadNode() throws InterruptedException {
logger.info("Requesting that Node be offloaded");
while (true) {
final Future<Void> future = clusterCoordinator.requestNodeOffload(localNodeIdentifier, OffloadCode.OFFLOADED, "Node is being decommissioned");
try {
future.get();
break;
} catch (final ExecutionException e) {
final Throwable cause = e.getCause();
logger.error("Failed when attempting to disconnect node from cluster", cause);
}
}
// Wait until status changes to either OFFLOADING or OFFLOADED.
waitForState(new HashSet<>(Arrays.asList(NodeConnectionState.OFFLOADING, NodeConnectionState.OFFLOADED)));
}
private void waitForState(final Set<NodeConnectionState> acceptableStates) throws InterruptedException {
while (true) {
final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(localNodeIdentifier);
final NodeConnectionState state = status.getState();
logger.debug("Node state is {}", state);
if (acceptableStates.contains(state)) {
return;
}
TimeUnit.SECONDS.sleep(delaySeconds);
}
}
private void waitForOffloadToFinish() throws InterruptedException {
logger.info("Waiting for Node to finish offloading");
while (true) {
final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(localNodeIdentifier);
final NodeConnectionState state = status.getState();
if (state == NodeConnectionState.OFFLOADED) {
return;
}
if (state != NodeConnectionState.OFFLOADING) {
throw new IllegalStateException("Expected state of Node to be OFFLOADING but Node is now in a state of " + state);
}
logger.debug("Node state is OFFLOADING. Will wait {} seconds and check again", delaySeconds);
TimeUnit.SECONDS.sleep(delaySeconds);
}
}
private void removeFromCluster() {
clusterCoordinator.removeNode(localNodeIdentifier, "<Local Decommission>");
}
private void waitForRemoval() throws InterruptedException {
logger.info("Waiting for Node to be completely removed from cluster");
while (true) {
final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(localNodeIdentifier);
if (status == null) {
return;
}
final NodeConnectionState state = status.getState();
if (state == NodeConnectionState.REMOVED) {
return;
}
logger.debug("Node state is {}. Will wait {} seconds and check again", state, delaySeconds);
TimeUnit.SECONDS.sleep(delaySeconds);
}
}
}

View File

@ -56,4 +56,10 @@
<bean id="heartbeatMonitor" class="org.apache.nifi.cluster.spring.HeartbeatMonitorFactoryBean">
<property name="properties" ref="nifiProperties"/>
</bean>
<bean id="decommissionTask" class="org.apache.nifi.cluster.lifecycle.ClusterDecommissionTask">
<constructor-arg ref="clusterCoordinator" />
<constructor-arg ref="flowController" />
</bean>
</beans>

View File

@ -20,11 +20,11 @@ package org.apache.nifi.cluster.coordination.heartbeat;
import org.apache.nifi.cluster.ReportedEvent;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener;
import org.apache.nifi.cluster.coordination.node.OffloadCode;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.coordination.node.NodeWorkload;
import org.apache.nifi.cluster.coordination.node.OffloadCode;
import org.apache.nifi.cluster.event.NodeEvent;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.reporting.Severity;
@ -45,7 +45,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
@ -119,9 +121,10 @@ public class TestAbstractHeartbeatMonitor {
}
@Override
public synchronized void requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) {
public synchronized Future<Void> requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) {
super.requestNodeDisconnect(nodeId, disconnectionCode, explanation);
requestedToDisconnect.add(nodeId);
return CompletableFuture.completedFuture(null);
}
};
@ -253,13 +256,15 @@ public class TestAbstractHeartbeatMonitor {
}
@Override
public synchronized void requestNodeOffload(NodeIdentifier nodeId, OffloadCode offloadCode, String explanation) {
public synchronized Future<Void> requestNodeOffload(NodeIdentifier nodeId, OffloadCode offloadCode, String explanation) {
statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.OFFLOADED));
return CompletableFuture.completedFuture(null);
}
@Override
public synchronized void requestNodeDisconnect(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation) {
public synchronized Future<Void> requestNodeDisconnect(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation) {
statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.DISCONNECTED));
return CompletableFuture.completedFuture(null);
}
@Override

View File

@ -2223,6 +2223,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
}
if (heartbeatSenderFuture != null) {
LOG.info("FlowController will stop sending heartbeats to Cluster Coordinator");
heartbeatSenderFuture.cancel(false);
}
} finally {

View File

@ -28,6 +28,7 @@ import org.apache.nifi.authorization.exception.AuthorizationAccessException;
import org.apache.nifi.authorization.exception.AuthorizerCreationException;
import org.apache.nifi.authorization.exception.AuthorizerDestructionException;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.controller.DecommissionTask;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.StandardFlowService;
import org.apache.nifi.controller.flow.FlowManager;
@ -193,6 +194,11 @@ public class HeadlessNiFiServer implements NiFiServer {
return new ThreadDumpDiagnosticsFactory();
}
@Override
public DecommissionTask getDecommissionTask() {
return null;
}
public void stop() {
try {
flowService.stop(false);

View File

@ -430,7 +430,7 @@ case "$1" in
install "$@"
;;
start|stop|run|status|is_loaded|dump|diagnostics|env|stateless)
start|stop|decommission|run|status|is_loaded|dump|diagnostics|env|stateless)
main "$@"
;;
@ -440,6 +440,6 @@ case "$1" in
run "start"
;;
*)
echo "Usage nifi {start|stop|run|restart|status|dump|diagnostics|install|stateless}"
echo "Usage nifi {start|stop|decommission|run|restart|status|dump|diagnostics|install|stateless}"
;;
esac

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi;
import org.apache.nifi.controller.DecommissionTask;
import org.apache.nifi.diagnostics.DiagnosticsDump;
import org.apache.nifi.util.LimitingInputStream;
import org.slf4j.Logger;
@ -202,6 +203,23 @@ public class BootstrapListener {
case DUMP:
logger.info("Received DUMP request from Bootstrap");
writeDump(socket.getOutputStream());
break;
case DECOMMISSION:
logger.info("Received DECOMMISSION request from Bootstrap");
try {
decommission();
sendAnswer(socket.getOutputStream(), "DECOMMISSION");
nifi.shutdownHook(false);
} catch (final Exception e) {
final OutputStream out = socket.getOutputStream();
out.write(("Failed to decommission node: " + e + "; see app-log for additional details").getBytes(StandardCharsets.UTF_8));
out.flush();
} finally {
socket.close();
}
break;
case DIAGNOSTICS:
logger.info("Received DIAGNOSTICS request from Bootstrap");
@ -250,6 +268,15 @@ public class BootstrapListener {
diagnosticsDump.writeTo(out);
}
private void decommission() throws InterruptedException {
final DecommissionTask decommissionTask = nifi.getServer().getDecommissionTask();
if (decommissionTask == null) {
throw new IllegalArgumentException("This NiFi instance does not support decommissioning");
}
decommissionTask.decommission();
}
private void writeDiagnostics(final OutputStream out, final boolean verbose) throws IOException {
final DiagnosticsDump diagnosticsDump = nifi.getServer().getDiagnosticsFactory().create(verbose);
diagnosticsDump.writeTo(out);
@ -304,6 +331,7 @@ public class BootstrapListener {
SHUTDOWN,
DUMP,
DIAGNOSTICS,
DECOMMISSION,
PING,
IS_LOADED
}

View File

@ -23,6 +23,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.NiFiServer;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleDetails;
import org.apache.nifi.controller.DecommissionTask;
import org.apache.nifi.controller.UninheritableFlowException;
import org.apache.nifi.controller.serialization.FlowSerializationException;
import org.apache.nifi.controller.serialization.FlowSynchronizationException;
@ -153,6 +154,7 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
private NarAutoLoader narAutoLoader;
private DiagnosticsFactory diagnosticsFactory;
private SslContextFactory.Server sslContextFactory;
private DecommissionTask decommissionTask;
private WebAppContext webApiContext;
private WebAppContext webDocsContext;
@ -1172,6 +1174,7 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
}
diagnosticsFactory = webApplicationContext.getBean("diagnosticsFactory", DiagnosticsFactory.class);
decommissionTask = webApplicationContext.getBean("decommissionTask", DecommissionTask.class);
}
// ensure the web document war was loaded and provide the extension mapping
@ -1245,6 +1248,11 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
return new ThreadDumpDiagnosticsFactory();
}
@Override
public DecommissionTask getDecommissionTask() {
return decommissionTask;
}
private void performInjectionForComponentUis(final Collection<WebAppContext> componentUiExtensionWebContexts,
final NiFiWebConfigurationContext configurationContext, final FilterHolder securityFilter) {
if (CollectionUtils.isNotEmpty(componentUiExtensionWebContexts)) {

View File

@ -17,6 +17,7 @@
package org.apache.nifi;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.controller.DecommissionTask;
import org.apache.nifi.diagnostics.DiagnosticsFactory;
import org.apache.nifi.nar.ExtensionMapping;
import org.apache.nifi.util.NiFiProperties;
@ -37,4 +38,6 @@ public interface NiFiServer {
DiagnosticsFactory getDiagnosticsFactory();
DiagnosticsFactory getThreadDumpFactory();
DecommissionTask getDecommissionTask();
}