Merge branch 'NIFI-842'

This commit is contained in:
Bryan Bende 2015-08-24 18:58:56 -04:00
commit 310347fd66
4 changed files with 74 additions and 8 deletions

View File

@ -87,6 +87,22 @@ public class BootstrapCodec {
writer.flush();
}
break;
case "STARTED": {
if (args.length != 1) {
throw new InvalidCommandException("STARTED command must contain a status argument");
}
if (!"true".equals(args[0]) && !"false".equals(args[0])) {
throw new InvalidCommandException("Invalid status for STARTED command; should be true or false, but was '" + args[0] + "'");
}
final boolean started = Boolean.parseBoolean(args[0]);
runner.setNiFiStarted(started);
writer.write("OK");
writer.newLine();
writer.flush();
}
break;
}
}
}

View File

@ -92,7 +92,9 @@ public class RunNiFi {
private volatile long nifiPid = -1L;
private volatile String secretKey;
private volatile ShutdownHook shutdownHook;
private volatile boolean nifiStarted;
private final Lock startedLock = new ReentrantLock();
private final Lock lock = new ReentrantLock();
private final Condition startupCondition = lock.newCondition();
@ -799,10 +801,18 @@ public class RunNiFi {
if (autoRestartNiFi) {
final File statusFile = getStatusFile(defaultLogger);
if (!statusFile.exists()) {
defaultLogger.debug("Status File no longer exists. Will not restart NiFi");
defaultLogger.info("Status File no longer exists. Will not restart NiFi");
return;
}
final boolean previouslyStarted = getNifiStarted();
if (!previouslyStarted) {
defaultLogger.info("NiFi never started. Will not restart NiFi");
return;
} else {
setNiFiStarted(false);
}
defaultLogger.warn("Apache NiFi appears to have died. Restarting...");
process = builder.start();
handleLogging(process);
@ -973,6 +983,24 @@ public class RunNiFi {
return this.ccPort;
}
void setNiFiStarted(final boolean nifiStarted) {
startedLock.lock();
try {
this.nifiStarted = nifiStarted;
} finally {
startedLock.unlock();
}
}
boolean getNifiStarted() {
startedLock.lock();
try {
return nifiStarted;
} finally {
startedLock.unlock();
}
}
private static class Status {
private final Integer port;

View File

@ -80,13 +80,37 @@ public class BootstrapListener {
listenThread.start();
logger.debug("Notifying Bootstrap that local port is {}", localPort);
sendCommand("PORT", new String[] { String.valueOf(localPort), secretKey});
}
public void stop() {
if (listener != null) {
listener.stop();
}
}
public void sendStartedStatus(boolean status) throws IOException {
logger.debug("Notifying Bootstrap that the status of starting NiFi is {}", status);
sendCommand("STARTED", new String[]{ String.valueOf(status) });
}
private void sendCommand(final String command, final String[] args) throws IOException {
try (final Socket socket = new Socket()) {
socket.setSoTimeout(60000);
socket.connect(new InetSocketAddress("localhost", bootstrapPort));
socket.setSoTimeout(60000);
final StringBuilder commandBuilder = new StringBuilder(command);
for (final String arg : args) {
commandBuilder.append(" ").append(arg);
}
commandBuilder.append("\n");
final String commandWithArgs = commandBuilder.toString();
logger.debug("Sending command to Bootstrap: " + commandWithArgs);
final OutputStream out = socket.getOutputStream();
out.write(("PORT " + localPort + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8));
out.write((commandWithArgs).getBytes(StandardCharsets.UTF_8));
out.flush();
logger.debug("Awaiting response from Bootstrap...");
@ -100,12 +124,6 @@ public class BootstrapListener {
}
}
public void stop() {
if (listener != null) {
listener.stop();
}
}
private class Listener implements Runnable {
private final ServerSocket serverSocket;

View File

@ -136,6 +136,10 @@ public class NiFi {
} else {
nifiServer.start();
if (bootstrapListener != null) {
bootstrapListener.sendStartedStatus(true);
}
final long endTime = System.nanoTime();
logger.info("Controller initialization took " + (endTime - startTime) + " nanoseconds.");
}