[ML] Catch exceptions in AutoDetectResultProcessor#process and continue (elastic/x-pack-elasticsearch#1484)

Original commit: elastic/x-pack-elasticsearch@f1f6a322e0
This commit is contained in:
David Kyle 2017-05-18 18:40:41 +01:00 committed by GitHub
parent 489a4eb685
commit e5810f894c
4 changed files with 65 additions and 10 deletions

View File

@ -90,21 +90,38 @@ public class AutoDetectResultProcessor {
public void process(AutodetectProcess process, boolean isPerPartitionNormalization) { public void process(AutodetectProcess process, boolean isPerPartitionNormalization) {
Context context = new Context(jobId, isPerPartitionNormalization, persister.bulkPersisterBuilder(jobId)); Context context = new Context(jobId, isPerPartitionNormalization, persister.bulkPersisterBuilder(jobId));
// If a function call in this throws for some reason we don't want it
// to kill the results reader thread as autodetect will be blocked
// trying to write its output.
try { try {
int bucketCount = 0; int bucketCount = 0;
Iterator<AutodetectResult> iterator = process.readAutodetectResults(); Iterator<AutodetectResult> iterator = process.readAutodetectResults();
while (iterator.hasNext()) { while (iterator.hasNext()) {
AutodetectResult result = iterator.next(); try {
processResult(context, result); AutodetectResult result = iterator.next();
if (result.getBucket() != null) { processResult(context, result);
bucketCount++; if (result.getBucket() != null) {
LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, bucketCount); bucketCount++;
LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, bucketCount);
}
} catch (Exception e) {
LOGGER.warn(new ParameterizedMessage("[{}] Error processing autodetect result", jobId), e);
} }
} }
context.bulkResultsPersister.executeRequest();
try {
context.bulkResultsPersister.executeRequest();
} catch (Exception e) {
LOGGER.warn(new ParameterizedMessage("[{}] Error persisting autodetect results", jobId), e);
}
LOGGER.info("[{}] {} buckets parsed from autodetect output", jobId, bucketCount); LOGGER.info("[{}] {} buckets parsed from autodetect output", jobId, bucketCount);
} catch (Exception e) { }
LOGGER.error(new ParameterizedMessage("[{}] error parsing autodetect output", new Object[] {jobId}), e); catch (Exception e) {
// We should only get here if the iterator throws in which
// case parsing the autodetect output has failed.
LOGGER.error(new ParameterizedMessage("[{}] error parsing autodetect output", jobId), e);
} finally { } finally {
try { try {
waitUntilRenormalizerIsIdle(); waitUntilRenormalizerIsIdle();
@ -208,7 +225,7 @@ public class AutoDetectResultProcessor {
updateModelSnapshotIdSemaphore.acquire(); updateModelSnapshotIdSemaphore.acquire();
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
LOGGER.info("[{}] Interrupted acquiring update model snaphot semaphore", jobId); LOGGER.info("[{}] Interrupted acquiring update model snapshot semaphore", jobId);
return; return;
} }

View File

@ -35,7 +35,7 @@ public class AutodetectResultsParser extends AbstractComponent {
try { try {
XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(NamedXContentRegistry.EMPTY, in); XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(NamedXContentRegistry.EMPTY, in);
XContentParser.Token token = parser.nextToken(); XContentParser.Token token = parser.nextToken();
// if start of an array ignore it, we expect an array of buckets // if start of an array ignore it, we expect an array of results
if (token != XContentParser.Token.START_ARRAY) { if (token != XContentParser.Token.START_ARRAY) {
throw new ElasticsearchParseException("unexpected token [" + token + "]"); throw new ElasticsearchParseException("unexpected token [" + token + "]");
} }

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.xpack.ml.job.process.autodetect.output; package org.elasticsearch.xpack.ml.job.process.autodetect.output;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.action.UpdateJobAction; import org.elasticsearch.xpack.ml.action.UpdateJobAction;
@ -34,6 +35,7 @@ import java.util.concurrent.TimeoutException;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same; import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
@ -308,4 +310,22 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
assertEquals(0, processorUnderTest.completionLatch.getCount()); assertEquals(0, processorUnderTest.completionLatch.getCount());
assertEquals(1, processorUnderTest.updateModelSnapshotIdSemaphore.availablePermits()); assertEquals(1, processorUnderTest.updateModelSnapshotIdSemaphore.availablePermits());
} }
public void testPersisterThrowingDoesntBlockProcessing() {
AutodetectResult autodetectResult = mock(AutodetectResult.class);
ModelSizeStats modelSizeStats = mock(ModelSizeStats.class);
when(autodetectResult.getModelSizeStats()).thenReturn(modelSizeStats);
@SuppressWarnings("unchecked")
Iterator<AutodetectResult> iterator = mock(Iterator.class);
when(iterator.hasNext()).thenReturn(true).thenReturn(true).thenReturn(false);
when(iterator.next()).thenReturn(autodetectResult);
AutodetectProcess process = mock(AutodetectProcess.class);
when(process.readAutodetectResults()).thenReturn(iterator);
doThrow(new ElasticsearchException("this test throws")).when(persister).persistModelSizeStats(any());
processorUnderTest.process(process, randomBoolean());
verify(persister, times(2)).persistModelSizeStats(any());
}
} }

View File

@ -22,6 +22,12 @@ import java.util.Date;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/** /**
* Tests for parsing the JSON output of autodetect * Tests for parsing the JSON output of autodetect
*/ */
@ -399,4 +405,16 @@ public class AutodetectResultsParserTests extends ESTestCase {
assertEquals("unexpected token [START_ARRAY]", e.getMessage()); assertEquals("unexpected token [START_ARRAY]", e.getMessage());
} }
public void testConsumeAndCloseStream() throws IOException {
String json = "some string of data";
ByteArrayInputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
AutodetectResultsParser.consumeAndCloseStream(inputStream);
assertEquals(0, inputStream.available());
InputStream mockStream = mock(InputStream.class);
when(mockStream.read(any())).thenReturn(-1);
AutodetectResultsParser.consumeAndCloseStream(mockStream);
verify(mockStream, times(1)).close();
}
} }