Distinguish primary and replica request types in TransportWriteAction

This commit is contained in:
Areek Zillur 2016-10-13 14:37:08 -04:00
parent 289a69bf68
commit 1b1f484c28
5 changed files with 16 additions and 15 deletions

View File

@ -66,7 +66,7 @@ import static org.elasticsearch.action.support.replication.ReplicationOperation.
import static org.elasticsearch.action.support.replication.ReplicationOperation.isConflictException; import static org.elasticsearch.action.support.replication.ReplicationOperation.isConflictException;
/** Performs shard-level bulk (index, delete or update) operations */ /** Performs shard-level bulk (index, delete or update) operations */
public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequest, BulkShardResponse> { public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequest, BulkShardRequest, BulkShardResponse> {
public static final String ACTION_NAME = BulkAction.NAME + "[s]"; public static final String ACTION_NAME = BulkAction.NAME + "[s]";
@ -80,7 +80,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters, MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) { IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, BulkShardRequest::new, ThreadPool.Names.BULK); indexNameExpressionResolver, BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.BULK);
this.updateHelper = updateHelper; this.updateHelper = updateHelper;
this.allowIdGeneration = settings.getAsBoolean("action.allow_id_generation", true); this.allowIdGeneration = settings.getAsBoolean("action.allow_id_generation", true);
this.mappingUpdatedAction = mappingUpdatedAction; this.mappingUpdatedAction = mappingUpdatedAction;

View File

@ -49,7 +49,7 @@ import org.elasticsearch.transport.TransportService;
/** /**
* Performs the delete operation. * Performs the delete operation.
*/ */
public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, DeleteResponse> { public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, DeleteRequest,DeleteResponse> {
private final AutoCreateIndex autoCreateIndex; private final AutoCreateIndex autoCreateIndex;
private final TransportCreateIndexAction createIndexAction; private final TransportCreateIndexAction createIndexAction;
@ -61,7 +61,7 @@ public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, D
IndexNameExpressionResolver indexNameExpressionResolver, IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex) { AutoCreateIndex autoCreateIndex) {
super(settings, DeleteAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, super(settings, DeleteAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, DeleteRequest::new, ThreadPool.Names.INDEX); indexNameExpressionResolver, DeleteRequest::new, DeleteRequest::new, ThreadPool.Names.INDEX);
this.createIndexAction = createIndexAction; this.createIndexAction = createIndexAction;
this.autoCreateIndex = autoCreateIndex; this.autoCreateIndex = autoCreateIndex;
} }

View File

@ -60,7 +60,7 @@ import org.elasticsearch.transport.TransportService;
* <li><b>allowIdGeneration</b>: If the id is set not, should it be generated. Defaults to <tt>true</tt>. * <li><b>allowIdGeneration</b>: If the id is set not, should it be generated. Defaults to <tt>true</tt>.
* </ul> * </ul>
*/ */
public class TransportIndexAction extends TransportWriteAction<IndexRequest, IndexResponse> { public class TransportIndexAction extends TransportWriteAction<IndexRequest, IndexRequest, IndexResponse> {
private final AutoCreateIndex autoCreateIndex; private final AutoCreateIndex autoCreateIndex;
private final boolean allowIdGeneration; private final boolean allowIdGeneration;
@ -76,7 +76,7 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex) { AutoCreateIndex autoCreateIndex) {
super(settings, IndexAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, super(settings, IndexAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
actionFilters, indexNameExpressionResolver, IndexRequest::new, ThreadPool.Names.INDEX); actionFilters, indexNameExpressionResolver, IndexRequest::new, IndexRequest::new, ThreadPool.Names.INDEX);
this.mappingUpdatedAction = mappingUpdatedAction; this.mappingUpdatedAction = mappingUpdatedAction;
this.createIndexAction = createIndexAction; this.createIndexAction = createIndexAction;
this.autoCreateIndex = autoCreateIndex; this.autoCreateIndex = autoCreateIndex;

View File

@ -47,15 +47,16 @@ import java.util.function.Supplier;
*/ */
public abstract class TransportWriteAction< public abstract class TransportWriteAction<
Request extends ReplicatedWriteRequest<Request>, Request extends ReplicatedWriteRequest<Request>,
ReplicaRequest extends ReplicatedWriteRequest<ReplicaRequest>,
Response extends ReplicationResponse & WriteResponse Response extends ReplicationResponse & WriteResponse
> extends TransportReplicationAction<Request, Request, Response> { > extends TransportReplicationAction<Request, ReplicaRequest, Response> {
protected TransportWriteAction(Settings settings, String actionName, TransportService transportService, protected TransportWriteAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
String executor) { Supplier<ReplicaRequest> replicaRequest, String executor) {
super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, request, request, executor); indexNameExpressionResolver, request, replicaRequest, executor);
} }
/** /**
@ -68,16 +69,16 @@ public abstract class TransportWriteAction<
* *
* @return the translog location of the {@linkplain IndexShard} after the write was completed or null if no write occurred * @return the translog location of the {@linkplain IndexShard} after the write was completed or null if no write occurred
*/ */
protected abstract Translog.Location onReplicaShard(Request request, IndexShard indexShard); protected abstract Translog.Location onReplicaShard(ReplicaRequest request, IndexShard indexShard);
@Override @Override
protected final WritePrimaryResult shardOperationOnPrimary(Request request, IndexShard primary) throws Exception { protected final WritePrimaryResult shardOperationOnPrimary(Request request, IndexShard primary) throws Exception {
WriteResult<Response> result = onPrimaryShard(request, primary); WriteResult<Response> result = onPrimaryShard(request, primary);
return new WritePrimaryResult(request, result.getResponse(), result.getLocation(), primary); return new WritePrimaryResult(((ReplicaRequest) request), result.getResponse(), result.getLocation(), primary);
} }
@Override @Override
protected final WriteReplicaResult shardOperationOnReplica(Request request, IndexShard replica) { protected final WriteReplicaResult shardOperationOnReplica(ReplicaRequest request, IndexShard replica) {
Translog.Location location = onReplicaShard(request, replica); Translog.Location location = onReplicaShard(request, replica);
return new WriteReplicaResult(replica, request, location); return new WriteReplicaResult(replica, request, location);
} }
@ -110,7 +111,7 @@ public abstract class TransportWriteAction<
boolean finishedAsyncActions; boolean finishedAsyncActions;
ActionListener<Response> listener = null; ActionListener<Response> listener = null;
public WritePrimaryResult(Request request, Response finalResponse, public WritePrimaryResult(ReplicaRequest request, Response finalResponse,
@Nullable Translog.Location location, @Nullable Translog.Location location,
IndexShard indexShard) { IndexShard indexShard) {
super(request, finalResponse); super(request, finalResponse);

View File

@ -128,12 +128,12 @@ public class TransportWriteActionTests extends ESTestCase {
resultChecker.accept(listener.response, forcedRefresh); resultChecker.accept(listener.response, forcedRefresh);
} }
private class TestAction extends TransportWriteAction<TestRequest, TestResponse> { private class TestAction extends TransportWriteAction<TestRequest, TestRequest, TestResponse> {
protected TestAction() { protected TestAction() {
super(Settings.EMPTY, "test", super(Settings.EMPTY, "test",
new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null), null, null, null, new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null), null, null, null,
null, new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY), TestRequest::new, null, new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY), TestRequest::new,
ThreadPool.Names.SAME); TestRequest::new, ThreadPool.Names.SAME);
} }
@Override @Override