Added count percolate api
Added a new percolate api that only returns the number of percolate queries that have matched with the document being percolated. The actual query ids are not included. The percolate total count will be put in the total field and is the only result that will be returned from the dedicated count apis. The total field will also be included in the already existing percolate and percolating existing document apis and are equal to the number of matches. Closes #3430
This commit is contained in:
parent
2a211705a3
commit
890d06f018
|
@ -48,6 +48,7 @@ public class PercolateRequest extends BroadcastOperationRequest<PercolateRequest
|
|||
private String routing;
|
||||
private String preference;
|
||||
private GetRequest getRequest;
|
||||
private boolean onlyCount;
|
||||
|
||||
private BytesReference source;
|
||||
private boolean unsafe;
|
||||
|
@ -58,7 +59,7 @@ public class PercolateRequest extends BroadcastOperationRequest<PercolateRequest
|
|||
// to hold it temporarily in an easy way
|
||||
long startTime;
|
||||
|
||||
PercolateRequest() {
|
||||
public PercolateRequest() {
|
||||
}
|
||||
|
||||
public PercolateRequest(String[] indices, String documentType) {
|
||||
|
@ -73,6 +74,8 @@ public class PercolateRequest extends BroadcastOperationRequest<PercolateRequest
|
|||
this.preference = request.preference();
|
||||
this.source = request.source;
|
||||
this.docSource = docSource;
|
||||
this.onlyCount = request.onlyCount;
|
||||
this.startTime = request.startTime;
|
||||
}
|
||||
|
||||
public String documentType() {
|
||||
|
@ -174,6 +177,14 @@ public class PercolateRequest extends BroadcastOperationRequest<PercolateRequest
|
|||
return this;
|
||||
}
|
||||
|
||||
public boolean onlyCount() {
|
||||
return onlyCount;
|
||||
}
|
||||
|
||||
public void onlyCount(boolean onlyCount) {
|
||||
this.onlyCount = onlyCount;
|
||||
}
|
||||
|
||||
BytesReference docSource() {
|
||||
return docSource;
|
||||
}
|
||||
|
@ -210,6 +221,7 @@ public class PercolateRequest extends BroadcastOperationRequest<PercolateRequest
|
|||
getRequest = new GetRequest(null);
|
||||
getRequest.readFrom(in);
|
||||
}
|
||||
onlyCount = in.readBoolean();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -227,5 +239,6 @@ public class PercolateRequest extends BroadcastOperationRequest<PercolateRequest
|
|||
} else {
|
||||
out.writeBoolean(false);
|
||||
}
|
||||
out.writeBoolean(onlyCount);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -87,6 +87,14 @@ public class PercolateRequestBuilder extends BroadcastOperationRequestBuilder<Pe
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether only to return total count and don't keep track of the matches (Count percolation).
|
||||
*/
|
||||
public PercolateRequestBuilder setOnlyCount(boolean onlyCount) {
|
||||
request.onlyCount(onlyCount);
|
||||
return this;
|
||||
}
|
||||
|
||||
public PercolateRequestBuilder setSource(PercolateSourceBuilder source) {
|
||||
sourceBuilder = source;
|
||||
return this;
|
||||
|
|
|
@ -37,13 +37,24 @@ import java.util.List;
|
|||
*/
|
||||
public class PercolateResponse extends BroadcastOperationResponse implements Iterable<PercolateResponse.Match> {
|
||||
|
||||
private static final Match[] EMPTY = new Match[0];
|
||||
|
||||
private long tookInMillis;
|
||||
private Match[] matches;
|
||||
private long count;
|
||||
|
||||
public PercolateResponse(int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures, Match[] matches, long tookInMillis) {
|
||||
public PercolateResponse(int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures, Match[] matches, long count, long tookInMillis) {
|
||||
super(totalShards, successfulShards, failedShards, shardFailures);
|
||||
this.tookInMillis = tookInMillis;
|
||||
this.matches = matches;
|
||||
this.count = count;
|
||||
}
|
||||
|
||||
public PercolateResponse(int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures, long count, long tookInMillis) {
|
||||
super(totalShards, successfulShards, failedShards, shardFailures);
|
||||
this.tookInMillis = tookInMillis;
|
||||
this.matches = EMPTY;
|
||||
this.count = count;
|
||||
}
|
||||
|
||||
public PercolateResponse(int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures, long tookInMillis) {
|
||||
|
@ -76,6 +87,10 @@ public class PercolateResponse extends BroadcastOperationResponse implements Ite
|
|||
return this.matches;
|
||||
}
|
||||
|
||||
public long getCount() {
|
||||
return count;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<Match> iterator() {
|
||||
return Arrays.asList(matches).iterator();
|
||||
|
@ -85,6 +100,7 @@ public class PercolateResponse extends BroadcastOperationResponse implements Ite
|
|||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
tookInMillis = in.readVLong();
|
||||
count = in.readVLong();
|
||||
int size = in.readVInt();
|
||||
matches = new Match[size];
|
||||
for (int i = 0; i < size; i++) {
|
||||
|
@ -97,6 +113,7 @@ public class PercolateResponse extends BroadcastOperationResponse implements Ite
|
|||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeVLong(tookInMillis);
|
||||
out.writeVLong(count);
|
||||
out.writeVInt(matches.length);
|
||||
for (Match match : matches) {
|
||||
match.writeTo(out);
|
||||
|
|
|
@ -14,6 +14,7 @@ public class PercolateShardRequest extends BroadcastShardOperationRequest {
|
|||
private String documentType;
|
||||
private BytesReference source;
|
||||
private BytesReference docSource;
|
||||
private boolean onlyCount;
|
||||
|
||||
public PercolateShardRequest() {
|
||||
}
|
||||
|
@ -23,6 +24,7 @@ public class PercolateShardRequest extends BroadcastShardOperationRequest {
|
|||
this.documentType = request.documentType();
|
||||
this.source = request.source();
|
||||
this.docSource = request.docSource();
|
||||
this.onlyCount = request.onlyCount();
|
||||
}
|
||||
|
||||
public String documentType() {
|
||||
|
@ -37,12 +39,17 @@ public class PercolateShardRequest extends BroadcastShardOperationRequest {
|
|||
return docSource;
|
||||
}
|
||||
|
||||
public boolean onlyCount() {
|
||||
return onlyCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
documentType = in.readString();
|
||||
source = in.readBytesReference();
|
||||
docSource = in.readBytesReference();
|
||||
onlyCount = in.readBoolean();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -51,6 +58,7 @@ public class PercolateShardRequest extends BroadcastShardOperationRequest {
|
|||
out.writeString(documentType);
|
||||
out.writeBytesReference(source);
|
||||
out.writeBytesReference(docSource);
|
||||
out.writeBoolean(onlyCount);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package org.elasticsearch.action.percolate;
|
|||
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationResponse;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.text.StringText;
|
||||
import org.elasticsearch.common.text.Text;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -11,6 +12,7 @@ import java.io.IOException;
|
|||
*/
|
||||
public class PercolateShardResponse extends BroadcastShardOperationResponse {
|
||||
|
||||
private long count;
|
||||
private Text[] matches;
|
||||
|
||||
public PercolateShardResponse() {
|
||||
|
@ -19,25 +21,34 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse {
|
|||
public PercolateShardResponse(Text[] matches, String index, int shardId) {
|
||||
super(index, shardId);
|
||||
this.matches = matches;
|
||||
this.count = matches.length;
|
||||
}
|
||||
|
||||
public PercolateShardResponse(long count, String index, int shardId) {
|
||||
super(index, shardId);
|
||||
this.count = count;
|
||||
this.matches = StringText.EMPTY_ARRAY;
|
||||
}
|
||||
|
||||
public Text[] matches() {
|
||||
return matches;
|
||||
}
|
||||
|
||||
public void matches(Text[] matches) {
|
||||
this.matches = matches;
|
||||
public long count() {
|
||||
return count;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
count = in.readVLong();
|
||||
matches = in.readTextArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeVLong(count);
|
||||
out.writeTextArray(matches);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -154,11 +154,23 @@ public class TransportPercolateAction extends TransportBroadcastOperationAction<
|
|||
return new PercolateResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures, tookInMillis);
|
||||
}
|
||||
|
||||
int size = 0;
|
||||
if (request.onlyCount()) {
|
||||
long finalCount = 0;
|
||||
for (PercolateShardResponse shardResponse : shardResults) {
|
||||
finalCount += shardResponse.count();
|
||||
}
|
||||
return new PercolateResponse(
|
||||
shardsResponses.length(), successfulShards, failedShards, shardFailures, finalCount, tookInMillis
|
||||
);
|
||||
} else {
|
||||
long finalCount = 0;
|
||||
for (PercolateShardResponse response : shardResults) {
|
||||
size += response.matches().length;
|
||||
finalCount += response.count();
|
||||
}
|
||||
|
||||
// Serializing more than Integer.MAX_VALUE seems insane to me...
|
||||
int size = (int) finalCount;
|
||||
// Use a custom impl of AbstractBigArray for Object[]?
|
||||
List<PercolateResponse.Match> finalMatches = new ArrayList<PercolateResponse.Match>(size);
|
||||
for (PercolateShardResponse response : shardResults) {
|
||||
Text index = new StringText(response.getIndex());
|
||||
|
@ -168,9 +180,10 @@ public class TransportPercolateAction extends TransportBroadcastOperationAction<
|
|||
}
|
||||
|
||||
return new PercolateResponse(
|
||||
shardsResponses.length(), successfulShards, failedShards, shardFailures, finalMatches.toArray(new PercolateResponse.Match[size]), tookInMillis
|
||||
shardsResponses.length(), successfulShards, failedShards, shardFailures, finalMatches.toArray(new PercolateResponse.Match[size]), finalCount, tookInMillis
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PercolateShardRequest newShardRequest() {
|
||||
|
@ -196,7 +209,11 @@ public class TransportPercolateAction extends TransportBroadcastOperationAction<
|
|||
@Override
|
||||
protected PercolateShardResponse shardOperation(PercolateShardRequest request) throws ElasticSearchException {
|
||||
try {
|
||||
return percolatorService.percolate(request);
|
||||
if (request.onlyCount()) {
|
||||
return percolatorService.countPercolate(request);
|
||||
} else {
|
||||
return percolatorService.matchPercolate(request);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
logger.trace("[{}][{}] failed to percolate", t, request.index(), request.shardId());
|
||||
ShardId shardId = new ShardId(request.index(), request.shardId());
|
||||
|
|
|
@ -62,6 +62,7 @@ import java.util.Map;
|
|||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import static org.elasticsearch.index.mapper.SourceToParse.source;
|
||||
import static org.elasticsearch.percolator.QueryCollector.*;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -83,7 +84,79 @@ public class PercolatorService extends AbstractComponent {
|
|||
};
|
||||
}
|
||||
|
||||
public PercolateShardResponse percolate(PercolateShardRequest request) {
|
||||
public PercolateShardResponse matchPercolate(final PercolateShardRequest request) {
|
||||
return innerPercolate(request, new PercolateAction() {
|
||||
@Override
|
||||
public PercolateShardResponse doPercolateAction(PercolateContext context) {
|
||||
List<Text> matches = new ArrayList<Text>();
|
||||
if (context.query == null) {
|
||||
Lucene.ExistsCollector collector = new Lucene.ExistsCollector();
|
||||
for (Map.Entry<Text, Query> entry : context.percolateQueries.entrySet()) {
|
||||
collector.reset();
|
||||
try {
|
||||
context.searcher.search(entry.getValue(), collector);
|
||||
} catch (IOException e) {
|
||||
logger.warn("[" + entry.getKey() + "] failed to execute query", e);
|
||||
}
|
||||
|
||||
if (collector.exists()) {
|
||||
matches.add(entry.getKey());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Engine.Searcher percolatorSearcher = context.indexShard.searcher();
|
||||
try {
|
||||
percolatorSearcher.searcher().search(
|
||||
context.query, match(logger, context.percolateQueries, context.searcher, context.fieldDataService, matches)
|
||||
);
|
||||
} catch (IOException e) {
|
||||
logger.warn("failed to execute", e);
|
||||
} finally {
|
||||
percolatorSearcher.release();
|
||||
}
|
||||
}
|
||||
return new PercolateShardResponse(matches.toArray(new Text[matches.size()]), request.index(), request.shardId());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public PercolateShardResponse countPercolate(final PercolateShardRequest request) {
|
||||
return innerPercolate(request, new PercolateAction() {
|
||||
@Override
|
||||
public PercolateShardResponse doPercolateAction(PercolateContext context) {
|
||||
long count = 0;
|
||||
if (context.query == null) {
|
||||
Lucene.ExistsCollector collector = new Lucene.ExistsCollector();
|
||||
for (Map.Entry<Text, Query> entry : context.percolateQueries.entrySet()) {
|
||||
collector.reset();
|
||||
try {
|
||||
context.searcher.search(entry.getValue(), collector);
|
||||
} catch (IOException e) {
|
||||
logger.warn("[" + entry.getKey() + "] failed to execute query", e);
|
||||
}
|
||||
|
||||
if (collector.exists()) {
|
||||
count++;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Engine.Searcher percolatorSearcher = context.indexShard.searcher();
|
||||
try {
|
||||
Count countCollector = count(logger, context.percolateQueries, context.searcher, context.fieldDataService);
|
||||
percolatorSearcher.searcher().search(context.query, countCollector);
|
||||
count = countCollector.counter();
|
||||
} catch (IOException e) {
|
||||
logger.warn("failed to execute", e);
|
||||
} finally {
|
||||
percolatorSearcher.release();
|
||||
}
|
||||
}
|
||||
return new PercolateShardResponse(count, request.index(), request.shardId());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private PercolateShardResponse innerPercolate(PercolateShardRequest request, PercolateAction action) {
|
||||
IndexService percolateIndexService = indicesService.indexServiceSafe(request.index());
|
||||
IndexShard indexShard = percolateIndexService.shardSafe(request.shardId());
|
||||
|
||||
|
@ -91,23 +164,23 @@ public class PercolatorService extends AbstractComponent {
|
|||
shardPercolateService.prePercolate();
|
||||
long startTime = System.nanoTime();
|
||||
try {
|
||||
ConcurrentMap<Text, Query> percolateQueries = indexShard.percolateRegistry().percolateQueries();
|
||||
if (percolateQueries.isEmpty()) {
|
||||
final PercolateContext context = new PercolateContext();
|
||||
context.indexShard = indexShard;
|
||||
context.percolateQueries = indexShard.percolateRegistry().percolateQueries();
|
||||
if (context.percolateQueries.isEmpty()) {
|
||||
return new PercolateShardResponse(StringText.EMPTY_ARRAY, request.index(), request.shardId());
|
||||
}
|
||||
|
||||
ParsedDocument parsedDocument;
|
||||
Query query;
|
||||
if (request.docSource() != null && request.docSource().length() != 0) {
|
||||
parsedDocument = parseFetchedDoc(request.docSource(), percolateIndexService, request.documentType());
|
||||
query = parseQueryOrFilter(percolateIndexService, request.source());
|
||||
context.query = parseQueryOrFilter(percolateIndexService, request.source());
|
||||
} else {
|
||||
Tuple<ParsedDocument, Query> parseResult = parsePercolate(percolateIndexService, request.documentType(), request.source());
|
||||
parsedDocument = parseResult.v1();
|
||||
query = parseResult.v2();
|
||||
context.query = parseResult.v2();
|
||||
}
|
||||
|
||||
|
||||
// first, parse the source doc into a MemoryIndex
|
||||
final MemoryIndex memoryIndex = cache.get();
|
||||
try {
|
||||
|
@ -131,44 +204,16 @@ public class PercolatorService extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
|
||||
final IndexSearcher searcher = memoryIndex.createSearcher();
|
||||
List<Text> matches = new ArrayList<Text>();
|
||||
|
||||
IndexFieldDataService fieldDataService = percolateIndexService.fieldData();
|
||||
context.searcher = memoryIndex.createSearcher();
|
||||
context.fieldDataService = percolateIndexService.fieldData();
|
||||
IndexCache indexCache = percolateIndexService.cache();
|
||||
try {
|
||||
if (query == null) {
|
||||
Lucene.ExistsCollector collector = new Lucene.ExistsCollector();
|
||||
for (Map.Entry<Text, Query> entry : percolateQueries.entrySet()) {
|
||||
collector.reset();
|
||||
try {
|
||||
searcher.search(entry.getValue(), collector);
|
||||
} catch (IOException e) {
|
||||
logger.warn("[" + entry.getKey() + "] failed to execute query", e);
|
||||
}
|
||||
|
||||
if (collector.exists()) {
|
||||
matches.add(entry.getKey());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Engine.Searcher percolatorSearcher = indexShard.searcher();
|
||||
try {
|
||||
percolatorSearcher.searcher().search(
|
||||
query, new QueryCollector(logger, percolateQueries, searcher, fieldDataService, matches)
|
||||
);
|
||||
} catch (IOException e) {
|
||||
logger.warn("failed to execute", e);
|
||||
} finally {
|
||||
percolatorSearcher.release();
|
||||
}
|
||||
}
|
||||
return action.doPercolateAction(context);
|
||||
} finally {
|
||||
// explicitly clear the reader, since we can only register on callback on SegmentReader
|
||||
indexCache.clear(searcher.getIndexReader());
|
||||
fieldDataService.clear(searcher.getIndexReader());
|
||||
indexCache.clear(context.searcher.getIndexReader());
|
||||
context.fieldDataService.clear(context.searcher.getIndexReader());
|
||||
}
|
||||
return new PercolateShardResponse(matches.toArray(new Text[matches.size()]), request.index(), request.shardId());
|
||||
} finally {
|
||||
memoryIndex.reset();
|
||||
}
|
||||
|
@ -300,6 +345,22 @@ public class PercolatorService extends AbstractComponent {
|
|||
cache.close();
|
||||
}
|
||||
|
||||
interface PercolateAction {
|
||||
|
||||
PercolateShardResponse doPercolateAction(PercolateContext context);
|
||||
|
||||
}
|
||||
|
||||
class PercolateContext {
|
||||
|
||||
Query query;
|
||||
ConcurrentMap<Text, Query> percolateQueries;
|
||||
IndexSearcher searcher;
|
||||
IndexShard indexShard;
|
||||
IndexFieldDataService fieldDataService;
|
||||
|
||||
}
|
||||
|
||||
public static final class Constants {
|
||||
|
||||
public static final String TYPE_NAME = "_percolator";
|
||||
|
|
|
@ -25,23 +25,21 @@ import java.util.concurrent.ConcurrentMap;
|
|||
|
||||
/**
|
||||
*/
|
||||
final class QueryCollector extends Collector {
|
||||
abstract class QueryCollector extends Collector {
|
||||
|
||||
private final IndexFieldData uidFieldData;
|
||||
private final IndexSearcher searcher;
|
||||
private final List<Text> matches;
|
||||
private final ConcurrentMap<Text, Query> queries;
|
||||
private final ESLogger logger;
|
||||
final IndexFieldData uidFieldData;
|
||||
final IndexSearcher searcher;
|
||||
final ConcurrentMap<Text, Query> queries;
|
||||
final ESLogger logger;
|
||||
|
||||
private final Lucene.ExistsCollector collector = new Lucene.ExistsCollector();
|
||||
final Lucene.ExistsCollector collector = new Lucene.ExistsCollector();
|
||||
|
||||
private BytesValues values;
|
||||
BytesValues values;
|
||||
|
||||
QueryCollector(ESLogger logger, ConcurrentMap<Text, Query> queries, IndexSearcher searcher, IndexFieldDataService fieldData, List<Text> matches) {
|
||||
QueryCollector(ESLogger logger, ConcurrentMap<Text, Query> queries, IndexSearcher searcher, IndexFieldDataService fieldData) {
|
||||
this.logger = logger;
|
||||
this.queries = queries;
|
||||
this.searcher = searcher;
|
||||
this.matches = matches;
|
||||
// TODO: when we move to a UID level mapping def on the index level, we can use that one, now, its per type, and we can't easily choose one
|
||||
this.uidFieldData = fieldData.getForField(new FieldMapper.Names(UidFieldMapper.NAME), new FieldDataType("string", ImmutableSettings.builder().put("format", "paged_bytes")));
|
||||
}
|
||||
|
@ -50,6 +48,35 @@ final class QueryCollector extends Collector {
|
|||
public void setScorer(Scorer scorer) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setNextReader(AtomicReaderContext context) throws IOException {
|
||||
// we use the UID because id might not be indexed
|
||||
values = uidFieldData.load(context).getBytesValues();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean acceptsDocsOutOfOrder() {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
static Match match(ESLogger logger, ConcurrentMap<Text, Query> queries, IndexSearcher searcher, IndexFieldDataService fieldData, List<Text> matches) {
|
||||
return new Match(logger, queries, searcher, fieldData, matches);
|
||||
}
|
||||
|
||||
static Count count(ESLogger logger, ConcurrentMap<Text, Query> queries, IndexSearcher searcher, IndexFieldDataService fieldData) {
|
||||
return new Count(logger, queries, searcher, fieldData);
|
||||
}
|
||||
|
||||
final static class Match extends QueryCollector {
|
||||
|
||||
private final List<Text> matches;
|
||||
|
||||
Match(ESLogger logger, ConcurrentMap<Text, Query> queries, IndexSearcher searcher, IndexFieldDataService fieldData, List<Text> matches) {
|
||||
super(logger, queries, searcher, fieldData);
|
||||
this.matches = matches;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void collect(int doc) throws IOException {
|
||||
BytesRef uid = values.getValue(doc);
|
||||
|
@ -74,14 +101,44 @@ final class QueryCollector extends Collector {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setNextReader(AtomicReaderContext context) throws IOException {
|
||||
// we use the UID because id might not be indexed
|
||||
values = uidFieldData.load(context).getBytesValues();
|
||||
}
|
||||
|
||||
final static class Count extends QueryCollector {
|
||||
|
||||
private long counter = 0;
|
||||
|
||||
Count(ESLogger logger, ConcurrentMap<Text, Query> queries, IndexSearcher searcher, IndexFieldDataService fieldData) {
|
||||
super(logger, queries, searcher, fieldData);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean acceptsDocsOutOfOrder() {
|
||||
return true;
|
||||
public void collect(int doc) throws IOException {
|
||||
BytesRef uid = values.getValue(doc);
|
||||
if (uid == null) {
|
||||
return;
|
||||
}
|
||||
Text id = new BytesText(Uid.idFromUid(uid));
|
||||
Query query = queries.get(id);
|
||||
if (query == null) {
|
||||
// log???
|
||||
return;
|
||||
}
|
||||
// run the query
|
||||
try {
|
||||
collector.reset();
|
||||
searcher.search(query, collector);
|
||||
if (collector.exists()) {
|
||||
counter++;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
logger.warn("[" + id + "] failed to execute query", e);
|
||||
}
|
||||
}
|
||||
|
||||
long counter() {
|
||||
return counter;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -51,16 +51,23 @@ public class RestPercolateAction extends BaseRestHandler {
|
|||
super(settings, client);
|
||||
controller.registerHandler(GET, "/{index}/{type}/_percolate", this);
|
||||
controller.registerHandler(POST, "/{index}/{type}/_percolate", this);
|
||||
controller.registerHandler(GET, "/{index}/{type}/{id}/_percolate", new RestPercolateExistingDocHandler());
|
||||
controller.registerHandler(POST, "/{index}/{type}/{id}/_percolate", new RestPercolateExistingDocHandler());
|
||||
|
||||
RestPercolateExistingDocHandler existingDocHandler = new RestPercolateExistingDocHandler();
|
||||
controller.registerHandler(GET, "/{index}/{type}/{id}/_percolate", existingDocHandler);
|
||||
controller.registerHandler(POST, "/{index}/{type}/{id}/_percolate", existingDocHandler);
|
||||
|
||||
RestCountPercolateDocHandler countHandler = new RestCountPercolateDocHandler();
|
||||
controller.registerHandler(GET, "/{index}/{type}/_percolate/count", countHandler);
|
||||
controller.registerHandler(POST, "/{index}/{type}/_percolate/count", countHandler);
|
||||
|
||||
RestCountPercolateExistingDocHandler countExistingDocHandler = new RestCountPercolateExistingDocHandler();
|
||||
controller.registerHandler(GET, "/{index}/{type}/{id}/_percolate/count", countExistingDocHandler);
|
||||
controller.registerHandler(POST, "/{index}/{type}/{id}/_percolate/count", countExistingDocHandler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleRequest(RestRequest restRequest, RestChannel restChannel) {
|
||||
String[] indices = RestActions.splitIndices(restRequest.param("index"));
|
||||
String type = restRequest.param("type");
|
||||
|
||||
PercolateRequest percolateRequest = new PercolateRequest(indices, type);
|
||||
void parseDocPercolate(PercolateRequest percolateRequest, RestRequest restRequest, RestChannel restChannel) {
|
||||
percolateRequest.indices(RestActions.splitIndices(restRequest.param("index")));
|
||||
percolateRequest.documentType(restRequest.param("type"));
|
||||
percolateRequest.routing(restRequest.param("routing"));
|
||||
percolateRequest.preference(restRequest.param("preference"));
|
||||
percolateRequest.source(restRequest.content(), restRequest.contentUnsafe());
|
||||
|
@ -73,7 +80,36 @@ public class RestPercolateAction extends BaseRestHandler {
|
|||
executePercolate(percolateRequest, restRequest, restChannel);
|
||||
}
|
||||
|
||||
void executePercolate(PercolateRequest percolateRequest, final RestRequest restRequest, final RestChannel restChannel) {
|
||||
void parseExistingDocPercolate(PercolateRequest percolateRequest, RestRequest restRequest, RestChannel restChannel) {
|
||||
String index = restRequest.param("index");
|
||||
String type = restRequest.param("type");
|
||||
percolateRequest.indices(RestActions.splitIndices(restRequest.param("percolate_index", index)));
|
||||
percolateRequest.documentType(restRequest.param("percolate_type", type));
|
||||
|
||||
GetRequest getRequest = new GetRequest(index, type,
|
||||
restRequest.param("id"));
|
||||
getRequest.routing(restRequest.param("routing"));
|
||||
getRequest.preference(restRequest.param("preference"));
|
||||
getRequest.refresh(restRequest.paramAsBoolean("refresh", getRequest.refresh()));
|
||||
getRequest.realtime(restRequest.paramAsBooleanOptional("realtime", null));
|
||||
getRequest.version(RestActions.parseVersion(restRequest));
|
||||
getRequest.versionType(VersionType.fromString(restRequest.param("version_type"), getRequest.versionType()));
|
||||
|
||||
percolateRequest.getRequest(getRequest);
|
||||
percolateRequest.routing(restRequest.param("percolate_routing"));
|
||||
percolateRequest.preference(restRequest.param("percolate_preference"));
|
||||
percolateRequest.source(restRequest.content(), restRequest.contentUnsafe());
|
||||
|
||||
percolateRequest.routing(restRequest.param("percolate_routing"));
|
||||
percolateRequest.preference(restRequest.param("percolate_preference"));
|
||||
|
||||
if (restRequest.hasParam("ignore_indices")) {
|
||||
percolateRequest.ignoreIndices(IgnoreIndices.fromString(restRequest.param("ignore_indices")));
|
||||
}
|
||||
executePercolate(percolateRequest, restRequest, restChannel);
|
||||
}
|
||||
|
||||
void executePercolate(final PercolateRequest percolateRequest, final RestRequest restRequest, final RestChannel restChannel) {
|
||||
// we just send a response, no need to fork
|
||||
percolateRequest.listenerThreaded(false);
|
||||
client.percolate(percolateRequest, new ActionListener<PercolateResponse>() {
|
||||
|
@ -102,6 +138,8 @@ public class RestPercolateAction extends BaseRestHandler {
|
|||
}
|
||||
builder.endObject();
|
||||
|
||||
builder.field(Fields.TOTAL, response.getCount());
|
||||
if (!percolateRequest.onlyCount()) {
|
||||
builder.startArray(Fields.MATCHES);
|
||||
boolean justIds = "ids".equals(restRequest.param("percolate_format"));
|
||||
if (justIds) {
|
||||
|
@ -117,6 +155,7 @@ public class RestPercolateAction extends BaseRestHandler {
|
|||
}
|
||||
}
|
||||
builder.endArray();
|
||||
}
|
||||
|
||||
builder.endObject();
|
||||
|
||||
|
@ -137,38 +176,40 @@ public class RestPercolateAction extends BaseRestHandler {
|
|||
});
|
||||
}
|
||||
|
||||
class RestPercolateExistingDocHandler implements RestHandler {
|
||||
@Override
|
||||
public void handleRequest(RestRequest restRequest, RestChannel restChannel) {
|
||||
PercolateRequest percolateRequest = new PercolateRequest();
|
||||
parseDocPercolate(percolateRequest, restRequest, restChannel);
|
||||
}
|
||||
|
||||
final class RestCountPercolateDocHandler implements RestHandler {
|
||||
|
||||
@Override
|
||||
public void handleRequest(RestRequest restRequest, RestChannel restChannel) {
|
||||
String index = restRequest.param("index");
|
||||
String type = restRequest.param("type");
|
||||
|
||||
GetRequest getRequest = new GetRequest(index, type,
|
||||
restRequest.param("id"));
|
||||
getRequest.routing(restRequest.param("routing"));
|
||||
getRequest.preference(restRequest.param("preference"));
|
||||
getRequest.refresh(restRequest.paramAsBoolean("refresh", getRequest.refresh()));
|
||||
getRequest.realtime(restRequest.paramAsBooleanOptional("realtime", null));
|
||||
getRequest.version(RestActions.parseVersion(restRequest));
|
||||
getRequest.versionType(VersionType.fromString(restRequest.param("version_type"), getRequest.versionType()));
|
||||
|
||||
PercolateRequest percolateRequest = new PercolateRequest(
|
||||
RestActions.splitIndices(restRequest.param("percolate_index", index)),
|
||||
restRequest.param("percolate_type", type)
|
||||
);
|
||||
percolateRequest.getRequest(getRequest);
|
||||
percolateRequest.routing(restRequest.param("percolate_routing"));
|
||||
percolateRequest.preference(restRequest.param("percolate_preference"));
|
||||
percolateRequest.source(restRequest.content(), restRequest.contentUnsafe());
|
||||
|
||||
percolateRequest.routing(restRequest.param("percolate_routing"));
|
||||
percolateRequest.preference(restRequest.param("percolate_preference"));
|
||||
|
||||
if (restRequest.hasParam("ignore_indices")) {
|
||||
percolateRequest.ignoreIndices(IgnoreIndices.fromString(restRequest.param("ignore_indices")));
|
||||
PercolateRequest percolateRequest = new PercolateRequest();
|
||||
percolateRequest.onlyCount(true);
|
||||
parseDocPercolate(percolateRequest, restRequest, restChannel);
|
||||
}
|
||||
executePercolate(percolateRequest, restRequest, restChannel);
|
||||
|
||||
}
|
||||
|
||||
final class RestPercolateExistingDocHandler implements RestHandler {
|
||||
|
||||
@Override
|
||||
public void handleRequest(RestRequest restRequest, RestChannel restChannel) {
|
||||
PercolateRequest percolateRequest = new PercolateRequest();
|
||||
parseExistingDocPercolate(percolateRequest, restRequest, restChannel);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
final class RestCountPercolateExistingDocHandler implements RestHandler {
|
||||
|
||||
@Override
|
||||
public void handleRequest(RestRequest restRequest, RestChannel restChannel) {
|
||||
PercolateRequest percolateRequest = new PercolateRequest();
|
||||
percolateRequest.onlyCount(true);
|
||||
parseExistingDocPercolate(percolateRequest, restRequest, restChannel);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -97,6 +97,7 @@ public class SimplePercolatorTests extends AbstractSharedClusterTest {
|
|||
.setPercolateDoc(docBuilder().setDoc(yamlBuilder().startObject().field("field1", "c").endObject()))
|
||||
.execute().actionGet();
|
||||
assertThat(response.getMatches(), arrayWithSize(2));
|
||||
assertThat(response.getCount(), equalTo(2l));
|
||||
assertThat(convertFromTextArray(response.getMatches(), "test"), arrayContainingInAnyOrder("2", "4"));
|
||||
|
||||
logger.info("--> Percolate doc with field1=b c");
|
||||
|
@ -105,6 +106,7 @@ public class SimplePercolatorTests extends AbstractSharedClusterTest {
|
|||
.setPercolateDoc(docBuilder().setDoc(smileBuilder().startObject().field("field1", "b c").endObject()))
|
||||
.execute().actionGet();
|
||||
assertThat(response.getMatches(), arrayWithSize(4));
|
||||
assertThat(response.getCount(), equalTo(4l));
|
||||
assertThat(convertFromTextArray(response.getMatches(), "test"), arrayContainingInAnyOrder("1", "2", "3", "4"));
|
||||
|
||||
logger.info("--> Percolate doc with field1=d");
|
||||
|
@ -113,6 +115,7 @@ public class SimplePercolatorTests extends AbstractSharedClusterTest {
|
|||
.setPercolateDoc(docBuilder().setDoc(jsonBuilder().startObject().field("field1", "d").endObject()))
|
||||
.execute().actionGet();
|
||||
assertThat(response.getMatches(), arrayWithSize(1));
|
||||
assertThat(response.getCount(), equalTo(1l));
|
||||
assertThat(convertFromTextArray(response.getMatches(), "test"), arrayContaining("4"));
|
||||
|
||||
logger.info("--> Search dummy doc, percolate queries must not be included");
|
||||
|
@ -879,6 +882,145 @@ public class SimplePercolatorTests extends AbstractSharedClusterTest {
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCountPercolation() throws Exception {
|
||||
client().admin().indices().prepareCreate("test").execute().actionGet();
|
||||
ensureGreen();
|
||||
|
||||
logger.info("--> Add dummy doc");
|
||||
client().prepareIndex("test", "type", "1").setSource("field", "value").execute().actionGet();
|
||||
|
||||
logger.info("--> register a queries");
|
||||
client().prepareIndex("test", "_percolator", "1")
|
||||
.setSource(jsonBuilder().startObject().field("query", matchQuery("field1", "b")).field("a", "b").endObject())
|
||||
.execute().actionGet();
|
||||
client().prepareIndex("test", "_percolator", "2")
|
||||
.setSource(jsonBuilder().startObject().field("query", matchQuery("field1", "c")).endObject())
|
||||
.execute().actionGet();
|
||||
client().prepareIndex("test", "_percolator", "3")
|
||||
.setSource(jsonBuilder().startObject().field("query", boolQuery()
|
||||
.must(matchQuery("field1", "b"))
|
||||
.must(matchQuery("field1", "c"))
|
||||
).endObject())
|
||||
.execute().actionGet();
|
||||
client().prepareIndex("test", "_percolator", "4")
|
||||
.setSource(jsonBuilder().startObject().field("query", matchAllQuery()).endObject())
|
||||
.execute().actionGet();
|
||||
client().admin().indices().prepareRefresh("test").execute().actionGet();
|
||||
|
||||
logger.info("--> Count percolate doc with field1=b");
|
||||
PercolateResponse response = client().preparePercolate()
|
||||
.setIndices("test").setDocumentType("type").setOnlyCount(true)
|
||||
.setPercolateDoc(docBuilder().setDoc(jsonBuilder().startObject().field("field1", "b").endObject()))
|
||||
.execute().actionGet();
|
||||
assertThat(response.getCount(), equalTo(2l));
|
||||
assertThat(response.getMatches(), emptyArray());
|
||||
|
||||
logger.info("--> Count percolate doc with field1=c");
|
||||
response = client().preparePercolate()
|
||||
.setIndices("test").setDocumentType("type").setOnlyCount(true)
|
||||
.setPercolateDoc(docBuilder().setDoc(yamlBuilder().startObject().field("field1", "c").endObject()))
|
||||
.execute().actionGet();
|
||||
assertThat(response.getCount(), equalTo(2l));
|
||||
assertThat(response.getMatches(), emptyArray());
|
||||
|
||||
logger.info("--> Count percolate doc with field1=b c");
|
||||
response = client().preparePercolate()
|
||||
.setIndices("test").setDocumentType("type").setOnlyCount(true)
|
||||
.setPercolateDoc(docBuilder().setDoc(smileBuilder().startObject().field("field1", "b c").endObject()))
|
||||
.execute().actionGet();
|
||||
assertThat(response.getCount(), equalTo(4l));
|
||||
assertThat(response.getMatches(), emptyArray());
|
||||
|
||||
logger.info("--> Count percolate doc with field1=d");
|
||||
response = client().preparePercolate()
|
||||
.setIndices("test").setDocumentType("type").setOnlyCount(true)
|
||||
.setPercolateDoc(docBuilder().setDoc(jsonBuilder().startObject().field("field1", "d").endObject()))
|
||||
.execute().actionGet();
|
||||
assertThat(response.getCount(), equalTo(1l));
|
||||
assertThat(response.getMatches(), emptyArray());
|
||||
|
||||
logger.info("--> Count percolate non existing doc");
|
||||
try {
|
||||
client().preparePercolate()
|
||||
.setIndices("test").setDocumentType("type").setOnlyCount(true)
|
||||
.setGetRequest(Requests.getRequest("test").type("type").id("5"))
|
||||
.execute().actionGet();
|
||||
fail("Exception should have been thrown");
|
||||
} catch (DocumentMissingException e) {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCountPercolatingExistingDocs() throws Exception {
|
||||
client().admin().indices().prepareCreate("test").execute().actionGet();
|
||||
ensureGreen();
|
||||
|
||||
logger.info("--> Adding docs");
|
||||
client().prepareIndex("test", "type", "1").setSource("field1", "b").execute().actionGet();
|
||||
client().prepareIndex("test", "type", "2").setSource("field1", "c").execute().actionGet();
|
||||
client().prepareIndex("test", "type", "3").setSource("field1", "b c").execute().actionGet();
|
||||
client().prepareIndex("test", "type", "4").setSource("field1", "d").execute().actionGet();
|
||||
|
||||
logger.info("--> register a queries");
|
||||
client().prepareIndex("test", "_percolator", "1")
|
||||
.setSource(jsonBuilder().startObject().field("query", matchQuery("field1", "b")).field("a", "b").endObject())
|
||||
.execute().actionGet();
|
||||
client().prepareIndex("test", "_percolator", "2")
|
||||
.setSource(jsonBuilder().startObject().field("query", matchQuery("field1", "c")).endObject())
|
||||
.execute().actionGet();
|
||||
client().prepareIndex("test", "_percolator", "3")
|
||||
.setSource(jsonBuilder().startObject().field("query", boolQuery()
|
||||
.must(matchQuery("field1", "b"))
|
||||
.must(matchQuery("field1", "c"))
|
||||
).endObject())
|
||||
.execute().actionGet();
|
||||
client().prepareIndex("test", "_percolator", "4")
|
||||
.setSource(jsonBuilder().startObject().field("query", matchAllQuery()).endObject())
|
||||
.execute().actionGet();
|
||||
client().admin().indices().prepareRefresh("test").execute().actionGet();
|
||||
|
||||
logger.info("--> Count percolate existing doc with id 1");
|
||||
PercolateResponse response = client().preparePercolate()
|
||||
.setIndices("test").setDocumentType("type").setOnlyCount(true)
|
||||
.setGetRequest(Requests.getRequest("test").type("type").id("1"))
|
||||
.execute().actionGet();
|
||||
assertThat(response.getFailedShards(), equalTo(0));
|
||||
assertThat(response.getSuccessfulShards(), equalTo(5));
|
||||
assertThat(response.getCount(), equalTo(2l));
|
||||
assertThat(response.getMatches(), emptyArray());
|
||||
|
||||
logger.info("--> Count percolate existing doc with id 2");
|
||||
response = client().preparePercolate()
|
||||
.setIndices("test").setDocumentType("type").setOnlyCount(true)
|
||||
.setGetRequest(Requests.getRequest("test").type("type").id("2"))
|
||||
.execute().actionGet();
|
||||
assertThat(response.getFailedShards(), equalTo(0));
|
||||
assertThat(response.getSuccessfulShards(), equalTo(5));
|
||||
assertThat(response.getCount(), equalTo(2l));
|
||||
assertThat(response.getMatches(), emptyArray());
|
||||
|
||||
logger.info("--> Count percolate existing doc with id 3");
|
||||
response = client().preparePercolate()
|
||||
.setIndices("test").setDocumentType("type").setOnlyCount(true)
|
||||
.setGetRequest(Requests.getRequest("test").type("type").id("3"))
|
||||
.execute().actionGet();
|
||||
assertThat(response.getFailedShards(), equalTo(0));
|
||||
assertThat(response.getSuccessfulShards(), equalTo(5));
|
||||
assertThat(response.getCount(), equalTo(4l));
|
||||
assertThat(response.getMatches(), emptyArray());
|
||||
|
||||
logger.info("--> Count percolate existing doc with id 4");
|
||||
response = client().preparePercolate()
|
||||
.setIndices("test").setDocumentType("type").setOnlyCount(true)
|
||||
.setGetRequest(Requests.getRequest("test").type("type").id("4"))
|
||||
.execute().actionGet();
|
||||
assertThat(response.getFailedShards(), equalTo(0));
|
||||
assertThat(response.getSuccessfulShards(), equalTo(5));
|
||||
assertThat(response.getCount(), equalTo(1l));
|
||||
assertThat(response.getMatches(), emptyArray());
|
||||
}
|
||||
|
||||
public static String[] convertFromTextArray(PercolateResponse.Match[] matches, String index) {
|
||||
if (matches.length == 0) {
|
||||
return Strings.EMPTY_ARRAY;
|
||||
|
|
Loading…
Reference in New Issue