Internal: refactored TransportSingleCustomOperationAction, subclasses and requests

TransportSingleCustomOperationAction is subclassed by two similar, yet different transport action: TransportAnalyzeAction and TransportGetFieldMappingsAction. Made their difference and similarities more explicit by sharing common code and moving specific code to subclasses:
- moved index field to the parent SingleCustomOperationAction class
- moved the common check blocks code to the parent transport action class
- moved the main transport handler to the TransportAnalyzeAction subclass as it is only used to receive external requests through clients. In the case of the TransportGetFieldMappingsIndexAction instead, the action is internal and executed only locally as part of the user facing TransportGetFieldMappingsAction. The corresponding request gets sent over the transport though as part of the related shard request
- removed the get field mappings index action from the action names mapping as it is not a transport handler anymore. It was before although never used.

Closes #7214
This commit is contained in:
javanna 2014-08-09 03:16:05 +02:00 committed by Luca Cavanna
parent ac40eae3e3
commit a03860970b
7 changed files with 109 additions and 131 deletions

View File

@ -20,8 +20,6 @@ package org.elasticsearch.action.admin.indices.analyze;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.single.custom.SingleCustomOperationRequest; import org.elasticsearch.action.support.single.custom.SingleCustomOperationRequest;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
@ -36,9 +34,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
* A request to analyze a text associated with a specific index. Allow to provide * A request to analyze a text associated with a specific index. Allow to provide
* the actual analyzer name to perform the analysis with. * the actual analyzer name to perform the analysis with.
*/ */
public class AnalyzeRequest extends SingleCustomOperationRequest<AnalyzeRequest> implements IndicesRequest { public class AnalyzeRequest extends SingleCustomOperationRequest<AnalyzeRequest> {
private String index;
private String text; private String text;
@ -72,7 +68,7 @@ public class AnalyzeRequest extends SingleCustomOperationRequest<AnalyzeRequest>
* @param text The text to analyze * @param text The text to analyze
*/ */
public AnalyzeRequest(@Nullable String index, String text) { public AnalyzeRequest(@Nullable String index, String text) {
this.index = index; this.index(index);
this.text = text; this.text = text;
} }
@ -80,28 +76,6 @@ public class AnalyzeRequest extends SingleCustomOperationRequest<AnalyzeRequest>
return this.text; return this.text;
} }
public AnalyzeRequest index(String index) {
this.index = index;
return this;
}
public String index() {
return this.index;
}
@Override
public String[] indices() {
if (index == null) {
return Strings.EMPTY_ARRAY;
}
return new String[]{index};
}
@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
}
public AnalyzeRequest analyzer(String analyzer) { public AnalyzeRequest analyzer(String analyzer) {
this.analyzer = analyzer; this.analyzer = analyzer;
return this; return this;
@ -165,7 +139,6 @@ public class AnalyzeRequest extends SingleCustomOperationRequest<AnalyzeRequest>
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
index = in.readOptionalString();
text = in.readString(); text = in.readString();
analyzer = in.readOptionalString(); analyzer = in.readOptionalString();
tokenizer = in.readOptionalString(); tokenizer = in.readOptionalString();
@ -179,7 +152,6 @@ public class AnalyzeRequest extends SingleCustomOperationRequest<AnalyzeRequest>
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
out.writeOptionalString(index);
out.writeString(text); out.writeString(text);
out.writeOptionalString(analyzer); out.writeOptionalString(analyzer);
out.writeOptionalString(tokenizer); out.writeOptionalString(tokenizer);

View File

@ -27,12 +27,12 @@ import org.apache.lucene.analysis.tokenattributes.PositionIncrementAttribute;
import org.apache.lucene.analysis.tokenattributes.TypeAttribute; import org.apache.lucene.analysis.tokenattributes.TypeAttribute;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.single.custom.TransportSingleCustomOperationAction; import org.elasticsearch.action.support.single.custom.TransportSingleCustomOperationAction;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
@ -44,13 +44,15 @@ import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.analysis.IndicesAnalysisService; import org.elasticsearch.indices.analysis.IndicesAnalysisService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
/** /**
* * Transport action used to execute analyze requests
*/ */
public class TransportAnalyzeAction extends TransportSingleCustomOperationAction<AnalyzeRequest, AnalyzeResponse> { public class TransportAnalyzeAction extends TransportSingleCustomOperationAction<AnalyzeRequest, AnalyzeResponse> {
@ -64,6 +66,7 @@ public class TransportAnalyzeAction extends TransportSingleCustomOperationAction
super(settings, AnalyzeAction.NAME, threadPool, clusterService, transportService, actionFilters); super(settings, AnalyzeAction.NAME, threadPool, clusterService, transportService, actionFilters);
this.indicesService = indicesService; this.indicesService = indicesService;
this.indicesAnalysisService = indicesAnalysisService; this.indicesAnalysisService = indicesAnalysisService;
transportService.registerHandler(AnalyzeAction.NAME, new TransportHandler());
} }
@Override @Override
@ -81,16 +84,11 @@ public class TransportAnalyzeAction extends TransportSingleCustomOperationAction
return new AnalyzeResponse(); return new AnalyzeResponse();
} }
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, AnalyzeRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
}
@Override @Override
protected ClusterBlockException checkRequestBlock(ClusterState state, AnalyzeRequest request) { protected ClusterBlockException checkRequestBlock(ClusterState state, AnalyzeRequest request) {
if (request.index() != null) { if (request.index() != null) {
request.index(state.metaData().concreteSingleIndex(request.index(), request.indicesOptions())); request.index(state.metaData().concreteSingleIndex(request.index(), request.indicesOptions()));
return state.blocks().indexBlockedException(ClusterBlockLevel.READ, request.index()); return super.checkRequestBlock(state, request);
} }
return null; return null;
} }
@ -253,4 +251,44 @@ public class TransportAnalyzeAction extends TransportSingleCustomOperationAction
return new AnalyzeResponse(tokens); return new AnalyzeResponse(tokens);
} }
private class TransportHandler extends BaseTransportRequestHandler<AnalyzeRequest> {
@Override
public AnalyzeRequest newInstance() {
return newRequest();
}
@Override
public void messageReceived(AnalyzeRequest request, final TransportChannel channel) throws Exception {
// no need to have a threaded listener since we just send back a response
request.listenerThreaded(false);
// if we have a local operation, execute it on a thread since we don't spawn
request.operationThreaded(true);
execute(request, new ActionListener<AnalyzeResponse>() {
@Override
public void onResponse(AnalyzeResponse result) {
try {
channel.sendResponse(result);
} catch (Throwable e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(e);
} catch (Exception e1) {
logger.warn("Failed to send response for get", e1);
}
}
});
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}
} }

