Merge pull request #18237 from jimferenczi/scroll_by_slice

Add the ability to partition a scroll into multiple slices.
Jim Ferenczi 2016-06-07 16:23:57 +02:00
commit 7f20b3a7ad
16 changed files with 1462 additions and 63 deletions
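In short, a client can consume one scroll as several independent slices by adding a `slice` section to the search body. A minimal sketch mirroring the documentation added below (index, type and query are illustrative):

curl -XGET 'localhost:9200/twitter/tweet/_search?scroll=1m' -d '
{
    "slice": { "id": 0, "max": 2 },
    "query": { "match_all": {} }
}
'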

View File

@@ -30,6 +30,7 @@ import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.PipelineAggregatorBuilder;
import org.elasticsearch.search.slice.SliceBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.highlight.HighlightBuilder;
import org.elasticsearch.search.rescore.RescoreBuilder;
@@ -352,6 +353,11 @@ public class SearchRequestBuilder extends ActionRequestBuilder<SearchRequest, Se
return this;
}
public SearchRequestBuilder slice(SliceBuilder builder) {
sourceBuilder().slice(builder);
return this;
}
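A minimal usage sketch of the new builder method, assuming a connected `client` instance (the index name and sizes are illustrative; the integration test further down exercises the same calls):

// Hedged sketch: fetch the first of two slices of a scrolled search.
SearchResponse rsp = client.prepareSearch("test")
    .setQuery(QueryBuilders.matchAllQuery())
    .setScroll(new Scroll(TimeValue.timeValueSeconds(10)))
    .slice(new SliceBuilder(0, 2)) // slice id 0 out of 2 slices
    .setSize(100)
    .get();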
/**
* Applies when sorting, and controls if scores will be tracked as well. Defaults to
* <tt>false</tt>.

View File

@@ -821,6 +821,15 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
FieldDoc fieldDoc = SearchAfterBuilder.buildFieldDoc(context.sort(), source.searchAfter());
context.searchAfter(fieldDoc);
}
if (source.slice() != null) {
if (context.scrollContext() == null) {
throw new SearchContextException(context, "`slice` cannot be used outside of a scroll context");
}
context.sliceFilter(source.slice().toFilter(queryShardContext,
context.shardTarget().getShardId().getId(),
queryShardContext.getIndexSettings().getNumberOfShards()));
}
}
private static final int[] EMPTY_DOC_IDS = new int[0];

View File

@@ -42,6 +42,7 @@ import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.slice.SliceBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorParsers;
import org.elasticsearch.search.aggregations.PipelineAggregatorBuilder;
@@ -98,6 +99,7 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
public static final ParseField EXT_FIELD = new ParseField("ext");
public static final ParseField PROFILE_FIELD = new ParseField("profile");
public static final ParseField SEARCH_AFTER = new ParseField("search_after");
public static final ParseField SLICE = new ParseField("slice");
public static SearchSourceBuilder fromXContent(QueryParseContext context, AggregatorParsers aggParsers,
Suggesters suggesters) throws IOException {
@@ -138,6 +140,8 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
private SearchAfterBuilder searchAfterBuilder;
private SliceBuilder sliceBuilder;
private Float minScore;
private long timeoutInMillis = -1;
@@ -175,9 +179,7 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
* Read from a stream.
*/
public SearchSourceBuilder(StreamInput in) throws IOException {
if (in.readBoolean()) {
aggregations = new AggregatorFactories.Builder(in);
}
aggregations = in.readOptionalWriteable(AggregatorFactories.Builder::new);
explain = in.readOptionalBoolean();
fetchSourceContext = in.readOptionalStreamable(FetchSourceContext::new);
boolean hasFieldDataFields = in.readBoolean();
@@ -206,15 +208,9 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
indexBoost.put(in.readString(), in.readFloat());
}
}
if (in.readBoolean()) {
minScore = in.readFloat();
}
if (in.readBoolean()) {
postQueryBuilder = in.readNamedWriteable(QueryBuilder.class);
}
if (in.readBoolean()) {
queryBuilder = in.readNamedWriteable(QueryBuilder.class);
}
minScore = in.readOptionalFloat();
postQueryBuilder = in.readOptionalNamedWriteable(QueryBuilder.class);
queryBuilder = in.readOptionalNamedWriteable(QueryBuilder.class);
if (in.readBoolean()) {
int size = in.readVInt();
rescoreBuilders = new ArrayList<>();
@@ -244,29 +240,20 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
stats.add(in.readString());
}
}
if (in.readBoolean()) {
suggestBuilder = new SuggestBuilder(in);
}
suggestBuilder = in.readOptionalWriteable(SuggestBuilder::new);
terminateAfter = in.readVInt();
timeoutInMillis = in.readLong();
trackScores = in.readBoolean();
version = in.readOptionalBoolean();
if (in.readBoolean()) {
ext = in.readBytesReference();
}
ext = in.readOptionalBytesReference();
profile = in.readBoolean();
if (in.readBoolean()) {
searchAfterBuilder = new SearchAfterBuilder(in);
}
searchAfterBuilder = in.readOptionalWriteable(SearchAfterBuilder::new);
sliceBuilder = in.readOptionalWriteable(SliceBuilder::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
boolean hasAggregations = aggregations != null;
out.writeBoolean(hasAggregations);
if (hasAggregations) {
aggregations.writeTo(out);
}
out.writeOptionalWriteable(aggregations);
out.writeOptionalBoolean(explain);
out.writeOptionalStreamable(fetchSourceContext);
boolean hasFieldDataFields = fieldDataFields != null;
@@ -296,21 +283,9 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
out.writeFloat(indexBoost.get(key.value));
}
}
boolean hasMinScore = minScore != null;
out.writeBoolean(hasMinScore);
if (hasMinScore) {
out.writeFloat(minScore);
}
boolean hasPostQuery = postQueryBuilder != null;
out.writeBoolean(hasPostQuery);
if (hasPostQuery) {
out.writeNamedWriteable(postQueryBuilder);
}
boolean hasQuery = queryBuilder != null;
out.writeBoolean(hasQuery);
if (hasQuery) {
out.writeNamedWriteable(queryBuilder);
}
out.writeOptionalFloat(minScore);
out.writeOptionalNamedWriteable(postQueryBuilder);
out.writeOptionalNamedWriteable(queryBuilder);
boolean hasRescoreBuilders = rescoreBuilders != null;
out.writeBoolean(hasRescoreBuilders);
if (hasRescoreBuilders) {
@@ -344,26 +319,15 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
out.writeString(stat);
}
}
boolean hasSuggestBuilder = suggestBuilder != null;
out.writeBoolean(hasSuggestBuilder);
if (hasSuggestBuilder) {
suggestBuilder.writeTo(out);
}
out.writeOptionalWriteable(suggestBuilder);
out.writeVInt(terminateAfter);
out.writeLong(timeoutInMillis);
out.writeBoolean(trackScores);
out.writeOptionalBoolean(version);
boolean hasExt = ext != null;
out.writeBoolean(hasExt);
if (hasExt) {
out.writeBytesReference(ext);
}
out.writeOptionalBytesReference(ext);
out.writeBoolean(profile);
boolean hasSearchAfter = searchAfterBuilder != null;
out.writeBoolean(hasSearchAfter);
if (hasSearchAfter) {
searchAfterBuilder.writeTo(out);
}
out.writeOptionalWriteable(searchAfterBuilder);
out.writeOptionalWriteable(sliceBuilder);
}
/**
@@ -597,6 +561,22 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
return this;
}
/**
* Sets a filter that will restrict the search hits, the top hits and the aggregations to a slice of the results
* of the main query.
*/
public SearchSourceBuilder slice(SliceBuilder builder) {
this.sliceBuilder = builder;
return this;
}
/**
* Gets the slice used to filter the search hits, the top hits and the aggregations.
*/
public SliceBuilder slice() {
return sliceBuilder;
}
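For illustration, a hedged sketch of how the setter composes with the rest of the builder (the field name is made up):

SearchSourceBuilder source = new SearchSourceBuilder()
    .query(QueryBuilders.matchAllQuery())
    .slice(new SliceBuilder("random_int", 0, 4));
// The rendered source should then contain: "slice":{"field":"random_int","id":0,"max":4}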
/**
* Add an aggregation to perform as part of the search.
*/
@@ -943,6 +923,7 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
rewrittenBuilder.rescoreBuilders = rescoreBuilders;
rewrittenBuilder.scriptFields = scriptFields;
rewrittenBuilder.searchAfterBuilder = searchAfterBuilder;
rewrittenBuilder.sliceBuilder = sliceBuilder;
rewrittenBuilder.size = size;
rewrittenBuilder.sorts = sorts;
rewrittenBuilder.stats = stats;
@@ -1039,6 +1020,8 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
} else if (context.getParseFieldMatcher().match(currentFieldName, EXT_FIELD)) {
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().copyCurrentStructure(parser);
ext = xContentBuilder.bytes();
} else if (context.getParseFieldMatcher().match(currentFieldName, SLICE)) {
sliceBuilder = SliceBuilder.fromXContent(context);
} else {
throw new ParsingException(parser.getTokenLocation(), "Unknown key for a " + token + " in [" + currentFieldName + "].",
parser.getTokenLocation());
@@ -1193,6 +1176,10 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
builder.field(SEARCH_AFTER.getPreferredName(), searchAfterBuilder.getSortValues());
}
if (sliceBuilder != null) {
builder.field(SLICE.getPreferredName(), sliceBuilder);
}
if (indexBoost != null) {
builder.startObject(INDICES_BOOST_FIELD.getPreferredName());
assert !indexBoost.containsKey(null);
@@ -1355,7 +1342,7 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
public int hashCode() {
return Objects.hash(aggregations, explain, fetchSourceContext, fieldDataFields, fieldNames, from,
highlightBuilder, indexBoost, minScore, postQueryBuilder, queryBuilder, rescoreBuilders, scriptFields,
size, sorts, searchAfterBuilder, stats, suggestBuilder, terminateAfter, timeoutInMillis, trackScores, version, profile);
size, sorts, searchAfterBuilder, sliceBuilder, stats, suggestBuilder, terminateAfter, timeoutInMillis, trackScores, version, profile);
}
@Override
@@ -1383,6 +1370,7 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
&& Objects.equals(size, other.size)
&& Objects.equals(sorts, other.sorts)
&& Objects.equals(searchAfterBuilder, other.searchAfterBuilder)
&& Objects.equals(sliceBuilder, other.sliceBuilder)
&& Objects.equals(stats, other.stats)
&& Objects.equals(suggestBuilder, other.suggestBuilder)
&& Objects.equals(terminateAfter, other.terminateAfter)

View File

@@ -115,6 +115,9 @@ public class DefaultSearchContext extends SearchContext {
private Float minimumScore;
private boolean trackScores = false; // when sorting, track scores as well...
private FieldDoc searchAfter;
// filter for sliced scroll
private Query sliceFilter;
/**
* The original query as sent by the user without the types and aliases
* applied. Putting things in here leaks them into highlighting so don't add
@@ -122,8 +125,7 @@ public class DefaultSearchContext extends SearchContext {
*/
private ParsedQuery originalQuery;
/**
* Just like originalQuery but with the filters from types and aliases
* applied.
* Just like originalQuery but with the filters from types, aliases and slice applied.
*/
private ParsedQuery filteredQuery;
/**
@@ -210,7 +212,7 @@ public class DefaultSearchContext extends SearchContext {
if (rescoreContext.window() > maxWindow) {
throw new QueryPhaseExecutionException(this, "Rescore window [" + rescoreContext.window() + "] is too large. It must "
+ "be less than [" + maxWindow + "]. This prevents allocating massive heaps for storing the results to be "
+ "rescored. This limit can be set by chaning the [" + IndexSettings.MAX_RESCORE_WINDOW_SETTING.getKey()
+ "rescored. This limit can be set by chaining the [" + IndexSettings.MAX_RESCORE_WINDOW_SETTING.getKey()
+ "] index level setting.");
}
@@ -254,7 +256,17 @@ public class DefaultSearchContext extends SearchContext {
@Override
@Nullable
public Query searchFilter(String[] types) {
return createSearchFilter(types, aliasFilter, mapperService().hasNested());
Query typesFilter = createSearchFilter(types, aliasFilter, mapperService().hasNested());
if (sliceFilter == null) {
return typesFilter;
}
if (typesFilter == null) {
return sliceFilter;
}
return new BooleanQuery.Builder()
.add(typesFilter, Occur.FILTER)
.add(sliceFilter, Occur.FILTER)
.build();
}
// extracted to static helper method to make writing unit tests easier:
@@ -550,6 +562,11 @@ public class DefaultSearchContext extends SearchContext {
return searchAfter;
}
public SearchContext sliceFilter(Query filter) {
this.sliceFilter = filter;
return this;
}
@Override
public SearchContext parsedPostFilter(ParsedQuery postFilter) {
this.postFilter = postFilter;

View File

@@ -29,5 +29,4 @@ public class ScrollContext {
public float maxScore;
public ScoreDoc lastEmittedDoc;
public Scroll scroll;
}

View File

@@ -20,7 +20,6 @@ package org.elasticsearch.search.internal;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.Counter;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.ParsedQuery;
import org.elasticsearch.search.aggregations.SearchContextAggregations;
import org.elasticsearch.search.fetch.FetchSearchResult;

View File

@@ -0,0 +1,67 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.slice;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Weight;
import org.apache.lucene.search.RandomAccessWeight;
import org.apache.lucene.util.Bits;
import java.io.IOException;
/**
* A {@link SliceQuery} that uses the numeric doc values of a field to do the slicing.
*
* <b>NOTE</b>: With deterministic field values this query can be used across different readers safely.
* If updates are accepted on the field you must ensure that the same reader is used for all `slice` queries.
*/
public final class DocValuesSliceQuery extends SliceQuery {
public DocValuesSliceQuery(String field, int id, int max) {
super(field, id, max);
}
@Override
public Weight createWeight(IndexSearcher searcher, boolean needsScores) throws IOException {
return new RandomAccessWeight(this) {
@Override
protected Bits getMatchingDocs(final LeafReaderContext context) throws IOException {
final SortedNumericDocValues values = DocValues.getSortedNumeric(context.reader(), getField());
return new Bits() {
@Override
public boolean get(int doc) {
values.setDocument(doc);
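// NOTE: only the first value of a multi-valued field determines the slice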
for (int i = 0; i < values.count(); i++) {
return contains(Long.hashCode(values.valueAt(i)));
}
return contains(0);
}
@Override
public int length() {
return context.reader().maxDoc();
}
};
}
};
}
}

View File

@@ -0,0 +1,251 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.slice;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.elasticsearch.action.support.ToXContentToBytes;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lucene.search.MatchNoDocsQuery;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.IndexNumericFieldData;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.index.query.QueryShardContext;
import java.io.IOException;
import java.util.Objects;
/**
* A slice builder that allows splitting a scroll into multiple partitions.
* If the provided field is "_uid" it uses a {@link org.elasticsearch.search.slice.TermsSliceQuery}
* to do the slicing. The slicing is done at the shard level first and then each shard is split into multiple slices.
* For instance if the number of shards is equal to 2 and the user requested 4 slices
* then the slices 0 and 2 are assigned to the first shard and the slices 1 and 3 are assigned to the second shard.
* This way the total number of bitsets that we need to build on each shard is bounded by the number of slices
* (instead of {@code numShards*numSlices}).
* Otherwise the provided field must be numeric and doc_values must be enabled. In that case a
* {@link org.elasticsearch.search.slice.DocValuesSliceQuery} is used to filter the results.
*/
public class SliceBuilder extends ToXContentToBytes implements Writeable {
public static final ParseField FIELD_FIELD = new ParseField("field");
public static final ParseField ID_FIELD = new ParseField("id");
public static final ParseField MAX_FIELD = new ParseField("max");
private final static ObjectParser<SliceBuilder, QueryParseContext> PARSER =
new ObjectParser<>("slice", SliceBuilder::new);
static {
PARSER.declareString(SliceBuilder::setField, FIELD_FIELD);
PARSER.declareInt(SliceBuilder::setId, ID_FIELD);
PARSER.declareInt(SliceBuilder::setMax, MAX_FIELD);
}
/** Name of field to slice against (_uid by default) */
private String field = UidFieldMapper.NAME;
/** The id of the slice */
private int id = -1;
/** Max number of slices */
private int max = -1;
private SliceBuilder() {}
public SliceBuilder(int id, int max) {
this(UidFieldMapper.NAME, id, max);
}
/**
*
* @param field The name of the field
* @param id The id of the slice
* @param max The maximum number of slices
*/
public SliceBuilder(String field, int id, int max) {
setField(field);
setId(id);
setMax(max);
}
public SliceBuilder(StreamInput in) throws IOException {
this.field = in.readString();
this.id = in.readVInt();
this.max = in.readVInt();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(field);
out.writeVInt(id);
out.writeVInt(max);
}
private SliceBuilder setField(String field) {
if (Strings.isEmpty(field)) {
throw new IllegalArgumentException("field name is null or empty");
}
this.field = field;
return this;
}
/**
* The name of the field to slice against
*/
public String getField() {
return this.field;
}
private SliceBuilder setId(int id) {
if (id < 0) {
throw new IllegalArgumentException("id must be greater than or equal to 0");
}
if (max != -1 && id >= max) {
throw new IllegalArgumentException("max must be greater than id");
}
this.id = id;
return this;
}
/**
* The id of the slice.
*/
public int getId() {
return id;
}
private SliceBuilder setMax(int max) {
if (max <= 1) {
throw new IllegalArgumentException("max must be greater than 1");
}
if (id != -1 && id >= max) {
throw new IllegalArgumentException("max must be greater than id");
}
this.max = max;
return this;
}
/**
* The maximum number of slices.
*/
public int getMax() {
return max;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
innerToXContent(builder);
builder.endObject();
return builder;
}
void innerToXContent(XContentBuilder builder) throws IOException {
builder.field(FIELD_FIELD.getPreferredName(), field);
builder.field(ID_FIELD.getPreferredName(), id);
builder.field(MAX_FIELD.getPreferredName(), max);
}
public static SliceBuilder fromXContent(QueryParseContext context) throws IOException {
SliceBuilder builder = PARSER.parse(context.parser(), new SliceBuilder(), context);
return builder;
}
@Override
public boolean equals(Object other) {
if (!(other instanceof SliceBuilder)) {
return false;
}
SliceBuilder o = (SliceBuilder) other;
return Objects.equals(field, o.field) && id == o.id && max == o.max;
}
@Override
public int hashCode() {
return Objects.hash(this.field, this.id, this.max);
}
public Query toFilter(QueryShardContext context, int shardId, int numShards) {
final MappedFieldType type = context.fieldMapper(field);
if (type == null) {
throw new IllegalArgumentException("field " + field + " not found");
}
boolean useTermQuery = false;
if (UidFieldMapper.NAME.equals(field)) {
useTermQuery = true;
} else if (type.hasDocValues() == false) {
throw new IllegalArgumentException("cannot load numeric doc values on " + field);
} else {
IndexFieldData ifm = context.getForField(type);
if (ifm instanceof IndexNumericFieldData == false) {
throw new IllegalArgumentException("cannot load numeric doc values on " + field);
}
}
if (numShards == 1) {
return useTermQuery ? new TermsSliceQuery(field, id, max) :
new DocValuesSliceQuery(field, id, max);
}
if (max >= numShards) {
// the number of slices is greater than the number of shards
// in such a case we can reduce the number of shards requested per slice
// first we check if the slice is responsible for this shard
int targetShard = id % numShards;
if (targetShard != shardId) {
// the shard is not part of this slice, we can skip it.
return new MatchNoDocsQuery("this shard is not part of the slice");
}
// compute the number of slices where this shard appears
int numSlicesInShard = max / numShards;
int rest = max % numShards;
if (rest > targetShard) {
numSlicesInShard++;
}
if (numSlicesInShard == 1) {
// this shard has only one slice so we must check all the documents
return new MatchAllDocsQuery();
}
// get the new slice id for this shard
int shardSlice = id / numShards;
return useTermQuery ?
new TermsSliceQuery(field, shardSlice, numSlicesInShard) :
new DocValuesSliceQuery(field, shardSlice, numSlicesInShard);
}
// the number of shards is greater than the number of slices
// check if the shard is assigned to the slice
int targetSlice = shardId % max;
if (id != targetSlice) {
// the shard is not part of this slice, we can skip it.
return new MatchNoDocsQuery("this shard is not part of the slice");
}
return new MatchAllDocsQuery();
}
}
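As a worked illustration of the assignment logic in toFilter (plain Java, no Elasticsearch dependencies assumed):

// With numShards = 2 and max = 4 slices: targetShard = id % numShards,
// shardSlice = id / numShards, numSlicesInShard = max / numShards = 2.
for (int id = 0; id < 4; id++) {
    System.out.println("slice " + id + " -> shard " + (id % 2) + ", local slice " + (id / 2) + " of 2");
}
// prints: slice 0 -> shard 0, local slice 0 of 2
//         slice 1 -> shard 1, local slice 0 of 2
//         slice 2 -> shard 0, local slice 1 of 2
//         slice 3 -> shard 1, local slice 1 of 2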

View File

@@ -0,0 +1,81 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.slice;
import org.apache.lucene.search.Query;
import java.util.Objects;
/**
* An abstract {@link Query} that defines a hash function to partition the documents into multiple slices.
*/
public abstract class SliceQuery extends Query {
private final String field;
private final int id;
private final int max;
/**
* @param field The name of the field
* @param id The id of the slice
* @param max The maximum number of slices
*/
public SliceQuery(String field, int id, int max) {
this.field = field;
this.id = id;
this.max = max;
}
// Returns true if the value matches the predicate
protected final boolean contains(long value) {
return Math.floorMod(value, max) == id;
}
public String getField() {
return field;
}
public int getId() {
return id;
}
public int getMax() {
return max;
}
@Override
public boolean equals(Object o) {
if (super.equals(o) == false) {
return false;
}
SliceQuery that = (SliceQuery) o;
return field.equals(that.field) && id == that.id && max == that.max;
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), field, id, max);
}
@Override
public String toString(String f) {
return getClass().getSimpleName() + "[field=" + field + ", id=" + id + ", max=" + max + "]";
}
}
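A small aside on why contains uses Math.floorMod rather than the % operator (hash codes can be negative):

System.out.println(Math.floorMod(-7, 4)); // 1: always in [0, max)
System.out.println(-7 % 4);               // -3: sign follows the dividend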

View File

@@ -0,0 +1,86 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.slice;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.DocIdSet;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Weight;
import org.apache.lucene.search.ConstantScoreWeight;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.ConstantScoreScorer;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.DocIdSetBuilder;
import java.io.IOException;
/**
* A {@link SliceQuery} that uses the terms dictionary of a field to do the slicing.
*
* <b>NOTE</b>: The cost of this filter is O(N*M) where N is the number of unique terms in the dictionary
* and M is the average number of documents per term.
* For each segment this filter enumerates the terms dictionary, computes the hash code for each term and fills
* a bit set with the documents of all terms whose hash code matches the predicate.
* <b>NOTE</b>: Documents with no value for that field are ignored.
*/
public final class TermsSliceQuery extends SliceQuery {
public TermsSliceQuery(String field, int id, int max) {
super(field, id, max);
}
@Override
public Weight createWeight(IndexSearcher searcher, boolean needsScores) throws IOException {
return new ConstantScoreWeight(this) {
@Override
public Scorer scorer(LeafReaderContext context) throws IOException {
final DocIdSet disi = build(context.reader());
final DocIdSetIterator leafIt = disi.iterator();
return new ConstantScoreScorer(this, score(), leafIt);
}
};
}
/**
* Returns a per-segment {@link DocIdSet} containing the matching docs for the specified slice.
*/
private DocIdSet build(LeafReader reader) throws IOException {
final DocIdSetBuilder builder = new DocIdSetBuilder(reader.maxDoc());
final Terms terms = reader.terms(getField());
if (terms == null) {
// the field has no indexed terms in this segment
return DocIdSet.EMPTY;
}
final TermsEnum te = terms.iterator();
PostingsEnum docsEnum = null;
for (BytesRef term = te.next(); term != null; term = te.next()) {
int hashCode = term.hashCode();
if (contains(hashCode)) {
docsEnum = te.postings(docsEnum, PostingsEnum.NONE);
int docId = docsEnum.nextDoc();
while (docId != DocIdSetIterator.NO_MORE_DOCS) {
builder.add(docId);
docId = docsEnum.nextDoc();
}
}
}
return builder.build();
}
}
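A tiny standalone usage sketch against a plain Lucene searcher (the reader and the field name are assumed; the unit test below does the same with assertions):

// Count the documents that fall in slice 0 of 4 for the "uuid" field.
IndexSearcher searcher = new IndexSearcher(reader);
int hits = searcher.count(new TermsSliceQuery("uuid", 0, 4));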

View File

@@ -71,6 +71,7 @@ import org.elasticsearch.search.highlight.HighlightBuilderTests;
import org.elasticsearch.search.rescore.QueryRescoreBuilderTests;
import org.elasticsearch.search.rescore.QueryRescorerBuilder;
import org.elasticsearch.search.searchafter.SearchAfterBuilder;
import org.elasticsearch.search.slice.SliceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.ScoreSortBuilder;
import org.elasticsearch.search.sort.ScriptSortBuilder.ScriptSortType;
@@ -426,6 +427,16 @@ public class SearchSourceBuilderTests extends ESTestCase {
xContentBuilder.endObject();
builder.ext(xContentBuilder);
}
if (randomBoolean()) {
String field = randomBoolean() ? null : randomAsciiOfLengthBetween(5, 20);
int max = randomInt(1000);
int id = randomInt(max-1);
if (field == null) {
builder.slice(new SliceBuilder(id, max));
} else {
builder.slice(new SliceBuilder(field, id, max));
}
}
return builder;
}

View File

@@ -0,0 +1,124 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.slice;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.QueryUtils;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.NumericUtils;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import static org.hamcrest.Matchers.equalTo;
public class DocValuesSliceQueryTests extends ESTestCase {
public void testBasics() {
DocValuesSliceQuery query1 =
new DocValuesSliceQuery("field1", 1, 10);
DocValuesSliceQuery query2 =
new DocValuesSliceQuery("field1", 1, 10);
DocValuesSliceQuery query3 =
new DocValuesSliceQuery("field2", 1, 10);
DocValuesSliceQuery query4 =
new DocValuesSliceQuery("field1", 2, 10);
QueryUtils.check(query1);
QueryUtils.checkEqual(query1, query2);
QueryUtils.checkUnequal(query1, query3);
QueryUtils.checkUnequal(query1, query4);
}
public void testSearch() throws Exception {
final int numDocs = randomIntBetween(100, 200);
final Directory dir = newDirectory();
final RandomIndexWriter w = new RandomIndexWriter(random(), dir);
int max = randomIntBetween(2, 10);
int[] sliceCounters1 = new int[max];
int[] sliceCounters2 = new int[max];
Set<String> keys = new HashSet<>();
for (int i = 0; i < numDocs; ++i) {
Document doc = new Document();
String uuid = UUIDs.base64UUID();
int intValue = randomInt();
long doubleValue = NumericUtils.doubleToSortableLong(randomDouble());
doc.add(new StringField("uuid", uuid, Field.Store.YES));
doc.add(new SortedNumericDocValuesField("intField", intValue));
doc.add(new SortedNumericDocValuesField("doubleField", doubleValue));
w.addDocument(doc);
sliceCounters1[Math.floorMod(Long.hashCode(intValue), max)] ++;
sliceCounters2[Math.floorMod(Long.hashCode(doubleValue), max)] ++;
keys.add(uuid);
}
final IndexReader reader = w.getReader();
final IndexSearcher searcher = newSearcher(reader);
for (int id = 0; id < max; id++) {
DocValuesSliceQuery query1 =
new DocValuesSliceQuery("intField", id, max);
assertThat(searcher.count(query1), equalTo(sliceCounters1[id]));
DocValuesSliceQuery query2 =
new DocValuesSliceQuery("doubleField", id, max);
assertThat(searcher.count(query2), equalTo(sliceCounters2[id]));
searcher.search(query1, new Collector() {
@Override
public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
return new LeafCollector() {
@Override
public void setScorer(Scorer scorer) throws IOException {
}
@Override
public void collect(int doc) throws IOException {
Document d = context.reader().document(doc, Collections.singleton("uuid"));
String uuid = d.get("uuid");
assertThat(keys.contains(uuid), equalTo(true));
keys.remove(uuid);
}
};
}
@Override
public boolean needsScores() {
return false;
}
});
}
assertThat(keys.size(), equalTo(0));
w.close();
reader.close();
dir.close();
}
}

View File

@@ -0,0 +1,215 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.slice;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchContextException;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.test.ESIntegTestCase;
import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.ExecutionException;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.startsWith;
public class SearchSliceIT extends ESIntegTestCase {
private static final int NUM_DOCS = 1000;
private int setupIndex(boolean withDocs) throws IOException, ExecutionException, InterruptedException {
String mapping = XContentFactory.jsonBuilder().
startObject()
.startObject("type")
.startObject("properties")
.startObject("invalid_random_kw")
.field("type", "keyword")
.field("doc_values", "false")
.endObject()
.startObject("random_int")
.field("type", "integer")
.field("doc_values", "true")
.endObject()
.startObject("invalid_random_int")
.field("type", "integer")
.field("doc_values", "false")
.endObject()
.endObject()
.endObject()
.endObject().string();
int numberOfShards = randomIntBetween(1, 7);
assertAcked(client().admin().indices().prepareCreate("test")
.setSettings("number_of_shards", numberOfShards)
.addMapping("type", mapping));
ensureGreen();
if (withDocs == false) {
return numberOfShards;
}
List<IndexRequestBuilder> requests = new ArrayList<>();
for (int i = 0; i < NUM_DOCS; i++) {
XContentBuilder builder = jsonBuilder();
builder.startObject();
builder.field("invalid_random_kw", randomAsciiOfLengthBetween(5, 20));
builder.field("random_int", randomInt());
builder.field("static_int", 0);
builder.field("invalid_random_int", randomInt());
builder.endObject();
requests.add(client().prepareIndex("test", "test").setSource(builder));
}
indexRandom(true, requests);
return numberOfShards;
}
public void testDocIdSort() throws Exception {
int numShards = setupIndex(true);
SearchResponse sr = client().prepareSearch("test")
.setQuery(matchAllQuery())
.setSize(0)
.get();
int numDocs = (int) sr.getHits().getTotalHits();
assertThat(numDocs, equalTo(NUM_DOCS));
int max = randomIntBetween(2, numShards*3);
for (String field : new String[]{"_uid", "random_int", "static_int"}) {
int fetchSize = randomIntBetween(10, 100);
SearchRequestBuilder request = client().prepareSearch("test")
.setQuery(matchAllQuery())
.setScroll(new Scroll(TimeValue.timeValueSeconds(10)))
.setSize(fetchSize)
.addSort(SortBuilders.fieldSort("_doc"));
assertSearchSlicesWithScroll(request, field, max);
}
}
public void testNumericSort() throws Exception {
int numShards = setupIndex(true);
SearchResponse sr = client().prepareSearch("test")
.setQuery(matchAllQuery())
.setSize(0)
.get();
int numDocs = (int) sr.getHits().getTotalHits();
assertThat(numDocs, equalTo(NUM_DOCS));
int max = randomIntBetween(2, numShards*3);
for (String field : new String[]{"_uid", "random_int", "static_int"}) {
int fetchSize = randomIntBetween(10, 100);
SearchRequestBuilder request = client().prepareSearch("test")
.setQuery(matchAllQuery())
.setScroll(new Scroll(TimeValue.timeValueSeconds(10)))
.addSort(SortBuilders.fieldSort("random_int"))
.setSize(fetchSize);
assertSearchSlicesWithScroll(request, field, max);
}
}
public void testInvalidFields() throws Exception {
setupIndex(false);
SearchPhaseExecutionException exc = expectThrows(SearchPhaseExecutionException.class,
() -> client().prepareSearch("test")
.setQuery(matchAllQuery())
.setScroll(new Scroll(TimeValue.timeValueSeconds(10)))
.slice(new SliceBuilder("invalid_random_int", 0, 10))
.get());
Throwable rootCause = findRootCause(exc);
assertThat(rootCause.getClass(), equalTo(IllegalArgumentException.class));
assertThat(rootCause.getMessage(),
startsWith("cannot load numeric doc values"));
exc = expectThrows(SearchPhaseExecutionException.class, () -> client().prepareSearch("test")
.setQuery(matchAllQuery())
.setScroll(new Scroll(TimeValue.timeValueSeconds(10)))
.slice(new SliceBuilder("invalid_random_kw", 0, 10))
.get());
rootCause = findRootCause(exc);
assertThat(rootCause.getClass(), equalTo(IllegalArgumentException.class));
assertThat(rootCause.getMessage(),
startsWith("cannot load numeric doc values"));
}
public void testInvalidQuery() throws Exception {
setupIndex(false);
SearchPhaseExecutionException exc = expectThrows(SearchPhaseExecutionException.class,
() -> client().prepareSearch()
.setQuery(matchAllQuery())
.slice(new SliceBuilder("invalid_random_int", 0, 10))
.get());
Throwable rootCause = findRootCause(exc);
assertThat(rootCause.getClass(), equalTo(SearchContextException.class));
assertThat(rootCause.getMessage(),
equalTo("`slice` cannot be used outside of a scroll context"));
}
private void assertSearchSlicesWithScroll(SearchRequestBuilder request, String field, int numSlice) {
int totalResults = 0;
List<String> keys = new ArrayList<>();
for (int id = 0; id < numSlice; id++) {
SliceBuilder sliceBuilder = new SliceBuilder(field, id, numSlice);
SearchResponse searchResponse = request.slice(sliceBuilder).get();
totalResults += searchResponse.getHits().getHits().length;
int expectedSliceResults = (int) searchResponse.getHits().getTotalHits();
int numSliceResults = searchResponse.getHits().getHits().length;
String scrollId = searchResponse.getScrollId();
for (SearchHit hit : searchResponse.getHits().getHits()) {
keys.add(hit.getId());
}
while (searchResponse.getHits().getHits().length > 0) {
searchResponse = client().prepareSearchScroll("test")
.setScrollId(scrollId)
.setScroll(new Scroll(TimeValue.timeValueSeconds(10)))
.get();
scrollId = searchResponse.getScrollId();
totalResults += searchResponse.getHits().getHits().length;
numSliceResults += searchResponse.getHits().getHits().length;
for (SearchHit hit : searchResponse.getHits().getHits()) {
keys.add(hit.getId());
}
}
assertThat(numSliceResults, equalTo(expectedSliceResults));
clearScroll(scrollId);
}
assertThat(totalResults, equalTo(NUM_DOCS));
assertThat(keys.size(), equalTo(NUM_DOCS));
assertThat(new HashSet<>(keys).size(), equalTo(NUM_DOCS));
}
private Throwable findRootCause(Exception e) {
Throwable ret = e;
while (ret.getCause() != null) {
ret = ret.getCause();
}
return ret;
}
}

View File

@@ -0,0 +1,340 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.slice;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.DocValuesType;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lucene.search.MatchNoDocsQuery;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.fielddata.IndexNumericFieldData;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.index.query.QueryParser;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
import org.elasticsearch.test.ESTestCase;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.containsString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class SliceBuilderTests extends ESTestCase {
private static final int MAX_SLICE = 20;
private static NamedWriteableRegistry namedWriteableRegistry;
private static IndicesQueriesRegistry indicesQueriesRegistry;
/**
* setup for the whole base test class
*/
@BeforeClass
public static void init() {
namedWriteableRegistry = new NamedWriteableRegistry();
indicesQueriesRegistry = new IndicesQueriesRegistry();
QueryParser<MatchAllQueryBuilder> parser = MatchAllQueryBuilder::fromXContent;
indicesQueriesRegistry.register(parser, MatchAllQueryBuilder.QUERY_NAME_FIELD);
}
@AfterClass
public static void afterClass() throws Exception {
namedWriteableRegistry = null;
indicesQueriesRegistry = null;
}
private SliceBuilder randomSliceBuilder() throws IOException {
int max = randomIntBetween(2, MAX_SLICE);
int id = randomInt(max - 1);
String field = randomAsciiOfLengthBetween(5, 20);
return new SliceBuilder(field, id, max);
}
private static SliceBuilder serializedCopy(SliceBuilder original) throws IOException {
try (BytesStreamOutput output = new BytesStreamOutput()) {
original.writeTo(output);
try (StreamInput in =
new NamedWriteableAwareStreamInput(StreamInput.wrap(output.bytes()), namedWriteableRegistry)) {
return new SliceBuilder(in);
}
}
}
public void testSerialization() throws Exception {
SliceBuilder original = randomSliceBuilder();
SliceBuilder deserialized = serializedCopy(original);
assertEquals(deserialized, original);
assertEquals(deserialized.hashCode(), original.hashCode());
assertNotSame(deserialized, original);
}
public void testEqualsAndHashcode() throws Exception {
SliceBuilder firstBuilder = randomSliceBuilder();
assertFalse("sliceBuilder is equal to null", firstBuilder.equals(null));
assertFalse("sliceBuilder is equal to incompatible type", firstBuilder.equals(""));
assertTrue("sliceBuilder is not equal to self", firstBuilder.equals(firstBuilder));
assertThat("same searchFrom's hashcode returns different values if called multiple times",
firstBuilder.hashCode(), equalTo(firstBuilder.hashCode()));
SliceBuilder secondBuilder = serializedCopy(firstBuilder);
assertTrue("sliceBuilder is not equal to self", secondBuilder.equals(secondBuilder));
assertTrue("sliceBuilder is not equal to its copy", firstBuilder.equals(secondBuilder));
assertTrue("equals is not symmetric", secondBuilder.equals(firstBuilder));
assertThat("sliceBuilder copy's hashcode is different from original hashcode", secondBuilder.hashCode(),
equalTo(firstBuilder.hashCode()));
SliceBuilder thirdBuilder = serializedCopy(secondBuilder);
assertTrue("sliceBuilder is not equal to self", thirdBuilder.equals(thirdBuilder));
assertTrue("sliceBuilder is not equal to its copy", secondBuilder.equals(thirdBuilder));
assertThat("sliceBuilder copy's hashcode is different from original hashcode", secondBuilder.hashCode(),
equalTo(thirdBuilder.hashCode()));
assertTrue("equals is not transitive", firstBuilder.equals(thirdBuilder));
assertThat("sliceBuilder copy's hashcode is different from original hashcode", firstBuilder.hashCode(),
equalTo(thirdBuilder.hashCode()));
assertTrue("sliceBuilder is not symmetric", thirdBuilder.equals(secondBuilder));
assertTrue("sliceBuilder is not symmetric", thirdBuilder.equals(firstBuilder));
}
public void testFromXContent() throws Exception {
SliceBuilder sliceBuilder = randomSliceBuilder();
XContentBuilder builder = XContentFactory.contentBuilder(randomFrom(XContentType.values()));
if (randomBoolean()) {
builder.prettyPrint();
}
builder.startObject();
sliceBuilder.innerToXContent(builder);
builder.endObject();
XContentParser parser = XContentHelper.createParser(shuffleXContent(builder).bytes());
QueryParseContext context = new QueryParseContext(indicesQueriesRegistry, parser,
ParseFieldMatcher.STRICT);
SliceBuilder secondSliceBuilder = SliceBuilder.fromXContent(context);
assertNotSame(sliceBuilder, secondSliceBuilder);
assertEquals(sliceBuilder, secondSliceBuilder);
assertEquals(sliceBuilder.hashCode(), secondSliceBuilder.hashCode());
}
public void testInvalidArguments() throws Exception {
Exception e = expectThrows(IllegalArgumentException.class, () -> new SliceBuilder("field", -1, 10));
assertEquals(e.getMessage(), "id must be greater than or equal to 0");
e = expectThrows(IllegalArgumentException.class, () -> new SliceBuilder("field", 10, -1));
assertEquals(e.getMessage(), "max must be greater than 1");
e = expectThrows(IllegalArgumentException.class, () -> new SliceBuilder("field", 10, 0));
assertEquals(e.getMessage(), "max must be greater than 1");
e = expectThrows(IllegalArgumentException.class, () -> new SliceBuilder("field", 10, 5));
assertEquals(e.getMessage(), "max must be greater than id");
e = expectThrows(IllegalArgumentException.class, () -> new SliceBuilder("field", 1000, 1000));
assertEquals(e.getMessage(), "max must be greater than id");
e = expectThrows(IllegalArgumentException.class, () -> new SliceBuilder("field", 1001, 1000));
assertEquals(e.getMessage(), "max must be greater than id");
}
public void testToFilter() throws IOException {
Directory dir = new RAMDirectory();
try (IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())))) {
writer.commit();
}
QueryShardContext context = mock(QueryShardContext.class);
try (IndexReader reader = DirectoryReader.open(dir)) {
MappedFieldType fieldType = new MappedFieldType() {
@Override
public MappedFieldType clone() {
return null;
}
@Override
public String typeName() {
return null;
}
@Override
public Query termQuery(Object value, @Nullable QueryShardContext context) {
return null;
}
};
fieldType.setName(UidFieldMapper.NAME);
fieldType.setHasDocValues(false);
when(context.fieldMapper(UidFieldMapper.NAME)).thenReturn(fieldType);
when(context.getIndexReader()).thenReturn(reader);
SliceBuilder builder = new SliceBuilder(5, 10);
Query query = builder.toFilter(context, 0, 1);
assertThat(query, instanceOf(TermsSliceQuery.class));
assertThat(builder.toFilter(context, 0, 1), equalTo(query));
try (IndexReader newReader = DirectoryReader.open(dir)) {
when(context.getIndexReader()).thenReturn(newReader);
assertThat(builder.toFilter(context, 0, 1), equalTo(query));
}
}
try (IndexReader reader = DirectoryReader.open(dir)) {
MappedFieldType fieldType = new MappedFieldType() {
@Override
public MappedFieldType clone() {
return null;
}
@Override
public String typeName() {
return null;
}
@Override
public Query termQuery(Object value, @Nullable QueryShardContext context) {
return null;
}
};
fieldType.setName("field_doc_values");
fieldType.setHasDocValues(true);
fieldType.setDocValuesType(DocValuesType.SORTED_NUMERIC);
when(context.fieldMapper("field_doc_values")).thenReturn(fieldType);
when(context.getIndexReader()).thenReturn(reader);
IndexNumericFieldData fd = mock(IndexNumericFieldData.class);
when(context.getForField(fieldType)).thenReturn(fd);
SliceBuilder builder = new SliceBuilder("field_doc_values", 5, 10);
Query query = builder.toFilter(context, 0, 1);
assertThat(query, instanceOf(DocValuesSliceQuery.class));
assertThat(builder.toFilter(context, 0, 1), equalTo(query));
try (IndexReader newReader = DirectoryReader.open(dir)) {
when(context.getIndexReader()).thenReturn(newReader);
assertThat(builder.toFilter(context, 0, 1), equalTo(query));
}
// numSlices > numShards
int numSlices = randomIntBetween(10, 100);
int numShards = randomIntBetween(1, 9);
Map<Integer, AtomicInteger> numSliceMap = new HashMap<>();
for (int i = 0; i < numSlices; i++) {
for (int j = 0; j < numShards; j++) {
SliceBuilder slice = new SliceBuilder("_uid", i, numSlices);
Query q = slice.toFilter(context, j, numShards);
if (q instanceof TermsSliceQuery || q instanceof MatchAllDocsQuery) {
AtomicInteger count = numSliceMap.get(j);
if (count == null) {
count = new AtomicInteger(0);
numSliceMap.put(j, count);
}
count.incrementAndGet();
if (q instanceof MatchAllDocsQuery) {
assertThat(count.get(), equalTo(1));
}
} else {
assertThat(q, instanceOf(MatchNoDocsQuery.class));
}
}
}
int total = 0;
for (Map.Entry<Integer, AtomicInteger> e : numSliceMap.entrySet()) {
total += e.getValue().get();
}
assertThat(total, equalTo(numSlices));
// numShards > numSlices
numShards = randomIntBetween(3, 100);
numSlices = randomIntBetween(2, numShards - 1);
List<Integer> targetShards = new ArrayList<>();
for (int i = 0; i < numSlices; i++) {
for (int j = 0; j < numShards; j++) {
SliceBuilder slice = new SliceBuilder("_uid", i, numSlices);
Query q = slice.toFilter(context, j, numShards);
if (q instanceof MatchNoDocsQuery == false) {
assertThat(q, instanceOf(MatchAllDocsQuery.class));
targetShards.add(j);
}
}
}
assertThat(targetShards.size(), equalTo(numShards));
assertThat(new HashSet<>(targetShards).size(), equalTo(numShards));
// numShards == numSlices
numShards = randomIntBetween(2, 10);
numSlices = numShards;
for (int i = 0; i < numSlices; i++) {
for (int j = 0; j < numShards; j++) {
SliceBuilder slice = new SliceBuilder("_uid", i, numSlices);
Query q = slice.toFilter(context, j, numShards);
if (i == j) {
assertThat(q, instanceOf(MatchAllDocsQuery.class));
} else {
assertThat(q, instanceOf(MatchNoDocsQuery.class));
}
}
}
}
try (IndexReader reader = DirectoryReader.open(dir)) {
MappedFieldType fieldType = new MappedFieldType() {
@Override
public MappedFieldType clone() {
return null;
}
@Override
public String typeName() {
return null;
}
@Override
public Query termQuery(Object value, @Nullable QueryShardContext context) {
return null;
}
};
fieldType.setName("field_without_doc_values");
when(context.fieldMapper("field_without_doc_values")).thenReturn(fieldType);
when(context.getIndexReader()).thenReturn(reader);
SliceBuilder builder = new SliceBuilder("field_without_doc_values", 5, 10);
IllegalArgumentException exc =
expectThrows(IllegalArgumentException.class, () -> builder.toFilter(context, 0, 1));
assertThat(exc.getMessage(), containsString("cannot load numeric doc values"));
}
}
}

View File

@@ -0,0 +1,117 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.slice;
import org.apache.lucene.analysis.core.KeywordAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.QueryUtils;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import static org.hamcrest.Matchers.equalTo;
public class TermsSliceQueryTests extends ESTestCase {
public void testBasics() {
TermsSliceQuery query1 =
new TermsSliceQuery("field1", 1, 10);
TermsSliceQuery query2 =
new TermsSliceQuery("field1", 1, 10);
TermsSliceQuery query3 =
new TermsSliceQuery("field2", 1, 10);
TermsSliceQuery query4 =
new TermsSliceQuery("field1", 2, 10);
QueryUtils.check(query1);
QueryUtils.checkEqual(query1, query2);
QueryUtils.checkUnequal(query1, query3);
QueryUtils.checkUnequal(query1, query4);
}
public void testSearch() throws Exception {
final int numDocs = randomIntBetween(100, 200);
final Directory dir = newDirectory();
final RandomIndexWriter w = new RandomIndexWriter(random(), dir, new KeywordAnalyzer());
int max = randomIntBetween(2, 10);
int[] sliceCounters = new int[max];
Set<String> keys = new HashSet<>();
for (int i = 0; i < numDocs; ++i) {
Document doc = new Document();
String uuid = UUIDs.base64UUID();
BytesRef br = new BytesRef(uuid);
int id = Math.floorMod(br.hashCode(), max);
sliceCounters[id] ++;
doc.add(new StringField("uuid", uuid, Field.Store.YES));
w.addDocument(doc);
keys.add(uuid);
}
final IndexReader reader = w.getReader();
final IndexSearcher searcher = newSearcher(reader);
for (int id = 0; id < max; id++) {
TermsSliceQuery query1 =
new TermsSliceQuery("uuid", id, max);
assertThat(searcher.count(query1), equalTo(sliceCounters[id]));
searcher.search(query1, new Collector() {
@Override
public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
return new LeafCollector() {
@Override
public void setScorer(Scorer scorer) throws IOException {
}
@Override
public void collect(int doc) throws IOException {
Document d = context.reader().document(doc, Collections.singleton("uuid"));
String uuid = d.get("uuid");
assertThat(keys.contains(uuid), equalTo(true));
keys.remove(uuid);
}
};
}
@Override
public boolean needsScores() {
return false;
}
});
}
assertThat(keys.size(), equalTo(0));
w.close();
reader.close();
dir.close();
}
}

View File

@@ -175,3 +175,92 @@ curl -XDELETE localhost:9200/_search/scroll \
-d 'c2Nhbjs2OzM0NDg1ODpzRlBLc0FXNlNyNm5JWUc1,aGVuRmV0Y2g7NTsxOnkxaDZ'
---------------------------------------
==== Sliced Scroll
For scroll queries that return a lot of documents, it is possible to split the scroll into multiple slices that
can be consumed independently:
[source,js]
--------------------------------------------------
curl -XGET 'localhost:9200/twitter/tweet/_search?scroll=1m' -d '
{
"slice": {
"id": 0, <1>
"max": 2 <2>
},
"query": {
"match" : {
"title" : "elasticsearch"
}
}
}
'
curl -XGET 'localhost:9200/twitter/tweet/_search?scroll=1m' -d '
{
"slice": {
"id": 1,
"max": 2
},
"query": {
"match" : {
"title" : "elasticsearch"
}
}
}
'
--------------------------------------------------
<1> The id of the slice
<2> The maximum number of slices
The result from the first request returned documents that belong to the first slice (id: 0) and the result from the
second request returned documents that belong to the second slice. Since the maximum number of slices is set to 2,
the union of the results of the two requests is equivalent to the result of a scroll query without slicing.
By default the splitting is done first on the shards and then locally on each shard using the `_uid` field
with the following formula:
`slice(doc) = floorMod(hashCode(doc._uid), max)`
For instance if the number of shards is equal to 2 and the user requested 4 slices then the slices 0 and 2 are assigned
to the first shard and the slices 1 and 3 are assigned to the second shard.
Each scroll is independent and can be processed in parallel like any scroll request.
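As a rough illustration of the formula (a sketch only: the server hashes the raw `_uid` term bytes, plain `String.hashCode` is used here just to show the shape of the computation):

[source,java]
--------------------------------------------------
int max = 4;
for (String uid : new String[] {"tweet#1", "tweet#2", "tweet#3"}) {
    System.out.println(uid + " -> slice " + Math.floorMod(uid.hashCode(), max));
}
--------------------------------------------------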
NOTE: If the number of slices is bigger than the number of shards the slice filter is very slow on the first calls: it has a complexity of O(N) and a memory cost equal
to N bits per slice, where N is the total number of documents in the shard.
After a few calls the filter should be cached and subsequent calls should be faster, but you should limit the number of
sliced queries you perform in parallel to avoid a memory explosion.
To avoid this cost entirely it is possible to use the `doc_values` of another field to do the slicing
but the user must ensure that the field has the following properties:
* The field is numeric.
* `doc_values` are enabled on that field.
* Every document should contain a single value. If a document has multiple values for the specified field, the first value is used.
* The value for each document should be set once when the document is created and never updated. This ensures that each
slice gets deterministic results.
* The cardinality of the field should be high. This ensures that each slice gets approximately the same number of documents.
[source,js]
--------------------------------------------------
curl -XGET 'localhost:9200/twitter/tweet/_search?scroll=1m' -d '
{
"slice": {
"field": "my_random_integer_field",
"id": 0,
"max": 10
},
"query": {
"match" : {
"title" : "elasticsearch"
}
}
}
'
--------------------------------------------------
For append-only time-based indices, the `timestamp` field can be used safely.