diff --git a/src/main/java/org/elasticsearch/action/percolate/PercolateResponse.java b/src/main/java/org/elasticsearch/action/percolate/PercolateResponse.java index 1837bc9e3d2..f85ac9ba269 100644 --- a/src/main/java/org/elasticsearch/action/percolate/PercolateResponse.java +++ b/src/main/java/org/elasticsearch/action/percolate/PercolateResponse.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilderString; +import org.elasticsearch.percolator.PercolatorService; import org.elasticsearch.rest.action.support.RestActions; import java.io.IOException; @@ -47,15 +48,12 @@ public class PercolateResponse extends BroadcastOperationResponse implements Ite private Match[] matches; private long count; - private boolean hasScores; - public PercolateResponse(int totalShards, int successfulShards, int failedShards, List shardFailures, - Match[] matches, long count, long tookInMillis, boolean hasScores) { + Match[] matches, long count, long tookInMillis) { super(totalShards, successfulShards, failedShards, shardFailures); this.tookInMillis = tookInMillis; this.matches = matches; this.count = count; - this.hasScores = hasScores; } public PercolateResponse(int totalShards, int successfulShards, int failedShards, List shardFailures, long count, long tookInMillis) { @@ -63,14 +61,12 @@ public class PercolateResponse extends BroadcastOperationResponse implements Ite this.tookInMillis = tookInMillis; this.matches = EMPTY; this.count = count; - this.hasScores = false; } public PercolateResponse(int totalShards, int successfulShards, int failedShards, List shardFailures, long tookInMillis) { super(totalShards, successfulShards, failedShards, shardFailures); this.tookInMillis = tookInMillis; this.matches = EMPTY; - this.hasScores = false; } PercolateResponse() { @@ -127,7 +123,8 @@ public class PercolateResponse extends BroadcastOperationResponse implements Ite builder.startObject(); builder.field(Fields._INDEX, match.getIndex()); builder.field(Fields._ID, match.getId()); - if (hasScores) { + float score = match.score(); + if (score != PercolatorService.NO_SCORE) { builder.field(Fields._SCORE, match.getScore()); } builder.endObject(); @@ -151,7 +148,6 @@ public class PercolateResponse extends BroadcastOperationResponse implements Ite matches[i] = new Match(); matches[i].readFrom(in); } - hasScores = in.readBoolean(); } @Override @@ -163,7 +159,6 @@ public class PercolateResponse extends BroadcastOperationResponse implements Ite for (Match match : matches) { match.writeTo(out); } - out.writeBoolean(hasScores); } public static class Match implements Streamable { diff --git a/src/main/java/org/elasticsearch/action/percolate/PercolateShardResponse.java b/src/main/java/org/elasticsearch/action/percolate/PercolateShardResponse.java index d51e6cf1df4..cabb36b2086 100644 --- a/src/main/java/org/elasticsearch/action/percolate/PercolateShardResponse.java +++ b/src/main/java/org/elasticsearch/action/percolate/PercolateShardResponse.java @@ -36,12 +36,8 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse { private long count; private float[] scores; private BytesRef[] matches; - - // Request fields: - private boolean limit; + private byte percolatorTypeId; private int requestedSize; - private boolean sort; - private boolean score; public PercolateShardResponse() { } @@ -51,10 +47,8 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse { this.matches = matches; this.count = count; this.scores = scores; - this.limit = context.limit; + this.percolatorTypeId = context.percolatorTypeId; this.requestedSize = context.size; - this.sort = context.sort; - this.score = context.score; } public PercolateShardResponse(BytesRef[] matches, long count, PercolatorService.PercolateContext context, String index, int shardId) { @@ -62,10 +56,8 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse { this.matches = matches; this.scores = new float[0]; this.count = count; - this.limit = context.limit; + this.percolatorTypeId = context.percolatorTypeId; this.requestedSize = context.size; - this.sort = context.sort; - this.score = context.score; } public PercolateShardResponse(long count, PercolatorService.PercolateContext context, String index, int shardId) { @@ -73,20 +65,15 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse { this.count = count; this.matches = EMPTY; this.scores = new float[0]; - this.limit = context.limit; + this.percolatorTypeId = context.percolatorTypeId; this.requestedSize = context.size; - this.sort = context.sort; - this.score = context.score; } public PercolateShardResponse(PercolatorService.PercolateContext context, String index, int shardId) { super(index, shardId); this.matches = EMPTY; this.scores = new float[0]; - this.limit = context.limit; this.requestedSize = context.size; - this.sort = context.sort; - this.score = context.score; } public BytesRef[] matches() { @@ -101,20 +88,12 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse { return count; } - public boolean limit() { - return limit; - } - public int requestedSize() { return requestedSize; } - public boolean sort() { - return sort; - } - - public boolean score() { - return score; + public byte percolatorTypeId() { + return percolatorTypeId; } @Override @@ -129,10 +108,8 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse { for (int i = 0; i < scores.length; i++) { scores[i] = in.readFloat(); } - - limit = in.readBoolean(); + percolatorTypeId = in.readByte(); requestedSize = in.readVInt(); - sort = in.readBoolean(); } @Override @@ -147,9 +124,7 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse { for (float score : scores) { out.writeFloat(score); } - - out.writeBoolean(limit); + out.writeByte(percolatorTypeId); out.writeVLong(requestedSize); - out.writeBoolean(sort); } } diff --git a/src/main/java/org/elasticsearch/action/percolate/TransportPercolateAction.java b/src/main/java/org/elasticsearch/action/percolate/TransportPercolateAction.java index 1781d95a838..45397941057 100644 --- a/src/main/java/org/elasticsearch/action/percolate/TransportPercolateAction.java +++ b/src/main/java/org/elasticsearch/action/percolate/TransportPercolateAction.java @@ -33,13 +33,9 @@ import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.text.BytesText; -import org.elasticsearch.common.text.StringText; -import org.elasticsearch.common.text.Text; import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.percolator.PercolateException; @@ -47,7 +43,6 @@ import org.elasticsearch.percolator.PercolatorService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; @@ -131,6 +126,7 @@ public class TransportPercolateAction extends TransportBroadcastOperationAction< List shardResults = null; List shardFailures = null; + byte percolatorTypeId = 0x00; for (int i = 0; i < shardsResponses.length(); i++) { Object shardResponse = shardsResponses.get(i); if (shardResponse == null) { @@ -146,105 +142,23 @@ public class TransportPercolateAction extends TransportBroadcastOperationAction< if (shardResults == null) { shardResults = newArrayList(); } + if (percolateShardResponse.percolatorTypeId() != 0x00) { + percolatorTypeId = percolateShardResponse.percolatorTypeId(); + } shardResults.add(percolateShardResponse); successfulShards++; } } - long tookInMillis = System.currentTimeMillis() - request.startTime; - if (shardResults == null) { + if (shardResults == null || percolatorTypeId == 0x00) { + long tookInMillis = System.currentTimeMillis() - request.startTime; return new PercolateResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures, tookInMillis); - } - - if (request.onlyCount()) { - long finalCount = 0; - for (PercolateShardResponse shardResponse : shardResults) { - finalCount += shardResponse.count(); - } - - return new PercolateResponse( - shardsResponses.length(), successfulShards, failedShards, shardFailures, finalCount, tookInMillis - ); } else { - long foundMatches = 0; - int numMatches = 0; - for (PercolateShardResponse response : shardResults) { - foundMatches += response.count(); - numMatches += response.matches().length; - } - - int requestedSize = shardResults.get(0).requestedSize(); - boolean limit = shardResults.get(0).limit(); - boolean sort = shardResults.get(0).sort(); - boolean matchesScored = shardResults.get(0).score(); - if (limit) { - requestedSize = Math.min(requestedSize, numMatches); - } else { - requestedSize = numMatches; - } - - // Use a custom impl of AbstractBigArray for Object[]? - List finalMatches = new ArrayList(requestedSize); - if (sort) { - if (shardResults.size() == 1) { - PercolateShardResponse response = shardResults.get(0); - Text index = new StringText(response.getIndex()); - for (int i = 0; i < response.matches().length; i++) { - float score = response.scores().length == 0 ? Float.NaN : response.scores()[i]; - Text match = new BytesText(new BytesArray(response.matches()[i])); - finalMatches.add(new PercolateResponse.Match(index, match, score)); - } - } else { - int[] slots = new int[shardResults.size()]; - while (true) { - float lowestScore = Float.NEGATIVE_INFINITY; - int requestIndex = 0; - int itemIndex = 0; - for (int i = 0; i < shardResults.size(); i++) { - int scoreIndex = slots[i]; - float[] scores = shardResults.get(i).scores(); - if (scoreIndex >= scores.length) { - continue; - } - - float score = scores[scoreIndex]; - int cmp = Float.compare(lowestScore, score); - if (cmp < 0) { - requestIndex = i; - itemIndex = scoreIndex; - lowestScore = score; - } - } - slots[requestIndex]++; - - PercolateShardResponse shardResponse = shardResults.get(requestIndex); - Text index = new StringText(shardResponse.getIndex()); - Text match = new BytesText(new BytesArray(shardResponse.matches()[itemIndex])); - float score = shardResponse.scores()[itemIndex]; - finalMatches.add(new PercolateResponse.Match(index, match, score)); - if (finalMatches.size() == requestedSize) { - break; - } - } - } - - } else { - outer: for (PercolateShardResponse response : shardResults) { - Text index = new StringText(response.getIndex()); - for (int i = 0; i < response.matches().length; i++) { - float score = response.scores().length == 0 ? 0f : response.scores()[i]; - Text match = new BytesText(new BytesArray(response.matches()[i])); - finalMatches.add(new PercolateResponse.Match(index, match, score)); - if (requestedSize != 0 && finalMatches.size() == requestedSize) { - break outer; - } - } - } - } - + PercolatorService.ReduceResult result = percolatorService.reduce(percolatorTypeId, shardResults); + long tookInMillis = System.currentTimeMillis() - request.startTime; return new PercolateResponse( shardsResponses.length(), successfulShards, failedShards, shardFailures, - finalMatches.toArray(new PercolateResponse.Match[requestedSize]), foundMatches, tookInMillis, sort || matchesScored + result.matches(), result.count(), tookInMillis ); } } diff --git a/src/main/java/org/elasticsearch/percolator/PercolatorService.java b/src/main/java/org/elasticsearch/percolator/PercolatorService.java index e023794a404..0bc8ba092c0 100644 --- a/src/main/java/org/elasticsearch/percolator/PercolatorService.java +++ b/src/main/java/org/elasticsearch/percolator/PercolatorService.java @@ -18,6 +18,7 @@ package org.elasticsearch.percolator; +import gnu.trove.map.hash.TByteObjectHashMap; import org.apache.lucene.analysis.TokenStream; import org.apache.lucene.index.AtomicReaderContext; import org.apache.lucene.index.IndexableField; @@ -30,8 +31,10 @@ import org.apache.lucene.util.CloseableThreadLocal; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.ElasticSearchParseException; +import org.elasticsearch.action.percolate.PercolateResponse; import org.elasticsearch.action.percolate.PercolateShardRequest; import org.elasticsearch.action.percolate.PercolateShardResponse; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; @@ -40,6 +43,9 @@ import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.search.XConstantScoreQuery; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.text.BytesText; +import org.elasticsearch.common.text.StringText; +import org.elasticsearch.common.text.Text; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.XContentFactory; @@ -76,8 +82,11 @@ import static org.elasticsearch.percolator.QueryCollector.*; */ public class PercolatorService extends AbstractComponent { + public final static float NO_SCORE = Float.NEGATIVE_INFINITY; + private final CloseableThreadLocal cache; private final IndicesService indicesService; + private final TByteObjectHashMap percolatorTypes; @Inject public PercolatorService(Settings settings, IndicesService indicesService) { @@ -90,6 +99,20 @@ public class PercolatorService extends AbstractComponent { return new ExtendedMemoryIndex(false, maxReuseBytes); } }; + + percolatorTypes = new TByteObjectHashMap(6); + percolatorTypes.put(countPercolator.id(), countPercolator); + percolatorTypes.put(queryCountPercolator.id(), queryCountPercolator); + percolatorTypes.put(matchPercolator.id(), matchPercolator); + percolatorTypes.put(queryPercolator.id(), queryPercolator); + percolatorTypes.put(scoringPercolator.id(), scoringPercolator); + percolatorTypes.put(topMatchingPercolator.id(), topMatchingPercolator); + } + + + public ReduceResult reduce(byte percolatorTypeId, List shardResults) { + PercolatorType percolatorType = percolatorTypes.get(percolatorTypeId); + return percolatorType.reduce(shardResults); } public PercolateShardResponse percolate(PercolateShardRequest request) { @@ -160,6 +183,7 @@ public class PercolatorService extends AbstractComponent { action = matchPercolator; } } + context.percolatorTypeId = action.id(); context.docSearcher = memoryIndex.createSearcher(); context.fieldData = percolateIndexService.fieldData(); @@ -282,12 +306,31 @@ public class PercolatorService extends AbstractComponent { interface PercolatorType { + // 0x00 is reserved for empty type. + byte id(); + + ReduceResult reduce(List shardResults); + PercolateShardResponse doPercolate(PercolateShardRequest request, PercolateContext context); } private final PercolatorType countPercolator = new PercolatorType() { + @Override + public byte id() { + return 0x01; + } + + @Override + public ReduceResult reduce(List shardResults) { + long finalCount = 0; + for (PercolateShardResponse shardResponse : shardResults) { + finalCount += shardResponse.count(); + } + return new ReduceResult(finalCount); + } + @Override public PercolateShardResponse doPercolate(PercolateShardRequest request, PercolateContext context) { long count = 0; @@ -311,6 +354,16 @@ public class PercolatorService extends AbstractComponent { private final PercolatorType queryCountPercolator = new PercolatorType() { + @Override + public byte id() { + return 0x02; + } + + @Override + public ReduceResult reduce(List shardResults) { + return countPercolator.reduce(shardResults); + } + @Override public PercolateShardResponse doPercolate(PercolateShardRequest request, PercolateContext context) { long count = 0; @@ -330,6 +383,38 @@ public class PercolatorService extends AbstractComponent { }; private final PercolatorType matchPercolator = new PercolatorType() { + + @Override + public byte id() { + return 0x03; + } + + @Override + public ReduceResult reduce(List shardResults) { + long foundMatches = 0; + int numMatches = 0; + for (PercolateShardResponse response : shardResults) { + foundMatches += response.count(); + numMatches += response.matches().length; + } + int requestedSize = shardResults.get(0).requestedSize(); + + // Use a custom impl of AbstractBigArray for Object[]? + List finalMatches = new ArrayList(requestedSize == 0 ? numMatches : requestedSize); + outer: for (PercolateShardResponse response : shardResults) { + Text index = new StringText(response.getIndex()); + for (int i = 0; i < response.matches().length; i++) { + float score = response.scores().length == 0 ? NO_SCORE : response.scores()[i]; + Text match = new BytesText(new BytesArray(response.matches()[i])); + finalMatches.add(new PercolateResponse.Match(index, match, score)); + if (requestedSize != 0 && finalMatches.size() == requestedSize) { + break outer; + } + } + } + return new ReduceResult(foundMatches, finalMatches.toArray(new PercolateResponse.Match[finalMatches.size()])); + } + @Override public PercolateShardResponse doPercolate(PercolateShardRequest request, PercolateContext context) { long count = 0; @@ -356,6 +441,17 @@ public class PercolatorService extends AbstractComponent { }; private final PercolatorType queryPercolator = new PercolatorType() { + + @Override + public byte id() { + return 0x04; + } + + @Override + public ReduceResult reduce(List shardResults) { + return matchPercolator.reduce(shardResults); + } + @Override public PercolateShardResponse doPercolate(PercolateShardRequest request, PercolateContext context) { Engine.Searcher percolatorSearcher = context.indexShard.searcher(); @@ -375,6 +471,17 @@ public class PercolatorService extends AbstractComponent { }; private final PercolatorType scoringPercolator = new PercolatorType() { + + @Override + public byte id() { + return 0x05; + } + + @Override + public ReduceResult reduce(List shardResults) { + return matchPercolator.reduce(shardResults); + } + @Override public PercolateShardResponse doPercolate(PercolateShardRequest request, PercolateContext context) { Engine.Searcher percolatorSearcher = context.indexShard.searcher(); @@ -396,6 +503,65 @@ public class PercolatorService extends AbstractComponent { private final PercolatorType topMatchingPercolator = new PercolatorType() { + @Override + public byte id() { + return 0x06; + } + + @Override + public ReduceResult reduce(List shardResults) { + long foundMatches = 0; + for (PercolateShardResponse response : shardResults) { + foundMatches += response.count(); + } + int requestedSize = shardResults.get(0).requestedSize(); + + // Use a custom impl of AbstractBigArray for Object[]? + List finalMatches = new ArrayList(requestedSize); + if (shardResults.size() == 1) { + PercolateShardResponse response = shardResults.get(0); + Text index = new StringText(response.getIndex()); + for (int i = 0; i < response.matches().length; i++) { + float score = response.scores().length == 0 ? Float.NaN : response.scores()[i]; + Text match = new BytesText(new BytesArray(response.matches()[i])); + finalMatches.add(new PercolateResponse.Match(index, match, score)); + } + } else { + int[] slots = new int[shardResults.size()]; + while (true) { + float lowestScore = Float.NEGATIVE_INFINITY; + int requestIndex = 0; + int itemIndex = 0; + for (int i = 0; i < shardResults.size(); i++) { + int scoreIndex = slots[i]; + float[] scores = shardResults.get(i).scores(); + if (scoreIndex >= scores.length) { + continue; + } + + float score = scores[scoreIndex]; + int cmp = Float.compare(lowestScore, score); + if (cmp < 0) { + requestIndex = i; + itemIndex = scoreIndex; + lowestScore = score; + } + } + slots[requestIndex]++; + + PercolateShardResponse shardResponse = shardResults.get(requestIndex); + Text index = new StringText(shardResponse.getIndex()); + Text match = new BytesText(new BytesArray(shardResponse.matches()[itemIndex])); + float score = shardResponse.scores()[itemIndex]; + finalMatches.add(new PercolateResponse.Match(index, match, score)); + if (finalMatches.size() == requestedSize) { + break; + } + } + } + return new ReduceResult(foundMatches, finalMatches.toArray(new PercolateResponse.Match[finalMatches.size()])); + } + @Override public PercolateShardResponse doPercolate(PercolateShardRequest request, PercolateContext context) { Engine.Searcher percolatorSearcher = context.indexShard.searcher(); @@ -444,6 +610,7 @@ public class PercolatorService extends AbstractComponent { public int size; public boolean score; public boolean sort; + public byte percolatorTypeId; Query query; ConcurrentMap percolateQueries; @@ -454,6 +621,30 @@ public class PercolatorService extends AbstractComponent { } + public final static class ReduceResult { + + private final long count; + private final PercolateResponse.Match[] matches; + + ReduceResult(long count, PercolateResponse.Match[] matches) { + this.count = count; + this.matches = matches; + } + + public ReduceResult(long count) { + this.count = count; + this.matches = new PercolateResponse.Match[0]; + } + + public long count() { + return count; + } + + public PercolateResponse.Match[] matches() { + return matches; + } + } + public static final class Constants { public static final String TYPE_NAME = "_percolator";