NIFI-187: Fixed bug that prevents nifi from shutting down properly when RemoteProcessGroup is present on graph

This commit is contained in:
Mark Payne 2014-12-19 16:03:08 -05:00
parent d850768765
commit a2fd2636d0
5 changed files with 30 additions and 5 deletions

View File

@ -47,6 +47,8 @@ public interface RemoteProcessGroup {
void setComments(String comments);
void shutdown();
/**
* Returns the name of this RemoteProcessGroup. The value returned will
* never be null. If unable to communicate with the remote instance, the URI

View File

@ -333,6 +333,10 @@ public final class StandardProcessGroup implements ProcessGroup {
}
}
for ( final RemoteProcessGroup rpg : procGroup.getRemoteProcessGroups() ) {
rpg.shutdown();
}
// Recursively shutdown child groups.
for (final ProcessGroup group : procGroup.getProcessGroups()) {
shutdown(group);

View File

@ -284,6 +284,11 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}
}
@Override
public void shutdown() {
backgroundThreadExecutor.shutdown();
}
@Override
public String getIdentifier() {
return id;

View File

@ -34,7 +34,6 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.attribute.PosixFilePermission;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -80,6 +79,8 @@ public class RunNiFi {
private volatile boolean autoRestartNiFi = true;
private volatile int ccPort = -1;
private volatile long nifiPid = -1L;
private volatile String secretKey;
private volatile ShutdownHook shutdownHook;
private final Lock lock = new ReentrantLock();
private final Condition startupCondition = lock.newCondition();
@ -675,7 +676,7 @@ public class RunNiFi {
saveProperties(nifiProps);
}
ShutdownHook shutdownHook = new ShutdownHook(process, this, gracefulShutdownSeconds);
shutdownHook = new ShutdownHook(process, this, secretKey, gracefulShutdownSeconds);
final Runtime runtime = Runtime.getRuntime();
runtime.addShutdownHook(shutdownHook);
@ -706,7 +707,7 @@ public class RunNiFi {
saveProperties(nifiProps);
}
shutdownHook = new ShutdownHook(process, this, gracefulShutdownSeconds);
shutdownHook = new ShutdownHook(process, this, secretKey, gracefulShutdownSeconds);
runtime.addShutdownHook(shutdownHook);
final boolean started = waitForStart();
@ -812,6 +813,12 @@ public class RunNiFi {
void setNiFiCommandControlPort(final int port, final String secretKey) {
this.ccPort = port;
this.secretKey = secretKey;
if ( shutdownHook != null ) {
shutdownHook.setSecretKey(secretKey);
}
final File statusFile = getStatusFile();
final Properties nifiProps = new Properties();

View File

@ -28,12 +28,19 @@ public class ShutdownHook extends Thread {
private final RunNiFi runner;
private final int gracefulShutdownSeconds;
public ShutdownHook(final Process nifiProcess, final RunNiFi runner, final int gracefulShutdownSeconds) {
private volatile String secretKey;
public ShutdownHook(final Process nifiProcess, final RunNiFi runner, final String secretKey, final int gracefulShutdownSeconds) {
this.nifiProcess = nifiProcess;
this.runner = runner;
this.secretKey = secretKey;
this.gracefulShutdownSeconds = gracefulShutdownSeconds;
}
void setSecretKey(final String secretKey) {
this.secretKey = secretKey;
}
@Override
public void run() {
runner.setAutoRestartNiFi(false);
@ -44,7 +51,7 @@ public class ShutdownHook extends Thread {
try {
final Socket socket = new Socket("localhost", ccPort);
final OutputStream out = socket.getOutputStream();
out.write("SHUTDOWN\n".getBytes(StandardCharsets.UTF_8));
out.write(("SHUTDOWN " + secretKey + "\n").getBytes(StandardCharsets.UTF_8));
out.flush();
socket.close();