Added scoring support to the percolate API

Scoring support allows percolate matches to be sorted, or simply to have a score assigned to each match. Sorting by score is very useful when millions of percolate queries match, because only the most relevant matches need to be returned.

The scoring support hooks into the percolate `query` option and adds two new boolean options:
* `sort` - Whether to sort the matches by score. The score is then also included with each match. The `size` option is required when sorting of percolate matches is enabled.
* `score` - Whether to compute the score and include it with each match, without sorting the matches.

Both new options require the `query` option to be specified, since that query is what produces the scores; normally the `query` option only controls which percolate queries are evaluated. To make the scores meaningful, the percolate query can be wrapped in the recently added `function_score` query (#3423).
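As a minimal sketch of how the pieces fit together (based on the Java API builders and the new test in this change; the `client` reference, index/type names, and the `level` script field are illustrative):

```java
import static org.elasticsearch.action.percolate.PercolateSourceBuilder.docBuilder;
import static org.elasticsearch.index.query.QueryBuilders.functionScoreQuery;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;

import org.elasticsearch.action.percolate.PercolateResponse;
import org.elasticsearch.index.query.functionscore.script.ScriptScoreFunctionBuilder;

// Return the top 10 matching percolate queries, sorted by a script-defined score.
PercolateResponse response = client.preparePercolate()
        .setIndices("my-index").setDocumentType("my-type")
        .setPercolateDoc(docBuilder().setDoc("field", "value"))
        // The percolate query controls which registered queries are evaluated;
        // wrapping it in function_score (#3423) is what gives the scores meaning.
        .setPercolateQuery(functionScoreQuery(matchAllQuery())
                .add(new ScriptScoreFunctionBuilder().script("doc['level'].value")))
        .setSort(true) // sorting implies scoring and requires a size
        .setSize(10)
        .execute().actionGet();

for (PercolateResponse.Match match : response) {
    // With sort (or score) enabled, every match carries its score.
    System.out.println(match.getIndex() + "/" + match.getId() + " -> " + match.getScore());
}
```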

Closes #3506
Martijn van Groningen 2013-08-14 13:50:46 +02:00
parent 32cdddb671
commit 691ac8e105
8 changed files with 652 additions and 180 deletions

PercolateRequestBuilder.java

@@ -95,6 +95,24 @@ public class PercolateRequestBuilder extends BroadcastOperationRequestBuilder<Pe
return this;
}
/**
* Limits the maximum number of percolate query matches to be returned.
*/
public PercolateRequestBuilder setSize(int size) {
sourceBuilder().setSize(size);
return this;
}
public PercolateRequestBuilder setSort(boolean sort) {
sourceBuilder().setSort(sort);
return this;
}
public PercolateRequestBuilder setScore(boolean score) {
sourceBuilder().setScore(score);
return this;
}
public PercolateRequestBuilder setSource(PercolateSourceBuilder source) {
sourceBuilder = source;
return this;
@@ -160,14 +178,6 @@ public class PercolateRequestBuilder extends BroadcastOperationRequestBuilder<Pe
return this;
}
/**
* Limits the maximum number of percolate query matches to be returned.
*/
public PercolateRequestBuilder setSize(int size) {
sourceBuilder().setSize(size);
return this;
}
private PercolateSourceBuilder sourceBuilder() {
if (sourceBuilder == null) {
sourceBuilder = new PercolateSourceBuilder();

PercolateResponse.java

@@ -29,6 +29,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.rest.action.support.RestActions;
import java.io.IOException;
import java.util.Arrays;
@@ -46,11 +47,15 @@ public class PercolateResponse extends BroadcastOperationResponse implements Ite
private Match[] matches;
private long count;
- public PercolateResponse(int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures, Match[] matches, long count, long tookInMillis) {
+ private boolean hasScores;
+ public PercolateResponse(int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures,
+ Match[] matches, long count, long tookInMillis, boolean hasScores) {
super(totalShards, successfulShards, failedShards, shardFailures);
this.tookInMillis = tookInMillis;
this.matches = matches;
this.count = count;
this.hasScores = hasScores;
}
public PercolateResponse(int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures, long count, long tookInMillis) {
@@ -58,11 +63,14 @@ public class PercolateResponse extends BroadcastOperationResponse implements Ite
this.tookInMillis = tookInMillis;
this.matches = EMPTY;
this.count = count;
this.hasScores = false;
}
public PercolateResponse(int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures, long tookInMillis) {
super(totalShards, successfulShards, failedShards, shardFailures);
this.tookInMillis = tookInMillis;
this.matches = EMPTY;
this.hasScores = false;
}
PercolateResponse() {
@@ -104,23 +112,7 @@ public class PercolateResponse extends BroadcastOperationResponse implements Ite
builder.startObject();
builder.field(Fields.TOOK, tookInMillis);
- builder.startObject(Fields._SHARDS);
- builder.field(Fields.TOTAL, getTotalShards());
- builder.field(Fields.SUCCESSFUL, getSuccessfulShards());
- builder.field(Fields.FAILED, getFailedShards());
- if (getShardFailures().length > 0) {
- builder.startArray(Fields.FAILURES);
- for (ShardOperationFailedException shardFailure : getShardFailures()) {
- builder.startObject();
- builder.field(Fields.INDEX, shardFailure.index());
- builder.field(Fields.SHARD, shardFailure.shardId());
- builder.field(Fields.STATUS, shardFailure.status().getStatus());
- builder.field(Fields.REASON, shardFailure.reason());
- builder.endObject();
- }
- builder.endArray();
- }
- builder.endObject();
+ RestActions.buildBroadcastShardsHeader(builder, this);
builder.field(Fields.TOTAL, count);
if (matches.length != 0) {
@@ -135,6 +127,9 @@ public class PercolateResponse extends BroadcastOperationResponse implements Ite
builder.startObject();
builder.field(Fields._INDEX, match.getIndex());
builder.field(Fields._ID, match.getId());
if (hasScores) {
builder.field(Fields._SCORE, match.getScore());
}
builder.endObject();
}
}
@@ -156,6 +151,7 @@ public class PercolateResponse extends BroadcastOperationResponse implements Ite
matches[i] = new Match();
matches[i].readFrom(in);
}
hasScores = in.readBoolean();
}
@Override
@@ -167,64 +163,70 @@ public class PercolateResponse extends BroadcastOperationResponse implements Ite
for (Match match : matches) {
match.writeTo(out);
}
out.writeBoolean(hasScores);
}
public static class Match implements Streamable {
- private Text id;
private Text index;
+ private Text id;
+ private float score;
- public Match(Text id, Text index) {
+ public Match(Text index, Text id, float score) {
this.id = id;
+ this.score = score;
this.index = index;
}
Match() {
}
- public Text id() {
- return id;
- }
public Text index() {
return index;
}
- public Text getId() {
+ public Text id() {
return id;
}
+ public float score() {
+ return score;
+ }
public Text getIndex() {
- return index;
+ return index();
}
+ public Text getId() {
+ return id();
+ }
+ public float getScore() {
+ return score();
+ }
@Override
public void readFrom(StreamInput in) throws IOException {
id = in.readText();
index = in.readText();
+ score = in.readFloat();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeText(id);
out.writeText(index);
+ out.writeFloat(score);
}
}
static final class Fields {
- static final XContentBuilderString _SHARDS = new XContentBuilderString("_shards");
- static final XContentBuilderString TOTAL = new XContentBuilderString("total");
- static final XContentBuilderString SUCCESSFUL = new XContentBuilderString("successful");
- static final XContentBuilderString FAILED = new XContentBuilderString("failed");
- static final XContentBuilderString FAILURES = new XContentBuilderString("failures");
- static final XContentBuilderString STATUS = new XContentBuilderString("status");
- static final XContentBuilderString INDEX = new XContentBuilderString("index");
- static final XContentBuilderString SHARD = new XContentBuilderString("shard");
- static final XContentBuilderString REASON = new XContentBuilderString("reason");
static final XContentBuilderString TOOK = new XContentBuilderString("took");
+ static final XContentBuilderString TOTAL = new XContentBuilderString("total");
static final XContentBuilderString MATCHES = new XContentBuilderString("matches");
static final XContentBuilderString _INDEX = new XContentBuilderString("_index");
static final XContentBuilderString _ID = new XContentBuilderString("_id");
+ static final XContentBuilderString _SCORE = new XContentBuilderString("_score");
}
}

PercolateShardResponse.java

@@ -1,3 +1,22 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.percolate;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationResponse;
@@ -14,40 +33,69 @@ import java.io.IOException;
public class PercolateShardResponse extends BroadcastShardOperationResponse {
private long count;
private float[] scores;
private Text[] matches;
// Request fields:
private boolean limit;
private int requestedSize;
private boolean sort;
private boolean score;
public PercolateShardResponse() {
}
public PercolateShardResponse(Text[] matches, long count, float[] scores, PercolatorService.PercolateContext context, String index, int shardId) {
super(index, shardId);
this.matches = matches;
this.count = count;
this.scores = scores;
this.limit = context.limit;
this.requestedSize = context.size;
this.sort = context.sort;
this.score = context.score;
}
public PercolateShardResponse(Text[] matches, long count, PercolatorService.PercolateContext context, String index, int shardId) {
super(index, shardId);
this.matches = matches;
this.scores = new float[0];
this.count = count;
this.limit = context.limit;
this.requestedSize = context.size;
this.sort = context.sort;
this.score = context.score;
}
public PercolateShardResponse(long count, PercolatorService.PercolateContext context, String index, int shardId) {
super(index, shardId);
this.count = count;
this.matches = StringText.EMPTY_ARRAY;
this.scores = new float[0];
this.limit = context.limit;
this.requestedSize = context.size;
this.sort = context.sort;
this.score = context.score;
}
- public PercolateShardResponse(String index, int shardId) {
+ public PercolateShardResponse(PercolatorService.PercolateContext context, String index, int shardId) {
super(index, shardId);
this.matches = StringText.EMPTY_ARRAY;
this.scores = new float[0];
this.limit = context.limit;
this.requestedSize = context.size;
this.sort = context.sort;
this.score = context.score;
}
public Text[] matches() {
return matches;
}
public float[] scores() {
return scores;
}
public long count() {
return count;
}
@@ -60,14 +108,27 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse {
return requestedSize;
}
public boolean sort() {
return sort;
}
public boolean score() {
return score;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
count = in.readVLong();
matches = in.readTextArray();
scores = new float[in.readVInt()];
for (int i = 0; i < scores.length; i++) {
scores[i] = in.readFloat();
}
limit = in.readBoolean();
requestedSize = in.readVInt();
sort = in.readBoolean();
}
@Override
@@ -75,8 +136,13 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse {
super.writeTo(out);
out.writeVLong(count);
out.writeTextArray(matches);
out.writeVLong(scores.length);
for (float score : scores) {
out.writeFloat(score);
}
out.writeBoolean(limit);
out.writeVLong(requestedSize);
out.writeBoolean(sort);
}
}

PercolateSourceBuilder.java

@@ -39,6 +39,8 @@ public class PercolateSourceBuilder implements ToXContent {
private QueryBuilder queryBuilder;
private FilterBuilder filterBuilder;
private Integer size;
private Boolean sort;
private Boolean score;
public DocBuilder percolateDocument() {
if (docBuilder == null) {
@@ -79,6 +81,16 @@ public class PercolateSourceBuilder implements ToXContent {
return this;
}
public PercolateSourceBuilder setSort(boolean sort) {
this.sort = sort;
return this;
}
public PercolateSourceBuilder setScore(boolean score) {
this.score = score;
return this;
}
public BytesReference buildAsBytes(XContentType contentType) throws SearchSourceBuilderException {
try {
XContentBuilder builder = XContentFactory.contentBuilder(contentType);
@@ -106,6 +118,12 @@ public class PercolateSourceBuilder implements ToXContent {
if (size != null) {
builder.field("size", size);
}
if (sort != null) {
builder.field("sort", sort);
}
if (score != null) {
builder.field("score", score);
}
builder.endObject();
return builder;
}

TransportPercolateAction.java

@@ -164,34 +164,83 @@ public class TransportPercolateAction extends TransportBroadcastOperationAction<
shardsResponses.length(), successfulShards, failedShards, shardFailures, finalCount, tookInMillis
);
} else {
- long finalCount = 0;
+ long foundMatches = 0;
+ int numMatches = 0;
for (PercolateShardResponse response : shardResults) {
- finalCount += response.count();
+ foundMatches += response.count();
+ numMatches += response.matches().length;
}
int requestedSize = shardResults.get(0).requestedSize();
boolean limit = shardResults.get(0).limit();
+ boolean sort = shardResults.get(0).sort();
+ boolean matchesScored = shardResults.get(0).score();
if (limit) {
- requestedSize = (int) Math.min(requestedSize, finalCount);
+ requestedSize = Math.min(requestedSize, numMatches);
} else {
- // Serializing more than Integer.MAX_VALUE seems insane to me...
- requestedSize = (int) finalCount;
+ requestedSize = numMatches;
}
// Use a custom impl of AbstractBigArray for Object[]?
List<PercolateResponse.Match> finalMatches = new ArrayList<PercolateResponse.Match>(requestedSize);
- outer: for (PercolateShardResponse response : shardResults) {
- Text index = new StringText(response.getIndex());
- for (Text id : response.matches()) {
- finalMatches.add(new PercolateResponse.Match(id, index));
- if (requestedSize != 0 && finalMatches.size() == requestedSize) {
- break outer;
- }
- }
- }
if (sort) {
if (shardResults.size() == 1) {
PercolateShardResponse response = shardResults.get(0);
Text index = new StringText(response.getIndex());
for (int i = 0; i < response.matches().length; i++) {
float score = response.scores().length == 0 ? Float.NaN : response.scores()[i];
finalMatches.add(new PercolateResponse.Match(index, response.matches()[i], score));
}
} else {
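// slots[i] tracks how many matches have been consumed from shard i; since each
// shard returns its matches sorted by descending score, every iteration below
// picks the shard whose next (head) match has the highest remaining score.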
int[] slots = new int[shardResults.size()];
while (true) {
float lowestScore = Float.NEGATIVE_INFINITY;
int requestIndex = 0;
int itemIndex = 0;
for (int i = 0; i < shardResults.size(); i++) {
int scoreIndex = slots[i];
float[] scores = shardResults.get(i).scores();
if (scoreIndex >= scores.length) {
continue;
}
float score = scores[scoreIndex];
int cmp = Float.compare(lowestScore, score);
if (cmp < 0) {
requestIndex = i;
itemIndex = scoreIndex;
lowestScore = score;
}
}
slots[requestIndex]++;
PercolateShardResponse shardResponse = shardResults.get(requestIndex);
Text index = new StringText(shardResponse.getIndex());
Text match = shardResponse.matches()[itemIndex];
float score = shardResponse.scores()[itemIndex];
finalMatches.add(new PercolateResponse.Match(index, match, score));
if (finalMatches.size() == requestedSize) {
break;
}
}
}
} else {
outer: for (PercolateShardResponse response : shardResults) {
Text index = new StringText(response.getIndex());
for (int i = 0; i < response.matches().length; i++) {
float score = response.scores().length == 0 ? 0f : response.scores()[i];
finalMatches.add(new PercolateResponse.Match(index, response.matches()[i], score));
if (requestedSize != 0 && finalMatches.size() == requestedSize) {
break outer;
}
}
}
}
return new PercolateResponse(
- shardsResponses.length(), successfulShards, failedShards, shardFailures, finalMatches.toArray(new PercolateResponse.Match[requestedSize]), finalCount, tookInMillis
+ shardsResponses.length(), successfulShards, failedShards, shardFailures,
+ finalMatches.toArray(new PercolateResponse.Match[requestedSize]), foundMatches, tookInMillis, sort || matchesScored
);
}
}
@@ -220,11 +269,7 @@ public class TransportPercolateAction extends TransportBroadcastOperationAction<
@Override
protected PercolateShardResponse shardOperation(PercolateShardRequest request) throws ElasticSearchException {
try {
- if (request.onlyCount()) {
- return percolatorService.countPercolate(request);
- } else {
- return percolatorService.matchPercolate(request);
- }
+ return percolatorService.percolate(request);
} catch (Throwable t) {
logger.trace("[{}][{}] failed to percolate", t, request.index(), request.shardId());
ShardId shardId = new ShardId(request.index(), request.shardId());

PercolatorService.java

@@ -19,14 +19,16 @@
package org.elasticsearch.percolator;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.ReaderUtil;
import org.apache.lucene.index.memory.ExtendedMemoryIndex;
import org.apache.lucene.index.memory.MemoryIndex;
import org.apache.lucene.search.Filter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.*;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CloseableThreadLocal;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.ElasticSearchParseException;
import org.elasticsearch.action.percolate.PercolateShardRequest;
import org.elasticsearch.action.percolate.PercolateShardResponse;
@@ -35,7 +37,9 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.search.XConstantScoreQuery;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.BytesText;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -43,15 +47,18 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.fielddata.BytesValues;
import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.*;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.percolator.stats.ShardPercolateService;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchRequest;
import java.io.IOException;
import java.util.ArrayList;
@@ -82,106 +89,32 @@ public class PercolatorService extends AbstractComponent {
};
}
public PercolateShardResponse matchPercolate(final PercolateShardRequest request) {
return preparePercolate(request, new PercolateAction() {
@Override
public PercolateShardResponse doPercolateAction(PercolateContext context) {
final List<Text> matches;
long count = 0;
if (context.query == null) {
matches = new ArrayList<Text>();
Lucene.ExistsCollector collector = new Lucene.ExistsCollector();
for (Map.Entry<Text, Query> entry : context.percolateQueries.entrySet()) {
collector.reset();
try {
context.docSearcher.search(entry.getValue(), collector);
} catch (IOException e) {
logger.warn("[" + entry.getKey() + "] failed to execute query", e);
}
if (collector.exists()) {
if (!context.limit || count < context.size) {
matches.add(entry.getKey());
}
count++;
}
}
} else {
Engine.Searcher percolatorSearcher = context.indexShard.searcher();
try {
Match match = match(logger, context.percolateQueries, context.docSearcher, context.fieldDataService, context);
percolatorSearcher.searcher().search(context.query, match);
matches = match.matches();
count = match.counter();
} catch (IOException e) {
logger.debug("failed to execute", e);
throw new PercolateException(context.indexShard.shardId(), "failed to execute", e);
} finally {
percolatorSearcher.release();
}
}
return new PercolateShardResponse(matches.toArray(new Text[matches.size()]), count, context, request.index(), request.shardId());
}
});
}
public PercolateShardResponse countPercolate(final PercolateShardRequest request) {
return preparePercolate(request, new PercolateAction() {
@Override
public PercolateShardResponse doPercolateAction(PercolateContext context) {
long count = 0;
if (context.query == null) {
Lucene.ExistsCollector collector = new Lucene.ExistsCollector();
for (Map.Entry<Text, Query> entry : context.percolateQueries.entrySet()) {
collector.reset();
try {
context.docSearcher.search(entry.getValue(), collector);
} catch (IOException e) {
logger.warn("[" + entry.getKey() + "] failed to execute query", e);
}
if (collector.exists()) {
count++;
}
}
} else {
Engine.Searcher percolatorSearcher = context.indexShard.searcher();
try {
Count countCollector = count(logger, context.percolateQueries, context.docSearcher, context.fieldDataService);
percolatorSearcher.searcher().search(context.query, countCollector);
count = countCollector.counter();
} catch (IOException e) {
logger.warn("failed to execute", e);
} finally {
percolatorSearcher.release();
}
}
return new PercolateShardResponse(count, context, request.index(), request.shardId());
}
});
}
- private PercolateShardResponse preparePercolate(PercolateShardRequest request, PercolateAction action) {
+ public PercolateShardResponse percolate(PercolateShardRequest request) {
IndexService percolateIndexService = indicesService.indexServiceSafe(request.index());
IndexShard indexShard = percolateIndexService.shardSafe(request.shardId());
ShardPercolateService shardPercolateService = indexShard.shardPercolateService();
shardPercolateService.prePercolate();
long startTime = System.nanoTime();
try {
- ConcurrentMap<Text, Query> percolateQueries = indexShard.percolateRegistry().percolateQueries();
- if (percolateQueries.isEmpty()) {
- return new PercolateShardResponse(request.index(), request.shardId());
+ final PercolateContext context = new PercolateContext();
+ context.percolateQueries = indexShard.percolateRegistry().percolateQueries();
+ context.indexShard = indexShard;
+ context.percolateIndexService = percolateIndexService;
+ ParsedDocument parsedDocument = parsePercolate(percolateIndexService, request, context);
+ if (context.percolateQueries.isEmpty()) {
+ return new PercolateShardResponse(context, request.index(), request.shardId());
}
- final PercolateContext context = new PercolateContext();
- context.percolateQueries = percolateQueries;
- context.indexShard = indexShard;
- ParsedDocument parsedDocument = parsePercolate(percolateIndexService, request, context);
if (request.docSource() != null && request.docSource().length() != 0) {
parsedDocument = parseFetchedDoc(request.docSource(), percolateIndexService, request.documentType());
} else if (parsedDocument == null) {
throw new ElasticSearchParseException("No doc to percolate in the request");
throw new ElasticSearchIllegalArgumentException("Nothing to percolate");
}
if (context.query == null && (context.score || context.sort)) {
throw new ElasticSearchIllegalArgumentException("Can't sort or score if no query is specified");
}
if (context.size < 0) {
@@ -212,15 +145,28 @@ public class PercolatorService extends AbstractComponent {
}
}
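// Choose the percolator implementation for this request: plain or query-based
// counting, top-N sorting, scoring, or plain matching.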
PercolatorType action;
if (request.onlyCount()) {
action = context.query != null ? queryCountPercolator : countPercolator;
} else {
if (context.sort) {
action = topMatchingPercolator;
} else if (context.query != null) {
action = context.score ? scoringPercolator : queryPercolator;
} else {
action = matchPercolator;
}
}
context.docSearcher = memoryIndex.createSearcher();
- context.fieldDataService = percolateIndexService.fieldData();
+ context.fieldData = percolateIndexService.fieldData();
IndexCache indexCache = percolateIndexService.cache();
try {
- return action.doPercolateAction(context);
+ return action.doPercolate(request, context);
} finally {
// explicitly clear the reader, since we can only register on callback on SegmentReader
indexCache.clear(context.docSearcher.getIndexReader());
- context.fieldDataService.clear(context.docSearcher.getIndexReader());
+ context.fieldData.clear(context.docSearcher.getIndexReader());
}
} finally {
memoryIndex.reset();
@@ -238,6 +184,12 @@ public class PercolatorService extends AbstractComponent {
ParsedDocument doc = null;
XContentParser parser = null;
// Some queries (function_score query when for decay functions) rely on SearchContext being set:
SearchContext.setCurrent(new SearchContext(0,
new ShardSearchRequest().types(new String[0]),
null, context.indexShard.searcher(), context.percolateIndexService, context.indexShard,
null, null));
try {
parser = XContentFactory.xContent(source).createParser(source);
String currentFieldName = null;
@@ -269,6 +221,8 @@ public class PercolatorService extends AbstractComponent {
Filter filter = documentIndexService.queryParserService().parseInnerFilter(parser).filter();
context.query = new XConstantScoreQuery(filter);
}
+ } else if (token == null) {
+ break;
} else if (token.isValue()) {
if ("size".equals(currentFieldName)) {
context.limit = true;
@@ -276,14 +230,18 @@ public class PercolatorService extends AbstractComponent {
if (context.size < 0) {
throw new ElasticSearchParseException("size is set to [" + context.size + "] and is expected to be higher or equal to 0");
}
} else if ("sort".equals(currentFieldName)) {
context.sort = parser.booleanValue();
} else if ("score".equals(currentFieldName)) {
context.score = parser.booleanValue();
}
} else if (token == null) {
break;
}
}
} catch (IOException e) {
throw new ElasticSearchParseException("failed to parse request", e);
} finally {
SearchContext.current().release();
SearchContext.removeCurrent();
if (parser != null) {
parser.close();
}
@@ -319,22 +277,175 @@ public class PercolatorService extends AbstractComponent {
cache.close();
}
- interface PercolateAction {
+ interface PercolatorType {
- PercolateShardResponse doPercolateAction(PercolateContext context);
+ PercolateShardResponse doPercolate(PercolateShardRequest request, PercolateContext context);
}
private final PercolatorType countPercolator = new PercolatorType() {
@Override
public PercolateShardResponse doPercolate(PercolateShardRequest request, PercolateContext context) {
long count = 0;
Lucene.ExistsCollector collector = new Lucene.ExistsCollector();
for (Map.Entry<Text, Query> entry : context.percolateQueries.entrySet()) {
collector.reset();
try {
context.docSearcher.search(entry.getValue(), collector);
} catch (IOException e) {
logger.warn("[" + entry.getKey() + "] failed to execute query", e);
}
if (collector.exists()) {
count++;
}
}
return new PercolateShardResponse(count, context, request.index(), request.shardId());
}
};
private final PercolatorType queryCountPercolator = new PercolatorType() {
@Override
public PercolateShardResponse doPercolate(PercolateShardRequest request, PercolateContext context) {
long count = 0;
Engine.Searcher percolatorSearcher = context.indexShard.searcher();
try {
Count countCollector = count(logger, context);
queryBasedPercolating(percolatorSearcher, context, countCollector);
count = countCollector.counter();
} catch (IOException e) {
logger.warn("failed to execute", e);
} finally {
percolatorSearcher.release();
}
return new PercolateShardResponse(count, context, request.index(), request.shardId());
}
};
private final PercolatorType matchPercolator = new PercolatorType() {
@Override
public PercolateShardResponse doPercolate(PercolateShardRequest request, PercolateContext context) {
long count = 0;
List<Text> matches = new ArrayList<Text>();
Lucene.ExistsCollector collector = new Lucene.ExistsCollector();
for (Map.Entry<Text, Query> entry : context.percolateQueries.entrySet()) {
collector.reset();
try {
context.docSearcher.search(entry.getValue(), collector);
} catch (IOException e) {
logger.warn("[" + entry.getKey() + "] failed to execute query", e);
}
if (collector.exists()) {
if (!context.limit || count < context.size) {
matches.add(entry.getKey());
}
count++;
}
}
return new PercolateShardResponse(matches.toArray(new Text[0]), count, context, request.index(), request.shardId());
}
};
private final PercolatorType queryPercolator = new PercolatorType() {
@Override
public PercolateShardResponse doPercolate(PercolateShardRequest request, PercolateContext context) {
Engine.Searcher percolatorSearcher = context.indexShard.searcher();
try {
Match match = match(logger, context);
queryBasedPercolating(percolatorSearcher, context, match);
List<Text> matches = match.matches();
long count = match.counter();
return new PercolateShardResponse(matches.toArray(new Text[0]), count, context, request.index(), request.shardId());
} catch (IOException e) {
logger.debug("failed to execute", e);
throw new PercolateException(context.indexShard.shardId(), "failed to execute", e);
} finally {
percolatorSearcher.release();
}
}
};
private final PercolatorType scoringPercolator = new PercolatorType() {
@Override
public PercolateShardResponse doPercolate(PercolateShardRequest request, PercolateContext context) {
Engine.Searcher percolatorSearcher = context.indexShard.searcher();
try {
MatchAndScore matchAndScore = matchAndScore(logger, context);
queryBasedPercolating(percolatorSearcher, context, matchAndScore);
Text[] matches = matchAndScore.matches().toArray(new Text[0]);
float[] scores = matchAndScore.scores().toArray();
long count = matchAndScore.counter();
return new PercolateShardResponse(matches, count, scores, context, request.index(), request.shardId());
} catch (IOException e) {
logger.debug("failed to execute", e);
throw new PercolateException(context.indexShard.shardId(), "failed to execute", e);
} finally {
percolatorSearcher.release();
}
}
};
private final PercolatorType topMatchingPercolator = new PercolatorType() {
@Override
public PercolateShardResponse doPercolate(PercolateShardRequest request, PercolateContext context) {
Engine.Searcher percolatorSearcher = context.indexShard.searcher();
try {
MatchAndSort matchAndSort = QueryCollector.matchAndSort(logger, context);
queryBasedPercolating(percolatorSearcher, context, matchAndSort);
TopDocs topDocs = matchAndSort.topDocs();
long count = topDocs.totalHits;
List<Text> matches = new ArrayList<Text>(topDocs.scoreDocs.length);
float[] scores = new float[topDocs.scoreDocs.length];
IndexFieldData uidFieldData = context.fieldData.getForField(new FieldMapper.Names(UidFieldMapper.NAME), new FieldDataType("string", ImmutableSettings.builder().put("format", "paged_bytes")));
int i = 0;
for (ScoreDoc scoreDoc : topDocs.scoreDocs) {
int segmentIdx = ReaderUtil.subIndex(scoreDoc.doc, percolatorSearcher.reader().leaves());
AtomicReaderContext atomicReaderContext = percolatorSearcher.reader().leaves().get(segmentIdx);
BytesValues values = uidFieldData.load(atomicReaderContext).getBytesValues();
BytesRef uid = values.getValue(scoreDoc.doc - atomicReaderContext.docBase);
Text id = new BytesText(Uid.idFromUid(uid));
matches.add(id);
scores[i++] = scoreDoc.score;
}
return new PercolateShardResponse(matches.toArray(new Text[matches.size()]), count, scores, context, request.index(), request.shardId());
} catch (Exception e) {
logger.debug("failed to execute", e);
throw new PercolateException(context.indexShard.shardId(), "failed to execute", e);
} finally {
percolatorSearcher.release();
}
}
};
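// Common path for all query-based percolator types: the request's query is
// combined with a cached filter on the percolator document type, so only
// registered percolate queries are evaluated.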
private static void queryBasedPercolating(Engine.Searcher percolatorSearcher, PercolateContext context, Collector collector) throws IOException {
Filter percolatorTypeFilter = context.percolateIndexService.mapperService().documentMapper(Constants.TYPE_NAME).typeFilter();
percolatorTypeFilter = context.percolateIndexService.cache().filter().cache(percolatorTypeFilter);
FilteredQuery query = new FilteredQuery(context.query, percolatorTypeFilter);
percolatorSearcher.searcher().search(query, collector);
}
public class PercolateContext {
public boolean limit;
public int size;
public boolean score;
public boolean sort;
Query query;
ConcurrentMap<Text, Query> percolateQueries;
IndexSearcher docSearcher;
IndexShard indexShard;
- IndexFieldDataService fieldDataService;
+ IndexFieldDataService fieldData;
IndexService percolateIndexService;
}

QueryCollector.java

@@ -1,10 +1,8 @@
package org.elasticsearch.percolator;
import gnu.trove.list.array.TFloatArrayList;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.*;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.Lucene;
@@ -14,7 +12,6 @@ import org.elasticsearch.common.text.Text;
import org.elasticsearch.index.fielddata.BytesValues;
import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
@@ -37,12 +34,12 @@ abstract class QueryCollector extends Collector {
BytesValues values;
- QueryCollector(ESLogger logger, ConcurrentMap<Text, Query> queries, IndexSearcher searcher, IndexFieldDataService fieldData) {
+ QueryCollector(ESLogger logger, PercolatorService.PercolateContext context) {
this.logger = logger;
- this.queries = queries;
- this.searcher = searcher;
+ this.queries = context.percolateQueries;
+ this.searcher = context.docSearcher;
// TODO: when we move to a UID level mapping def on the index level, we can use that one, now, its per type, and we can't easily choose one
- this.uidFieldData = fieldData.getForField(new FieldMapper.Names(UidFieldMapper.NAME), new FieldDataType("string", ImmutableSettings.builder().put("format", "paged_bytes")));
+ this.uidFieldData = context.fieldData.getForField(new FieldMapper.Names(UidFieldMapper.NAME), new FieldDataType("string", ImmutableSettings.builder().put("format", "paged_bytes")));
}
@Override
@@ -61,12 +58,20 @@ abstract class QueryCollector extends Collector {
}
- static Match match(ESLogger logger, ConcurrentMap<Text, Query> queries, IndexSearcher searcher, IndexFieldDataService fieldData, PercolatorService.PercolateContext context) {
- return new Match(logger, queries, searcher, fieldData, context);
+ static Match match(ESLogger logger, PercolatorService.PercolateContext context) {
+ return new Match(logger, context);
}
- static Count count(ESLogger logger, ConcurrentMap<Text, Query> queries, IndexSearcher searcher, IndexFieldDataService fieldData) {
- return new Count(logger, queries, searcher, fieldData);
+ static Count count(ESLogger logger, PercolatorService.PercolateContext context) {
+ return new Count(logger, context);
}
static MatchAndScore matchAndScore(ESLogger logger, PercolatorService.PercolateContext context) {
return new MatchAndScore(logger, context);
}
static MatchAndSort matchAndSort(ESLogger logger, PercolatorService.PercolateContext context) {
return new MatchAndSort(logger, context);
}
final static class Match extends QueryCollector {
@@ -76,8 +81,8 @@ abstract class QueryCollector extends Collector {
private final int size;
private long counter = 0;
- Match(ESLogger logger, ConcurrentMap<Text, Query> queries, IndexSearcher searcher, IndexFieldDataService fieldData, PercolatorService.PercolateContext context) {
- super(logger, queries, searcher, fieldData);
+ Match(ESLogger logger, PercolatorService.PercolateContext context) {
+ super(logger, context);
this.limit = context.limit;
this.size = context.size;
}
@@ -119,12 +124,126 @@ abstract class QueryCollector extends Collector {
}
final static class MatchAndSort extends QueryCollector {
private final TopScoreDocCollector topDocsCollector;
MatchAndSort(ESLogger logger, PercolatorService.PercolateContext context) {
super(logger, context);
// TODO: Use TopFieldCollector.create(...) for ascending and descending scoring?
topDocsCollector = TopScoreDocCollector.create(context.size, false);
}
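// A doc only reaches the top-docs collector if its percolate query matches the
// in-memory document; the score itself comes from the outer (percolate) query
// via setScorer(...).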
@Override
public void collect(int doc) throws IOException {
BytesRef uid = values.getValue(doc);
if (uid == null) {
return;
}
Text id = new BytesText(Uid.idFromUid(uid));
Query query = queries.get(id);
if (query == null) {
// log???
return;
}
// run the query
try {
collector.reset();
searcher.search(query, collector);
if (collector.exists()) {
topDocsCollector.collect(doc);
}
} catch (IOException e) {
logger.warn("[" + id + "] failed to execute query", e);
}
}
@Override
public void setNextReader(AtomicReaderContext context) throws IOException {
super.setNextReader(context);
topDocsCollector.setNextReader(context);
}
@Override
public void setScorer(Scorer scorer) throws IOException {
topDocsCollector.setScorer(scorer);
}
TopDocs topDocs() {
return topDocsCollector.topDocs();
}
}
final static class MatchAndScore extends QueryCollector {
private final List<Text> matches = new ArrayList<Text>();
// TODO: Use thread local in order to cache the scores lists?
private final TFloatArrayList scores = new TFloatArrayList();
private final boolean limit;
private final int size;
private long counter = 0;
private Scorer scorer;
MatchAndScore(ESLogger logger, PercolatorService.PercolateContext context) {
super(logger, context);
this.limit = context.limit;
this.size = context.size;
}
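// Like Match, but also records the outer percolate query's score for each
// matching percolate query, up to the requested size.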
@Override
public void collect(int doc) throws IOException {
BytesRef uid = values.getValue(doc);
if (uid == null) {
return;
}
Text id = new BytesText(Uid.idFromUid(uid));
Query query = queries.get(id);
if (query == null) {
// log???
return;
}
// run the query
try {
collector.reset();
searcher.search(query, collector);
if (collector.exists()) {
if (!limit || counter < size) {
matches.add(id);
scores.add(scorer.score());
}
counter++;
}
} catch (IOException e) {
logger.warn("[" + id + "] failed to execute query", e);
}
}
@Override
public void setScorer(Scorer scorer) throws IOException {
this.scorer = scorer;
}
long counter() {
return counter;
}
List<Text> matches() {
return matches;
}
TFloatArrayList scores() {
return scores;
}
}
final static class Count extends QueryCollector {
private long counter = 0;
- Count(ESLogger logger, ConcurrentMap<Text, Query> queries, IndexSearcher searcher, IndexFieldDataService fieldData) {
- super(logger, queries, searcher, fieldData);
+ Count(ESLogger logger, PercolatorService.PercolateContext context) {
+ super(logger, context);
}
@Override

SimplePercolatorTests.java

@@ -39,9 +39,12 @@ import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.functionscore.script.ScriptScoreFunctionBuilder;
import org.elasticsearch.test.integration.AbstractSharedClusterTest;
import org.junit.Test;
import java.util.*;
import static org.elasticsearch.action.percolate.PercolateSourceBuilder.docBuilder;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.*;
@@ -1026,6 +1029,7 @@ public class SimplePercolatorTests extends AbstractSharedClusterTest {
assertThat(response.getMatches(), emptyArray());
}
@Test
public void testPercolateSizingWithQueryAndFilter() throws Exception {
client().admin().indices().prepareCreate("test").execute().actionGet();
ensureGreen();
@@ -1119,6 +1123,103 @@ public class SimplePercolatorTests extends AbstractSharedClusterTest {
}
}
@Test
public void testPercolateScoreAndSorting() throws Exception {
client().admin().indices().prepareCreate("my-index")
.setSettings(ImmutableSettings.settingsBuilder()
.put("index.number_of_shards", 3)
.put("index.number_of_replicas", 1)
.build())
.execute().actionGet();
ensureGreen();
// Add a dummy doc that should never interfere with percolate operations.
client().prepareIndex("my-index", "my-type", "1").setSource("field", "value").execute().actionGet();
Map<Integer, NavigableSet<Integer>> controlMap = new HashMap<Integer, NavigableSet<Integer>>();
long numQueries = randomIntBetween(100, 250);
logger.info("--> register " + numQueries +" queries");
for (int i = 0; i < numQueries; i++) {
int value = randomInt(10);
client().prepareIndex("my-index", "_percolator", Integer.toString(i))
.setSource(jsonBuilder().startObject().field("query", matchAllQuery()).field("level", i).field("field1", value).endObject())
.execute().actionGet();
if (!controlMap.containsKey(value)) {
controlMap.put(value, new TreeSet<Integer>());
}
controlMap.get(value).add(i);
}
refresh();
// Only retrieve the score
int runs = randomInt(27);
for (int i = 0; i < runs; i++) {
int size = randomIntBetween(1, 50);
PercolateResponse response = client().preparePercolate().setIndices("my-index").setDocumentType("my-type")
.setScore(true)
.setSize(size)
.setPercolateDoc(docBuilder().setDoc("field", "value"))
.setPercolateQuery(QueryBuilders.functionScoreQuery(matchAllQuery()).add(new ScriptScoreFunctionBuilder().script("doc['level'].value")))
.execute().actionGet();
assertNoFailures(response);
assertThat(response.getCount(), equalTo(numQueries));
assertThat(response.getMatches().length, equalTo(size));
for (int j = 0; j < response.getMatches().length; j++) {
String id = response.getMatches()[j].getId().string();
assertThat(Integer.valueOf(id), equalTo((int) response.getMatches()[j].getScore()));
}
}
// Sort the queries by the score
runs = randomInt(27);
for (int i = 0; i < runs; i++) {
int size = randomIntBetween(1, 10);
PercolateResponse response = client().preparePercolate().setIndices("my-index").setDocumentType("my-type")
.setSort(true)
.setSize(size)
.setPercolateDoc(docBuilder().setDoc("field", "value"))
.setPercolateQuery(QueryBuilders.functionScoreQuery(matchAllQuery()).add(new ScriptScoreFunctionBuilder().script("doc['level'].value")))
.execute().actionGet();
assertNoFailures(response);
assertThat(response.getCount(), equalTo(numQueries));
assertThat(response.getMatches().length, equalTo(size));
int expectedId = (int) (numQueries - 1);
for (PercolateResponse.Match match : response) {
assertThat(match.getId().string(), equalTo(Integer.toString(expectedId)));
assertThat(match.getScore(), equalTo((float) expectedId));
assertThat(match.getIndex().string(), equalTo("my-index"));
expectedId--;
}
}
runs = randomInt(27);
for (int i = 0; i < runs; i++) {
int value = randomInt(10);
NavigableSet<Integer> levels = controlMap.get(value);
int size = randomIntBetween(1, levels.size());
PercolateResponse response = client().preparePercolate().setIndices("my-index").setDocumentType("my-type")
.setSort(true)
.setSize(size)
.setPercolateDoc(docBuilder().setDoc("field", "value"))
.setPercolateQuery(QueryBuilders.functionScoreQuery(matchQuery("field1", value))
.add(new ScriptScoreFunctionBuilder().script("doc['level'].value")))
.execute().actionGet();
assertNoFailures(response);
assertThat(response.getCount(), equalTo((long) levels.size()));
assertThat(response.getMatches().length, equalTo(Math.min(levels.size(), size)));
Iterator<Integer> levelIterator = levels.descendingIterator();
for (PercolateResponse.Match match : response) {
int controlLevel = levelIterator.next();
assertThat(match.getId().string(), equalTo(Integer.toString(controlLevel)));
assertThat(match.getScore(), equalTo((float) controlLevel));
assertThat(match.getIndex().string(), equalTo("my-index"));
}
}
}
public static String[] convertFromTextArray(PercolateResponse.Match[] matches, String index) {
if (matches.length == 0) {
return Strings.EMPTY_ARRAY;