mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-16 09:54:55 +00:00
Added size
option to percolate api
The `size` option in the percolate api will limit the number of matches being returned: ```bash curl -XGET 'localhost:9200/my-index/my-type/_percolate' -d '{ "size" : 10, "doc" : {...} }' ``` In the above request no more than 10 matches will be returned. The `count` field will still return the total number of matches the document matched with. The `size` option is not applicable for the count percolate api. Closes #3440
This commit is contained in:
parent
662bb80d6b
commit
12c7eeb262
@ -160,6 +160,14 @@ public class PercolateRequestBuilder extends BroadcastOperationRequestBuilder<Pe
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Limits the maximum number of percolate query matches to be returned.
|
||||||
|
*/
|
||||||
|
public PercolateRequestBuilder setSize(int size) {
|
||||||
|
sourceBuilder().setSize(size);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
private PercolateSourceBuilder sourceBuilder() {
|
private PercolateSourceBuilder sourceBuilder() {
|
||||||
if (sourceBuilder == null) {
|
if (sourceBuilder == null) {
|
||||||
sourceBuilder = new PercolateSourceBuilder();
|
sourceBuilder = new PercolateSourceBuilder();
|
||||||
|
@ -5,6 +5,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.text.StringText;
|
import org.elasticsearch.common.text.StringText;
|
||||||
import org.elasticsearch.common.text.Text;
|
import org.elasticsearch.common.text.Text;
|
||||||
|
import org.elasticsearch.percolator.PercolatorService;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
@ -15,19 +16,32 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse {
|
|||||||
private long count;
|
private long count;
|
||||||
private Text[] matches;
|
private Text[] matches;
|
||||||
|
|
||||||
|
// Request fields:
|
||||||
|
private boolean limit;
|
||||||
|
private int requestedSize;
|
||||||
|
|
||||||
public PercolateShardResponse() {
|
public PercolateShardResponse() {
|
||||||
}
|
}
|
||||||
|
|
||||||
public PercolateShardResponse(Text[] matches, String index, int shardId) {
|
public PercolateShardResponse(Text[] matches, long count, PercolatorService.PercolateContext context, String index, int shardId) {
|
||||||
super(index, shardId);
|
super(index, shardId);
|
||||||
this.matches = matches;
|
this.matches = matches;
|
||||||
this.count = matches.length;
|
this.count = count;
|
||||||
|
this.limit = context.limit;
|
||||||
|
this.requestedSize = context.size;
|
||||||
}
|
}
|
||||||
|
|
||||||
public PercolateShardResponse(long count, String index, int shardId) {
|
public PercolateShardResponse(long count, PercolatorService.PercolateContext context, String index, int shardId) {
|
||||||
super(index, shardId);
|
super(index, shardId);
|
||||||
this.count = count;
|
this.count = count;
|
||||||
this.matches = StringText.EMPTY_ARRAY;
|
this.matches = StringText.EMPTY_ARRAY;
|
||||||
|
this.limit = context.limit;
|
||||||
|
this.requestedSize = context.size;
|
||||||
|
}
|
||||||
|
|
||||||
|
public PercolateShardResponse(String index, int shardId) {
|
||||||
|
super(index, shardId);
|
||||||
|
this.matches = StringText.EMPTY_ARRAY;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Text[] matches() {
|
public Text[] matches() {
|
||||||
@ -38,11 +52,22 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse {
|
|||||||
return count;
|
return count;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean limit() {
|
||||||
|
return limit;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int requestedSize() {
|
||||||
|
return requestedSize;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void readFrom(StreamInput in) throws IOException {
|
public void readFrom(StreamInput in) throws IOException {
|
||||||
super.readFrom(in);
|
super.readFrom(in);
|
||||||
count = in.readVLong();
|
count = in.readVLong();
|
||||||
matches = in.readTextArray();
|
matches = in.readTextArray();
|
||||||
|
|
||||||
|
limit = in.readBoolean();
|
||||||
|
requestedSize = in.readVInt();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -50,5 +75,8 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse {
|
|||||||
super.writeTo(out);
|
super.writeTo(out);
|
||||||
out.writeVLong(count);
|
out.writeVLong(count);
|
||||||
out.writeTextArray(matches);
|
out.writeTextArray(matches);
|
||||||
|
|
||||||
|
out.writeBoolean(limit);
|
||||||
|
out.writeVLong(requestedSize);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -28,6 +28,7 @@ import org.elasticsearch.index.query.QueryBuilder;
|
|||||||
import org.elasticsearch.search.builder.SearchSourceBuilderException;
|
import org.elasticsearch.search.builder.SearchSourceBuilderException;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -37,6 +38,8 @@ public class PercolateSourceBuilder implements ToXContent {
|
|||||||
private DocBuilder docBuilder;
|
private DocBuilder docBuilder;
|
||||||
private QueryBuilder queryBuilder;
|
private QueryBuilder queryBuilder;
|
||||||
private FilterBuilder filterBuilder;
|
private FilterBuilder filterBuilder;
|
||||||
|
private Integer size;
|
||||||
|
private Boolean shortCircuit;
|
||||||
|
|
||||||
public DocBuilder percolateDocument() {
|
public DocBuilder percolateDocument() {
|
||||||
if (docBuilder == null) {
|
if (docBuilder == null) {
|
||||||
@ -49,24 +52,32 @@ public class PercolateSourceBuilder implements ToXContent {
|
|||||||
return docBuilder;
|
return docBuilder;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setDoc(DocBuilder docBuilder) {
|
public PercolateSourceBuilder setDoc(DocBuilder docBuilder) {
|
||||||
this.docBuilder = docBuilder;
|
this.docBuilder = docBuilder;
|
||||||
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public QueryBuilder getQueryBuilder() {
|
public QueryBuilder getQueryBuilder() {
|
||||||
return queryBuilder;
|
return queryBuilder;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setQueryBuilder(QueryBuilder queryBuilder) {
|
public PercolateSourceBuilder setQueryBuilder(QueryBuilder queryBuilder) {
|
||||||
this.queryBuilder = queryBuilder;
|
this.queryBuilder = queryBuilder;
|
||||||
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public FilterBuilder getFilterBuilder() {
|
public FilterBuilder getFilterBuilder() {
|
||||||
return filterBuilder;
|
return filterBuilder;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setFilterBuilder(FilterBuilder filterBuilder) {
|
public PercolateSourceBuilder setFilterBuilder(FilterBuilder filterBuilder) {
|
||||||
this.filterBuilder = filterBuilder;
|
this.filterBuilder = filterBuilder;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public PercolateSourceBuilder setSize(int size) {
|
||||||
|
this.size = size;
|
||||||
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public BytesReference buildAsBytes(XContentType contentType) throws SearchSourceBuilderException {
|
public BytesReference buildAsBytes(XContentType contentType) throws SearchSourceBuilderException {
|
||||||
@ -93,6 +104,9 @@ public class PercolateSourceBuilder implements ToXContent {
|
|||||||
builder.field("filter");
|
builder.field("filter");
|
||||||
filterBuilder.toXContent(builder, params);
|
filterBuilder.toXContent(builder, params);
|
||||||
}
|
}
|
||||||
|
if (size != null) {
|
||||||
|
builder.field("size", size);
|
||||||
|
}
|
||||||
builder.endObject();
|
builder.endObject();
|
||||||
return builder;
|
return builder;
|
||||||
}
|
}
|
||||||
@ -110,6 +124,13 @@ public class PercolateSourceBuilder implements ToXContent {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public DocBuilder setDoc(String field, Object value) {
|
||||||
|
Map<String, Object> values = new HashMap<String, Object>(2);
|
||||||
|
values.put(field, value);
|
||||||
|
setDoc(values);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public DocBuilder setDoc(String doc) {
|
public DocBuilder setDoc(String doc) {
|
||||||
this.doc = new BytesArray(doc);
|
this.doc = new BytesArray(doc);
|
||||||
return this;
|
return this;
|
||||||
|
@ -159,6 +159,7 @@ public class TransportPercolateAction extends TransportBroadcastOperationAction<
|
|||||||
for (PercolateShardResponse shardResponse : shardResults) {
|
for (PercolateShardResponse shardResponse : shardResults) {
|
||||||
finalCount += shardResponse.count();
|
finalCount += shardResponse.count();
|
||||||
}
|
}
|
||||||
|
|
||||||
return new PercolateResponse(
|
return new PercolateResponse(
|
||||||
shardsResponses.length(), successfulShards, failedShards, shardFailures, finalCount, tookInMillis
|
shardsResponses.length(), successfulShards, failedShards, shardFailures, finalCount, tookInMillis
|
||||||
);
|
);
|
||||||
@ -168,19 +169,29 @@ public class TransportPercolateAction extends TransportBroadcastOperationAction<
|
|||||||
finalCount += response.count();
|
finalCount += response.count();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Serializing more than Integer.MAX_VALUE seems insane to me...
|
int requestedSize = shardResults.get(0).requestedSize();
|
||||||
int size = (int) finalCount;
|
boolean limit = shardResults.get(0).limit();
|
||||||
|
if (limit) {
|
||||||
|
requestedSize = (int) Math.min(requestedSize, finalCount);
|
||||||
|
} else {
|
||||||
|
// Serializing more than Integer.MAX_VALUE seems insane to me...
|
||||||
|
requestedSize = (int) finalCount;
|
||||||
|
}
|
||||||
|
|
||||||
// Use a custom impl of AbstractBigArray for Object[]?
|
// Use a custom impl of AbstractBigArray for Object[]?
|
||||||
List<PercolateResponse.Match> finalMatches = new ArrayList<PercolateResponse.Match>(size);
|
List<PercolateResponse.Match> finalMatches = new ArrayList<PercolateResponse.Match>(requestedSize);
|
||||||
for (PercolateShardResponse response : shardResults) {
|
outer: for (PercolateShardResponse response : shardResults) {
|
||||||
Text index = new StringText(response.getIndex());
|
Text index = new StringText(response.getIndex());
|
||||||
for (Text id : response.matches()) {
|
for (Text id : response.matches()) {
|
||||||
finalMatches.add(new PercolateResponse.Match(id, index));
|
finalMatches.add(new PercolateResponse.Match(id, index));
|
||||||
|
if (requestedSize != 0 && finalMatches.size() == requestedSize) {
|
||||||
|
break outer;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return new PercolateResponse(
|
return new PercolateResponse(
|
||||||
shardsResponses.length(), successfulShards, failedShards, shardFailures, finalMatches.toArray(new PercolateResponse.Match[size]), finalCount, tookInMillis
|
shardsResponses.length(), successfulShards, failedShards, shardFailures, finalMatches.toArray(new PercolateResponse.Match[requestedSize]), finalCount, tookInMillis
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -31,13 +31,11 @@ import org.elasticsearch.ElasticSearchParseException;
|
|||||||
import org.elasticsearch.action.percolate.PercolateShardRequest;
|
import org.elasticsearch.action.percolate.PercolateShardRequest;
|
||||||
import org.elasticsearch.action.percolate.PercolateShardResponse;
|
import org.elasticsearch.action.percolate.PercolateShardResponse;
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
import org.elasticsearch.common.collect.Tuple;
|
|
||||||
import org.elasticsearch.common.component.AbstractComponent;
|
import org.elasticsearch.common.component.AbstractComponent;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.lucene.Lucene;
|
import org.elasticsearch.common.lucene.Lucene;
|
||||||
import org.elasticsearch.common.lucene.search.XConstantScoreQuery;
|
import org.elasticsearch.common.lucene.search.XConstantScoreQuery;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.text.StringText;
|
|
||||||
import org.elasticsearch.common.text.Text;
|
import org.elasticsearch.common.text.Text;
|
||||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
@ -85,43 +83,52 @@ public class PercolatorService extends AbstractComponent {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public PercolateShardResponse matchPercolate(final PercolateShardRequest request) {
|
public PercolateShardResponse matchPercolate(final PercolateShardRequest request) {
|
||||||
return innerPercolate(request, new PercolateAction() {
|
return preparePercolate(request, new PercolateAction() {
|
||||||
@Override
|
@Override
|
||||||
public PercolateShardResponse doPercolateAction(PercolateContext context) {
|
public PercolateShardResponse doPercolateAction(PercolateContext context) {
|
||||||
List<Text> matches = new ArrayList<Text>();
|
final List<Text> matches;
|
||||||
|
long count = 0;
|
||||||
if (context.query == null) {
|
if (context.query == null) {
|
||||||
|
matches = new ArrayList<Text>();
|
||||||
Lucene.ExistsCollector collector = new Lucene.ExistsCollector();
|
Lucene.ExistsCollector collector = new Lucene.ExistsCollector();
|
||||||
for (Map.Entry<Text, Query> entry : context.percolateQueries.entrySet()) {
|
for (Map.Entry<Text, Query> entry : context.percolateQueries.entrySet()) {
|
||||||
collector.reset();
|
collector.reset();
|
||||||
try {
|
try {
|
||||||
context.searcher.search(entry.getValue(), collector);
|
context.docSearcher.search(entry.getValue(), collector);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
logger.warn("[" + entry.getKey() + "] failed to execute query", e);
|
logger.warn("[" + entry.getKey() + "] failed to execute query", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (collector.exists()) {
|
if (collector.exists()) {
|
||||||
matches.add(entry.getKey());
|
if (!context.limit) {
|
||||||
|
matches.add(entry.getKey());
|
||||||
|
} else if (count < context.size) {
|
||||||
|
matches.add(entry.getKey());
|
||||||
|
}
|
||||||
|
count++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
Engine.Searcher percolatorSearcher = context.indexShard.searcher();
|
Engine.Searcher percolatorSearcher = context.indexShard.searcher();
|
||||||
try {
|
try {
|
||||||
percolatorSearcher.searcher().search(
|
Match match = match(logger, context.percolateQueries, context.docSearcher, context.fieldDataService, context);
|
||||||
context.query, match(logger, context.percolateQueries, context.searcher, context.fieldDataService, matches)
|
percolatorSearcher.searcher().search(context.query, match);
|
||||||
);
|
matches = match.matches();
|
||||||
|
count = match.counter();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
logger.warn("failed to execute", e);
|
logger.debug("failed to execute", e);
|
||||||
|
throw new PercolateException(context.indexShard.shardId(), "failed to execute", e);
|
||||||
} finally {
|
} finally {
|
||||||
percolatorSearcher.release();
|
percolatorSearcher.release();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return new PercolateShardResponse(matches.toArray(new Text[matches.size()]), request.index(), request.shardId());
|
return new PercolateShardResponse(matches.toArray(new Text[matches.size()]), count, context, request.index(), request.shardId());
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public PercolateShardResponse countPercolate(final PercolateShardRequest request) {
|
public PercolateShardResponse countPercolate(final PercolateShardRequest request) {
|
||||||
return innerPercolate(request, new PercolateAction() {
|
return preparePercolate(request, new PercolateAction() {
|
||||||
@Override
|
@Override
|
||||||
public PercolateShardResponse doPercolateAction(PercolateContext context) {
|
public PercolateShardResponse doPercolateAction(PercolateContext context) {
|
||||||
long count = 0;
|
long count = 0;
|
||||||
@ -130,7 +137,7 @@ public class PercolatorService extends AbstractComponent {
|
|||||||
for (Map.Entry<Text, Query> entry : context.percolateQueries.entrySet()) {
|
for (Map.Entry<Text, Query> entry : context.percolateQueries.entrySet()) {
|
||||||
collector.reset();
|
collector.reset();
|
||||||
try {
|
try {
|
||||||
context.searcher.search(entry.getValue(), collector);
|
context.docSearcher.search(entry.getValue(), collector);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
logger.warn("[" + entry.getKey() + "] failed to execute query", e);
|
logger.warn("[" + entry.getKey() + "] failed to execute query", e);
|
||||||
}
|
}
|
||||||
@ -142,7 +149,7 @@ public class PercolatorService extends AbstractComponent {
|
|||||||
} else {
|
} else {
|
||||||
Engine.Searcher percolatorSearcher = context.indexShard.searcher();
|
Engine.Searcher percolatorSearcher = context.indexShard.searcher();
|
||||||
try {
|
try {
|
||||||
Count countCollector = count(logger, context.percolateQueries, context.searcher, context.fieldDataService);
|
Count countCollector = count(logger, context.percolateQueries, context.docSearcher, context.fieldDataService);
|
||||||
percolatorSearcher.searcher().search(context.query, countCollector);
|
percolatorSearcher.searcher().search(context.query, countCollector);
|
||||||
count = countCollector.counter();
|
count = countCollector.counter();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
@ -151,12 +158,12 @@ public class PercolatorService extends AbstractComponent {
|
|||||||
percolatorSearcher.release();
|
percolatorSearcher.release();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return new PercolateShardResponse(count, request.index(), request.shardId());
|
return new PercolateShardResponse(count, context, request.index(), request.shardId());
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private PercolateShardResponse innerPercolate(PercolateShardRequest request, PercolateAction action) {
|
private PercolateShardResponse preparePercolate(PercolateShardRequest request, PercolateAction action) {
|
||||||
IndexService percolateIndexService = indicesService.indexServiceSafe(request.index());
|
IndexService percolateIndexService = indicesService.indexServiceSafe(request.index());
|
||||||
IndexShard indexShard = percolateIndexService.shardSafe(request.shardId());
|
IndexShard indexShard = percolateIndexService.shardSafe(request.shardId());
|
||||||
|
|
||||||
@ -164,27 +171,30 @@ public class PercolatorService extends AbstractComponent {
|
|||||||
shardPercolateService.prePercolate();
|
shardPercolateService.prePercolate();
|
||||||
long startTime = System.nanoTime();
|
long startTime = System.nanoTime();
|
||||||
try {
|
try {
|
||||||
final PercolateContext context = new PercolateContext();
|
ConcurrentMap<Text, Query> percolateQueries = indexShard.percolateRegistry().percolateQueries();
|
||||||
context.indexShard = indexShard;
|
if (percolateQueries.isEmpty()) {
|
||||||
context.percolateQueries = indexShard.percolateRegistry().percolateQueries();
|
return new PercolateShardResponse(request.index(), request.shardId());
|
||||||
if (context.percolateQueries.isEmpty()) {
|
|
||||||
return new PercolateShardResponse(StringText.EMPTY_ARRAY, request.index(), request.shardId());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ParsedDocument parsedDocument;
|
final PercolateContext context = new PercolateContext();
|
||||||
|
context.percolateQueries = percolateQueries;
|
||||||
|
context.indexShard = indexShard;
|
||||||
|
ParsedDocument parsedDocument = parsePercolate(percolateIndexService, request, context);
|
||||||
if (request.docSource() != null && request.docSource().length() != 0) {
|
if (request.docSource() != null && request.docSource().length() != 0) {
|
||||||
parsedDocument = parseFetchedDoc(request.docSource(), percolateIndexService, request.documentType());
|
parsedDocument = parseFetchedDoc(request.docSource(), percolateIndexService, request.documentType());
|
||||||
context.query = parseQueryOrFilter(percolateIndexService, request.source());
|
} else if (parsedDocument == null) {
|
||||||
} else {
|
throw new ElasticSearchParseException("No doc to percolate in the request");
|
||||||
Tuple<ParsedDocument, Query> parseResult = parsePercolate(percolateIndexService, request.documentType(), request.source());
|
}
|
||||||
parsedDocument = parseResult.v1();
|
|
||||||
context.query = parseResult.v2();
|
if (context.size < 0) {
|
||||||
|
context.size = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// first, parse the source doc into a MemoryIndex
|
// first, parse the source doc into a MemoryIndex
|
||||||
final MemoryIndex memoryIndex = cache.get();
|
final MemoryIndex memoryIndex = cache.get();
|
||||||
try {
|
try {
|
||||||
// TODO: This means percolation does not support nested docs...
|
// TODO: This means percolation does not support nested docs...
|
||||||
|
// So look into: ByteBufferDirectory
|
||||||
for (IndexableField field : parsedDocument.rootDoc().getFields()) {
|
for (IndexableField field : parsedDocument.rootDoc().getFields()) {
|
||||||
if (!field.fieldType().indexed()) {
|
if (!field.fieldType().indexed()) {
|
||||||
continue;
|
continue;
|
||||||
@ -204,15 +214,15 @@ public class PercolatorService extends AbstractComponent {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
context.searcher = memoryIndex.createSearcher();
|
context.docSearcher = memoryIndex.createSearcher();
|
||||||
context.fieldDataService = percolateIndexService.fieldData();
|
context.fieldDataService = percolateIndexService.fieldData();
|
||||||
IndexCache indexCache = percolateIndexService.cache();
|
IndexCache indexCache = percolateIndexService.cache();
|
||||||
try {
|
try {
|
||||||
return action.doPercolateAction(context);
|
return action.doPercolateAction(context);
|
||||||
} finally {
|
} finally {
|
||||||
// explicitly clear the reader, since we can only register on callback on SegmentReader
|
// explicitly clear the reader, since we can only register on callback on SegmentReader
|
||||||
indexCache.clear(context.searcher.getIndexReader());
|
indexCache.clear(context.docSearcher.getIndexReader());
|
||||||
context.fieldDataService.clear(context.searcher.getIndexReader());
|
context.fieldDataService.clear(context.docSearcher.getIndexReader());
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
memoryIndex.reset();
|
memoryIndex.reset();
|
||||||
@ -222,8 +232,12 @@ public class PercolatorService extends AbstractComponent {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private Tuple<ParsedDocument, Query> parsePercolate(IndexService documentIndexService, String type, BytesReference source) throws ElasticSearchException {
|
private ParsedDocument parsePercolate(IndexService documentIndexService, PercolateShardRequest request, PercolateContext context) throws ElasticSearchException {
|
||||||
Query query = null;
|
BytesReference source = request.source();
|
||||||
|
if (source == null || source.length() == 0) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
ParsedDocument doc = null;
|
ParsedDocument doc = null;
|
||||||
XContentParser parser = null;
|
XContentParser parser = null;
|
||||||
try {
|
try {
|
||||||
@ -241,21 +255,29 @@ public class PercolatorService extends AbstractComponent {
|
|||||||
}
|
}
|
||||||
|
|
||||||
MapperService mapperService = documentIndexService.mapperService();
|
MapperService mapperService = documentIndexService.mapperService();
|
||||||
DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(type);
|
DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(request.documentType());
|
||||||
doc = docMapper.parse(source(parser).type(type).flyweight(true));
|
doc = docMapper.parse(source(parser).type(request.documentType()).flyweight(true));
|
||||||
}
|
}
|
||||||
} else if (token == XContentParser.Token.START_OBJECT) {
|
} else if (token == XContentParser.Token.START_OBJECT) {
|
||||||
if ("query".equals(currentFieldName)) {
|
if ("query".equals(currentFieldName)) {
|
||||||
if (query != null) {
|
if (context.query != null) {
|
||||||
throw new ElasticSearchParseException("Either specify query or filter, not both");
|
throw new ElasticSearchParseException("Either specify query or filter, not both");
|
||||||
}
|
}
|
||||||
query = documentIndexService.queryParserService().parse(parser).query();
|
context.query = documentIndexService.queryParserService().parse(parser).query();
|
||||||
} else if ("filter".equals(currentFieldName)) {
|
} else if ("filter".equals(currentFieldName)) {
|
||||||
if (query != null) {
|
if (context.query != null) {
|
||||||
throw new ElasticSearchParseException("Either specify query or filter, not both");
|
throw new ElasticSearchParseException("Either specify query or filter, not both");
|
||||||
}
|
}
|
||||||
Filter filter = documentIndexService.queryParserService().parseInnerFilter(parser).filter();
|
Filter filter = documentIndexService.queryParserService().parseInnerFilter(parser).filter();
|
||||||
query = new XConstantScoreQuery(filter);
|
context.query = new XConstantScoreQuery(filter);
|
||||||
|
}
|
||||||
|
} else if (token.isValue()) {
|
||||||
|
if ("size".equals(currentFieldName)) {
|
||||||
|
context.limit = true;
|
||||||
|
context.size = parser.intValue();
|
||||||
|
if (context.size < 0) {
|
||||||
|
throw new ElasticSearchParseException("size is set to [" + context.size + "] and is expected to be higher or equal to 0");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else if (token == null) {
|
} else if (token == null) {
|
||||||
break;
|
break;
|
||||||
@ -269,11 +291,7 @@ public class PercolatorService extends AbstractComponent {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (doc == null) {
|
return doc;
|
||||||
throw new ElasticSearchParseException("No doc to percolate in the request");
|
|
||||||
}
|
|
||||||
|
|
||||||
return new Tuple<ParsedDocument, Query>(doc, query);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private ParsedDocument parseFetchedDoc(BytesReference fetchedDoc, IndexService documentIndexService, String type) {
|
private ParsedDocument parseFetchedDoc(BytesReference fetchedDoc, IndexService documentIndexService, String type) {
|
||||||
@ -299,48 +317,6 @@ public class PercolatorService extends AbstractComponent {
|
|||||||
return doc;
|
return doc;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Query parseQueryOrFilter(IndexService documentIndexService, BytesReference source) {
|
|
||||||
if (source == null || source.length() == 0) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
Query query = null;
|
|
||||||
XContentParser parser = null;
|
|
||||||
try {
|
|
||||||
parser = XContentFactory.xContent(source).createParser(source);
|
|
||||||
String currentFieldName = null;
|
|
||||||
XContentParser.Token token;
|
|
||||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
|
||||||
if (token == XContentParser.Token.FIELD_NAME) {
|
|
||||||
currentFieldName = parser.currentName();
|
|
||||||
} else if (token == XContentParser.Token.START_OBJECT) {
|
|
||||||
if ("query".equals(currentFieldName)) {
|
|
||||||
if (query != null) {
|
|
||||||
throw new ElasticSearchParseException("Either specify query or filter, not both");
|
|
||||||
}
|
|
||||||
query = documentIndexService.queryParserService().parse(parser).query();
|
|
||||||
} else if ("filter".equals(currentFieldName)) {
|
|
||||||
if (query != null) {
|
|
||||||
throw new ElasticSearchParseException("Either specify query or filter, not both");
|
|
||||||
}
|
|
||||||
Filter filter = documentIndexService.queryParserService().parseInnerFilter(parser).filter();
|
|
||||||
query = new XConstantScoreQuery(filter);
|
|
||||||
}
|
|
||||||
} else if (token == null) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new ElasticSearchParseException("failed to parse request", e);
|
|
||||||
} finally {
|
|
||||||
if (parser != null) {
|
|
||||||
parser.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return query;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void close() {
|
public void close() {
|
||||||
cache.close();
|
cache.close();
|
||||||
}
|
}
|
||||||
@ -351,11 +327,14 @@ public class PercolatorService extends AbstractComponent {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
class PercolateContext {
|
public class PercolateContext {
|
||||||
|
|
||||||
|
public boolean limit;
|
||||||
|
public int size;
|
||||||
|
|
||||||
Query query;
|
Query query;
|
||||||
ConcurrentMap<Text, Query> percolateQueries;
|
ConcurrentMap<Text, Query> percolateQueries;
|
||||||
IndexSearcher searcher;
|
IndexSearcher docSearcher;
|
||||||
IndexShard indexShard;
|
IndexShard indexShard;
|
||||||
IndexFieldDataService fieldDataService;
|
IndexFieldDataService fieldDataService;
|
||||||
|
|
||||||
|
@ -20,6 +20,7 @@ import org.elasticsearch.index.mapper.Uid;
|
|||||||
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
|
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
@ -60,8 +61,8 @@ abstract class QueryCollector extends Collector {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static Match match(ESLogger logger, ConcurrentMap<Text, Query> queries, IndexSearcher searcher, IndexFieldDataService fieldData, List<Text> matches) {
|
static Match match(ESLogger logger, ConcurrentMap<Text, Query> queries, IndexSearcher searcher, IndexFieldDataService fieldData, PercolatorService.PercolateContext context) {
|
||||||
return new Match(logger, queries, searcher, fieldData, matches);
|
return new Match(logger, queries, searcher, fieldData, context);
|
||||||
}
|
}
|
||||||
|
|
||||||
static Count count(ESLogger logger, ConcurrentMap<Text, Query> queries, IndexSearcher searcher, IndexFieldDataService fieldData) {
|
static Count count(ESLogger logger, ConcurrentMap<Text, Query> queries, IndexSearcher searcher, IndexFieldDataService fieldData) {
|
||||||
@ -70,11 +71,15 @@ abstract class QueryCollector extends Collector {
|
|||||||
|
|
||||||
final static class Match extends QueryCollector {
|
final static class Match extends QueryCollector {
|
||||||
|
|
||||||
private final List<Text> matches;
|
private final List<Text> matches = new ArrayList<Text>();
|
||||||
|
private final boolean limit;
|
||||||
|
private final int size;
|
||||||
|
private long counter = 0;
|
||||||
|
|
||||||
Match(ESLogger logger, ConcurrentMap<Text, Query> queries, IndexSearcher searcher, IndexFieldDataService fieldData, List<Text> matches) {
|
Match(ESLogger logger, ConcurrentMap<Text, Query> queries, IndexSearcher searcher, IndexFieldDataService fieldData, PercolatorService.PercolateContext context) {
|
||||||
super(logger, queries, searcher, fieldData);
|
super(logger, queries, searcher, fieldData);
|
||||||
this.matches = matches;
|
this.limit = context.limit;
|
||||||
|
this.size = context.size;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -94,13 +99,26 @@ abstract class QueryCollector extends Collector {
|
|||||||
collector.reset();
|
collector.reset();
|
||||||
searcher.search(query, collector);
|
searcher.search(query, collector);
|
||||||
if (collector.exists()) {
|
if (collector.exists()) {
|
||||||
matches.add(id);
|
if (!limit) {
|
||||||
|
matches.add(id);
|
||||||
|
} else if (counter < size) {
|
||||||
|
matches.add(id);
|
||||||
|
}
|
||||||
|
counter++;
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
logger.warn("[" + id + "] failed to execute query", e);
|
logger.warn("[" + id + "] failed to execute query", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
long counter() {
|
||||||
|
return counter;
|
||||||
|
}
|
||||||
|
|
||||||
|
List<Text> matches() {
|
||||||
|
return matches;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
final static class Count extends QueryCollector {
|
final static class Count extends QueryCollector {
|
||||||
|
@ -26,6 +26,7 @@ import org.elasticsearch.action.admin.indices.exists.types.TypesExistsResponse;
|
|||||||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
|
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
|
||||||
import org.elasticsearch.action.count.CountResponse;
|
import org.elasticsearch.action.count.CountResponse;
|
||||||
import org.elasticsearch.action.percolate.PercolateResponse;
|
import org.elasticsearch.action.percolate.PercolateResponse;
|
||||||
|
import org.elasticsearch.action.percolate.PercolateSourceBuilder;
|
||||||
import org.elasticsearch.action.search.SearchResponse;
|
import org.elasticsearch.action.search.SearchResponse;
|
||||||
import org.elasticsearch.action.support.IgnoreIndices;
|
import org.elasticsearch.action.support.IgnoreIndices;
|
||||||
import org.elasticsearch.client.Client;
|
import org.elasticsearch.client.Client;
|
||||||
@ -46,6 +47,7 @@ import org.junit.Test;
|
|||||||
import static org.elasticsearch.action.percolate.PercolateSourceBuilder.docBuilder;
|
import static org.elasticsearch.action.percolate.PercolateSourceBuilder.docBuilder;
|
||||||
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
|
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
|
||||||
import static org.elasticsearch.common.xcontent.XContentFactory.*;
|
import static org.elasticsearch.common.xcontent.XContentFactory.*;
|
||||||
|
import static org.elasticsearch.index.query.FilterBuilders.termFilter;
|
||||||
import static org.elasticsearch.index.query.QueryBuilders.*;
|
import static org.elasticsearch.index.query.QueryBuilders.*;
|
||||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
|
||||||
import static org.hamcrest.Matchers.*;
|
import static org.hamcrest.Matchers.*;
|
||||||
@ -466,10 +468,12 @@ public class SimplePercolatorTests extends AbstractSharedClusterTest {
|
|||||||
.setRefresh(true)
|
.setRefresh(true)
|
||||||
.execute().actionGet();
|
.execute().actionGet();
|
||||||
|
|
||||||
|
PercolateSourceBuilder sourceBuilder = new PercolateSourceBuilder()
|
||||||
|
.setDoc(docBuilder().setDoc(jsonBuilder().startObject().startObject("type1").field("field1", "value2").endObject().endObject()))
|
||||||
|
.setQueryBuilder(termQuery("color", "red"));
|
||||||
percolate = client().preparePercolate()
|
percolate = client().preparePercolate()
|
||||||
.setIndices("test").setDocumentType("type1")
|
.setIndices("test").setDocumentType("type1")
|
||||||
.setSource(jsonBuilder().startObject().startObject("doc").startObject("type1").field("field1", "value2").endObject().endObject()
|
.setSource(sourceBuilder)
|
||||||
.field("query", termQuery("color", "red")).endObject())
|
|
||||||
.execute().actionGet();
|
.execute().actionGet();
|
||||||
assertThat(percolate.getMatches(), arrayWithSize(1));
|
assertThat(percolate.getMatches(), arrayWithSize(1));
|
||||||
assertThat(convertFromTextArray(percolate.getMatches(), "test"), arrayContaining("susu"));
|
assertThat(convertFromTextArray(percolate.getMatches(), "test"), arrayContaining("susu"));
|
||||||
@ -1077,6 +1081,99 @@ public class SimplePercolatorTests extends AbstractSharedClusterTest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testPercolateSizingWithQueryAndFilter() throws Exception {
|
||||||
|
client().admin().indices().prepareCreate("test").execute().actionGet();
|
||||||
|
ensureGreen();
|
||||||
|
|
||||||
|
int numLevels = randomIntBetween(1, 25);
|
||||||
|
long numQueriesPerLevel = randomIntBetween(10, 250);
|
||||||
|
long totalQueries = numLevels * numQueriesPerLevel;
|
||||||
|
logger.info("--> register " + totalQueries +" queries");
|
||||||
|
for (int level = 1; level <= numLevels; level++) {
|
||||||
|
for (int query = 1; query <= numQueriesPerLevel; query++) {
|
||||||
|
client().prepareIndex("my-index", "_percolator", level + "-" + query)
|
||||||
|
.setSource(jsonBuilder().startObject().field("query", matchAllQuery()).field("level", level).endObject())
|
||||||
|
.execute().actionGet();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean onlyCount = randomBoolean();
|
||||||
|
PercolateResponse response = client().preparePercolate()
|
||||||
|
.setIndices("my-index").setDocumentType("my-type")
|
||||||
|
.setOnlyCount(onlyCount)
|
||||||
|
.setPercolateDoc(docBuilder().setDoc("field", "value"))
|
||||||
|
.execute().actionGet();
|
||||||
|
assertNoFailures(response);
|
||||||
|
assertThat(response.getCount(), equalTo(totalQueries));
|
||||||
|
if (!onlyCount) {
|
||||||
|
assertThat(response.getMatches().length, equalTo((int) totalQueries));
|
||||||
|
}
|
||||||
|
|
||||||
|
int size = randomIntBetween(0, (int) totalQueries - 1);
|
||||||
|
response = client().preparePercolate()
|
||||||
|
.setIndices("my-index").setDocumentType("my-type")
|
||||||
|
.setOnlyCount(onlyCount)
|
||||||
|
.setPercolateDoc(docBuilder().setDoc("field", "value"))
|
||||||
|
.setSize(size)
|
||||||
|
.execute().actionGet();
|
||||||
|
assertNoFailures(response);
|
||||||
|
assertThat(response.getCount(), equalTo(totalQueries));
|
||||||
|
if (!onlyCount) {
|
||||||
|
assertThat(response.getMatches().length, equalTo(size));
|
||||||
|
}
|
||||||
|
|
||||||
|
// The query / filter capabilities are NOT in realtime
|
||||||
|
client().admin().indices().prepareRefresh("my-index").execute().actionGet();
|
||||||
|
|
||||||
|
int runs = randomIntBetween(3, 16);
|
||||||
|
for (int i = 0; i < runs; i++) {
|
||||||
|
onlyCount = randomBoolean();
|
||||||
|
response = client().preparePercolate()
|
||||||
|
.setIndices("my-index").setDocumentType("my-type")
|
||||||
|
.setOnlyCount(onlyCount)
|
||||||
|
.setPercolateDoc(docBuilder().setDoc("field", "value"))
|
||||||
|
.setPercolateQuery(termQuery("level", 1 + randomInt(numLevels - 1)))
|
||||||
|
.execute().actionGet();
|
||||||
|
assertNoFailures(response);
|
||||||
|
assertThat(response.getCount(), equalTo(numQueriesPerLevel));
|
||||||
|
if (!onlyCount) {
|
||||||
|
assertThat(response.getMatches().length, equalTo((int) numQueriesPerLevel));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < runs; i++) {
|
||||||
|
onlyCount = randomBoolean();
|
||||||
|
response = client().preparePercolate()
|
||||||
|
.setIndices("my-index").setDocumentType("my-type")
|
||||||
|
.setOnlyCount(onlyCount)
|
||||||
|
.setPercolateDoc(docBuilder().setDoc("field", "value"))
|
||||||
|
.setPercolateFilter(termFilter("level", 1 + randomInt(numLevels - 1)))
|
||||||
|
.execute().actionGet();
|
||||||
|
assertNoFailures(response);
|
||||||
|
assertThat(response.getCount(), equalTo(numQueriesPerLevel));
|
||||||
|
if (!onlyCount) {
|
||||||
|
assertThat(response.getMatches().length, equalTo((int) numQueriesPerLevel));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < runs; i++) {
|
||||||
|
onlyCount = randomBoolean();
|
||||||
|
size = randomIntBetween(0, (int) numQueriesPerLevel - 1);
|
||||||
|
response = client().preparePercolate()
|
||||||
|
.setIndices("my-index").setDocumentType("my-type")
|
||||||
|
.setOnlyCount(onlyCount)
|
||||||
|
.setSize(size)
|
||||||
|
.setPercolateDoc(docBuilder().setDoc("field", "value"))
|
||||||
|
.setPercolateFilter(termFilter("level", 1 + randomInt(numLevels - 1)))
|
||||||
|
.execute().actionGet();
|
||||||
|
assertNoFailures(response);
|
||||||
|
assertThat(response.getCount(), equalTo(numQueriesPerLevel));
|
||||||
|
if (!onlyCount) {
|
||||||
|
assertThat(response.getMatches().length, equalTo(size));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static String[] convertFromTextArray(PercolateResponse.Match[] matches, String index) {
|
public static String[] convertFromTextArray(PercolateResponse.Match[] matches, String index) {
|
||||||
if (matches.length == 0) {
|
if (matches.length == 0) {
|
||||||
return Strings.EMPTY_ARRAY;
|
return Strings.EMPTY_ARRAY;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user