diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java index ec3b0b927dd..b2de0836223 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java @@ -47,6 +47,8 @@ import java.util.concurrent.TimeoutException; class NativeAutodetectProcess implements AutodetectProcess { private static final Logger LOGGER = Loggers.getLogger(NativeAutodetectProcess.class); + private static final Duration WAIT_FOR_KILL_TIMEOUT = Duration.ofMillis(1000); + private final String jobId; private final CppLogMessageHandler cppLogHandler; private final OutputStream processInStream; @@ -207,6 +209,10 @@ class NativeAutodetectProcess implements AutodetectProcess { // but if the wait times out it implies the process has only just started, in which // case it should die very quickly when we close its input stream. NativeControllerHolder.getNativeController().killProcess(cppLogHandler.getPid(Duration.ZERO)); + + // Wait for the process to die before closing processInStream as if the process + // is still alive when processInStream is closed autodetect will start persisting state + cppLogHandler.waitForLogStreamClose(WAIT_FOR_KILL_TIMEOUT); } catch (TimeoutException e) { LOGGER.warn("[{}] Failed to get PID of autodetect process to kill", jobId); } finally { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandler.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandler.java index c43bb1afbfa..f6df58f385c 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandler.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandler.java @@ -51,8 +51,8 @@ public class CppLogMessageHandler implements Closeable { private final Deque errorStore; private final CountDownLatch pidLatch; private final CountDownLatch cppCopyrightLatch; + private final CountDownLatch logStreamClosedLatch; private MessageSummary lastMessageSummary = new MessageSummary(); - private volatile boolean hasLogStreamEnded; private volatile boolean seenFatalError; private volatile long pid; private volatile String cppCopyright; @@ -76,7 +76,7 @@ public class CppLogMessageHandler implements Closeable { errorStore = ConcurrentCollections.newDeque(); pidLatch = new CountDownLatch(1); cppCopyrightLatch = new CountDownLatch(1); - hasLogStreamEnded = false; + logStreamClosedLatch = new CountDownLatch(1); } @Override @@ -104,7 +104,7 @@ public class CppLogMessageHandler implements Closeable { readBuf = new byte[readBufSize]; } } finally { - hasLogStreamEnded = true; + logStreamClosedLatch.countDown(); // check if there is some leftover from log summarization if (lastMessageSummary.count > 0) { @@ -114,13 +114,22 @@ public class CppLogMessageHandler implements Closeable { } public boolean hasLogStreamEnded() { - return hasLogStreamEnded; + return logStreamClosedLatch.getCount() == 0; } public boolean seenFatalError() { return seenFatalError; } + public boolean waitForLogStreamClose(Duration timeout) { + try { + return logStreamClosedLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return false; + } + /** * Get the process ID of the C++ process whose log messages are being read. This will * arrive in the first log message logged by the C++ process. They all log their version diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandlerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandlerTests.java index 3cfcd5e32a1..96d9b66b15a 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandlerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandlerTests.java @@ -179,6 +179,17 @@ public class CppLogMessageHandlerTests extends ESTestCase { executeLoggingTest(is, mockAppender, Level.DEBUG); } + public void testWaitForLogStreamClose() throws IOException { + InputStream is = new ByteArrayInputStream(String.join("", TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, + TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE_DIFFERENT_MESSAGE).getBytes(StandardCharsets.UTF_8)); + + try (CppLogMessageHandler handler = new CppLogMessageHandler("test_throttling", is)) { + handler.tailStream(); + assertTrue(handler.waitForLogStreamClose(Duration.ofMillis(100))); + assertTrue(handler.hasLogStreamEnded()); + } + } + private static void executeLoggingTest(InputStream is, MockLogAppender mockAppender, Level level) throws IOException { Logger cppMessageLogger = Loggers.getLogger(CppLogMessageHandler.class); Loggers.addAppender(cppMessageLogger, mockAppender);