replaced custom ClosableIterator with Stream

Stream is closable, which is the reason ClosableIterator was introduced.

Original commit: elastic/x-pack-elasticsearch@b5a4a37e9e
This commit is contained in:
Martijn van Groningen 2016-12-07 12:07:47 +01:00
parent 5812ef4a86
commit d807eda9ed
6 changed files with 36 additions and 46 deletions

View File

@ -21,14 +21,14 @@ import org.elasticsearch.xpack.prelert.job.results.CategoryDefinition;
import org.elasticsearch.xpack.prelert.job.results.Influencer; import org.elasticsearch.xpack.prelert.job.results.Influencer;
import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput; import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput;
import org.elasticsearch.xpack.prelert.job.results.PerPartitionMaxProbabilities; import org.elasticsearch.xpack.prelert.job.results.PerPartitionMaxProbabilities;
import org.elasticsearch.xpack.prelert.utils.CloseableIterator;
import java.io.InputStream; import java.io.InputStream;
import java.time.Duration; import java.time.Duration;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Locale;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.stream.Stream;
/** /**
* A runnable class that reads the autodetect process output * A runnable class that reads the autodetect process output
@ -67,8 +67,9 @@ public class AutoDetectResultProcessor {
} }
public void process(String jobId, InputStream in, boolean isPerPartitionNormalisation) { public void process(String jobId, InputStream in, boolean isPerPartitionNormalisation) {
try (CloseableIterator<AutodetectResult> iterator = parser.parseResults(in)) { try (Stream<AutodetectResult> stream = parser.parseResults(in)) {
int bucketCount = 0; int bucketCount = 0;
Iterator<AutodetectResult> iterator = stream.iterator();
Context context = new Context(jobId, isPerPartitionNormalisation); Context context = new Context(jobId, isPerPartitionNormalisation);
while (iterator.hasNext()) { while (iterator.hasNext()) {
AutodetectResult result = iterator.next(); AutodetectResult result = iterator.next();

View File

@ -13,10 +13,14 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.prelert.job.results.AutodetectResult; import org.elasticsearch.xpack.prelert.job.results.AutodetectResult;
import org.elasticsearch.xpack.prelert.utils.CloseableIterator;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/** /**
@ -34,7 +38,7 @@ public class AutodetectResultsParser extends AbstractComponent {
this.parseFieldMatcherSupplier = parseFieldMatcherSupplier; this.parseFieldMatcherSupplier = parseFieldMatcherSupplier;
} }
public CloseableIterator<AutodetectResult> parseResults(InputStream in) throws ElasticsearchParseException { public Stream<AutodetectResult> parseResults(InputStream in) throws ElasticsearchParseException {
try { try {
XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(in); XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(in);
XContentParser.Token token = parser.nextToken(); XContentParser.Token token = parser.nextToken();
@ -42,7 +46,9 @@ public class AutodetectResultsParser extends AbstractComponent {
if (token != XContentParser.Token.START_ARRAY) { if (token != XContentParser.Token.START_ARRAY) {
throw new ElasticsearchParseException("unexpected token [" + token + "]"); throw new ElasticsearchParseException("unexpected token [" + token + "]");
} }
return new AutodetectResultIterator(in, parser); Spliterator<AutodetectResult> spliterator = Spliterators.spliterator(new AutodetectResultIterator(parser), Long.MAX_VALUE, 0);
return StreamSupport.stream(spliterator, false)
.onClose(() -> consumeAndCloseStream(in));
} catch (IOException e) { } catch (IOException e) {
consumeAndCloseStream(in); consumeAndCloseStream(in);
throw new ElasticsearchParseException(e.getMessage(), e); throw new ElasticsearchParseException(e.getMessage(), e);
@ -65,15 +71,12 @@ public class AutodetectResultsParser extends AbstractComponent {
} }
} }
private class AutodetectResultIterator implements CloseableIterator<AutodetectResult> { private class AutodetectResultIterator implements Iterator<AutodetectResult> {
private final InputStream in;
private final XContentParser parser; private final XContentParser parser;
private XContentParser.Token token; private XContentParser.Token token;
private AutodetectResultIterator(InputStream in, XContentParser parser) { private AutodetectResultIterator(XContentParser parser) {
this.in = in;
this.parser = parser; this.parser = parser;
token = parser.currentToken(); token = parser.currentToken();
} }
@ -99,11 +102,6 @@ public class AutodetectResultsParser extends AbstractComponent {
return AutodetectResult.PARSER.apply(parser, parseFieldMatcherSupplier); return AutodetectResult.PARSER.apply(parser, parseFieldMatcherSupplier);
} }
@Override
public void close() throws IOException {
consumeAndCloseStream(in);
}
} }
} }

View File

@ -1,17 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.utils;
import java.io.Closeable;
import java.util.Iterator;
/**
* An interface for iterators that can have resources that will be automatically cleaned up
* if iterator is created in a try-with-resources block.
*/
public interface CloseableIterator<T> extends Iterator<T>, Closeable {
}

