Make flush wait to completion (elastic/elasticsearch#875)

Flush has the contract that when it is done results are up-to-date.
Thus, it adds no value to have it timeout. In most cases, the request
should be pretty responsive apart from when it advances time forward.
In the latter scenario, it could force results to be calculated for a
long period of time, which could take a while. The one use case for this
is datafeeds, and there is no issue with waiting for the flush to finish.

This PR changes flush to always wait for completion. However, it adds a
check every second that the C++ process is still alive, to avoid long
waits in vain when something has gone horribly wrong.

Fixes elastic/elasticsearch#826

Original commit: elastic/x-pack-elasticsearch@de421ab843
This commit is contained in:
Dimitris Athanasiou 2017-02-07 14:28:01 +00:00 committed by GitHub
parent 0c64c22883
commit 678ae53596
7 changed files with 53 additions and 37 deletions

View File

@ -19,7 +19,6 @@ public final class Messages {
*/
private static final String BUNDLE_NAME = "org.elasticsearch.xpack.ml.job.messages.ml_messages";
public static final String AUTODETECT_FLUSH_UNEXPTECTED_DEATH = "autodetect.flush.failed.unexpected.death";
public static final String AUTODETECT_FLUSH_TIMEOUT = "autodetect.flush.timeout";
public static final String CPU_LIMIT_JOB = "cpu.limit.jobs";

View File

@ -39,7 +39,7 @@ import java.util.function.Supplier;
public class AutodetectCommunicator implements Closeable {
private static final Logger LOGGER = Loggers.getLogger(AutodetectCommunicator.class);
private static final int DEFAULT_TRY_TIMEOUT_SECS = 30;
private static final Duration FLUSH_PROCESS_CHECK_FREQUENCY = Duration.ofSeconds(1);
private final Job job;
private final DataCountsReporter dataCountsReporter;
@ -100,30 +100,33 @@ public class AutodetectCommunicator implements Closeable {
}
public void flushJob(InterimResultsParams params) throws IOException {
flushJob(params, DEFAULT_TRY_TIMEOUT_SECS);
}
void flushJob(InterimResultsParams params, int tryTimeoutSecs) throws IOException {
checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_FLUSH, job.getId()), () -> {
String flushId = autodetectProcess.flushJob(params);
Duration timeout = Duration.ofSeconds(tryTimeoutSecs);
LOGGER.info("[{}] waiting for flush", job.getId());
boolean isFlushComplete = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, timeout);
LOGGER.info("[{}] isFlushComplete={}", job.getId(), isFlushComplete);
if (!isFlushComplete) {
String msg = Messages.getMessage(Messages.AUTODETECT_FLUSH_TIMEOUT, job.getId()) + " " + autodetectProcess.readError();
LOGGER.error(msg);
throw ExceptionsHelper.serverError(msg);
}
// We also have to wait for the normalizer to become idle so that we block
// clients from querying results in the middle of normalization.
autoDetectResultProcessor.waitUntilRenormalizerIsIdle();
waitFlushToCompletion(flushId);
return null;
}, false);
}
private void waitFlushToCompletion(String flushId) throws IOException {
LOGGER.info("[{}] waiting for flush", job.getId());
try {
boolean isFlushComplete = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY);
while (isFlushComplete == false) {
checkProcessIsAlive();
isFlushComplete = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY);
}
} finally {
autoDetectResultProcessor.clearAwaitingFlush(flushId);
}
// We also have to wait for the normalizer to become idle so that we block
// clients from querying results in the middle of normalization.
autoDetectResultProcessor.waitUntilRenormalizerIsIdle();
LOGGER.info("[{}] Flush completed", job.getId());
}
/**
* Throws an exception if the process has exited
*/

View File

