Removing TransportSingleCustomOperationAction in favour of TransportSingleShardAction to clean up code.

The TransportSingleCustomOperationAction `prefer_local` option has been removed as it isn't worth the effort.
The TransportSingleShardAction will execute the operation on the receiving node if a concrete list doesn't provide a list of candite shards routings to perform the operation on.
This commit is contained in:
Martijn van Groningen 2015-07-21 16:44:58 +02:00
parent 73c0cd512a
commit cafc7078e2
23 changed files with 117 additions and 572 deletions

View File

@ -19,7 +19,7 @@
package org.elasticsearch.action.admin.indices.analyze;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.single.custom.SingleCustomOperationRequest;
import org.elasticsearch.action.support.single.shard.SingleShardRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -32,7 +32,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
* A request to analyze a text associated with a specific index. Allow to provide
* the actual analyzer name to perform the analysis with.
*/
public class AnalyzeRequest extends SingleCustomOperationRequest<AnalyzeRequest> {
public class AnalyzeRequest extends SingleShardRequest<AnalyzeRequest> {
private String[] text;
@ -114,7 +114,7 @@ public class AnalyzeRequest extends SingleCustomOperationRequest<AnalyzeRequest>
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = super.validate();
ActionRequestValidationException validationException = null;
if (text == null || text.length == 0) {
validationException = addValidationError("text is missing", validationException);
}

View File

@ -18,13 +18,13 @@
*/
package org.elasticsearch.action.admin.indices.analyze;
import org.elasticsearch.action.support.single.custom.SingleCustomOperationRequestBuilder;
import org.elasticsearch.action.support.single.shard.SingleShardOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
/**
*
*/
public class AnalyzeRequestBuilder extends SingleCustomOperationRequestBuilder<AnalyzeRequest, AnalyzeResponse, AnalyzeRequestBuilder> {
public class AnalyzeRequestBuilder extends SingleShardOperationRequestBuilder<AnalyzeRequest, AnalyzeResponse, AnalyzeRequestBuilder> {
public AnalyzeRequestBuilder(ElasticsearchClient client, AnalyzeAction action) {
super(client, action, new AnalyzeRequest());
@ -34,15 +34,6 @@ public class AnalyzeRequestBuilder extends SingleCustomOperationRequestBuilder<A
super(client, action, new AnalyzeRequest(index).text(text));
}
/**
* Sets the index to use to analyzer the text (for example, if it holds specific analyzers
* registered).
*/
public AnalyzeRequestBuilder setIndex(String index) {
request.index(index);
return this;
}
/**
* Sets the analyzer name to use in order to analyze the text.
*

View File

@ -29,7 +29,7 @@ import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.single.custom.TransportSingleCustomOperationAction;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
@ -54,7 +54,7 @@ import java.util.List;
/**
* Transport action used to execute analyze requests
*/
public class TransportAnalyzeAction extends TransportSingleCustomOperationAction<AnalyzeRequest, AnalyzeResponse> {
public class TransportAnalyzeAction extends TransportSingleShardAction<AnalyzeRequest, AnalyzeResponse> {
private final IndicesService indicesService;
private final IndicesAnalysisService indicesAnalysisService;

View File

@ -19,16 +19,17 @@
package org.elasticsearch.action.admin.indices.mapping.get;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.single.custom.SingleCustomOperationRequest;
import org.elasticsearch.action.support.single.shard.SingleShardRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
class GetFieldMappingsIndexRequest extends SingleCustomOperationRequest<GetFieldMappingsIndexRequest> {
class GetFieldMappingsIndexRequest extends SingleShardRequest<GetFieldMappingsIndexRequest> {
private boolean probablySingleFieldRequest;
private boolean includeDefaults;
@ -42,7 +43,6 @@ class GetFieldMappingsIndexRequest extends SingleCustomOperationRequest<GetField
GetFieldMappingsIndexRequest(GetFieldMappingsRequest other, String index, boolean probablySingleFieldRequest) {
super(other);
this.preferLocal(other.local);
this.probablySingleFieldRequest = probablySingleFieldRequest;
this.includeDefaults = other.includeDefaults();
this.types = other.types();
@ -52,6 +52,11 @@ class GetFieldMappingsIndexRequest extends SingleCustomOperationRequest<GetField
this.originalIndices = new OriginalIndices(other);
}
@Override
public ActionRequestValidationException validate() {
return null;
}
public String[] types() {
return types;
}
@ -88,11 +93,6 @@ class GetFieldMappingsIndexRequest extends SingleCustomOperationRequest<GetField
OriginalIndices.writeOriginalIndices(originalIndices, out);
}
@Override
protected void writeIndex(StreamOutput out) throws IOException {
out.writeString(index());
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
@ -103,8 +103,4 @@ class GetFieldMappingsIndexRequest extends SingleCustomOperationRequest<GetField
originalIndices = OriginalIndices.readOriginalIndices(in);
}
@Override
protected void readIndex(StreamInput in) throws IOException {
index(in.readString());
}
}

View File

@ -26,7 +26,7 @@ import com.google.common.collect.Lists;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse.FieldMappingMetaData;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.single.custom.TransportSingleCustomOperationAction;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
@ -58,7 +58,7 @@ import java.util.Iterator;
/**
* Transport action used to retrieve the mappings related to fields that belong to a specific index
*/
public class TransportGetFieldMappingsIndexAction extends TransportSingleCustomOperationAction<GetFieldMappingsIndexRequest, GetFieldMappingsResponse> {
public class TransportGetFieldMappingsIndexAction extends TransportSingleShardAction<GetFieldMappingsIndexRequest, GetFieldMappingsResponse> {
private static final String ACTION_NAME = GetFieldMappingsAction.NAME + "[index]";

View File

@ -152,7 +152,7 @@ public class ExplainRequest extends SingleShardRequest<ExplainRequest> {
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = super.validate();
ActionRequestValidationException validationException = super.validateNonNullIndex();
if (type == null) {
validationException = ValidateActions.addValidationError("type is missing", validationException);
}

View File

@ -88,7 +88,7 @@ public class TransportExplainAction extends TransportSingleShardAction<ExplainRe
}
@Override
protected boolean resolveIndex() {
protected boolean resolveIndex(ExplainRequest request) {
return true;
}

View File

@ -118,7 +118,7 @@ public class GetRequest extends SingleShardRequest<GetRequest> {
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = super.validate();
ActionRequestValidationException validationException = super.validateNonNullIndex();
if (type == null) {
validationException = ValidateActions.addValidationError("type is missing", validationException);
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.get;
import com.carrotsearch.hppc.IntArrayList;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.single.shard.SingleShardRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -54,6 +55,11 @@ public class MultiGetShardRequest extends SingleShardRequest<MultiGetShardReques
ignoreErrorsOnGeneratedFields = multiGetRequest.ignoreErrorsOnGeneratedFields;
}
@Override
public ActionRequestValidationException validate() {
return super.validateNonNullIndex();
}
public int shardId() {
return this.shardId;
}

View File

@ -58,7 +58,7 @@ public class TransportGetAction extends TransportSingleShardAction<GetRequest, G
}
@Override
protected boolean resolveIndex() {
protected boolean resolveIndex(GetRequest request) {
return true;
}

View File

@ -20,7 +20,6 @@
package org.elasticsearch.action.get;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
@ -68,7 +67,7 @@ public class TransportShardMultiGetAction extends TransportSingleShardAction<Mul
}
@Override
protected boolean resolveIndex() {
protected boolean resolveIndex(MultiGetShardRequest request) {
return true;
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.action.percolate;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices;
@ -76,7 +77,7 @@ public class TransportShardMultiPercolateAction extends TransportSingleShardActi
}
@Override
protected boolean resolveIndex() {
protected boolean resolveIndex(Request request) {
return false;
}
@ -127,6 +128,11 @@ public class TransportShardMultiPercolateAction extends TransportSingleShardActi
this.items = new ArrayList<>();
}
@Override
public ActionRequestValidationException validate() {
return super.validateNonNullIndex();
}
@Override
public String[] indices() {
List<String> indices = new ArrayList<>();

View File

@ -1,142 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support.single.custom;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
/**
*
*/
public abstract class SingleCustomOperationRequest<T extends SingleCustomOperationRequest> extends ActionRequest<T> implements IndicesRequest {
ShardId internalShardId;
private boolean threadedOperation = true;
private boolean preferLocal = true;
private String index;
protected SingleCustomOperationRequest() {
}
protected SingleCustomOperationRequest(ActionRequest request) {
super(request);
}
@Override
public ActionRequestValidationException validate() {
return null;
}
/**
* Controls if the operation will be executed on a separate thread when executed locally.
*/
public boolean operationThreaded() {
return threadedOperation;
}
/**
* Controls if the operation will be executed on a separate thread when executed locally.
*/
@SuppressWarnings("unchecked")
public final T operationThreaded(boolean threadedOperation) {
this.threadedOperation = threadedOperation;
return (T) this;
}
/**
* if this operation hits a node with a local relevant shard, should it be preferred
* to be executed on, or just do plain round robin. Defaults to <tt>true</tt>
*/
@SuppressWarnings("unchecked")
public final T preferLocal(boolean preferLocal) {
this.preferLocal = preferLocal;
return (T) this;
}
@SuppressWarnings("unchecked")
public T index(String index) {
this.index = index;
return (T)this;
}
public String index() {
return index;
}
@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
}
@Override
public String[] indices() {
if (index == null) {
return Strings.EMPTY_ARRAY;
}
return new String[]{index};
}
/**
* if this operation hits a node with a local relevant shard, should it be preferred
* to be executed on, or just do plain round robin. Defaults to <tt>true</tt>
*/
public boolean preferLocalShard() {
return this.preferLocal;
}
public void beforeLocalFork() {
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
if (in.readBoolean()) {
internalShardId = ShardId.readShardId(in);
}
preferLocal = in.readBoolean();
readIndex(in);
}
protected void readIndex(StreamInput in) throws IOException {
index = in.readOptionalString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalStreamable(internalShardId);
out.writeBoolean(preferLocal);
writeIndex(out);
}
protected void writeIndex(StreamOutput out) throws IOException {
out.writeOptionalString(index);
}
}

View File

@ -1,56 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support.single.custom;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.client.IndicesAdminClient;
/**
*/
public abstract class SingleCustomOperationRequestBuilder<Request extends SingleCustomOperationRequest<Request>, Response extends ActionResponse, RequestBuilder extends SingleCustomOperationRequestBuilder<Request, Response, RequestBuilder>>
extends ActionRequestBuilder<Request, Response, RequestBuilder> {
protected SingleCustomOperationRequestBuilder(ElasticsearchClient client, Action<Request, Response, RequestBuilder> action, Request request) {
super(client, action, request);
}
/**
* Controls if the operation will be executed on a separate thread when executed locally.
*/
@SuppressWarnings("unchecked")
public final RequestBuilder setOperationThreaded(boolean threadedOperation) {
request.operationThreaded(threadedOperation);
return (RequestBuilder) this;
}
/**
* if this operation hits a node with a local relevant shard, should it be preferred
* to be executed on, or just do plain round robin. Defaults to <tt>true</tt>
*/
@SuppressWarnings("unchecked")
public final RequestBuilder setPreferLocal(boolean preferLocal) {
request.preferLocal(preferLocal);
return (RequestBuilder) this;
}
}

View File

@ -1,314 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support.single.custom;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
/**
* Transport action used to send a read request to one of the shards that belong to an index.
* Supports retrying another shard in case of failure.
*/
public abstract class TransportSingleCustomOperationAction<Request extends SingleCustomOperationRequest, Response extends ActionResponse> extends HandledTransportAction<Request, Response> {
protected final ClusterService clusterService;
protected final TransportService transportService;
final String transportShardAction;
final String executor;
protected TransportSingleCustomOperationAction(Settings settings, String actionName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
Class<Request> request, String executor) {
super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, request);
this.clusterService = clusterService;
this.transportService = transportService;
this.transportShardAction = actionName + "[s]";
this.executor = executor;
transportService.registerRequestHandler(transportShardAction, request, executor, new ShardTransportHandler());
}
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
new AsyncSingleAction(request, listener).start();
}
/**
* Can return null to execute on this local node.
*/
protected abstract ShardsIterator shards(ClusterState state, InternalRequest request);
/**
* Operation to be executed at the shard level. Can be called with shardId set to null, meaning that there is no
* shard involved and the operation just needs to be executed on the local node.
*/
protected abstract Response shardOperation(Request request, ShardId shardId);
protected abstract Response newResponse();
protected ClusterBlockException checkGlobalBlock(ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
}
protected ClusterBlockException checkRequestBlock(ClusterState state, InternalRequest request) {
return state.blocks().indexBlockedException(ClusterBlockLevel.READ, request.concreteIndex());
}
protected abstract boolean resolveIndex(Request request);
private class AsyncSingleAction {
private final ActionListener<Response> listener;
private final ShardsIterator shardsIt;
private final InternalRequest internalRequest;
private final DiscoveryNodes nodes;
private AsyncSingleAction(Request request, ActionListener<Response> listener) {
this.listener = listener;
ClusterState clusterState = clusterService.state();
nodes = clusterState.nodes();
ClusterBlockException blockException = checkGlobalBlock(clusterState);
if (blockException != null) {
throw blockException;
}
String concreteSingleIndex;
if (resolveIndex(request)) {
concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, request);
} else {
concreteSingleIndex = request.index();
}
this.internalRequest = new InternalRequest(request, concreteSingleIndex);
blockException = checkRequestBlock(clusterState, internalRequest);
if (blockException != null) {
throw blockException;
}
this.shardsIt = shards(clusterState, internalRequest);
}
public void start() {
performFirst();
}
private void onFailure(ShardRouting shardRouting, Throwable e) {
if (logger.isTraceEnabled() && e != null) {
logger.trace(shardRouting.shortSummary() + ": Failed to execute [" + internalRequest.request() + "]", e);
}
perform(e);
}
/**
* First get should try and use a shard that exists on a local node for better performance
*/
private void performFirst() {
if (shardsIt == null) {
// just execute it on the local node
if (internalRequest.request().operationThreaded()) {
internalRequest.request().beforeLocalFork();
threadPool.executor(executor).execute(new Runnable() {
@Override
public void run() {
try {
Response response = shardOperation(internalRequest.request(), null);
listener.onResponse(response);
} catch (Throwable e) {
onFailure(null, e);
}
}
});
return;
} else {
try {
final Response response = shardOperation(internalRequest.request(), null);
listener.onResponse(response);
return;
} catch (Throwable e) {
onFailure(null, e);
}
}
return;
}
if (internalRequest.request().preferLocalShard()) {
boolean foundLocal = false;
ShardRouting shardX;
while ((shardX = shardsIt.nextOrNull()) != null) {
final ShardRouting shard = shardX;
if (shard.currentNodeId().equals(nodes.localNodeId())) {
foundLocal = true;
if (internalRequest.request().operationThreaded()) {
internalRequest.request().beforeLocalFork();
threadPool.executor(executor).execute(new Runnable() {
@Override
public void run() {
try {
Response response = shardOperation(internalRequest.request(), shard.shardId());
listener.onResponse(response);
} catch (Throwable e) {
shardsIt.reset();
onFailure(shard, e);
}
}
});
return;
} else {
try {
final Response response = shardOperation(internalRequest.request(), shard.shardId());
listener.onResponse(response);
return;
} catch (Throwable e) {
shardsIt.reset();
onFailure(shard, e);
}
}
}
}
if (!foundLocal) {
// no local node get, go remote
shardsIt.reset();
perform(null);
}
} else {
perform(null);
}
}
private void perform(final Throwable lastException) {
final ShardRouting shard = shardsIt == null ? null : shardsIt.nextOrNull();
if (shard == null) {
Throwable failure = lastException;
if (failure == null) {
failure = new NoShardAvailableActionException(null, "No shard available for [" + internalRequest.request() + "]");
} else {
if (logger.isDebugEnabled()) {
logger.debug("failed to execute [" + internalRequest.request() + "]", failure);
}
}
listener.onFailure(failure);
} else {
if (shard.currentNodeId().equals(nodes.localNodeId())) {
// we don't prefer local shard, so try and do it here
if (!internalRequest.request().preferLocalShard()) {
try {
if (internalRequest.request().operationThreaded()) {
internalRequest.request().beforeLocalFork();
threadPool.executor(executor).execute(new Runnable() {
@Override
public void run() {
try {
Response response = shardOperation(internalRequest.request(), shard.shardId());
listener.onResponse(response);
} catch (Throwable e) {
onFailure(shard, e);
}
}
});
} else {
final Response response = shardOperation(internalRequest.request(), shard.shardId());
listener.onResponse(response);
}
} catch (Throwable e) {
onFailure(shard, e);
}
} else {
perform(lastException);
}
} else {
DiscoveryNode node = nodes.get(shard.currentNodeId());
internalRequest.request().internalShardId = shard.shardId();
transportService.sendRequest(node, transportShardAction, internalRequest.request(), new BaseTransportResponseHandler<Response>() {
@Override
public Response newInstance() {
return newResponse();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public void handleResponse(final Response response) {
listener.onResponse(response);
}
@Override
public void handleException(TransportException exp) {
onFailure(shard, exp);
}
});
}
}
}
}
private class ShardTransportHandler implements TransportRequestHandler<Request> {
@Override
public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
Response response = shardOperation(request, request.internalShardId);
channel.sendResponse(response);
}
}
/**
* Internal request class that gets built on each node. Holds the original request plus additional info.
*/
protected class InternalRequest {
final Request request;
final String concreteIndex;
InternalRequest(Request request, String concreteIndex) {
this.request = request;
this.concreteIndex = concreteIndex;
}
public Request request() {
return request;
}
public String concreteIndex() {
return concreteIndex;
}
}
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
@ -35,10 +36,17 @@ import java.io.IOException;
*/
public abstract class SingleShardRequest<T extends SingleShardRequest> extends ActionRequest<T> implements IndicesRequest {
ShardId internalShardId;
protected String index;
public static final IndicesOptions INDICES_OPTIONS = IndicesOptions.strictSingleIndexNoExpandForbidClosed();
/**
* The concrete index name
*
* Whether index property is optional depends on the concrete implementation. If index property is required the
* concrete implementation should use {@link #validateNonNullIndex()} to check if the index property has been set
*/
@Nullable
protected String index;
ShardId internalShardId;
private boolean threadedOperation = true;
protected SingleShardRequest() {
@ -57,8 +65,10 @@ public abstract class SingleShardRequest<T extends SingleShardRequest> extends A
this.index = index;
}
@Override
public ActionRequestValidationException validate() {
/**
* @return a validation exception if the index property hasn't been set
*/
protected ActionRequestValidationException validateNonNullIndex() {
ActionRequestValidationException validationException = null;
if (index == null) {
validationException = ValidateActions.addValidationError("index is missing", validationException);
@ -66,6 +76,13 @@ public abstract class SingleShardRequest<T extends SingleShardRequest> extends A
return validationException;
}
/**
* @return The concrete index this request is targeted for or <code>null</code> if index is optional.
* Whether index property is optional depends on the concrete implementation. If index property
* is required the concrete implementation should use {@link #validateNonNullIndex()} to check
* if the index property has been set
*/
@Nullable
public String index() {
return index;
}
@ -111,7 +128,7 @@ public abstract class SingleShardRequest<T extends SingleShardRequest> extends A
if (in.readBoolean()) {
internalShardId = ShardId.readShardId(in);
}
index = in.readString();
index = in.readOptionalString();
// no need to pass threading over the network, they are always false when coming throw a thread pool
}
@ -119,7 +136,7 @@ public abstract class SingleShardRequest<T extends SingleShardRequest> extends A
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalStreamable(internalShardId);
out.writeString(index);
out.writeOptionalString(index);
}
}

View File

@ -32,9 +32,10 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.support.LoggerMessageFormat;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
@ -43,7 +44,9 @@ import org.elasticsearch.transport.*;
import static org.elasticsearch.action.support.TransportActions.isShardNotAvailableException;
/**
* A base class for single shard read operations.
* A base class for operations that need to perform a read operation on a single shard copy. If the operation fails,
* the read operation can be performed on other shard copies. Concrete implementations can provide their own list
* of candidate shards to try the read operation on.
*/
public abstract class TransportSingleShardAction<Request extends SingleShardRequest, Response extends ActionResponse> extends TransportAction<Request, Response> {
@ -88,7 +91,7 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
protected abstract Response newResponse();
protected abstract boolean resolveIndex();
protected abstract boolean resolveIndex(Request request);
protected ClusterBlockException checkGlobalBlock(ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
@ -102,12 +105,17 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
}
protected abstract ShardIterator shards(ClusterState state, InternalRequest request);
/**
* Returns the candidate shards to execute the operation on or <code>null</code> the execute
* the operation locally (the node that received the request)
*/
@Nullable
protected abstract ShardsIterator shards(ClusterState state, InternalRequest request);
class AsyncSingleAction {
private final ActionListener<Response> listener;
private final ShardIterator shardIt;
private final ShardsIterator shardIt;
private final InternalRequest internalRequest;
private final DiscoveryNodes nodes;
private volatile Throwable lastFailure;
@ -126,7 +134,7 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
}
String concreteSingleIndex;
if (resolveIndex()) {
if (resolveIndex(request)) {
concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, request);
} else {
concreteSingleIndex = request.index();
@ -143,7 +151,32 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
}
public void start() {
perform(null);
if (shardIt == null) {
// just execute it on the local node
transportService.sendRequest(clusterService.localNode(), transportShardAction, internalRequest.request(), new BaseTransportResponseHandler<Response>() {
@Override
public Response newInstance() {
return newResponse();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public void handleResponse(final Response response) {
listener.onResponse(response);
}
@Override
public void handleException(TransportException exp) {
perform(exp);
}
});
} else {
perform(null);
}
}
private void onFailure(ShardRouting shardRouting, Throwable e) {
@ -163,10 +196,10 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
if (shardRouting == null) {
Throwable failure = lastFailure;
if (failure == null || isShardNotAvailableException(failure)) {
failure = new NoShardAvailableActionException(shardIt.shardId(), null, failure);
failure = new NoShardAvailableActionException(null, LoggerMessageFormat.format("No shard available for [{}]", internalRequest.request()), failure);
} else {
if (logger.isDebugEnabled()) {
logger.debug("{}: failed to execute [{}]", failure, shardIt.shardId(), internalRequest.request());
logger.debug("{}: failed to execute [{}]", failure, null, internalRequest.request());
}
}
listener.onFailure(failure);
@ -174,7 +207,7 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
}
DiscoveryNode node = nodes.get(shardRouting.currentNodeId());
if (node == null) {
onFailure(shardRouting, new NoShardAvailableActionException(shardIt.shardId()));
onFailure(shardRouting, new NoShardAvailableActionException(shardRouting.shardId()));
} else {
internalRequest.request().internalShardId = shardRouting.shardId();
transportService.sendRequest(node, transportShardAction, internalRequest.request(), new BaseTransportResponseHandler<Response>() {

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.termvectors;
import com.carrotsearch.hppc.IntArrayList;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.single.shard.SingleShardRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -47,6 +48,11 @@ public class MultiTermVectorsShardRequest extends SingleShardRequest<MultiTermVe
requests = new ArrayList<>();
}
@Override
public ActionRequestValidationException validate() {
return super.validateNonNullIndex();
}
public int shardId() {
return this.shardId;
}

View File

@ -472,7 +472,7 @@ public class TermVectorsRequest extends SingleShardRequest<TermVectorsRequest> i
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = super.validate();
ActionRequestValidationException validationException = super.validateNonNullIndex();
if (type == null) {
validationException = ValidateActions.addValidationError("type is missing", validationException);
}

View File

@ -63,7 +63,7 @@ public class TransportShardMultiTermsVectorAction extends TransportSingleShardAc
}
@Override
protected boolean resolveIndex() {
protected boolean resolveIndex(MultiTermVectorsShardRequest request) {
return false;
}

View File

@ -65,7 +65,7 @@ public class TransportTermVectorsAction extends TransportSingleShardAction<TermV
}
@Override
protected boolean resolveIndex() {
protected boolean resolveIndex(TermVectorsRequest request) {
return true;
}

View File

@ -60,7 +60,6 @@ public class RestAnalyzeAction extends BaseRestHandler {
AnalyzeRequest analyzeRequest = new AnalyzeRequest(request.param("index"));
analyzeRequest.text(texts);
analyzeRequest.preferLocal(request.paramAsBoolean("prefer_local", analyzeRequest.preferLocalShard()));
analyzeRequest.analyzer(request.param("analyzer"));
analyzeRequest.field(request.param("field"));
analyzeRequest.tokenizer(request.param("tokenizer"));
@ -93,8 +92,6 @@ public class RestAnalyzeAction extends BaseRestHandler {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if ("prefer_local".equals(currentFieldName) && token == XContentParser.Token.VALUE_BOOLEAN) {
analyzeRequest.preferLocal(parser.booleanValue());
} else if ("text".equals(currentFieldName) && token == XContentParser.Token.VALUE_STRING) {
analyzeRequest.text(parser.text());
} else if ("text".equals(currentFieldName) && token == XContentParser.Token.START_ARRAY) {

View File

@ -825,3 +825,9 @@ For the record, official plugins which can use this new simplified form are:
Fields used in alias filters no longer have to exist in the mapping upon alias creation time. Alias filters are now
parsed at request time and then the fields in filters are resolved from the mapping, whereas before alias filters were
parsed at alias creation time and the parsed form was kept around in memory.
=== _analyze API
The `prefer_local` has been removed from the _analyze api. The _analyze api is a light operation and the caller shouldn't
be concerned about whether it executes on the node that receives the request or another node.