From b9030bf6feee60166d57291d5d2a91368024dbf1 Mon Sep 17 00:00:00 2001 From: Jim Ferenczi Date: Tue, 10 May 2016 12:10:55 +0200 Subject: [PATCH] Add the ability to partition a scroll into multiple slices. API: ``` curl -XGET 'localhost:9200/twitter/tweet/_search?scroll=1m' -d '{ "slice": { "field": "_uid", <1> "id": 0, <2> "max": 10 <3> }, "query": { "match" : { "title" : "elasticsearch" } } }' ``` <1> (optional) The field name used to do the slicing (_uid by default) <2> The id of the slice <3> The maximum number of slices By default the splitting is done on the shards first and then locally on each shard using the _uid field with the following formula: `slice(doc) = floorMod(hashCode(doc._uid), max)` For instance, if the number of shards is equal to 2 and the user requested 4 slices, then slices 0 and 2 are assigned to the first shard and slices 1 and 3 are assigned to the second shard. Each scroll is independent and can be processed in parallel like any scroll request. Closes #13494 --- .../action/search/SearchRequestBuilder.java | 6 + .../elasticsearch/search/SearchService.java | 9 + .../search/builder/SearchSourceBuilder.java | 102 +++--- .../search/internal/DefaultSearchContext.java | 25 +- .../search/internal/ScrollContext.java | 1 - .../search/internal/SubSearchContext.java | 1 - .../search/slice/DocValuesSliceQuery.java | 67 ++++ .../search/slice/SliceBuilder.java | 251 +++++++++++++ .../search/slice/SliceQuery.java | 81 +++++ .../search/slice/TermsSliceQuery.java | 86 +++++ .../builder/SearchSourceBuilderTests.java | 11 + .../slice/DocValuesSliceQueryTests.java | 124 +++++++ .../search/slice/SearchSliceIT.java | 215 +++++++++++ .../search/slice/SliceBuilderTests.java | 340 ++++++++++++++++++ .../search/slice/TermsSliceQueryTests.java | 117 ++++++ docs/reference/search/request/scroll.asciidoc | 89 +++++ 16 files changed, 1462 insertions(+), 63 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/search/slice/DocValuesSliceQuery.java create mode 100644 core/src/main/java/org/elasticsearch/search/slice/SliceBuilder.java create mode 100644 core/src/main/java/org/elasticsearch/search/slice/SliceQuery.java create mode 100644 core/src/main/java/org/elasticsearch/search/slice/TermsSliceQuery.java create mode 100644 core/src/test/java/org/elasticsearch/search/slice/DocValuesSliceQueryTests.java create mode 100644 core/src/test/java/org/elasticsearch/search/slice/SearchSliceIT.java create mode 100644 core/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java create mode 100644 core/src/test/java/org/elasticsearch/search/slice/TermsSliceQueryTests.java diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java index c5666fa16a5..173c15ec1a5 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java @@ -30,6 +30,7 @@ import org.elasticsearch.script.Template; import org.elasticsearch.search.Scroll; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.PipelineAggregatorBuilder; +import org.elasticsearch.search.slice.SliceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.highlight.HighlightBuilder; import org.elasticsearch.search.rescore.RescoreBuilder; @@ -352,6 +353,11 @@ public class SearchRequestBuilder extends ActionRequestBuilder<SearchRequest, SearchResponse, SearchRequestBuilder> { + + public SearchRequestBuilder slice(SliceBuilder builder) { + sourceBuilder().slice(builder); + return this; + }
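To make the partitioning scheme concrete, here is a small standalone sketch (editorial illustration, not part of the patch; the class and method names are hypothetical). It applies the formula from the commit message together with the slice-to-shard assignment that SliceBuilder#toFilter performs:

```java
// Illustrative sketch only, not part of the patch. A document belongs to
// slice floorMod(hashCode(_uid), max); when max >= numShards, slice `id`
// is served by shard (id % numShards) and is rewritten there to the
// shard-local slice (id / numShards).
public class SliceAssignmentDemo {

    // slice(doc) = floorMod(hashCode(doc._uid), max); String.hashCode stands
    // in here for the hash of the _uid term used by TermsSliceQuery.
    static int sliceOf(String uid, int max) {
        return Math.floorMod(uid.hashCode(), max);
    }

    public static void main(String[] args) {
        int numShards = 2;
        int max = 4;
        for (int id = 0; id < max; id++) {
            int shard = id % numShards;      // shard responsible for this slice
            int shardSlice = id / numShards; // slice id applied locally on that shard
            System.out.println("slice " + id + " -> shard " + shard
                    + " (local slice " + shardSlice + ")");
        }
        // Prints: slices 0 and 2 map to shard 0, slices 1 and 3 to shard 1,
        // matching the two-shards/four-slices example in the commit message.
    }
}
```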
diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java index b4704fe5a7f..57841466a62 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchService.java +++ b/core/src/main/java/org/elasticsearch/search/SearchService.java @@ -821,6 +821,15 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp FieldDoc fieldDoc = SearchAfterBuilder.buildFieldDoc(context.sort(), source.searchAfter()); context.searchAfter(fieldDoc); } + + if (source.slice() != null) { + if (context.scrollContext() == null) { + throw new SearchContextException(context, "`slice` cannot be used outside of a scroll context"); + } + context.sliceFilter(source.slice().toFilter(queryShardContext, + context.shardTarget().getShardId().getId(), + queryShardContext.getIndexSettings().getNumberOfShards())); + } } private static final int[] EMPTY_DOC_IDS = new int[0]; diff --git a/core/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java b/core/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java index 8c4be225127..b1a5d2d03e6 100644 --- a/core/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java @@ -42,6 +42,7 @@ import org.elasticsearch.index.query.QueryParseContext; import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.script.Script; import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.slice.SliceBuilder; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.AggregatorParsers; import org.elasticsearch.search.aggregations.PipelineAggregatorBuilder; @@ -98,6 +99,7 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ public static final ParseField EXT_FIELD = new ParseField("ext"); public static final ParseField PROFILE_FIELD = new ParseField("profile"); public static final ParseField SEARCH_AFTER = new ParseField("search_after"); + public static final ParseField SLICE = new ParseField("slice"); public static SearchSourceBuilder fromXContent(QueryParseContext context, AggregatorParsers aggParsers, Suggesters suggesters) throws IOException { @@ -138,6 +140,8 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ private SearchAfterBuilder searchAfterBuilder; + private SliceBuilder sliceBuilder; + private Float minScore; private long timeoutInMillis = -1; @@ -175,9 +179,7 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ * Read from a stream.
*/ public SearchSourceBuilder(StreamInput in) throws IOException { - if (in.readBoolean()) { - aggregations = new AggregatorFactories.Builder(in); - } + aggregations = in.readOptionalWriteable(AggregatorFactories.Builder::new); explain = in.readOptionalBoolean(); fetchSourceContext = in.readOptionalStreamable(FetchSourceContext::new); boolean hasFieldDataFields = in.readBoolean(); @@ -206,15 +208,9 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ indexBoost.put(in.readString(), in.readFloat()); } } - if (in.readBoolean()) { - minScore = in.readFloat(); - } - if (in.readBoolean()) { - postQueryBuilder = in.readNamedWriteable(QueryBuilder.class); - } - if (in.readBoolean()) { - queryBuilder = in.readNamedWriteable(QueryBuilder.class); - } + minScore = in.readOptionalFloat(); + postQueryBuilder = in.readOptionalNamedWriteable(QueryBuilder.class); + queryBuilder = in.readOptionalNamedWriteable(QueryBuilder.class); if (in.readBoolean()) { int size = in.readVInt(); rescoreBuilders = new ArrayList<>(); @@ -244,29 +240,20 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ stats.add(in.readString()); } } - if (in.readBoolean()) { - suggestBuilder = new SuggestBuilder(in); - } + suggestBuilder = in.readOptionalWriteable(SuggestBuilder::new); terminateAfter = in.readVInt(); timeoutInMillis = in.readLong(); trackScores = in.readBoolean(); version = in.readOptionalBoolean(); - if (in.readBoolean()) { - ext = in.readBytesReference(); - } + ext = in.readOptionalBytesReference(); profile = in.readBoolean(); - if (in.readBoolean()) { - searchAfterBuilder = new SearchAfterBuilder(in); - } + searchAfterBuilder = in.readOptionalWriteable(SearchAfterBuilder::new); + sliceBuilder = in.readOptionalWriteable(SliceBuilder::new); } @Override public void writeTo(StreamOutput out) throws IOException { - boolean hasAggregations = aggregations != null; - out.writeBoolean(hasAggregations); - if (hasAggregations) { - aggregations.writeTo(out); - } + out.writeOptionalWriteable(aggregations); out.writeOptionalBoolean(explain); out.writeOptionalStreamable(fetchSourceContext); boolean hasFieldDataFields = fieldDataFields != null; @@ -296,21 +283,9 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ out.writeFloat(indexBoost.get(key.value)); } } - boolean hasMinScore = minScore != null; - out.writeBoolean(hasMinScore); - if (hasMinScore) { - out.writeFloat(minScore); - } - boolean hasPostQuery = postQueryBuilder != null; - out.writeBoolean(hasPostQuery); - if (hasPostQuery) { - out.writeNamedWriteable(postQueryBuilder); - } - boolean hasQuery = queryBuilder != null; - out.writeBoolean(hasQuery); - if (hasQuery) { - out.writeNamedWriteable(queryBuilder); - } + out.writeOptionalFloat(minScore); + out.writeOptionalNamedWriteable(postQueryBuilder); + out.writeOptionalNamedWriteable(queryBuilder); boolean hasRescoreBuilders = rescoreBuilders != null; out.writeBoolean(hasRescoreBuilders); if (hasRescoreBuilders) { @@ -344,26 +319,15 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ out.writeString(stat); } } - boolean hasSuggestBuilder = suggestBuilder != null; - out.writeBoolean(hasSuggestBuilder); - if (hasSuggestBuilder) { - suggestBuilder.writeTo(out); - } + out.writeOptionalWriteable(suggestBuilder); out.writeVInt(terminateAfter); out.writeLong(timeoutInMillis); out.writeBoolean(trackScores); out.writeOptionalBoolean(version); - boolean hasExt = ext != null; - out.writeBoolean(hasExt); - if 
(hasExt) { - out.writeBytesReference(ext); - } + out.writeOptionalBytesReference(ext); out.writeBoolean(profile); - boolean hasSearchAfter = searchAfterBuilder != null; - out.writeBoolean(hasSearchAfter); - if (hasSearchAfter) { - searchAfterBuilder.writeTo(out); - } + out.writeOptionalWriteable(searchAfterBuilder); + out.writeOptionalWriteable(sliceBuilder); } /** @@ -597,6 +561,22 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ return this; } + /** + * Sets a filter that will restrict the search hits, the top hits and the aggregations to a slice of the results + * of the main query. + */ + public SearchSourceBuilder slice(SliceBuilder builder) { + this.sliceBuilder = builder; + return this; + } + + /** + * Gets the slice used to filter the search hits, the top hits and the aggregations. + */ + public SliceBuilder slice() { + return sliceBuilder; + } + /** * Add an aggregation to perform as part of the search. */ @@ -943,6 +923,7 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ rewrittenBuilder.rescoreBuilders = rescoreBuilders; rewrittenBuilder.scriptFields = scriptFields; rewrittenBuilder.searchAfterBuilder = searchAfterBuilder; + rewrittenBuilder.sliceBuilder = sliceBuilder; rewrittenBuilder.size = size; rewrittenBuilder.sorts = sorts; rewrittenBuilder.stats = stats; @@ -1039,6 +1020,8 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ } else if (context.getParseFieldMatcher().match(currentFieldName, EXT_FIELD)) { XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().copyCurrentStructure(parser); ext = xContentBuilder.bytes(); + } else if (context.getParseFieldMatcher().match(currentFieldName, SLICE)) { + sliceBuilder = SliceBuilder.fromXContent(context); } else { throw new ParsingException(parser.getTokenLocation(), "Unknown key for a " + token + " in [" + currentFieldName + "].", parser.getTokenLocation()); @@ -1193,6 +1176,10 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ builder.field(SEARCH_AFTER.getPreferredName(), searchAfterBuilder.getSortValues()); } + if (sliceBuilder != null) { + builder.field(SLICE.getPreferredName(), sliceBuilder); + } + if (indexBoost != null) { builder.startObject(INDICES_BOOST_FIELD.getPreferredName()); assert !indexBoost.containsKey(null); @@ -1355,7 +1342,7 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ public int hashCode() { return Objects.hash(aggregations, explain, fetchSourceContext, fieldDataFields, fieldNames, from, highlightBuilder, indexBoost, minScore, postQueryBuilder, queryBuilder, rescoreBuilders, scriptFields, - size, sorts, searchAfterBuilder, stats, suggestBuilder, terminateAfter, timeoutInMillis, trackScores, version, profile); + size, sorts, searchAfterBuilder, sliceBuilder, stats, suggestBuilder, terminateAfter, timeoutInMillis, trackScores, version, profile); } @Override @@ -1383,6 +1370,7 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ && Objects.equals(size, other.size) && Objects.equals(sorts, other.sorts) && Objects.equals(searchAfterBuilder, other.searchAfterBuilder) + && Objects.equals(sliceBuilder, other.sliceBuilder) && Objects.equals(stats, other.stats) && Objects.equals(suggestBuilder, other.suggestBuilder) && Objects.equals(terminateAfter, other.terminateAfter) diff --git a/core/src/main/java/org/elasticsearch/search/internal/DefaultSearchContext.java 
b/core/src/main/java/org/elasticsearch/search/internal/DefaultSearchContext.java index a001ab22ac4..30e994b7656 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/DefaultSearchContext.java +++ b/core/src/main/java/org/elasticsearch/search/internal/DefaultSearchContext.java @@ -115,6 +115,9 @@ public class DefaultSearchContext extends SearchContext { private Float minimumScore; private boolean trackScores = false; // when sorting, track scores as well... private FieldDoc searchAfter; + // filter for sliced scroll + private Query sliceFilter; + /** * The original query as sent by the user without the types and aliases * applied. Putting things in here leaks them into highlighting so don't add @@ -122,8 +125,7 @@ public class DefaultSearchContext extends SearchContext { */ private ParsedQuery originalQuery; /** - * Just like originalQuery but with the filters from types and aliases - * applied. + * Just like originalQuery but with the filters from types, aliases and slice applied. */ private ParsedQuery filteredQuery; /** @@ -210,7 +212,7 @@ public class DefaultSearchContext extends SearchContext { if (rescoreContext.window() > maxWindow) { throw new QueryPhaseExecutionException(this, "Rescore window [" + rescoreContext.window() + "] is too large. It must " + "be less than [" + maxWindow + "]. This prevents allocating massive heaps for storing the results to be " - + "rescored. This limit can be set by chaning the [" + IndexSettings.MAX_RESCORE_WINDOW_SETTING.getKey() + + "rescored. This limit can be set by changing the [" + IndexSettings.MAX_RESCORE_WINDOW_SETTING.getKey() + "] index level setting."); } @@ -254,7 +256,17 @@ public class DefaultSearchContext extends SearchContext { @Override @Nullable public Query searchFilter(String[] types) { - return createSearchFilter(types, aliasFilter, mapperService().hasNested()); + Query typesFilter = createSearchFilter(types, aliasFilter, mapperService().hasNested()); + if (sliceFilter == null) { + return typesFilter; + } + if (typesFilter == null) { + return sliceFilter; + } + return new BooleanQuery.Builder() + .add(typesFilter, Occur.FILTER) + .add(sliceFilter, Occur.FILTER) + .build(); } // extracted to static helper method to make writing unit tests easier: @@ -550,6 +562,11 @@ public class DefaultSearchContext extends SearchContext { return searchAfter; } + public SearchContext sliceFilter(Query filter) { + this.sliceFilter = filter; + return this; + } + @Override public SearchContext parsedPostFilter(ParsedQuery postFilter) { this.postFilter = postFilter; diff --git a/core/src/main/java/org/elasticsearch/search/internal/ScrollContext.java b/core/src/main/java/org/elasticsearch/search/internal/ScrollContext.java index 1744b6fd745..1b7bcfb93c7 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/ScrollContext.java +++ b/core/src/main/java/org/elasticsearch/search/internal/ScrollContext.java @@ -29,5 +29,4 @@ public class ScrollContext { public float maxScore; public ScoreDoc lastEmittedDoc; public Scroll scroll; - } diff --git a/core/src/main/java/org/elasticsearch/search/internal/SubSearchContext.java b/core/src/main/java/org/elasticsearch/search/internal/SubSearchContext.java index 6cacf86d65f..37fb608fd0c 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/SubSearchContext.java +++ b/core/src/main/java/org/elasticsearch/search/internal/SubSearchContext.java @@ -20,7 +20,6 @@ package org.elasticsearch.search.internal; import org.apache.lucene.search.Query; import org.apache.lucene.util.Counter; -import
org.elasticsearch.action.search.SearchType; import org.elasticsearch.index.query.ParsedQuery; import org.elasticsearch.search.aggregations.SearchContextAggregations; import org.elasticsearch.search.fetch.FetchSearchResult; diff --git a/core/src/main/java/org/elasticsearch/search/slice/DocValuesSliceQuery.java b/core/src/main/java/org/elasticsearch/search/slice/DocValuesSliceQuery.java new file mode 100644 index 00000000000..4f334ebf0d1 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/slice/DocValuesSliceQuery.java @@ -0,0 +1,67 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.slice; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.DocValues; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Weight; +import org.apache.lucene.search.RandomAccessWeight; +import org.apache.lucene.util.Bits; + +import java.io.IOException; + +/** + * A {@link SliceQuery} that uses the numeric doc values of a field to do the slicing. + * + * NOTE: With deterministic field values this query can be used across different readers safely. + * If updates are accepted on the field you must ensure that the same reader is used for all `slice` queries. + */ +public final class DocValuesSliceQuery extends SliceQuery { + public DocValuesSliceQuery(String field, int id, int max) { + super(field, id, max); + } + + @Override + public Weight createWeight(IndexSearcher searcher, boolean needsScores) throws IOException { + return new RandomAccessWeight(this) { + @Override + protected Bits getMatchingDocs(final LeafReaderContext context) throws IOException { + final SortedNumericDocValues values = DocValues.getSortedNumeric(context.reader(), getField()); + return new Bits() { + @Override + public boolean get(int doc) { + values.setDocument(doc); + for (int i = 0; i < values.count(); i++) { + return contains(Long.hashCode(values.valueAt(i))); + } + return contains(0); + } + + @Override + public int length() { + return context.reader().maxDoc(); + } + }; + } + }; + } +} diff --git a/core/src/main/java/org/elasticsearch/search/slice/SliceBuilder.java b/core/src/main/java/org/elasticsearch/search/slice/SliceBuilder.java new file mode 100644 index 00000000000..97c79aefa71 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/slice/SliceBuilder.java @@ -0,0 +1,251 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.slice; + +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; +import org.elasticsearch.action.support.ToXContentToBytes; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.lucene.search.MatchNoDocsQuery; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.fielddata.IndexFieldData; +import org.elasticsearch.index.fielddata.IndexNumericFieldData; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.internal.UidFieldMapper; +import org.elasticsearch.index.query.QueryParseContext; +import org.elasticsearch.index.query.QueryShardContext; + +import java.io.IOException; +import java.util.Objects; + +/** + * A slice builder allowing to split a scroll into multiple partitions. + * If the provided field is the "_uid" it uses a {@link org.elasticsearch.search.slice.TermsSliceQuery} + * to do the slicing. The slicing is done at the shard level first and then each shard is split into multiple slices. + * For instance if the number of shards is equal to 2 and the user requested 4 slices + * then the slices 0 and 2 are assigned to the first shard and the slices 1 and 3 are assigned to the second shard. + * This way the total number of bitsets that we need to build on each shard is bounded by the number of slices + * (instead of {@code numShards*numSlices}). + * Otherwise the provided field must be numeric and doc_values must be enabled. In that case a + * {@link org.elasticsearch.search.slice.DocValuesSliceQuery} is used to filter the results.
+ */ +public class SliceBuilder extends ToXContentToBytes implements Writeable { + public static final ParseField FIELD_FIELD = new ParseField("field"); + public static final ParseField ID_FIELD = new ParseField("id"); + public static final ParseField MAX_FIELD = new ParseField("max"); + private final static ObjectParser<SliceBuilder, QueryParseContext> PARSER = + new ObjectParser<>("slice", SliceBuilder::new); + + static { + PARSER.declareString(SliceBuilder::setField, FIELD_FIELD); + PARSER.declareInt(SliceBuilder::setId, ID_FIELD); + PARSER.declareInt(SliceBuilder::setMax, MAX_FIELD); + } + + /** Name of field to slice against (_uid by default) */ + private String field = UidFieldMapper.NAME; + /** The id of the slice */ + private int id = -1; + /** Max number of slices */ + private int max = -1; + + private SliceBuilder() {} + + public SliceBuilder(int id, int max) { + this(UidFieldMapper.NAME, id, max); + } + + /** + * @param field The name of the field + * @param id The id of the slice + * @param max The maximum number of slices + */ + public SliceBuilder(String field, int id, int max) { + setField(field); + setId(id); + setMax(max); + } + + public SliceBuilder(StreamInput in) throws IOException { + this.field = in.readString(); + this.id = in.readVInt(); + this.max = in.readVInt(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(field); + out.writeVInt(id); + out.writeVInt(max); + } + + private SliceBuilder setField(String field) { + if (Strings.isEmpty(field)) { + throw new IllegalArgumentException("field name is null or empty"); + } + this.field = field; + return this; + } + + /** + * The name of the field to slice against + */ + public String getField() { + return this.field; + } + + private SliceBuilder setId(int id) { + if (id < 0) { + throw new IllegalArgumentException("id must be greater than or equal to 0"); + } + if (max != -1 && id >= max) { + throw new IllegalArgumentException("max must be greater than id"); + } + this.id = id; + return this; + } + + /** + * The id of the slice. + */ + public int getId() { + return id; + } + + private SliceBuilder setMax(int max) { + if (max <= 1) { + throw new IllegalArgumentException("max must be greater than 1"); + } + if (id != -1 && id >= max) { + throw new IllegalArgumentException("max must be greater than id"); + } + this.max = max; + return this; + } + + /** + * The maximum number of slices.
+ */ + public int getMax() { + return max; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + innerToXContent(builder); + builder.endObject(); + return builder; + } + + void innerToXContent(XContentBuilder builder) throws IOException { + builder.field(FIELD_FIELD.getPreferredName(), field); + builder.field(ID_FIELD.getPreferredName(), id); + builder.field(MAX_FIELD.getPreferredName(), max); + } + + public static SliceBuilder fromXContent(QueryParseContext context) throws IOException { + SliceBuilder builder = PARSER.parse(context.parser(), new SliceBuilder(), context); + return builder; + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof SliceBuilder)) { + return false; + } + + SliceBuilder o = (SliceBuilder) other; + return ((field == null && o.field == null) || field.equals(o.field)) + && id == o.id && o.max == max; + } + + @Override + public int hashCode() { + return Objects.hash(this.field, this.id, this.max); + } + + public Query toFilter(QueryShardContext context, int shardId, int numShards) { + final MappedFieldType type = context.fieldMapper(field); + if (type == null) { + throw new IllegalArgumentException("field " + field + " not found"); + } + + boolean useTermQuery = false; + if (UidFieldMapper.NAME.equals(field)) { + useTermQuery = true; + } else if (type.hasDocValues() == false) { + throw new IllegalArgumentException("cannot load numeric doc values on " + field); + } else { + IndexFieldData ifm = context.getForField(type); + if (ifm instanceof IndexNumericFieldData == false) { + throw new IllegalArgumentException("cannot load numeric doc values on " + field); + } + } + + if (numShards == 1) { + return useTermQuery ? new TermsSliceQuery(field, id, max) : + new DocValuesSliceQuery(field, id, max); + } + if (max >= numShards) { + // the number of slices is greater than the number of shards; + // in that case we can reduce the number of shards requested by each slice + + // first we check if the slice is responsible for this shard + int targetShard = id % numShards; + if (targetShard != shardId) { + // the shard is not part of this slice, we can skip it. + return new MatchNoDocsQuery("this shard is not part of the slice"); + } + // compute the number of slices where this shard appears + int numSlicesInShard = max / numShards; + int rest = max % numShards; + if (rest > targetShard) { + numSlicesInShard++; + } + + if (numSlicesInShard == 1) { + // this shard has only one slice so we must check all the documents + return new MatchAllDocsQuery(); + } + // get the new slice id for this shard + int shardSlice = id / numShards; + + return useTermQuery ? + new TermsSliceQuery(field, shardSlice, numSlicesInShard) : + new DocValuesSliceQuery(field, shardSlice, numSlicesInShard); + } + // the number of shards is greater than the number of slices + + // check if the shard is assigned to the slice + int targetSlice = shardId % max; + if (id != targetSlice) { + // the shard is not part of this slice, we can skip it.
+ return new MatchNoDocsQuery("this shard is not part of the slice"); + } + return new MatchAllDocsQuery(); + } +} diff --git a/core/src/main/java/org/elasticsearch/search/slice/SliceQuery.java b/core/src/main/java/org/elasticsearch/search/slice/SliceQuery.java new file mode 100644 index 00000000000..0d87b275403 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/slice/SliceQuery.java @@ -0,0 +1,81 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.slice; + +import org.apache.lucene.search.Query; + +import java.util.Objects; + +/** + * An abstract {@link Query} that defines a hash function to partition the documents into multiple slices. + */ +public abstract class SliceQuery extends Query { + private final String field; + private final int id; + private final int max; + + /** + * @param field The name of the field + * @param id The id of the slice + * @param max The maximum number of slices + */ + public SliceQuery(String field, int id, int max) { + this.field = field; + this.id = id; + this.max = max; + } + + // Returns true if the value matches the predicate + protected final boolean contains(long value) { + return Math.floorMod(value, max) == id; + } + + public String getField() { + return field; + } + + public int getId() { + return id; + } + + public int getMax() { + return max; + } + + @Override + public boolean equals(Object o) { + if (super.equals(o) == false) { + return false; + } + SliceQuery that = (SliceQuery) o; + return field.equals(that.field) && id == that.id && max == that.max; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), field, id, max); + } + + @Override + public String toString(String f) { + return getClass().getSimpleName() + "[field=" + field + ", id=" + id + ", max=" + max + "]"; + } + +} diff --git a/core/src/main/java/org/elasticsearch/search/slice/TermsSliceQuery.java b/core/src/main/java/org/elasticsearch/search/slice/TermsSliceQuery.java new file mode 100644 index 00000000000..b967a6b6e71 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/slice/TermsSliceQuery.java @@ -0,0 +1,86 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.slice; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.Terms; +import org.apache.lucene.index.TermsEnum; +import org.apache.lucene.index.PostingsEnum; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.DocIdSet; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.Weight; +import org.apache.lucene.search.ConstantScoreWeight; +import org.apache.lucene.search.Scorer; +import org.apache.lucene.search.ConstantScoreScorer; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.DocIdSetBuilder; + +import java.io.IOException; + +/** + * A {@link SliceQuery} that uses the terms dictionary of a field to do the slicing. + * + * NOTE: The cost of this filter is O(N*M) where N is the number of unique terms in the dictionary + * and M is the average number of documents per term. + * For each segment this filter enumerates the terms dictionary, computes the hash code for each term and fills + * a bit set with the documents of all terms whose hash code matches the predicate. + * NOTE: Documents with no value for that field are ignored. + */ +public final class TermsSliceQuery extends SliceQuery { + public TermsSliceQuery(String field, int id, int max) { + super(field, id, max); + } + + @Override + public Weight createWeight(IndexSearcher searcher, boolean needsScores) throws IOException { + return new ConstantScoreWeight(this) { + @Override + public Scorer scorer(LeafReaderContext context) throws IOException { + final DocIdSet disi = build(context.reader()); + final DocIdSetIterator leafIt = disi.iterator(); + return new ConstantScoreScorer(this, score(), leafIt); + } + }; + } + + /** + * Returns a DocIdSet per segment containing the matching docs for the specified slice.
+ */ + private DocIdSet build(LeafReader reader) throws IOException { + final DocIdSetBuilder builder = new DocIdSetBuilder(reader.maxDoc()); + final Terms terms = reader.terms(getField()); + if (terms == null) { + // the field has no term in this segment, no document can match + return DocIdSet.EMPTY; + } + final TermsEnum te = terms.iterator(); + PostingsEnum docsEnum = null; + for (BytesRef term = te.next(); term != null; term = te.next()) { + int hashCode = term.hashCode(); + if (contains(hashCode)) { + docsEnum = te.postings(docsEnum, PostingsEnum.NONE); + int docId = docsEnum.nextDoc(); + while (docId != DocIdSetIterator.NO_MORE_DOCS) { + builder.add(docId); + docId = docsEnum.nextDoc(); + } + } + } + return builder.build(); + } +} diff --git a/core/src/test/java/org/elasticsearch/search/builder/SearchSourceBuilderTests.java b/core/src/test/java/org/elasticsearch/search/builder/SearchSourceBuilderTests.java index fc3c90b5165..7ef3fa30e96 100644 --- a/core/src/test/java/org/elasticsearch/search/builder/SearchSourceBuilderTests.java +++ b/core/src/test/java/org/elasticsearch/search/builder/SearchSourceBuilderTests.java @@ -71,6 +71,7 @@ import org.elasticsearch.search.highlight.HighlightBuilderTests; import org.elasticsearch.search.rescore.QueryRescoreBuilderTests; import org.elasticsearch.search.rescore.QueryRescorerBuilder; import org.elasticsearch.search.searchafter.SearchAfterBuilder; +import org.elasticsearch.search.slice.SliceBuilder; import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.ScoreSortBuilder; import org.elasticsearch.search.sort.ScriptSortBuilder.ScriptSortType; @@ -426,6 +427,16 @@ public class SearchSourceBuilderTests extends ESTestCase { xContentBuilder.endObject(); builder.ext(xContentBuilder); } + if (randomBoolean()) { + String field = randomBoolean() ? null : randomAsciiOfLengthBetween(5, 20); + int max = randomIntBetween(2, 1000); + int id = randomInt(max - 1); + if (field == null) { + builder.slice(new SliceBuilder(id, max)); + } else { + builder.slice(new SliceBuilder(field, id, max)); + } + } return builder; } diff --git a/core/src/test/java/org/elasticsearch/search/slice/DocValuesSliceQueryTests.java b/core/src/test/java/org/elasticsearch/search/slice/DocValuesSliceQueryTests.java new file mode 100644 index 00000000000..dee2ae67914 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/search/slice/DocValuesSliceQueryTests.java @@ -0,0 +1,124 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.elasticsearch.search.slice; + +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.SortedNumericDocValuesField; +import org.apache.lucene.document.StringField; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.RandomIndexWriter; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Collector; +import org.apache.lucene.search.LeafCollector; +import org.apache.lucene.search.Scorer; +import org.apache.lucene.search.QueryUtils; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.NumericUtils; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import static org.hamcrest.Matchers.equalTo; + +public class DocValuesSliceQueryTests extends ESTestCase { + + public void testBasics() { + DocValuesSliceQuery query1 = + new DocValuesSliceQuery("field1", 1, 10); + DocValuesSliceQuery query2 = + new DocValuesSliceQuery("field1", 1, 10); + DocValuesSliceQuery query3 = + new DocValuesSliceQuery("field2", 1, 10); + DocValuesSliceQuery query4 = + new DocValuesSliceQuery("field1", 2, 10); + QueryUtils.check(query1); + QueryUtils.checkEqual(query1, query2); + QueryUtils.checkUnequal(query1, query3); + QueryUtils.checkUnequal(query1, query4); + } + + public void testSearch() throws Exception { + final int numDocs = randomIntBetween(100, 200); + final Directory dir = newDirectory(); + final RandomIndexWriter w = new RandomIndexWriter(random(), dir); + int max = randomIntBetween(2, 10); + int[] sliceCounters1 = new int[max]; + int[] sliceCounters2 = new int[max]; + Set<String> keys = new HashSet<>(); + for (int i = 0; i < numDocs; ++i) { + Document doc = new Document(); + String uuid = UUIDs.base64UUID(); + int intValue = randomInt(); + long doubleValue = NumericUtils.doubleToSortableLong(randomDouble()); + doc.add(new StringField("uuid", uuid, Field.Store.YES)); + doc.add(new SortedNumericDocValuesField("intField", intValue)); + doc.add(new SortedNumericDocValuesField("doubleField", doubleValue)); + w.addDocument(doc); + sliceCounters1[Math.floorMod(Long.hashCode(intValue), max)] ++; + sliceCounters2[Math.floorMod(Long.hashCode(doubleValue), max)] ++; + keys.add(uuid); + } + final IndexReader reader = w.getReader(); + final IndexSearcher searcher = newSearcher(reader); + + for (int id = 0; id < max; id++) { + DocValuesSliceQuery query1 = + new DocValuesSliceQuery("intField", id, max); + assertThat(searcher.count(query1), equalTo(sliceCounters1[id])); + + DocValuesSliceQuery query2 = + new DocValuesSliceQuery("doubleField", id, max); + assertThat(searcher.count(query2), equalTo(sliceCounters2[id])); + searcher.search(query1, new Collector() { + @Override + public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException { + return new LeafCollector() { + @Override + public void setScorer(Scorer scorer) throws IOException { + } + + @Override + public void collect(int doc) throws IOException { + Document d = context.reader().document(doc, Collections.singleton("uuid")); + String uuid = d.get("uuid"); + assertThat(keys.contains(uuid), equalTo(true)); + keys.remove(uuid); + } + }; + } + + @Override + public boolean needsScores() { + return false; + } + }); + } + assertThat(keys.size(), equalTo(0)); + w.close(); + reader.close(); + dir.close(); + } +}
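Before the integration test below, here is a minimal consumer-side sketch (editorial illustration, not part of the patch) of how the new API is driven; `client` is assumed to be a connected transport `Client`. Each slice opens an independent scroll, so each iteration of the outer loop could run on its own thread:

```java
// Illustrative sketch only, not part of the patch. Assumes `client` is a
// connected org.elasticsearch.client.Client. Each slice is an independent
// scroll, so the slices can be consumed in parallel.
int max = 4;
for (int id = 0; id < max; id++) {
    SearchResponse rsp = client.prepareSearch("twitter")
            .setQuery(QueryBuilders.matchQuery("title", "elasticsearch"))
            .setScroll(new Scroll(TimeValue.timeValueMinutes(1)))
            .slice(new SliceBuilder(id, max)) // slices on the _uid field by default
            .setSize(100)
            .get();
    while (rsp.getHits().getHits().length > 0) {
        for (SearchHit hit : rsp.getHits().getHits()) {
            // process hit.getId(), hit.getSource(), ...
        }
        rsp = client.prepareSearchScroll(rsp.getScrollId())
                .setScroll(new Scroll(TimeValue.timeValueMinutes(1)))
                .get();
    }
    client.prepareClearScroll().addScrollId(rsp.getScrollId()).get();
}
```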
diff --git a/core/src/test/java/org/elasticsearch/search/slice/SearchSliceIT.java b/core/src/test/java/org/elasticsearch/search/slice/SearchSliceIT.java new file mode 100644 index 00000000000..ad93d14f21f --- /dev/null +++ b/core/src/test/java/org/elasticsearch/search/slice/SearchSliceIT.java @@ -0,0 +1,215 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.slice; + +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.search.Scroll; +import org.elasticsearch.search.SearchContextException; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.sort.SortBuilders; +import org.elasticsearch.test.ESIntegTestCase; + +import java.io.IOException; +import java.util.List; +import java.util.ArrayList; +import java.util.Set; +import java.util.HashSet; +import java.util.concurrent.ExecutionException; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.startsWith; + +public class SearchSliceIT extends ESIntegTestCase { + private static final int NUM_DOCS = 1000; + + private int setupIndex(boolean withDocs) throws IOException, ExecutionException, InterruptedException { + String mapping = XContentFactory.jsonBuilder(). 
+ startObject() + .startObject("type") + .startObject("properties") + .startObject("invalid_random_kw") + .field("type", "keyword") + .field("doc_values", "false") + .endObject() + .startObject("random_int") + .field("type", "integer") + .field("doc_values", "true") + .endObject() + .startObject("invalid_random_int") + .field("type", "integer") + .field("doc_values", "false") + .endObject() + .endObject() + .endObject() + .endObject().string(); + int numberOfShards = randomIntBetween(1, 7); + assertAcked(client().admin().indices().prepareCreate("test") + .setSettings("number_of_shards", numberOfShards) + .addMapping("type", mapping)); + ensureGreen(); + + if (withDocs == false) { + return numberOfShards; + } + + List<IndexRequestBuilder> requests = new ArrayList<>(); + for (int i = 0; i < NUM_DOCS; i++) { + XContentBuilder builder = jsonBuilder(); + builder.startObject(); + builder.field("invalid_random_kw", randomAsciiOfLengthBetween(5, 20)); + builder.field("random_int", randomInt()); + builder.field("static_int", 0); + builder.field("invalid_random_int", randomInt()); + builder.endObject(); + requests.add(client().prepareIndex("test", "type").setSource(builder)); + } + indexRandom(true, requests); + return numberOfShards; + } + + public void testDocIdSort() throws Exception { + int numShards = setupIndex(true); + SearchResponse sr = client().prepareSearch("test") + .setQuery(matchAllQuery()) + .setSize(0) + .get(); + int numDocs = (int) sr.getHits().getTotalHits(); + assertThat(numDocs, equalTo(NUM_DOCS)); + int max = randomIntBetween(2, numShards*3); + for (String field : new String[]{"_uid", "random_int", "static_int"}) { + int fetchSize = randomIntBetween(10, 100); + SearchRequestBuilder request = client().prepareSearch("test") + .setQuery(matchAllQuery()) + .setScroll(new Scroll(TimeValue.timeValueSeconds(10))) + .setSize(fetchSize) + .addSort(SortBuilders.fieldSort("_doc")); + assertSearchSlicesWithScroll(request, field, max); + } + } + + public void testNumericSort() throws Exception { + int numShards = setupIndex(true); + SearchResponse sr = client().prepareSearch("test") + .setQuery(matchAllQuery()) + .setSize(0) + .get(); + int numDocs = (int) sr.getHits().getTotalHits(); + assertThat(numDocs, equalTo(NUM_DOCS)); + + int max = randomIntBetween(2, numShards*3); + for (String field : new String[]{"_uid", "random_int", "static_int"}) { + int fetchSize = randomIntBetween(10, 100); + SearchRequestBuilder request = client().prepareSearch("test") + .setQuery(matchAllQuery()) + .setScroll(new Scroll(TimeValue.timeValueSeconds(10))) + .addSort(SortBuilders.fieldSort("random_int")) + .setSize(fetchSize); + assertSearchSlicesWithScroll(request, field, max); + } + } + + public void testInvalidFields() throws Exception { + setupIndex(false); + SearchPhaseExecutionException exc = expectThrows(SearchPhaseExecutionException.class, + () -> client().prepareSearch("test") + .setQuery(matchAllQuery()) + .setScroll(new Scroll(TimeValue.timeValueSeconds(10))) + .slice(new SliceBuilder("invalid_random_int", 0, 10)) + .get()); + Throwable rootCause = findRootCause(exc); + assertThat(rootCause.getClass(), equalTo(IllegalArgumentException.class)); + assertThat(rootCause.getMessage(), + startsWith("cannot load numeric doc values")); + + exc = expectThrows(SearchPhaseExecutionException.class, () -> client().prepareSearch("test") + .setQuery(matchAllQuery()) + .setScroll(new Scroll(TimeValue.timeValueSeconds(10))) + .slice(new SliceBuilder("invalid_random_kw", 0, 10)) + .get()); + rootCause = findRootCause(exc);
assertThat(rootCause.getClass(), equalTo(IllegalArgumentException.class)); + assertThat(rootCause.getMessage(), + startsWith("cannot load numeric doc values")); + } + + public void testInvalidQuery() throws Exception { + setupIndex(false); + SearchPhaseExecutionException exc = expectThrows(SearchPhaseExecutionException.class, + () -> client().prepareSearch() + .setQuery(matchAllQuery()) + .slice(new SliceBuilder("invalid_random_int", 0, 10)) + .get()); + Throwable rootCause = findRootCause(exc); + assertThat(rootCause.getClass(), equalTo(SearchContextException.class)); + assertThat(rootCause.getMessage(), + equalTo("`slice` cannot be used outside of a scroll context")); + } + + private void assertSearchSlicesWithScroll(SearchRequestBuilder request, String field, int numSlice) { + int totalResults = 0; + List<String> keys = new ArrayList<>(); + for (int id = 0; id < numSlice; id++) { + SliceBuilder sliceBuilder = new SliceBuilder(field, id, numSlice); + SearchResponse searchResponse = request.slice(sliceBuilder).get(); + totalResults += searchResponse.getHits().getHits().length; + int expectedSliceResults = (int) searchResponse.getHits().getTotalHits(); + int numSliceResults = searchResponse.getHits().getHits().length; + String scrollId = searchResponse.getScrollId(); + for (SearchHit hit : searchResponse.getHits().getHits()) { + keys.add(hit.getId()); + } + while (searchResponse.getHits().getHits().length > 0) { + searchResponse = client().prepareSearchScroll(scrollId) + .setScroll(new Scroll(TimeValue.timeValueSeconds(10))) + .get(); + scrollId = searchResponse.getScrollId(); + totalResults += searchResponse.getHits().getHits().length; + numSliceResults += searchResponse.getHits().getHits().length; + for (SearchHit hit : searchResponse.getHits().getHits()) { + keys.add(hit.getId()); + } + } + assertThat(numSliceResults, equalTo(expectedSliceResults)); + clearScroll(scrollId); + } + assertThat(totalResults, equalTo(NUM_DOCS)); + assertThat(keys.size(), equalTo(NUM_DOCS)); + assertThat(new HashSet<>(keys).size(), equalTo(NUM_DOCS)); + } + + private Throwable findRootCause(Exception e) { + Throwable ret = e; + while (ret.getCause() != null) { + ret = ret.getCause(); + } + return ret; + } +} diff --git a/core/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java b/core/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java new file mode 100644 index 00000000000..554b9436e58 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java @@ -0,0 +1,340 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.elasticsearch.search.slice; + +import org.apache.lucene.analysis.MockAnalyzer; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.DocValuesType; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.RAMDirectory; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseFieldMatcher; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.lucene.search.MatchNoDocsQuery; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.fielddata.IndexNumericFieldData; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.internal.UidFieldMapper; +import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.index.query.QueryParseContext; +import org.elasticsearch.index.query.QueryParser; +import org.elasticsearch.index.query.QueryShardContext; +import org.elasticsearch.indices.query.IndicesQueriesRegistry; +import org.elasticsearch.test.ESTestCase; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.io.IOException; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.HashSet; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.containsString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class SliceBuilderTests extends ESTestCase { + private static final int MAX_SLICE = 20; + private static NamedWriteableRegistry namedWriteableRegistry; + private static IndicesQueriesRegistry indicesQueriesRegistry; + + /** + * setup for the whole base test class + */ + @BeforeClass + public static void init() { + namedWriteableRegistry = new NamedWriteableRegistry(); + indicesQueriesRegistry = new IndicesQueriesRegistry(); + QueryParser<MatchAllQueryBuilder> parser = MatchAllQueryBuilder::fromXContent; + indicesQueriesRegistry.register(parser, MatchAllQueryBuilder.QUERY_NAME_FIELD); + } + + @AfterClass + public static void afterClass() throws Exception { + namedWriteableRegistry = null; + indicesQueriesRegistry = null; + } + + private SliceBuilder randomSliceBuilder() throws IOException { + int max = randomIntBetween(2, MAX_SLICE); + int id = randomInt(max - 1); + String field = randomAsciiOfLengthBetween(5, 20); + return new SliceBuilder(field, id, max); + } + + private static SliceBuilder serializedCopy(SliceBuilder original) throws IOException { + try (BytesStreamOutput output = new BytesStreamOutput()) { + original.writeTo(output); + try (StreamInput in = + new NamedWriteableAwareStreamInput(StreamInput.wrap(output.bytes()), namedWriteableRegistry)) { + return new SliceBuilder(in); + } + } + } + + public void
testSerialization() throws Exception { + SliceBuilder original = randomSliceBuilder(); + SliceBuilder deserialized = serializedCopy(original); + assertEquals(deserialized, original); + assertEquals(deserialized.hashCode(), original.hashCode()); + assertNotSame(deserialized, original); + } + + public void testEqualsAndHashcode() throws Exception { + SliceBuilder firstBuilder = randomSliceBuilder(); + assertFalse("sliceBuilder is equal to null", firstBuilder.equals(null)); + assertFalse("sliceBuilder is equal to incompatible type", firstBuilder.equals("")); + assertTrue("sliceBuilder is not equal to self", firstBuilder.equals(firstBuilder)); + assertThat("same sliceBuilder's hashcode returns different values if called multiple times", + firstBuilder.hashCode(), equalTo(firstBuilder.hashCode())); + + SliceBuilder secondBuilder = serializedCopy(firstBuilder); + assertTrue("sliceBuilder is not equal to self", secondBuilder.equals(secondBuilder)); + assertTrue("sliceBuilder is not equal to its copy", firstBuilder.equals(secondBuilder)); + assertTrue("equals is not symmetric", secondBuilder.equals(firstBuilder)); + assertThat("sliceBuilder copy's hashcode is different from original hashcode", secondBuilder.hashCode(), + equalTo(firstBuilder.hashCode())); + SliceBuilder thirdBuilder = serializedCopy(secondBuilder); + assertTrue("sliceBuilder is not equal to self", thirdBuilder.equals(thirdBuilder)); + assertTrue("sliceBuilder is not equal to its copy", secondBuilder.equals(thirdBuilder)); + assertThat("sliceBuilder copy's hashcode is different from original hashcode", secondBuilder.hashCode(), + equalTo(thirdBuilder.hashCode())); + assertTrue("equals is not transitive", firstBuilder.equals(thirdBuilder)); + assertThat("sliceBuilder copy's hashcode is different from original hashcode", firstBuilder.hashCode(), + equalTo(thirdBuilder.hashCode())); + assertTrue("sliceBuilder is not symmetric", thirdBuilder.equals(secondBuilder)); + assertTrue("sliceBuilder is not symmetric", thirdBuilder.equals(firstBuilder)); + } + + public void testFromXContent() throws Exception { + SliceBuilder sliceBuilder = randomSliceBuilder(); + XContentBuilder builder = XContentFactory.contentBuilder(randomFrom(XContentType.values())); + if (randomBoolean()) { + builder.prettyPrint(); + } + builder.startObject(); + sliceBuilder.innerToXContent(builder); + builder.endObject(); + XContentParser parser = XContentHelper.createParser(shuffleXContent(builder).bytes()); + QueryParseContext context = new QueryParseContext(indicesQueriesRegistry, parser, + ParseFieldMatcher.STRICT); + SliceBuilder secondSliceBuilder = SliceBuilder.fromXContent(context); + assertNotSame(sliceBuilder, secondSliceBuilder); + assertEquals(sliceBuilder, secondSliceBuilder); + assertEquals(sliceBuilder.hashCode(), secondSliceBuilder.hashCode()); + } + + public void testInvalidArguments() throws Exception { + Exception e = expectThrows(IllegalArgumentException.class, () -> new SliceBuilder("field", -1, 10)); + assertEquals(e.getMessage(), "id must be greater than or equal to 0"); + + e = expectThrows(IllegalArgumentException.class, () -> new SliceBuilder("field", 10, -1)); + assertEquals(e.getMessage(), "max must be greater than 1"); + + e = expectThrows(IllegalArgumentException.class, () -> new SliceBuilder("field", 10, 0)); + assertEquals(e.getMessage(), "max must be greater than 1"); + + e = expectThrows(IllegalArgumentException.class, () -> new SliceBuilder("field", 10, 5)); + assertEquals(e.getMessage(), "max must be greater than id"); + + e =
+
+        e = expectThrows(IllegalArgumentException.class, () -> new SliceBuilder("field", 1000, 1000));
+        assertEquals("max must be greater than id", e.getMessage());
+        e = expectThrows(IllegalArgumentException.class, () -> new SliceBuilder("field", 1001, 1000));
+        assertEquals("max must be greater than id", e.getMessage());
+    }
+
+    public void testToFilter() throws IOException {
+        Directory dir = new RAMDirectory();
+        try (IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())))) {
+            writer.commit();
+        }
+        QueryShardContext context = mock(QueryShardContext.class);
+        try (IndexReader reader = DirectoryReader.open(dir)) {
+            MappedFieldType fieldType = new MappedFieldType() {
+                @Override
+                public MappedFieldType clone() {
+                    return null;
+                }
+
+                @Override
+                public String typeName() {
+                    return null;
+                }
+
+                @Override
+                public Query termQuery(Object value, @Nullable QueryShardContext context) {
+                    return null;
+                }
+            };
+            fieldType.setName(UidFieldMapper.NAME);
+            fieldType.setHasDocValues(false);
+            when(context.fieldMapper(UidFieldMapper.NAME)).thenReturn(fieldType);
+            when(context.getIndexReader()).thenReturn(reader);
+            // slicing on the default _uid field (no doc values) builds a terms-based filter
+            SliceBuilder builder = new SliceBuilder(5, 10);
+            Query query = builder.toFilter(context, 0, 1);
+            assertThat(query, instanceOf(TermsSliceQuery.class));
+
+            assertThat(builder.toFilter(context, 0, 1), equalTo(query));
+            try (IndexReader newReader = DirectoryReader.open(dir)) {
+                when(context.getIndexReader()).thenReturn(newReader);
+                assertThat(builder.toFilter(context, 0, 1), equalTo(query));
+            }
+        }
+
+        try (IndexReader reader = DirectoryReader.open(dir)) {
+            MappedFieldType fieldType = new MappedFieldType() {
+                @Override
+                public MappedFieldType clone() {
+                    return null;
+                }
+
+                @Override
+                public String typeName() {
+                    return null;
+                }
+
+                @Override
+                public Query termQuery(Object value, @Nullable QueryShardContext context) {
+                    return null;
+                }
+            };
+            fieldType.setName("field_doc_values");
+            fieldType.setHasDocValues(true);
+            fieldType.setDocValuesType(DocValuesType.SORTED_NUMERIC);
+            when(context.fieldMapper("field_doc_values")).thenReturn(fieldType);
+            when(context.getIndexReader()).thenReturn(reader);
+            IndexNumericFieldData fd = mock(IndexNumericFieldData.class);
+            when(context.getForField(fieldType)).thenReturn(fd);
+            // slicing on a numeric field with doc values builds a doc-values-based filter
+            SliceBuilder builder = new SliceBuilder("field_doc_values", 5, 10);
+            Query query = builder.toFilter(context, 0, 1);
+            assertThat(query, instanceOf(DocValuesSliceQuery.class));
+
+            assertThat(builder.toFilter(context, 0, 1), equalTo(query));
+            try (IndexReader newReader = DirectoryReader.open(dir)) {
+                when(context.getIndexReader()).thenReturn(newReader);
+                assertThat(builder.toFilter(context, 0, 1), equalTo(query));
+            }
+
+            // numSlices > numShards
+            int numSlices = randomIntBetween(10, 100);
+            int numShards = randomIntBetween(1, 9);
+            Map<Integer, AtomicInteger> numSliceMap = new HashMap<>();
+            for (int i = 0; i < numSlices; i++) {
+                for (int j = 0; j < numShards; j++) {
+                    SliceBuilder slice = new SliceBuilder("_uid", i, numSlices);
+                    Query q = slice.toFilter(context, j, numShards);
+                    if (q instanceof TermsSliceQuery || q instanceof MatchAllDocsQuery) {
+                        AtomicInteger count = numSliceMap.get(j);
+                        if (count == null) {
+                            count = new AtomicInteger(0);
+                            numSliceMap.put(j, count);
+                        }
+                        count.incrementAndGet();
+                        if (q instanceof MatchAllDocsQuery) {
+                            assertThat(count.get(), equalTo(1));
+                        }
+                    } else {
+                        assertThat(q, instanceOf(MatchNoDocsQuery.class));
+                    }
+                }
+            }
+            int total = 0;
+            for (Map.Entry<Integer, AtomicInteger> e : numSliceMap.entrySet()) {
+                total += e.getValue().get();
+            }
+            assertThat(total, equalTo(numSlices));
+
+            // numShards > numSlices
+            numShards = randomIntBetween(3, 100);
+            numSlices = randomIntBetween(1, numShards - 1);
+            List<Integer> targetShards = new ArrayList<>();
+            for (int i = 0; i < numSlices; i++) {
+                for (int j = 0; j < numShards; j++) {
+                    SliceBuilder slice = new SliceBuilder("_uid", i, numSlices);
+                    Query q = slice.toFilter(context, j, numShards);
+                    if (q instanceof MatchNoDocsQuery == false) {
+                        assertThat(q, instanceOf(MatchAllDocsQuery.class));
+                        targetShards.add(j);
+                    }
+                }
+            }
+            // every shard must be targeted by exactly one slice
+            assertThat(targetShards.size(), equalTo(numShards));
+            assertThat(new HashSet<>(targetShards).size(), equalTo(numShards));
+
+            // numShards == numSlices
+            numShards = randomIntBetween(2, 10);
+            numSlices = numShards;
+            for (int i = 0; i < numSlices; i++) {
+                for (int j = 0; j < numShards; j++) {
+                    SliceBuilder slice = new SliceBuilder("_uid", i, numSlices);
+                    Query q = slice.toFilter(context, j, numShards);
+                    if (i == j) {
+                        assertThat(q, instanceOf(MatchAllDocsQuery.class));
+                    } else {
+                        assertThat(q, instanceOf(MatchNoDocsQuery.class));
+                    }
+                }
+            }
+        }
+
+        try (IndexReader reader = DirectoryReader.open(dir)) {
+            MappedFieldType fieldType = new MappedFieldType() {
+                @Override
+                public MappedFieldType clone() {
+                    return null;
+                }
+
+                @Override
+                public String typeName() {
+                    return null;
+                }
+
+                @Override
+                public Query termQuery(Object value, @Nullable QueryShardContext context) {
+                    return null;
+                }
+            };
+            fieldType.setName("field_without_doc_values");
+            when(context.fieldMapper("field_without_doc_values")).thenReturn(fieldType);
+            when(context.getIndexReader()).thenReturn(reader);
+            SliceBuilder builder = new SliceBuilder("field_without_doc_values", 5, 10);
+            IllegalArgumentException exc =
+                expectThrows(IllegalArgumentException.class, () -> builder.toFilter(context, 0, 1));
+            assertThat(exc.getMessage(), containsString("cannot load numeric doc values"));
+        }
+    }
+}
diff --git a/core/src/test/java/org/elasticsearch/search/slice/TermsSliceQueryTests.java b/core/src/test/java/org/elasticsearch/search/slice/TermsSliceQueryTests.java
new file mode 100644
index 00000000000..e00dabc6363
--- /dev/null
+++ b/core/src/test/java/org/elasticsearch/search/slice/TermsSliceQueryTests.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.slice;
+
+import org.apache.lucene.analysis.core.KeywordAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.RandomIndexWriter;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.LeafCollector;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.QueryUtils;
+
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class TermsSliceQueryTests extends ESTestCase {
+
+    public void testBasics() {
+        TermsSliceQuery query1 = new TermsSliceQuery("field1", 1, 10);
+        TermsSliceQuery query2 = new TermsSliceQuery("field1", 1, 10);
+        TermsSliceQuery query3 = new TermsSliceQuery("field2", 1, 10);
+        TermsSliceQuery query4 = new TermsSliceQuery("field1", 2, 10);
+        QueryUtils.check(query1);
+        QueryUtils.checkEqual(query1, query2);
+        QueryUtils.checkUnequal(query1, query3);
+        QueryUtils.checkUnequal(query1, query4);
+    }
+
+    public void testSearch() throws Exception {
+        final int numDocs = randomIntBetween(100, 200);
+        final Directory dir = newDirectory();
+        final RandomIndexWriter w = new RandomIndexWriter(random(), dir, new KeywordAnalyzer());
+        int max = randomIntBetween(2, 10);
+        int[] sliceCounters = new int[max];
+        Set<String> keys = new HashSet<>();
+        for (int i = 0; i < numDocs; ++i) {
+            Document doc = new Document();
+            String uuid = UUIDs.base64UUID();
+            BytesRef br = new BytesRef(uuid);
+            // compute the expected slice for this document, as TermsSliceQuery does
+            int id = Math.floorMod(br.hashCode(), max);
+            sliceCounters[id]++;
+            doc.add(new StringField("uuid", uuid, Field.Store.YES));
+            w.addDocument(doc);
+            keys.add(uuid);
+        }
+        final IndexReader reader = w.getReader();
+        final IndexSearcher searcher = newSearcher(reader);
+
+        for (int id = 0; id < max; id++) {
+            TermsSliceQuery query1 = new TermsSliceQuery("uuid", id, max);
+            assertThat(searcher.count(query1), equalTo(sliceCounters[id]));
+            searcher.search(query1, new Collector() {
+                @Override
+                public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
+                    return new LeafCollector() {
+                        @Override
+                        public void setScorer(Scorer scorer) throws IOException {
+                        }
+
+                        @Override
+                        public void collect(int doc) throws IOException {
+                            Document d = context.reader().document(doc, Collections.singleton("uuid"));
+                            String uuid = d.get("uuid");
+                            assertThat(keys.contains(uuid), equalTo(true));
+                            keys.remove(uuid);
+                        }
+                    };
+                }
+
+                @Override
+                public boolean needsScores() {
+                    return false;
+                }
+            });
+        }
+        // every document was collected by exactly one slice
+        assertThat(keys.size(), equalTo(0));
+        w.close();
+        reader.close();
+        dir.close();
+    }
+}
diff --git a/docs/reference/search/request/scroll.asciidoc b/docs/reference/search/request/scroll.asciidoc
index e18593d21cc..9f9558aa979 100644
--- a/docs/reference/search/request/scroll.asciidoc
+++ b/docs/reference/search/request/scroll.asciidoc
@@ -175,3 +175,92 @@ curl -XDELETE localhost:9200/_search/scroll \
     -d 'c2Nhbjs2OzM0NDg1ODpzRlBLc0FXNlNyNm5JWUc1,aGVuRmV0Y2g7NTsxOnkxaDZ'
 ---------------------------------------
+
+==== Sliced Scroll
+
+For scroll queries that return a lot of documents it is possible to split the scroll into multiple slices that
+can be consumed independently:
+
+[source,js]
+--------------------------------------------------
+curl -XGET 'localhost:9200/twitter/tweet/_search?scroll=1m' -d '
+{
+    "slice": {
+        "id": 0, <1>
+        "max": 2 <2>
+    },
+    "query": {
+        "match" : {
+            "title" : "elasticsearch"
+        }
+    }
+}
+'
+
+curl -XGET 'localhost:9200/twitter/tweet/_search?scroll=1m' -d '
+{
+    "slice": {
+        "id": 1,
+        "max": 2
+    },
+    "query": {
+        "match" : {
+            "title" : "elasticsearch"
+        }
+    }
+}
+'
+--------------------------------------------------
+
+<1> The id of the slice
+<2> The maximum number of slices
+
+The first request returns documents that belong to the first slice (id: 0) and the second request returns
+documents that belong to the second slice. Since the maximum number of slices is set to 2, the union of the
+results of the two requests is equivalent to the results of a scroll query without slicing.
+By default the splitting is done first on the shards and then locally on each shard using the `_uid` field
+with the following formula:
+
+`slice(doc) = floorMod(hashCode(doc._uid), max)`
+
+For instance, if the number of shards is equal to 2 and the user requested 4 slices, then slices 0 and 2 are
+assigned to the first shard and slices 1 and 3 are assigned to the second shard.
+
+Each scroll is independent and can be processed in parallel like any scroll request.
+
+NOTE: If the number of slices is bigger than the number of shards, the slice filter is very slow on the first
+calls: it has a complexity of O(N) and a memory cost equal to N bits per slice, where N is the total number of
+documents in the shard. After a few calls the filter should be cached and subsequent calls should be faster,
+but you should limit the number of sliced queries you perform in parallel to avoid a memory explosion.
+
+To avoid this cost entirely it is possible to use the `doc_values` of another field to do the slicing,
+but the user must ensure that the field has the following properties:
+
+* The field is numeric.
+
+* `doc_values` are enabled on that field.
+
+* Every document should contain a single value. If a document has multiple values for the specified field,
+the first value is used.
+
+* The value for each document should be set once when the document is created and never updated. This ensures
+that each slice gets deterministic results.
+
+* The cardinality of the field should be high. This ensures that each slice gets approximately the same number
+of documents.
+
+[source,js]
+--------------------------------------------------
+curl -XGET 'localhost:9200/twitter/tweet/_search?scroll=1m' -d '
+{
+    "slice": {
+        "field": "my_random_integer_field",
+        "id": 0,
+        "max": 10
+    },
+    "query": {
+        "match" : {
+            "title" : "elasticsearch"
+        }
+    }
+}
+'
+--------------------------------------------------
+
+For append-only time-based indices, the `timestamp` field can be used safely.
\ No newline at end of file
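
As an illustration of the documented formula, here is a minimal, self-contained Java sketch (not part of this
patch). It assumes, consistent with the shard example above, that slice `i` is served by shard `i % numShards`
when there are more slices than shards, and it uses `String.hashCode` merely as a stand-in for the hash of the
`_uid` term (the actual `TermsSliceQuery` hashes the `BytesRef` of the `_uid`):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the documented slicing:
//   slice(doc) = floorMod(hashCode(doc._uid), max)
// String.hashCode is a stand-in here; the real implementation hashes the BytesRef of _uid.
public class SliceAssignmentSketch {

    // The slice a document belongs to, given the maximum number of slices.
    static int sliceOf(String uid, int maxSlices) {
        return Math.floorMod(uid.hashCode(), maxSlices);
    }

    // Shard that serves a slice when there are more slices than shards, per the example:
    // shards=2, slices=4 -> shard 0 serves slices {0, 2}, shard 1 serves slices {1, 3}.
    static int shardForSlice(int sliceId, int numShards) {
        return sliceId % numShards;
    }

    public static void main(String[] args) {
        int numShards = 2;
        int maxSlices = 4;
        for (int slice = 0; slice < maxSlices; slice++) {
            System.out.println("slice " + slice + " -> shard " + shardForSlice(slice, numShards));
        }

        // Every document lands in exactly one slice, so the union of all slices is
        // equivalent to an unsliced scroll over the same query.
        Map<Integer, Integer> docsPerSlice = new HashMap<>();
        String[] uids = {"tweet#1", "tweet#2", "tweet#3", "tweet#4", "tweet#5"};
        for (String uid : uids) {
            docsPerSlice.merge(sliceOf(uid, maxSlices), 1, Integer::sum);
        }
        System.out.println("documents per slice: " + docsPerSlice);
    }
}
```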