View File

@ -19,9 +19,6 @@
package org.elasticsearch.action.admin.indices.mapping.get; package org.elasticsearch.action.admin.indices.mapping.get;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.single.custom.SingleCustomOperationRequest; import org.elasticsearch.action.support.single.custom.SingleCustomOperationRequest;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
@ -29,9 +26,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException; import java.io.IOException;
class GetFieldMappingsIndexRequest extends SingleCustomOperationRequest<GetFieldMappingsIndexRequest> implements IndicesRequest { class GetFieldMappingsIndexRequest extends SingleCustomOperationRequest<GetFieldMappingsIndexRequest> {
private String index;
private boolean probablySingleFieldRequest; private boolean probablySingleFieldRequest;
private boolean includeDefaults; private boolean includeDefaults;
@ -48,21 +43,7 @@ class GetFieldMappingsIndexRequest extends SingleCustomOperationRequest<GetField
this.types = other.types(); this.types = other.types();
this.fields = other.fields(); this.fields = other.fields();
assert index != null; assert index != null;
this.index = index; this.index(index);
}
public String index() {
return index;
}
@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
}
@Override
public String[] indices() {
return new String[]{index};
} }
public String[] types() { public String[] types() {
@ -81,34 +62,31 @@ class GetFieldMappingsIndexRequest extends SingleCustomOperationRequest<GetField
return includeDefaults; return includeDefaults;
} }
/** Indicates whether default mapping settings should be returned */
public GetFieldMappingsIndexRequest includeDefaults(boolean includeDefaults) {
this.includeDefaults = includeDefaults;
return this;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
out.writeString(index);
out.writeStringArray(types); out.writeStringArray(types);
out.writeStringArray(fields); out.writeStringArray(fields);
out.writeBoolean(includeDefaults); out.writeBoolean(includeDefaults);
out.writeBoolean(probablySingleFieldRequest); out.writeBoolean(probablySingleFieldRequest);
} }
@Override
protected void writeIndex(StreamOutput out) throws IOException {
out.writeString(index());
}
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
index = in.readString();
types = in.readStringArray(); types = in.readStringArray();
fields = in.readStringArray(); fields = in.readStringArray();
includeDefaults = in.readBoolean(); includeDefaults = in.readBoolean();
probablySingleFieldRequest = in.readBoolean(); probablySingleFieldRequest = in.readBoolean();
} }
@Override
protected void readIndex(StreamInput in) throws IOException {
index(in.readString());
}
} }

