diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java index 381787512d..d925fa3164 100644 --- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java @@ -87,6 +87,22 @@ public class BootstrapCodec { writer.flush(); } break; + case "STARTED": { + if (args.length != 1) { + throw new InvalidCommandException("STARTED command must contain a status argument"); + } + + if (!"true".equals(args[0]) && !"false".equals(args[0])) { + throw new InvalidCommandException("Invalid status for STARTED command; should be true or false, but was '" + args[0] + "'"); + } + + final boolean started = Boolean.parseBoolean(args[0]); + runner.setNiFiStarted(started); + writer.write("OK"); + writer.newLine(); + writer.flush(); + } + break; } } } diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java index c2f4feb988..636fd4a9b9 100644 --- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java @@ -92,7 +92,9 @@ public class RunNiFi { private volatile long nifiPid = -1L; private volatile String secretKey; private volatile ShutdownHook shutdownHook; + private volatile boolean nifiStarted; + private final Lock startedLock = new ReentrantLock(); private final Lock lock = new ReentrantLock(); private final Condition startupCondition = lock.newCondition(); @@ -799,10 +801,18 @@ public class RunNiFi { if (autoRestartNiFi) { final File statusFile = getStatusFile(defaultLogger); if (!statusFile.exists()) { - defaultLogger.debug("Status File no longer exists. Will not restart NiFi"); + defaultLogger.info("Status File no longer exists. Will not restart NiFi"); return; } + final boolean previouslyStarted = getNifiStarted(); + if (!previouslyStarted) { + defaultLogger.info("NiFi never started. Will not restart NiFi"); + return; + } else { + setNiFiStarted(false); + } + defaultLogger.warn("Apache NiFi appears to have died. Restarting..."); process = builder.start(); handleLogging(process); @@ -973,6 +983,24 @@ public class RunNiFi { return this.ccPort; } + void setNiFiStarted(final boolean nifiStarted) { + startedLock.lock(); + try { + this.nifiStarted = nifiStarted; + } finally { + startedLock.unlock(); + } + } + + boolean getNifiStarted() { + startedLock.lock(); + try { + return nifiStarted; + } finally { + startedLock.unlock(); + } + } + private static class Status { private final Integer port; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java index c1bdf97846..373212af02 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java @@ -80,13 +80,37 @@ public class BootstrapListener { listenThread.start(); logger.debug("Notifying Bootstrap that local port is {}", localPort); + sendCommand("PORT", new String[] { String.valueOf(localPort), secretKey}); + } + + public void stop() { + if (listener != null) { + listener.stop(); + } + } + + public void sendStartedStatus(boolean status) throws IOException { + logger.debug("Notifying Bootstrap that the status of starting NiFi is {}", status); + sendCommand("STARTED", new String[]{ String.valueOf(status) }); + } + + private void sendCommand(final String command, final String[] args) throws IOException { try (final Socket socket = new Socket()) { socket.setSoTimeout(60000); socket.connect(new InetSocketAddress("localhost", bootstrapPort)); socket.setSoTimeout(60000); + final StringBuilder commandBuilder = new StringBuilder(command); + for (final String arg : args) { + commandBuilder.append(" ").append(arg); + } + commandBuilder.append("\n"); + + final String commandWithArgs = commandBuilder.toString(); + logger.debug("Sending command to Bootstrap: " + commandWithArgs); + final OutputStream out = socket.getOutputStream(); - out.write(("PORT " + localPort + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8)); + out.write((commandWithArgs).getBytes(StandardCharsets.UTF_8)); out.flush(); logger.debug("Awaiting response from Bootstrap..."); @@ -100,12 +124,6 @@ public class BootstrapListener { } } - public void stop() { - if (listener != null) { - listener.stop(); - } - } - private class Listener implements Runnable { private final ServerSocket serverSocket; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java index ef2377f25b..ae4cf40caa 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java @@ -136,6 +136,10 @@ public class NiFi { } else { nifiServer.start(); + if (bootstrapListener != null) { + bootstrapListener.sendStartedStatus(true); + } + final long endTime = System.nanoTime(); logger.info("Controller initialization took " + (endTime - startTime) + " nanoseconds."); }