From 38d10d19bc75f59f23cffc37f0a6bfa8c4de5812 Mon Sep 17 00:00:00 2001 From: kimchy Date: Fri, 14 Jan 2011 01:01:37 +0200 Subject: [PATCH] Add `prefer_local` flag to analyze and percolate request, closes #625. --- .../admin/indices/analyze/AnalyzeRequest.java | 9 ++ .../analyze/TransportAnalyzeAction.java | 2 +- .../action/percolate/PercolateRequest.java | 9 ++ .../percolate/TransportPercolateAction.java | 2 +- .../custom/SingleCustomOperationRequest.java | 20 ++++ .../TransportSingleCustomOperationAction.java | 96 +++++++++++++------ .../analyze/AnalyzeRequestBuilder.java | 9 ++ .../percolate/PercolateRequestBuilder.java | 9 ++ .../cluster/routing/IndexRoutingTable.java | 2 +- .../indices/analyze/RestAnalyzeAction.java | 1 + .../action/percolate/RestPercolateAction.java | 1 + .../percolator/SimplePercolatorTests.java | 20 +++- 12 files changed, 143 insertions(+), 37 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/analyze/AnalyzeRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/analyze/AnalyzeRequest.java index 9592bb6822c..9cec36db840 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/analyze/AnalyzeRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/analyze/AnalyzeRequest.java @@ -79,6 +79,15 @@ public class AnalyzeRequest extends SingleCustomOperationRequest { return this.analyzer; } + /** + * if this operation hits a node with a local relevant shard, should it be preferred + * to be executed on, or just do plain round robin. Defaults to true + */ + @Override public AnalyzeRequest preferLocal(boolean preferLocal) { + super.preferLocal(preferLocal); + return this; + } + @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = super.validate(); if (index == null) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportAnalyzeAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportAnalyzeAction.java index aaeb35ce982..7c080dfb444 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportAnalyzeAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportAnalyzeAction.java @@ -75,7 +75,7 @@ public class TransportAnalyzeAction extends TransportSingleCustomOperationAction @Override protected ShardsIterator shards(ClusterState clusterState, AnalyzeRequest request) { request.index(clusterState.metaData().concreteIndex(request.index())); - return clusterState.routingTable().index(request.index()).allShardsIt(); + return clusterState.routingTable().index(request.index()).randomAllShardsIt(); } @Override protected AnalyzeResponse shardOperation(AnalyzeRequest request, int shardId) throws ElasticSearchException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/percolate/PercolateRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/percolate/PercolateRequest.java index 69b3369a8dc..dc7d15dde3e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/percolate/PercolateRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/percolate/PercolateRequest.java @@ -150,6 +150,15 @@ public class PercolateRequest extends SingleCustomOperationRequest { return this; } + /** + * if this operation hits a node with a local relevant shard, should it be preferred + * to be executed on, or just do plain round robin. Defaults to true + */ + @Override public PercolateRequest preferLocal(boolean preferLocal) { + super.preferLocal(preferLocal); + return this; + } + @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = super.validate(); if (index == null) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/percolate/TransportPercolateAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/percolate/TransportPercolateAction.java index b153d191a4e..7020edde2fc 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/percolate/TransportPercolateAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/percolate/TransportPercolateAction.java @@ -65,7 +65,7 @@ public class TransportPercolateAction extends TransportSingleCustomOperationActi @Override protected ShardsIterator shards(ClusterState clusterState, PercolateRequest request) { request.index(clusterState.metaData().concreteIndex(request.index())); - return clusterState.routingTable().index(request.index()).allShardsIt(); + return clusterState.routingTable().index(request.index()).randomAllShardsIt(); } @Override protected PercolateResponse shardOperation(PercolateRequest request, int shardId) throws ElasticSearchException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/single/custom/SingleCustomOperationRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/single/custom/SingleCustomOperationRequest.java index 0661176b04c..dfcd9d0bd53 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/single/custom/SingleCustomOperationRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/single/custom/SingleCustomOperationRequest.java @@ -33,6 +33,7 @@ public abstract class SingleCustomOperationRequest implements ActionRequest { private boolean threadedListener = false; private boolean threadedOperation = true; + private boolean preferLocal = true; protected SingleCustomOperationRequest() { } @@ -68,15 +69,34 @@ public abstract class SingleCustomOperationRequest implements ActionRequest { return this; } + /** + * if this operation hits a node with a local relevant shard, should it be preferred + * to be executed on, or just do plain round robin. Defaults to true + */ + public SingleCustomOperationRequest preferLocal(boolean preferLocal) { + this.preferLocal = preferLocal; + return this; + } + + /** + * if this operation hits a node with a local relevant shard, should it be preferred + * to be executed on, or just do plain round robin. Defaults to true + */ + public boolean preferLocalShard() { + return this.preferLocal; + } + public void beforeLocalFork() { } @Override public void readFrom(StreamInput in) throws IOException { // no need to pass threading over the network, they are always false when coming throw a thread pool + preferLocal = in.readBoolean(); } @Override public void writeTo(StreamOutput out) throws IOException { + out.writeBoolean(preferLocal); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/single/custom/TransportSingleCustomOperationAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/single/custom/TransportSingleCustomOperationAction.java index 8355321ae39..48e1421bd5a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/single/custom/TransportSingleCustomOperationAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/single/custom/TransportSingleCustomOperationAction.java @@ -116,40 +116,44 @@ public abstract class TransportSingleCustomOperationAction() { @Override public Response newInstance() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/admin/indices/analyze/AnalyzeRequestBuilder.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/admin/indices/analyze/AnalyzeRequestBuilder.java index b830c02c8ca..4ce2664c3d9 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/admin/indices/analyze/AnalyzeRequestBuilder.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/admin/indices/analyze/AnalyzeRequestBuilder.java @@ -44,6 +44,15 @@ public class AnalyzeRequestBuilder extends BaseIndicesRequestBuildertrue + */ + public AnalyzeRequestBuilder setPreferLocal(boolean preferLocal) { + request.preferLocal(preferLocal); + return this; + } + @Override protected void doExecute(ActionListener listener) { client.analyze(request, listener); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/percolate/PercolateRequestBuilder.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/percolate/PercolateRequestBuilder.java index 1deecc17a26..8cba63a693b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/percolate/PercolateRequestBuilder.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/percolate/PercolateRequestBuilder.java @@ -128,6 +128,15 @@ public class PercolateRequestBuilder extends BaseRequestBuildertrue + */ + public PercolateRequestBuilder setPreferLocal(boolean preferLocal) { + request.preferLocal(preferLocal); + return this; + } + /** * Controls if the operation will be executed on a separate thread when executed locally. Defaults * to true when running in embedded mode. diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java index 3f4ddd88d6d..7a6ef09ea6d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java @@ -153,7 +153,7 @@ public class IndexRoutingTable implements Iterable { /** * An iterator over all shards (including replicas). */ - public ShardsIterator allShardsIt() { + public ShardsIterator randomAllShardsIt() { return new PlainShardsIterator(allShards, Math.abs(counter.incrementAndGet())); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/analyze/RestAnalyzeAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/analyze/RestAnalyzeAction.java index b4266ef4a00..0bb5320f4e3 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/analyze/RestAnalyzeAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/analyze/RestAnalyzeAction.java @@ -61,6 +61,7 @@ public class RestAnalyzeAction extends BaseRestHandler { } AnalyzeRequest analyzeRequest = new AnalyzeRequest(request.param("index"), text); + analyzeRequest.preferLocal(request.paramAsBoolean("prefer_local", analyzeRequest.preferLocalShard())); analyzeRequest.analyzer(request.param("analyzer")); client.admin().indices().analyze(analyzeRequest, new ActionListener() { @Override public void onResponse(AnalyzeResponse response) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/percolate/RestPercolateAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/percolate/RestPercolateAction.java index 1a13b4f4919..e2f95a1f1e8 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/percolate/RestPercolateAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/percolate/RestPercolateAction.java @@ -55,6 +55,7 @@ public class RestPercolateAction extends BaseRestHandler { // we don't spawn, then fork if local percolateRequest.operationThreaded(true); + percolateRequest.preferLocal(request.paramAsBoolean("prefer_local", percolateRequest.preferLocalShard())); client.percolate(percolateRequest, new ActionListener() { @Override public void onResponse(PercolateResponse response) { try { diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/percolator/SimplePercolatorTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/percolator/SimplePercolatorTests.java index 83a77f8864f..f58fa73fc37 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/percolator/SimplePercolatorTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/percolator/SimplePercolatorTests.java @@ -101,11 +101,21 @@ public class SimplePercolatorTests extends AbstractNodesTests { .execute().actionGet(); client.admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForActiveShards(4).execute().actionGet(); - PercolateResponse percolate = client.preparePercolate("test").setSource(jsonBuilder().startObject().startObject("doc").startObject("type1") - .field("field1", "value1") - .endObject().endObject().endObject()) - .execute().actionGet(); - assertThat(percolate.matches().size(), equalTo(1)); + for (int i = 0; i < 10; i++) { + PercolateResponse percolate = client.preparePercolate("test").setSource(jsonBuilder().startObject().startObject("doc").startObject("type1") + .field("field1", "value1") + .endObject().endObject().endObject()) + .execute().actionGet(); + assertThat(percolate.matches().size(), equalTo(1)); + } + + for (int i = 0; i < 10; i++) { + PercolateResponse percolate = client.preparePercolate("test").setPreferLocal(false).setSource(jsonBuilder().startObject().startObject("doc").startObject("type1") + .field("field1", "value1") + .endObject().endObject().endObject()) + .execute().actionGet(); + assertThat(percolate.matches().size(), equalTo(1)); + } } @Test public void dynamicAddingRemovingQueries() throws Exception {