View File

@ -28,8 +28,6 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.single.custom.TransportSingleCustomOperationAction; import org.elasticsearch.action.support.single.custom.TransportSingleCustomOperationAction;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
@ -53,6 +51,7 @@ import java.util.Collection;
import java.util.List; import java.util.List;
/** /**
* Transport action used to retrieve the mappings related to fields that belong to a specific index
*/ */
public class TransportGetFieldMappingsIndexAction extends TransportSingleCustomOperationAction<GetFieldMappingsIndexRequest, GetFieldMappingsResponse> { public class TransportGetFieldMappingsIndexAction extends TransportSingleCustomOperationAction<GetFieldMappingsIndexRequest, GetFieldMappingsResponse> {
@ -125,16 +124,6 @@ public class TransportGetFieldMappingsIndexAction extends TransportSingleCustomO
return new GetFieldMappingsResponse(); return new GetFieldMappingsResponse();
} }
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, GetFieldMappingsIndexRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, GetFieldMappingsIndexRequest request) {
return state.blocks().indexBlockedException(ClusterBlockLevel.READ, request.index());
}
private static final ToXContent.Params includeDefaultsParams = new ToXContent.Params() { private static final ToXContent.Params includeDefaultsParams = new ToXContent.Params() {
final static String INCLUDE_DEFAULTS = "include_defaults"; final static String INCLUDE_DEFAULTS = "include_defaults";

View File

@ -20,6 +20,9 @@ package org.elasticsearch.action.support.single.custom;
import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
@ -28,10 +31,11 @@ import java.io.IOException;
/** /**
* *
*/ */
public abstract class SingleCustomOperationRequest<T extends SingleCustomOperationRequest> extends ActionRequest<T> { public abstract class SingleCustomOperationRequest<T extends SingleCustomOperationRequest> extends ActionRequest<T> implements IndicesRequest {
private boolean threadedOperation = true; private boolean threadedOperation = true;
private boolean preferLocal = true; private boolean preferLocal = true;
private String index;
protected SingleCustomOperationRequest() { protected SingleCustomOperationRequest() {
} }
@ -67,6 +71,29 @@ public abstract class SingleCustomOperationRequest<T extends SingleCustomOperati
return (T) this; return (T) this;
} }
@SuppressWarnings("unchecked")
public T index(String index) {
this.index = index;
return (T)this;
}
public String index() {
return index;
}
@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
}
@Override
public String[] indices() {
if (index == null) {
return Strings.EMPTY_ARRAY;
}
return new String[]{index};
}
/** /**
* if this operation hits a node with a local relevant shard, should it be preferred * if this operation hits a node with a local relevant shard, should it be preferred
* to be executed on, or just do plain round robin. Defaults to <tt>true</tt> * to be executed on, or just do plain round robin. Defaults to <tt>true</tt>
@ -83,12 +110,22 @@ public abstract class SingleCustomOperationRequest<T extends SingleCustomOperati
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
preferLocal = in.readBoolean(); preferLocal = in.readBoolean();
readIndex(in);
}
protected void readIndex(StreamInput in) throws IOException {
index = in.readOptionalString();
} }
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
out.writeBoolean(preferLocal); out.writeBoolean(preferLocal);
writeIndex(out);
}
protected void writeIndex(StreamOutput out) throws IOException {
out.writeOptionalString(index);
} }
} }

View File

@ -27,6 +27,7 @@ import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
@ -40,7 +41,8 @@ import org.elasticsearch.transport.*;
import java.io.IOException; import java.io.IOException;
/** /**
* * Transport action used to send a read request to one of the shards that belong to an index.
* Supports retrying another shard in case of failure.
*/ */
public abstract class TransportSingleCustomOperationAction<Request extends SingleCustomOperationRequest, Response extends ActionResponse> extends TransportAction<Request, Response> { public abstract class TransportSingleCustomOperationAction<Request extends SingleCustomOperationRequest, Response extends ActionResponse> extends TransportAction<Request, Response> {
@ -59,7 +61,6 @@ public abstract class TransportSingleCustomOperationAction<Request extends Singl
this.transportShardAction = actionName + "[s]"; this.transportShardAction = actionName + "[s]";
this.executor = executor(); this.executor = executor();
transportService.registerHandler(actionName, new TransportHandler());
transportService.registerHandler(transportShardAction, new ShardTransportHandler()); transportService.registerHandler(transportShardAction, new ShardTransportHandler());
} }
@ -81,9 +82,13 @@ public abstract class TransportSingleCustomOperationAction<Request extends Singl
protected abstract Response newResponse(); protected abstract Response newResponse();
protected abstract ClusterBlockException checkGlobalBlock(ClusterState state, Request request); protected ClusterBlockException checkGlobalBlock(ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
}
protected abstract ClusterBlockException checkRequestBlock(ClusterState state, Request request); protected ClusterBlockException checkRequestBlock(ClusterState state, Request request) {
return state.blocks().indexBlockedException(ClusterBlockLevel.READ, request.index());
}
private class AsyncSingleAction { private class AsyncSingleAction {
@ -101,7 +106,7 @@ public abstract class TransportSingleCustomOperationAction<Request extends Singl
ClusterState clusterState = clusterService.state(); ClusterState clusterState = clusterService.state();
nodes = clusterState.nodes(); nodes = clusterState.nodes();
ClusterBlockException blockException = checkGlobalBlock(clusterState, request); ClusterBlockException blockException = checkGlobalBlock(clusterState);
if (blockException != null) { if (blockException != null) {
throw blockException; throw blockException;
} }
@ -267,46 +272,6 @@ public abstract class TransportSingleCustomOperationAction<Request extends Singl
} }
} }
private class TransportHandler extends BaseTransportRequestHandler<Request> {
@Override
public Request newInstance() {
return newRequest();
}
@Override
public void messageReceived(Request request, final TransportChannel channel) throws Exception {
// no need to have a threaded listener since we just send back a response
request.listenerThreaded(false);
// if we have a local operation, execute it on a thread since we don't spawn
request.operationThreaded(true);
execute(request, new ActionListener<Response>() {
@Override
public void onResponse(Response result) {
try {
channel.sendResponse(result);
} catch (Throwable e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(e);
} catch (Exception e1) {
logger.warn("Failed to send response for get", e1);
}
}
});
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}
private class ShardTransportHandler extends BaseTransportRequestHandler<ShardSingleOperationRequest> { private class ShardTransportHandler extends BaseTransportRequestHandler<ShardSingleOperationRequest> {
@Override @Override

View File

@ -205,7 +205,6 @@ final class ActionNames {
builder.put(DeleteMappingAction.NAME, "indices/mapping/delete"); builder.put(DeleteMappingAction.NAME, "indices/mapping/delete");
builder.put(PutMappingAction.NAME, "indices/mapping/put"); builder.put(PutMappingAction.NAME, "indices/mapping/put");
builder.put(GetFieldMappingsAction.NAME, "mappings/fields/get"); builder.put(GetFieldMappingsAction.NAME, "mappings/fields/get");
builder.put(GetFieldMappingsAction.NAME + "[index]", "mappings/fields/get/index");
builder.put(GetFieldMappingsAction.NAME + "[index][s]", "mappings/fields/get/index/s"); builder.put(GetFieldMappingsAction.NAME + "[index][s]", "mappings/fields/get/index/s");
builder.put(GetMappingsAction.NAME, "mappings/get"); builder.put(GetMappingsAction.NAME, "mappings/get");