* Check if size is set if sort is used, if not throw exception.
* Fix reduce error when reducing shard responses. * Short circuit reduce phase when possible (single matches & no matches)
This commit is contained in:
parent
cca84431f5
commit
5d91bb04b6
|
@ -96,6 +96,10 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse {
|
|||
return percolatorTypeId;
|
||||
}
|
||||
|
||||
public boolean isEmpty() {
|
||||
return matches.length == 0 && count == 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
|
|
|
@ -127,6 +127,7 @@ public class TransportPercolateAction extends TransportBroadcastOperationAction<
|
|||
List<ShardOperationFailedException> shardFailures = null;
|
||||
|
||||
byte percolatorTypeId = 0x00;
|
||||
int nonEmptyResponses = 0;
|
||||
for (int i = 0; i < shardsResponses.length(); i++) {
|
||||
Object shardResponse = shardsResponses.get(i);
|
||||
if (shardResponse == null) {
|
||||
|
@ -145,12 +146,15 @@ public class TransportPercolateAction extends TransportBroadcastOperationAction<
|
|||
if (percolateShardResponse.percolatorTypeId() != 0x00) {
|
||||
percolatorTypeId = percolateShardResponse.percolatorTypeId();
|
||||
}
|
||||
if (!percolateShardResponse.isEmpty()) {
|
||||
nonEmptyResponses++;
|
||||
}
|
||||
shardResults.add(percolateShardResponse);
|
||||
successfulShards++;
|
||||
}
|
||||
}
|
||||
|
||||
if (shardResults == null || percolatorTypeId == 0x00) {
|
||||
if (shardResults == null || percolatorTypeId == 0x00 || nonEmptyResponses == 0) {
|
||||
long tookInMillis = System.currentTimeMillis() - request.startTime;
|
||||
return new PercolateResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures, tookInMillis);
|
||||
} else {
|
||||
|
|
|
@ -1,12 +1,13 @@
|
|||
package org.elasticsearch.percolator;
|
||||
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.ElasticSearchWrapperException;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
||||
/**
|
||||
* Exception during percolating document(s) at runtime.
|
||||
*/
|
||||
public class PercolateException extends ElasticSearchException {
|
||||
public class PercolateException extends ElasticSearchException implements ElasticSearchWrapperException {
|
||||
|
||||
private final ShardId shardId;
|
||||
|
||||
|
|
|
@ -140,7 +140,11 @@ public class PercolatorService extends AbstractComponent {
|
|||
}
|
||||
|
||||
if (context.query == null && (context.score || context.sort)) {
|
||||
throw new ElasticSearchIllegalArgumentException("Can't sort or score if no query is specified");
|
||||
throw new ElasticSearchIllegalArgumentException("Can't sort or score if query isn't specified");
|
||||
}
|
||||
|
||||
if (context.sort && !context.limit) {
|
||||
throw new ElasticSearchIllegalArgumentException("Can't sort if size isn't specified");
|
||||
}
|
||||
|
||||
if (context.size < 0) {
|
||||
|
@ -511,15 +515,25 @@ public class PercolatorService extends AbstractComponent {
|
|||
@Override
|
||||
public ReduceResult reduce(List<PercolateShardResponse> shardResults) {
|
||||
long foundMatches = 0;
|
||||
for (PercolateShardResponse response : shardResults) {
|
||||
int nonEmptyResponses = 0;
|
||||
int firstNonEmptyIndex = 0;
|
||||
for (int i = 0; i < shardResults.size(); i++) {
|
||||
PercolateShardResponse response = shardResults.get(i);
|
||||
foundMatches += response.count();
|
||||
if (response.matches().length != 0) {
|
||||
if (firstNonEmptyIndex == 0) {
|
||||
firstNonEmptyIndex = i;
|
||||
}
|
||||
nonEmptyResponses++;
|
||||
}
|
||||
}
|
||||
|
||||
int requestedSize = shardResults.get(0).requestedSize();
|
||||
|
||||
// Use a custom impl of AbstractBigArray for Object[]?
|
||||
List<PercolateResponse.Match> finalMatches = new ArrayList<PercolateResponse.Match>(requestedSize);
|
||||
if (shardResults.size() == 1) {
|
||||
PercolateShardResponse response = shardResults.get(0);
|
||||
if (nonEmptyResponses == 1) {
|
||||
PercolateShardResponse response = shardResults.get(firstNonEmptyIndex);
|
||||
Text index = new StringText(response.getIndex());
|
||||
for (int i = 0; i < response.matches().length; i++) {
|
||||
float score = response.scores().length == 0 ? Float.NaN : response.scores()[i];
|
||||
|
@ -530,8 +544,8 @@ public class PercolatorService extends AbstractComponent {
|
|||
int[] slots = new int[shardResults.size()];
|
||||
while (true) {
|
||||
float lowestScore = Float.NEGATIVE_INFINITY;
|
||||
int requestIndex = 0;
|
||||
int itemIndex = 0;
|
||||
int requestIndex = -1;
|
||||
int itemIndex = -1;
|
||||
for (int i = 0; i < shardResults.size(); i++) {
|
||||
int scoreIndex = slots[i];
|
||||
float[] scores = shardResults.get(i).scores();
|
||||
|
@ -541,12 +555,19 @@ public class PercolatorService extends AbstractComponent {
|
|||
|
||||
float score = scores[scoreIndex];
|
||||
int cmp = Float.compare(lowestScore, score);
|
||||
// TODO: Maybe add a tie?
|
||||
if (cmp < 0) {
|
||||
requestIndex = i;
|
||||
itemIndex = scoreIndex;
|
||||
lowestScore = score;
|
||||
}
|
||||
}
|
||||
|
||||
// This means the shard matches have been exhausted and we should bail
|
||||
if (requestIndex == -1) {
|
||||
break;
|
||||
}
|
||||
|
||||
slots[requestIndex]++;
|
||||
|
||||
PercolateShardResponse shardResponse = shardResults.get(requestIndex);
|
||||
|
|
|
@ -1220,6 +1220,82 @@ public class SimplePercolatorTests extends AbstractSharedClusterTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPercolateSortingWithNoSize() throws Exception {
|
||||
client().admin().indices().prepareCreate("my-index").execute().actionGet();
|
||||
ensureGreen();
|
||||
|
||||
client().prepareIndex("my-index", "_percolator", "1")
|
||||
.setSource(jsonBuilder().startObject().field("query", matchAllQuery()).field("level", 1).endObject())
|
||||
.execute().actionGet();
|
||||
client().prepareIndex("my-index", "_percolator", "2")
|
||||
.setSource(jsonBuilder().startObject().field("query", matchAllQuery()).field("level", 2).endObject())
|
||||
.execute().actionGet();
|
||||
refresh();
|
||||
|
||||
PercolateResponse response = client().preparePercolate().setIndices("my-index").setDocumentType("my-type")
|
||||
.setSort(true)
|
||||
.setSize(2)
|
||||
.setPercolateDoc(docBuilder().setDoc("field", "value"))
|
||||
.setPercolateQuery(QueryBuilders.functionScoreQuery(matchAllQuery()).add(new ScriptScoreFunctionBuilder().script("doc['level'].value")))
|
||||
.execute().actionGet();
|
||||
assertNoFailures(response);
|
||||
assertThat(response.getCount(), equalTo(2l));
|
||||
assertThat(response.getMatches()[0].id().string(), equalTo("2"));
|
||||
assertThat(response.getMatches()[0].score(), equalTo(2f));
|
||||
assertThat(response.getMatches()[1].id().string(), equalTo("1"));
|
||||
assertThat(response.getMatches()[1].score(), equalTo(1f));
|
||||
|
||||
response = client().preparePercolate().setIndices("my-index").setDocumentType("my-type")
|
||||
.setSort(true)
|
||||
.setPercolateDoc(docBuilder().setDoc("field", "value"))
|
||||
.setPercolateQuery(QueryBuilders.functionScoreQuery(matchAllQuery()).add(new ScriptScoreFunctionBuilder().script("doc['level'].value")))
|
||||
.execute().actionGet();
|
||||
assertThat(response.getCount(), equalTo(0l));
|
||||
assertThat(response.getSuccessfulShards(), equalTo(3));
|
||||
assertThat(response.getShardFailures().length, equalTo(2));
|
||||
assertThat(response.getShardFailures()[0].status().getStatus(), equalTo(400));
|
||||
assertThat(response.getShardFailures()[0].reason(), containsString("Can't sort if size isn't specified"));
|
||||
assertThat(response.getShardFailures()[1].status().getStatus(), equalTo(400));
|
||||
assertThat(response.getShardFailures()[1].reason(), containsString("Can't sort if size isn't specified"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPercolateOnEmptyIndex() throws Exception {
|
||||
client().admin().indices().prepareCreate("my-index").execute().actionGet();
|
||||
ensureGreen();
|
||||
|
||||
PercolateResponse response = client().preparePercolate().setIndices("my-index").setDocumentType("my-type")
|
||||
.setSort(true)
|
||||
.setSize(2)
|
||||
.setPercolateDoc(docBuilder().setDoc("field", "value"))
|
||||
.setPercolateQuery(QueryBuilders.functionScoreQuery(matchAllQuery()).add(new ScriptScoreFunctionBuilder().script("doc['level'].value")))
|
||||
.execute().actionGet();
|
||||
assertNoFailures(response);
|
||||
assertThat(response.getCount(), equalTo(0l));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPercolateNotEmptyIndexButNoRefresh() throws Exception {
|
||||
client().admin().indices().prepareCreate("my-index")
|
||||
.setSettings(ImmutableSettings.settingsBuilder().put("index.refresh_interval", -1))
|
||||
.execute().actionGet();
|
||||
ensureGreen();
|
||||
|
||||
client().prepareIndex("my-index", "_percolator", "1")
|
||||
.setSource(jsonBuilder().startObject().field("query", matchAllQuery()).field("level", 1).endObject())
|
||||
.execute().actionGet();
|
||||
|
||||
PercolateResponse response = client().preparePercolate().setIndices("my-index").setDocumentType("my-type")
|
||||
.setSort(true)
|
||||
.setSize(2)
|
||||
.setPercolateDoc(docBuilder().setDoc("field", "value"))
|
||||
.setPercolateQuery(QueryBuilders.functionScoreQuery(matchAllQuery()).add(new ScriptScoreFunctionBuilder().script("doc['level'].value")))
|
||||
.execute().actionGet();
|
||||
assertNoFailures(response);
|
||||
assertThat(response.getCount(), equalTo(0l));
|
||||
}
|
||||
|
||||
public static String[] convertFromTextArray(PercolateResponse.Match[] matches, String index) {
|
||||
if (matches.length == 0) {
|
||||
return Strings.EMPTY_ARRAY;
|
||||
|
|
Loading…
Reference in New Issue