diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessor.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessor.java index 38a58f9ced6..d57548c6b55 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -21,14 +21,14 @@ import org.elasticsearch.xpack.prelert.job.results.CategoryDefinition; import org.elasticsearch.xpack.prelert.job.results.Influencer; import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput; import org.elasticsearch.xpack.prelert.job.results.PerPartitionMaxProbabilities; -import org.elasticsearch.xpack.prelert.utils.CloseableIterator; import java.io.InputStream; import java.time.Duration; +import java.util.Iterator; import java.util.List; -import java.util.Locale; import java.util.Optional; import java.util.concurrent.CountDownLatch; +import java.util.stream.Stream; /** * A runnable class that reads the autodetect process output @@ -67,8 +67,9 @@ public class AutoDetectResultProcessor { } public void process(String jobId, InputStream in, boolean isPerPartitionNormalisation) { - try (CloseableIterator iterator = parser.parseResults(in)) { + try (Stream stream = parser.parseResults(in)) { int bucketCount = 0; + Iterator iterator = stream.iterator(); Context context = new Context(jobId, isPerPartitionNormalisation); while (iterator.hasNext()) { AutodetectResult result = iterator.next(); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutodetectResultsParser.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutodetectResultsParser.java index f26c6c4d337..62b5c17cb7c 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutodetectResultsParser.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutodetectResultsParser.java @@ -13,10 +13,14 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.xpack.prelert.job.results.AutodetectResult; -import org.elasticsearch.xpack.prelert.utils.CloseableIterator; import java.io.IOException; import java.io.InputStream; +import java.util.Iterator; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; /** @@ -34,7 +38,7 @@ public class AutodetectResultsParser extends AbstractComponent { this.parseFieldMatcherSupplier = parseFieldMatcherSupplier; } - public CloseableIterator parseResults(InputStream in) throws ElasticsearchParseException { + public Stream parseResults(InputStream in) throws ElasticsearchParseException { try { XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(in); XContentParser.Token token = parser.nextToken(); @@ -42,7 +46,9 @@ public class AutodetectResultsParser extends AbstractComponent { if (token != XContentParser.Token.START_ARRAY) { throw new ElasticsearchParseException("unexpected token [" + token + "]"); } - return new AutodetectResultIterator(in, parser); + Spliterator spliterator = Spliterators.spliterator(new AutodetectResultIterator(parser), Long.MAX_VALUE, 0); + return StreamSupport.stream(spliterator, false) + .onClose(() -> consumeAndCloseStream(in)); } catch (IOException e) { consumeAndCloseStream(in); throw new ElasticsearchParseException(e.getMessage(), e); @@ -65,15 +71,12 @@ public class AutodetectResultsParser extends AbstractComponent { } } - private class AutodetectResultIterator implements CloseableIterator { + private class AutodetectResultIterator implements Iterator { - private final InputStream in; private final XContentParser parser; - private XContentParser.Token token; - private AutodetectResultIterator(InputStream in, XContentParser parser) { - this.in = in; + private AutodetectResultIterator(XContentParser parser) { this.parser = parser; token = parser.currentToken(); } @@ -99,11 +102,6 @@ public class AutodetectResultsParser extends AbstractComponent { return AutodetectResult.PARSER.apply(parser, parseFieldMatcherSupplier); } - @Override - public void close() throws IOException { - consumeAndCloseStream(in); - } - } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/utils/CloseableIterator.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/utils/CloseableIterator.java deleted file mode 100644 index 82b29473edc..00000000000 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/utils/CloseableIterator.java +++ /dev/null @@ -1,17 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.prelert.utils; - -import java.io.Closeable; -import java.util.Iterator; - -/** - * An interface for iterators that can have resources that will be automatically cleaned up - * if iterator is created in a try-with-resources block. - */ -public interface CloseableIterator extends Iterator, Closeable { - -} diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManagerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManagerTests.java index 8f0085cea63..4d7c02162e9 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManagerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManagerTests.java @@ -35,7 +35,6 @@ import org.elasticsearch.xpack.prelert.job.process.autodetect.params.DataLoadPar import org.elasticsearch.xpack.prelert.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.prelert.job.process.autodetect.params.TimeRange; import org.elasticsearch.xpack.prelert.job.results.AutodetectResult; -import org.elasticsearch.xpack.prelert.utils.CloseableIterator; import org.junit.Before; import org.mockito.Mockito; @@ -45,9 +44,11 @@ import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.HashSet; +import java.util.Iterator; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.function.Supplier; +import java.util.stream.Stream; import static org.elasticsearch.mock.orig.Mockito.doAnswer; import static org.elasticsearch.mock.orig.Mockito.doThrow; @@ -119,9 +120,12 @@ public class AutodetectProcessManagerTests extends ESTestCase { when(threadPool.executor(PrelertPlugin.AUTODETECT_PROCESS_THREAD_POOL_NAME)).thenReturn(executorService); AutodetectResultsParser parser = mock(AutodetectResultsParser.class); @SuppressWarnings("unchecked") - CloseableIterator iterator = mock(CloseableIterator.class); + Stream stream = mock(Stream.class); + @SuppressWarnings("unchecked") + Iterator iterator = mock(Iterator.class); + when(stream.iterator()).thenReturn(iterator); when(iterator.hasNext()).thenReturn(false); - when(parser.parseResults(any())).thenReturn(iterator); + when(parser.parseResults(any())).thenReturn(stream); AutodetectProcess autodetectProcess = mock(AutodetectProcess.class); when(autodetectProcess.isProcessAlive()).thenReturn(true); when(autodetectProcess.getPersistStream()).thenReturn(new ByteArrayInputStream(new byte[0])); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessorTests.java index 4e72c5dda42..9ed332daa17 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -18,12 +18,13 @@ import org.elasticsearch.xpack.prelert.job.results.CategoryDefinition; import org.elasticsearch.xpack.prelert.job.results.Influencer; import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput; import org.elasticsearch.xpack.prelert.job.results.PerPartitionMaxProbabilities; -import org.elasticsearch.xpack.prelert.utils.CloseableIterator; import org.mockito.InOrder; import java.io.InputStream; import java.util.Arrays; +import java.util.Iterator; import java.util.List; +import java.util.stream.Stream; import static org.mockito.Matchers.any; import static org.mockito.Mockito.inOrder; @@ -39,11 +40,14 @@ public class AutoDetectResultProcessorTests extends ESTestCase { public void testProcess() { AutodetectResult autodetectResult = mock(AutodetectResult.class); @SuppressWarnings("unchecked") - CloseableIterator iterator = mock(CloseableIterator.class); + Stream stream = mock(Stream.class); + @SuppressWarnings("unchecked") + Iterator iterator = mock(Iterator.class); + when(stream.iterator()).thenReturn(iterator); when(iterator.hasNext()).thenReturn(true).thenReturn(false); when(iterator.next()).thenReturn(autodetectResult); AutodetectResultsParser parser = mock(AutodetectResultsParser.class); - when(parser.parseResults(any())).thenReturn(iterator); + when(parser.parseResults(any())).thenReturn(stream); Renormaliser renormaliser = mock(Renormaliser.class); JobResultsPersister persister = mock(JobResultsPersister.class); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutodetectResultsParserTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutodetectResultsParserTests.java index 13a7fb9d1a0..923a4501853 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutodetectResultsParserTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutodetectResultsParserTests.java @@ -229,7 +229,7 @@ public class AutodetectResultsParserTests extends ESTestCase { InputStream inputStream = new ByteArrayInputStream(METRIC_OUTPUT_SAMPLE.getBytes(StandardCharsets.UTF_8)); AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY, () -> ParseFieldMatcher.STRICT); List results = new ArrayList<>(); - parser.parseResults(inputStream).forEachRemaining(results::add); + parser.parseResults(inputStream).iterator().forEachRemaining(results::add); List buckets = results.stream().map(AutodetectResult::getBucket) .filter(b -> b != null) .collect(Collectors.toList()); @@ -324,7 +324,7 @@ public class AutodetectResultsParserTests extends ESTestCase { InputStream inputStream = new ByteArrayInputStream(POPULATION_OUTPUT_SAMPLE.getBytes(StandardCharsets.UTF_8)); AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY, () -> ParseFieldMatcher.STRICT); List results = new ArrayList<>(); - parser.parseResults(inputStream).forEachRemaining(results::add); + parser.parseResults(inputStream).iterator().forEachRemaining(results::add); List buckets = results.stream().map(AutodetectResult::getBucket) .filter(b -> b != null) .collect(Collectors.toList()); @@ -351,7 +351,7 @@ public class AutodetectResultsParserTests extends ESTestCase { String json = "[]"; InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)); AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY, () -> ParseFieldMatcher.STRICT); - assertFalse(parser.parseResults(inputStream).hasNext()); + assertFalse(parser.parseResults(inputStream).iterator().hasNext()); } public void testParse_GivenModelSizeStats() throws ElasticsearchParseException, IOException { @@ -360,7 +360,7 @@ public class AutodetectResultsParserTests extends ESTestCase { AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY, () -> ParseFieldMatcher.STRICT); List results = new ArrayList<>(); - parser.parseResults(inputStream).forEachRemaining(results::add); + parser.parseResults(inputStream).iterator().forEachRemaining(results::add); assertEquals(1, results.size()); assertEquals(300, results.get(0).getModelSizeStats().getModelBytes()); @@ -371,7 +371,7 @@ public class AutodetectResultsParserTests extends ESTestCase { InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)); AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY, () -> ParseFieldMatcher.STRICT); List results = new ArrayList<>(); - parser.parseResults(inputStream).forEachRemaining(results::add); + parser.parseResults(inputStream).iterator().forEachRemaining(results::add); assertEquals(1, results.size()); assertEquals(18, results.get(0).getCategoryDefinition().getCategoryId()); @@ -382,7 +382,7 @@ public class AutodetectResultsParserTests extends ESTestCase { InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)); AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY, () -> ParseFieldMatcher.STRICT); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, - () -> parser.parseResults(inputStream).forEachRemaining(a -> {})); + () -> parser.parseResults(inputStream).iterator().forEachRemaining(a -> {})); assertEquals("[autodetect_result] unknown field [unknown], parser not found", e.getMessage()); } @@ -391,7 +391,7 @@ public class AutodetectResultsParserTests extends ESTestCase { InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)); AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY, () -> ParseFieldMatcher.STRICT); ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, - () -> parser.parseResults(inputStream).forEachRemaining(a -> {})); + () -> parser.parseResults(inputStream).iterator().forEachRemaining(a -> {})); assertEquals("unexpected token [START_ARRAY]", e.getMessage()); }