From 0bbf71188cb94185b87859a2cb52ea4863cfea2b Mon Sep 17 00:00:00 2001 From: kimchy Date: Sun, 26 Jun 2011 16:33:44 +0300 Subject: [PATCH] make broadcast action more extendable by refactoring out type/id --- .../ping/single/SinglePingRequest.java | 14 +++- .../single/TransportSinglePingAction.java | 7 ++ .../elasticsearch/action/get/GetRequest.java | 69 ++++++++++++++++++- .../action/get/TransportGetAction.java | 11 +++ .../shard/SingleShardOperationRequest.java | 52 +------------- .../TransportShardSingleOperationAction.java | 21 +++--- 6 files changed, 107 insertions(+), 67 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/single/SinglePingRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/single/SinglePingRequest.java index afeb21a804b..5cda2aae92c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/single/SinglePingRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/single/SinglePingRequest.java @@ -30,12 +30,17 @@ import java.io.IOException; */ public class SinglePingRequest extends SingleShardOperationRequest { + protected String type; + protected String id; + public SinglePingRequest(String index) { - super(index, null, null); + super(index); } public SinglePingRequest(String index, String type, String id) { - super(index, type, id); + super(index); + this.type = type; + this.id = id; } public SinglePingRequest() { @@ -68,10 +73,13 @@ public class SinglePingRequest extends SingleShardOperationRequest { @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); + type = in.readUTF(); + id = in.readUTF(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); + out.writeUTF(type); + out.writeUTF(id); } - } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/single/TransportSinglePingAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/single/TransportSinglePingAction.java index 83d509e2f57..7eb7193c681 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/single/TransportSinglePingAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/single/TransportSinglePingAction.java @@ -23,6 +23,8 @@ import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.TransportActions; import org.elasticsearch.action.support.single.shard.TransportShardSingleOperationAction; import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; @@ -49,6 +51,11 @@ public class TransportSinglePingAction extends TransportShardSingleOperationActi return "/cluster/ping/single/shard"; } + @Override protected ShardIterator shards(ClusterState clusterState, SinglePingRequest request) throws ElasticSearchException { + return clusterService.operationRouting() + .indexShards(clusterService.state(), request.index(), request.type, request.id, null); + } + @Override protected SinglePingResponse shardOperation(SinglePingRequest request, int shardId) throws ElasticSearchException { return new SinglePingResponse(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/GetRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/GetRequest.java index 48bc1b032f8..2c291e3ed6a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/GetRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/GetRequest.java @@ -19,6 +19,8 @@ package org.elasticsearch.action.get; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.Actions; import org.elasticsearch.action.support.single.shard.SingleShardOperationRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Required; @@ -41,6 +43,11 @@ import java.io.IOException; */ public class GetRequest extends SingleShardOperationRequest { + protected String type; + protected String id; + protected String routing; + protected String preference; + private String[] fields; private boolean refresh = false; @@ -56,7 +63,8 @@ public class GetRequest extends SingleShardOperationRequest { * must be set. */ public GetRequest(String index) { - super(index, "_all", null); + super(index); + this.type = "_all"; } /** @@ -67,7 +75,20 @@ public class GetRequest extends SingleShardOperationRequest { * @param id The id of the document */ public GetRequest(String index, String type, String id) { - super(index, type, id); + super(index); + this.type = type; + this.id = id; + } + + @Override public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = super.validate(); + if (type == null) { + validationException = Actions.addValidationError("type is missing", validationException); + } + if (id == null) { + validationException = Actions.addValidationError("id is missing", validationException); + } + return validationException; } /** @@ -116,6 +137,22 @@ public class GetRequest extends SingleShardOperationRequest { return this; } + public String type() { + return type; + } + + public String id() { + return id; + } + + public String routing() { + return this.routing; + } + + public String preference() { + return this.preference; + } + /** * Explicitly specify the fields that will be returned. By default, the _source * field will be returned. @@ -174,6 +211,16 @@ public class GetRequest extends SingleShardOperationRequest { @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); + + type = in.readUTF(); + id = in.readUTF(); + if (in.readBoolean()) { + routing = in.readUTF(); + } + if (in.readBoolean()) { + preference = in.readUTF(); + } + refresh = in.readBoolean(); int size = in.readInt(); if (size >= 0) { @@ -192,6 +239,22 @@ public class GetRequest extends SingleShardOperationRequest { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); + + out.writeUTF(type); + out.writeUTF(id); + if (routing == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeUTF(routing); + } + if (preference == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeUTF(preference); + } + out.writeBoolean(refresh); if (fields == null) { out.writeInt(-1); @@ -211,6 +274,6 @@ public class GetRequest extends SingleShardOperationRequest { } @Override public String toString() { - return "[" + index + "][" + type + "][" + id + "]"; + return "[" + index + "][" + type + "][" + id + "]: routing [" + routing + "]"; } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/TransportGetAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/TransportGetAction.java index f3790cdc3bd..010d93273b5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/TransportGetAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/TransportGetAction.java @@ -32,6 +32,8 @@ import org.elasticsearch.action.support.single.shard.TransportShardSingleOperati import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.common.BytesHolder; import org.elasticsearch.common.Unicode; import org.elasticsearch.common.bloom.BloomFilter; @@ -100,10 +102,19 @@ public class TransportGetAction extends TransportShardSingleOperationAction listener) { if (request.realtime == null) { request.realtime = this.realtime; } + // update the routing (request#index here is possibly an alias) + MetaData metaData = clusterService.state().metaData(); + request.routing(metaData.resolveIndexRouting(request.routing(), request.index())); + super.doExecute(request, listener); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/single/shard/SingleShardOperationRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/single/shard/SingleShardOperationRequest.java index 924f70dd1e8..19f1f9735dc 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/single/shard/SingleShardOperationRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/single/shard/SingleShardOperationRequest.java @@ -33,10 +33,6 @@ import java.io.IOException; public abstract class SingleShardOperationRequest implements ActionRequest { protected String index; - protected String type; - protected String id; - protected String routing; - protected String preference; private boolean threadedListener = false; private boolean threadedOperation = true; @@ -44,10 +40,8 @@ public abstract class SingleShardOperationRequest implements ActionRequest { protected SingleShardOperationRequest() { } - public SingleShardOperationRequest(String index, String type, String id) { + public SingleShardOperationRequest(String index) { this.index = index; - this.type = type; - this.id = id; } @Override public ActionRequestValidationException validate() { @@ -55,12 +49,6 @@ public abstract class SingleShardOperationRequest implements ActionRequest { if (index == null) { validationException = Actions.addValidationError("index is missing", validationException); } - if (type == null) { - validationException = Actions.addValidationError("type is missing", validationException); - } - if (id == null) { - validationException = Actions.addValidationError("id is missing", validationException); - } return validationException; } @@ -73,22 +61,6 @@ public abstract class SingleShardOperationRequest implements ActionRequest { return this; } - public String type() { - return type; - } - - public String id() { - return id; - } - - public String routing() { - return this.routing; - } - - public String preference() { - return this.preference; - } - /** * Should the listener be called on a separate thread if needed. */ @@ -118,33 +90,11 @@ public abstract class SingleShardOperationRequest implements ActionRequest { @Override public void readFrom(StreamInput in) throws IOException { index = in.readUTF(); - type = in.readUTF(); - id = in.readUTF(); - if (in.readBoolean()) { - routing = in.readUTF(); - } - if (in.readBoolean()) { - preference = in.readUTF(); - } // no need to pass threading over the network, they are always false when coming throw a thread pool } @Override public void writeTo(StreamOutput out) throws IOException { out.writeUTF(index); - out.writeUTF(type); - out.writeUTF(id); - if (routing == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - out.writeUTF(routing); - } - if (preference == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - out.writeUTF(preference); - } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/single/shard/TransportShardSingleOperationAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/single/shard/TransportShardSingleOperationAction.java index f54828749de..d6ea757a071 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/single/shard/TransportShardSingleOperationAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/single/shard/TransportShardSingleOperationAction.java @@ -35,10 +35,13 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.*; +import org.elasticsearch.transport.BaseTransportRequestHandler; +import org.elasticsearch.transport.BaseTransportResponseHandler; +import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.Set; /** * @author kimchy (shay.banon) @@ -86,6 +89,8 @@ public abstract class TransportShardSingleOperationAction listener; @@ -104,15 +109,11 @@ public abstract class TransportShardSingleOperationAction