@ -185,7 +185,11 @@ public class AutoDetectResultProcessor {
* @return {@code true} if the flush has completed or the parsing finished; {@code false} if the timeout expired
*/
public boolean waitForFlushAcknowledgement(String flushId, Duration timeout) {
return flushListener.waitForFlush(flushId, timeout.toMillis());
return flushListener.waitForFlush(flushId, timeout);
}
public void clearAwaitingFlush(String flushId) {
flushListener.clear(flushId);
}
public void waitUntilRenormalizerIsIdle() {

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.job.process.autodetect.output;
import java.time.Duration;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -17,20 +18,17 @@ class FlushListener {
final ConcurrentMap<String, CountDownLatch> awaitingFlushed = new ConcurrentHashMap<>();
final AtomicBoolean cleared = new AtomicBoolean(false);
boolean waitForFlush(String flushId, long timeout) {
boolean waitForFlush(String flushId, Duration timeout) {
if (cleared.get()) {
return false;
}
CountDownLatch latch = awaitingFlushed.computeIfAbsent(flushId, (key) -> new CountDownLatch(1));
try {
return latch.await(timeout, TimeUnit.MILLISECONDS);
return latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
} finally {
// the flush id will no longer be used from this point, so we can remove it.
awaitingFlushed.remove(flushId);
}
}
@ -42,6 +40,10 @@ class FlushListener {
latch.countDown();
}
void clear(String flushId) {
awaitingFlushed.remove(flushId);
}
void clear() {
if (cleared.compareAndSet(false, true)) {
Iterator<ConcurrentMap.Entry<String, CountDownLatch>> latches = awaitingFlushed.entrySet().iterator();
@ -51,5 +53,4 @@ class FlushListener {
}
}
}
}

View File

@ -1,7 +1,6 @@
# Machine Learning API messages
autodetect.flush.timeout =[{0}] Timed out flushing job.
autodetect.flush.failed.unexpected.death =[{0}] Flush failed: Unexpected death of the Autodetect process flushing job.
cpu.limit.jobs = Cannot start job with id ''{0}''. The maximum number of concurrently running jobs is limited as a function of the number of CPU cores; see this error code''s help documentation for details of how to elevate the setting

View File

@ -22,6 +22,7 @@ import org.mockito.Mockito;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
@ -31,6 +32,7 @@ import static org.elasticsearch.mock.orig.Mockito.doAnswer;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -78,17 +80,21 @@ public class AutodetectCommunicatorTests extends ESTestCase {
assertEquals("[foo] Unexpected death of autodetect: Mock process is dead", e.getMessage());
}
public void testFlushJob_throwsOnTimeout() throws IOException {
public void testFlushJob_givenFlushWaitReturnsTrueOnSecondCall() throws IOException {
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
when(process.isProcessAlive()).thenReturn(true);
when(process.readError()).thenReturn("Mock process has stalled");
AutoDetectResultProcessor autoDetectResultProcessor = Mockito.mock(AutoDetectResultProcessor.class);
when(autoDetectResultProcessor.waitForFlushAcknowledgement(anyString(), any())).thenReturn(false);
try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class))) {
InterimResultsParams params = InterimResultsParams.builder().build();
ElasticsearchException e = ESTestCase.expectThrows(ElasticsearchException.class, () -> communicator.flushJob(params, 1));
assertEquals("[foo] Timed out flushing job. Mock process has stalled", e.getMessage());
when(autoDetectResultProcessor.waitForFlushAcknowledgement(anyString(), eq(Duration.ofSeconds(1))))
.thenReturn(false).thenReturn(true);
InterimResultsParams params = InterimResultsParams.builder().build();
try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, autoDetectResultProcessor)) {
communicator.flushJob(params);
}
verify(autoDetectResultProcessor, times(2)).waitForFlushAcknowledgement(anyString(), eq(Duration.ofSeconds(1)));
// First in checkAndRun, second due to check between calls to waitForFlushAcknowledgement and third due to close()
verify(process, times(3)).isProcessAlive();
}
public void testClose() throws IOException {

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.output;
import org.elasticsearch.test.ESTestCase;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@ -17,13 +18,16 @@ public class FlushListenerTests extends ESTestCase {
FlushListener listener = new FlushListener();
AtomicBoolean bool = new AtomicBoolean();
new Thread(() -> {
boolean result = listener.waitForFlush("_id", 10000);
boolean result = listener.waitForFlush("_id", Duration.ofMillis(10000));
bool.set(result);
}).start();
assertBusy(() -> assertTrue(listener.awaitingFlushed.containsKey("_id")));
assertFalse(bool.get());
listener.acknowledgeFlush("_id");
assertBusy(() -> assertTrue(bool.get()));
assertEquals(1, listener.awaitingFlushed.size());
listener.clear("_id");
assertEquals(0, listener.awaitingFlushed.size());
}
@ -37,7 +41,7 @@ public class FlushListenerTests extends ESTestCase {
AtomicBoolean bool = new AtomicBoolean();
bools.add(bool);
new Thread(() -> {
boolean result = listener.waitForFlush(String.valueOf(id), 10000);
boolean result = listener.waitForFlush(String.valueOf(id), Duration.ofMillis(10000));
bool.set(result);
}).start();
}