From 1465711762de026cbfc573e386972247a4fab479 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Tue, 4 Apr 2017 12:04:01 +0100 Subject: [PATCH] [ML] Wait for state processing to complete before log processing (elastic/x-pack-elasticsearch#946) State processing can take a lot longer than log processing, even after the C++ process has closed its end of the pipe. The pipe has a buffer, and indexing the state document(s) in that buffer can take more than a second. relates elastic/x-pack-elasticsearch#945 Original commit: elastic/x-pack-elasticsearch@65f5075028622ff361e4054f0b5380367445b349 --- .../autodetect/NativeAutodetectProcess.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java index d2b9c9c68f0..ed594403f5b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java @@ -134,11 +134,15 @@ class NativeAutodetectProcess implements AutodetectProcess { processCloseInitiated = true; // closing its input causes the process to exit processInStream.close(); - // wait for the process to exit by waiting for end-of-file on the named pipe connected to its logger - // this may take a long time as it persists the model state - logTailFuture.get(30, TimeUnit.MINUTES); - // the state processor should have stopped by now as the process should have exit - stateProcessorFuture.get(1, TimeUnit.SECONDS); + // wait for the process to exit by waiting for end-of-file on the named pipe connected + // to the state processor - it may take a long time for all the model state to be + // indexed + stateProcessorFuture.get(30, TimeUnit.MINUTES); + // the log processor should have stopped by now too - assume processing the logs will + // take no more than 5 seconds longer than processing the state (usually it should + // finish first) + logTailFuture.get(5, TimeUnit.SECONDS); + if (cppLogHandler.seenFatalError()) { throw ExceptionsHelper.serverError(cppLogHandler.getErrors()); }