From e5810f894c3622c67bc5141d4e8105b027c3e1cf Mon Sep 17 00:00:00 2001 From: David Kyle Date: Thu, 18 May 2017 18:40:41 +0100 Subject: [PATCH] [ML] Catch exceptions in AutoDetectResultProcessor#process and continue (elastic/x-pack-elasticsearch#1484) Original commit: elastic/x-pack-elasticsearch@f1f6a322e0c3debf9420b37ebe86dafc4bac9950 --- .../output/AutoDetectResultProcessor.java | 35 ++++++++++++++----- .../output/AutodetectResultsParser.java | 2 +- .../AutoDetectResultProcessorTests.java | 20 +++++++++++ .../output/AutodetectResultsParserTests.java | 18 ++++++++++ 4 files changed, 65 insertions(+), 10 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index 977cec0e26a..bfbf008b049 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -90,21 +90,38 @@ public class AutoDetectResultProcessor { public void process(AutodetectProcess process, boolean isPerPartitionNormalization) { Context context = new Context(jobId, isPerPartitionNormalization, persister.bulkPersisterBuilder(jobId)); + + // If a function call in this throws for some reason we don't want it + // to kill the results reader thread as autodetect will be blocked + // trying to write its output. try { int bucketCount = 0; Iterator iterator = process.readAutodetectResults(); while (iterator.hasNext()) { - AutodetectResult result = iterator.next(); - processResult(context, result); - if (result.getBucket() != null) { - bucketCount++; - LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, bucketCount); + try { + AutodetectResult result = iterator.next(); + processResult(context, result); + if (result.getBucket() != null) { + bucketCount++; + LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, bucketCount); + } + } catch (Exception e) { + LOGGER.warn(new ParameterizedMessage("[{}] Error processing autodetect result", jobId), e); } } - context.bulkResultsPersister.executeRequest(); + + try { + context.bulkResultsPersister.executeRequest(); + } catch (Exception e) { + LOGGER.warn(new ParameterizedMessage("[{}] Error persisting autodetect results", jobId), e); + } + LOGGER.info("[{}] {} buckets parsed from autodetect output", jobId, bucketCount); - } catch (Exception e) { - LOGGER.error(new ParameterizedMessage("[{}] error parsing autodetect output", new Object[] {jobId}), e); + } + catch (Exception e) { + // We should only get here if the iterator throws in which + // case parsing the autodetect output has failed. + LOGGER.error(new ParameterizedMessage("[{}] error parsing autodetect output", jobId), e); } finally { try { waitUntilRenormalizerIsIdle(); @@ -208,7 +225,7 @@ public class AutoDetectResultProcessor { updateModelSnapshotIdSemaphore.acquire(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - LOGGER.info("[{}] Interrupted acquiring update model snaphot semaphore", jobId); + LOGGER.info("[{}] Interrupted acquiring update model snapshot semaphore", jobId); return; } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultsParser.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultsParser.java index f34f5f39485..10c9d7f3cf5 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultsParser.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultsParser.java @@ -35,7 +35,7 @@ public class AutodetectResultsParser extends AbstractComponent { try { XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(NamedXContentRegistry.EMPTY, in); XContentParser.Token token = parser.nextToken(); - // if start of an array ignore it, we expect an array of buckets + // if start of an array ignore it, we expect an array of results if (token != XContentParser.Token.START_ARRAY) { throw new ElasticsearchParseException("unexpected token [" + token + "]"); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java index 92cd9d58fcd..97ceeda7aeb 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.job.process.autodetect.output; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.client.Client; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.action.UpdateJobAction; @@ -34,6 +35,7 @@ import java.util.concurrent.TimeoutException; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -308,4 +310,22 @@ public class AutoDetectResultProcessorTests extends ESTestCase { assertEquals(0, processorUnderTest.completionLatch.getCount()); assertEquals(1, processorUnderTest.updateModelSnapshotIdSemaphore.availablePermits()); } + + public void testPersisterThrowingDoesntBlockProcessing() { + AutodetectResult autodetectResult = mock(AutodetectResult.class); + ModelSizeStats modelSizeStats = mock(ModelSizeStats.class); + when(autodetectResult.getModelSizeStats()).thenReturn(modelSizeStats); + + @SuppressWarnings("unchecked") + Iterator iterator = mock(Iterator.class); + when(iterator.hasNext()).thenReturn(true).thenReturn(true).thenReturn(false); + when(iterator.next()).thenReturn(autodetectResult); + AutodetectProcess process = mock(AutodetectProcess.class); + when(process.readAutodetectResults()).thenReturn(iterator); + + doThrow(new ElasticsearchException("this test throws")).when(persister).persistModelSizeStats(any()); + + processorUnderTest.process(process, randomBoolean()); + verify(persister, times(2)).persistModelSizeStats(any()); + } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultsParserTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultsParserTests.java index 80f92709f4a..ae504e5e59f 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultsParserTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultsParserTests.java @@ -22,6 +22,12 @@ import java.util.Date; import java.util.List; import java.util.stream.Collectors; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + /** * Tests for parsing the JSON output of autodetect */ @@ -399,4 +405,16 @@ public class AutodetectResultsParserTests extends ESTestCase { assertEquals("unexpected token [START_ARRAY]", e.getMessage()); } + public void testConsumeAndCloseStream() throws IOException { + String json = "some string of data"; + ByteArrayInputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)); + + AutodetectResultsParser.consumeAndCloseStream(inputStream); + assertEquals(0, inputStream.available()); + + InputStream mockStream = mock(InputStream.class); + when(mockStream.read(any())).thenReturn(-1); + AutodetectResultsParser.consumeAndCloseStream(mockStream); + verify(mockStream, times(1)).close(); + } }