[ML] Don’t wait on flush ack if results parser has failed (elastic/x-pack-elasticsearch#1540)
Original commit: elastic/x-pack-elasticsearch@f1a82ae315
This commit is contained in:
parent
6befa83337
commit
ce25e1f4f3
|
@ -40,7 +40,6 @@ import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.TimeoutException;
|
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.function.BiConsumer;
|
import java.util.function.BiConsumer;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
@ -178,13 +177,14 @@ public class AutodetectCommunicator implements Closeable {
|
||||||
}, handler);
|
}, handler);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void waitFlushToCompletion(String flushId) {
|
void waitFlushToCompletion(String flushId) {
|
||||||
LOGGER.debug("[{}] waiting for flush", job.getId());
|
LOGGER.debug("[{}] waiting for flush", job.getId());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
boolean isFlushComplete = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY);
|
boolean isFlushComplete = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY);
|
||||||
while (isFlushComplete == false) {
|
while (isFlushComplete == false) {
|
||||||
checkProcessIsAlive();
|
checkProcessIsAlive();
|
||||||
|
checkResultsProcessorIsAlive();
|
||||||
isFlushComplete = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY);
|
isFlushComplete = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -212,6 +212,15 @@ public class AutodetectCommunicator implements Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void checkResultsProcessorIsAlive() {
|
||||||
|
if (autoDetectResultProcessor.isFailed()) {
|
||||||
|
ParameterizedMessage message =
|
||||||
|
new ParameterizedMessage("[{}] Unexpected death of the result processor", job.getId());
|
||||||
|
LOGGER.error(message);
|
||||||
|
throw new ElasticsearchException(message.getFormattedMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public JobTask getJobTask() {
|
public JobTask getJobTask() {
|
||||||
return jobTask;
|
return jobTask;
|
||||||
}
|
}
|
||||||
|
|
|
@ -190,7 +190,8 @@ public class AutodetectProcessManager extends AbstractComponent {
|
||||||
if (communicator == null) {
|
if (communicator == null) {
|
||||||
String message = String.format(Locale.ROOT, "Cannot flush because job [%s] is not open", jobTask.getJobId());
|
String message = String.format(Locale.ROOT, "Cannot flush because job [%s] is not open", jobTask.getJobId());
|
||||||
logger.debug(message);
|
logger.debug(message);
|
||||||
throw ExceptionsHelper.conflictStatusException(message);
|
handler.accept(ExceptionsHelper.conflictStatusException(message));
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
communicator.flushJob(params, (aVoid, e) -> {
|
communicator.flushJob(params, (aVoid, e) -> {
|
||||||
|
|
|
@ -67,6 +67,7 @@ public class AutoDetectResultProcessor {
|
||||||
final Semaphore updateModelSnapshotIdSemaphore = new Semaphore(1);
|
final Semaphore updateModelSnapshotIdSemaphore = new Semaphore(1);
|
||||||
private final FlushListener flushListener;
|
private final FlushListener flushListener;
|
||||||
private volatile boolean processKilled;
|
private volatile boolean processKilled;
|
||||||
|
private volatile boolean failed;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* New model size stats are read as the process is running
|
* New model size stats are read as the process is running
|
||||||
|
@ -124,6 +125,8 @@ public class AutoDetectResultProcessor {
|
||||||
|
|
||||||
LOGGER.info("[{}] {} buckets parsed from autodetect output", jobId, bucketCount);
|
LOGGER.info("[{}] {} buckets parsed from autodetect output", jobId, bucketCount);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
failed = true;
|
||||||
|
|
||||||
if (processKilled) {
|
if (processKilled) {
|
||||||
// Don't log the stack trace in this case. Log just enough to hint
|
// Don't log the stack trace in this case. Log just enough to hint
|
||||||
// that it would have been better to close jobs before shutting down,
|
// that it would have been better to close jobs before shutting down,
|
||||||
|
@ -288,6 +291,9 @@ public class AutoDetectResultProcessor {
|
||||||
* @return {@code true} if the flush has completed or the parsing finished; {@code false} if the timeout expired
|
* @return {@code true} if the flush has completed or the parsing finished; {@code false} if the timeout expired or the results processor has failed
|
||||||
*/
|
*/
|
||||||
public boolean waitForFlushAcknowledgement(String flushId, Duration timeout) {
|
public boolean waitForFlushAcknowledgement(String flushId, Duration timeout) {
|
||||||
|
if (failed) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
return flushListener.waitForFlush(flushId, timeout);
|
return flushListener.waitForFlush(flushId, timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -299,6 +305,14 @@ public class AutoDetectResultProcessor {
|
||||||
renormalizer.waitUntilIdle();
|
renormalizer.waitUntilIdle();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reports whether an unrecoverable error occurred while parsing the results
|
||||||
|
* @return true if failed
|
||||||
|
*/
|
||||||
|
public boolean isFailed() {
|
||||||
|
return failed;
|
||||||
|
}
|
||||||
|
|
||||||
static class Context {
|
static class Context {
|
||||||
|
|
||||||
private final String jobId;
|
private final String jobId;
|
||||||
|
|
|
@ -65,6 +65,16 @@ public class AutodetectCommunicatorTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testWaitForFlushReturnsIfParserFails() throws IOException {
|
||||||
|
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
|
||||||
|
when(process.isProcessAlive()).thenReturn(true);
|
||||||
|
AutoDetectResultProcessor processor = mock(AutoDetectResultProcessor.class);
|
||||||
|
when(processor.isFailed()).thenReturn(true);
|
||||||
|
when(processor.waitForFlushAcknowledgement(anyString(), any())).thenReturn(false);
|
||||||
|
AutodetectCommunicator communicator = createAutodetectCommunicator(process, processor);
|
||||||
|
expectThrows(ElasticsearchException.class, () -> communicator.waitFlushToCompletion("foo"));
|
||||||
|
}
|
||||||
|
|
||||||
public void testFlushJob_throwsIfProcessIsDead() throws IOException {
|
public void testFlushJob_throwsIfProcessIsDead() throws IOException {
|
||||||
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
|
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
|
||||||
when(process.isProcessAlive()).thenReturn(false);
|
when(process.isProcessAlive()).thenReturn(false);
|
||||||
|
|
|
@ -6,6 +6,7 @@
|
||||||
package org.elasticsearch.xpack.ml.job.process.autodetect.output;
|
package org.elasticsearch.xpack.ml.job.process.autodetect.output;
|
||||||
|
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
|
import org.elasticsearch.ElasticsearchParseException;
|
||||||
import org.elasticsearch.client.Client;
|
import org.elasticsearch.client.Client;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.elasticsearch.xpack.ml.action.UpdateJobAction;
|
import org.elasticsearch.xpack.ml.action.UpdateJobAction;
|
||||||
|
@ -25,6 +26,8 @@ import org.elasticsearch.xpack.ml.job.results.ModelPlot;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.mockito.InOrder;
|
import org.mockito.InOrder;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.time.temporal.ChronoUnit;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
@ -327,6 +330,21 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
|
||||||
verify(persister, times(2)).persistModelSizeStats(any());
|
verify(persister, times(2)).persistModelSizeStats(any());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testParsingErrorSetsFailed() {
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
Iterator<AutodetectResult> iterator = mock(Iterator.class);
|
||||||
|
when(iterator.hasNext()).thenThrow(new ElasticsearchParseException("this test throws"));
|
||||||
|
AutodetectProcess process = mock(AutodetectProcess.class);
|
||||||
|
when(process.readAutodetectResults()).thenReturn(iterator);
|
||||||
|
|
||||||
|
assertFalse(processorUnderTest.isFailed());
|
||||||
|
processorUnderTest.process(process);
|
||||||
|
assertTrue(processorUnderTest.isFailed());
|
||||||
|
|
||||||
|
// Wait for flush should return immediately
|
||||||
|
assertFalse(processorUnderTest.waitForFlushAcknowledgement("foo", Duration.of(300, ChronoUnit.SECONDS)));
|
||||||
|
}
|
||||||
|
|
||||||
public void testKill() throws TimeoutException {
|
public void testKill() throws TimeoutException {
|
||||||
AutodetectResult autodetectResult = mock(AutodetectResult.class);
|
AutodetectResult autodetectResult = mock(AutodetectResult.class);
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
|
|
Loading…
Reference in New Issue