NIFI-12453 Added cluster-status command to nifi.sh

NIFI-12454 Allow decommissioning cluster node without shutdown

This closes #8100

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2023-12-03 16:00:56 -05:00 committed by exceptionfactory
parent a21993ef72
commit 1d06185f13
No known key found for this signature in database
11 changed files with 279 additions and 17 deletions

View File

@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
@ -115,6 +116,7 @@ public class RunNiFi {
public static final String PING_CMD = "PING";
public static final String DUMP_CMD = "DUMP";
public static final String DIAGNOSTICS_CMD = "DIAGNOSTICS";
public static final String CLUSTER_STATUS_CMD = "CLUSTER_STATUS";
public static final String IS_LOADED_CMD = "IS_LOADED";
public static final String STATUS_HISTORY_CMD = "STATUS_HISTORY";
@ -204,6 +206,10 @@ public class RunNiFi {
dumpFile = new File(args[1]);
}
}
} else if (cmd.equalsIgnoreCase("cluster-status")) {
if (args.length > 1) {
dumpFile = new File(args[1]);
}
} else if (cmd.equalsIgnoreCase("status-history")) {
if (args.length < 2) {
System.err.printf("Wrong number of arguments: %d instead of 1 or 2, the command parameters are: " +
@ -253,6 +259,7 @@ public class RunNiFi {
case "status-history":
case "restart":
case "env":
case "cluster-status":
break;
default:
printUsage();
@ -274,7 +281,8 @@ public class RunNiFi {
runNiFi.stop();
break;
case "decommission":
exitStatus = runNiFi.decommission();
final boolean shutdown = args.length < 2 || !"--shutdown=false".equals(args[1]);
exitStatus = runNiFi.decommission(shutdown);
break;
case "status":
exitStatus = runNiFi.status();
@ -296,6 +304,9 @@ public class RunNiFi {
case "diagnostics":
runNiFi.diagnostics(dumpFile, verbose);
break;
case "cluster-status":
runNiFi.clusterStatus(dumpFile);
break;
case "status-history":
runNiFi.statusHistory(dumpFile, statusHistoryDays);
break;
@ -659,7 +670,16 @@ public class RunNiFi {
*/
public void diagnostics(final File dumpFile, final boolean verbose) throws IOException {
final String args = verbose ? "--verbose=true" : null;
makeRequest(DIAGNOSTICS_CMD, args, dumpFile, "diagnostics information");
makeRequest(DIAGNOSTICS_CMD, args, dumpFile, null, "diagnostics information");
}
public void clusterStatus(final File dumpFile) throws IOException {
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
makeRequest(CLUSTER_STATUS_CMD, null, dumpFile, baos, "cluster status");
final String response = baos.toString(StandardCharsets.UTF_8);
System.out.println("Cluster Status: " + response);
}
}
/**
@ -670,7 +690,7 @@ public class RunNiFi {
* @throws IOException if any issues occur while writing the dump file
*/
public void dump(final File dumpFile) throws IOException {
makeRequest(DUMP_CMD, null, dumpFile, "thread dump");
makeRequest(DUMP_CMD, null, dumpFile, null, "thread dump");
}
/**
@ -681,7 +701,7 @@ public class RunNiFi {
*/
public void statusHistory(final File dumpFile, final String days) throws IOException {
// Due to input validation, the dumpFile cannot currently be null in this scenario.
makeRequest(STATUS_HISTORY_CMD, days, dumpFile, "status history information");
makeRequest(STATUS_HISTORY_CMD, days, dumpFile, null, "status history information");
}
private boolean isNiFiFullyLoaded() throws IOException, NiFiNotRunningException {
@ -703,7 +723,16 @@ public class RunNiFi {
}
}
private void makeRequest(final String request, final String arguments, final File dumpFile, final String contentsDescription) throws IOException {
/**
* Makes a request to the Bootstrap Listener
* @param request the request to send
* @param arguments any arguments for the command, or <code>null</code> if the command takes no arguments
* @param dumpFile a file to write the results to, or <code>null</code> to skip writing the results to any file
* @param outputStream an OutputStream to write the results to, or <code>null</code> to skip writing the results to any OutputStream
* @param contentsDescription a description of the contents being written; used for logging purposes
* @throws IOException if unable to communicate with the NiFi instance or write out the results
*/
private void makeRequest(final String request, final String arguments, final File dumpFile, final OutputStream outputStream, final String contentsDescription) throws IOException {
final Logger logger = defaultLogger; // dump to bootstrap log file by default
final Integer port = getCurrentPort(logger);
if (port == null) {
@ -721,11 +750,21 @@ public class RunNiFi {
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
String line;
while ((line = reader.readLine()) != null) {
if (fileOut == null) {
logger.info(line);
} else {
boolean written = false;
if (fileOut != null) {
fileOut.write(line.getBytes(StandardCharsets.UTF_8));
fileOut.write('\n');
written = true;
}
if (outputStream != null) {
outputStream.write(line.getBytes(StandardCharsets.UTF_8));
outputStream.write('\n');
written = true;
}
if (!written) {
logger.info(line);
}
}
}
@ -760,7 +799,8 @@ public class RunNiFi {
socketOut.flush();
}
public Integer decommission() throws IOException {
public Integer decommission(final boolean shutdown) throws IOException {
final Logger logger = cmdLogger;
final Integer port = getCurrentPort(logger);
if (port == null) {
@ -792,7 +832,8 @@ public class RunNiFi {
logger.debug("Sending DECOMMISSION Command to port {}", port);
final OutputStream out = socket.getOutputStream();
out.write((DECOMMISSION_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8));
final String command = DECOMMISSION_CMD + " " + secretKey + " --shutdown=" + shutdown + "\n";
out.write(command.getBytes(StandardCharsets.UTF_8));
out.flush();
socket.shutdownOutput();

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster;
public interface ClusterDetailsFactory {
/**
* @return the current Connection State of this NiFi instance
*/
ConnectionState getConnectionState();
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster;
public enum ConnectionState {
/**
* This NiFi instance is not part of a cluster
*/
NOT_CLUSTERED,
/**
* Instance is in the process of connecting to the cluster
*/
CONNECTING,
/**
* Instance is connected to the cluster
*/
CONNECTED,
/**
* Instance is in the process of disconnecting from the cluster
*/
DISCONNECTING,
/**
* Instance is disconnected from the cluster
*/
DISCONNECTED,
/**
* Instance is offloading
*/
OFFLOADING,
/**
* Instances has completed offloading
*/
OFFLOADED,
/**
* Instance has been removed from the cluster
*/
REMOVED,
/**
* The state is not currently known
*/
UNKNOWN;
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StandardClusterDetailsFactory implements ClusterDetailsFactory {
private static final Logger logger = LoggerFactory.getLogger(StandardClusterDetailsFactory.class);
private final ClusterCoordinator clusterCoordinator;
/**
* Constructor marked as never used because it is constructed via Spring
*/
public StandardClusterDetailsFactory(final ClusterCoordinator clusterCoordinator) {
this.clusterCoordinator = clusterCoordinator;
}
@Override
public ConnectionState getConnectionState() {
if (clusterCoordinator == null) {
logger.debug("No Cluster Coordinator has been configured; returning Connection State of NOT_CLUSTERED");
return ConnectionState.NOT_CLUSTERED;
}
final NodeIdentifier nodeIdentifier = clusterCoordinator.getLocalNodeIdentifier();
if (nodeIdentifier == null) {
logger.info("Local Node Identifier has not yet been established; returning Connection State of UNKNOWN");
return ConnectionState.UNKNOWN;
}
final NodeConnectionStatus connectionStatus = clusterCoordinator.getConnectionStatus(nodeIdentifier);
if (connectionStatus == null) {
logger.info("Cluster connection status is not currently known for Node Identifier {}; returning Connection State of UNKNOWN", nodeIdentifier.getId());
return ConnectionState.UNKNOWN;
}
final String stateName = connectionStatus.getState().name();
try {
final ConnectionState connectionState = ConnectionState.valueOf(stateName);
logger.debug("Returning Connection State of {}", connectionState);
return connectionState;
} catch (final IllegalArgumentException iae) {
logger.warn("Cluster Coordinator reports Connection State of {}, which is not a known state; returning UNKNOWN", stateName);
return ConnectionState.UNKNOWN;
}
}
}

View File

@ -25,6 +25,7 @@ import org.apache.nifi.cluster.coordination.node.OffloadCode;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.DecommissionTask;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.FlowController.GroupStatusCounts;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -139,6 +140,7 @@ public class ClusterDecommissionTask implements DecommissionTask {
private void waitForOffloadToFinish() throws InterruptedException {
logger.info("Waiting for Node to finish offloading");
int iterations = 0;
while (true) {
final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(localNodeIdentifier);
final NodeConnectionState state = status.getState();
@ -150,7 +152,16 @@ public class ClusterDecommissionTask implements DecommissionTask {
throw new IllegalStateException("Expected state of Node to be OFFLOADING but Node is now in a state of " + state);
}
// Every 10th iteration log how many FlowFiles are left
if (++iterations % 10 == 0) {
final GroupStatusCounts statusCounts = flowController.getGroupStatusCounts(flowController.getFlowManager().getRootGroup());
final int flowFileCount = statusCounts.getQueuedCount();
final long byteCount = statusCounts.getQueuedContentSize();
logger.info("Node state is OFFLOADING. Currently, there are {} FlowFiles ({} bytes) left on node.", flowFileCount, byteCount);
} else {
logger.debug("Node state is OFFLOADING. Will wait {} seconds and check again", delaySeconds);
}
TimeUnit.SECONDS.sleep(delaySeconds);
}
}

View File

@ -61,4 +61,7 @@
<constructor-arg ref="flowController" />
</bean>
<bean id="clusterDetailsFactory" class="org.apache.nifi.cluster.StandardClusterDetailsFactory">
<constructor-arg ref="clusterCoordinator" />
</bean>
</beans>

View File

@ -27,6 +27,8 @@ import org.apache.nifi.authorization.exception.AuthorizationAccessException;
import org.apache.nifi.authorization.exception.AuthorizerCreationException;
import org.apache.nifi.authorization.exception.AuthorizerDestructionException;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.cluster.ClusterDetailsFactory;
import org.apache.nifi.cluster.ConnectionState;
import org.apache.nifi.controller.DecommissionTask;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.StandardFlowService;
@ -216,6 +218,11 @@ public class HeadlessNiFiServer implements NiFiServer {
return null;
}
@Override
public ClusterDetailsFactory getClusterDetailsFactory() {
return () -> ConnectionState.NOT_CLUSTERED;
}
@Override
public StatusHistoryDumpFactory getStatusHistoryDumpFactory() {
return null;

View File

@ -455,7 +455,7 @@ run() {
# Wait just a bit (3 secs) to wait for the logging to finish and then echo a new-line.
# We do this to avoid having logs spewed on the console after running the command and then not giving
# control back to the user
sleep 3
sleep 1
echo
}
@ -470,7 +470,7 @@ case "$1" in
install "$@"
;;
start|stop|decommission|run|status|is_loaded|dump|diagnostics|status-history|env|stateless|set-sensitive-properties-algorithm|set-sensitive-properties-key|set-single-user-credentials)
start|stop|decommission|run|status|is_loaded|dump|diagnostics|status-history|env|stateless|set-sensitive-properties-algorithm|set-sensitive-properties-key|set-single-user-credentials|cluster-status)
main "$@"
;;
@ -480,6 +480,6 @@ case "$1" in
run "start"
;;
*)
echo "Usage nifi {start|stop|decommission|run|restart|status|dump|diagnostics|status-history|install|stateless|set-sensitive-properties-algorithm|set-sensitive-properties-key|set-single-user-credentials}"
echo "Usage nifi {start|stop|decommission|run|restart|status|dump|diagnostics|status-history|install|stateless|set-sensitive-properties-algorithm|set-sensitive-properties-key|set-single-user-credentials|cluster-status}"
;;
esac

View File

@ -205,21 +205,44 @@ public class BootstrapListener {
logger.info("Received DUMP request from Bootstrap");
writeDump(socket.getOutputStream());
break;
case CLUSTER_STATUS:
logger.info("Received CLUSTER_STATUS request from Bootstrap");
final String clusterStatus = getClusterStatus();
sendAnswer(socket.getOutputStream(), clusterStatus);
break;
case DECOMMISSION:
logger.info("Received DECOMMISSION request from Bootstrap");
boolean shutdown = true;
final String[] decommissionArgs = request.getArgs();
if (decommissionArgs != null) {
for (final String arg : decommissionArgs) {
if ("--shutdown=false".equalsIgnoreCase(arg)) {
shutdown = false;
break;
}
}
}
logger.info("Command indicates that after decommission, shutdown={}", shutdown);
try {
decommission();
sendAnswer(socket.getOutputStream(), "DECOMMISSION");
if (shutdown) {
nifi.shutdownHook(false);
}
} catch (final Exception e) {
final OutputStream out = socket.getOutputStream();
out.write(("Failed to decommission node: " + e + "; see app-log for additional details").getBytes(StandardCharsets.UTF_8));
out.flush();
} finally {
if (shutdown) {
socket.close();
}
}
break;
case DIAGNOSTICS:
@ -275,6 +298,10 @@ public class BootstrapListener {
diagnosticsDump.writeTo(out);
}
private String getClusterStatus() {
return nifi.getServer().getClusterDetailsFactory().getConnectionState().name();
}
private void decommission() throws InterruptedException {
final DecommissionTask decommissionTask = nifi.getServer().getDecommissionTask();
if (decommissionTask == null) {
@ -346,7 +373,8 @@ public class BootstrapListener {
DECOMMISSION,
PING,
IS_LOADED,
STATUS_HISTORY
STATUS_HISTORY,
CLUSTER_STATUS
}
private final RequestType requestType;

View File

@ -52,6 +52,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.NiFiServer;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleDetails;
import org.apache.nifi.cluster.ClusterDetailsFactory;
import org.apache.nifi.controller.DecommissionTask;
import org.apache.nifi.controller.UninheritableFlowException;
import org.apache.nifi.controller.serialization.FlowSerializationException;
@ -167,6 +168,7 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
private DiagnosticsFactory diagnosticsFactory;
private DecommissionTask decommissionTask;
private StatusHistoryDumpFactory statusHistoryDumpFactory;
private ClusterDetailsFactory clusterDetailsFactory;
private WebAppContext webApiContext;
private WebAppContext webDocsContext;
@ -849,6 +851,7 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
diagnosticsFactory = webApplicationContext.getBean("diagnosticsFactory", DiagnosticsFactory.class);
decommissionTask = webApplicationContext.getBean("decommissionTask", DecommissionTask.class);
statusHistoryDumpFactory = webApplicationContext.getBean("statusHistoryDumpFactory", StatusHistoryDumpFactory.class);
clusterDetailsFactory = webApplicationContext.getBean("clusterDetailsFactory", ClusterDetailsFactory.class);
}
// ensure the web document war was loaded and provide the extension mapping
@ -956,6 +959,11 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
return decommissionTask;
}
@Override
public ClusterDetailsFactory getClusterDetailsFactory() {
return clusterDetailsFactory;
}
@Override
public StatusHistoryDumpFactory getStatusHistoryDumpFactory() {
return statusHistoryDumpFactory;

View File

@ -17,6 +17,7 @@
package org.apache.nifi;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.cluster.ClusterDetailsFactory;
import org.apache.nifi.controller.DecommissionTask;
import org.apache.nifi.controller.status.history.StatusHistoryDumpFactory;
import org.apache.nifi.diagnostics.DiagnosticsFactory;
@ -42,5 +43,7 @@ public interface NiFiServer {
DecommissionTask getDecommissionTask();
ClusterDetailsFactory getClusterDetailsFactory();
StatusHistoryDumpFactory getStatusHistoryDumpFactory();
}