[ML] Give kill a chance to stop autodetect before closing input (elastic/x-pack-elasticsearch#1824)

* Give kill a chance to kill the process before closing input

* Remove variable that can be refactored out

Original commit: elastic/x-pack-elasticsearch@42f7a3cece
This commit is contained in:
David Kyle 2017-06-26 10:13:06 +01:00
parent 27aa3094f6
commit 232d59b855
3 changed files with 30 additions and 4 deletions

View File

@ -47,6 +47,8 @@ import java.util.concurrent.TimeoutException;
class NativeAutodetectProcess implements AutodetectProcess { class NativeAutodetectProcess implements AutodetectProcess {
private static final Logger LOGGER = Loggers.getLogger(NativeAutodetectProcess.class); private static final Logger LOGGER = Loggers.getLogger(NativeAutodetectProcess.class);
private static final Duration WAIT_FOR_KILL_TIMEOUT = Duration.ofMillis(1000);
private final String jobId; private final String jobId;
private final CppLogMessageHandler cppLogHandler; private final CppLogMessageHandler cppLogHandler;
private final OutputStream processInStream; private final OutputStream processInStream;
@ -207,6 +209,10 @@ class NativeAutodetectProcess implements AutodetectProcess {
// but if the wait times out it implies the process has only just started, in which // but if the wait times out it implies the process has only just started, in which
// case it should die very quickly when we close its input stream. // case it should die very quickly when we close its input stream.
NativeControllerHolder.getNativeController().killProcess(cppLogHandler.getPid(Duration.ZERO)); NativeControllerHolder.getNativeController().killProcess(cppLogHandler.getPid(Duration.ZERO));
// Wait for the process to die before closing processInStream as if the process
// is still alive when processInStream is closed autodetect will start persisting state
cppLogHandler.waitForLogStreamClose(WAIT_FOR_KILL_TIMEOUT);
} catch (TimeoutException e) { } catch (TimeoutException e) {
LOGGER.warn("[{}] Failed to get PID of autodetect process to kill", jobId); LOGGER.warn("[{}] Failed to get PID of autodetect process to kill", jobId);
} finally { } finally {

View File

@ -51,8 +51,8 @@ public class CppLogMessageHandler implements Closeable {
private final Deque<String> errorStore; private final Deque<String> errorStore;
private final CountDownLatch pidLatch; private final CountDownLatch pidLatch;
private final CountDownLatch cppCopyrightLatch; private final CountDownLatch cppCopyrightLatch;
private final CountDownLatch logStreamClosedLatch;
private MessageSummary lastMessageSummary = new MessageSummary(); private MessageSummary lastMessageSummary = new MessageSummary();
private volatile boolean hasLogStreamEnded;
private volatile boolean seenFatalError; private volatile boolean seenFatalError;
private volatile long pid; private volatile long pid;
private volatile String cppCopyright; private volatile String cppCopyright;
@ -76,7 +76,7 @@ public class CppLogMessageHandler implements Closeable {
errorStore = ConcurrentCollections.newDeque(); errorStore = ConcurrentCollections.newDeque();
pidLatch = new CountDownLatch(1); pidLatch = new CountDownLatch(1);
cppCopyrightLatch = new CountDownLatch(1); cppCopyrightLatch = new CountDownLatch(1);
hasLogStreamEnded = false; logStreamClosedLatch = new CountDownLatch(1);
} }
@Override @Override
@ -104,7 +104,7 @@ public class CppLogMessageHandler implements Closeable {
readBuf = new byte[readBufSize]; readBuf = new byte[readBufSize];
} }
} finally { } finally {
hasLogStreamEnded = true; logStreamClosedLatch.countDown();
// check if there is some leftover from log summarization // check if there is some leftover from log summarization
if (lastMessageSummary.count > 0) { if (lastMessageSummary.count > 0) {
@ -114,13 +114,22 @@ public class CppLogMessageHandler implements Closeable {
} }
public boolean hasLogStreamEnded() { public boolean hasLogStreamEnded() {
return hasLogStreamEnded; return logStreamClosedLatch.getCount() == 0;
} }
public boolean seenFatalError() { public boolean seenFatalError() {
return seenFatalError; return seenFatalError;
} }
public boolean waitForLogStreamClose(Duration timeout) {
try {
return logStreamClosedLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return false;
}
/** /**
* Get the process ID of the C++ process whose log messages are being read. This will * Get the process ID of the C++ process whose log messages are being read. This will
* arrive in the first log message logged by the C++ process. They all log their version * arrive in the first log message logged by the C++ process. They all log their version

View File

@ -179,6 +179,17 @@ public class CppLogMessageHandlerTests extends ESTestCase {
executeLoggingTest(is, mockAppender, Level.DEBUG); executeLoggingTest(is, mockAppender, Level.DEBUG);
} }
public void testWaitForLogStreamClose() throws IOException {
InputStream is = new ByteArrayInputStream(String.join("", TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE,
TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE_DIFFERENT_MESSAGE).getBytes(StandardCharsets.UTF_8));
try (CppLogMessageHandler handler = new CppLogMessageHandler("test_throttling", is)) {
handler.tailStream();
assertTrue(handler.waitForLogStreamClose(Duration.ofMillis(100)));
assertTrue(handler.hasLogStreamEnded());
}
}
private static void executeLoggingTest(InputStream is, MockLogAppender mockAppender, Level level) throws IOException { private static void executeLoggingTest(InputStream is, MockLogAppender mockAppender, Level level) throws IOException {
Logger cppMessageLogger = Loggers.getLogger(CppLogMessageHandler.class); Logger cppMessageLogger = Loggers.getLogger(CppLogMessageHandler.class);
Loggers.addAppender(cppMessageLogger, mockAppender); Loggers.addAppender(cppMessageLogger, mockAppender);