Moved the reduce logic to the percolator type.

This commit is contained in:
Martijn van Groningen 2013-08-15 13:14:34 +02:00
parent 27b973830d
commit 174707061c
4 changed files with 212 additions and 137 deletions

View File

@ -29,6 +29,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.percolator.PercolatorService;
import org.elasticsearch.rest.action.support.RestActions;
import java.io.IOException;
@ -47,15 +48,12 @@ public class PercolateResponse extends BroadcastOperationResponse implements Ite
private Match[] matches;
private long count;
private boolean hasScores;
public PercolateResponse(int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures,
Match[] matches, long count, long tookInMillis, boolean hasScores) {
Match[] matches, long count, long tookInMillis) {
super(totalShards, successfulShards, failedShards, shardFailures);
this.tookInMillis = tookInMillis;
this.matches = matches;
this.count = count;
this.hasScores = hasScores;
}
public PercolateResponse(int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures, long count, long tookInMillis) {
@ -63,14 +61,12 @@ public class PercolateResponse extends BroadcastOperationResponse implements Ite
this.tookInMillis = tookInMillis;
this.matches = EMPTY;
this.count = count;
this.hasScores = false;
}
public PercolateResponse(int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures, long tookInMillis) {
super(totalShards, successfulShards, failedShards, shardFailures);
this.tookInMillis = tookInMillis;
this.matches = EMPTY;
this.hasScores = false;
}
PercolateResponse() {
@ -127,7 +123,8 @@ public class PercolateResponse extends BroadcastOperationResponse implements Ite
builder.startObject();
builder.field(Fields._INDEX, match.getIndex());
builder.field(Fields._ID, match.getId());
if (hasScores) {
float score = match.score();
if (score != PercolatorService.NO_SCORE) {
builder.field(Fields._SCORE, match.getScore());
}
builder.endObject();
@ -151,7 +148,6 @@ public class PercolateResponse extends BroadcastOperationResponse implements Ite
matches[i] = new Match();
matches[i].readFrom(in);
}
hasScores = in.readBoolean();
}
@Override
@ -163,7 +159,6 @@ public class PercolateResponse extends BroadcastOperationResponse implements Ite
for (Match match : matches) {
match.writeTo(out);
}
out.writeBoolean(hasScores);
}
public static class Match implements Streamable {

View File

@ -36,12 +36,8 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse {
private long count;
private float[] scores;
private BytesRef[] matches;
// Request fields:
private boolean limit;
private byte percolatorTypeId;
private int requestedSize;
private boolean sort;
private boolean score;
public PercolateShardResponse() {
}
@ -51,10 +47,8 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse {
this.matches = matches;
this.count = count;
this.scores = scores;
this.limit = context.limit;
this.percolatorTypeId = context.percolatorTypeId;
this.requestedSize = context.size;
this.sort = context.sort;
this.score = context.score;
}
public PercolateShardResponse(BytesRef[] matches, long count, PercolatorService.PercolateContext context, String index, int shardId) {
@ -62,10 +56,8 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse {
this.matches = matches;
this.scores = new float[0];
this.count = count;
this.limit = context.limit;
this.percolatorTypeId = context.percolatorTypeId;
this.requestedSize = context.size;
this.sort = context.sort;
this.score = context.score;
}
public PercolateShardResponse(long count, PercolatorService.PercolateContext context, String index, int shardId) {
@ -73,20 +65,15 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse {
this.count = count;
this.matches = EMPTY;
this.scores = new float[0];
this.limit = context.limit;
this.percolatorTypeId = context.percolatorTypeId;
this.requestedSize = context.size;
this.sort = context.sort;
this.score = context.score;
}
public PercolateShardResponse(PercolatorService.PercolateContext context, String index, int shardId) {
super(index, shardId);
this.matches = EMPTY;
this.scores = new float[0];
this.limit = context.limit;
this.requestedSize = context.size;
this.sort = context.sort;
this.score = context.score;
}
public BytesRef[] matches() {
@ -101,20 +88,12 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse {
return count;
}
public boolean limit() {
return limit;
}
public int requestedSize() {
return requestedSize;
}
public boolean sort() {
return sort;
}
public boolean score() {
return score;
public byte percolatorTypeId() {
return percolatorTypeId;
}
@Override
@ -129,10 +108,8 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse {
for (int i = 0; i < scores.length; i++) {
scores[i] = in.readFloat();
}
limit = in.readBoolean();
percolatorTypeId = in.readByte();
requestedSize = in.readVInt();
sort = in.readBoolean();
}
@Override
@ -147,9 +124,7 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse {
for (float score : scores) {
out.writeFloat(score);
}
out.writeBoolean(limit);
out.writeByte(percolatorTypeId);
out.writeVLong(requestedSize);
out.writeBoolean(sort);
}
}

View File

@ -33,13 +33,9 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.BytesText;
import org.elasticsearch.common.text.StringText;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.percolator.PercolateException;
@ -47,7 +43,6 @@ import org.elasticsearch.percolator.PercolatorService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -131,6 +126,7 @@ public class TransportPercolateAction extends TransportBroadcastOperationAction<
List<PercolateShardResponse> shardResults = null;
List<ShardOperationFailedException> shardFailures = null;
byte percolatorTypeId = 0x00;
for (int i = 0; i < shardsResponses.length(); i++) {
Object shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
@ -146,105 +142,23 @@ public class TransportPercolateAction extends TransportBroadcastOperationAction<
if (shardResults == null) {
shardResults = newArrayList();
}
if (percolateShardResponse.percolatorTypeId() != 0x00) {
percolatorTypeId = percolateShardResponse.percolatorTypeId();
}
shardResults.add(percolateShardResponse);
successfulShards++;
}
}
if (shardResults == null || percolatorTypeId == 0x00) {
long tookInMillis = System.currentTimeMillis() - request.startTime;
if (shardResults == null) {
return new PercolateResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures, tookInMillis);
}
if (request.onlyCount()) {
long finalCount = 0;
for (PercolateShardResponse shardResponse : shardResults) {
finalCount += shardResponse.count();
}
return new PercolateResponse(
shardsResponses.length(), successfulShards, failedShards, shardFailures, finalCount, tookInMillis
);
} else {
long foundMatches = 0;
int numMatches = 0;
for (PercolateShardResponse response : shardResults) {
foundMatches += response.count();
numMatches += response.matches().length;
}
int requestedSize = shardResults.get(0).requestedSize();
boolean limit = shardResults.get(0).limit();
boolean sort = shardResults.get(0).sort();
boolean matchesScored = shardResults.get(0).score();
if (limit) {
requestedSize = Math.min(requestedSize, numMatches);
} else {
requestedSize = numMatches;
}
// Use a custom impl of AbstractBigArray for Object[]?
List<PercolateResponse.Match> finalMatches = new ArrayList<PercolateResponse.Match>(requestedSize);
if (sort) {
if (shardResults.size() == 1) {
PercolateShardResponse response = shardResults.get(0);
Text index = new StringText(response.getIndex());
for (int i = 0; i < response.matches().length; i++) {
float score = response.scores().length == 0 ? Float.NaN : response.scores()[i];
Text match = new BytesText(new BytesArray(response.matches()[i]));
finalMatches.add(new PercolateResponse.Match(index, match, score));
}
} else {
int[] slots = new int[shardResults.size()];
while (true) {
float lowestScore = Float.NEGATIVE_INFINITY;
int requestIndex = 0;
int itemIndex = 0;
for (int i = 0; i < shardResults.size(); i++) {
int scoreIndex = slots[i];
float[] scores = shardResults.get(i).scores();
if (scoreIndex >= scores.length) {
continue;
}
float score = scores[scoreIndex];
int cmp = Float.compare(lowestScore, score);
if (cmp < 0) {
requestIndex = i;
itemIndex = scoreIndex;
lowestScore = score;
}
}
slots[requestIndex]++;
PercolateShardResponse shardResponse = shardResults.get(requestIndex);
Text index = new StringText(shardResponse.getIndex());
Text match = new BytesText(new BytesArray(shardResponse.matches()[itemIndex]));
float score = shardResponse.scores()[itemIndex];
finalMatches.add(new PercolateResponse.Match(index, match, score));
if (finalMatches.size() == requestedSize) {
break;
}
}
}
} else {
outer: for (PercolateShardResponse response : shardResults) {
Text index = new StringText(response.getIndex());
for (int i = 0; i < response.matches().length; i++) {
float score = response.scores().length == 0 ? 0f : response.scores()[i];
Text match = new BytesText(new BytesArray(response.matches()[i]));
finalMatches.add(new PercolateResponse.Match(index, match, score));
if (requestedSize != 0 && finalMatches.size() == requestedSize) {
break outer;
}
}
}
}
PercolatorService.ReduceResult result = percolatorService.reduce(percolatorTypeId, shardResults);
long tookInMillis = System.currentTimeMillis() - request.startTime;
return new PercolateResponse(
shardsResponses.length(), successfulShards, failedShards, shardFailures,
finalMatches.toArray(new PercolateResponse.Match[requestedSize]), foundMatches, tookInMillis, sort || matchesScored
result.matches(), result.count(), tookInMillis
);
}
}

View File

@ -18,6 +18,7 @@
package org.elasticsearch.percolator;
import gnu.trove.map.hash.TByteObjectHashMap;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.IndexableField;
@ -30,8 +31,10 @@ import org.apache.lucene.util.CloseableThreadLocal;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.ElasticSearchParseException;
import org.elasticsearch.action.percolate.PercolateResponse;
import org.elasticsearch.action.percolate.PercolateShardRequest;
import org.elasticsearch.action.percolate.PercolateShardResponse;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
@ -40,6 +43,9 @@ import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.search.XConstantScoreQuery;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.BytesText;
import org.elasticsearch.common.text.StringText;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
@ -76,8 +82,11 @@ import static org.elasticsearch.percolator.QueryCollector.*;
*/
public class PercolatorService extends AbstractComponent {
public final static float NO_SCORE = Float.NEGATIVE_INFINITY;
private final CloseableThreadLocal<MemoryIndex> cache;
private final IndicesService indicesService;
private final TByteObjectHashMap<PercolatorType> percolatorTypes;
@Inject
public PercolatorService(Settings settings, IndicesService indicesService) {
@ -90,6 +99,20 @@ public class PercolatorService extends AbstractComponent {
return new ExtendedMemoryIndex(false, maxReuseBytes);
}
};
percolatorTypes = new TByteObjectHashMap<PercolatorType>(6);
percolatorTypes.put(countPercolator.id(), countPercolator);
percolatorTypes.put(queryCountPercolator.id(), queryCountPercolator);
percolatorTypes.put(matchPercolator.id(), matchPercolator);
percolatorTypes.put(queryPercolator.id(), queryPercolator);
percolatorTypes.put(scoringPercolator.id(), scoringPercolator);
percolatorTypes.put(topMatchingPercolator.id(), topMatchingPercolator);
}
public ReduceResult reduce(byte percolatorTypeId, List<PercolateShardResponse> shardResults) {
PercolatorType percolatorType = percolatorTypes.get(percolatorTypeId);
return percolatorType.reduce(shardResults);
}
public PercolateShardResponse percolate(PercolateShardRequest request) {
@ -160,6 +183,7 @@ public class PercolatorService extends AbstractComponent {
action = matchPercolator;
}
}
context.percolatorTypeId = action.id();
context.docSearcher = memoryIndex.createSearcher();
context.fieldData = percolateIndexService.fieldData();
@ -282,12 +306,31 @@ public class PercolatorService extends AbstractComponent {
interface PercolatorType {
// 0x00 is reserved for empty type.
byte id();
ReduceResult reduce(List<PercolateShardResponse> shardResults);
PercolateShardResponse doPercolate(PercolateShardRequest request, PercolateContext context);
}
private final PercolatorType countPercolator = new PercolatorType() {
@Override
public byte id() {
return 0x01;
}
@Override
public ReduceResult reduce(List<PercolateShardResponse> shardResults) {
long finalCount = 0;
for (PercolateShardResponse shardResponse : shardResults) {
finalCount += shardResponse.count();
}
return new ReduceResult(finalCount);
}
@Override
public PercolateShardResponse doPercolate(PercolateShardRequest request, PercolateContext context) {
long count = 0;
@ -311,6 +354,16 @@ public class PercolatorService extends AbstractComponent {
private final PercolatorType queryCountPercolator = new PercolatorType() {
@Override
public byte id() {
return 0x02;
}
@Override
public ReduceResult reduce(List<PercolateShardResponse> shardResults) {
return countPercolator.reduce(shardResults);
}
@Override
public PercolateShardResponse doPercolate(PercolateShardRequest request, PercolateContext context) {
long count = 0;
@ -330,6 +383,38 @@ public class PercolatorService extends AbstractComponent {
};
private final PercolatorType matchPercolator = new PercolatorType() {
@Override
public byte id() {
return 0x03;
}
@Override
public ReduceResult reduce(List<PercolateShardResponse> shardResults) {
long foundMatches = 0;
int numMatches = 0;
for (PercolateShardResponse response : shardResults) {
foundMatches += response.count();
numMatches += response.matches().length;
}
int requestedSize = shardResults.get(0).requestedSize();
// Use a custom impl of AbstractBigArray for Object[]?
List<PercolateResponse.Match> finalMatches = new ArrayList<PercolateResponse.Match>(requestedSize == 0 ? numMatches : requestedSize);
outer: for (PercolateShardResponse response : shardResults) {
Text index = new StringText(response.getIndex());
for (int i = 0; i < response.matches().length; i++) {
float score = response.scores().length == 0 ? NO_SCORE : response.scores()[i];
Text match = new BytesText(new BytesArray(response.matches()[i]));
finalMatches.add(new PercolateResponse.Match(index, match, score));
if (requestedSize != 0 && finalMatches.size() == requestedSize) {
break outer;
}
}
}
return new ReduceResult(foundMatches, finalMatches.toArray(new PercolateResponse.Match[finalMatches.size()]));
}
@Override
public PercolateShardResponse doPercolate(PercolateShardRequest request, PercolateContext context) {
long count = 0;
@ -356,6 +441,17 @@ public class PercolatorService extends AbstractComponent {
};
private final PercolatorType queryPercolator = new PercolatorType() {
@Override
public byte id() {
return 0x04;
}
@Override
public ReduceResult reduce(List<PercolateShardResponse> shardResults) {
return matchPercolator.reduce(shardResults);
}
@Override
public PercolateShardResponse doPercolate(PercolateShardRequest request, PercolateContext context) {
Engine.Searcher percolatorSearcher = context.indexShard.searcher();
@ -375,6 +471,17 @@ public class PercolatorService extends AbstractComponent {
};
private final PercolatorType scoringPercolator = new PercolatorType() {
@Override
public byte id() {
return 0x05;
}
@Override
public ReduceResult reduce(List<PercolateShardResponse> shardResults) {
return matchPercolator.reduce(shardResults);
}
@Override
public PercolateShardResponse doPercolate(PercolateShardRequest request, PercolateContext context) {
Engine.Searcher percolatorSearcher = context.indexShard.searcher();
@ -396,6 +503,65 @@ public class PercolatorService extends AbstractComponent {
private final PercolatorType topMatchingPercolator = new PercolatorType() {
@Override
public byte id() {
return 0x06;
}
@Override
public ReduceResult reduce(List<PercolateShardResponse> shardResults) {
long foundMatches = 0;
for (PercolateShardResponse response : shardResults) {
foundMatches += response.count();
}
int requestedSize = shardResults.get(0).requestedSize();
// Use a custom impl of AbstractBigArray for Object[]?
List<PercolateResponse.Match> finalMatches = new ArrayList<PercolateResponse.Match>(requestedSize);
if (shardResults.size() == 1) {
PercolateShardResponse response = shardResults.get(0);
Text index = new StringText(response.getIndex());
for (int i = 0; i < response.matches().length; i++) {
float score = response.scores().length == 0 ? Float.NaN : response.scores()[i];
Text match = new BytesText(new BytesArray(response.matches()[i]));
finalMatches.add(new PercolateResponse.Match(index, match, score));
}
} else {
int[] slots = new int[shardResults.size()];
while (true) {
float lowestScore = Float.NEGATIVE_INFINITY;
int requestIndex = 0;
int itemIndex = 0;
for (int i = 0; i < shardResults.size(); i++) {
int scoreIndex = slots[i];
float[] scores = shardResults.get(i).scores();
if (scoreIndex >= scores.length) {
continue;
}
float score = scores[scoreIndex];
int cmp = Float.compare(lowestScore, score);
if (cmp < 0) {
requestIndex = i;
itemIndex = scoreIndex;
lowestScore = score;
}
}
slots[requestIndex]++;
PercolateShardResponse shardResponse = shardResults.get(requestIndex);
Text index = new StringText(shardResponse.getIndex());
Text match = new BytesText(new BytesArray(shardResponse.matches()[itemIndex]));
float score = shardResponse.scores()[itemIndex];
finalMatches.add(new PercolateResponse.Match(index, match, score));
if (finalMatches.size() == requestedSize) {
break;
}
}
}
return new ReduceResult(foundMatches, finalMatches.toArray(new PercolateResponse.Match[finalMatches.size()]));
}
@Override
public PercolateShardResponse doPercolate(PercolateShardRequest request, PercolateContext context) {
Engine.Searcher percolatorSearcher = context.indexShard.searcher();
@ -444,6 +610,7 @@ public class PercolatorService extends AbstractComponent {
public int size;
public boolean score;
public boolean sort;
public byte percolatorTypeId;
Query query;
ConcurrentMap<HashedBytesRef, Query> percolateQueries;
@ -454,6 +621,30 @@ public class PercolatorService extends AbstractComponent {
}
public final static class ReduceResult {
private final long count;
private final PercolateResponse.Match[] matches;
ReduceResult(long count, PercolateResponse.Match[] matches) {
this.count = count;
this.matches = matches;
}
public ReduceResult(long count) {
this.count = count;
this.matches = new PercolateResponse.Match[0];
}
public long count() {
return count;
}
public PercolateResponse.Match[] matches() {
return matches;
}
}
public static final class Constants {
public static final String TYPE_NAME = "_percolator";