diff --git a/src/main/java/org/elasticsearch/action/percolate/PercolateRequestBuilder.java b/src/main/java/org/elasticsearch/action/percolate/PercolateRequestBuilder.java index 83d15cc9a3d..495d8785e1d 100644 --- a/src/main/java/org/elasticsearch/action/percolate/PercolateRequestBuilder.java +++ b/src/main/java/org/elasticsearch/action/percolate/PercolateRequestBuilder.java @@ -160,6 +160,14 @@ public class PercolateRequestBuilder extends BroadcastOperationRequestBuilder values = new HashMap(2); + values.put(field, value); + setDoc(values); + return this; + } + public DocBuilder setDoc(String doc) { this.doc = new BytesArray(doc); return this; diff --git a/src/main/java/org/elasticsearch/action/percolate/TransportPercolateAction.java b/src/main/java/org/elasticsearch/action/percolate/TransportPercolateAction.java index f46ed0e5b70..9828b480982 100644 --- a/src/main/java/org/elasticsearch/action/percolate/TransportPercolateAction.java +++ b/src/main/java/org/elasticsearch/action/percolate/TransportPercolateAction.java @@ -159,6 +159,7 @@ public class TransportPercolateAction extends TransportBroadcastOperationAction< for (PercolateShardResponse shardResponse : shardResults) { finalCount += shardResponse.count(); } + return new PercolateResponse( shardsResponses.length(), successfulShards, failedShards, shardFailures, finalCount, tookInMillis ); @@ -168,19 +169,29 @@ public class TransportPercolateAction extends TransportBroadcastOperationAction< finalCount += response.count(); } - // Serializing more than Integer.MAX_VALUE seems insane to me... - int size = (int) finalCount; + int requestedSize = shardResults.get(0).requestedSize(); + boolean limit = shardResults.get(0).limit(); + if (limit) { + requestedSize = (int) Math.min(requestedSize, finalCount); + } else { + // Serializing more than Integer.MAX_VALUE seems insane to me... + requestedSize = (int) finalCount; + } + // Use a custom impl of AbstractBigArray for Object[]? - List finalMatches = new ArrayList(size); - for (PercolateShardResponse response : shardResults) { + List finalMatches = new ArrayList(requestedSize); + outer: for (PercolateShardResponse response : shardResults) { Text index = new StringText(response.getIndex()); for (Text id : response.matches()) { finalMatches.add(new PercolateResponse.Match(id, index)); + if (requestedSize != 0 && finalMatches.size() == requestedSize) { + break outer; + } } } return new PercolateResponse( - shardsResponses.length(), successfulShards, failedShards, shardFailures, finalMatches.toArray(new PercolateResponse.Match[size]), finalCount, tookInMillis + shardsResponses.length(), successfulShards, failedShards, shardFailures, finalMatches.toArray(new PercolateResponse.Match[requestedSize]), finalCount, tookInMillis ); } } diff --git a/src/main/java/org/elasticsearch/percolator/PercolatorService.java b/src/main/java/org/elasticsearch/percolator/PercolatorService.java index 3336e8aa3bd..eeabdc7aece 100644 --- a/src/main/java/org/elasticsearch/percolator/PercolatorService.java +++ b/src/main/java/org/elasticsearch/percolator/PercolatorService.java @@ -31,13 +31,11 @@ import org.elasticsearch.ElasticSearchParseException; import org.elasticsearch.action.percolate.PercolateShardRequest; import org.elasticsearch.action.percolate.PercolateShardResponse; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.search.XConstantScoreQuery; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.text.StringText; import org.elasticsearch.common.text.Text; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -85,43 +83,52 @@ public class PercolatorService extends AbstractComponent { } public PercolateShardResponse matchPercolate(final PercolateShardRequest request) { - return innerPercolate(request, new PercolateAction() { + return preparePercolate(request, new PercolateAction() { @Override public PercolateShardResponse doPercolateAction(PercolateContext context) { - List matches = new ArrayList(); + final List matches; + long count = 0; if (context.query == null) { + matches = new ArrayList(); Lucene.ExistsCollector collector = new Lucene.ExistsCollector(); for (Map.Entry entry : context.percolateQueries.entrySet()) { collector.reset(); try { - context.searcher.search(entry.getValue(), collector); + context.docSearcher.search(entry.getValue(), collector); } catch (IOException e) { logger.warn("[" + entry.getKey() + "] failed to execute query", e); } if (collector.exists()) { - matches.add(entry.getKey()); + if (!context.limit) { + matches.add(entry.getKey()); + } else if (count < context.size) { + matches.add(entry.getKey()); + } + count++; } } } else { Engine.Searcher percolatorSearcher = context.indexShard.searcher(); try { - percolatorSearcher.searcher().search( - context.query, match(logger, context.percolateQueries, context.searcher, context.fieldDataService, matches) - ); + Match match = match(logger, context.percolateQueries, context.docSearcher, context.fieldDataService, context); + percolatorSearcher.searcher().search(context.query, match); + matches = match.matches(); + count = match.counter(); } catch (IOException e) { - logger.warn("failed to execute", e); + logger.debug("failed to execute", e); + throw new PercolateException(context.indexShard.shardId(), "failed to execute", e); } finally { percolatorSearcher.release(); } } - return new PercolateShardResponse(matches.toArray(new Text[matches.size()]), request.index(), request.shardId()); + return new PercolateShardResponse(matches.toArray(new Text[matches.size()]), count, context, request.index(), request.shardId()); } }); } public PercolateShardResponse countPercolate(final PercolateShardRequest request) { - return innerPercolate(request, new PercolateAction() { + return preparePercolate(request, new PercolateAction() { @Override public PercolateShardResponse doPercolateAction(PercolateContext context) { long count = 0; @@ -130,7 +137,7 @@ public class PercolatorService extends AbstractComponent { for (Map.Entry entry : context.percolateQueries.entrySet()) { collector.reset(); try { - context.searcher.search(entry.getValue(), collector); + context.docSearcher.search(entry.getValue(), collector); } catch (IOException e) { logger.warn("[" + entry.getKey() + "] failed to execute query", e); } @@ -142,7 +149,7 @@ public class PercolatorService extends AbstractComponent { } else { Engine.Searcher percolatorSearcher = context.indexShard.searcher(); try { - Count countCollector = count(logger, context.percolateQueries, context.searcher, context.fieldDataService); + Count countCollector = count(logger, context.percolateQueries, context.docSearcher, context.fieldDataService); percolatorSearcher.searcher().search(context.query, countCollector); count = countCollector.counter(); } catch (IOException e) { @@ -151,12 +158,12 @@ public class PercolatorService extends AbstractComponent { percolatorSearcher.release(); } } - return new PercolateShardResponse(count, request.index(), request.shardId()); + return new PercolateShardResponse(count, context, request.index(), request.shardId()); } }); } - private PercolateShardResponse innerPercolate(PercolateShardRequest request, PercolateAction action) { + private PercolateShardResponse preparePercolate(PercolateShardRequest request, PercolateAction action) { IndexService percolateIndexService = indicesService.indexServiceSafe(request.index()); IndexShard indexShard = percolateIndexService.shardSafe(request.shardId()); @@ -164,27 +171,30 @@ public class PercolatorService extends AbstractComponent { shardPercolateService.prePercolate(); long startTime = System.nanoTime(); try { - final PercolateContext context = new PercolateContext(); - context.indexShard = indexShard; - context.percolateQueries = indexShard.percolateRegistry().percolateQueries(); - if (context.percolateQueries.isEmpty()) { - return new PercolateShardResponse(StringText.EMPTY_ARRAY, request.index(), request.shardId()); + ConcurrentMap percolateQueries = indexShard.percolateRegistry().percolateQueries(); + if (percolateQueries.isEmpty()) { + return new PercolateShardResponse(request.index(), request.shardId()); } - ParsedDocument parsedDocument; + final PercolateContext context = new PercolateContext(); + context.percolateQueries = percolateQueries; + context.indexShard = indexShard; + ParsedDocument parsedDocument = parsePercolate(percolateIndexService, request, context); if (request.docSource() != null && request.docSource().length() != 0) { parsedDocument = parseFetchedDoc(request.docSource(), percolateIndexService, request.documentType()); - context.query = parseQueryOrFilter(percolateIndexService, request.source()); - } else { - Tuple parseResult = parsePercolate(percolateIndexService, request.documentType(), request.source()); - parsedDocument = parseResult.v1(); - context.query = parseResult.v2(); + } else if (parsedDocument == null) { + throw new ElasticSearchParseException("No doc to percolate in the request"); + } + + if (context.size < 0) { + context.size = 0; } // first, parse the source doc into a MemoryIndex final MemoryIndex memoryIndex = cache.get(); try { // TODO: This means percolation does not support nested docs... + // So look into: ByteBufferDirectory for (IndexableField field : parsedDocument.rootDoc().getFields()) { if (!field.fieldType().indexed()) { continue; @@ -204,15 +214,15 @@ public class PercolatorService extends AbstractComponent { } } - context.searcher = memoryIndex.createSearcher(); + context.docSearcher = memoryIndex.createSearcher(); context.fieldDataService = percolateIndexService.fieldData(); IndexCache indexCache = percolateIndexService.cache(); try { return action.doPercolateAction(context); } finally { // explicitly clear the reader, since we can only register on callback on SegmentReader - indexCache.clear(context.searcher.getIndexReader()); - context.fieldDataService.clear(context.searcher.getIndexReader()); + indexCache.clear(context.docSearcher.getIndexReader()); + context.fieldDataService.clear(context.docSearcher.getIndexReader()); } } finally { memoryIndex.reset(); @@ -222,8 +232,12 @@ public class PercolatorService extends AbstractComponent { } } - private Tuple parsePercolate(IndexService documentIndexService, String type, BytesReference source) throws ElasticSearchException { - Query query = null; + private ParsedDocument parsePercolate(IndexService documentIndexService, PercolateShardRequest request, PercolateContext context) throws ElasticSearchException { + BytesReference source = request.source(); + if (source == null || source.length() == 0) { + return null; + } + ParsedDocument doc = null; XContentParser parser = null; try { @@ -241,21 +255,29 @@ public class PercolatorService extends AbstractComponent { } MapperService mapperService = documentIndexService.mapperService(); - DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(type); - doc = docMapper.parse(source(parser).type(type).flyweight(true)); + DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(request.documentType()); + doc = docMapper.parse(source(parser).type(request.documentType()).flyweight(true)); } } else if (token == XContentParser.Token.START_OBJECT) { if ("query".equals(currentFieldName)) { - if (query != null) { + if (context.query != null) { throw new ElasticSearchParseException("Either specify query or filter, not both"); } - query = documentIndexService.queryParserService().parse(parser).query(); + context.query = documentIndexService.queryParserService().parse(parser).query(); } else if ("filter".equals(currentFieldName)) { - if (query != null) { + if (context.query != null) { throw new ElasticSearchParseException("Either specify query or filter, not both"); } Filter filter = documentIndexService.queryParserService().parseInnerFilter(parser).filter(); - query = new XConstantScoreQuery(filter); + context.query = new XConstantScoreQuery(filter); + } + } else if (token.isValue()) { + if ("size".equals(currentFieldName)) { + context.limit = true; + context.size = parser.intValue(); + if (context.size < 0) { + throw new ElasticSearchParseException("size is set to [" + context.size + "] and is expected to be higher or equal to 0"); + } } } else if (token == null) { break; @@ -269,11 +291,7 @@ public class PercolatorService extends AbstractComponent { } } - if (doc == null) { - throw new ElasticSearchParseException("No doc to percolate in the request"); - } - - return new Tuple(doc, query); + return doc; } private ParsedDocument parseFetchedDoc(BytesReference fetchedDoc, IndexService documentIndexService, String type) { @@ -299,48 +317,6 @@ public class PercolatorService extends AbstractComponent { return doc; } - private Query parseQueryOrFilter(IndexService documentIndexService, BytesReference source) { - if (source == null || source.length() == 0) { - return null; - } - - Query query = null; - XContentParser parser = null; - try { - parser = XContentFactory.xContent(source).createParser(source); - String currentFieldName = null; - XContentParser.Token token; - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { - currentFieldName = parser.currentName(); - } else if (token == XContentParser.Token.START_OBJECT) { - if ("query".equals(currentFieldName)) { - if (query != null) { - throw new ElasticSearchParseException("Either specify query or filter, not both"); - } - query = documentIndexService.queryParserService().parse(parser).query(); - } else if ("filter".equals(currentFieldName)) { - if (query != null) { - throw new ElasticSearchParseException("Either specify query or filter, not both"); - } - Filter filter = documentIndexService.queryParserService().parseInnerFilter(parser).filter(); - query = new XConstantScoreQuery(filter); - } - } else if (token == null) { - break; - } - } - } catch (IOException e) { - throw new ElasticSearchParseException("failed to parse request", e); - } finally { - if (parser != null) { - parser.close(); - } - } - - return query; - } - public void close() { cache.close(); } @@ -351,11 +327,14 @@ public class PercolatorService extends AbstractComponent { } - class PercolateContext { + public class PercolateContext { + + public boolean limit; + public int size; Query query; ConcurrentMap percolateQueries; - IndexSearcher searcher; + IndexSearcher docSearcher; IndexShard indexShard; IndexFieldDataService fieldDataService; diff --git a/src/main/java/org/elasticsearch/percolator/QueryCollector.java b/src/main/java/org/elasticsearch/percolator/QueryCollector.java index a6ebefbd3c7..034d72606ab 100644 --- a/src/main/java/org/elasticsearch/percolator/QueryCollector.java +++ b/src/main/java/org/elasticsearch/percolator/QueryCollector.java @@ -20,6 +20,7 @@ import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.internal.UidFieldMapper; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentMap; @@ -60,8 +61,8 @@ abstract class QueryCollector extends Collector { } - static Match match(ESLogger logger, ConcurrentMap queries, IndexSearcher searcher, IndexFieldDataService fieldData, List matches) { - return new Match(logger, queries, searcher, fieldData, matches); + static Match match(ESLogger logger, ConcurrentMap queries, IndexSearcher searcher, IndexFieldDataService fieldData, PercolatorService.PercolateContext context) { + return new Match(logger, queries, searcher, fieldData, context); } static Count count(ESLogger logger, ConcurrentMap queries, IndexSearcher searcher, IndexFieldDataService fieldData) { @@ -70,11 +71,15 @@ abstract class QueryCollector extends Collector { final static class Match extends QueryCollector { - private final List matches; + private final List matches = new ArrayList(); + private final boolean limit; + private final int size; + private long counter = 0; - Match(ESLogger logger, ConcurrentMap queries, IndexSearcher searcher, IndexFieldDataService fieldData, List matches) { + Match(ESLogger logger, ConcurrentMap queries, IndexSearcher searcher, IndexFieldDataService fieldData, PercolatorService.PercolateContext context) { super(logger, queries, searcher, fieldData); - this.matches = matches; + this.limit = context.limit; + this.size = context.size; } @Override @@ -94,13 +99,26 @@ abstract class QueryCollector extends Collector { collector.reset(); searcher.search(query, collector); if (collector.exists()) { - matches.add(id); + if (!limit) { + matches.add(id); + } else if (counter < size) { + matches.add(id); + } + counter++; } } catch (IOException e) { logger.warn("[" + id + "] failed to execute query", e); } } + long counter() { + return counter; + } + + List matches() { + return matches; + } + } final static class Count extends QueryCollector { diff --git a/src/test/java/org/elasticsearch/test/integration/percolator/SimplePercolatorTests.java b/src/test/java/org/elasticsearch/test/integration/percolator/SimplePercolatorTests.java index 2349b56a40d..9d9177c3d91 100644 --- a/src/test/java/org/elasticsearch/test/integration/percolator/SimplePercolatorTests.java +++ b/src/test/java/org/elasticsearch/test/integration/percolator/SimplePercolatorTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.action.admin.indices.exists.types.TypesExistsResponse; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.count.CountResponse; import org.elasticsearch.action.percolate.PercolateResponse; +import org.elasticsearch.action.percolate.PercolateSourceBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IgnoreIndices; import org.elasticsearch.client.Client; @@ -46,6 +47,7 @@ import org.junit.Test; import static org.elasticsearch.action.percolate.PercolateSourceBuilder.docBuilder; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.*; +import static org.elasticsearch.index.query.FilterBuilders.termFilter; import static org.elasticsearch.index.query.QueryBuilders.*; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.hamcrest.Matchers.*; @@ -466,10 +468,12 @@ public class SimplePercolatorTests extends AbstractSharedClusterTest { .setRefresh(true) .execute().actionGet(); + PercolateSourceBuilder sourceBuilder = new PercolateSourceBuilder() + .setDoc(docBuilder().setDoc(jsonBuilder().startObject().startObject("type1").field("field1", "value2").endObject().endObject())) + .setQueryBuilder(termQuery("color", "red")); percolate = client().preparePercolate() .setIndices("test").setDocumentType("type1") - .setSource(jsonBuilder().startObject().startObject("doc").startObject("type1").field("field1", "value2").endObject().endObject() - .field("query", termQuery("color", "red")).endObject()) + .setSource(sourceBuilder) .execute().actionGet(); assertThat(percolate.getMatches(), arrayWithSize(1)); assertThat(convertFromTextArray(percolate.getMatches(), "test"), arrayContaining("susu")); @@ -1077,6 +1081,99 @@ public class SimplePercolatorTests extends AbstractSharedClusterTest { } } + public void testPercolateSizingWithQueryAndFilter() throws Exception { + client().admin().indices().prepareCreate("test").execute().actionGet(); + ensureGreen(); + + int numLevels = randomIntBetween(1, 25); + long numQueriesPerLevel = randomIntBetween(10, 250); + long totalQueries = numLevels * numQueriesPerLevel; + logger.info("--> register " + totalQueries +" queries"); + for (int level = 1; level <= numLevels; level++) { + for (int query = 1; query <= numQueriesPerLevel; query++) { + client().prepareIndex("my-index", "_percolator", level + "-" + query) + .setSource(jsonBuilder().startObject().field("query", matchAllQuery()).field("level", level).endObject()) + .execute().actionGet(); + } + } + + boolean onlyCount = randomBoolean(); + PercolateResponse response = client().preparePercolate() + .setIndices("my-index").setDocumentType("my-type") + .setOnlyCount(onlyCount) + .setPercolateDoc(docBuilder().setDoc("field", "value")) + .execute().actionGet(); + assertNoFailures(response); + assertThat(response.getCount(), equalTo(totalQueries)); + if (!onlyCount) { + assertThat(response.getMatches().length, equalTo((int) totalQueries)); + } + + int size = randomIntBetween(0, (int) totalQueries - 1); + response = client().preparePercolate() + .setIndices("my-index").setDocumentType("my-type") + .setOnlyCount(onlyCount) + .setPercolateDoc(docBuilder().setDoc("field", "value")) + .setSize(size) + .execute().actionGet(); + assertNoFailures(response); + assertThat(response.getCount(), equalTo(totalQueries)); + if (!onlyCount) { + assertThat(response.getMatches().length, equalTo(size)); + } + + // The query / filter capabilities are NOT in realtime + client().admin().indices().prepareRefresh("my-index").execute().actionGet(); + + int runs = randomIntBetween(3, 16); + for (int i = 0; i < runs; i++) { + onlyCount = randomBoolean(); + response = client().preparePercolate() + .setIndices("my-index").setDocumentType("my-type") + .setOnlyCount(onlyCount) + .setPercolateDoc(docBuilder().setDoc("field", "value")) + .setPercolateQuery(termQuery("level", 1 + randomInt(numLevels - 1))) + .execute().actionGet(); + assertNoFailures(response); + assertThat(response.getCount(), equalTo(numQueriesPerLevel)); + if (!onlyCount) { + assertThat(response.getMatches().length, equalTo((int) numQueriesPerLevel)); + } + } + + for (int i = 0; i < runs; i++) { + onlyCount = randomBoolean(); + response = client().preparePercolate() + .setIndices("my-index").setDocumentType("my-type") + .setOnlyCount(onlyCount) + .setPercolateDoc(docBuilder().setDoc("field", "value")) + .setPercolateFilter(termFilter("level", 1 + randomInt(numLevels - 1))) + .execute().actionGet(); + assertNoFailures(response); + assertThat(response.getCount(), equalTo(numQueriesPerLevel)); + if (!onlyCount) { + assertThat(response.getMatches().length, equalTo((int) numQueriesPerLevel)); + } + } + + for (int i = 0; i < runs; i++) { + onlyCount = randomBoolean(); + size = randomIntBetween(0, (int) numQueriesPerLevel - 1); + response = client().preparePercolate() + .setIndices("my-index").setDocumentType("my-type") + .setOnlyCount(onlyCount) + .setSize(size) + .setPercolateDoc(docBuilder().setDoc("field", "value")) + .setPercolateFilter(termFilter("level", 1 + randomInt(numLevels - 1))) + .execute().actionGet(); + assertNoFailures(response); + assertThat(response.getCount(), equalTo(numQueriesPerLevel)); + if (!onlyCount) { + assertThat(response.getMatches().length, equalTo(size)); + } + } + } + public static String[] convertFromTextArray(PercolateResponse.Match[] matches, String index) { if (matches.length == 0) { return Strings.EMPTY_ARRAY;