From 678ae535962bbc49a76a464c683e116d1af21215 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Tue, 7 Feb 2017 14:28:01 +0000 Subject: [PATCH] Make flush wait to completion (elastic/elasticsearch#875) Flush has the contract that when it is done, results are up-to-date. Thus, it adds no value to have it time out. In most cases, the request should be pretty responsive, apart from when it advances time forward. In the latter scenario, it could force results to be calculated for a long period of time, which could take a long time. The one use case for this is datafeeds, and there is no issue with waiting for the flush to finish. This PR changes flush to always wait until completion. However, it also checks every second that the C++ process is still alive, to avoid long waits in vain when something has gone horribly wrong. Fixes elastic/elasticsearch#826 Original commit: elastic/x-pack-elasticsearch@de421ab843d7439496076220b650a89c50cc12c0 --- .../xpack/ml/job/messages/Messages.java | 1 - .../autodetect/AutodetectCommunicator.java | 41 ++++++++++--------- .../output/AutoDetectResultProcessor.java | 6 ++- .../autodetect/output/FlushListener.java | 13 +++--- .../ml/job/messages/ml_messages.properties | 1 - .../AutodetectCommunicatorTests.java | 20 +++++---- .../autodetect/output/FlushListenerTests.java | 8 +++- 7 files changed, 53 insertions(+), 37 deletions(-) diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java index 2313962610d..514c5c34346 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java @@ -19,7 +19,6 @@ public final class Messages { */ private static final String BUNDLE_NAME = "org.elasticsearch.xpack.ml.job.messages.ml_messages"; public static final String AUTODETECT_FLUSH_UNEXPTECTED_DEATH = 
"autodetect.flush.failed.unexpected.death"; - public static final String AUTODETECT_FLUSH_TIMEOUT = "autodetect.flush.timeout"; public static final String CPU_LIMIT_JOB = "cpu.limit.jobs"; diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java index 1bdccc6b548..b44bfc9d987 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java @@ -39,7 +39,7 @@ import java.util.function.Supplier; public class AutodetectCommunicator implements Closeable { private static final Logger LOGGER = Loggers.getLogger(AutodetectCommunicator.class); - private static final int DEFAULT_TRY_TIMEOUT_SECS = 30; + private static final Duration FLUSH_PROCESS_CHECK_FREQUENCY = Duration.ofSeconds(1); private final Job job; private final DataCountsReporter dataCountsReporter; @@ -100,30 +100,33 @@ public class AutodetectCommunicator implements Closeable { } public void flushJob(InterimResultsParams params) throws IOException { - flushJob(params, DEFAULT_TRY_TIMEOUT_SECS); - } - - void flushJob(InterimResultsParams params, int tryTimeoutSecs) throws IOException { checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_FLUSH, job.getId()), () -> { String flushId = autodetectProcess.flushJob(params); - - Duration timeout = Duration.ofSeconds(tryTimeoutSecs); - LOGGER.info("[{}] waiting for flush", job.getId()); - boolean isFlushComplete = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, timeout); - LOGGER.info("[{}] isFlushComplete={}", job.getId(), isFlushComplete); - if (!isFlushComplete) { - String msg = Messages.getMessage(Messages.AUTODETECT_FLUSH_TIMEOUT, job.getId()) + " " + autodetectProcess.readError(); - LOGGER.error(msg); - 
throw ExceptionsHelper.serverError(msg); - } - - // We also have to wait for the normalizer to become idle so that we block - // clients from querying results in the middle of normalization. - autoDetectResultProcessor.waitUntilRenormalizerIsIdle(); + waitFlushToCompletion(flushId); return null; }, false); } + private void waitFlushToCompletion(String flushId) throws IOException { + LOGGER.info("[{}] waiting for flush", job.getId()); + + try { + boolean isFlushComplete = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY); + while (isFlushComplete == false) { + checkProcessIsAlive(); + isFlushComplete = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY); + } + } finally { + autoDetectResultProcessor.clearAwaitingFlush(flushId); + } + + // We also have to wait for the normalizer to become idle so that we block + // clients from querying results in the middle of normalization. + autoDetectResultProcessor.waitUntilRenormalizerIsIdle(); + + LOGGER.info("[{}] Flush completed", job.getId()); + } + /** * Throws an exception if the process has exited */ diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index 7cc63c97086..472154d4916 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -185,7 +185,11 @@ public class AutoDetectResultProcessor { * @return {@code true} if the flush has completed or the parsing finished; {@code false} if the timeout expired */ public boolean waitForFlushAcknowledgement(String flushId, Duration timeout) { - return flushListener.waitForFlush(flushId, timeout.toMillis()); + 
return flushListener.waitForFlush(flushId, timeout); + } + + public void clearAwaitingFlush(String flushId) { + flushListener.clear(flushId); } public void waitUntilRenormalizerIsIdle() { diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java index 6b8745ae83e..a268669978e 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.job.process.autodetect.output; +import java.time.Duration; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -17,20 +18,17 @@ class FlushListener { final ConcurrentMap awaitingFlushed = new ConcurrentHashMap<>(); final AtomicBoolean cleared = new AtomicBoolean(false); - boolean waitForFlush(String flushId, long timeout) { + boolean waitForFlush(String flushId, Duration timeout) { if (cleared.get()) { return false; } CountDownLatch latch = awaitingFlushed.computeIfAbsent(flushId, (key) -> new CountDownLatch(1)); try { - return latch.await(timeout, TimeUnit.MILLISECONDS); + return latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return false; - } finally { - // the flush id will no longer be used from this point, so we can remove it. 
- awaitingFlushed.remove(flushId); } } @@ -42,6 +40,10 @@ class FlushListener { latch.countDown(); } + void clear(String flushId) { + awaitingFlushed.remove(flushId); + } + void clear() { if (cleared.compareAndSet(false, true)) { Iterator> latches = awaitingFlushed.entrySet().iterator(); @@ -51,5 +53,4 @@ class FlushListener { } } } - } diff --git a/elasticsearch/src/main/resources/org/elasticsearch/xpack/ml/job/messages/ml_messages.properties b/elasticsearch/src/main/resources/org/elasticsearch/xpack/ml/job/messages/ml_messages.properties index 17f9cce2ce0..d71623103a6 100644 --- a/elasticsearch/src/main/resources/org/elasticsearch/xpack/ml/job/messages/ml_messages.properties +++ b/elasticsearch/src/main/resources/org/elasticsearch/xpack/ml/job/messages/ml_messages.properties @@ -1,7 +1,6 @@ # Machine Learning API messages -autodetect.flush.timeout =[{0}] Timed out flushing job. autodetect.flush.failed.unexpected.death =[{0}] Flush failed: Unexpected death of the Autodetect process flushing job. cpu.limit.jobs = Cannot start job with id ''{0}''. 
The maximum number of concurrently running jobs is limited as a function of the number of CPU cores see this error code''s help documentation for details of how to elevate the setting diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java index 3b66bb4fac2..0cf0bffa400 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java @@ -22,6 +22,7 @@ import org.mockito.Mockito; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.time.Duration; import java.util.Collections; import java.util.Optional; import java.util.concurrent.CountDownLatch; @@ -31,6 +32,7 @@ import static org.elasticsearch.mock.orig.Mockito.doAnswer; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -78,17 +80,21 @@ public class AutodetectCommunicatorTests extends ESTestCase { assertEquals("[foo] Unexpected death of autodetect: Mock process is dead", e.getMessage()); } - public void testFlushJob_throwsOnTimeout() throws IOException { + public void testFlushJob_givenFlushWaitReturnsTrueOnSecondCall() throws IOException { AutodetectProcess process = mockAutodetectProcessWithOutputStream(); when(process.isProcessAlive()).thenReturn(true); - when(process.readError()).thenReturn("Mock process has stalled"); AutoDetectResultProcessor autoDetectResultProcessor = Mockito.mock(AutoDetectResultProcessor.class); - 
when(autoDetectResultProcessor.waitForFlushAcknowledgement(anyString(), any())).thenReturn(false); - try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class))) { - InterimResultsParams params = InterimResultsParams.builder().build(); - ElasticsearchException e = ESTestCase.expectThrows(ElasticsearchException.class, () -> communicator.flushJob(params, 1)); - assertEquals("[foo] Timed out flushing job. Mock process has stalled", e.getMessage()); + when(autoDetectResultProcessor.waitForFlushAcknowledgement(anyString(), eq(Duration.ofSeconds(1)))) + .thenReturn(false).thenReturn(true); + InterimResultsParams params = InterimResultsParams.builder().build(); + + try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, autoDetectResultProcessor)) { + communicator.flushJob(params); } + + verify(autoDetectResultProcessor, times(2)).waitForFlushAcknowledgement(anyString(), eq(Duration.ofSeconds(1))); + // First in checkAndRun, second due to check between calls to waitForFlushAcknowledgement and third due to close() + verify(process, times(3)).isProcessAlive(); } public void testClose() throws IOException { diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListenerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListenerTests.java index d8bf838c020..d1ecb4f1c0a 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListenerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListenerTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.output; import org.elasticsearch.test.ESTestCase; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -17,13 +18,16 @@ public class 
FlushListenerTests extends ESTestCase { FlushListener listener = new FlushListener(); AtomicBoolean bool = new AtomicBoolean(); new Thread(() -> { - boolean result = listener.waitForFlush("_id", 10000); + boolean result = listener.waitForFlush("_id", Duration.ofMillis(10000)); bool.set(result); }).start(); assertBusy(() -> assertTrue(listener.awaitingFlushed.containsKey("_id"))); assertFalse(bool.get()); listener.acknowledgeFlush("_id"); assertBusy(() -> assertTrue(bool.get())); + assertEquals(1, listener.awaitingFlushed.size()); + + listener.clear("_id"); assertEquals(0, listener.awaitingFlushed.size()); } @@ -37,7 +41,7 @@ public class FlushListenerTests extends ESTestCase { AtomicBoolean bool = new AtomicBoolean(); bools.add(bool); new Thread(() -> { - boolean result = listener.waitForFlush(String.valueOf(id), 10000); + boolean result = listener.waitForFlush(String.valueOf(id), Duration.ofMillis(10000)); bool.set(result); }).start(); }