diff --git a/src/main/java/org/elasticsearch/action/percolate/PercolateRequestBuilder.java b/src/main/java/org/elasticsearch/action/percolate/PercolateRequestBuilder.java index b6cc08f13fe..8f12c9917d3 100644 --- a/src/main/java/org/elasticsearch/action/percolate/PercolateRequestBuilder.java +++ b/src/main/java/org/elasticsearch/action/percolate/PercolateRequestBuilder.java @@ -95,6 +95,24 @@ public class PercolateRequestBuilder extends BroadcastOperationRequestBuilder shardFailures, Match[] matches, long count, long tookInMillis) { + private boolean hasScores; + + public PercolateResponse(int totalShards, int successfulShards, int failedShards, List shardFailures, + Match[] matches, long count, long tookInMillis, boolean hasScores) { super(totalShards, successfulShards, failedShards, shardFailures); this.tookInMillis = tookInMillis; this.matches = matches; this.count = count; + this.hasScores = hasScores; } public PercolateResponse(int totalShards, int successfulShards, int failedShards, List shardFailures, long count, long tookInMillis) { @@ -58,11 +63,14 @@ public class PercolateResponse extends BroadcastOperationResponse implements Ite this.tookInMillis = tookInMillis; this.matches = EMPTY; this.count = count; + this.hasScores = false; } public PercolateResponse(int totalShards, int successfulShards, int failedShards, List shardFailures, long tookInMillis) { super(totalShards, successfulShards, failedShards, shardFailures); this.tookInMillis = tookInMillis; + this.matches = EMPTY; + this.hasScores = false; } PercolateResponse() { @@ -104,23 +112,7 @@ public class PercolateResponse extends BroadcastOperationResponse implements Ite builder.startObject(); builder.field(Fields.TOOK, tookInMillis); - builder.startObject(Fields._SHARDS); - builder.field(Fields.TOTAL, getTotalShards()); - builder.field(Fields.SUCCESSFUL, getSuccessfulShards()); - builder.field(Fields.FAILED, getFailedShards()); - if (getShardFailures().length > 0) { - builder.startArray(Fields.FAILURES); - for (ShardOperationFailedException shardFailure : getShardFailures()) { - builder.startObject(); - builder.field(Fields.INDEX, shardFailure.index()); - builder.field(Fields.SHARD, shardFailure.shardId()); - builder.field(Fields.STATUS, shardFailure.status().getStatus()); - builder.field(Fields.REASON, shardFailure.reason()); - builder.endObject(); - } - builder.endArray(); - } - builder.endObject(); + RestActions.buildBroadcastShardsHeader(builder, this); builder.field(Fields.TOTAL, count); if (matches.length != 0) { @@ -135,6 +127,9 @@ public class PercolateResponse extends BroadcastOperationResponse implements Ite builder.startObject(); builder.field(Fields._INDEX, match.getIndex()); builder.field(Fields._ID, match.getId()); + if (hasScores) { + builder.field(Fields._SCORE, match.getScore()); + } builder.endObject(); } } @@ -156,6 +151,7 @@ public class PercolateResponse extends BroadcastOperationResponse implements Ite matches[i] = new Match(); matches[i].readFrom(in); } + hasScores = in.readBoolean(); } @Override @@ -167,64 +163,70 @@ public class PercolateResponse extends BroadcastOperationResponse implements Ite for (Match match : matches) { match.writeTo(out); } + out.writeBoolean(hasScores); } public static class Match implements Streamable { - private Text id; private Text index; + private Text id; + private float score; - public Match(Text id, Text index) { + public Match(Text index, Text id, float score) { this.id = id; + this.score = score; this.index = index; } Match() { } - public Text id() { - return id; - } - public Text index() { return index; } - public Text getId() { + public Text id() { return id; } + public float score() { + return score; + } + public Text getIndex() { - return index; + return index(); + } + + public Text getId() { + return id(); + } + + public float getScore() { + return score(); } @Override public void readFrom(StreamInput in) throws IOException { id = in.readText(); index = in.readText(); + score = in.readFloat(); } @Override public void writeTo(StreamOutput out) throws IOException { out.writeText(id); out.writeText(index); + out.writeFloat(score); } } static final class Fields { - static final XContentBuilderString _SHARDS = new XContentBuilderString("_shards"); - static final XContentBuilderString TOTAL = new XContentBuilderString("total"); - static final XContentBuilderString SUCCESSFUL = new XContentBuilderString("successful"); - static final XContentBuilderString FAILED = new XContentBuilderString("failed"); - static final XContentBuilderString FAILURES = new XContentBuilderString("failures"); - static final XContentBuilderString STATUS = new XContentBuilderString("status"); - static final XContentBuilderString INDEX = new XContentBuilderString("index"); - static final XContentBuilderString SHARD = new XContentBuilderString("shard"); - static final XContentBuilderString REASON = new XContentBuilderString("reason"); static final XContentBuilderString TOOK = new XContentBuilderString("took"); + static final XContentBuilderString TOTAL = new XContentBuilderString("total"); static final XContentBuilderString MATCHES = new XContentBuilderString("matches"); static final XContentBuilderString _INDEX = new XContentBuilderString("_index"); static final XContentBuilderString _ID = new XContentBuilderString("_id"); + static final XContentBuilderString _SCORE = new XContentBuilderString("_score"); } } diff --git a/src/main/java/org/elasticsearch/action/percolate/PercolateShardResponse.java b/src/main/java/org/elasticsearch/action/percolate/PercolateShardResponse.java index 73a817172b3..4a8d824b013 100644 --- a/src/main/java/org/elasticsearch/action/percolate/PercolateShardResponse.java +++ b/src/main/java/org/elasticsearch/action/percolate/PercolateShardResponse.java @@ -1,3 +1,22 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.elasticsearch.action.percolate; import org.elasticsearch.action.support.broadcast.BroadcastShardOperationResponse; @@ -14,40 +33,69 @@ import java.io.IOException; public class PercolateShardResponse extends BroadcastShardOperationResponse { private long count; + private float[] scores; private Text[] matches; // Request fields: private boolean limit; private int requestedSize; + private boolean sort; + private boolean score; public PercolateShardResponse() { } + public PercolateShardResponse(Text[] matches, long count, float[] scores, PercolatorService.PercolateContext context, String index, int shardId) { + super(index, shardId); + this.matches = matches; + this.count = count; + this.scores = scores; + this.limit = context.limit; + this.requestedSize = context.size; + this.sort = context.sort; + this.score = context.score; + } + public PercolateShardResponse(Text[] matches, long count, PercolatorService.PercolateContext context, String index, int shardId) { super(index, shardId); this.matches = matches; + this.scores = new float[0]; this.count = count; this.limit = context.limit; this.requestedSize = context.size; + this.sort = context.sort; + this.score = context.score; } public PercolateShardResponse(long count, PercolatorService.PercolateContext context, String index, int shardId) { super(index, shardId); this.count = count; this.matches = StringText.EMPTY_ARRAY; + this.scores = new float[0]; this.limit = context.limit; this.requestedSize = context.size; + this.sort = context.sort; + this.score = context.score; } - public PercolateShardResponse(String index, int shardId) { + public PercolateShardResponse(PercolatorService.PercolateContext context, String index, int shardId) { super(index, shardId); this.matches = StringText.EMPTY_ARRAY; + this.scores = new float[0]; + this.limit = context.limit; + this.requestedSize = context.size; + this.sort = context.sort; + this.score = context.score; } public Text[] matches() { return matches; } + public float[] scores() { + return scores; + } + public long count() { return count; } @@ -60,14 +108,27 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse { return requestedSize; } + public boolean sort() { + return sort; + } + + public boolean score() { + return score; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); count = in.readVLong(); matches = in.readTextArray(); + scores = new float[in.readVInt()]; + for (int i = 0; i < scores.length; i++) { + scores[i] = in.readFloat(); + } limit = in.readBoolean(); requestedSize = in.readVInt(); + sort = in.readBoolean(); } @Override @@ -75,8 +136,13 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse { super.writeTo(out); out.writeVLong(count); out.writeTextArray(matches); + out.writeVLong(scores.length); + for (float score : scores) { + out.writeFloat(score); + } out.writeBoolean(limit); out.writeVLong(requestedSize); + out.writeBoolean(sort); } } diff --git a/src/main/java/org/elasticsearch/action/percolate/PercolateSourceBuilder.java b/src/main/java/org/elasticsearch/action/percolate/PercolateSourceBuilder.java index e21eb3638ec..66a13129f8e 100644 --- a/src/main/java/org/elasticsearch/action/percolate/PercolateSourceBuilder.java +++ b/src/main/java/org/elasticsearch/action/percolate/PercolateSourceBuilder.java @@ -39,6 +39,8 @@ public class PercolateSourceBuilder implements ToXContent { private QueryBuilder queryBuilder; private FilterBuilder filterBuilder; private Integer size; + private Boolean sort; + private Boolean score; public DocBuilder percolateDocument() { if (docBuilder == null) { @@ -79,6 +81,16 @@ public class PercolateSourceBuilder implements ToXContent { return this; } + public PercolateSourceBuilder setSort(boolean sort) { + this.sort = sort; + return this; + } + + public PercolateSourceBuilder setScore(boolean score) { + this.score = score; + return this; + } + public BytesReference buildAsBytes(XContentType contentType) throws SearchSourceBuilderException { try { XContentBuilder builder = XContentFactory.contentBuilder(contentType); @@ -106,6 +118,12 @@ public class PercolateSourceBuilder implements ToXContent { if (size != null) { builder.field("size", size); } + if (sort != null) { + builder.field("sort", sort); + } + if (score != null) { + builder.field("score", score); + } builder.endObject(); return builder; } diff --git a/src/main/java/org/elasticsearch/action/percolate/TransportPercolateAction.java b/src/main/java/org/elasticsearch/action/percolate/TransportPercolateAction.java index 9828b480982..d7307baf511 100644 --- a/src/main/java/org/elasticsearch/action/percolate/TransportPercolateAction.java +++ b/src/main/java/org/elasticsearch/action/percolate/TransportPercolateAction.java @@ -164,34 +164,83 @@ public class TransportPercolateAction extends TransportBroadcastOperationAction< shardsResponses.length(), successfulShards, failedShards, shardFailures, finalCount, tookInMillis ); } else { - long finalCount = 0; + long foundMatches = 0; + int numMatches = 0; for (PercolateShardResponse response : shardResults) { - finalCount += response.count(); + foundMatches += response.count(); + numMatches += response.matches().length; } int requestedSize = shardResults.get(0).requestedSize(); boolean limit = shardResults.get(0).limit(); + boolean sort = shardResults.get(0).sort(); + boolean matchesScored = shardResults.get(0).score(); if (limit) { - requestedSize = (int) Math.min(requestedSize, finalCount); + requestedSize = Math.min(requestedSize, numMatches); } else { - // Serializing more than Integer.MAX_VALUE seems insane to me... - requestedSize = (int) finalCount; + requestedSize = numMatches; } // Use a custom impl of AbstractBigArray for Object[]? List finalMatches = new ArrayList(requestedSize); - outer: for (PercolateShardResponse response : shardResults) { - Text index = new StringText(response.getIndex()); - for (Text id : response.matches()) { - finalMatches.add(new PercolateResponse.Match(id, index)); - if (requestedSize != 0 && finalMatches.size() == requestedSize) { - break outer; + if (sort) { + if (shardResults.size() == 1) { + PercolateShardResponse response = shardResults.get(0); + Text index = new StringText(response.getIndex()); + for (int i = 0; i < response.matches().length; i++) { + float score = response.scores().length == 0 ? Float.NaN : response.scores()[i]; + finalMatches.add(new PercolateResponse.Match(index, response.matches()[i], score)); + } + } else { + int[] slots = new int[shardResults.size()]; + while (true) { + float lowestScore = Float.NEGATIVE_INFINITY; + int requestIndex = 0; + int itemIndex = 0; + for (int i = 0; i < shardResults.size(); i++) { + int scoreIndex = slots[i]; + float[] scores = shardResults.get(i).scores(); + if (scoreIndex >= scores.length) { + continue; + } + + float score = scores[scoreIndex]; + int cmp = Float.compare(lowestScore, score); + if (cmp < 0) { + requestIndex = i; + itemIndex = scoreIndex; + lowestScore = score; + } + } + slots[requestIndex]++; + + PercolateShardResponse shardResponse = shardResults.get(requestIndex); + Text index = new StringText(shardResponse.getIndex()); + Text match = shardResponse.matches()[itemIndex]; + float score = shardResponse.scores()[itemIndex]; + finalMatches.add(new PercolateResponse.Match(index, match, score)); + if (finalMatches.size() == requestedSize) { + break; + } + } + } + + } else { + outer: for (PercolateShardResponse response : shardResults) { + Text index = new StringText(response.getIndex()); + for (int i = 0; i < response.matches().length; i++) { + float score = response.scores().length == 0 ? 0f : response.scores()[i]; + finalMatches.add(new PercolateResponse.Match(index, response.matches()[i], score)); + if (requestedSize != 0 && finalMatches.size() == requestedSize) { + break outer; + } } } } return new PercolateResponse( - shardsResponses.length(), successfulShards, failedShards, shardFailures, finalMatches.toArray(new PercolateResponse.Match[requestedSize]), finalCount, tookInMillis + shardsResponses.length(), successfulShards, failedShards, shardFailures, + finalMatches.toArray(new PercolateResponse.Match[requestedSize]), foundMatches, tookInMillis, sort || matchesScored ); } } @@ -220,11 +269,7 @@ public class TransportPercolateAction extends TransportBroadcastOperationAction< @Override protected PercolateShardResponse shardOperation(PercolateShardRequest request) throws ElasticSearchException { try { - if (request.onlyCount()) { - return percolatorService.countPercolate(request); - } else { - return percolatorService.matchPercolate(request); - } + return percolatorService.percolate(request); } catch (Throwable t) { logger.trace("[{}][{}] failed to percolate", t, request.index(), request.shardId()); ShardId shardId = new ShardId(request.index(), request.shardId()); diff --git a/src/main/java/org/elasticsearch/percolator/PercolatorService.java b/src/main/java/org/elasticsearch/percolator/PercolatorService.java index d00a1b280ab..235689de922 100644 --- a/src/main/java/org/elasticsearch/percolator/PercolatorService.java +++ b/src/main/java/org/elasticsearch/percolator/PercolatorService.java @@ -19,14 +19,16 @@ package org.elasticsearch.percolator; import org.apache.lucene.analysis.TokenStream; +import org.apache.lucene.index.AtomicReaderContext; import org.apache.lucene.index.IndexableField; +import org.apache.lucene.index.ReaderUtil; import org.apache.lucene.index.memory.ExtendedMemoryIndex; import org.apache.lucene.index.memory.MemoryIndex; -import org.apache.lucene.search.Filter; -import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.Query; +import org.apache.lucene.search.*; +import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.CloseableThreadLocal; import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.ElasticSearchParseException; import org.elasticsearch.action.percolate.PercolateShardRequest; import org.elasticsearch.action.percolate.PercolateShardResponse; @@ -35,7 +37,9 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.search.XConstantScoreQuery; +import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.text.BytesText; import org.elasticsearch.common.text.Text; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -43,15 +47,18 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.cache.IndexCache; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.fielddata.BytesValues; +import org.elasticsearch.index.fielddata.FieldDataType; +import org.elasticsearch.index.fielddata.IndexFieldData; import org.elasticsearch.index.fielddata.IndexFieldDataService; -import org.elasticsearch.index.mapper.DocumentMapper; -import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.index.mapper.*; import org.elasticsearch.index.mapper.internal.UidFieldMapper; import org.elasticsearch.index.percolator.stats.ShardPercolateService; import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.search.internal.ShardSearchRequest; import java.io.IOException; import java.util.ArrayList; @@ -82,106 +89,32 @@ public class PercolatorService extends AbstractComponent { }; } - public PercolateShardResponse matchPercolate(final PercolateShardRequest request) { - return preparePercolate(request, new PercolateAction() { - @Override - public PercolateShardResponse doPercolateAction(PercolateContext context) { - final List matches; - long count = 0; - if (context.query == null) { - matches = new ArrayList(); - Lucene.ExistsCollector collector = new Lucene.ExistsCollector(); - for (Map.Entry entry : context.percolateQueries.entrySet()) { - collector.reset(); - try { - context.docSearcher.search(entry.getValue(), collector); - } catch (IOException e) { - logger.warn("[" + entry.getKey() + "] failed to execute query", e); - } - - if (collector.exists()) { - if (!context.limit || count < context.size) { - matches.add(entry.getKey()); - } - count++; - } - } - } else { - Engine.Searcher percolatorSearcher = context.indexShard.searcher(); - try { - Match match = match(logger, context.percolateQueries, context.docSearcher, context.fieldDataService, context); - percolatorSearcher.searcher().search(context.query, match); - matches = match.matches(); - count = match.counter(); - } catch (IOException e) { - logger.debug("failed to execute", e); - throw new PercolateException(context.indexShard.shardId(), "failed to execute", e); - } finally { - percolatorSearcher.release(); - } - } - return new PercolateShardResponse(matches.toArray(new Text[matches.size()]), count, context, request.index(), request.shardId()); - } - }); - } - - public PercolateShardResponse countPercolate(final PercolateShardRequest request) { - return preparePercolate(request, new PercolateAction() { - @Override - public PercolateShardResponse doPercolateAction(PercolateContext context) { - long count = 0; - if (context.query == null) { - Lucene.ExistsCollector collector = new Lucene.ExistsCollector(); - for (Map.Entry entry : context.percolateQueries.entrySet()) { - collector.reset(); - try { - context.docSearcher.search(entry.getValue(), collector); - } catch (IOException e) { - logger.warn("[" + entry.getKey() + "] failed to execute query", e); - } - - if (collector.exists()) { - count++; - } - } - } else { - Engine.Searcher percolatorSearcher = context.indexShard.searcher(); - try { - Count countCollector = count(logger, context.percolateQueries, context.docSearcher, context.fieldDataService); - percolatorSearcher.searcher().search(context.query, countCollector); - count = countCollector.counter(); - } catch (IOException e) { - logger.warn("failed to execute", e); - } finally { - percolatorSearcher.release(); - } - } - return new PercolateShardResponse(count, context, request.index(), request.shardId()); - } - }); - } - - private PercolateShardResponse preparePercolate(PercolateShardRequest request, PercolateAction action) { + public PercolateShardResponse percolate(PercolateShardRequest request) { IndexService percolateIndexService = indicesService.indexServiceSafe(request.index()); IndexShard indexShard = percolateIndexService.shardSafe(request.shardId()); ShardPercolateService shardPercolateService = indexShard.shardPercolateService(); shardPercolateService.prePercolate(); long startTime = System.nanoTime(); + try { - ConcurrentMap percolateQueries = indexShard.percolateRegistry().percolateQueries(); - if (percolateQueries.isEmpty()) { - return new PercolateShardResponse(request.index(), request.shardId()); + final PercolateContext context = new PercolateContext(); + context.percolateQueries = indexShard.percolateRegistry().percolateQueries(); + context.indexShard = indexShard; + context.percolateIndexService = percolateIndexService; + ParsedDocument parsedDocument = parsePercolate(percolateIndexService, request, context); + if (context.percolateQueries.isEmpty()) { + return new PercolateShardResponse(context, request.index(), request.shardId()); } - final PercolateContext context = new PercolateContext(); - context.percolateQueries = percolateQueries; - context.indexShard = indexShard; - ParsedDocument parsedDocument = parsePercolate(percolateIndexService, request, context); if (request.docSource() != null && request.docSource().length() != 0) { parsedDocument = parseFetchedDoc(request.docSource(), percolateIndexService, request.documentType()); } else if (parsedDocument == null) { - throw new ElasticSearchParseException("No doc to percolate in the request"); + throw new ElasticSearchIllegalArgumentException("Nothing to percolate"); + } + + if (context.query == null && (context.score || context.sort)) { + throw new ElasticSearchIllegalArgumentException("Can't sort or score if no query is specified"); } if (context.size < 0) { @@ -212,15 +145,28 @@ public class PercolatorService extends AbstractComponent { } } + PercolatorType action; + if (request.onlyCount()) { + action = context.query != null ? queryCountPercolator : countPercolator; + } else { + if (context.sort) { + action = topMatchingPercolator; + } else if (context.query != null) { + action = context.score ? scoringPercolator : queryPercolator; + } else { + action = matchPercolator; + } + } + context.docSearcher = memoryIndex.createSearcher(); - context.fieldDataService = percolateIndexService.fieldData(); + context.fieldData = percolateIndexService.fieldData(); IndexCache indexCache = percolateIndexService.cache(); try { - return action.doPercolateAction(context); + return action.doPercolate(request, context); } finally { // explicitly clear the reader, since we can only register on callback on SegmentReader indexCache.clear(context.docSearcher.getIndexReader()); - context.fieldDataService.clear(context.docSearcher.getIndexReader()); + context.fieldData.clear(context.docSearcher.getIndexReader()); } } finally { memoryIndex.reset(); @@ -238,6 +184,12 @@ public class PercolatorService extends AbstractComponent { ParsedDocument doc = null; XContentParser parser = null; + + // Some queries (function_score query when for decay functions) rely on SearchContext being set: + SearchContext.setCurrent(new SearchContext(0, + new ShardSearchRequest().types(new String[0]), + null, context.indexShard.searcher(), context.percolateIndexService, context.indexShard, + null, null)); try { parser = XContentFactory.xContent(source).createParser(source); String currentFieldName = null; @@ -269,6 +221,8 @@ public class PercolatorService extends AbstractComponent { Filter filter = documentIndexService.queryParserService().parseInnerFilter(parser).filter(); context.query = new XConstantScoreQuery(filter); } + } else if (token == null) { + break; } else if (token.isValue()) { if ("size".equals(currentFieldName)) { context.limit = true; @@ -276,14 +230,18 @@ public class PercolatorService extends AbstractComponent { if (context.size < 0) { throw new ElasticSearchParseException("size is set to [" + context.size + "] and is expected to be higher or equal to 0"); } + } else if ("sort".equals(currentFieldName)) { + context.sort = parser.booleanValue(); + } else if ("score".equals(currentFieldName)) { + context.score = parser.booleanValue(); } - } else if (token == null) { - break; } } } catch (IOException e) { throw new ElasticSearchParseException("failed to parse request", e); } finally { + SearchContext.current().release(); + SearchContext.removeCurrent(); if (parser != null) { parser.close(); } @@ -319,22 +277,175 @@ public class PercolatorService extends AbstractComponent { cache.close(); } - interface PercolateAction { + interface PercolatorType { - PercolateShardResponse doPercolateAction(PercolateContext context); + PercolateShardResponse doPercolate(PercolateShardRequest request, PercolateContext context); } + private final PercolatorType countPercolator = new PercolatorType() { + + @Override + public PercolateShardResponse doPercolate(PercolateShardRequest request, PercolateContext context) { + long count = 0; + Lucene.ExistsCollector collector = new Lucene.ExistsCollector(); + for (Map.Entry entry : context.percolateQueries.entrySet()) { + collector.reset(); + try { + context.docSearcher.search(entry.getValue(), collector); + } catch (IOException e) { + logger.warn("[" + entry.getKey() + "] failed to execute query", e); + } + + if (collector.exists()) { + count++; + } + } + return new PercolateShardResponse(count, context, request.index(), request.shardId()); + } + + }; + + private final PercolatorType queryCountPercolator = new PercolatorType() { + + @Override + public PercolateShardResponse doPercolate(PercolateShardRequest request, PercolateContext context) { + long count = 0; + Engine.Searcher percolatorSearcher = context.indexShard.searcher(); + try { + Count countCollector = count(logger, context); + queryBasedPercolating(percolatorSearcher, context, countCollector); + count = countCollector.counter(); + } catch (IOException e) { + logger.warn("failed to execute", e); + } finally { + percolatorSearcher.release(); + } + return new PercolateShardResponse(count, context, request.index(), request.shardId()); + } + + }; + + private final PercolatorType matchPercolator = new PercolatorType() { + @Override + public PercolateShardResponse doPercolate(PercolateShardRequest request, PercolateContext context) { + long count = 0; + List matches = new ArrayList(); + Lucene.ExistsCollector collector = new Lucene.ExistsCollector(); + + for (Map.Entry entry : context.percolateQueries.entrySet()) { + collector.reset(); + try { + context.docSearcher.search(entry.getValue(), collector); + } catch (IOException e) { + logger.warn("[" + entry.getKey() + "] failed to execute query", e); + } + + if (collector.exists()) { + if (!context.limit || count < context.size) { + matches.add(entry.getKey()); + } + count++; + } + } + return new PercolateShardResponse(matches.toArray(new Text[0]), count, context, request.index(), request.shardId()); + } + }; + + private final PercolatorType queryPercolator = new PercolatorType() { + @Override + public PercolateShardResponse doPercolate(PercolateShardRequest request, PercolateContext context) { + Engine.Searcher percolatorSearcher = context.indexShard.searcher(); + try { + Match match = match(logger, context); + queryBasedPercolating(percolatorSearcher, context, match); + List matches = match.matches(); + long count = match.counter(); + return new PercolateShardResponse(matches.toArray(new Text[0]), count, context, request.index(), request.shardId()); + } catch (IOException e) { + logger.debug("failed to execute", e); + throw new PercolateException(context.indexShard.shardId(), "failed to execute", e); + } finally { + percolatorSearcher.release(); + } + } + }; + + private final PercolatorType scoringPercolator = new PercolatorType() { + @Override + public PercolateShardResponse doPercolate(PercolateShardRequest request, PercolateContext context) { + Engine.Searcher percolatorSearcher = context.indexShard.searcher(); + try { + MatchAndScore matchAndScore = matchAndScore(logger, context); + queryBasedPercolating(percolatorSearcher, context, matchAndScore); + Text[] matches = matchAndScore.matches().toArray(new Text[0]); + float[] scores = matchAndScore.scores().toArray(); + long count = matchAndScore.counter(); + return new PercolateShardResponse(matches, count, scores, context, request.index(), request.shardId()); + } catch (IOException e) { + logger.debug("failed to execute", e); + throw new PercolateException(context.indexShard.shardId(), "failed to execute", e); + } finally { + percolatorSearcher.release(); + } + } + }; + + private final PercolatorType topMatchingPercolator = new PercolatorType() { + + @Override + public PercolateShardResponse doPercolate(PercolateShardRequest request, PercolateContext context) { + Engine.Searcher percolatorSearcher = context.indexShard.searcher(); + try { + MatchAndSort matchAndSort = QueryCollector.matchAndSort(logger, context); + queryBasedPercolating(percolatorSearcher, context, matchAndSort); + TopDocs topDocs = matchAndSort.topDocs(); + long count = topDocs.totalHits; + List matches = new ArrayList(topDocs.scoreDocs.length); + float[] scores = new float[topDocs.scoreDocs.length]; + + IndexFieldData uidFieldData = context.fieldData.getForField(new FieldMapper.Names(UidFieldMapper.NAME), new FieldDataType("string", ImmutableSettings.builder().put("format", "paged_bytes"))); + int i = 0; + for (ScoreDoc scoreDoc : topDocs.scoreDocs) { + int segmentIdx = ReaderUtil.subIndex(scoreDoc.doc, percolatorSearcher.reader().leaves()); + AtomicReaderContext atomicReaderContext = percolatorSearcher.reader().leaves().get(segmentIdx); + BytesValues values = uidFieldData.load(atomicReaderContext).getBytesValues(); + BytesRef uid = values.getValue(scoreDoc.doc - atomicReaderContext.docBase); + Text id = new BytesText(Uid.idFromUid(uid)); + matches.add(id); + scores[i++] = scoreDoc.score; + } + return new PercolateShardResponse(matches.toArray(new Text[matches.size()]), count, scores, context, request.index(), request.shardId()); + } catch (Exception e) { + logger.debug("failed to execute", e); + throw new PercolateException(context.indexShard.shardId(), "failed to execute", e); + } finally { + percolatorSearcher.release(); + } + } + + }; + + private static void queryBasedPercolating(Engine.Searcher percolatorSearcher, PercolateContext context, Collector collector) throws IOException { + Filter percolatorTypeFilter = context.percolateIndexService.mapperService().documentMapper(Constants.TYPE_NAME).typeFilter(); + percolatorTypeFilter = context.percolateIndexService.cache().filter().cache(percolatorTypeFilter); + FilteredQuery query = new FilteredQuery(context.query, percolatorTypeFilter); + percolatorSearcher.searcher().search(query, collector); + } + public class PercolateContext { public boolean limit; public int size; + public boolean score; + public boolean sort; Query query; ConcurrentMap percolateQueries; IndexSearcher docSearcher; IndexShard indexShard; - IndexFieldDataService fieldDataService; + IndexFieldDataService fieldData; + IndexService percolateIndexService; } diff --git a/src/main/java/org/elasticsearch/percolator/QueryCollector.java b/src/main/java/org/elasticsearch/percolator/QueryCollector.java index f983e846433..89d36a35cc3 100644 --- a/src/main/java/org/elasticsearch/percolator/QueryCollector.java +++ b/src/main/java/org/elasticsearch/percolator/QueryCollector.java @@ -1,10 +1,8 @@ package org.elasticsearch.percolator; +import gnu.trove.list.array.TFloatArrayList; import org.apache.lucene.index.AtomicReaderContext; -import org.apache.lucene.search.Collector; -import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.Query; -import org.apache.lucene.search.Scorer; +import org.apache.lucene.search.*; import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.lucene.Lucene; @@ -14,7 +12,6 @@ import org.elasticsearch.common.text.Text; import org.elasticsearch.index.fielddata.BytesValues; import org.elasticsearch.index.fielddata.FieldDataType; import org.elasticsearch.index.fielddata.IndexFieldData; -import org.elasticsearch.index.fielddata.IndexFieldDataService; import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.internal.UidFieldMapper; @@ -37,12 +34,12 @@ abstract class QueryCollector extends Collector { BytesValues values; - QueryCollector(ESLogger logger, ConcurrentMap queries, IndexSearcher searcher, IndexFieldDataService fieldData) { + QueryCollector(ESLogger logger, PercolatorService.PercolateContext context) { this.logger = logger; - this.queries = queries; - this.searcher = searcher; + this.queries = context.percolateQueries; + this.searcher = context.docSearcher; // TODO: when we move to a UID level mapping def on the index level, we can use that one, now, its per type, and we can't easily choose one - this.uidFieldData = fieldData.getForField(new FieldMapper.Names(UidFieldMapper.NAME), new FieldDataType("string", ImmutableSettings.builder().put("format", "paged_bytes"))); + this.uidFieldData = context.fieldData.getForField(new FieldMapper.Names(UidFieldMapper.NAME), new FieldDataType("string", ImmutableSettings.builder().put("format", "paged_bytes"))); } @Override @@ -61,12 +58,20 @@ abstract class QueryCollector extends Collector { } - static Match match(ESLogger logger, ConcurrentMap queries, IndexSearcher searcher, IndexFieldDataService fieldData, PercolatorService.PercolateContext context) { - return new Match(logger, queries, searcher, fieldData, context); + static Match match(ESLogger logger, PercolatorService.PercolateContext context) { + return new Match(logger, context); } - static Count count(ESLogger logger, ConcurrentMap queries, IndexSearcher searcher, IndexFieldDataService fieldData) { - return new Count(logger, queries, searcher, fieldData); + static Count count(ESLogger logger, PercolatorService.PercolateContext context) { + return new Count(logger, context); + } + + static MatchAndScore matchAndScore(ESLogger logger, PercolatorService.PercolateContext context) { + return new MatchAndScore(logger, context); + } + + static MatchAndSort matchAndSort(ESLogger logger, PercolatorService.PercolateContext context) { + return new MatchAndSort(logger, context); } final static class Match extends QueryCollector { @@ -76,8 +81,8 @@ abstract class QueryCollector extends Collector { private final int size; private long counter = 0; - Match(ESLogger logger, ConcurrentMap queries, IndexSearcher searcher, IndexFieldDataService fieldData, PercolatorService.PercolateContext context) { - super(logger, queries, searcher, fieldData); + Match(ESLogger logger, PercolatorService.PercolateContext context) { + super(logger, context); this.limit = context.limit; this.size = context.size; } @@ -119,12 +124,126 @@ abstract class QueryCollector extends Collector { } + final static class MatchAndSort extends QueryCollector { + + private final TopScoreDocCollector topDocsCollector; + + MatchAndSort(ESLogger logger, PercolatorService.PercolateContext context) { + super(logger, context); + // TODO: Use TopFieldCollector.create(...) for ascending and decending scoring? + topDocsCollector = TopScoreDocCollector.create(context.size, false); + } + + @Override + public void collect(int doc) throws IOException { + BytesRef uid = values.getValue(doc); + if (uid == null) { + return; + } + Text id = new BytesText(Uid.idFromUid(uid)); + Query query = queries.get(id); + if (query == null) { + // log??? + return; + } + // run the query + try { + collector.reset(); + searcher.search(query, collector); + if (collector.exists()) { + topDocsCollector.collect(doc); + } + } catch (IOException e) { + logger.warn("[" + id + "] failed to execute query", e); + } + } + + @Override + public void setNextReader(AtomicReaderContext context) throws IOException { + super.setNextReader(context); + topDocsCollector.setNextReader(context); + } + + @Override + public void setScorer(Scorer scorer) throws IOException { + topDocsCollector.setScorer(scorer); + } + + TopDocs topDocs() { + return topDocsCollector.topDocs(); + } + + } + + final static class MatchAndScore extends QueryCollector { + + private final List matches = new ArrayList(); + // TODO: Use thread local in order to cache the scores lists? + private final TFloatArrayList scores = new TFloatArrayList(); + private final boolean limit; + private final int size; + private long counter = 0; + + private Scorer scorer; + + MatchAndScore(ESLogger logger, PercolatorService.PercolateContext context) { + super(logger, context); + this.limit = context.limit; + this.size = context.size; + } + + @Override + public void collect(int doc) throws IOException { + BytesRef uid = values.getValue(doc); + if (uid == null) { + return; + } + Text id = new BytesText(Uid.idFromUid(uid)); + Query query = queries.get(id); + if (query == null) { + // log??? + return; + } + // run the query + try { + collector.reset(); + searcher.search(query, collector); + if (collector.exists()) { + if (!limit || counter < size) { + matches.add(id); + scores.add(scorer.score()); + } + counter++; + } + } catch (IOException e) { + logger.warn("[" + id + "] failed to execute query", e); + } + } + + @Override + public void setScorer(Scorer scorer) throws IOException { + this.scorer = scorer; + } + + long counter() { + return counter; + } + + List matches() { + return matches; + } + + TFloatArrayList scores() { + return scores; + } + } + final static class Count extends QueryCollector { private long counter = 0; - Count(ESLogger logger, ConcurrentMap queries, IndexSearcher searcher, IndexFieldDataService fieldData) { - super(logger, queries, searcher, fieldData); + Count(ESLogger logger, PercolatorService.PercolateContext context) { + super(logger, context); } @Override diff --git a/src/test/java/org/elasticsearch/test/integration/percolator/SimplePercolatorTests.java b/src/test/java/org/elasticsearch/test/integration/percolator/SimplePercolatorTests.java index 0d69879c0be..1a2c0ef1260 100644 --- a/src/test/java/org/elasticsearch/test/integration/percolator/SimplePercolatorTests.java +++ b/src/test/java/org/elasticsearch/test/integration/percolator/SimplePercolatorTests.java @@ -39,9 +39,12 @@ import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.query.FilterBuilders; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.query.functionscore.script.ScriptScoreFunctionBuilder; import org.elasticsearch.test.integration.AbstractSharedClusterTest; import org.junit.Test; +import java.util.*; + import static org.elasticsearch.action.percolate.PercolateSourceBuilder.docBuilder; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.*; @@ -1026,6 +1029,7 @@ public class SimplePercolatorTests extends AbstractSharedClusterTest { assertThat(response.getMatches(), emptyArray()); } + @Test public void testPercolateSizingWithQueryAndFilter() throws Exception { client().admin().indices().prepareCreate("test").execute().actionGet(); ensureGreen(); @@ -1119,6 +1123,103 @@ public class SimplePercolatorTests extends AbstractSharedClusterTest { } } + @Test + public void testPercolateScoreAndSorting() throws Exception { + client().admin().indices().prepareCreate("my-index") + .setSettings(ImmutableSettings.settingsBuilder() + .put("index.number_of_shards", 3) + .put("index.number_of_replicas", 1) + .build()) + .execute().actionGet(); + ensureGreen(); + + // Add a dummy doc, that shouldn't never interfere with percolate operations. + client().prepareIndex("my-index", "my-type", "1").setSource("field", "value").execute().actionGet(); + + Map> controlMap = new HashMap>(); + long numQueries = randomIntBetween(100, 250); + logger.info("--> register " + numQueries +" queries"); + for (int i = 0; i < numQueries; i++) { + int value = randomInt(10); + client().prepareIndex("my-index", "_percolator", Integer.toString(i)) + .setSource(jsonBuilder().startObject().field("query", matchAllQuery()).field("level", i).field("field1", value).endObject()) + .execute().actionGet(); + if (!controlMap.containsKey(value)) { + controlMap.put(value, new TreeSet()); + } + controlMap.get(value).add(i); + } + refresh(); + + // Only retrieve the score + int runs = randomInt(27); + for (int i = 0; i < runs; i++) { + int size = randomIntBetween(1, 50); + PercolateResponse response = client().preparePercolate().setIndices("my-index").setDocumentType("my-type") + .setScore(true) + .setSize(size) + .setPercolateDoc(docBuilder().setDoc("field", "value")) + .setPercolateQuery(QueryBuilders.functionScoreQuery(matchAllQuery()).add(new ScriptScoreFunctionBuilder().script("doc['level'].value"))) + .execute().actionGet(); + assertNoFailures(response); + assertThat(response.getCount(), equalTo(numQueries)); + assertThat(response.getMatches().length, equalTo(size)); + for (int j = 0; j < response.getMatches().length; j++) { + String id = response.getMatches()[j].getId().string(); + assertThat(Integer.valueOf(id), equalTo((int) response.getMatches()[j].getScore())); + } + } + + // Sort the queries by the score + runs = randomInt(27); + for (int i = 0; i < runs; i++) { + int size = randomIntBetween(1, 10); + PercolateResponse response = client().preparePercolate().setIndices("my-index").setDocumentType("my-type") + .setSort(true) + .setSize(size) + .setPercolateDoc(docBuilder().setDoc("field", "value")) + .setPercolateQuery(QueryBuilders.functionScoreQuery(matchAllQuery()).add(new ScriptScoreFunctionBuilder().script("doc['level'].value"))) + .execute().actionGet(); + assertNoFailures(response); + assertThat(response.getCount(), equalTo(numQueries)); + assertThat(response.getMatches().length, equalTo(size)); + + int expectedId = (int) (numQueries - 1); + for (PercolateResponse.Match match : response) { + assertThat(match.getId().string(), equalTo(Integer.toString(expectedId))); + assertThat(match.getScore(), equalTo((float) expectedId)); + assertThat(match.getIndex().string(), equalTo("my-index")); + expectedId--; + } + } + + + runs = randomInt(27); + for (int i = 0; i < runs; i++) { + int value = randomInt(10); + NavigableSet levels = controlMap.get(value); + int size = randomIntBetween(1, levels.size()); + PercolateResponse response = client().preparePercolate().setIndices("my-index").setDocumentType("my-type") + .setSort(true) + .setSize(size) + .setPercolateDoc(docBuilder().setDoc("field", "value")) + .setPercolateQuery(QueryBuilders.functionScoreQuery(matchQuery("field1", value)) + .add(new ScriptScoreFunctionBuilder().script("doc['level'].value"))) + .execute().actionGet(); + assertNoFailures(response); + + assertThat(response.getCount(), equalTo((long) levels.size())); + assertThat(response.getMatches().length, equalTo(Math.min(levels.size(), size))); + Iterator levelIterator = levels.descendingIterator(); + for (PercolateResponse.Match match : response) { + int controlLevel = levelIterator.next(); + assertThat(match.getId().string(), equalTo(Integer.toString(controlLevel))); + assertThat(match.getScore(), equalTo((float) controlLevel)); + assertThat(match.getIndex().string(), equalTo("my-index")); + } + } + } + public static String[] convertFromTextArray(PercolateResponse.Match[] matches, String index) { if (matches.length == 0) { return Strings.EMPTY_ARRAY;