NIFI-11761 Fixed MiNiFi restart issue when graceful shutdown period expires. MiNiFi restart sends bootstrap to background

Signed-off-by: Csaba Bejan <bejan.csaba@gmail.com>

This closes #7448.
This commit is contained in:
Ferenc Kis 2023-06-28 16:24:00 +02:00 committed by Csaba Bejan
parent 3aba769cc3
commit 3c3cf9976e
No known key found for this signature in database
GPG Key ID: C59951609F8BDDEB
4 changed files with 39 additions and 12 deletions

View File

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.bootstrap;
import static java.util.Collections.singleton;
@ -76,6 +77,8 @@ public class RunMiNiFi implements ConfigurationFileHolder {
private static final String STATUS_FILE_PORT_KEY = "port";
private static final String STATUS_FILE_SECRET_KEY = "secret.key";
private static final String ACKNOWLEDGE_OPERATION = "ACKNOWLEDGE_OPERATION";
private static final String PROCESS_KILL_SUCCESS_CHECK_RETRIES_KEY = "process.kill.success.check.retries";
private static final String PROCESS_KILL_SUCCESS_CHECK_RETRIES_DEFAULT = "30";
private final BootstrapFileProvider bootstrapFileProvider;
private final ConfigurationChangeCoordinator configurationChangeCoordinator;
@ -98,18 +101,20 @@ public class RunMiNiFi implements ConfigurationFileHolder {
bootstrapFileProvider = new BootstrapFileProvider(bootstrapConfigFile);
objectMapper = getObjectMapper();
Properties properties = bootstrapFileProvider.getStatusProperties();
Properties bootstrapProperties = bootstrapFileProvider.getBootstrapProperties();
miNiFiParameters = new MiNiFiParameters(
Optional.ofNullable(properties.getProperty(STATUS_FILE_PORT_KEY)).map(Integer::parseInt).orElse(UNINITIALIZED),
Optional.ofNullable(properties.getProperty(STATUS_FILE_PID_KEY)).map(Integer::parseInt).orElse(UNINITIALIZED),
properties.getProperty(STATUS_FILE_SECRET_KEY)
);
ProcessUtils processUtils = new UnixProcessUtils();
ProcessUtils processUtils = new UnixProcessUtils(Integer.parseInt(
bootstrapProperties.getProperty(PROCESS_KILL_SUCCESS_CHECK_RETRIES_KEY, PROCESS_KILL_SUCCESS_CHECK_RETRIES_DEFAULT)));
miNiFiCommandSender = new MiNiFiCommandSender(miNiFiParameters, objectMapper);
MiNiFiStatusProvider miNiFiStatusProvider = new MiNiFiStatusProvider(miNiFiCommandSender, processUtils);
periodicStatusReporterManager =
new PeriodicStatusReporterManager(bootstrapFileProvider.getBootstrapProperties(), miNiFiStatusProvider, miNiFiCommandSender, miNiFiParameters);
new PeriodicStatusReporterManager(bootstrapProperties, miNiFiStatusProvider, miNiFiCommandSender, miNiFiParameters);
MiNiFiConfigurationChangeListener configurationChangeListener = new MiNiFiConfigurationChangeListener(this, DEFAULT_LOGGER, bootstrapFileProvider);
configurationChangeCoordinator = new ConfigurationChangeCoordinator(bootstrapFileProvider, this, singleton(configurationChangeListener));

View File

@ -45,7 +45,7 @@ public class StopRunner implements CommandRunner {
private final ProcessUtils processUtils;
public StopRunner(BootstrapFileProvider bootstrapFileProvider, MiNiFiParameters miNiFiParameters, MiNiFiCommandSender miNiFiCommandSender,
CurrentPortProvider currentPortProvider, GracefulShutdownParameterProvider gracefulShutdownParameterProvider, ProcessUtils processUtils) {
CurrentPortProvider currentPortProvider, GracefulShutdownParameterProvider gracefulShutdownParameterProvider, ProcessUtils processUtils) {
this.bootstrapFileProvider = bootstrapFileProvider;
this.miNiFiParameters = miNiFiParameters;
this.miNiFiCommandSender = miNiFiCommandSender;
@ -56,6 +56,7 @@ public class StopRunner implements CommandRunner {
/**
* Shutdown the MiNiFi and the managing bootstrap process as well.
*
* @param args the input arguments
* @return status code
*/
@ -94,6 +95,7 @@ public class StopRunner implements CommandRunner {
status = ERROR.getStatusCode();
}
} catch (IOException e) {
CMD_LOGGER.warn("An error has occurred while stopping MiNiFi. Force killing process with pid=" + minifiPid, e);
killProcessTree(minifiPid);
} finally {
if (lockFile.exists() && !lockFile.delete()) {

View File

@ -37,6 +37,12 @@ import org.slf4j.LoggerFactory;
public class UnixProcessUtils implements ProcessUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(UnixProcessUtils.class);
private final int processKillCheckRetries;
public UnixProcessUtils(int processKillCheckRetries) {
this.processKillCheckRetries = processKillCheckRetries;
}
@Override
public boolean isProcessRunning(Long pid) {
if (pid == null) {
@ -55,8 +61,8 @@ public class UnixProcessUtils implements ProcessUtils {
boolean running = false;
String line;
try (InputStream in = proc.getInputStream();
Reader streamReader = new InputStreamReader(in);
BufferedReader reader = new BufferedReader(streamReader)) {
Reader streamReader = new InputStreamReader(in);
BufferedReader reader = new BufferedReader(streamReader)) {
while ((line = reader.readLine()) != null) {
if (line.trim().startsWith(pidString)) {
@ -113,14 +119,28 @@ public class UnixProcessUtils implements ProcessUtils {
killProcessTree(childPid);
}
Runtime.getRuntime().exec(new String[]{"kill", "-9", String.valueOf(pid)});
Runtime.getRuntime().exec(new String[] {"kill", "-9", String.valueOf(pid)});
int retries = processKillCheckRetries;
while (isProcessRunning(pid)) {
if (retries == 0) {
throw new IOException("Failed to stop process. Process is still running after killing attempt with pid=" + pid);
}
LOGGER.warn("Process is still running after killing attempt with pid=" + pid);
retries--;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
DEFAULT_LOGGER.warn("Thread interrupted while waiting for killing process with pid=" + pid);
}
}
}
private List<Long> getChildProcesses(Long ppid) throws IOException {
Process proc = Runtime.getRuntime().exec(new String[]{"ps", "-o", "pid", "--no-headers", "--ppid", String.valueOf(ppid)});
Process proc = Runtime.getRuntime().exec(new String[] {"ps", "-o", "pid", "--no-headers", "--ppid", String.valueOf(ppid)});
List<Long> childPids = new ArrayList<>();
try (InputStream in = proc.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
String line;
while ((line = reader.readLine()) != null) {

View File

@ -326,23 +326,23 @@ run() {
fi
if [ "$1" = "run" ]; then
# Use exec to handover PID to RunMiNiFi java process, instead of foking it as a child process
# Use exec to handover PID to RunMiNiFi java process, instead of forking it as a child process
RUN_MINIFI_CMD="exec ${RUN_MINIFI_CMD}"
fi
# run 'start' in the background because the process will continue to run, monitoring MiNiFi.
# all other commands will terminate quickly so want to just wait for them
if [ "$1" = "start" ]; then
if [ "$1" = "start" ] || [ "$1" = "restart" ]; then
(eval "cd ${MINIFI_HOME} && ${RUN_MINIFI_CMD}" &)
else
eval "cd ${MINIFI_HOME} && ${RUN_MINIFI_CMD}"
fi
EXIT_STATUS=$?
# Wait just a bit (3 secs) to wait for the logging to finish and then echo a new-line.
# Wait just a bit (5 secs) to wait for the logging to finish and then echo a new-line.
# We do this to avoid having logs spewed on the console after running the command and then not giving
# control back to the user
sleep 3
sleep 5
echo
}