View File

@ -35,7 +35,6 @@ import org.elasticsearch.xpack.prelert.job.process.autodetect.params.DataLoadPar
import org.elasticsearch.xpack.prelert.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.prelert.job.process.autodetect.params.InterimResultsParams;
import org.elasticsearch.xpack.prelert.job.process.autodetect.params.TimeRange; import org.elasticsearch.xpack.prelert.job.process.autodetect.params.TimeRange;
import org.elasticsearch.xpack.prelert.job.results.AutodetectResult; import org.elasticsearch.xpack.prelert.job.results.AutodetectResult;
import org.elasticsearch.xpack.prelert.utils.CloseableIterator;
import org.junit.Before; import org.junit.Before;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -45,9 +44,11 @@ import java.io.InputStream;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.Stream;
import static org.elasticsearch.mock.orig.Mockito.doAnswer; import static org.elasticsearch.mock.orig.Mockito.doAnswer;
import static org.elasticsearch.mock.orig.Mockito.doThrow; import static org.elasticsearch.mock.orig.Mockito.doThrow;
@ -119,9 +120,12 @@ public class AutodetectProcessManagerTests extends ESTestCase {
when(threadPool.executor(PrelertPlugin.AUTODETECT_PROCESS_THREAD_POOL_NAME)).thenReturn(executorService); when(threadPool.executor(PrelertPlugin.AUTODETECT_PROCESS_THREAD_POOL_NAME)).thenReturn(executorService);
AutodetectResultsParser parser = mock(AutodetectResultsParser.class); AutodetectResultsParser parser = mock(AutodetectResultsParser.class);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
CloseableIterator<AutodetectResult> iterator = mock(CloseableIterator.class); Stream<AutodetectResult> stream = mock(Stream.class);
@SuppressWarnings("unchecked")
Iterator<AutodetectResult> iterator = mock(Iterator.class);
when(stream.iterator()).thenReturn(iterator);
when(iterator.hasNext()).thenReturn(false); when(iterator.hasNext()).thenReturn(false);
when(parser.parseResults(any())).thenReturn(iterator); when(parser.parseResults(any())).thenReturn(stream);
AutodetectProcess autodetectProcess = mock(AutodetectProcess.class); AutodetectProcess autodetectProcess = mock(AutodetectProcess.class);
when(autodetectProcess.isProcessAlive()).thenReturn(true); when(autodetectProcess.isProcessAlive()).thenReturn(true);
when(autodetectProcess.getPersistStream()).thenReturn(new ByteArrayInputStream(new byte[0])); when(autodetectProcess.getPersistStream()).thenReturn(new ByteArrayInputStream(new byte[0]));

View File

@ -18,12 +18,13 @@ import org.elasticsearch.xpack.prelert.job.results.CategoryDefinition;
import org.elasticsearch.xpack.prelert.job.results.Influencer; import org.elasticsearch.xpack.prelert.job.results.Influencer;
import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput; import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput;
import org.elasticsearch.xpack.prelert.job.results.PerPartitionMaxProbabilities; import org.elasticsearch.xpack.prelert.job.results.PerPartitionMaxProbabilities;
import org.elasticsearch.xpack.prelert.utils.CloseableIterator;
import org.mockito.InOrder; import org.mockito.InOrder;
import java.io.InputStream; import java.io.InputStream;
import java.util.Arrays; import java.util.Arrays;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.stream.Stream;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.inOrder;
@ -39,11 +40,14 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
public void testProcess() { public void testProcess() {
AutodetectResult autodetectResult = mock(AutodetectResult.class); AutodetectResult autodetectResult = mock(AutodetectResult.class);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
CloseableIterator<AutodetectResult> iterator = mock(CloseableIterator.class); Stream<AutodetectResult> stream = mock(Stream.class);
@SuppressWarnings("unchecked")
Iterator<AutodetectResult> iterator = mock(Iterator.class);
when(stream.iterator()).thenReturn(iterator);
when(iterator.hasNext()).thenReturn(true).thenReturn(false); when(iterator.hasNext()).thenReturn(true).thenReturn(false);
when(iterator.next()).thenReturn(autodetectResult); when(iterator.next()).thenReturn(autodetectResult);
AutodetectResultsParser parser = mock(AutodetectResultsParser.class); AutodetectResultsParser parser = mock(AutodetectResultsParser.class);
when(parser.parseResults(any())).thenReturn(iterator); when(parser.parseResults(any())).thenReturn(stream);
Renormaliser renormaliser = mock(Renormaliser.class); Renormaliser renormaliser = mock(Renormaliser.class);
JobResultsPersister persister = mock(JobResultsPersister.class); JobResultsPersister persister = mock(JobResultsPersister.class);

View File

@ -229,7 +229,7 @@ public class AutodetectResultsParserTests extends ESTestCase {
InputStream inputStream = new ByteArrayInputStream(METRIC_OUTPUT_SAMPLE.getBytes(StandardCharsets.UTF_8)); InputStream inputStream = new ByteArrayInputStream(METRIC_OUTPUT_SAMPLE.getBytes(StandardCharsets.UTF_8));
AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY, () -> ParseFieldMatcher.STRICT); AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY, () -> ParseFieldMatcher.STRICT);
List<AutodetectResult> results = new ArrayList<>(); List<AutodetectResult> results = new ArrayList<>();
parser.parseResults(inputStream).forEachRemaining(results::add); parser.parseResults(inputStream).iterator().forEachRemaining(results::add);
List<Bucket> buckets = results.stream().map(AutodetectResult::getBucket) List<Bucket> buckets = results.stream().map(AutodetectResult::getBucket)
.filter(b -> b != null) .filter(b -> b != null)
.collect(Collectors.toList()); .collect(Collectors.toList());
@ -324,7 +324,7 @@ public class AutodetectResultsParserTests extends ESTestCase {
InputStream inputStream = new ByteArrayInputStream(POPULATION_OUTPUT_SAMPLE.getBytes(StandardCharsets.UTF_8)); InputStream inputStream = new ByteArrayInputStream(POPULATION_OUTPUT_SAMPLE.getBytes(StandardCharsets.UTF_8));
AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY, () -> ParseFieldMatcher.STRICT); AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY, () -> ParseFieldMatcher.STRICT);
List<AutodetectResult> results = new ArrayList<>(); List<AutodetectResult> results = new ArrayList<>();
parser.parseResults(inputStream).forEachRemaining(results::add); parser.parseResults(inputStream).iterator().forEachRemaining(results::add);
List<Bucket> buckets = results.stream().map(AutodetectResult::getBucket) List<Bucket> buckets = results.stream().map(AutodetectResult::getBucket)
.filter(b -> b != null) .filter(b -> b != null)
.collect(Collectors.toList()); .collect(Collectors.toList());
@ -351,7 +351,7 @@ public class AutodetectResultsParserTests extends ESTestCase {
String json = "[]"; String json = "[]";
InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)); InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY, () -> ParseFieldMatcher.STRICT); AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY, () -> ParseFieldMatcher.STRICT);
assertFalse(parser.parseResults(inputStream).hasNext()); assertFalse(parser.parseResults(inputStream).iterator().hasNext());
} }
public void testParse_GivenModelSizeStats() throws ElasticsearchParseException, IOException { public void testParse_GivenModelSizeStats() throws ElasticsearchParseException, IOException {
@ -360,7 +360,7 @@ public class AutodetectResultsParserTests extends ESTestCase {
AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY, () -> ParseFieldMatcher.STRICT); AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY, () -> ParseFieldMatcher.STRICT);
List<AutodetectResult> results = new ArrayList<>(); List<AutodetectResult> results = new ArrayList<>();
parser.parseResults(inputStream).forEachRemaining(results::add); parser.parseResults(inputStream).iterator().forEachRemaining(results::add);
assertEquals(1, results.size()); assertEquals(1, results.size());
assertEquals(300, results.get(0).getModelSizeStats().getModelBytes()); assertEquals(300, results.get(0).getModelSizeStats().getModelBytes());
@ -371,7 +371,7 @@ public class AutodetectResultsParserTests extends ESTestCase {
InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)); InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY, () -> ParseFieldMatcher.STRICT); AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY, () -> ParseFieldMatcher.STRICT);
List<AutodetectResult> results = new ArrayList<>(); List<AutodetectResult> results = new ArrayList<>();
parser.parseResults(inputStream).forEachRemaining(results::add); parser.parseResults(inputStream).iterator().forEachRemaining(results::add);
assertEquals(1, results.size()); assertEquals(1, results.size());
assertEquals(18, results.get(0).getCategoryDefinition().getCategoryId()); assertEquals(18, results.get(0).getCategoryDefinition().getCategoryId());
@ -382,7 +382,7 @@ public class AutodetectResultsParserTests extends ESTestCase {
InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)); InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY, () -> ParseFieldMatcher.STRICT); AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY, () -> ParseFieldMatcher.STRICT);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> parser.parseResults(inputStream).forEachRemaining(a -> {})); () -> parser.parseResults(inputStream).iterator().forEachRemaining(a -> {}));
assertEquals("[autodetect_result] unknown field [unknown], parser not found", e.getMessage()); assertEquals("[autodetect_result] unknown field [unknown], parser not found", e.getMessage());
} }
@ -391,7 +391,7 @@ public class AutodetectResultsParserTests extends ESTestCase {
InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)); InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY, () -> ParseFieldMatcher.STRICT); AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY, () -> ParseFieldMatcher.STRICT);
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class,
() -> parser.parseResults(inputStream).forEachRemaining(a -> {})); () -> parser.parseResults(inputStream).iterator().forEachRemaining(a -> {}));
assertEquals("unexpected token [START_ARRAY]", e.getMessage()); assertEquals("unexpected token [START_ARRAY]", e.getMessage());
} }