Reintroduce chunking to improve data extractor performance (elastic/elasticsearch#849)

* Reintroduce chunking to improve data extractor performance

Performing a sorted search/scroll over a period of time that matches
a lot of documents is very expensive because, for each page, all
matching documents have to be traversed.

The solution is to chunk the search time range and perform a separate
search/scroll for each chunk.

This commit introduces a new `chunking_config` setting in `datafeed_config`
whose mode can be set to one of AUTO, OFF or MANUAL, with MANUAL
allowing an explicit chunk time span to be specified.

When set to AUTO, a heuristic is used to determine the chunk size.
The heuristic estimates the time interval within which we expect
`scroll_size` documents and then takes 10 times that interval.
Based on benchmarking, this method gives a dramatic performance
increase. For example, for the citizens dataset it improved the ingest
rate from 0.33M docs/minute to 13.6M docs/minute, and Farequote now
completes in ~1 second.
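
As a concrete illustration of the heuristic (the numbers mirror those used in
the new ChunkedDataExtractorTests): with `scroll_size` = 1000 and a data
summary of 15,000 hits spread over 300,000 ms, the estimated chunk span is
10 * (1000 * 300,000) / 15,000 = 200,000 ms, i.e. each chunk is expected to
contain roughly ten scroll pages worth of documents.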

Finally, note that when `chunking_config` is not specified, it defaults
to AUTO when aggregations are not set and to OFF otherwise. This is
because the chunk size heuristic does not lend itself well to
aggregations, where chunking needs to be based on the cardinality of
buckets rather than simply on time.
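
For illustration, a datafeed using manual chunking is configured like this
(mirroring the REST test added in this commit; the values are only an example):

{
  "job_id": "job-1",
  "indexes": ["index-foo"],
  "types": ["type-bar"],
  "chunking_config": {"mode": "manual", "time_span": 3600}
}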

Relates to elastic/elasticsearch#734

Original commit: elastic/x-pack-elasticsearch@a738e86d21
Dimitris Athanasiou 2017-02-03 15:50:01 +00:00 committed by GitHub
parent 21adb19b22
commit 9d9572e2b2
11 changed files with 1115 additions and 11 deletions

View File

@ -0,0 +1,156 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.datafeed;
import org.elasticsearch.action.support.ToXContentToBytes;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.Locale;
import java.util.Objects;
/**
* The description of how searches should be chunked.
*/
public class ChunkingConfig extends ToXContentToBytes implements Writeable {
public static final ParseField MODE_FIELD = new ParseField("mode");
public static final ParseField TIME_SPAN_FIELD = new ParseField("time_span");
public static final ConstructingObjectParser<ChunkingConfig, Void> PARSER = new ConstructingObjectParser<>(
"chunking_config", a -> new ChunkingConfig((Mode) a[0], (Long) a[1]));
static {
PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> {
if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
return Mode.fromString(p.text());
}
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
}, MODE_FIELD, ValueType.STRING);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), TIME_SPAN_FIELD);
}
private final Mode mode;
private final Long timeSpan;
public ChunkingConfig(StreamInput in) throws IOException {
mode = Mode.readFromStream(in);
timeSpan = in.readOptionalLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
mode.writeTo(out);
out.writeOptionalLong(timeSpan);
}
ChunkingConfig(Mode mode, @Nullable Long timeSpan) {
this.mode = ExceptionsHelper.requireNonNull(mode, MODE_FIELD.getPreferredName());
this.timeSpan = timeSpan;
if (mode == Mode.MANUAL) {
if (timeSpan == null) {
throw new IllegalArgumentException("when chunk mode is manual time_span is required");
}
if (timeSpan <= 0) {
throw new IllegalArgumentException("chunk time_span has to be positive");
}
} else {
if (timeSpan != null) {
throw new IllegalArgumentException("chunk time_span may only be set when mode is manual");
}
}
}
@Nullable
public Long getTimeSpan() {
return timeSpan;
}
public boolean isEnabled() {
return mode != Mode.OFF;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(MODE_FIELD.getPreferredName(), mode);
if (timeSpan != null) {
builder.field(TIME_SPAN_FIELD.getPreferredName(), timeSpan);
}
builder.endObject();
return builder;
}
@Override
public int hashCode() {
return Objects.hash(mode, timeSpan);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
ChunkingConfig other = (ChunkingConfig) obj;
return Objects.equals(this.mode, other.mode) &&
Objects.equals(this.timeSpan, other.timeSpan);
}
public static ChunkingConfig newAuto() {
return new ChunkingConfig(Mode.AUTO, null);
}
public static ChunkingConfig newOff() {
return new ChunkingConfig(Mode.OFF, null);
}
public static ChunkingConfig newManual(long timeSpan) {
return new ChunkingConfig(Mode.MANUAL, timeSpan);
}
public enum Mode implements Writeable {
AUTO, MANUAL, OFF;
public static Mode fromString(String value) {
return Mode.valueOf(value.toUpperCase(Locale.ROOT));
}
public static Mode readFromStream(StreamInput in) throws IOException {
int ordinal = in.readVInt();
if (ordinal < 0 || ordinal >= values().length) {
throw new IOException("Unknown Mode ordinal [" + ordinal + "]");
}
return values()[ordinal];
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(ordinal());
}
@Override
public String toString() {
return name().toLowerCase(Locale.ROOT);
}
}
}
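
A quick usage sketch of the factory methods above (illustrative only, not part
of this change; the time span value is an arbitrary example, treated as
milliseconds by the chunked extractor):

ChunkingConfig auto = ChunkingConfig.newAuto();             // mode=auto, chunk span chosen by the heuristic
ChunkingConfig off = ChunkingConfig.newOff();               // mode=off, chunking disabled (isEnabled() == false)
ChunkingConfig manual = ChunkingConfig.newManual(3600000L); // mode=manual, explicit time_span
// new ChunkingConfig(Mode.AUTO, 1000L) throws IllegalArgumentException,
// because time_span may only be set when mode is manual.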

View File

@ -49,11 +49,6 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
// Used for QueryPage
public static final ParseField RESULTS_FIELD = new ParseField("datafeeds");
/**
* The field name used to specify aggregation fields in Elasticsearch
* aggregations
*/
private static final String FIELD = "field";
/**
* The field name used to specify document counts in Elasticsearch
* aggregations
@ -71,6 +66,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
public static final ParseField AGGS = new ParseField("aggs");
public static final ParseField SCRIPT_FIELDS = new ParseField("script_fields");
public static final ParseField SOURCE = new ParseField("_source");
public static final ParseField CHUNKING_CONFIG = new ParseField("chunking_config");
public static final ObjectParser<Builder, Void> PARSER = new ObjectParser<>("datafeed_config", Builder::new);
@ -96,6 +92,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
}, SCRIPT_FIELDS);
PARSER.declareInt(Builder::setScrollSize, SCROLL_SIZE);
PARSER.declareBoolean(Builder::setSource, SOURCE);
PARSER.declareObject(Builder::setChunkingConfig, ChunkingConfig.PARSER, CHUNKING_CONFIG);
}
private final String id;
@ -118,10 +115,11 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
private final List<SearchSourceBuilder.ScriptField> scriptFields;
private final Integer scrollSize;
private final boolean source;
private final ChunkingConfig chunkingConfig;
private DatafeedConfig(String id, String jobId, Long queryDelay, Long frequency, List<String> indexes, List<String> types,
QueryBuilder query, AggregatorFactories.Builder aggregations,
List<SearchSourceBuilder.ScriptField> scriptFields, Integer scrollSize, boolean source) {
QueryBuilder query, AggregatorFactories.Builder aggregations, List<SearchSourceBuilder.ScriptField> scriptFields,
Integer scrollSize, boolean source, ChunkingConfig chunkingConfig) {
this.id = id;
this.jobId = jobId;
this.queryDelay = queryDelay;
@ -133,6 +131,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
this.scriptFields = scriptFields;
this.scrollSize = scrollSize;
this.source = source;
this.chunkingConfig = chunkingConfig;
}
public DatafeedConfig(StreamInput in) throws IOException {
@ -159,6 +158,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
}
this.scrollSize = in.readOptionalVInt();
this.source = in.readBoolean();
this.chunkingConfig = in.readOptionalWriteable(ChunkingConfig::new);
}
public String getId() {
@ -217,6 +217,10 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
return scriptFields == null ? Collections.emptyList() : scriptFields;
}
public ChunkingConfig getChunkingConfig() {
return chunkingConfig;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(id);
@ -245,6 +249,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
}
out.writeOptionalVInt(scrollSize);
out.writeBoolean(source);
out.writeOptionalWriteable(chunkingConfig);
}
@Override
@ -279,6 +284,9 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
if (source) {
builder.field(SOURCE.getPreferredName(), source);
}
if (chunkingConfig != null) {
builder.field(CHUNKING_CONFIG.getPreferredName(), chunkingConfig);
}
return builder;
}
@ -309,12 +317,14 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
&& Objects.equals(this.scrollSize, that.scrollSize)
&& Objects.equals(this.aggregations, that.aggregations)
&& Objects.equals(this.scriptFields, that.scriptFields)
&& Objects.equals(this.source, that.source);
&& Objects.equals(this.source, that.source)
&& Objects.equals(this.chunkingConfig, that.chunkingConfig);
}
@Override
public int hashCode() {
return Objects.hash(id, jobId, frequency, queryDelay, indexes, types, query, scrollSize, aggregations, scriptFields, source);
return Objects.hash(id, jobId, frequency, queryDelay, indexes, types, query, scrollSize, aggregations, scriptFields, source,
chunkingConfig);
}
@Override
@ -338,6 +348,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
private List<SearchSourceBuilder.ScriptField> scriptFields;
private Integer scrollSize = DEFAULT_SCROLL_SIZE;
private boolean source = false;
private ChunkingConfig chunkingConfig;
public Builder() {
}
@ -360,6 +371,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
this.scriptFields = config.scriptFields;
this.scrollSize = config.scrollSize;
this.source = config.source;
this.chunkingConfig = config.chunkingConfig;
}
public void setId(String datafeedId) {
@ -443,6 +455,10 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
this.source = enabled;
}
public void setChunkingConfig(ChunkingConfig chunkingConfig) {
this.chunkingConfig = chunkingConfig;
}
public DatafeedConfig build() {
ExceptionsHelper.requireNonNull(id, ID.getPreferredName());
ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());
@ -459,7 +475,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
throw new IllegalArgumentException(Messages.getMessage(Messages.DATAFEED_CONFIG_CANNOT_USE_SCRIPT_FIELDS_WITH_AGGS));
}
return new DatafeedConfig(id, jobId, queryDelay, frequency, indexes, types, query, aggregations, scriptFields, scrollSize,
source);
source, chunkingConfig);
}
private static ElasticsearchException invalidOptionValue(String fieldName, Object value) {

View File

@ -18,6 +18,7 @@ import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationDataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.scroll.ScrollDataExtractorFactory;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.DefaultFrequency;
@ -158,8 +159,16 @@ public class DatafeedJobRunner extends AbstractComponent {
}
DataExtractorFactory createDataExtractorFactory(DatafeedConfig datafeedConfig, Job job) {
return datafeedConfig.getAggregations() == null ? new ScrollDataExtractorFactory(client, datafeedConfig, job)
boolean isScrollSearch = datafeedConfig.getAggregations() == null;
DataExtractorFactory dataExtractorFactory = isScrollSearch ? new ScrollDataExtractorFactory(client, datafeedConfig, job)
: new AggregationDataExtractorFactory(client, datafeedConfig, job);
ChunkingConfig chunkingConfig = datafeedConfig.getChunkingConfig();
if (chunkingConfig == null) {
chunkingConfig = isScrollSearch ? ChunkingConfig.newAuto() : ChunkingConfig.newOff();
}
return chunkingConfig.isEnabled() ? new ChunkedDataExtractorFactory(client, datafeedConfig, job, dataExtractorFactory)
: dataExtractorFactory;
}
private static DataDescription buildDataDescription(Job job) {

View File

@ -0,0 +1,212 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.datafeed.extractor.chunked;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.ExtractorUtils;
import java.io.IOException;
import java.io.InputStream;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
/**
* A wrapper {@link DataExtractor} that can be used with other extractors in order to perform
* searches in smaller chunks of the time range.
*
* <p> The chunk span can be either specified or not. When not specified,
* a heuristic is employed (see {@link DataSummary#estimateChunk()}) to automatically determine the chunk span.
* The search is set up (see {@link #setUpChunkedSearch()}) by querying a data summary for the given time range
* that includes the number of total hits and the earliest/latest times. Those are then used to determine the chunk span,
* when necessary, and to jump the search forward to the time where the earliest data can be found.
* If a search for a chunk returns empty, the set up is performed again for the remaining time.
*
* <p> Cancellation's behaviour depends on the delegate extractor.
*
* <p> Note that this class is NOT thread-safe.
*/
public class ChunkedDataExtractor implements DataExtractor {
private static final Logger LOGGER = Loggers.getLogger(ChunkedDataExtractor.class);
private static final String EARLIEST_TIME = "earliest_time";
private static final String LATEST_TIME = "latest_time";
private static final String VALUE_SUFFIX = ".value";
/** Let us set a minimum chunk span of 1 minute */
private static final long MIN_CHUNK_SPAN = 60000L;
private final Client client;
private final DataExtractorFactory dataExtractorFactory;
private final ChunkedDataExtractorContext context;
private long currentStart;
private long currentEnd;
private long chunkSpan;
private boolean isCancelled;
private DataExtractor currentExtractor;
public ChunkedDataExtractor(Client client, DataExtractorFactory dataExtractorFactory, ChunkedDataExtractorContext context) {
this.client = Objects.requireNonNull(client);
this.dataExtractorFactory = Objects.requireNonNull(dataExtractorFactory);
this.context = Objects.requireNonNull(context);
this.currentStart = context.start;
this.currentEnd = context.start;
this.isCancelled = false;
}
@Override
public boolean hasNext() {
boolean currentHasNext = currentExtractor != null && currentExtractor.hasNext();
if (isCancelled()) {
return currentHasNext;
}
return currentHasNext || currentEnd < context.end;
}
@Override
public Optional<InputStream> next() throws IOException {
if (!hasNext()) {
throw new NoSuchElementException();
}
if (currentExtractor == null) {
// This is the first time next is called
setUpChunkedSearch();
}
return getNextStream();
}
private void setUpChunkedSearch() throws IOException {
DataSummary dataSummary = requestDataSummary();
if (dataSummary.totalHits > 0) {
currentStart = dataSummary.earliestTime;
currentEnd = currentStart;
chunkSpan = context.chunkSpan == null ? dataSummary.estimateChunk() : context.chunkSpan;
LOGGER.info("Chunked search configured: totalHits = {}, dataTimeSpread = {} ms, chunk span = {} ms",
dataSummary.totalHits, dataSummary.getDataTimeSpread(), chunkSpan);
} else {
// search is over
currentEnd = context.end;
}
}
private DataSummary requestDataSummary() throws IOException {
SearchRequestBuilder searchRequestBuilder = SearchAction.INSTANCE.newRequestBuilder(client)
.setSize(0)
.setIndices(context.indexes)
.setTypes(context.types)
.setQuery(ExtractorUtils.wrapInTimeRangeQuery(context.query, context.timeField, currentStart, context.end))
.addAggregation(AggregationBuilders.min(EARLIEST_TIME).field(context.timeField))
.addAggregation(AggregationBuilders.max(LATEST_TIME).field(context.timeField));
SearchResponse response = executeSearchRequest(searchRequestBuilder);
ExtractorUtils.checkSearchWasSuccessful(context.jobId, response);
Aggregations aggregations = response.getAggregations();
long earliestTime = 0;
long latestTime = 0;
long totalHits = response.getHits().totalHits();
if (totalHits > 0) {
earliestTime = (long) Double.parseDouble(aggregations.getProperty(EARLIEST_TIME + VALUE_SUFFIX).toString());
latestTime = (long) Double.parseDouble(aggregations.getProperty(LATEST_TIME + VALUE_SUFFIX).toString());
}
return new DataSummary(earliestTime, latestTime, totalHits);
}
protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) {
return searchRequestBuilder.get();
}
private Optional<InputStream> getNextStream() throws IOException {
while (hasNext()) {
boolean isNewSearch = false;
if (currentExtractor == null || currentExtractor.hasNext() == false) {
// First search or the current search finished; we can advance to the next search
advanceTime();
isNewSearch = true;
}
Optional<InputStream> nextStream = currentExtractor.next();
if (nextStream.isPresent()) {
return nextStream;
}
if (isNewSearch && hasNext()) {
// If it was a new search it means it returned 0 results. Thus,
// we reconfigure and jump to the next time interval where there are data.
setUpChunkedSearch();
}
}
return Optional.empty();
}
private void advanceTime() {
currentStart = currentEnd;
currentEnd = Math.min(currentStart + chunkSpan, context.end);
currentExtractor = dataExtractorFactory.newExtractor(currentStart, currentEnd);
LOGGER.trace("advances time to [{}, {})", currentStart, currentEnd);
}
@Override
public boolean isCancelled() {
return isCancelled;
}
@Override
public void cancel() {
if (currentExtractor != null) {
currentExtractor.cancel();
}
isCancelled = true;
}
private class DataSummary {
private long earliestTime;
private long latestTime;
private long totalHits;
private DataSummary(long earliestTime, long latestTime, long totalHits) {
this.earliestTime = earliestTime;
this.latestTime = latestTime;
this.totalHits = totalHits;
}
private long getDataTimeSpread() {
return latestTime - earliestTime;
}
/**
* The heuristic here is that we want a time interval where we expect roughly scrollSize documents
* (assuming data are uniformly spread over time).
* We have totalHits documents over dataTimeSpread (latestTime - earliestTime) and we want scrollSize documents per chunk.
* Thus, the interval would be (scrollSize * dataTimeSpread) / totalHits.
* However, assuming this as the chunk span may often lead to half-filled pages or empty searches.
* It is beneficial to take a multiple of that. Based on benchmarking, we set this to 10x.
*/
private long estimateChunk() {
long dataTimeSpread = getDataTimeSpread();
if (totalHits <= 0 || dataTimeSpread <= 0) {
return context.end - currentEnd;
}
long estimatedChunk = 10 * (context.scrollSize * dataTimeSpread) / totalHits;
return Math.max(estimatedChunk, MIN_CHUNK_SPAN);
}
}
}
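
A brief worked example of the minimum-span clamping above (numbers taken from
ChunkedDataExtractorTests): 150,000 hits spread over 300,000 ms with a
scroll_size of 1000 estimate a chunk of 10 * (1000 * 300,000) / 150,000 =
20,000 ms, which is below MIN_CHUNK_SPAN and is therefore rounded up to
60,000 ms.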

View File

@ -0,0 +1,38 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.datafeed.extractor.chunked;
import org.elasticsearch.common.inject.internal.Nullable;
import org.elasticsearch.index.query.QueryBuilder;
import java.util.List;
import java.util.Objects;
class ChunkedDataExtractorContext {
final String jobId;
final String timeField;
final String[] indexes;
final String[] types;
final QueryBuilder query;
final int scrollSize;
final long start;
final long end;
final Long chunkSpan;
public ChunkedDataExtractorContext(String jobId, String timeField, List<String> indexes, List<String> types,
QueryBuilder query, int scrollSize, long start, long end, @Nullable Long chunkSpan) {
this.jobId = Objects.requireNonNull(jobId);
this.timeField = Objects.requireNonNull(timeField);
this.indexes = indexes.toArray(new String[indexes.size()]);
this.types = types.toArray(new String[types.size()]);
this.query = Objects.requireNonNull(query);
this.scrollSize = scrollSize;
this.start = start;
this.end = end;
this.chunkSpan = chunkSpan;
}
}

View File

@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.datafeed.extractor.chunked;
import org.elasticsearch.client.Client;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.job.config.Job;
import java.util.Objects;
public class ChunkedDataExtractorFactory implements DataExtractorFactory {
private final Client client;
private final DatafeedConfig datafeedConfig;
private final Job job;
private final DataExtractorFactory dataExtractorFactory;
public ChunkedDataExtractorFactory(Client client, DatafeedConfig datafeedConfig, Job job, DataExtractorFactory dataExtractorFactory) {
this.client = Objects.requireNonNull(client);
this.datafeedConfig = Objects.requireNonNull(datafeedConfig);
this.job = Objects.requireNonNull(job);
this.dataExtractorFactory = Objects.requireNonNull(dataExtractorFactory);
}
@Override
public DataExtractor newExtractor(long start, long end) {
ChunkedDataExtractorContext dataExtractorContext = new ChunkedDataExtractorContext(
job.getId(),
job.getDataDescription().getTimeField(),
datafeedConfig.getIndexes(),
datafeedConfig.getTypes(),
datafeedConfig.getQuery(),
datafeedConfig.getScrollSize(),
start,
end,
datafeedConfig.getChunkingConfig() == null ? null : datafeedConfig.getChunkingConfig().getTimeSpan());
return new ChunkedDataExtractor(client, dataExtractorFactory, dataExtractorContext);
}
}
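
A sketch of how DatafeedJobRunner wires this factory for a scroll-based
datafeed with chunking enabled (variable names here are illustrative):

DataExtractorFactory scrollFactory = new ScrollDataExtractorFactory(client, datafeedConfig, job);
DataExtractorFactory chunkedFactory = new ChunkedDataExtractorFactory(client, datafeedConfig, job, scrollFactory);
DataExtractor extractor = chunkedFactory.newExtractor(startEpochMs, endEpochMs); // each chunk delegates to a scroll extractor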

View File

@ -0,0 +1,60 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.datafeed;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase;
import static org.hamcrest.Matchers.is;
public class ChunkingConfigTests extends AbstractSerializingTestCase<ChunkingConfig> {
@Override
protected ChunkingConfig createTestInstance() {
return createRandomizedChunk();
}
@Override
protected Writeable.Reader<ChunkingConfig> instanceReader() {
return ChunkingConfig::new;
}
@Override
protected ChunkingConfig parseInstance(XContentParser parser) {
return ChunkingConfig.PARSER.apply(parser, null);
}
public void testConstructorGivenAutoAndTimeSpan() {
expectThrows(IllegalArgumentException.class, () -> new ChunkingConfig(ChunkingConfig.Mode.AUTO, 1000L));
}
public void testConstructorGivenOffAndTimeSpan() {
expectThrows(IllegalArgumentException.class, () -> new ChunkingConfig(ChunkingConfig.Mode.OFF, 1000L));
}
public void testConstructorGivenManualAndNoTimeSpan() {
expectThrows(IllegalArgumentException.class, () -> new ChunkingConfig(ChunkingConfig.Mode.MANUAL, null));
}
public void testIsEnabled() {
assertThat(ChunkingConfig.newAuto().isEnabled(), is(true));
assertThat(ChunkingConfig.newManual(1000).isEnabled(), is(true));
assertThat(ChunkingConfig.newOff().isEnabled(), is(false));
}
public static ChunkingConfig createRandomizedChunk() {
ChunkingConfig.Mode mode = randomFrom(ChunkingConfig.Mode.values());
Long timeSpan = null;
if (mode == ChunkingConfig.Mode.MANUAL) {
timeSpan = randomNonNegativeLong();
if (timeSpan == 0L) {
timeSpan = 1L;
}
}
return new ChunkingConfig(mode, timeSpan);
}
}

View File

@ -67,6 +67,9 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
if (randomBoolean()) {
builder.setSource(randomBoolean());
}
if (randomBoolean()) {
builder.setChunkingConfig(ChunkingConfigTests.createRandomizedChunk());
}
return builder.build();
}

View File

@ -14,6 +14,7 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.mock.orig.Mockito;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.MlPlugin;
@ -22,6 +23,9 @@ import org.elasticsearch.xpack.ml.action.PostDataAction;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationDataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.scroll.ScrollDataExtractorFactory;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Detector;
@ -42,6 +46,8 @@ import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.same;
@ -212,6 +218,76 @@ public class DatafeedJobRunnerTests extends ESTestCase {
}
}
public void testCreateDataExtractorFactoryGivenDefaultScroll() {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
Job.Builder jobBuilder = createDatafeedJob();
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed1", "foo").build();
DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class), () -> currentTime);
DataExtractorFactory dataExtractorFactory = runner.createDataExtractorFactory(datafeedConfig, jobBuilder.build());
assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class));
}
public void testCreateDataExtractorFactoryGivenScrollWithAutoChunk() {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
Job.Builder jobBuilder = createDatafeedJob();
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig.Builder datafeedConfig = createDatafeedConfig("datafeed1", "foo");
datafeedConfig.setChunkingConfig(ChunkingConfig.newAuto());
DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class), () -> currentTime);
DataExtractorFactory dataExtractorFactory = runner.createDataExtractorFactory(datafeedConfig.build(), jobBuilder.build());
assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class));
}
public void testCreateDataExtractorFactoryGivenScrollWithOffChunk() {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
Job.Builder jobBuilder = createDatafeedJob();
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig.Builder datafeedConfig = createDatafeedConfig("datafeed1", "foo");
datafeedConfig.setChunkingConfig(ChunkingConfig.newOff());
DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class), () -> currentTime);
DataExtractorFactory dataExtractorFactory = runner.createDataExtractorFactory(datafeedConfig.build(), jobBuilder.build());
assertThat(dataExtractorFactory, instanceOf(ScrollDataExtractorFactory.class));
}
public void testCreateDataExtractorFactoryGivenDefaultAggregation() {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
Job.Builder jobBuilder = createDatafeedJob();
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig.Builder datafeedConfig = createDatafeedConfig("datafeed1", "foo");
datafeedConfig.setAggregations(AggregatorFactories.builder());
DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class), () -> currentTime);
DataExtractorFactory dataExtractorFactory = runner.createDataExtractorFactory(datafeedConfig.build(), jobBuilder.build());
assertThat(dataExtractorFactory, instanceOf(AggregationDataExtractorFactory.class));
}
public void testCreateDataExtractorFactoryGivenDefaultAggregationWithAutoChunk() {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
Job.Builder jobBuilder = createDatafeedJob();
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig.Builder datafeedConfig = createDatafeedConfig("datafeed1", "foo");
datafeedConfig.setAggregations(AggregatorFactories.builder());
datafeedConfig.setChunkingConfig(ChunkingConfig.newAuto());
DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class), () -> currentTime);
DataExtractorFactory dataExtractorFactory = runner.createDataExtractorFactory(datafeedConfig.build(), jobBuilder.build());
assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class));
}
public static DatafeedConfig.Builder createDatafeedConfig(String datafeedId, String jobId) {
DatafeedConfig.Builder datafeedConfig = new DatafeedConfig.Builder(datafeedId, jobId);
datafeedConfig.setIndexes(Arrays.asList("myIndex"));

View File

@ -0,0 +1,474 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.datafeed.extractor.chunked;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.mock.orig.Mockito;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.junit.Before;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class ChunkedDataExtractorTests extends ESTestCase {
private Client client;
private List<SearchRequestBuilder> capturedSearchRequests;
private String jobId;
private String timeField;
private List<String> types;
private List<String> indexes;
private QueryBuilder query;
private int scrollSize;
private Long chunkSpan;
private DataExtractorFactory dataExtractorFactory;
private class TestDataExtractor extends ChunkedDataExtractor {
private SearchResponse nextResponse;
public TestDataExtractor(long start, long end) {
super(client, dataExtractorFactory, createContext(start, end));
}
@Override
protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) {
capturedSearchRequests.add(searchRequestBuilder);
return nextResponse;
}
void setNextResponse(SearchResponse searchResponse) {
nextResponse = searchResponse;
}
}
@Before
public void setUpTests() {
client = mock(Client.class);
capturedSearchRequests = new ArrayList<>();
jobId = "test-job";
timeField = "time";
indexes = Arrays.asList("index-1", "index-2");
types = Arrays.asList("type-1", "type-2");
query = QueryBuilders.matchAllQuery();
scrollSize = 1000;
chunkSpan = null;
dataExtractorFactory = mock(DataExtractorFactory.class);
}
public void testExtractionGivenNoData() throws IOException {
TestDataExtractor extractor = new TestDataExtractor(1000L, 2300L);
extractor.setNextResponse(createSearchResponse(0L, 0L, 0L));
assertThat(extractor.hasNext(), is(true));
assertThat(extractor.next().isPresent(), is(false));
assertThat(extractor.hasNext(), is(false));
Mockito.verifyNoMoreInteractions(dataExtractorFactory);
}
public void testExtractionGivenSpecifiedChunk() throws IOException {
chunkSpan = 1000L;
TestDataExtractor extractor = new TestDataExtractor(1000L, 2300L);
extractor.setNextResponse(createSearchResponse(10L, 1000L, 2200L));
InputStream inputStream1 = mock(InputStream.class);
InputStream inputStream2 = mock(InputStream.class);
InputStream inputStream3 = mock(InputStream.class);
DataExtractor subExtractor1 = new StubSubExtractor(inputStream1, inputStream2);
when(dataExtractorFactory.newExtractor(1000L, 2000L)).thenReturn(subExtractor1);
DataExtractor subExtractor2 = new StubSubExtractor(inputStream3);
when(dataExtractorFactory.newExtractor(2000L, 2300L)).thenReturn(subExtractor2);
assertThat(extractor.hasNext(), is(true));
assertEquals(inputStream1, extractor.next().get());
assertThat(extractor.hasNext(), is(true));
assertEquals(inputStream2, extractor.next().get());
assertThat(extractor.hasNext(), is(true));
assertEquals(inputStream3, extractor.next().get());
assertThat(extractor.hasNext(), is(true));
assertThat(extractor.next().isPresent(), is(false));
verify(dataExtractorFactory).newExtractor(1000L, 2000L);
verify(dataExtractorFactory).newExtractor(2000L, 2300L);
Mockito.verifyNoMoreInteractions(dataExtractorFactory);
assertThat(capturedSearchRequests.size(), equalTo(1));
String searchRequest = capturedSearchRequests.get(0).toString().replaceAll("\\s", "");
assertThat(searchRequest, containsString("\"size\":0"));
assertThat(searchRequest, containsString("\"query\":{\"bool\":{\"filter\":[{\"match_all\":{\"boost\":1.0}}," +
"{\"range\":{\"time\":{\"from\":1000,\"to\":2300,\"include_lower\":true,\"include_upper\":false," +
"\"format\":\"epoch_millis\",\"boost\":1.0}}}]"));
assertThat(searchRequest, containsString("\"aggregations\":{\"earliest_time\":{\"min\":{\"field\":\"time\"}}," +
"\"latest_time\":{\"max\":{\"field\":\"time\"}}}}"));
assertThat(searchRequest, not(containsString("\"sort\"")));
}
public void testExtractionGivenAutoChunkAndScrollSize1000() throws IOException {
chunkSpan = null;
scrollSize = 1000;
TestDataExtractor extractor = new TestDataExtractor(100000L, 450000L);
// 300K millis * 1000 * 10 / 15K docs = 200000
extractor.setNextResponse(createSearchResponse(15000L, 100000L, 400000L));
InputStream inputStream1 = mock(InputStream.class);
InputStream inputStream2 = mock(InputStream.class);
DataExtractor subExtractor1 = new StubSubExtractor(inputStream1);
when(dataExtractorFactory.newExtractor(100000L, 300000L)).thenReturn(subExtractor1);
DataExtractor subExtractor2 = new StubSubExtractor(inputStream2);
when(dataExtractorFactory.newExtractor(300000L, 450000L)).thenReturn(subExtractor2);
assertThat(extractor.hasNext(), is(true));
assertEquals(inputStream1, extractor.next().get());
assertThat(extractor.hasNext(), is(true));
assertEquals(inputStream2, extractor.next().get());
assertThat(extractor.next().isPresent(), is(false));
assertThat(extractor.hasNext(), is(false));
verify(dataExtractorFactory).newExtractor(100000L, 300000L);
verify(dataExtractorFactory).newExtractor(300000L, 450000L);
Mockito.verifyNoMoreInteractions(dataExtractorFactory);
assertThat(capturedSearchRequests.size(), equalTo(1));
}
public void testExtractionGivenAutoChunkAndScrollSize500() throws IOException {
chunkSpan = null;
scrollSize = 500;
TestDataExtractor extractor = new TestDataExtractor(100000L, 450000L);
// 300K millis * 500 * 10 / 15K docs = 100000
extractor.setNextResponse(createSearchResponse(15000L, 100000L, 400000L));
InputStream inputStream1 = mock(InputStream.class);
InputStream inputStream2 = mock(InputStream.class);
DataExtractor subExtractor1 = new StubSubExtractor(inputStream1);
when(dataExtractorFactory.newExtractor(100000L, 200000L)).thenReturn(subExtractor1);
DataExtractor subExtractor2 = new StubSubExtractor(inputStream2);
when(dataExtractorFactory.newExtractor(200000L, 300000L)).thenReturn(subExtractor2);
assertThat(extractor.hasNext(), is(true));
assertEquals(inputStream1, extractor.next().get());
assertThat(extractor.hasNext(), is(true));
assertEquals(inputStream2, extractor.next().get());
assertThat(extractor.hasNext(), is(true));
verify(dataExtractorFactory).newExtractor(100000L, 200000L);
verify(dataExtractorFactory).newExtractor(200000L, 300000L);
assertThat(capturedSearchRequests.size(), equalTo(1));
}
public void testExtractionGivenAutoChunkIsLessThanMinChunk() throws IOException {
chunkSpan = null;
scrollSize = 1000;
TestDataExtractor extractor = new TestDataExtractor(100000L, 450000L);
// 300K millis * 1000 * 10 / 150K docs = 20000 < min of 60K
extractor.setNextResponse(createSearchResponse(150000L, 100000L, 400000L));
InputStream inputStream1 = mock(InputStream.class);
InputStream inputStream2 = mock(InputStream.class);
DataExtractor subExtractor1 = new StubSubExtractor(inputStream1);
when(dataExtractorFactory.newExtractor(100000L, 160000L)).thenReturn(subExtractor1);
DataExtractor subExtractor2 = new StubSubExtractor(inputStream2);
when(dataExtractorFactory.newExtractor(160000L, 220000L)).thenReturn(subExtractor2);
assertThat(extractor.hasNext(), is(true));
assertEquals(inputStream1, extractor.next().get());
assertThat(extractor.hasNext(), is(true));
assertEquals(inputStream2, extractor.next().get());
assertThat(extractor.hasNext(), is(true));
verify(dataExtractorFactory).newExtractor(100000L, 160000L);
verify(dataExtractorFactory).newExtractor(160000L, 220000L);
Mockito.verifyNoMoreInteractions(dataExtractorFactory);
assertThat(capturedSearchRequests.size(), equalTo(1));
}
public void testExtractionGivenAutoChunkAndDataTimeSpreadIsZero() throws IOException {
chunkSpan = null;
scrollSize = 1000;
TestDataExtractor extractor = new TestDataExtractor(100L, 500L);
extractor.setNextResponse(createSearchResponse(150000L, 300L, 300L));
InputStream inputStream1 = mock(InputStream.class);
DataExtractor subExtractor1 = new StubSubExtractor(inputStream1);
when(dataExtractorFactory.newExtractor(300L, 500L)).thenReturn(subExtractor1);
assertThat(extractor.hasNext(), is(true));
assertEquals(inputStream1, extractor.next().get());
assertThat(extractor.hasNext(), is(true));
assertThat(extractor.next().isPresent(), is(false));
assertThat(extractor.hasNext(), is(false));
verify(dataExtractorFactory).newExtractor(300L, 500L);
Mockito.verifyNoMoreInteractions(dataExtractorFactory);
assertThat(capturedSearchRequests.size(), equalTo(1));
}
public void testExtractionGivenAutoChunkAndTotalTimeRangeSmallerThanChunk() throws IOException {
chunkSpan = null;
scrollSize = 1000;
TestDataExtractor extractor = new TestDataExtractor(1L, 101L);
// 100 millis * 1000 * 10 / 10 docs = 100000
extractor.setNextResponse(createSearchResponse(10L, 1L, 101L));
InputStream inputStream1 = mock(InputStream.class);
DataExtractor subExtractor1 = new StubSubExtractor(inputStream1);
when(dataExtractorFactory.newExtractor(1L, 101L)).thenReturn(subExtractor1);
assertThat(extractor.hasNext(), is(true));
assertEquals(inputStream1, extractor.next().get());
assertThat(extractor.hasNext(), is(true));
assertThat(extractor.next().isPresent(), is(false));
assertThat(extractor.hasNext(), is(false));
verify(dataExtractorFactory).newExtractor(1L, 101L);
Mockito.verifyNoMoreInteractions(dataExtractorFactory);
assertThat(capturedSearchRequests.size(), equalTo(1));
}
public void testExtractionGivenAutoChunkAndIntermediateEmptySearchShouldReconfigure() throws IOException {
chunkSpan = null;
scrollSize = 500;
TestDataExtractor extractor = new TestDataExtractor(100000L, 400000L);
// 300K millis * 500 * 10 / 15K docs = 100000
extractor.setNextResponse(createSearchResponse(15000L, 100000L, 400000L));
InputStream inputStream1 = mock(InputStream.class);
DataExtractor subExtractor1 = new StubSubExtractor(inputStream1);
when(dataExtractorFactory.newExtractor(100000L, 200000L)).thenReturn(subExtractor1);
// This one is empty
DataExtractor subExtractor2 = new StubSubExtractor();
when(dataExtractorFactory.newExtractor(200000L, 300000L)).thenReturn(subExtractor2);
assertThat(extractor.hasNext(), is(true));
assertEquals(inputStream1, extractor.next().get());
assertThat(extractor.hasNext(), is(true));
// Now we have: 200K millis * 500 * 10 / 5K docs = 200000
extractor.setNextResponse(createSearchResponse(5000, 200000L, 400000L));
// This is the last one
InputStream inputStream2 = mock(InputStream.class);
DataExtractor subExtractor3 = new StubSubExtractor(inputStream2);
when(dataExtractorFactory.newExtractor(200000L, 400000L)).thenReturn(subExtractor3);
assertEquals(inputStream2, extractor.next().get());
assertThat(extractor.next().isPresent(), is(false));
assertThat(extractor.hasNext(), is(false));
verify(dataExtractorFactory).newExtractor(100000L, 200000L);
verify(dataExtractorFactory).newExtractor(200000L, 300000L);
verify(dataExtractorFactory).newExtractor(200000L, 400000L);
Mockito.verifyNoMoreInteractions(dataExtractorFactory);
assertThat(capturedSearchRequests.size(), equalTo(2));
String searchRequest = capturedSearchRequests.get(0).toString().replaceAll("\\s", "");
assertThat(searchRequest, containsString("\"from\":100000,\"to\":400000"));
searchRequest = capturedSearchRequests.get(1).toString().replaceAll("\\s", "");
assertThat(searchRequest, containsString("\"from\":200000,\"to\":400000"));
}
public void testCancelGivenNextWasNeverCalled() throws IOException {
chunkSpan = 1000L;
TestDataExtractor extractor = new TestDataExtractor(1000L, 2300L);
extractor.setNextResponse(createSearchResponse(10L, 1000L, 2200L));
InputStream inputStream1 = mock(InputStream.class);
DataExtractor subExtractor1 = new StubSubExtractor(inputStream1);
when(dataExtractorFactory.newExtractor(1000L, 2000L)).thenReturn(subExtractor1);
assertThat(extractor.hasNext(), is(true));
extractor.cancel();
assertThat(extractor.isCancelled(), is(true));
assertThat(extractor.hasNext(), is(false));
Mockito.verifyNoMoreInteractions(dataExtractorFactory);
}
public void testCancelGivenCurrentSubExtractorHasMore() throws IOException {
chunkSpan = 1000L;
TestDataExtractor extractor = new TestDataExtractor(1000L, 2300L);
extractor.setNextResponse(createSearchResponse(10L, 1000L, 2200L));
InputStream inputStream1 = mock(InputStream.class);
InputStream inputStream2 = mock(InputStream.class);
DataExtractor subExtractor1 = new StubSubExtractor(inputStream1, inputStream2);
when(dataExtractorFactory.newExtractor(1000L, 2000L)).thenReturn(subExtractor1);
assertThat(extractor.hasNext(), is(true));
assertEquals(inputStream1, extractor.next().get());
extractor.cancel();
assertThat(extractor.isCancelled(), is(true));
assertThat(extractor.hasNext(), is(true));
assertEquals(inputStream2, extractor.next().get());
assertThat(extractor.hasNext(), is(true));
assertThat(extractor.next().isPresent(), is(false));
assertThat(extractor.hasNext(), is(false));
verify(dataExtractorFactory).newExtractor(1000L, 2000L);
Mockito.verifyNoMoreInteractions(dataExtractorFactory);
}
public void testCancelGivenCurrentSubExtractorIsDone() throws IOException {
chunkSpan = 1000L;
TestDataExtractor extractor = new TestDataExtractor(1000L, 2300L);
extractor.setNextResponse(createSearchResponse(10L, 1000L, 2200L));
InputStream inputStream1 = mock(InputStream.class);
DataExtractor subExtractor1 = new StubSubExtractor(inputStream1);
when(dataExtractorFactory.newExtractor(1000L, 2000L)).thenReturn(subExtractor1);
assertThat(extractor.hasNext(), is(true));
assertEquals(inputStream1, extractor.next().get());
extractor.cancel();
assertThat(extractor.isCancelled(), is(true));
assertThat(extractor.hasNext(), is(true));
assertThat(extractor.next().isPresent(), is(false));
assertThat(extractor.hasNext(), is(false));
verify(dataExtractorFactory).newExtractor(1000L, 2000L);
Mockito.verifyNoMoreInteractions(dataExtractorFactory);
}
public void testDataSummaryRequestIsNotOk() {
chunkSpan = 2000L;
TestDataExtractor extractor = new TestDataExtractor(1000L, 2300L);
extractor.setNextResponse(createErrorResponse());
assertThat(extractor.hasNext(), is(true));
expectThrows(IOException.class, () -> extractor.next());
}
public void testDataSummaryRequestHasShardFailures() {
chunkSpan = 2000L;
TestDataExtractor extractor = new TestDataExtractor(1000L, 2300L);
extractor.setNextResponse(createResponseWithShardFailures());
assertThat(extractor.hasNext(), is(true));
expectThrows(IOException.class, () -> extractor.next());
}
private SearchResponse createSearchResponse(long totalHits, long earliestTime, long latestTime) {
SearchResponse searchResponse = mock(SearchResponse.class);
when(searchResponse.status()).thenReturn(RestStatus.OK);
SearchHits searchHits = mock(SearchHits.class);
when(searchHits.totalHits()).thenReturn(totalHits);
when(searchResponse.getHits()).thenReturn(searchHits);
Aggregations aggs = mock(Aggregations.class);
when(aggs.getProperty("earliest_time.value")).thenReturn((double) earliestTime);
when(aggs.getProperty("latest_time.value")).thenReturn((double) latestTime);
when(searchResponse.getAggregations()).thenReturn(aggs);
return searchResponse;
}
private SearchResponse createErrorResponse() {
SearchResponse searchResponse = mock(SearchResponse.class);
when(searchResponse.status()).thenReturn(RestStatus.INTERNAL_SERVER_ERROR);
return searchResponse;
}
private SearchResponse createResponseWithShardFailures() {
SearchResponse searchResponse = mock(SearchResponse.class);
when(searchResponse.status()).thenReturn(RestStatus.OK);
when(searchResponse.getShardFailures()).thenReturn(
new ShardSearchFailure[] { new ShardSearchFailure(new RuntimeException("shard failed"))});
return searchResponse;
}
private ChunkedDataExtractorContext createContext(long start, long end) {
return new ChunkedDataExtractorContext(jobId, timeField, indexes, types, query, scrollSize, start, end, chunkSpan);
}
private static class StubSubExtractor implements DataExtractor {
List<InputStream> streams = new ArrayList<>();
boolean hasNext = true;
StubSubExtractor() {}
StubSubExtractor(InputStream... streams) {
for (InputStream stream : streams) {
this.streams.add(stream);
}
}
@Override
public boolean hasNext() {
return hasNext;
}
@Override
public Optional<InputStream> next() throws IOException {
if (streams.isEmpty()) {
hasNext = false;
return Optional.empty();
}
return Optional.of(streams.remove(0));
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public void cancel() {
// do nothing
}
}
}

View File

@ -140,6 +140,22 @@ setup:
xpack.ml.delete_datafeed:
datafeed_id: a-missing-datafeed
---
"Test put datafeed with chunking_config":
- do:
xpack.ml.put_datafeed:
datafeed_id: test-datafeed-1
body: >
{
"job_id":"job-1",
"indexes":["index-foo"],
"types":["type-bar"],
"chunking_config": {"mode":"manual","time_span": 3600}
}
- match: { datafeed_id: "test-datafeed-1" }
- match: { chunking_config.mode: "manual" }
- match: { chunking_config.time_span: 3600 }
---
"Test delete datafeed":
- do: