Add number of shards statistic to PercolateContext instead of throwing exception.

Certain features like significant_terms aggregation rely on this statistic for sizing heuristics.

Closes #6037
Closes #6123
This commit is contained in:
Martijn van Groningen 2014-05-12 12:38:45 +02:00
parent 16e5cdf8d0
commit e8e684c6c4
15 changed files with 44 additions and 16 deletions

View File

@ -107,7 +107,7 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio
}
@Override
protected ShardClearIndicesCacheRequest newShardRequest(ShardRouting shard, ClearIndicesCacheRequest request) {
protected ShardClearIndicesCacheRequest newShardRequest(int numShards, ShardRouting shard, ClearIndicesCacheRequest request) {
return new ShardClearIndicesCacheRequest(shard.index(), shard.id(), request);
}

View File

@ -99,7 +99,7 @@ public class TransportFlushAction extends TransportBroadcastOperationAction<Flus
}
@Override
protected ShardFlushRequest newShardRequest(ShardRouting shard, FlushRequest request) {
protected ShardFlushRequest newShardRequest(int numShards, ShardRouting shard, FlushRequest request) {
return new ShardFlushRequest(shard.index(), shard.id(), request);
}

View File

@ -100,7 +100,7 @@ public class TransportOptimizeAction extends TransportBroadcastOperationAction<O
}
@Override
protected ShardOptimizeRequest newShardRequest(ShardRouting shard, OptimizeRequest request) {
protected ShardOptimizeRequest newShardRequest(int numShards, ShardRouting shard, OptimizeRequest request) {
return new ShardOptimizeRequest(shard.index(), shard.id(), request);
}

View File

@ -136,7 +136,7 @@ public class TransportRecoveryAction extends
}
@Override
protected ShardRecoveryRequest newShardRequest(ShardRouting shard, RecoveryRequest request) {
protected ShardRecoveryRequest newShardRequest(int numShards, ShardRouting shard, RecoveryRequest request) {
return new ShardRecoveryRequest(shard.index(), shard.id(), request);
}

View File

@ -100,7 +100,7 @@ public class TransportRefreshAction extends TransportBroadcastOperationAction<Re
}
@Override
protected ShardRefreshRequest newShardRequest(ShardRouting shard, RefreshRequest request) {
protected ShardRefreshRequest newShardRequest(int numShards, ShardRouting shard, RefreshRequest request) {
return new ShardRefreshRequest(shard.index(), shard.id(), request);
}

View File

@ -124,7 +124,7 @@ public class TransportIndicesSegmentsAction extends TransportBroadcastOperationA
}
@Override
protected IndexShardSegmentRequest newShardRequest(ShardRouting shard, IndicesSegmentsRequest request) {
protected IndexShardSegmentRequest newShardRequest(int numShards, ShardRouting shard, IndicesSegmentsRequest request) {
return new IndexShardSegmentRequest(shard.index(), shard.id(), request);
}

View File

@ -125,7 +125,7 @@ public class TransportIndicesStatsAction extends TransportBroadcastOperationActi
}
@Override
protected IndexShardStatsRequest newShardRequest(ShardRouting shard, IndicesStatsRequest request) {
protected IndexShardStatsRequest newShardRequest(int numShards, ShardRouting shard, IndicesStatsRequest request) {
return new IndexShardStatsRequest(shard.index(), shard.id(), request);
}

View File

@ -108,7 +108,7 @@ public class TransportValidateQueryAction extends TransportBroadcastOperationAct
}
@Override
protected ShardValidateQueryRequest newShardRequest(ShardRouting shard, ValidateQueryRequest request) {
protected ShardValidateQueryRequest newShardRequest(int numShards, ShardRouting shard, ValidateQueryRequest request) {
String[] filteringAliases = clusterService.state().metaData().filteringAliases(shard.index(), request.indices());
return new ShardValidateQueryRequest(shard.index(), shard.id(), filteringAliases, request);
}

View File

@ -112,7 +112,7 @@ public class TransportCountAction extends TransportBroadcastOperationAction<Coun
}
@Override
protected ShardCountRequest newShardRequest(ShardRouting shard, CountRequest request) {
protected ShardCountRequest newShardRequest(int numShards, ShardRouting shard, CountRequest request) {
String[] filteringAliases = clusterService.state().metaData().filteringAliases(shard.index(), request.indices());
return new ShardCountRequest(shard.index(), shard.id(), filteringAliases, request);
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.percolate;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationRequest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
@ -35,6 +36,7 @@ public class PercolateShardRequest extends BroadcastShardOperationRequest {
private BytesReference source;
private BytesReference docSource;
private boolean onlyCount;
private int numberOfShards;
public PercolateShardRequest() {
}
@ -43,12 +45,13 @@ public class PercolateShardRequest extends BroadcastShardOperationRequest {
super(index, shardId);
}
public PercolateShardRequest(String index, int shardId, PercolateRequest request) {
public PercolateShardRequest(String index, int shardId, int numberOfShards, PercolateRequest request) {
super(index, shardId, request);
this.documentType = request.documentType();
this.source = request.source();
this.docSource = request.docSource();
this.onlyCount = request.onlyCount();
this.numberOfShards = numberOfShards;
}
public PercolateShardRequest(ShardId shardId, PercolateRequest request) {
@ -91,6 +94,10 @@ public class PercolateShardRequest extends BroadcastShardOperationRequest {
this.onlyCount = onlyCount;
}
public int getNumberOfShards() {
return numberOfShards;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
@ -98,6 +105,9 @@ public class PercolateShardRequest extends BroadcastShardOperationRequest {
source = in.readBytesReference();
docSource = in.readBytesReference();
onlyCount = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_1_2_0)) {
numberOfShards = in.readVInt();
}
}
@Override
@ -107,6 +117,9 @@ public class PercolateShardRequest extends BroadcastShardOperationRequest {
out.writeBytesReference(source);
out.writeBytesReference(docSource);
out.writeBoolean(onlyCount);
if (out.getVersion().onOrAfter(Version.V_1_2_0)) {
out.writeVInt(numberOfShards);
}
}
}

View File

@ -173,8 +173,8 @@ public class TransportPercolateAction extends TransportBroadcastOperationAction<
}
@Override
protected PercolateShardRequest newShardRequest(ShardRouting shard, PercolateRequest request) {
return new PercolateShardRequest(shard.index(), shard.id(), request);
protected PercolateShardRequest newShardRequest(int numShards, ShardRouting shard, PercolateRequest request) {
return new PercolateShardRequest(shard.index(), shard.id(), numShards, request);
}
@Override

View File

@ -93,7 +93,7 @@ public class TransportSuggestAction extends TransportBroadcastOperationAction<Su
}
@Override
protected ShardSuggestRequest newShardRequest(ShardRouting shard, SuggestRequest request) {
protected ShardSuggestRequest newShardRequest(int numShards, ShardRouting shard, SuggestRequest request) {
return new ShardSuggestRequest(shard.index(), shard.id(), request);
}

View File

@ -83,7 +83,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
protected abstract ShardRequest newShardRequest();
protected abstract ShardRequest newShardRequest(ShardRouting shard, Request request);
protected abstract ShardRequest newShardRequest(int numShards, ShardRouting shard, Request request);
protected abstract ShardResponse newShardResponse();
@ -161,7 +161,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
onOperation(null, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
} else {
try {
final ShardRequest shardRequest = newShardRequest(shard, request);
final ShardRequest shardRequest = newShardRequest(shardIt.size(), shard, request);
if (shard.currentNodeId().equals(nodes.localNodeId())) {
threadPool.executor(executor).execute(new Runnable() {
@Override

View File

@ -93,6 +93,7 @@ public class PercolateContext extends SearchContext {
private final BigArrays bigArrays;
private final ScriptService scriptService;
private final ConcurrentMap<HashedBytesRef, Query> percolateQueries;
private final int numberOfShards;
private String[] types;
private Engine.Searcher docSearcher;
@ -127,6 +128,7 @@ public class PercolateContext extends SearchContext {
this.engineSearcher = indexShard.acquireSearcher("percolate");
this.searcher = new ContextIndexSearcher(this, engineSearcher);
this.scriptService = scriptService;
this.numberOfShards = request.getNumberOfShards();
}
public IndexSearcher docSearcher() {
@ -327,7 +329,7 @@ public class PercolateContext extends SearchContext {
@Override
public int numberOfShards() {
throw new UnsupportedOperationException();
return numberOfShards;
}
@Override

View File

@ -37,6 +37,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertMatchCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
@ -122,4 +123,16 @@ public class PercolatorFacetsAndAggregationsTests extends ElasticsearchIntegrati
}
}
@Test
public void testSignificantAggs() throws Exception {
client().admin().indices().prepareCreate("test").execute().actionGet();
ensureGreen();
PercolateRequestBuilder percolateRequestBuilder = client().preparePercolate()
.setIndices("test").setDocumentType("type")
.setPercolateDoc(docBuilder().setDoc(jsonBuilder().startObject().field("field1", "value").endObject()))
.addAggregation(AggregationBuilders.significantTerms("a").field("field2"));
PercolateResponse response = percolateRequestBuilder.get();
assertNoFailures(response);
}
}