NIFI-1905 enabled ExecuteProcess to terminate process

NIFI-1905 polishing

NIFI-1905 changed WARN to INFO during shutdown
This closes #456
This commit is contained in:
Oleg Zhurakousky 2016-05-20 09:05:44 -04:00
parent 4b74e4de74
commit f40a090073
2 changed files with 53 additions and 10 deletions

View File

@ -132,6 +132,8 @@ public class ExecuteProcess extends AbstractProcessor {
.description("All created FlowFiles are routed to this relationship") .description("All created FlowFiles are routed to this relationship")
.build(); .build();
private volatile Process externalProcess;
private volatile ExecutorService executor; private volatile ExecutorService executor;
private Future<?> longRunningProcess; private Future<?> longRunningProcess;
private AtomicBoolean failure = new AtomicBoolean(false); private AtomicBoolean failure = new AtomicBoolean(false);
@ -181,7 +183,14 @@ public class ExecuteProcess extends AbstractProcessor {
@OnUnscheduled @OnUnscheduled
public void shutdownExecutor() { public void shutdownExecutor() {
executor.shutdown(); try {
executor.shutdown();
} finally {
if (this.externalProcess.isAlive()) {
this.getLogger().info("Process hasn't terminated, forcing the interrupt");
this.externalProcess.destroyForcibly();
}
}
} }
@Override @Override
@ -299,14 +308,14 @@ public class ExecuteProcess extends AbstractProcessor {
} }
getLogger().info("Start creating new Process > {} ", new Object[] { commandStrings }); getLogger().info("Start creating new Process > {} ", new Object[] { commandStrings });
final Process newProcess = builder.redirectErrorStream(redirectErrorStream).start(); this.externalProcess = builder.redirectErrorStream(redirectErrorStream).start();
// Submit task to read error stream from process // Submit task to read error stream from process
if (!redirectErrorStream) { if (!redirectErrorStream) {
executor.submit(new Runnable() { executor.submit(new Runnable() {
@Override @Override
public void run() { public void run() {
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(newProcess.getErrorStream()))) { try (final BufferedReader reader = new BufferedReader(new InputStreamReader(externalProcess.getErrorStream()))) {
while (reader.read() >= 0) { while (reader.read() >= 0) {
} }
} catch (final IOException ioe) { } catch (final IOException ioe) {
@ -324,7 +333,7 @@ public class ExecuteProcess extends AbstractProcessor {
if (batchNanos == null) { if (batchNanos == null) {
// if we aren't batching, just copy the stream from the // if we aren't batching, just copy the stream from the
// process to the flowfile. // process to the flowfile.
try (final BufferedInputStream bufferedIn = new BufferedInputStream(newProcess.getInputStream())) { try (final BufferedInputStream bufferedIn = new BufferedInputStream(externalProcess.getInputStream())) {
final byte[] buffer = new byte[4096]; final byte[] buffer = new byte[4096];
int len; int len;
while ((len = bufferedIn.read(buffer)) > 0) { while ((len = bufferedIn.read(buffer)) > 0) {
@ -351,7 +360,7 @@ public class ExecuteProcess extends AbstractProcessor {
// Also, we don't want that text to get split up in the // Also, we don't want that text to get split up in the
// middle of a line, so we use BufferedReader // middle of a line, so we use BufferedReader
// to read lines of text and write them as lines of text. // to read lines of text and write them as lines of text.
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(newProcess.getInputStream()))) { try (final BufferedReader reader = new BufferedReader(new InputStreamReader(externalProcess.getInputStream()))) {
String line; String line;
while ((line = reader.readLine()) != null) { while ((line = reader.readLine()) != null) {
@ -367,13 +376,15 @@ public class ExecuteProcess extends AbstractProcessor {
failure.set(true); failure.set(true);
throw ioe; throw ioe;
} finally { } finally {
int exitCode;
try { try {
exitCode = newProcess.exitValue(); // Since we are going to exit anyway, one sec gives it an extra chance to exit gracefully.
} catch (final Exception e) { // In the future consider exposing it via configuration.
exitCode = -99999; boolean terminated = externalProcess.waitFor(1000, TimeUnit.MILLISECONDS);
int exitCode = terminated ? externalProcess.exitValue() : -9999;
getLogger().info("Process finished with exit code {} ", new Object[] { exitCode });
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
} }
getLogger().info("Process finished with exit code {} ", new Object[] { exitCode });
} }
return null; return null;
@ -412,6 +423,7 @@ public class ExecuteProcess extends AbstractProcessor {
try { try {
Thread.sleep(millis); Thread.sleep(millis);
} catch (final InterruptedException ie) { } catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
} }
} }

View File

@ -18,11 +18,15 @@ package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File; import java.io.File;
import java.lang.reflect.Field;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.standard.util.ArgumentUtils; import org.apache.nifi.processors.standard.util.ArgumentUtils;
@ -97,6 +101,33 @@ public class TestExecuteProcess {
} }
} }
@Test
public void validateProcessInterruptOnStop() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(ExecuteProcess.class);
runner.setProperty(ExecuteProcess.COMMAND, "ping");
runner.setProperty(ExecuteProcess.COMMAND_ARGUMENTS, "nifi.apache.org");
runner.setProperty(ExecuteProcess.BATCH_DURATION, "500 millis");
runner.run();
Thread.sleep(500);
ExecuteProcess processor = (ExecuteProcess) runner.getProcessor();
try {
Field executorF = ExecuteProcess.class.getDeclaredField("executor");
executorF.setAccessible(true);
ExecutorService executor = (ExecutorService) executorF.get(processor);
assertTrue(executor.isShutdown());
assertTrue(executor.isTerminated());
Field processF = ExecuteProcess.class.getDeclaredField("externalProcess");
processF.setAccessible(true);
Process process = (Process) processF.get(processor);
assertFalse(process.isAlive());
} catch (Exception e) {
fail();
}
}
// @Test // @Test
public void testBigBinaryInputData() { public void testBigBinaryInputData() {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", "TRACE"); System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", "TRACE");