make broadcast action more extendable by refactoring out type/id

This commit is contained in:
kimchy 2011-06-26 16:33:44 +03:00
parent b670a7f2a3
commit 0bbf71188c
6 changed files with 107 additions and 67 deletions

View File

@ -30,12 +30,17 @@ import java.io.IOException;
*/
public class SinglePingRequest extends SingleShardOperationRequest {
protected String type;
protected String id;
public SinglePingRequest(String index) {
super(index, null, null);
super(index);
}
public SinglePingRequest(String index, String type, String id) {
super(index, type, id);
super(index);
this.type = type;
this.id = id;
}
public SinglePingRequest() {
@ -68,10 +73,13 @@ public class SinglePingRequest extends SingleShardOperationRequest {
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
type = in.readUTF();
id = in.readUTF();
}
@Override public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeUTF(type);
out.writeUTF(id);
}
}

View File

@ -23,6 +23,8 @@ import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.single.shard.TransportShardSingleOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
@ -49,6 +51,11 @@ public class TransportSinglePingAction extends TransportShardSingleOperationActi
return "/cluster/ping/single/shard";
}
@Override protected ShardIterator shards(ClusterState clusterState, SinglePingRequest request) throws ElasticSearchException {
return clusterService.operationRouting()
.indexShards(clusterService.state(), request.index(), request.type, request.id, null);
}
@Override protected SinglePingResponse shardOperation(SinglePingRequest request, int shardId) throws ElasticSearchException {
return new SinglePingResponse();
}

View File

