Add the ability to partition a scroll into multiple slices.

API:

```
curl -XGET 'localhost:9200/twitter/tweet/_search?scroll=1m' -d '{
    "slice": {
        "field": "_uid", <1>
        "id": 0,         <2>
        "max": 10        <3>
    },
    "query": {
        "match" : {
            "title" : "elasticsearch"
        }
    }
}'
```

<1> (optional) The field name used to do the slicing (`_uid` by default)
<2> The id of the slice
<3> The maximum number of slices

By default the splitting is done on the shards first and then locally on each shard using the `_uid` field with the following formula: `slice(doc) = floorMod(hashCode(doc._uid), max)`. For instance, if the number of shards is equal to 2 and the user requests 4 slices, then slices 0 and 2 are assigned to the first shard and slices 1 and 3 are assigned to the second shard. Each sliced scroll is independent and can be processed in parallel like any other scroll request.

Closes #13494
parent d71894a226
commit b9030bf6fe
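As an illustration of the new Java API (`SearchRequestBuilder#slice`, added in the diff below), here is a minimal editorial sketch of how one slice of a scroll could be consumed; the index name, query, page size and scroll timeout are placeholder assumptions, not part of the commit. Running it once per slice id (0 and 1 with `max = 2`), possibly from separate threads, covers every document exactly once.

```java
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.slice.SliceBuilder;

public final class SlicedScrollExample {
    /** Consumes one slice (id out of max) of a sliced scroll over a hypothetical "twitter" index. */
    public static void consumeSlice(Client client, int id, int max) {
        SearchResponse response = client.prepareSearch("twitter")
                .setQuery(QueryBuilders.matchQuery("title", "elasticsearch"))
                .setScroll(new Scroll(TimeValue.timeValueMinutes(1)))
                .setSize(100)
                .slice(new SliceBuilder(id, max))     // slices on _uid by default
                .get();
        while (response.getHits().getHits().length > 0) {
            // process response.getHits() here ...
            response = client.prepareSearchScroll(response.getScrollId())
                    .setScroll(new Scroll(TimeValue.timeValueMinutes(1)))
                    .get();
        }
        client.prepareClearScroll().addScrollId(response.getScrollId()).get();
    }
}
```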
@ -30,6 +30,7 @@ import org.elasticsearch.script.Template;
|
|||
import org.elasticsearch.search.Scroll;
|
||||
import org.elasticsearch.search.aggregations.AggregationBuilder;
|
||||
import org.elasticsearch.search.aggregations.PipelineAggregatorBuilder;
|
||||
import org.elasticsearch.search.slice.SliceBuilder;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.search.highlight.HighlightBuilder;
|
||||
import org.elasticsearch.search.rescore.RescoreBuilder;
|
||||
|
@ -352,6 +353,11 @@ public class SearchRequestBuilder extends ActionRequestBuilder<SearchRequest, Se
|
|||
return this;
|
||||
}
|
||||
|
||||
public SearchRequestBuilder slice(SliceBuilder builder) {
|
||||
sourceBuilder().slice(builder);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Applies when sorting, and controls if scores will be tracked as well. Defaults to
|
||||
* <tt>false</tt>.
|
||||
|
|
|
@ -821,6 +821,15 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
|
|||
FieldDoc fieldDoc = SearchAfterBuilder.buildFieldDoc(context.sort(), source.searchAfter());
|
||||
context.searchAfter(fieldDoc);
|
||||
}
|
||||
|
||||
if (source.slice() != null) {
|
||||
if (context.scrollContext() == null) {
|
||||
throw new SearchContextException(context, "`slice` cannot be used outside of a scroll context");
|
||||
}
|
||||
context.sliceFilter(source.slice().toFilter(queryShardContext,
|
||||
context.shardTarget().getShardId().getId(),
|
||||
queryShardContext.getIndexSettings().getNumberOfShards()));
|
||||
}
|
||||
}
|
||||
|
||||
private static final int[] EMPTY_DOC_IDS = new int[0];
|
||||
|
|
|
@ -42,6 +42,7 @@ import org.elasticsearch.index.query.QueryParseContext;
|
|||
import org.elasticsearch.index.query.QueryShardContext;
|
||||
import org.elasticsearch.script.Script;
|
||||
import org.elasticsearch.search.aggregations.AggregationBuilder;
|
||||
import org.elasticsearch.search.slice.SliceBuilder;
|
||||
import org.elasticsearch.search.aggregations.AggregatorFactories;
|
||||
import org.elasticsearch.search.aggregations.AggregatorParsers;
|
||||
import org.elasticsearch.search.aggregations.PipelineAggregatorBuilder;
|
||||
|
@ -98,6 +99,7 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
|
|||
public static final ParseField EXT_FIELD = new ParseField("ext");
|
||||
public static final ParseField PROFILE_FIELD = new ParseField("profile");
|
||||
public static final ParseField SEARCH_AFTER = new ParseField("search_after");
|
||||
public static final ParseField SLICE = new ParseField("slice");
|
||||
|
||||
public static SearchSourceBuilder fromXContent(QueryParseContext context, AggregatorParsers aggParsers,
|
||||
Suggesters suggesters) throws IOException {
|
||||
|
@ -138,6 +140,8 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
|
|||
|
||||
private SearchAfterBuilder searchAfterBuilder;
|
||||
|
||||
private SliceBuilder sliceBuilder;
|
||||
|
||||
private Float minScore;
|
||||
|
||||
private long timeoutInMillis = -1;
|
||||
|
@ -175,9 +179,7 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
|
|||
* Read from a stream.
|
||||
*/
|
||||
public SearchSourceBuilder(StreamInput in) throws IOException {
|
||||
if (in.readBoolean()) {
|
||||
aggregations = new AggregatorFactories.Builder(in);
|
||||
}
|
||||
aggregations = in.readOptionalWriteable(AggregatorFactories.Builder::new);
|
||||
explain = in.readOptionalBoolean();
|
||||
fetchSourceContext = in.readOptionalStreamable(FetchSourceContext::new);
|
||||
boolean hasFieldDataFields = in.readBoolean();
|
||||
|
@ -206,15 +208,9 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
|
|||
indexBoost.put(in.readString(), in.readFloat());
|
||||
}
|
||||
}
|
||||
if (in.readBoolean()) {
|
||||
minScore = in.readFloat();
|
||||
}
|
||||
if (in.readBoolean()) {
|
||||
postQueryBuilder = in.readNamedWriteable(QueryBuilder.class);
|
||||
}
|
||||
if (in.readBoolean()) {
|
||||
queryBuilder = in.readNamedWriteable(QueryBuilder.class);
|
||||
}
|
||||
minScore = in.readOptionalFloat();
|
||||
postQueryBuilder = in.readOptionalNamedWriteable(QueryBuilder.class);
|
||||
queryBuilder = in.readOptionalNamedWriteable(QueryBuilder.class);
|
||||
if (in.readBoolean()) {
|
||||
int size = in.readVInt();
|
||||
rescoreBuilders = new ArrayList<>();
|
||||
|
@ -244,29 +240,20 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
|
|||
stats.add(in.readString());
|
||||
}
|
||||
}
|
||||
if (in.readBoolean()) {
|
||||
suggestBuilder = new SuggestBuilder(in);
|
||||
}
|
||||
suggestBuilder = in.readOptionalWriteable(SuggestBuilder::new);
|
||||
terminateAfter = in.readVInt();
|
||||
timeoutInMillis = in.readLong();
|
||||
trackScores = in.readBoolean();
|
||||
version = in.readOptionalBoolean();
|
||||
if (in.readBoolean()) {
|
||||
ext = in.readBytesReference();
|
||||
}
|
||||
ext = in.readOptionalBytesReference();
|
||||
profile = in.readBoolean();
|
||||
if (in.readBoolean()) {
|
||||
searchAfterBuilder = new SearchAfterBuilder(in);
|
||||
}
|
||||
searchAfterBuilder = in.readOptionalWriteable(SearchAfterBuilder::new);
|
||||
sliceBuilder = in.readOptionalWriteable(SliceBuilder::new);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
boolean hasAggregations = aggregations != null;
|
||||
out.writeBoolean(hasAggregations);
|
||||
if (hasAggregations) {
|
||||
aggregations.writeTo(out);
|
||||
}
|
||||
out.writeOptionalWriteable(aggregations);
|
||||
out.writeOptionalBoolean(explain);
|
||||
out.writeOptionalStreamable(fetchSourceContext);
|
||||
boolean hasFieldDataFields = fieldDataFields != null;
|
||||
|
@ -296,21 +283,9 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
|
|||
out.writeFloat(indexBoost.get(key.value));
|
||||
}
|
||||
}
|
||||
boolean hasMinScore = minScore != null;
|
||||
out.writeBoolean(hasMinScore);
|
||||
if (hasMinScore) {
|
||||
out.writeFloat(minScore);
|
||||
}
|
||||
boolean hasPostQuery = postQueryBuilder != null;
|
||||
out.writeBoolean(hasPostQuery);
|
||||
if (hasPostQuery) {
|
||||
out.writeNamedWriteable(postQueryBuilder);
|
||||
}
|
||||
boolean hasQuery = queryBuilder != null;
|
||||
out.writeBoolean(hasQuery);
|
||||
if (hasQuery) {
|
||||
out.writeNamedWriteable(queryBuilder);
|
||||
}
|
||||
out.writeOptionalFloat(minScore);
|
||||
out.writeOptionalNamedWriteable(postQueryBuilder);
|
||||
out.writeOptionalNamedWriteable(queryBuilder);
|
||||
boolean hasRescoreBuilders = rescoreBuilders != null;
|
||||
out.writeBoolean(hasRescoreBuilders);
|
||||
if (hasRescoreBuilders) {
|
||||
|
@ -344,26 +319,15 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
|
|||
out.writeString(stat);
|
||||
}
|
||||
}
|
||||
boolean hasSuggestBuilder = suggestBuilder != null;
|
||||
out.writeBoolean(hasSuggestBuilder);
|
||||
if (hasSuggestBuilder) {
|
||||
suggestBuilder.writeTo(out);
|
||||
}
|
||||
out.writeOptionalWriteable(suggestBuilder);
|
||||
out.writeVInt(terminateAfter);
|
||||
out.writeLong(timeoutInMillis);
|
||||
out.writeBoolean(trackScores);
|
||||
out.writeOptionalBoolean(version);
|
||||
boolean hasExt = ext != null;
|
||||
out.writeBoolean(hasExt);
|
||||
if (hasExt) {
|
||||
out.writeBytesReference(ext);
|
||||
}
|
||||
out.writeOptionalBytesReference(ext);
|
||||
out.writeBoolean(profile);
|
||||
boolean hasSearchAfter = searchAfterBuilder != null;
|
||||
out.writeBoolean(hasSearchAfter);
|
||||
if (hasSearchAfter) {
|
||||
searchAfterBuilder.writeTo(out);
|
||||
}
|
||||
out.writeOptionalWriteable(searchAfterBuilder);
|
||||
out.writeOptionalWriteable(sliceBuilder);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -597,6 +561,22 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets a filter that will restrict the search hits, the top hits and the aggregations to a slice of the results
|
||||
* of the main query.
|
||||
*/
|
||||
public SearchSourceBuilder slice(SliceBuilder builder) {
|
||||
this.sliceBuilder = builder;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the slice used to filter the search hits, the top hits and the aggregations.
|
||||
*/
|
||||
public SliceBuilder slice() {
|
||||
return sliceBuilder;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add an aggregation to perform as part of the search.
|
||||
*/
|
||||
|
@ -943,6 +923,7 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
|
|||
rewrittenBuilder.rescoreBuilders = rescoreBuilders;
|
||||
rewrittenBuilder.scriptFields = scriptFields;
|
||||
rewrittenBuilder.searchAfterBuilder = searchAfterBuilder;
|
||||
rewrittenBuilder.sliceBuilder = sliceBuilder;
|
||||
rewrittenBuilder.size = size;
|
||||
rewrittenBuilder.sorts = sorts;
|
||||
rewrittenBuilder.stats = stats;
|
||||
|
@ -1039,6 +1020,8 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
|
|||
} else if (context.getParseFieldMatcher().match(currentFieldName, EXT_FIELD)) {
|
||||
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().copyCurrentStructure(parser);
|
||||
ext = xContentBuilder.bytes();
|
||||
} else if (context.getParseFieldMatcher().match(currentFieldName, SLICE)) {
|
||||
sliceBuilder = SliceBuilder.fromXContent(context);
|
||||
} else {
|
||||
throw new ParsingException(parser.getTokenLocation(), "Unknown key for a " + token + " in [" + currentFieldName + "].",
|
||||
parser.getTokenLocation());
|
||||
|
@ -1193,6 +1176,10 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
|
|||
builder.field(SEARCH_AFTER.getPreferredName(), searchAfterBuilder.getSortValues());
|
||||
}
|
||||
|
||||
if (sliceBuilder != null) {
|
||||
builder.field(SLICE.getPreferredName(), sliceBuilder);
|
||||
}
|
||||
|
||||
if (indexBoost != null) {
|
||||
builder.startObject(INDICES_BOOST_FIELD.getPreferredName());
|
||||
assert !indexBoost.containsKey(null);
|
||||
|
@ -1355,7 +1342,7 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
|
|||
public int hashCode() {
|
||||
return Objects.hash(aggregations, explain, fetchSourceContext, fieldDataFields, fieldNames, from,
|
||||
highlightBuilder, indexBoost, minScore, postQueryBuilder, queryBuilder, rescoreBuilders, scriptFields,
|
||||
size, sorts, searchAfterBuilder, stats, suggestBuilder, terminateAfter, timeoutInMillis, trackScores, version, profile);
|
||||
size, sorts, searchAfterBuilder, sliceBuilder, stats, suggestBuilder, terminateAfter, timeoutInMillis, trackScores, version, profile);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1383,6 +1370,7 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
|
|||
&& Objects.equals(size, other.size)
|
||||
&& Objects.equals(sorts, other.sorts)
|
||||
&& Objects.equals(searchAfterBuilder, other.searchAfterBuilder)
|
||||
&& Objects.equals(sliceBuilder, other.sliceBuilder)
|
||||
&& Objects.equals(stats, other.stats)
|
||||
&& Objects.equals(suggestBuilder, other.suggestBuilder)
|
||||
&& Objects.equals(terminateAfter, other.terminateAfter)
|
||||
|
|
|
@ -115,6 +115,9 @@ public class DefaultSearchContext extends SearchContext {
|
|||
private Float minimumScore;
|
||||
private boolean trackScores = false; // when sorting, track scores as well...
|
||||
private FieldDoc searchAfter;
|
||||
// filter for sliced scroll
|
||||
private Query sliceFilter;
|
||||
|
||||
/**
|
||||
* The original query as sent by the user without the types and aliases
|
||||
* applied. Putting things in here leaks them into highlighting so don't add
|
||||
|
@ -122,8 +125,7 @@ public class DefaultSearchContext extends SearchContext {
|
|||
*/
|
||||
private ParsedQuery originalQuery;
|
||||
/**
|
||||
* Just like originalQuery but with the filters from types and aliases
|
||||
* applied.
|
||||
* Just like originalQuery but with the filters from types, aliases and slice applied.
|
||||
*/
|
||||
private ParsedQuery filteredQuery;
|
||||
/**
|
||||
|
@ -210,7 +212,7 @@ public class DefaultSearchContext extends SearchContext {
|
|||
if (rescoreContext.window() > maxWindow) {
|
||||
throw new QueryPhaseExecutionException(this, "Rescore window [" + rescoreContext.window() + "] is too large. It must "
|
||||
+ "be less than [" + maxWindow + "]. This prevents allocating massive heaps for storing the results to be "
|
||||
+ "rescored. This limit can be set by chaning the [" + IndexSettings.MAX_RESCORE_WINDOW_SETTING.getKey()
|
||||
+ "rescored. This limit can be set by chaining the [" + IndexSettings.MAX_RESCORE_WINDOW_SETTING.getKey()
|
||||
+ "] index level setting.");
|
||||
|
||||
}
|
||||
|
@ -254,7 +256,17 @@ public class DefaultSearchContext extends SearchContext {
|
|||
@Override
|
||||
@Nullable
|
||||
public Query searchFilter(String[] types) {
|
||||
return createSearchFilter(types, aliasFilter, mapperService().hasNested());
|
||||
Query typesFilter = createSearchFilter(types, aliasFilter, mapperService().hasNested());
|
||||
if (sliceFilter == null) {
|
||||
return typesFilter;
|
||||
}
|
||||
if (typesFilter == null) {
|
||||
return sliceFilter;
|
||||
}
|
||||
return new BooleanQuery.Builder()
|
||||
.add(typesFilter, Occur.FILTER)
|
||||
.add(sliceFilter, Occur.FILTER)
|
||||
.build();
|
||||
}
|
||||
|
||||
// extracted to static helper method to make writing unit tests easier:
|
||||
|
@ -550,6 +562,11 @@ public class DefaultSearchContext extends SearchContext {
|
|||
return searchAfter;
|
||||
}
|
||||
|
||||
public SearchContext sliceFilter(Query filter) {
|
||||
this.sliceFilter = filter;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SearchContext parsedPostFilter(ParsedQuery postFilter) {
|
||||
this.postFilter = postFilter;
|
||||
|
|
|
@ -29,5 +29,4 @@ public class ScrollContext {
|
|||
public float maxScore;
|
||||
public ScoreDoc lastEmittedDoc;
|
||||
public Scroll scroll;
|
||||
|
||||
}
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.elasticsearch.search.internal;
|
|||
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.util.Counter;
|
||||
import org.elasticsearch.action.search.SearchType;
|
||||
import org.elasticsearch.index.query.ParsedQuery;
|
||||
import org.elasticsearch.search.aggregations.SearchContextAggregations;
|
||||
import org.elasticsearch.search.fetch.FetchSearchResult;
|
||||
|
|
|
@ -0,0 +1,67 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.search.slice;
|
||||
|
||||
import org.apache.lucene.index.LeafReaderContext;
|
||||
import org.apache.lucene.index.DocValues;
|
||||
import org.apache.lucene.index.SortedNumericDocValues;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.Weight;
|
||||
import org.apache.lucene.search.RandomAccessWeight;
|
||||
import org.apache.lucene.util.Bits;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* A {@link SliceQuery} that uses the numeric doc values of a field to do the slicing.
|
||||
*
|
||||
* <b>NOTE</b>: With deterministic field values this query can be used across different readers safely.
|
||||
* If updates are accepted on the field you must ensure that the same reader is used for all `slice` queries.
|
||||
*/
|
||||
public final class DocValuesSliceQuery extends SliceQuery {
|
||||
public DocValuesSliceQuery(String field, int id, int max) {
|
||||
super(field, id, max);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Weight createWeight(IndexSearcher searcher, boolean needsScores) throws IOException {
|
||||
return new RandomAccessWeight(this) {
|
||||
@Override
|
||||
protected Bits getMatchingDocs(final LeafReaderContext context) throws IOException {
|
||||
final SortedNumericDocValues values = DocValues.getSortedNumeric(context.reader(), getField());
|
||||
return new Bits() {
|
||||
@Override
|
||||
public boolean get(int doc) {
|
||||
values.setDocument(doc);
|
||||
for (int i = 0; i < values.count(); i++) {
|
||||
return contains(Long.hashCode(values.valueAt(i)));
|
||||
}
|
||||
return contains(0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int length() {
|
||||
return context.reader().maxDoc();
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
|
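A minimal usage sketch for the class above (editorial illustration, not part of the commit): assuming an existing Lucene `IndexSearcher` over an index whose documents carry a sorted-numeric doc-values field, the per-slice counts always add up to the total number of live documents, because the hash assigns each document to exactly one slice. The test added later in this commit verifies the same property.

```java
import org.apache.lucene.search.IndexSearcher;
import org.elasticsearch.search.slice.DocValuesSliceQuery;

import java.io.IOException;

final class DocValuesSliceExample {
    /** Counts how many documents fall into each of the {@code max} slices of a doc-values field. */
    static int[] countPerSlice(IndexSearcher searcher, String field, int max) throws IOException {
        int[] counts = new int[max];
        for (int id = 0; id < max; id++) {
            counts[id] = searcher.count(new DocValuesSliceQuery(field, id, max));
        }
        return counts;   // every live document is counted in exactly one slice
    }
}
```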
@ -0,0 +1,251 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.search.slice;
|
||||
|
||||
import org.apache.lucene.search.MatchAllDocsQuery;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.elasticsearch.action.support.ToXContentToBytes;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.lucene.search.MatchNoDocsQuery;
|
||||
import org.elasticsearch.common.xcontent.ObjectParser;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.fielddata.IndexFieldData;
|
||||
import org.elasticsearch.index.fielddata.IndexNumericFieldData;
|
||||
import org.elasticsearch.index.mapper.MappedFieldType;
|
||||
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
|
||||
import org.elasticsearch.index.query.QueryParseContext;
|
||||
import org.elasticsearch.index.query.QueryShardContext;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
 * A slice builder allowing a scroll to be split into multiple partitions.
|
||||
* If the provided field is the "_uid" it uses a {@link org.elasticsearch.search.slice.TermsSliceQuery}
|
||||
 * to do the slicing. The slicing is done at the shard level first and then each shard is split into multiple slices.
|
||||
* For instance if the number of shards is equal to 2 and the user requested 4 slices
|
||||
* then the slices 0 and 2 are assigned to the first shard and the slices 1 and 3 are assigned to the second shard.
|
||||
* This way the total number of bitsets that we need to build on each shard is bounded by the number of slices
|
||||
* (instead of {@code numShards*numSlices}).
|
||||
 * Otherwise the provided field must be numeric and doc_values must be enabled. In that case a
|
||||
* {@link org.elasticsearch.search.slice.DocValuesSliceQuery} is used to filter the results.
|
||||
*/
|
||||
public class SliceBuilder extends ToXContentToBytes implements Writeable {
|
||||
public static final ParseField FIELD_FIELD = new ParseField("field");
|
||||
public static final ParseField ID_FIELD = new ParseField("id");
|
||||
public static final ParseField MAX_FIELD = new ParseField("max");
|
||||
private final static ObjectParser<SliceBuilder, QueryParseContext> PARSER =
|
||||
new ObjectParser<>("slice", SliceBuilder::new);
|
||||
|
||||
static {
|
||||
PARSER.declareString(SliceBuilder::setField, FIELD_FIELD);
|
||||
PARSER.declareInt(SliceBuilder::setId, ID_FIELD);
|
||||
PARSER.declareInt(SliceBuilder::setMax, MAX_FIELD);
|
||||
}
|
||||
|
||||
/** Name of field to slice against (_uid by default) */
|
||||
private String field = UidFieldMapper.NAME;
|
||||
/** The id of the slice */
|
||||
private int id = -1;
|
||||
/** Max number of slices */
|
||||
private int max = -1;
|
||||
|
||||
private SliceBuilder() {}
|
||||
|
||||
public SliceBuilder(int id, int max) {
|
||||
this(UidFieldMapper.NAME, id, max);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param field The name of the field
|
||||
* @param id The id of the slice
|
||||
* @param max The maximum number of slices
|
||||
*/
|
||||
public SliceBuilder(String field, int id, int max) {
|
||||
setField(field);
|
||||
setId(id);
|
||||
setMax(max);
|
||||
}
|
||||
|
||||
public SliceBuilder(StreamInput in) throws IOException {
|
||||
this.field = in.readString();
|
||||
this.id = in.readVInt();
|
||||
this.max = in.readVInt();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeString(field);
|
||||
out.writeVInt(id);
|
||||
out.writeVInt(max);
|
||||
}
|
||||
|
||||
private SliceBuilder setField(String field) {
|
||||
if (Strings.isEmpty(field)) {
|
||||
throw new IllegalArgumentException("field name is null or empty");
|
||||
}
|
||||
this.field = field;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* The name of the field to slice against
|
||||
*/
|
||||
public String getField() {
|
||||
return this.field;
|
||||
}
|
||||
|
||||
private SliceBuilder setId(int id) {
|
||||
if (id < 0) {
|
||||
throw new IllegalArgumentException("id must be greater than or equal to 0");
|
||||
}
|
||||
if (max != -1 && id >= max) {
|
||||
throw new IllegalArgumentException("max must be greater than id");
|
||||
}
|
||||
this.id = id;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* The id of the slice.
|
||||
*/
|
||||
public int getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
private SliceBuilder setMax(int max) {
|
||||
if (max <= 1) {
|
||||
throw new IllegalArgumentException("max must be greater than 1");
|
||||
}
|
||||
if (id != -1 && id >= max) {
|
||||
throw new IllegalArgumentException("max must be greater than id");
|
||||
}
|
||||
this.max = max;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* The maximum number of slices.
|
||||
*/
|
||||
public int getMax() {
|
||||
return max;
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
innerToXContent(builder);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
void innerToXContent(XContentBuilder builder) throws IOException {
|
||||
builder.field(FIELD_FIELD.getPreferredName(), field);
|
||||
builder.field(ID_FIELD.getPreferredName(), id);
|
||||
builder.field(MAX_FIELD.getPreferredName(), max);
|
||||
}
|
||||
|
||||
public static SliceBuilder fromXContent(QueryParseContext context) throws IOException {
|
||||
SliceBuilder builder = PARSER.parse(context.parser(), new SliceBuilder(), context);
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (!(other instanceof SliceBuilder)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SliceBuilder o = (SliceBuilder) other;
|
||||
return ((field == null && o.field == null) || field.equals(o.field))
|
||||
&& id == o.id && o.max == max;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(this.field, this.id, this.max);
|
||||
}
|
||||
|
||||
public Query toFilter(QueryShardContext context, int shardId, int numShards) {
|
||||
final MappedFieldType type = context.fieldMapper(field);
|
||||
if (type == null) {
|
||||
throw new IllegalArgumentException("field " + field + " not found");
|
||||
}
|
||||
|
||||
boolean useTermQuery = false;
|
||||
if (UidFieldMapper.NAME.equals(field)) {
|
||||
useTermQuery = true;
|
||||
} else if (type.hasDocValues() == false) {
|
||||
throw new IllegalArgumentException("cannot load numeric doc values on " + field);
|
||||
} else {
|
||||
IndexFieldData ifm = context.getForField(type);
|
||||
if (ifm instanceof IndexNumericFieldData == false) {
|
||||
throw new IllegalArgumentException("cannot load numeric doc values on " + field);
|
||||
}
|
||||
}
|
||||
|
||||
if (numShards == 1) {
|
||||
return useTermQuery ? new TermsSliceQuery(field, id, max) :
|
||||
new DocValuesSliceQuery(field, id, max);
|
||||
}
|
||||
if (max >= numShards) {
|
||||
// the number of slices is greater than the number of shards
|
||||
// in that case we can reduce the number of shards requested by each slice
|
||||
|
||||
// first we check if the slice is responsible for this shard
|
||||
int targetShard = id % numShards;
|
||||
if (targetShard != shardId) {
|
||||
// the shard is not part of this slice, we can skip it.
|
||||
return new MatchNoDocsQuery("this shard is not part of the slice");
|
||||
}
|
||||
// compute the number of slices where this shard appears
|
||||
int numSlicesInShard = max / numShards;
|
||||
int rest = max % numShards;
|
||||
if (rest > targetShard) {
|
||||
numSlicesInShard++;
|
||||
}
|
||||
|
||||
if (numSlicesInShard == 1) {
|
||||
// this shard has only one slice so we must check all the documents
|
||||
return new MatchAllDocsQuery();
|
||||
}
|
||||
// get the new slice id for this shard
|
||||
int shardSlice = id / numShards;
|
||||
|
||||
return useTermQuery ?
|
||||
new TermsSliceQuery(field, shardSlice, numSlicesInShard) :
|
||||
new DocValuesSliceQuery(field, shardSlice, numSlicesInShard);
|
||||
}
|
||||
// the number of shards is greater than the number of slices
|
||||
|
||||
// check if the shard is assigned to the slice
|
||||
int targetSlice = shardId % max;
|
||||
if (id != targetSlice) {
|
||||
// the shard is not part of this slice, we can skip it.
|
||||
return new MatchNoDocsQuery("this shard is not part of the slice");
|
||||
}
|
||||
return new MatchAllDocsQuery();
|
||||
}
|
||||
}
|
|
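To make the shard routing in `toFilter` above concrete, here is a self-contained editorial sketch (not part of the commit) that reproduces its arithmetic for the two-shards / four-slices example from the commit message:

```java
public final class SliceDistributionExample {
    public static void main(String[] args) {
        final int numShards = 2;   // as in the commit message example
        final int max = 4;         // requested number of slices
        for (int id = 0; id < max; id++) {
            int targetShard = id % numShards;          // the only shard this slice queries
            int numSlicesInShard = max / numShards + (max % numShards > targetShard ? 1 : 0);
            int shardSlice = id / numShards;           // rescaled slice id inside that shard
            System.out.printf("slice %d -> shard %d (local slice %d of %d)%n",
                    id, targetShard, shardSlice, numSlicesInShard);
        }
        // slices 0 and 2 query only shard 0, slices 1 and 3 query only shard 1,
        // and inside each shard they become local slices 0 and 1 of 2.
    }
}
```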
@ -0,0 +1,81 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.search.slice;
|
||||
|
||||
import org.apache.lucene.search.Query;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
 * An abstract {@link Query} that defines a hash function to partition the documents into multiple slices.
|
||||
*/
|
||||
public abstract class SliceQuery extends Query {
|
||||
private final String field;
|
||||
private final int id;
|
||||
private final int max;
|
||||
|
||||
/**
|
||||
* @param field The name of the field
|
||||
* @param id The id of the slice
|
||||
* @param max The maximum number of slices
|
||||
*/
|
||||
public SliceQuery(String field, int id, int max) {
|
||||
this.field = field;
|
||||
this.id = id;
|
||||
this.max = max;
|
||||
}
|
||||
|
||||
// Returns true if the value matches the predicate
|
||||
protected final boolean contains(long value) {
|
||||
return Math.floorMod(value, max) == id;
|
||||
}
|
||||
|
||||
public String getField() {
|
||||
return field;
|
||||
}
|
||||
|
||||
public int getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public int getMax() {
|
||||
return max;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (super.equals(o) == false) {
|
||||
return false;
|
||||
}
|
||||
SliceQuery that = (SliceQuery) o;
|
||||
return field.equals(that.field) && id == that.id && max == that.max;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(super.hashCode(), field, id, max);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString(String f) {
|
||||
return getClass().getSimpleName() + "[field=" + field + ", id=" + id + ", max=" + max + "]";
|
||||
}
|
||||
|
||||
}
|
|
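A tiny standalone illustration (editorial, not part of the commit) of the `contains` predicate above: `Math.floorMod`, unlike the `%` operator, never returns a negative value, so negative hash codes still map onto a valid slice id in `[0, max)`.

```java
public final class SliceHashExample {
    public static void main(String[] args) {
        final int max = 10;
        final long[] sampleHashes = { 17L, -42L, 0L };   // stand-ins for term/value hash codes
        for (long hash : sampleHashes) {
            // floorMod keeps the result in [0, max) even for negative inputs,
            // which is why SliceQuery#contains uses it instead of %.
            System.out.println(hash + " -> slice " + Math.floorMod(hash, max));
        }
        // prints: 17 -> slice 7, -42 -> slice 8, 0 -> slice 0
    }
}
```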
@ -0,0 +1,86 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.search.slice;
|
||||
|
||||
import org.apache.lucene.index.LeafReaderContext;
|
||||
import org.apache.lucene.index.LeafReader;
|
||||
import org.apache.lucene.index.Terms;
|
||||
import org.apache.lucene.index.TermsEnum;
|
||||
import org.apache.lucene.index.PostingsEnum;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.DocIdSet;
|
||||
import org.apache.lucene.search.DocIdSetIterator;
|
||||
import org.apache.lucene.search.Weight;
|
||||
import org.apache.lucene.search.ConstantScoreWeight;
|
||||
import org.apache.lucene.search.Scorer;
|
||||
import org.apache.lucene.search.ConstantScoreScorer;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.DocIdSetBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* A {@link SliceQuery} that uses the terms dictionary of a field to do the slicing.
|
||||
*
|
||||
* <b>NOTE</b>: The cost of this filter is O(N*M) where N is the number of unique terms in the dictionary
|
||||
* and M is the average number of documents per term.
|
||||
* For each segment this filter enumerates the terms dictionary, computes the hash code for each term and fills
|
||||
* a bit set with the documents of all terms whose hash code matches the predicate.
|
||||
* <b>NOTE</b>: Documents with no value for that field are ignored.
|
||||
*/
|
||||
public final class TermsSliceQuery extends SliceQuery {
|
||||
public TermsSliceQuery(String field, int id, int max) {
|
||||
super(field, id, max);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Weight createWeight(IndexSearcher searcher, boolean needsScores) throws IOException {
|
||||
return new ConstantScoreWeight(this) {
|
||||
@Override
|
||||
public Scorer scorer(LeafReaderContext context) throws IOException {
|
||||
final DocIdSet disi = build(context.reader());
|
||||
final DocIdSetIterator leafIt = disi.iterator();
|
||||
return new ConstantScoreScorer(this, score(), leafIt);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
     * Returns a DocIdSet per segment containing the matching docs for the specified slice.
|
||||
*/
|
||||
private DocIdSet build(LeafReader reader) throws IOException {
|
||||
final DocIdSetBuilder builder = new DocIdSetBuilder(reader.maxDoc());
|
||||
final Terms terms = reader.terms(getField());
|
||||
final TermsEnum te = terms.iterator();
|
||||
PostingsEnum docsEnum = null;
|
||||
for (BytesRef term = te.next(); term != null; term = te.next()) {
|
||||
int hashCode = term.hashCode();
|
||||
if (contains(hashCode)) {
|
||||
docsEnum = te.postings(docsEnum, PostingsEnum.NONE);
|
||||
int docId = docsEnum.nextDoc();
|
||||
while (docId != DocIdSetIterator.NO_MORE_DOCS) {
|
||||
builder.add(docId);
|
||||
docId = docsEnum.nextDoc();
|
||||
}
|
||||
}
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
}
|
|
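For comparison with the doc-values variant, a minimal editorial sketch of counting one `_uid` slice with the query above; `_uid` is the default field used by `SliceBuilder`, and the searcher is assumed to already exist. The cost is dominated by enumerating the terms dictionary rather than by per-document doc-values lookups.

```java
import org.apache.lucene.search.IndexSearcher;
import org.elasticsearch.search.slice.TermsSliceQuery;

import java.io.IOException;

final class TermsSliceExample {
    /** Counts the documents whose _uid term hashes into slice {@code id} of {@code max}. */
    static int countUidSlice(IndexSearcher searcher, int id, int max) throws IOException {
        return searcher.count(new TermsSliceQuery("_uid", id, max));
    }
}
```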
@ -71,6 +71,7 @@ import org.elasticsearch.search.highlight.HighlightBuilderTests;
|
|||
import org.elasticsearch.search.rescore.QueryRescoreBuilderTests;
|
||||
import org.elasticsearch.search.rescore.QueryRescorerBuilder;
|
||||
import org.elasticsearch.search.searchafter.SearchAfterBuilder;
|
||||
import org.elasticsearch.search.slice.SliceBuilder;
|
||||
import org.elasticsearch.search.sort.FieldSortBuilder;
|
||||
import org.elasticsearch.search.sort.ScoreSortBuilder;
|
||||
import org.elasticsearch.search.sort.ScriptSortBuilder.ScriptSortType;
|
||||
|
@ -426,6 +427,16 @@ public class SearchSourceBuilderTests extends ESTestCase {
|
|||
xContentBuilder.endObject();
|
||||
builder.ext(xContentBuilder);
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
String field = randomBoolean() ? null : randomAsciiOfLengthBetween(5, 20);
|
||||
int max = randomInt(1000);
|
||||
int id = randomInt(max-1);
|
||||
if (field == null) {
|
||||
builder.slice(new SliceBuilder(id, max));
|
||||
} else {
|
||||
builder.slice(new SliceBuilder(field, id, max));
|
||||
}
|
||||
}
|
||||
return builder;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,124 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.search.slice;
|
||||
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.document.Field;
|
||||
import org.apache.lucene.document.SortedNumericDocValuesField;
|
||||
import org.apache.lucene.document.StringField;
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.index.LeafReaderContext;
|
||||
import org.apache.lucene.index.RandomIndexWriter;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.Collector;
|
||||
import org.apache.lucene.search.LeafCollector;
|
||||
import org.apache.lucene.search.Scorer;
|
||||
import org.apache.lucene.search.QueryUtils;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.util.NumericUtils;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class DocValuesSliceQueryTests extends ESTestCase {
|
||||
|
||||
public void testBasics() {
|
||||
DocValuesSliceQuery query1 =
|
||||
new DocValuesSliceQuery("field1", 1, 10);
|
||||
DocValuesSliceQuery query2 =
|
||||
new DocValuesSliceQuery("field1", 1, 10);
|
||||
DocValuesSliceQuery query3 =
|
||||
new DocValuesSliceQuery("field2", 1, 10);
|
||||
DocValuesSliceQuery query4 =
|
||||
new DocValuesSliceQuery("field1", 2, 10);
|
||||
QueryUtils.check(query1);
|
||||
QueryUtils.checkEqual(query1, query2);
|
||||
QueryUtils.checkUnequal(query1, query3);
|
||||
QueryUtils.checkUnequal(query1, query4);
|
||||
}
|
||||
|
||||
public void testSearch() throws Exception {
|
||||
final int numDocs = randomIntBetween(100, 200);
|
||||
final Directory dir = newDirectory();
|
||||
final RandomIndexWriter w = new RandomIndexWriter(random(), dir);
|
||||
int max = randomIntBetween(2, 10);
|
||||
int[] sliceCounters1 = new int[max];
|
||||
int[] sliceCounters2 = new int[max];
|
||||
Set<String> keys = new HashSet<>();
|
||||
for (int i = 0; i < numDocs; ++i) {
|
||||
Document doc = new Document();
|
||||
String uuid = UUIDs.base64UUID();
|
||||
int intValue = randomInt();
|
||||
long doubleValue = NumericUtils.doubleToSortableLong(randomDouble());
|
||||
doc.add(new StringField("uuid", uuid, Field.Store.YES));
|
||||
doc.add(new SortedNumericDocValuesField("intField", intValue));
|
||||
doc.add(new SortedNumericDocValuesField("doubleField", doubleValue));
|
||||
w.addDocument(doc);
|
||||
sliceCounters1[Math.floorMod(Long.hashCode(intValue), max)] ++;
|
||||
sliceCounters2[Math.floorMod(Long.hashCode(doubleValue), max)] ++;
|
||||
keys.add(uuid);
|
||||
}
|
||||
final IndexReader reader = w.getReader();
|
||||
final IndexSearcher searcher = newSearcher(reader);
|
||||
|
||||
for (int id = 0; id < max; id++) {
|
||||
DocValuesSliceQuery query1 =
|
||||
new DocValuesSliceQuery("intField", id, max);
|
||||
assertThat(searcher.count(query1), equalTo(sliceCounters1[id]));
|
||||
|
||||
DocValuesSliceQuery query2 =
|
||||
new DocValuesSliceQuery("doubleField", id, max);
|
||||
assertThat(searcher.count(query2), equalTo(sliceCounters2[id]));
|
||||
searcher.search(query1, new Collector() {
|
||||
@Override
|
||||
public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
|
||||
return new LeafCollector() {
|
||||
@Override
|
||||
public void setScorer(Scorer scorer) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void collect(int doc) throws IOException {
|
||||
Document d = context.reader().document(doc, Collections.singleton("uuid"));
|
||||
String uuid = d.get("uuid");
|
||||
assertThat(keys.contains(uuid), equalTo(true));
|
||||
keys.remove(uuid);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean needsScores() {
|
||||
return false;
|
||||
}
|
||||
});
|
||||
}
|
||||
assertThat(keys.size(), equalTo(0));
|
||||
w.close();
|
||||
reader.close();
|
||||
dir.close();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,215 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.search.slice;
|
||||
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
import org.elasticsearch.action.search.SearchPhaseExecutionException;
|
||||
import org.elasticsearch.action.search.SearchRequestBuilder;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.search.Scroll;
|
||||
import org.elasticsearch.search.SearchContextException;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.sort.SortBuilders;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Set;
|
||||
import java.util.HashSet;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.startsWith;
|
||||
|
||||
public class SearchSliceIT extends ESIntegTestCase {
|
||||
private static final int NUM_DOCS = 1000;
|
||||
|
||||
private int setupIndex(boolean withDocs) throws IOException, ExecutionException, InterruptedException {
|
||||
String mapping = XContentFactory.jsonBuilder().
|
||||
startObject()
|
||||
.startObject("type")
|
||||
.startObject("properties")
|
||||
.startObject("invalid_random_kw")
|
||||
.field("type", "keyword")
|
||||
.field("doc_values", "false")
|
||||
.endObject()
|
||||
.startObject("random_int")
|
||||
.field("type", "integer")
|
||||
.field("doc_values", "true")
|
||||
.endObject()
|
||||
.startObject("invalid_random_int")
|
||||
.field("type", "integer")
|
||||
.field("doc_values", "false")
|
||||
.endObject()
|
||||
.endObject()
|
||||
.endObject()
|
||||
.endObject().string();
|
||||
int numberOfShards = randomIntBetween(1, 7);
|
||||
assertAcked(client().admin().indices().prepareCreate("test")
|
||||
.setSettings("number_of_shards", numberOfShards)
|
||||
.addMapping("type", mapping));
|
||||
ensureGreen();
|
||||
|
||||
if (withDocs == false) {
|
||||
return numberOfShards;
|
||||
}
|
||||
|
||||
List<IndexRequestBuilder> requests = new ArrayList<>();
|
||||
for (int i = 0; i < NUM_DOCS; i++) {
|
||||
XContentBuilder builder = jsonBuilder();
|
||||
builder.startObject();
|
||||
builder.field("invalid_random_kw", randomAsciiOfLengthBetween(5, 20));
|
||||
builder.field("random_int", randomInt());
|
||||
builder.field("static_int", 0);
|
||||
builder.field("invalid_random_int", randomInt());
|
||||
builder.endObject();
|
||||
requests.add(client().prepareIndex("test", "test").setSource(builder));
|
||||
}
|
||||
indexRandom(true, requests);
|
||||
return numberOfShards;
|
||||
}
|
||||
|
||||
public void testDocIdSort() throws Exception {
|
||||
int numShards = setupIndex(true);
|
||||
SearchResponse sr = client().prepareSearch("test")
|
||||
.setQuery(matchAllQuery())
|
||||
.setSize(0)
|
||||
.get();
|
||||
int numDocs = (int) sr.getHits().getTotalHits();
|
||||
assertThat(numDocs, equalTo(NUM_DOCS));
|
||||
int max = randomIntBetween(2, numShards*3);
|
||||
for (String field : new String[]{"_uid", "random_int", "static_int"}) {
|
||||
int fetchSize = randomIntBetween(10, 100);
|
||||
SearchRequestBuilder request = client().prepareSearch("test")
|
||||
.setQuery(matchAllQuery())
|
||||
.setScroll(new Scroll(TimeValue.timeValueSeconds(10)))
|
||||
.setSize(fetchSize)
|
||||
.addSort(SortBuilders.fieldSort("_doc"));
|
||||
assertSearchSlicesWithScroll(request, field, max);
|
||||
}
|
||||
}
|
||||
|
||||
public void testNumericSort() throws Exception {
|
||||
int numShards = setupIndex(true);
|
||||
SearchResponse sr = client().prepareSearch("test")
|
||||
.setQuery(matchAllQuery())
|
||||
.setSize(0)
|
||||
.get();
|
||||
int numDocs = (int) sr.getHits().getTotalHits();
|
||||
assertThat(numDocs, equalTo(NUM_DOCS));
|
||||
|
||||
int max = randomIntBetween(2, numShards*3);
|
||||
for (String field : new String[]{"_uid", "random_int", "static_int"}) {
|
||||
int fetchSize = randomIntBetween(10, 100);
|
||||
SearchRequestBuilder request = client().prepareSearch("test")
|
||||
.setQuery(matchAllQuery())
|
||||
.setScroll(new Scroll(TimeValue.timeValueSeconds(10)))
|
||||
.addSort(SortBuilders.fieldSort("random_int"))
|
||||
.setSize(fetchSize);
|
||||
assertSearchSlicesWithScroll(request, field, max);
|
||||
}
|
||||
}
|
||||
|
||||
public void testInvalidFields() throws Exception {
|
||||
setupIndex(false);
|
||||
SearchPhaseExecutionException exc = expectThrows(SearchPhaseExecutionException.class,
|
||||
() -> client().prepareSearch("test")
|
||||
.setQuery(matchAllQuery())
|
||||
.setScroll(new Scroll(TimeValue.timeValueSeconds(10)))
|
||||
.slice(new SliceBuilder("invalid_random_int", 0, 10))
|
||||
.get());
|
||||
Throwable rootCause = findRootCause(exc);
|
||||
assertThat(rootCause.getClass(), equalTo(IllegalArgumentException.class));
|
||||
assertThat(rootCause.getMessage(),
|
||||
startsWith("cannot load numeric doc values"));
|
||||
|
||||
exc = expectThrows(SearchPhaseExecutionException.class, () -> client().prepareSearch("test")
|
||||
.setQuery(matchAllQuery())
|
||||
.setScroll(new Scroll(TimeValue.timeValueSeconds(10)))
|
||||
.slice(new SliceBuilder("invalid_random_kw", 0, 10))
|
||||
.get());
|
||||
rootCause = findRootCause(exc);
|
||||
assertThat(rootCause.getClass(), equalTo(IllegalArgumentException.class));
|
||||
assertThat(rootCause.getMessage(),
|
||||
startsWith("cannot load numeric doc values"));
|
||||
}
|
||||
|
||||
public void testInvalidQuery() throws Exception {
|
||||
setupIndex(false);
|
||||
SearchPhaseExecutionException exc = expectThrows(SearchPhaseExecutionException.class,
|
||||
() -> client().prepareSearch()
|
||||
.setQuery(matchAllQuery())
|
||||
.slice(new SliceBuilder("invalid_random_int", 0, 10))
|
||||
.get());
|
||||
Throwable rootCause = findRootCause(exc);
|
||||
assertThat(rootCause.getClass(), equalTo(SearchContextException.class));
|
||||
assertThat(rootCause.getMessage(),
|
||||
equalTo("`slice` cannot be used outside of a scroll context"));
|
||||
}
|
||||
|
||||
private void assertSearchSlicesWithScroll(SearchRequestBuilder request, String field, int numSlice) {
|
||||
int totalResults = 0;
|
||||
List<String> keys = new ArrayList<>();
|
||||
for (int id = 0; id < numSlice; id++) {
|
||||
SliceBuilder sliceBuilder = new SliceBuilder(field, id, numSlice);
|
||||
SearchResponse searchResponse = request.slice(sliceBuilder).get();
|
||||
totalResults += searchResponse.getHits().getHits().length;
|
||||
int expectedSliceResults = (int) searchResponse.getHits().getTotalHits();
|
||||
int numSliceResults = searchResponse.getHits().getHits().length;
|
||||
String scrollId = searchResponse.getScrollId();
|
||||
for (SearchHit hit : searchResponse.getHits().getHits()) {
|
||||
keys.add(hit.getId());
|
||||
}
|
||||
while (searchResponse.getHits().getHits().length > 0) {
|
||||
searchResponse = client().prepareSearchScroll("test")
|
||||
.setScrollId(scrollId)
|
||||
.setScroll(new Scroll(TimeValue.timeValueSeconds(10)))
|
||||
.get();
|
||||
scrollId = searchResponse.getScrollId();
|
||||
totalResults += searchResponse.getHits().getHits().length;
|
||||
numSliceResults += searchResponse.getHits().getHits().length;
|
||||
for (SearchHit hit : searchResponse.getHits().getHits()) {
|
||||
keys.add(hit.getId());
|
||||
}
|
||||
}
|
||||
assertThat(numSliceResults, equalTo(expectedSliceResults));
|
||||
clearScroll(scrollId);
|
||||
}
|
||||
assertThat(totalResults, equalTo(NUM_DOCS));
|
||||
assertThat(keys.size(), equalTo(NUM_DOCS));
|
||||
assertThat(new HashSet(keys).size(), equalTo(NUM_DOCS));
|
||||
}
|
||||
|
||||
private Throwable findRootCause(Exception e) {
|
||||
Throwable ret = e;
|
||||
while (ret.getCause() != null) {
|
||||
ret = ret.getCause();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,340 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.search.slice;
|
||||
|
||||
import org.apache.lucene.analysis.MockAnalyzer;
|
||||
import org.apache.lucene.index.DirectoryReader;
|
||||
import org.apache.lucene.index.DocValuesType;
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.index.IndexWriter;
|
||||
import org.apache.lucene.search.MatchAllDocsQuery;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.RAMDirectory;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.ParseFieldMatcher;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.lucene.search.MatchNoDocsQuery;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.fielddata.IndexNumericFieldData;
|
||||
import org.elasticsearch.index.mapper.MappedFieldType;
|
||||
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
|
||||
import org.elasticsearch.index.query.MatchAllQueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryParseContext;
|
||||
import org.elasticsearch.index.query.QueryParser;
|
||||
import org.elasticsearch.index.query.QueryShardContext;
|
||||
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Map;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class SliceBuilderTests extends ESTestCase {
|
||||
private static final int MAX_SLICE = 20;
|
||||
private static NamedWriteableRegistry namedWriteableRegistry;
|
||||
private static IndicesQueriesRegistry indicesQueriesRegistry;
|
||||
|
||||
/**
|
||||
* setup for the whole base test class
|
||||
*/
|
||||
@BeforeClass
|
||||
public static void init() {
|
||||
namedWriteableRegistry = new NamedWriteableRegistry();
|
||||
indicesQueriesRegistry = new IndicesQueriesRegistry();
|
||||
QueryParser<MatchAllQueryBuilder> parser = MatchAllQueryBuilder::fromXContent;
|
||||
indicesQueriesRegistry.register(parser, MatchAllQueryBuilder.QUERY_NAME_FIELD);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterClass() throws Exception {
|
||||
namedWriteableRegistry = null;
|
||||
indicesQueriesRegistry = null;
|
||||
}
|
||||
|
||||
private final SliceBuilder randomSliceBuilder() throws IOException {
|
||||
int max = randomIntBetween(2, MAX_SLICE);
|
||||
if (max == 0) max++;
|
||||
int id = randomInt(max - 1);
|
||||
String field = randomAsciiOfLengthBetween(5, 20);
|
||||
return new SliceBuilder(field, id, max);
|
||||
}
|
||||
|
||||
private static SliceBuilder serializedCopy(SliceBuilder original) throws IOException {
|
||||
try (BytesStreamOutput output = new BytesStreamOutput()) {
|
||||
original.writeTo(output);
|
||||
try (StreamInput in =
|
||||
new NamedWriteableAwareStreamInput(StreamInput.wrap(output.bytes()), namedWriteableRegistry)) {
|
||||
return new SliceBuilder(in);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testSerialization() throws Exception {
|
||||
SliceBuilder original = randomSliceBuilder();
|
||||
SliceBuilder deserialized = serializedCopy(original);
|
||||
assertEquals(deserialized, original);
|
||||
assertEquals(deserialized.hashCode(), original.hashCode());
|
||||
assertNotSame(deserialized, original);
|
||||
}
|
||||
|
||||
public void testEqualsAndHashcode() throws Exception {
|
||||
SliceBuilder firstBuilder = randomSliceBuilder();
|
||||
assertFalse("sliceBuilder is equal to null", firstBuilder.equals(null));
|
||||
assertFalse("sliceBuilder is equal to incompatible type", firstBuilder.equals(""));
|
||||
assertTrue("sliceBuilder is not equal to self", firstBuilder.equals(firstBuilder));
|
||||
assertThat("same searchFrom's hashcode returns different values if called multiple times",
|
||||
firstBuilder.hashCode(), equalTo(firstBuilder.hashCode()));
|
||||
|
||||
SliceBuilder secondBuilder = serializedCopy(firstBuilder);
|
||||
assertTrue("sliceBuilder is not equal to self", secondBuilder.equals(secondBuilder));
|
||||
assertTrue("sliceBuilder is not equal to its copy", firstBuilder.equals(secondBuilder));
|
||||
assertTrue("equals is not symmetric", secondBuilder.equals(firstBuilder));
|
||||
assertThat("sliceBuilder copy's hashcode is different from original hashcode", secondBuilder.hashCode(),
|
||||
equalTo(firstBuilder.hashCode()));
|
||||
SliceBuilder thirdBuilder = serializedCopy(secondBuilder);
|
||||
assertTrue("sliceBuilder is not equal to self", thirdBuilder.equals(thirdBuilder));
|
||||
assertTrue("sliceBuilder is not equal to its copy", secondBuilder.equals(thirdBuilder));
|
||||
assertThat("sliceBuilder copy's hashcode is different from original hashcode", secondBuilder.hashCode(),
|
||||
equalTo(thirdBuilder.hashCode()));
|
||||
assertTrue("equals is not transitive", firstBuilder.equals(thirdBuilder));
|
||||
assertThat("sliceBuilder copy's hashcode is different from original hashcode", firstBuilder.hashCode(),
|
||||
equalTo(thirdBuilder.hashCode()));
|
||||
assertTrue("sliceBuilder is not symmetric", thirdBuilder.equals(secondBuilder));
|
||||
assertTrue("sliceBuilder is not symmetric", thirdBuilder.equals(firstBuilder));
|
||||
}
|
||||
|
||||
public void testFromXContent() throws Exception {
|
||||
SliceBuilder sliceBuilder = randomSliceBuilder();
|
||||
XContentBuilder builder = XContentFactory.contentBuilder(randomFrom(XContentType.values()));
|
||||
if (randomBoolean()) {
|
||||
builder.prettyPrint();
|
||||
}
|
||||
builder.startObject();
|
||||
sliceBuilder.innerToXContent(builder);
|
||||
builder.endObject();
|
||||
XContentParser parser = XContentHelper.createParser(shuffleXContent(builder).bytes());
|
||||
QueryParseContext context = new QueryParseContext(indicesQueriesRegistry, parser,
|
||||
ParseFieldMatcher.STRICT);
|
||||
SliceBuilder secondSliceBuilder = SliceBuilder.fromXContent(context);
|
||||
assertNotSame(sliceBuilder, secondSliceBuilder);
|
||||
assertEquals(sliceBuilder, secondSliceBuilder);
|
||||
assertEquals(sliceBuilder.hashCode(), secondSliceBuilder.hashCode());
|
||||
}
|
||||
|
||||
public void testInvalidArguments() throws Exception {
|
||||
Exception e = expectThrows(IllegalArgumentException.class, () -> new SliceBuilder("field", -1, 10));
|
||||
assertEquals(e.getMessage(), "id must be greater than or equal to 0");
|
||||
|
||||
e = expectThrows(IllegalArgumentException.class, () -> new SliceBuilder("field", 10, -1));
|
||||
assertEquals(e.getMessage(), "max must be greater than 1");
|
||||
|
||||
e = expectThrows(IllegalArgumentException.class, () -> new SliceBuilder("field", 10, 0));
|
||||
assertEquals(e.getMessage(), "max must be greater than 1");
|
||||
|
||||
e = expectThrows(IllegalArgumentException.class, () -> new SliceBuilder("field", 10, 5));
|
||||
assertEquals(e.getMessage(), "max must be greater than id");
|
||||
|
||||
e = expectThrows(IllegalArgumentException.class, () -> new SliceBuilder("field", 1000, 1000));
|
||||
assertEquals(e.getMessage(), "max must be greater than id");
|
||||
e = expectThrows(IllegalArgumentException.class, () -> new SliceBuilder("field", 1001, 1000));
|
||||
assertEquals(e.getMessage(), "max must be greater than id");
|
||||
}
|
||||
|
||||
    public void testToFilter() throws IOException {
        Directory dir = new RAMDirectory();
        try (IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())))) {
            writer.commit();
        }
        QueryShardContext context = mock(QueryShardContext.class);
        try (IndexReader reader = DirectoryReader.open(dir)) {
            MappedFieldType fieldType = new MappedFieldType() {
                @Override
                public MappedFieldType clone() {
                    return null;
                }

                @Override
                public String typeName() {
                    return null;
                }

                @Override
                public Query termQuery(Object value, @Nullable QueryShardContext context) {
                    return null;
                }
            };
            fieldType.setName(UidFieldMapper.NAME);
            fieldType.setHasDocValues(false);
            when(context.fieldMapper(UidFieldMapper.NAME)).thenReturn(fieldType);
            when(context.getIndexReader()).thenReturn(reader);
            SliceBuilder builder = new SliceBuilder(5, 10);
            Query query = builder.toFilter(context, 0, 1);
            assertThat(query, instanceOf(TermsSliceQuery.class));

            assertThat(builder.toFilter(context, 0, 1), equalTo(query));
            try (IndexReader newReader = DirectoryReader.open(dir)) {
                when(context.getIndexReader()).thenReturn(newReader);
                assertThat(builder.toFilter(context, 0, 1), equalTo(query));
            }
        }

        try (IndexReader reader = DirectoryReader.open(dir)) {
            MappedFieldType fieldType = new MappedFieldType() {
                @Override
                public MappedFieldType clone() {
                    return null;
                }

                @Override
                public String typeName() {
                    return null;
                }

                @Override
                public Query termQuery(Object value, @Nullable QueryShardContext context) {
                    return null;
                }
            };
            fieldType.setName("field_doc_values");
            fieldType.setHasDocValues(true);
            fieldType.setDocValuesType(DocValuesType.SORTED_NUMERIC);
            when(context.fieldMapper("field_doc_values")).thenReturn(fieldType);
            when(context.getIndexReader()).thenReturn(reader);
            IndexNumericFieldData fd = mock(IndexNumericFieldData.class);
            when(context.getForField(fieldType)).thenReturn(fd);
            SliceBuilder builder = new SliceBuilder("field_doc_values", 5, 10);
            Query query = builder.toFilter(context, 0, 1);
            assertThat(query, instanceOf(DocValuesSliceQuery.class));

            assertThat(builder.toFilter(context, 0, 1), equalTo(query));
            try (IndexReader newReader = DirectoryReader.open(dir)) {
                when(context.getIndexReader()).thenReturn(newReader);
                assertThat(builder.toFilter(context, 0, 1), equalTo(query));
            }

            // numSlices > numShards
            int numSlices = randomIntBetween(10, 100);
            int numShards = randomIntBetween(1, 9);
            Map<Integer, AtomicInteger> numSliceMap = new HashMap<>();
            for (int i = 0; i < numSlices; i++) {
                for (int j = 0; j < numShards; j++) {
                    SliceBuilder slice = new SliceBuilder("_uid", i, numSlices);
                    Query q = slice.toFilter(context, j, numShards);
                    if (q instanceof TermsSliceQuery || q instanceof MatchAllDocsQuery) {
                        AtomicInteger count = numSliceMap.get(j);
                        if (count == null) {
                            count = new AtomicInteger(0);
                            numSliceMap.put(j, count);
                        }
                        count.incrementAndGet();
                        if (q instanceof MatchAllDocsQuery) {
                            assertThat(count.get(), equalTo(1));
                        }
                    } else {
                        assertThat(q, instanceOf(MatchNoDocsQuery.class));
                    }
                }
            }
            int total = 0;
            for (Map.Entry<Integer, AtomicInteger> e : numSliceMap.entrySet()) {
                total += e.getValue().get();
            }
            assertThat(total, equalTo(numSlices));

            // numShards > numSlices
            numShards = randomIntBetween(3, 100);
            numSlices = randomInt(numShards - 1);
            List<Integer> targetShards = new ArrayList<>();
            for (int i = 0; i < numSlices; i++) {
                for (int j = 0; j < numShards; j++) {
                    SliceBuilder slice = new SliceBuilder("_uid", i, numSlices);
                    Query q = slice.toFilter(context, j, numShards);
                    if (q instanceof MatchNoDocsQuery == false) {
                        assertThat(q, instanceOf(MatchAllDocsQuery.class));
                        targetShards.add(j);
                    }
                }
            }
            assertThat(targetShards.size(), equalTo(numShards));
            assertThat(new HashSet<>(targetShards).size(), equalTo(numShards));

            // numShards == numSlices
            numShards = randomIntBetween(2, 10);
            numSlices = numShards;
            for (int i = 0; i < numSlices; i++) {
                for (int j = 0; j < numShards; j++) {
                    SliceBuilder slice = new SliceBuilder("_uid", i, numSlices);
                    Query q = slice.toFilter(context, j, numShards);
                    if (i == j) {
                        assertThat(q, instanceOf(MatchAllDocsQuery.class));
                    } else {
                        assertThat(q, instanceOf(MatchNoDocsQuery.class));
                    }
                }
            }
        }

        try (IndexReader reader = DirectoryReader.open(dir)) {
            MappedFieldType fieldType = new MappedFieldType() {
                @Override
                public MappedFieldType clone() {
                    return null;
                }

                @Override
                public String typeName() {
                    return null;
                }

                @Override
                public Query termQuery(Object value, @Nullable QueryShardContext context) {
                    return null;
                }
            };
            fieldType.setName("field_without_doc_values");
            when(context.fieldMapper("field_without_doc_values")).thenReturn(fieldType);
            when(context.getIndexReader()).thenReturn(reader);
            SliceBuilder builder = new SliceBuilder("field_without_doc_values", 5, 10);
            IllegalArgumentException exc =
                expectThrows(IllegalArgumentException.class, () -> builder.toFilter(context, 0, 1));
            assertThat(exc.getMessage(), containsString("cannot load numeric doc values"));
        }
    }
}
@@ -0,0 +1,117 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.search.slice;

import org.apache.lucene.analysis.core.KeywordAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.QueryUtils;

import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import static org.hamcrest.Matchers.equalTo;

public class TermsSliceQueryTests extends ESTestCase {

    public void testBasics() {
        TermsSliceQuery query1 =
            new TermsSliceQuery("field1", 1, 10);
        TermsSliceQuery query2 =
            new TermsSliceQuery("field1", 1, 10);
        TermsSliceQuery query3 =
            new TermsSliceQuery("field2", 1, 10);
        TermsSliceQuery query4 =
            new TermsSliceQuery("field1", 2, 10);
        QueryUtils.check(query1);
        QueryUtils.checkEqual(query1, query2);
        QueryUtils.checkUnequal(query1, query3);
        QueryUtils.checkUnequal(query1, query4);
    }

    public void testSearch() throws Exception {
        final int numDocs = randomIntBetween(100, 200);
        final Directory dir = newDirectory();
        final RandomIndexWriter w = new RandomIndexWriter(random(), dir, new KeywordAnalyzer());
        int max = randomIntBetween(2, 10);
        int[] sliceCounters = new int[max];
        Set<String> keys = new HashSet<>();
        for (int i = 0; i < numDocs; ++i) {
            Document doc = new Document();
            String uuid = UUIDs.base64UUID();
            BytesRef br = new BytesRef(uuid);
            int id = Math.floorMod(br.hashCode(), max);
            sliceCounters[id]++;
            doc.add(new StringField("uuid", uuid, Field.Store.YES));
            w.addDocument(doc);
            keys.add(uuid);
        }
        final IndexReader reader = w.getReader();
        final IndexSearcher searcher = newSearcher(reader);

        for (int id = 0; id < max; id++) {
            TermsSliceQuery query1 =
                new TermsSliceQuery("uuid", id, max);
            assertThat(searcher.count(query1), equalTo(sliceCounters[id]));
            searcher.search(query1, new Collector() {
                @Override
                public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
                    return new LeafCollector() {
                        @Override
                        public void setScorer(Scorer scorer) throws IOException {
                        }

                        @Override
                        public void collect(int doc) throws IOException {
                            Document d = context.reader().document(doc, Collections.singleton("uuid"));
                            String uuid = d.get("uuid");
                            assertThat(keys.contains(uuid), equalTo(true));
                            keys.remove(uuid);
                        }
                    };
                }

                @Override
                public boolean needsScores() {
                    return false;
                }
            });
        }
        assertThat(keys.size(), equalTo(0));
        w.close();
        reader.close();
        dir.close();
    }
}
@@ -175,3 +175,92 @@ curl -XDELETE localhost:9200/_search/scroll \
-d 'c2Nhbjs2OzM0NDg1ODpzRlBLc0FXNlNyNm5JWUc1,aGVuRmV0Y2g7NTsxOnkxaDZ'
---------------------------------------

==== Sliced Scroll

For scroll queries that return a lot of documents, it is possible to split the scroll into multiple slices which
can be consumed independently:

[source,js]
--------------------------------------------------
curl -XGET 'localhost:9200/twitter/tweet/_search?scroll=1m' -d '
{
    "slice": {
        "id": 0, <1>
        "max": 2 <2>
    },
    "query": {
        "match" : {
            "title" : "elasticsearch"
        }
    }
}
'

curl -XGET 'localhost:9200/twitter/tweet/_search?scroll=1m' -d '
{
    "slice": {
        "id": 1,
        "max": 2
    },
    "query": {
        "match" : {
            "title" : "elasticsearch"
        }
    }
}
'
--------------------------------------------------

<1> The id of the slice
<2> The maximum number of slices

The first request returns documents that belong to the first slice (id: 0) and the second request returns documents
that belong to the second slice. Since the maximum number of slices is set to 2, the union of the results of the two
requests is equivalent to the results of a scroll query without slicing.
By default the splitting is done on the shards first and then locally on each shard using the _uid field
with the following formula:
`slice(doc) = floorMod(hashCode(doc._uid), max)`
For instance, if the number of shards is equal to 2 and the user requested 4 slices, then slices 0 and 2 are assigned
to the first shard and slices 1 and 3 are assigned to the second shard.
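
The snippet below is a minimal, standalone sketch of that default routing, assuming the mapping
`shard = slice_id % number_of_shards` implied by the example above. It is not the Elasticsearch implementation:
the class and method names are made up for illustration, and `String.hashCode` stands in for the internal hash of
`_uid`, so the computed slice ids will not match what a real cluster produces.

[source,java]
--------------------------------------------------
public final class SliceRoutingSketch {

    /** Slice id of a document, mirroring slice(doc) = floorMod(hashCode(doc._uid), max). */
    static int sliceOf(String uid, int max) {
        // String.hashCode is only a stand-in for the hash Elasticsearch applies to _uid.
        return Math.floorMod(uid.hashCode(), max);
    }

    /** Shard that serves a given slice, assuming slices are distributed round-robin over shards. */
    static int shardOf(int sliceId, int numShards) {
        return sliceId % numShards;
    }

    public static void main(String[] args) {
        // With 2 shards and 4 slices: slices 0 and 2 map to the first shard, slices 1 and 3 to the second.
        for (int slice = 0; slice < 4; slice++) {
            System.out.println("slice " + slice + " -> shard " + shardOf(slice, 2));
        }
        System.out.println("doc with _uid 'tweet#1' belongs to slice " + sliceOf("tweet#1", 4));
    }
}
--------------------------------------------------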

Each scroll is independent and can be processed in parallel like any scroll request.

NOTE: If the number of slices is bigger than the number of shards the slice filter is very slow on the first calls: it has
a complexity of O(N) and a memory cost equal to N bits per slice, where N is the total number of documents in the shard.
After a few calls the filter should be cached and subsequent calls should be faster, but you should limit the number of
sliced queries you perform in parallel to avoid running out of memory.
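
One client-side way to respect that limit is to hand each slice id to a worker in a bounded thread pool, so that no more
than a fixed number of sliced scrolls run at once. The sketch below uses only the JDK and is not tied to any Elasticsearch
client API; `processSlice` is a hypothetical placeholder for whatever loop issues the sliced scroll requests shown above.

[source,java]
--------------------------------------------------
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public final class SlicedScrollRunner {

    // Hypothetical placeholder: consume one slice by issuing scroll requests with
    // {"slice": {"id": sliceId, "max": maxSlices}} until no more hits are returned.
    static void processSlice(int sliceId, int maxSlices) {
        // ... client-specific scroll loop goes here ...
    }

    public static void main(String[] args) throws InterruptedException {
        final int maxSlices = 10;   // "max" in the slice definition
        final int parallelism = 4;  // upper bound on concurrent sliced scrolls
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        for (int id = 0; id < maxSlices; id++) {
            final int sliceId = id;
            pool.submit(() -> processSlice(sliceId, maxSlices));
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.HOURS);
    }
}
--------------------------------------------------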

To avoid this cost entirely it is possible to use the `doc_values` of another field to do the slicing
but the user must ensure that the field has the following properties:

* The field is numeric.

* `doc_values` are enabled on that field.

* Every document should contain a single value. If a document has multiple values for the specified field, the first value is used.

* The value for each document should be set once when the document is created and never updated. This ensures that each
slice gets deterministic results.

* The cardinality of the field should be high. This ensures that each slice gets approximately the same number of documents.

[source,js]
--------------------------------------------------
curl -XGET 'localhost:9200/twitter/tweet/_search?scroll=1m' -d '
{
    "slice": {
        "field": "my_random_integer_field",
        "id": 0,
        "max": 10
    },
    "query": {
        "match" : {
            "title" : "elasticsearch"
        }
    }
}
'
--------------------------------------------------

For append-only time-based indices, the `timestamp` field can be used safely.