From f40a090073d62160302b849a46c72ef413905b15 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Fri, 20 May 2016 09:05:44 -0400 Subject: [PATCH] NIFI-1905 enabled ExecuteProcess to terminate process NIFI-1905 polishing NIFI-1905 changed WARN to INFO during shutdown This closes #456 --- .../processors/standard/ExecuteProcess.java | 32 +++++++++++++------ .../standard/TestExecuteProcess.java | 31 ++++++++++++++++++ 2 files changed, 53 insertions(+), 10 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java index 2af1f5383b..62860ed4a2 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java @@ -132,6 +132,8 @@ public class ExecuteProcess extends AbstractProcessor { .description("All created FlowFiles are routed to this relationship") .build(); + private volatile Process externalProcess; + private volatile ExecutorService executor; private Future longRunningProcess; private AtomicBoolean failure = new AtomicBoolean(false); @@ -181,7 +183,14 @@ public class ExecuteProcess extends AbstractProcessor { @OnUnscheduled public void shutdownExecutor() { - executor.shutdown(); + try { + executor.shutdown(); + } finally { + if (this.externalProcess.isAlive()) { + this.getLogger().info("Process hasn't terminated, forcing the interrupt"); + this.externalProcess.destroyForcibly(); + } + } } @Override @@ -299,14 +308,14 @@ public class ExecuteProcess extends AbstractProcessor { } getLogger().info("Start creating new Process > {} ", new Object[] { commandStrings }); - final Process newProcess = builder.redirectErrorStream(redirectErrorStream).start(); + this.externalProcess = builder.redirectErrorStream(redirectErrorStream).start(); // Submit task to read error stream from process if (!redirectErrorStream) { executor.submit(new Runnable() { @Override public void run() { - try (final BufferedReader reader = new BufferedReader(new InputStreamReader(newProcess.getErrorStream()))) { + try (final BufferedReader reader = new BufferedReader(new InputStreamReader(externalProcess.getErrorStream()))) { while (reader.read() >= 0) { } } catch (final IOException ioe) { @@ -324,7 +333,7 @@ public class ExecuteProcess extends AbstractProcessor { if (batchNanos == null) { // if we aren't batching, just copy the stream from the // process to the flowfile. - try (final BufferedInputStream bufferedIn = new BufferedInputStream(newProcess.getInputStream())) { + try (final BufferedInputStream bufferedIn = new BufferedInputStream(externalProcess.getInputStream())) { final byte[] buffer = new byte[4096]; int len; while ((len = bufferedIn.read(buffer)) > 0) { @@ -351,7 +360,7 @@ public class ExecuteProcess extends AbstractProcessor { // Also, we don't want that text to get split up in the // middle of a line, so we use BufferedReader // to read lines of text and write them as lines of text. - try (final BufferedReader reader = new BufferedReader(new InputStreamReader(newProcess.getInputStream()))) { + try (final BufferedReader reader = new BufferedReader(new InputStreamReader(externalProcess.getInputStream()))) { String line; while ((line = reader.readLine()) != null) { @@ -367,13 +376,15 @@ public class ExecuteProcess extends AbstractProcessor { failure.set(true); throw ioe; } finally { - int exitCode; try { - exitCode = newProcess.exitValue(); - } catch (final Exception e) { - exitCode = -99999; + // Since we are going to exit anyway, one sec gives it an extra chance to exit gracefully. + // In the future consider exposing it via configuration. + boolean terminated = externalProcess.waitFor(1000, TimeUnit.MILLISECONDS); + int exitCode = terminated ? externalProcess.exitValue() : -9999; + getLogger().info("Process finished with exit code {} ", new Object[] { exitCode }); + } catch (InterruptedException e1) { + Thread.currentThread().interrupt(); } - getLogger().info("Process finished with exit code {} ", new Object[] { exitCode }); } return null; @@ -412,6 +423,7 @@ public class ExecuteProcess extends AbstractProcessor { try { Thread.sleep(millis); } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteProcess.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteProcess.java index c91d24df18..160bbdbe42 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteProcess.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteProcess.java @@ -18,11 +18,15 @@ package org.apache.nifi.processors.standard; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.File; +import java.lang.reflect.Field; import java.util.List; +import java.util.concurrent.ExecutorService; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processors.standard.util.ArgumentUtils; @@ -97,6 +101,33 @@ public class TestExecuteProcess { } } + @Test + public void validateProcessInterruptOnStop() throws Exception { + final TestRunner runner = TestRunners.newTestRunner(ExecuteProcess.class); + runner.setProperty(ExecuteProcess.COMMAND, "ping"); + runner.setProperty(ExecuteProcess.COMMAND_ARGUMENTS, "nifi.apache.org"); + runner.setProperty(ExecuteProcess.BATCH_DURATION, "500 millis"); + + runner.run(); + Thread.sleep(500); + ExecuteProcess processor = (ExecuteProcess) runner.getProcessor(); + try { + Field executorF = ExecuteProcess.class.getDeclaredField("executor"); + executorF.setAccessible(true); + ExecutorService executor = (ExecutorService) executorF.get(processor); + assertTrue(executor.isShutdown()); + assertTrue(executor.isTerminated()); + + Field processF = ExecuteProcess.class.getDeclaredField("externalProcess"); + processF.setAccessible(true); + Process process = (Process) processF.get(processor); + assertFalse(process.isAlive()); + } catch (Exception e) { + fail(); + } + + } + // @Test public void testBigBinaryInputData() { System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", "TRACE");