@ -19,6 +19,8 @@
package org.elasticsearch.action.get;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.Actions;
import org.elasticsearch.action.support.single.shard.SingleShardOperationRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Required;
@ -41,6 +43,11 @@ import java.io.IOException;
*/
public class GetRequest extends SingleShardOperationRequest {
protected String type;
protected String id;
protected String routing;
protected String preference;
private String[] fields;
private boolean refresh = false;
@ -56,7 +63,8 @@ public class GetRequest extends SingleShardOperationRequest {
* must be set.
*/
public GetRequest(String index) {
super(index, "_all", null);
super(index);
this.type = "_all";
}
/**
@ -67,7 +75,20 @@ public class GetRequest extends SingleShardOperationRequest {
* @param id The id of the document
*/
public GetRequest(String index, String type, String id) {
super(index, type, id);
super(index);
this.type = type;
this.id = id;
}
@Override public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = super.validate();
if (type == null) {
validationException = Actions.addValidationError("type is missing", validationException);
}
if (id == null) {
validationException = Actions.addValidationError("id is missing", validationException);
}
return validationException;
}
/**
@ -116,6 +137,22 @@ public class GetRequest extends SingleShardOperationRequest {
return this;
}
public String type() {
return type;
}
public String id() {
return id;
}
public String routing() {
return this.routing;
}
public String preference() {
return this.preference;
}
/**
* Explicitly specify the fields that will be returned. By default, the <tt>_source</tt>
* field will be returned.
@ -174,6 +211,16 @@ public class GetRequest extends SingleShardOperationRequest {
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
type = in.readUTF();
id = in.readUTF();
if (in.readBoolean()) {
routing = in.readUTF();
}
if (in.readBoolean()) {
preference = in.readUTF();
}
refresh = in.readBoolean();
int size = in.readInt();
if (size >= 0) {
@ -192,6 +239,22 @@ public class GetRequest extends SingleShardOperationRequest {
@Override public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeUTF(type);
out.writeUTF(id);
if (routing == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(routing);
}
if (preference == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(preference);
}
out.writeBoolean(refresh);
if (fields == null) {
out.writeInt(-1);
@ -211,6 +274,6 @@ public class GetRequest extends SingleShardOperationRequest {
}
@Override public String toString() {
return "[" + index + "][" + type + "][" + id + "]";
return "[" + index + "][" + type + "][" + id + "]: routing [" + routing + "]";
}
}

View File

@ -32,6 +32,8 @@ import org.elasticsearch.action.support.single.shard.TransportShardSingleOperati
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.bloom.BloomFilter;
@ -100,10 +102,19 @@ public class TransportGetAction extends TransportShardSingleOperationAction<GetR
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.READ, request.index());
}
@Override protected ShardIterator shards(ClusterState clusterState, GetRequest request) {
return clusterService.operationRouting()
.indexShards(clusterService.state(), request.index(), request.type(), request.id(), request.routing());
}
@Override protected void doExecute(GetRequest request, ActionListener<GetResponse> listener) {
if (request.realtime == null) {
request.realtime = this.realtime;
}
// update the routing (request#index here is possibly an alias)
MetaData metaData = clusterService.state().metaData();
request.routing(metaData.resolveIndexRouting(request.routing(), request.index()));
super.doExecute(request, listener);
}

View File

@ -33,10 +33,6 @@ import java.io.IOException;
public abstract class SingleShardOperationRequest implements ActionRequest {
protected String index;
protected String type;
protected String id;
protected String routing;
protected String preference;
private boolean threadedListener = false;
private boolean threadedOperation = true;
@ -44,10 +40,8 @@ public abstract class SingleShardOperationRequest implements ActionRequest {
protected SingleShardOperationRequest() {
}
public SingleShardOperationRequest(String index, String type, String id) {
public SingleShardOperationRequest(String index) {
this.index = index;
this.type = type;
this.id = id;
}
@Override public ActionRequestValidationException validate() {
@ -55,12 +49,6 @@ public abstract class SingleShardOperationRequest implements ActionRequest {
if (index == null) {
validationException = Actions.addValidationError("index is missing", validationException);
}
if (type == null) {
validationException = Actions.addValidationError("type is missing", validationException);
}
if (id == null) {
validationException = Actions.addValidationError("id is missing", validationException);
}
return validationException;
}
@ -73,22 +61,6 @@ public abstract class SingleShardOperationRequest implements ActionRequest {
return this;
}
public String type() {
return type;
}
public String id() {
return id;
}
public String routing() {
return this.routing;
}
public String preference() {
return this.preference;
}
/**
* Should the listener be called on a separate thread if needed.
*/
@ -118,33 +90,11 @@ public abstract class SingleShardOperationRequest implements ActionRequest {
@Override public void readFrom(StreamInput in) throws IOException {
index = in.readUTF();
type = in.readUTF();
id = in.readUTF();
if (in.readBoolean()) {
routing = in.readUTF();
}
if (in.readBoolean()) {
preference = in.readUTF();
}
// no need to pass threading over the network, they are always false when coming throw a thread pool
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(index);
out.writeUTF(type);
out.writeUTF(id);
if (routing == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(routing);
}
if (preference == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(preference);
}
}
}

View File

@ -35,10 +35,13 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.BaseTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Set;
/**
* @author kimchy (shay.banon)
@ -86,6 +89,8 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
}
protected abstract ShardIterator shards(ClusterState clusterState, Request request) throws ElasticSearchException;
private class AsyncSingleAction {
private final ActionListener<Response> listener;
@ -104,15 +109,11 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
nodes = clusterState.nodes();
// update to the concrete shard and find routing to use
String alias = request.index();
request.index(clusterState.metaData().concreteIndex(request.index()));
String effectiveRouting = clusterState.metaData().resolveIndexRouting(request.routing(), alias);
checkBlock(request, clusterState);
this.shardIt = clusterService.operationRouting()
.getShards(clusterState, request.index(), request.type(), request.id(), effectiveRouting, request.preference());
this.shardIt = shards(clusterState, request);
}
public void start() {
@ -121,7 +122,7 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
private void onFailure(ShardRouting shardRouting, Exception e) {
if (logger.isTraceEnabled() && e != null) {
logger.trace(shardRouting.shortSummary() + ": Failed to get [" + request.type() + "#" + request.id() + "]", e);
logger.trace(shardRouting.shortSummary() + ": Failed to execute [{}]", e, request);
}
perform(e);
}
@ -193,10 +194,10 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
if (!shardIt.hasNextActive()) {
Exception failure = lastException;
if (failure == null) {
failure = new NoShardAvailableActionException(shardIt.shardId(), "No shard available for [" + request.type() + "#" + request.id() + "]");
failure = new NoShardAvailableActionException(shardIt.shardId(), "No shard available for [" + request + "]");
} else {
if (logger.isDebugEnabled()) {
logger.debug(shardIt.shardId() + ": Failed to get [" + request.type() + "#" + request.id() + "]", failure);
logger.debug(shardIt.shardId() + ": Failed to get [{}]", failure, request);
}
}
listener.onFailure(failure);