diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java index dc213b5e02..7d3ecc86d8 100644 --- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java @@ -111,8 +111,10 @@ public class RunNiFi { public static final String PID_KEY = "pid"; public static final int STARTUP_WAIT_SECONDS = 60; + public static final long GRACEFUL_SHUTDOWN_RETRY_MILLIS = 2000L; public static final String SHUTDOWN_CMD = "SHUTDOWN"; + public static final String DECOMMISSION_CMD = "DECOMMISSION"; public static final String PING_CMD = "PING"; public static final String DUMP_CMD = "DUMP"; public static final String DIAGNOSTICS_CMD = "DIAGNOSTICS"; @@ -169,6 +171,7 @@ public class RunNiFi { System.out.println("Start : Start a new instance of Apache NiFi"); System.out.println("Stop : Stop a running instance of Apache NiFi"); System.out.println("Restart : Stop Apache NiFi, if it is running, and then start a new instance"); + System.out.println("Decommission : Disconnects Apache NiFi from its cluster, offloads its data to other nodes in the cluster, removes itself from the cluster, and shuts down the instance"); System.out.println("Status : Determine if there is a running instance of Apache NiFi"); System.out.println("Dump : Write a Thread Dump to the file specified by [options], or to the log if no file is given"); System.out.println("Diagnostics : Write diagnostic information to the file specified by [options], or to the log if no file is given. The --verbose flag may be provided as an option before " + @@ -219,6 +222,7 @@ public class RunNiFi { case "start": case "run": case "stop": + case "decommission": case "status": case "is_loaded": case "dump": @@ -245,6 +249,9 @@ public class RunNiFi { case "stop": runNiFi.stop(); break; + case "decommission": + exitStatus = runNiFi.decommission(); + break; case "status": exitStatus = runNiFi.status(); break; @@ -810,6 +817,63 @@ public class RunNiFi { "Hello,\n\nApache NiFi has been told to initiate a shutdown on host " + hostname + " at " + now + " by user " + user); } + public Integer decommission() throws IOException { + final Logger logger = cmdLogger; + final Integer port = getCurrentPort(logger); + if (port == null) { + logger.info("Apache NiFi is not currently running"); + return 15; + } + + // indicate that a stop command is in progress + final File lockFile = getLockFile(logger); + if (!lockFile.exists()) { + lockFile.createNewFile(); + } + + final Properties nifiProps = loadProperties(logger); + final String secretKey = nifiProps.getProperty("secret.key"); + final String pid = nifiProps.getProperty(PID_KEY); + final File statusFile = getStatusFile(logger); + final File pidFile = getPidFile(logger); + + try (final Socket socket = new Socket()) { + logger.debug("Connecting to NiFi instance"); + socket.setSoTimeout(10000); + socket.connect(new InetSocketAddress("localhost", port)); + logger.debug("Established connection to NiFi instance."); + + // We don't know how long it will take for the offloading to complete. It could be a while. So don't timeout. 
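+            // NiFi does not acknowledge the DECOMMISSION command until offloading has finished, so the read below may block indefinitely.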
+ // User can press Ctrl+C to terminate if they don't want to wait + socket.setSoTimeout(0); + + logger.debug("Sending DECOMMISSION Command to port {}", port); + final OutputStream out = socket.getOutputStream(); + out.write((DECOMMISSION_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8)); + out.flush(); + socket.shutdownOutput(); + + final String response = readResponse(socket.getInputStream()); + + if (DECOMMISSION_CMD.equals(response)) { + logger.debug("Received response to DECOMMISSION command: {}", response); + + if (pid != null) { + waitForShutdown(pid, logger, statusFile, pidFile); + } + + return null; + } else { + logger.error("When sending DECOMMISSION command to NiFi, got unexpected response {}", response); + return 18; + } + } finally { + if (lockFile.exists() && !lockFile.delete()) { + logger.error("Failed to delete lock file {}; this file should be cleaned up manually", lockFile); + } + } + } + public void stop() throws IOException { final Logger logger = cmdLogger; final Integer port = getCurrentPort(logger); @@ -843,69 +907,17 @@ public class RunNiFi { out.flush(); socket.shutdownOutput(); - final InputStream in = socket.getInputStream(); - int lastChar; - final StringBuilder sb = new StringBuilder(); - while ((lastChar = in.read()) > -1) { - sb.append((char) lastChar); - } - final String response = sb.toString().trim(); - + final String response = readResponse(socket.getInputStream()); logger.debug("Received response to SHUTDOWN command: {}", response); if (SHUTDOWN_CMD.equals(response)) { logger.info("Apache NiFi has accepted the Shutdown Command and is shutting down now"); if (pid != null) { - final Properties bootstrapProperties = new Properties(); - try (final FileInputStream fis = new FileInputStream(bootstrapConfigFile)) { - bootstrapProperties.load(fis); - } - - String gracefulShutdown = bootstrapProperties.getProperty(GRACEFUL_SHUTDOWN_PROP, DEFAULT_GRACEFUL_SHUTDOWN_VALUE); - int gracefulShutdownSeconds; - try { - gracefulShutdownSeconds = Integer.parseInt(gracefulShutdown); - } catch (final NumberFormatException nfe) { - gracefulShutdownSeconds = Integer.parseInt(DEFAULT_GRACEFUL_SHUTDOWN_VALUE); - } - - notifyStop(); - final long startWait = System.nanoTime(); - while (isProcessRunning(pid, logger)) { - logger.info("Waiting for Apache NiFi to finish shutting down..."); - final long waitNanos = System.nanoTime() - startWait; - final long waitSeconds = TimeUnit.NANOSECONDS.toSeconds(waitNanos); - if (waitSeconds >= gracefulShutdownSeconds && gracefulShutdownSeconds > 0) { - if (isProcessRunning(pid, logger)) { - logger.warn("NiFi has not finished shutting down after {} seconds. 
Killing process.", gracefulShutdownSeconds); - try { - killProcessTree(pid, logger); - } catch (final IOException ioe) { - logger.error("Failed to kill Process with PID {}", pid); - } - } - break; - } else { - try { - Thread.sleep(2000L); - } catch (final InterruptedException ie) { - } - } - } - - if (statusFile.exists() && !statusFile.delete()) { - logger.error("Failed to delete status file {}; this file should be cleaned up manually", statusFile); - } - - if (pidFile.exists() && !pidFile.delete()) { - logger.error("Failed to delete pid file {}; this file should be cleaned up manually", pidFile); - } - - logger.info("NiFi has finished shutting down."); + waitForShutdown(pid, logger, statusFile, pidFile); } } else { - logger.error("When sending SHUTDOWN command to NiFi, got unexpected response {}", response); + logger.error("When sending SHUTDOWN command to NiFi, got unexpected response: {}", response); } } catch (final IOException ioe) { if (pid == null) { @@ -926,6 +938,65 @@ public class RunNiFi { } } + private String readResponse(final InputStream in) throws IOException { + int lastChar; + final StringBuilder sb = new StringBuilder(); + while ((lastChar = in.read()) > -1) { + sb.append((char) lastChar); + } + + return sb.toString().trim(); + } + + private void waitForShutdown(final String pid, final Logger logger, final File statusFile, final File pidFile) throws IOException { + final Properties bootstrapProperties = new Properties(); + try (final FileInputStream fis = new FileInputStream(bootstrapConfigFile)) { + bootstrapProperties.load(fis); + } + + String gracefulShutdown = bootstrapProperties.getProperty(GRACEFUL_SHUTDOWN_PROP, DEFAULT_GRACEFUL_SHUTDOWN_VALUE); + int gracefulShutdownSeconds; + try { + gracefulShutdownSeconds = Integer.parseInt(gracefulShutdown); + } catch (final NumberFormatException nfe) { + gracefulShutdownSeconds = Integer.parseInt(DEFAULT_GRACEFUL_SHUTDOWN_VALUE); + } + + notifyStop(); + final long startWait = System.nanoTime(); + while (isProcessRunning(pid, logger)) { + logger.info("Waiting for Apache NiFi to finish shutting down..."); + final long waitNanos = System.nanoTime() - startWait; + final long waitSeconds = TimeUnit.NANOSECONDS.toSeconds(waitNanos); + if (waitSeconds >= gracefulShutdownSeconds && gracefulShutdownSeconds > 0) { + if (isProcessRunning(pid, logger)) { + logger.warn("NiFi has not finished shutting down after {} seconds. 
Killing process.", gracefulShutdownSeconds); + try { + killProcessTree(pid, logger); + } catch (final IOException ioe) { + logger.error("Failed to kill Process with PID {}", pid); + } + } + break; + } else { + try { + Thread.sleep(GRACEFUL_SHUTDOWN_RETRY_MILLIS); + } catch (final InterruptedException ie) { + } + } + } + + if (statusFile.exists() && !statusFile.delete()) { + logger.error("Failed to delete status file {}; this file should be cleaned up manually", statusFile); + } + + if (pidFile.exists() && !pidFile.delete()) { + logger.error("Failed to delete pid file {}; this file should be cleaned up manually", pidFile); + } + + logger.info("NiFi has finished shutting down."); + } + private static List getChildProcesses(final String ppid) throws IOException { final Process proc = Runtime.getRuntime().exec(new String[]{"ps", "-o", "pid", "--no-headers", "--ppid", ppid}); final List childPids = new ArrayList<>(); diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/DecommissionTask.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/DecommissionTask.java new file mode 100644 index 0000000000..58aa3deb63 --- /dev/null +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/DecommissionTask.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller; + +public interface DecommissionTask { + void decommission() throws InterruptedException; +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/example/NiFiServerStub.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/example/NiFiServerStub.java index f77b9091d7..d82bec94ea 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/example/NiFiServerStub.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/example/NiFiServerStub.java @@ -18,6 +18,7 @@ package org.apache.nifi.documentation.example; import org.apache.nifi.NiFiServer; import org.apache.nifi.bundle.Bundle; +import org.apache.nifi.controller.DecommissionTask; import org.apache.nifi.diagnostics.DiagnosticsFactory; import org.apache.nifi.nar.ExtensionMapping; import org.apache.nifi.util.NiFiProperties; @@ -54,4 +55,9 @@ public class NiFiServerStub implements NiFiServer { public DiagnosticsFactory getThreadDumpFactory() { return null; } + + @Override + public DecommissionTask getDecommissionTask() { + return null; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java index 2b7c07fa1f..a27dac332e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java @@ -18,11 +18,11 @@ package org.apache.nifi.cluster.coordination; import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat; -import org.apache.nifi.cluster.coordination.node.OffloadCode; import org.apache.nifi.cluster.coordination.node.DisconnectionCode; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; import org.apache.nifi.cluster.coordination.node.NodeWorkload; +import org.apache.nifi.cluster.coordination.node.OffloadCode; import org.apache.nifi.cluster.event.NodeEvent; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.reporting.Severity; @@ -32,6 +32,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Future; /** *

@@ -85,7 +86,7 @@ public interface ClusterCoordinator { * @param offloadCode the code that represents why this node is being asked to be offloaded * @param explanation an explanation as to why the node is being asked to be offloaded */ - void requestNodeOffload(NodeIdentifier nodeId, OffloadCode offloadCode, String explanation); + Future requestNodeOffload(NodeIdentifier nodeId, OffloadCode offloadCode, String explanation); /** * Sends a request to the node to disconnect from the cluster. @@ -96,7 +97,7 @@ public interface ClusterCoordinator { * @param explanation an explanation as to why the node is being asked to disconnect * from the cluster */ - void requestNodeDisconnect(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation); + Future requestNodeDisconnect(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation); /** * Notifies the Cluster Coordinator that the node with the given ID has requested to disconnect diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java index b21068ffe5..462117245b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java @@ -16,19 +16,6 @@ */ package org.apache.nifi.cluster.protocol.impl; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketException; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; import org.apache.nifi.cluster.protocol.ClusterCoordinationProtocolSender; import org.apache.nifi.cluster.protocol.NodeIdentifier; @@ -36,11 +23,11 @@ import org.apache.nifi.cluster.protocol.ProtocolContext; import org.apache.nifi.cluster.protocol.ProtocolException; import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller; import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller; -import org.apache.nifi.cluster.protocol.message.OffloadMessage; import org.apache.nifi.cluster.protocol.message.DisconnectMessage; import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusRequestMessage; import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage; import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage; +import org.apache.nifi.cluster.protocol.message.OffloadMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType; import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; @@ -52,6 +39,20 @@ import org.apache.nifi.util.FormatUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import 
java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketException; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + /** * A protocol sender for sending protocol messages from the cluster manager to * nodes. @@ -286,14 +287,35 @@ public class StandardClusterCoordinationProtocolSender implements ClusterCoordin executor.submit(new Runnable() { @Override public void run() { - try (final Socket socket = createSocket(nodeId, true)) { - // marshal message to output stream - socket.getOutputStream().write(msgBytes); - } catch (final IOException ioe) { - throw new ProtocolException("Failed to send Node Status Change message to " + nodeId, ioe); + final int attempts = 10; + final int retrySeconds = 6; + Exception lastException = null; + + for (int i = 0; i < attempts; i++) { + try (final Socket socket = createSocket(nodeId, true)) { + // marshal message to output stream + final OutputStream out = socket.getOutputStream(); + out.write(msgBytes); + } catch (final Exception e) { + logger.warn("Failed to send Node Status Change message to {}", nodeId, e); + + lastException = e; + + try { + Thread.sleep(retrySeconds * 1000L); + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + return; + } + + continue; + } + + logger.debug("Notified {} of status change {}", nodeId, msg); + return; } - logger.debug("Notified {} of status change {}", nodeId, msg); + throw new ProtocolException("Failed to send Node Status Change message to " + nodeId, lastException); } }); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java index 2b02a49232..c8a77ed2f0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java @@ -343,4 +343,5 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor { public void onNodeStateChange(final NodeIdentifier nodeId, final NodeConnectionState newState) { } } + } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java index d029034fa9..5326073094 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java @@ -86,9 +86,11 @@ import java.util.Map; import java.util.Objects; 
import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import java.util.regex.Pattern; @@ -270,7 +272,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl return nodeId; } - private NodeIdentifier waitForElectedClusterCoordinator() { + public NodeIdentifier waitForElectedClusterCoordinator() { return waitForNodeIdentifier(() -> getElectedActiveCoordinatorNode(false)); } @@ -312,7 +314,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl if (proposedStatus.getState() == NodeConnectionState.REMOVED) { removeNode(nodeId); } else { - updateNodeStatus(nodeId, proposedStatus, false); + forcefullyUpdateNodeStatus(nodeId, proposedStatus, false); } } @@ -339,11 +341,45 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl return removed; } + /** + * Updates the status of the node with the given ID to the given status if and only if the updated status's Update ID is >= the Update ID of the current + * Node status or if there is currently no node with the given identifier + * @param nodeId the NodeIdentifier for the node whose ID is to be updated + * @param updatedStatus the new status for the node + * @return the previous status for the node + */ private NodeConnectionStatus updateNodeStatus(final NodeIdentifier nodeId, final NodeConnectionStatus updatedStatus) { return updateNodeStatus(nodeId, updatedStatus, true); } private NodeConnectionStatus updateNodeStatus(final NodeIdentifier nodeId, final NodeConnectionStatus updatedStatus, final boolean storeState) { + final String nodeUuid = nodeId.getId(); + + while (true) { + final NodeConnectionStatus currentStatus = nodeStatuses.get(nodeUuid); + if (currentStatus == null) { + onNodeAdded(nodeId, storeState); + + // Return null because that was the previous state + return null; + } + + if (currentStatus.getUpdateIdentifier() > updatedStatus.getUpdateIdentifier()) { + logger.debug("Received status update {} but ignoring it because it has an Update ID of {} and the current status has an Update ID of {}", + updatedStatus, updatedStatus.getUpdateIdentifier(), currentStatus.getUpdateIdentifier()); + return currentStatus; + } + + final boolean updated = nodeStatuses.replace(nodeUuid, currentStatus, updatedStatus); + if (updated) { + onNodeStateChange(nodeId, updatedStatus.getState()); + logger.info("Status of {} changed from {} to {}", nodeId, currentStatus, updatedStatus); + return currentStatus; + } + } + } + + private NodeConnectionStatus forcefullyUpdateNodeStatus(final NodeIdentifier nodeId, final NodeConnectionStatus updatedStatus, final boolean storeState) { final NodeConnectionStatus evictedStatus = nodeStatuses.put(nodeId.getId(), updatedStatus); if (evictedStatus == null) { onNodeAdded(nodeId, storeState); @@ -502,12 +538,12 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl } @Override - public void requestNodeOffload(final NodeIdentifier nodeId, final OffloadCode offloadCode, final String explanation) { + public Future requestNodeOffload(final NodeIdentifier nodeId, final OffloadCode offloadCode, final String explanation) { final Set offloadNodeIds = getNodeIdentifiers(NodeConnectionState.OFFLOADING, 
NodeConnectionState.OFFLOADED); if (offloadNodeIds.contains(nodeId)) { logger.debug("Attempted to offload node but the node is already offloading or offloaded"); // no need to do anything here, the node is currently offloading or already offloaded - return; + return CompletableFuture.completedFuture(null); } final Set disconnectedNodeIds = getNodeIdentifiers(NodeConnectionState.DISCONNECTED); @@ -524,11 +560,11 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl request.setExplanation(explanation); addNodeEvent(nodeId, "Offload requested due to " + explanation); - offloadAsynchronously(request, 10, 5); + return offloadAsynchronously(request, 10, 5); } @Override - public void requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) { + public Future requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) { final Set connectedNodeIds = getNodeIdentifiers(NodeConnectionState.CONNECTED); if (connectedNodeIds.size() == 1 && connectedNodeIds.contains(nodeId)) { throw new IllegalNodeDisconnectionException("Cannot disconnect node " + nodeId + " because it is the only node currently connected"); @@ -541,7 +577,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl // There is no need to tell the node that it's disconnected if it is due to being // shutdown, as we will not be able to connect to the node anyway. if (disconnectionCode == DisconnectionCode.NODE_SHUTDOWN) { - return; + return CompletableFuture.completedFuture(null); } final DisconnectMessage request = new DisconnectMessage(); @@ -549,7 +585,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl request.setExplanation(explanation); addNodeEvent(nodeId, "Disconnection requested due to " + explanation); - disconnectAsynchronously(request, 10, 5); + return disconnectAsynchronously(request, 10, 5); } @Override @@ -834,12 +870,12 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl void updateNodeStatus(final NodeConnectionStatus status, final boolean waitForCoordinator) { final NodeIdentifier nodeId = status.getNodeIdentifier(); - // In this case, we are using nodeStatuses.put() instead of getting the current value and + // In this case, we are using nodeStatuses.put() (i.e., forcefully updating node status) instead of getting the current value and // comparing that to the new value and using the one with the largest update id. This is because // this method is called when something occurs that causes this node to change the status of the // node in question. We only use comparisons against the current value when we receive an update // about a node status from a different node, since those may be received out-of-order. - final NodeConnectionStatus currentStatus = updateNodeStatus(nodeId, status); + final NodeConnectionStatus currentStatus = forcefullyUpdateNodeStatus(nodeId, status, true); final NodeConnectionState currentState = currentStatus == null ? 
null : currentStatus.getState(); if (Objects.equals(status, currentStatus)) { logger.debug("Received notification of Node Status Change for {} but the status remained the same: {}", nodeId, status); @@ -901,19 +937,24 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl senderListener.notifyNodeStatusChange(nodesToNotify, message); } - private void offloadAsynchronously(final OffloadMessage request, final int attempts, final int retrySeconds) { + private Future offloadAsynchronously(final OffloadMessage request, final int attempts, final int retrySeconds) { + final CompletableFuture future = new CompletableFuture<>(); + final Thread offloadThread = new Thread(new Runnable() { @Override public void run() { final NodeIdentifier nodeId = request.getNodeId(); + Exception lastException = null; for (int i = 0; i < attempts; i++) { try { senderListener.offload(request); reportEvent(nodeId, Severity.INFO, "Node was offloaded due to " + request.getExplanation()); + future.complete(null); return; } catch (final Exception e) { logger.error("Failed to notify {} that it has been offloaded due to {}", request.getNodeId(), request.getExplanation(), e); + lastException = e; try { Thread.sleep(retrySeconds * 1000L); @@ -923,38 +964,50 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl } } } + + future.completeExceptionally(lastException); } }, "Offload " + request.getNodeId()); offloadThread.start(); + return future; } - private void disconnectAsynchronously(final DisconnectMessage request, final int attempts, final int retrySeconds) { + private Future disconnectAsynchronously(final DisconnectMessage request, final int attempts, final int retrySeconds) { + final CompletableFuture future = new CompletableFuture<>(); + final Thread disconnectThread = new Thread(new Runnable() { @Override public void run() { final NodeIdentifier nodeId = request.getNodeId(); + Exception lastException = null; for (int i = 0; i < attempts; i++) { try { senderListener.disconnect(request); reportEvent(nodeId, Severity.INFO, "Node disconnected due to " + request.getExplanation()); + future.complete(null); return; } catch (final Exception e) { logger.error("Failed to notify {} that it has been disconnected from the cluster due to {}", request.getNodeId(), request.getExplanation()); + lastException = e; try { Thread.sleep(retrySeconds * 1000L); } catch (final InterruptedException ie) { + future.completeExceptionally(ie); Thread.currentThread().interrupt(); return; } } } + + future.completeExceptionally(lastException); } }, "Disconnect " + request.getNodeId()); disconnectThread.start(); + return future; } public void validateHeartbeat(final NodeHeartbeat heartbeat) { @@ -1092,12 +1145,12 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl if (statusChangeMessage.getNodeConnectionStatus().getState() == NodeConnectionState.REMOVED) { if (removeNodeConditionally(nodeId, oldStatus)) { storeState(); + logger.info("Status of {} changed from {} to {}", statusChangeMessage.getNodeId(), oldStatus, updatedStatus); } } else { updateNodeStatus(nodeId, updatedStatus); } - logger.info("Status of {} changed from {} to {}", statusChangeMessage.getNodeId(), oldStatus, updatedStatus); logger.debug("State of cluster nodes is now {}", nodeStatuses); final NodeConnectionStatus status = statusChangeMessage.getNodeConnectionStatus(); diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/lifecycle/ClusterDecommissionTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/lifecycle/ClusterDecommissionTask.java new file mode 100644 index 0000000000..592e51cc87 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/lifecycle/ClusterDecommissionTask.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.lifecycle; + +import org.apache.nifi.cluster.coordination.ClusterCoordinator; +import org.apache.nifi.cluster.coordination.node.DisconnectionCode; +import org.apache.nifi.cluster.coordination.node.NodeConnectionState; +import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; +import org.apache.nifi.cluster.coordination.node.OffloadCode; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.controller.DecommissionTask; +import org.apache.nifi.controller.FlowController; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +public class ClusterDecommissionTask implements DecommissionTask { + private static final Logger logger = LoggerFactory.getLogger(ClusterDecommissionTask.class); + private static final int delaySeconds = 3; + + private final ClusterCoordinator clusterCoordinator; + private final FlowController flowController; + private NodeIdentifier localNodeIdentifier; + + public ClusterDecommissionTask(final ClusterCoordinator clusterCoordinator, final FlowController flowController) { + this.clusterCoordinator = clusterCoordinator; + this.flowController = flowController; + } + + @Override + public synchronized void decommission() throws InterruptedException { + if (clusterCoordinator == null) { + throw new IllegalStateException("Cannot decommission Node because it is not part of a cluster"); + } + + logger.info("Decommissioning Node..."); + localNodeIdentifier = clusterCoordinator.getLocalNodeIdentifier(); + if (localNodeIdentifier == null) { + throw new IllegalStateException("Node has not yet connected to the cluster"); + } + + flowController.stopHeartbeating(); + flowController.setClustered(false, null); + logger.info("Instructed FlowController to stop sending heartbeats to Cluster Coordinator and take Cluster Disconnect actions"); + + disconnectNode(); + logger.info("Requested that node be disconnected from cluster"); + + 
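+        // Wait for the disconnection to complete before requesting the offload; each subsequent step likewise polls the coordinator until the expected state is reached.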
+        waitForDisconnection();
+        logger.info("Successfully disconnected node from cluster");
+
+        offloadNode();
+        logger.info("Successfully triggered Node Offload. Will wait for offload to complete");
+
+        waitForOffloadToFinish();
+        logger.info("Offload has successfully completed.");
+
+        removeFromCluster();
+        logger.info("Requested that node be removed from cluster.");
+
+        waitForRemoval();
+        logger.info("Node successfully removed from cluster. Decommission is complete.");
+    }
+
+    private void disconnectNode() throws InterruptedException {
+        logger.info("Requesting that Node disconnect from cluster");
+
+        while (true) {
+            final Future<Void> future = clusterCoordinator.requestNodeDisconnect(localNodeIdentifier, DisconnectionCode.USER_DISCONNECTED, "Node is being decommissioned");
+            try {
+                future.get();
+                return;
+            } catch (final ExecutionException e) {
+                final Throwable cause = e.getCause();
+                logger.error("Failed when attempting to disconnect node from cluster", cause);
+            }
+        }
+    }
+
+    private void waitForDisconnection() throws InterruptedException {
+        logger.info("Waiting for Node to be completely disconnected from cluster");
+        waitForState(Collections.singleton(NodeConnectionState.DISCONNECTED));
+    }
+
+    private void offloadNode() throws InterruptedException {
+        logger.info("Requesting that Node be offloaded");
+
+        while (true) {
+            final Future<Void> future = clusterCoordinator.requestNodeOffload(localNodeIdentifier, OffloadCode.OFFLOADED, "Node is being decommissioned");
+            try {
+                future.get();
+                break;
+            } catch (final ExecutionException e) {
+                final Throwable cause = e.getCause();
+                logger.error("Failed when attempting to offload node", cause);
+            }
+        }
+
+        // Wait until status changes to either OFFLOADING or OFFLOADED.
+        waitForState(new HashSet<>(Arrays.asList(NodeConnectionState.OFFLOADING, NodeConnectionState.OFFLOADED)));
+    }
+
+    private void waitForState(final Set<NodeConnectionState> acceptableStates) throws InterruptedException {
+        while (true) {
+            final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(localNodeIdentifier);
+            final NodeConnectionState state = status.getState();
+            logger.debug("Node state is {}", state);
+
+            if (acceptableStates.contains(state)) {
+                return;
+            }
+
+            TimeUnit.SECONDS.sleep(delaySeconds);
+        }
+    }
+
+    private void waitForOffloadToFinish() throws InterruptedException {
+        logger.info("Waiting for Node to finish offloading");
+
+        while (true) {
+            final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(localNodeIdentifier);
+            final NodeConnectionState state = status.getState();
+            if (state == NodeConnectionState.OFFLOADED) {
+                return;
+            }
+
+            if (state != NodeConnectionState.OFFLOADING) {
+                throw new IllegalStateException("Expected state of Node to be OFFLOADING but Node is now in a state of " + state);
+            }
+
+            logger.debug("Node state is OFFLOADING. Will wait {} seconds and check again", delaySeconds);
+            TimeUnit.SECONDS.sleep(delaySeconds);
+        }
+    }
+
+    private void removeFromCluster() {
+        clusterCoordinator.removeNode(localNodeIdentifier, "");
+    }
+
+    private void waitForRemoval() throws InterruptedException {
+        logger.info("Waiting for Node to be completely removed from cluster");
+
+        while (true) {
+            final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(localNodeIdentifier);
+            if (status == null) {
+                return;
+            }
+
+            final NodeConnectionState state = status.getState();
+            if (state == NodeConnectionState.REMOVED) {
+                return;
+            }
+
+            logger.debug("Node state is {}.
Will wait {} seconds and check again", state, delaySeconds); + TimeUnit.SECONDS.sleep(delaySeconds); + } + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml index c1a7665791..a4c2573b9f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml @@ -51,9 +51,15 @@ - + + + + + + + diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java index 98aa0b4b18..5efb9ca033 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java @@ -20,11 +20,11 @@ package org.apache.nifi.cluster.coordination.heartbeat; import org.apache.nifi.cluster.ReportedEvent; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener; -import org.apache.nifi.cluster.coordination.node.OffloadCode; import org.apache.nifi.cluster.coordination.node.DisconnectionCode; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; import org.apache.nifi.cluster.coordination.node.NodeWorkload; +import org.apache.nifi.cluster.coordination.node.OffloadCode; import org.apache.nifi.cluster.event.NodeEvent; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.reporting.Severity; @@ -45,7 +45,9 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; @@ -119,9 +121,10 @@ public class TestAbstractHeartbeatMonitor { } @Override - public synchronized void requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) { + public synchronized Future requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) { super.requestNodeDisconnect(nodeId, disconnectionCode, explanation); requestedToDisconnect.add(nodeId); + return CompletableFuture.completedFuture(null); } }; @@ -253,13 +256,15 @@ public class TestAbstractHeartbeatMonitor { } @Override - public synchronized void requestNodeOffload(NodeIdentifier nodeId, OffloadCode offloadCode, String explanation) { + public synchronized Future requestNodeOffload(NodeIdentifier nodeId, OffloadCode offloadCode, String explanation) { statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.OFFLOADED)); + return 
CompletableFuture.completedFuture(null); } @Override - public synchronized void requestNodeDisconnect(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation) { + public synchronized Future requestNodeDisconnect(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation) { statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.DISCONNECTED)); + return CompletableFuture.completedFuture(null); } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 2cb5e28a60..5154963ede 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -2223,6 +2223,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node } if (heartbeatSenderFuture != null) { + LOG.info("FlowController will stop sending heartbeats to Cluster Coordinator"); heartbeatSenderFuture.cancel(false); } } finally { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java index 833075fe21..c2b5143814 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java @@ -28,6 +28,7 @@ import org.apache.nifi.authorization.exception.AuthorizationAccessException; import org.apache.nifi.authorization.exception.AuthorizerCreationException; import org.apache.nifi.authorization.exception.AuthorizerDestructionException; import org.apache.nifi.bundle.Bundle; +import org.apache.nifi.controller.DecommissionTask; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.StandardFlowService; import org.apache.nifi.controller.flow.FlowManager; @@ -193,6 +194,11 @@ public class HeadlessNiFiServer implements NiFiServer { return new ThreadDumpDiagnosticsFactory(); } + @Override + public DecommissionTask getDecommissionTask() { + return null; + } + public void stop() { try { flowService.stop(false); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh index f1d1f597ea..63fbfecc5d 100755 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh @@ -430,7 +430,7 @@ case "$1" in install "$@" ;; - start|stop|run|status|is_loaded|dump|diagnostics|env|stateless) + start|stop|decommission|run|status|is_loaded|dump|diagnostics|env|stateless) main "$@" ;; @@ -440,6 +440,6 @@ case "$1" in run "start" ;; *) - echo "Usage nifi {start|stop|run|restart|status|dump|diagnostics|install|stateless}" + echo "Usage nifi 
{start|stop|decommission|run|restart|status|dump|diagnostics|install|stateless}" ;; esac diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java index 9eeebaf591..b9b12ec41e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java @@ -16,6 +16,7 @@ */ package org.apache.nifi; +import org.apache.nifi.controller.DecommissionTask; import org.apache.nifi.diagnostics.DiagnosticsDump; import org.apache.nifi.util.LimitingInputStream; import org.slf4j.Logger; @@ -202,6 +203,23 @@ public class BootstrapListener { case DUMP: logger.info("Received DUMP request from Bootstrap"); writeDump(socket.getOutputStream()); + break; + case DECOMMISSION: + logger.info("Received DECOMMISSION request from Bootstrap"); + + try { + decommission(); + sendAnswer(socket.getOutputStream(), "DECOMMISSION"); + nifi.shutdownHook(false); + } catch (final Exception e) { + final OutputStream out = socket.getOutputStream(); + + out.write(("Failed to decommission node: " + e + "; see app-log for additional details").getBytes(StandardCharsets.UTF_8)); + out.flush(); + } finally { + socket.close(); + } + break; case DIAGNOSTICS: logger.info("Received DIAGNOSTICS request from Bootstrap"); @@ -250,6 +268,15 @@ public class BootstrapListener { diagnosticsDump.writeTo(out); } + private void decommission() throws InterruptedException { + final DecommissionTask decommissionTask = nifi.getServer().getDecommissionTask(); + if (decommissionTask == null) { + throw new IllegalArgumentException("This NiFi instance does not support decommissioning"); + } + + decommissionTask.decommission(); + } + private void writeDiagnostics(final OutputStream out, final boolean verbose) throws IOException { final DiagnosticsDump diagnosticsDump = nifi.getServer().getDiagnosticsFactory().create(verbose); diagnosticsDump.writeTo(out); @@ -304,6 +331,7 @@ public class BootstrapListener { SHUTDOWN, DUMP, DIAGNOSTICS, + DECOMMISSION, PING, IS_LOADED } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java index 3804ea7493..efcf271d50 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java @@ -23,6 +23,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.nifi.NiFiServer; import org.apache.nifi.bundle.Bundle; import org.apache.nifi.bundle.BundleDetails; +import org.apache.nifi.controller.DecommissionTask; import org.apache.nifi.controller.UninheritableFlowException; import org.apache.nifi.controller.serialization.FlowSerializationException; import org.apache.nifi.controller.serialization.FlowSynchronizationException; @@ -153,6 +154,7 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader { private NarAutoLoader narAutoLoader; private DiagnosticsFactory diagnosticsFactory; private 
SslContextFactory.Server sslContextFactory; + private DecommissionTask decommissionTask; private WebAppContext webApiContext; private WebAppContext webDocsContext; @@ -1172,6 +1174,7 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader { } diagnosticsFactory = webApplicationContext.getBean("diagnosticsFactory", DiagnosticsFactory.class); + decommissionTask = webApplicationContext.getBean("decommissionTask", DecommissionTask.class); } // ensure the web document war was loaded and provide the extension mapping @@ -1245,6 +1248,11 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader { return new ThreadDumpDiagnosticsFactory(); } + @Override + public DecommissionTask getDecommissionTask() { + return decommissionTask; + } + private void performInjectionForComponentUis(final Collection componentUiExtensionWebContexts, final NiFiWebConfigurationContext configurationContext, final FilterHolder securityFilter) { if (CollectionUtils.isNotEmpty(componentUiExtensionWebContexts)) { diff --git a/nifi-server-api/src/main/java/org/apache/nifi/NiFiServer.java b/nifi-server-api/src/main/java/org/apache/nifi/NiFiServer.java index 8124876975..cc87079f6b 100644 --- a/nifi-server-api/src/main/java/org/apache/nifi/NiFiServer.java +++ b/nifi-server-api/src/main/java/org/apache/nifi/NiFiServer.java @@ -17,6 +17,7 @@ package org.apache.nifi; import org.apache.nifi.bundle.Bundle; +import org.apache.nifi.controller.DecommissionTask; import org.apache.nifi.diagnostics.DiagnosticsFactory; import org.apache.nifi.nar.ExtensionMapping; import org.apache.nifi.util.NiFiProperties; @@ -37,4 +38,6 @@ public interface NiFiServer { DiagnosticsFactory getDiagnosticsFactory(); DiagnosticsFactory getThreadDumpFactory(); + + DecommissionTask getDecommissionTask(); }
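
For context, the pieces above fit together as follows: `bin/nifi.sh decommission` causes RunNiFi to send `DECOMMISSION <secret.key>` over the bootstrap socket; BootstrapListener looks up the DecommissionTask exposed by the running NiFiServer (JettyServer returns the `decommissionTask` Spring bean backed by ClusterDecommissionTask, while HeadlessNiFiServer and the documentation stub return null), runs it, acknowledges the bootstrap, and then triggers shutdown. The sketch below is not part of the patch; it only illustrates, assuming an already-started NiFiServer instance, how an embedder could invoke the same SPI directly. The DecommissionExample class name is hypothetical.

    // Illustrative only (not part of this patch): triggering decommissioning through the
    // new NiFiServer SPI, mirroring what BootstrapListener does for the DECOMMISSION command.
    import org.apache.nifi.NiFiServer;
    import org.apache.nifi.controller.DecommissionTask;

    public class DecommissionExample {

        /**
         * Disconnects, offloads and removes the local node, blocking until the work is done.
         * Callers are expected to shut the instance down afterwards, as the bootstrap does.
         */
        public static void decommission(final NiFiServer server) throws InterruptedException {
            final DecommissionTask task = server.getDecommissionTask();
            if (task == null) {
                // e.g. HeadlessNiFiServer and NiFiServerStub return null
                throw new IllegalStateException("This NiFi instance does not support decommissioning");
            }

            // Blocks until the node has been disconnected, offloaded and removed from the cluster
            task.decommission();
        }
    }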