Inline TransportReplAction#registerRequestHandlers (#40762)
It is important that resync actions are not rejected on the primary even if its `write` threadpool is overloaded. Today we do this by exposing `registerRequestHandlers` to subclasses and overriding it in `TransportResyncReplicationAction`. This isn't ideal because it obscures the difference between this action and other replication actions, and also might allow subclasses to try and use some state before they are properly initialised. This change replaces this override with a constructor parameter to solve these issues. Relates #40706
This commit is contained in:
parent
31e79a73d7
commit
1d2bc85586
|
@ -92,7 +92,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
|
||||
indexNameExpressionResolver, BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE);
|
||||
indexNameExpressionResolver, BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE, false);
|
||||
this.threadPool = threadPool;
|
||||
this.updateHelper = updateHelper;
|
||||
this.mappingUpdatedAction = mappingUpdatedAction;
|
||||
|
|
|
@ -48,7 +48,6 @@ import org.elasticsearch.transport.TransportResponseHandler;
|
|||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class TransportResyncReplicationAction extends TransportWriteAction<ResyncReplicationRequest,
|
||||
ResyncReplicationRequest, ResyncReplicationResponse> implements PrimaryReplicaSyncer.SyncAction {
|
||||
|
@ -61,22 +60,8 @@ public class TransportResyncReplicationAction extends TransportWriteAction<Resyn
|
|||
ShardStateAction shardStateAction, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
|
||||
indexNameExpressionResolver, ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier<ResyncReplicationRequest> request,
|
||||
Supplier<ResyncReplicationRequest> replicaRequest, String executor) {
|
||||
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest);
|
||||
// we should never reject resync because of thread pool capacity on primary
|
||||
transportService.registerRequestHandler(transportPrimaryAction,
|
||||
() -> new ConcreteShardRequest<>(request),
|
||||
executor, true, true,
|
||||
this::handlePrimaryRequest);
|
||||
transportService.registerRequestHandler(transportReplicaAction,
|
||||
() -> new ConcreteReplicaRequest<>(replicaRequest),
|
||||
executor, true, true,
|
||||
this::handleReplicaRequest);
|
||||
indexNameExpressionResolver, ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE,
|
||||
true /* we should never reject resync because of thread pool capacity on primary */);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -123,7 +123,7 @@ public abstract class TransportReplicationAction<
|
|||
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
|
||||
Supplier<ReplicaRequest> replicaRequest, String executor) {
|
||||
this(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
|
||||
indexNameExpressionResolver, request, replicaRequest, executor, false);
|
||||
indexNameExpressionResolver, request, replicaRequest, executor, false, false);
|
||||
}
|
||||
|
||||
|
||||
|
@ -133,7 +133,7 @@ public abstract class TransportReplicationAction<
|
|||
ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
|
||||
Supplier<ReplicaRequest> replicaRequest, String executor,
|
||||
boolean syncGlobalCheckpointAfterOperation) {
|
||||
boolean syncGlobalCheckpointAfterOperation, boolean forceExecutionOnPrimary) {
|
||||
super(actionName, actionFilters, transportService.getTaskManager());
|
||||
this.threadPool = threadPool;
|
||||
this.transportService = transportService;
|
||||
|
@ -145,23 +145,21 @@ public abstract class TransportReplicationAction<
|
|||
|
||||
this.transportPrimaryAction = actionName + "[p]";
|
||||
this.transportReplicaAction = actionName + "[r]";
|
||||
registerRequestHandlers(actionName, transportService, request, replicaRequest, executor);
|
||||
|
||||
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest);
|
||||
|
||||
transportService.registerRequestHandler(transportPrimaryAction,
|
||||
() -> new ConcreteShardRequest<>(request), executor, forceExecutionOnPrimary, true, this::handlePrimaryRequest);
|
||||
|
||||
// we must never reject on because of thread pool capacity on replicas
|
||||
transportService.registerRequestHandler(transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest),
|
||||
executor, true, true, this::handleReplicaRequest);
|
||||
|
||||
this.transportOptions = transportOptions(settings);
|
||||
|
||||
this.syncGlobalCheckpointAfterOperation = syncGlobalCheckpointAfterOperation;
|
||||
}
|
||||
|
||||
protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier<Request> request,
|
||||
Supplier<ReplicaRequest> replicaRequest, String executor) {
|
||||
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest);
|
||||
transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor,
|
||||
this::handlePrimaryRequest);
|
||||
// we must never reject on because of thread pool capacity on replicas
|
||||
transportService.registerRequestHandler(
|
||||
transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest), executor, true, true, this::handleReplicaRequest);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
|
||||
assert request.shardId() != null : "request shardId must be set";
|
||||
|
|
|
@ -60,11 +60,12 @@ public abstract class TransportWriteAction<
|
|||
> extends TransportReplicationAction<Request, ReplicaRequest, Response> {
|
||||
|
||||
protected TransportWriteAction(Settings settings, String actionName, TransportService transportService,
|
||||
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
|
||||
Supplier<ReplicaRequest> replicaRequest, String executor) {
|
||||
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
|
||||
ShardStateAction shardStateAction, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
|
||||
Supplier<ReplicaRequest> replicaRequest, String executor, boolean forceExecutionOnPrimary) {
|
||||
super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
|
||||
indexNameExpressionResolver, request, replicaRequest, executor, true);
|
||||
indexNameExpressionResolver, request, replicaRequest, executor, true, forceExecutionOnPrimary);
|
||||
}
|
||||
|
||||
/** Syncs operation result to the translog or throws a shard not available failure */
|
||||
|
|
|
@ -88,7 +88,7 @@ public class RetentionLeaseSyncAction extends
|
|||
indexNameExpressionResolver,
|
||||
RetentionLeaseSyncAction.Request::new,
|
||||
RetentionLeaseSyncAction.Request::new,
|
||||
ThreadPool.Names.MANAGEMENT);
|
||||
ThreadPool.Names.MANAGEMENT, false);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -402,7 +402,7 @@ public class TransportWriteActionTests extends ESTestCase {
|
|||
new TransportService(Settings.EMPTY, mock(Transport.class), null, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
|
||||
x -> null, null, Collections.emptySet()), null, null, null, null,
|
||||
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(), TestRequest::new,
|
||||
TestRequest::new, ThreadPool.Names.SAME);
|
||||
TestRequest::new, ThreadPool.Names.SAME, false);
|
||||
this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary;
|
||||
this.withDocumentFailureOnReplica = withDocumentFailureOnReplica;
|
||||
}
|
||||
|
@ -412,7 +412,7 @@ public class TransportWriteActionTests extends ESTestCase {
|
|||
super(settings, actionName, transportService, clusterService,
|
||||
mockIndicesService(clusterService), threadPool, shardStateAction,
|
||||
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(),
|
||||
TestRequest::new, TestRequest::new, ThreadPool.Names.SAME);
|
||||
TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false);
|
||||
this.withDocumentFailureOnPrimary = false;
|
||||
this.withDocumentFailureOnReplica = false;
|
||||
}
|
||||
|
|
|
@ -57,7 +57,7 @@ public class TransportBulkShardOperationsAction
|
|||
indexNameExpressionResolver,
|
||||
BulkShardOperationsRequest::new,
|
||||
BulkShardOperationsRequest::new,
|
||||
ThreadPool.Names.WRITE);
|
||||
ThreadPool.Names.WRITE, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue