Remove obsolete resolving logic from TRA (#49647)
This stems from a time where index requests were directly forwarded to TransportReplicationAction. Nowadays they are wrapped in a BulkShardRequest, and this logic is obsolete. Closes #20279
This commit is contained in:
parent
35732504ba
commit
0827ea2175
|
@ -31,7 +31,6 @@ import org.elasticsearch.action.support.replication.TransportReplicationAction;
|
|||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.block.ClusterBlock;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
|
@ -58,8 +57,8 @@ public class TransportVerifyShardBeforeCloseAction extends TransportReplicationA
|
|||
public TransportVerifyShardBeforeCloseAction(final Settings settings, final TransportService transportService,
|
||||
final ClusterService clusterService, final IndicesService indicesService,
|
||||
final ThreadPool threadPool, final ShardStateAction stateAction,
|
||||
final ActionFilters actionFilters, final IndexNameExpressionResolver resolver) {
|
||||
super(settings, NAME, transportService, clusterService, indicesService, threadPool, stateAction, actionFilters, resolver,
|
||||
final ActionFilters actionFilters) {
|
||||
super(settings, NAME, transportService, clusterService, indicesService, threadPool, stateAction, actionFilters,
|
||||
ShardRequest::new, ShardRequest::new, ThreadPool.Names.MANAGEMENT);
|
||||
}
|
||||
|
||||
|
|
|
@ -24,7 +24,6 @@ import org.elasticsearch.action.support.ActionFilters;
|
|||
import org.elasticsearch.action.support.replication.ReplicationResponse;
|
||||
import org.elasticsearch.action.support.replication.TransportReplicationAction;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
|
@ -44,9 +43,9 @@ public class TransportShardFlushAction
|
|||
@Inject
|
||||
public TransportShardFlushAction(Settings settings, TransportService transportService, ClusterService clusterService,
|
||||
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
ActionFilters actionFilters) {
|
||||
super(settings, NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
|
||||
actionFilters, indexNameExpressionResolver, ShardFlushRequest::new, ShardFlushRequest::new, ThreadPool.Names.FLUSH);
|
||||
actionFilters, ShardFlushRequest::new, ShardFlushRequest::new, ThreadPool.Names.FLUSH);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -25,7 +25,6 @@ import org.elasticsearch.action.support.replication.BasicReplicationRequest;
|
|||
import org.elasticsearch.action.support.replication.ReplicationResponse;
|
||||
import org.elasticsearch.action.support.replication.TransportReplicationAction;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
|
@ -46,9 +45,9 @@ public class TransportShardRefreshAction
|
|||
@Inject
|
||||
public TransportShardRefreshAction(Settings settings, TransportService transportService, ClusterService clusterService,
|
||||
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
ActionFilters actionFilters) {
|
||||
super(settings, NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
|
||||
indexNameExpressionResolver, BasicReplicationRequest::new, BasicReplicationRequest::new, ThreadPool.Names.REFRESH);
|
||||
BasicReplicationRequest::new, BasicReplicationRequest::new, ThreadPool.Names.REFRESH);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -43,7 +43,6 @@ import org.elasticsearch.cluster.ClusterStateObserver;
|
|||
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
|
@ -91,10 +90,9 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
@Inject
|
||||
public TransportShardBulkAction(Settings settings, TransportService transportService, ClusterService clusterService,
|
||||
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
|
||||
MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters) {
|
||||
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
|
||||
indexNameExpressionResolver, BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE, false);
|
||||
BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE, false);
|
||||
this.updateHelper = updateHelper;
|
||||
this.mappingUpdatedAction = mappingUpdatedAction;
|
||||
}
|
||||
|
@ -109,11 +107,6 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
return new BulkShardResponse(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean resolveIndex() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void shardOperationOnPrimary(BulkShardRequest request, IndexShard primary,
|
||||
ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener) {
|
||||
|
|
|
@ -27,7 +27,6 @@ import org.elasticsearch.action.support.replication.TransportReplicationAction;
|
|||
import org.elasticsearch.action.support.replication.TransportWriteAction;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
|
@ -55,10 +54,9 @@ public class TransportResyncReplicationAction extends TransportWriteAction<Resyn
|
|||
@Inject
|
||||
public TransportResyncReplicationAction(Settings settings, TransportService transportService,
|
||||
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
|
||||
ShardStateAction shardStateAction, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
ShardStateAction shardStateAction, ActionFilters actionFilters) {
|
||||
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
|
||||
indexNameExpressionResolver, ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE,
|
||||
ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE,
|
||||
true /* we should never reject resync because of thread pool capacity on primary */);
|
||||
}
|
||||
|
||||
|
|
|
@ -41,7 +41,6 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
|||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.AllocationId;
|
||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||
|
@ -105,7 +104,6 @@ public abstract class TransportReplicationAction<
|
|||
protected final ClusterService clusterService;
|
||||
protected final ShardStateAction shardStateAction;
|
||||
protected final IndicesService indicesService;
|
||||
protected final IndexNameExpressionResolver indexNameExpressionResolver;
|
||||
protected final TransportRequestOptions transportOptions;
|
||||
protected final String executor;
|
||||
|
||||
|
@ -118,19 +116,17 @@ public abstract class TransportReplicationAction<
|
|||
protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
|
||||
ClusterService clusterService, IndicesService indicesService,
|
||||
ThreadPool threadPool, ShardStateAction shardStateAction,
|
||||
ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader<Request> requestReader,
|
||||
ActionFilters actionFilters, Writeable.Reader<Request> requestReader,
|
||||
Writeable.Reader<ReplicaRequest> replicaRequestReader, String executor) {
|
||||
this(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
|
||||
indexNameExpressionResolver, requestReader, replicaRequestReader, executor, false, false);
|
||||
requestReader, replicaRequestReader, executor, false, false);
|
||||
}
|
||||
|
||||
|
||||
protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
|
||||
ClusterService clusterService, IndicesService indicesService,
|
||||
ThreadPool threadPool, ShardStateAction shardStateAction,
|
||||
ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader<Request> requestReader,
|
||||
ActionFilters actionFilters, Writeable.Reader<Request> requestReader,
|
||||
Writeable.Reader<ReplicaRequest> replicaRequestReader, String executor,
|
||||
boolean syncGlobalCheckpointAfterOperation, boolean forceExecutionOnPrimary) {
|
||||
super(actionName, actionFilters, transportService.getTaskManager());
|
||||
|
@ -139,7 +135,6 @@ public abstract class TransportReplicationAction<
|
|||
this.clusterService = clusterService;
|
||||
this.indicesService = indicesService;
|
||||
this.shardStateAction = shardStateAction;
|
||||
this.indexNameExpressionResolver = indexNameExpressionResolver;
|
||||
this.executor = executor;
|
||||
|
||||
this.transportPrimaryAction = actionName + "[p]";
|
||||
|
@ -220,21 +215,10 @@ public abstract class TransportReplicationAction<
|
|||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* True if provided index should be resolved when resolving request
|
||||
*/
|
||||
protected boolean resolveIndex() {
|
||||
return true;
|
||||
}
|
||||
|
||||
protected TransportRequestOptions transportOptions(Settings settings) {
|
||||
return TransportRequestOptions.EMPTY;
|
||||
}
|
||||
|
||||
private String concreteIndex(final ClusterState state, final ReplicationRequest request) {
|
||||
return resolveIndex() ? indexNameExpressionResolver.concreteSingleIndex(state, request).getName() : request.index();
|
||||
}
|
||||
|
||||
private ClusterBlockException blockExceptions(final ClusterState state, final String indexName) {
|
||||
ClusterBlockLevel globalBlockLevel = globalBlockLevel();
|
||||
if (globalBlockLevel != null) {
|
||||
|
@ -649,7 +633,7 @@ public abstract class TransportReplicationAction<
|
|||
protected void doRun() {
|
||||
setPhase(task, "routing");
|
||||
final ClusterState state = observer.setAndGetObservedState();
|
||||
final String concreteIndex = concreteIndex(state, request);
|
||||
final String concreteIndex = request.shardId().getIndexName();
|
||||
final ClusterBlockException blockException = blockExceptions(state, concreteIndex);
|
||||
if (blockException != null) {
|
||||
if (blockException.retryable()) {
|
||||
|
@ -659,7 +643,6 @@ public abstract class TransportReplicationAction<
|
|||
finishAsFailed(blockException);
|
||||
}
|
||||
} else {
|
||||
// request does not have a shardId yet, we need to pass the concrete index to resolve shardId
|
||||
final IndexMetaData indexMetaData = state.metaData().index(concreteIndex);
|
||||
if (indexMetaData == null) {
|
||||
retry(new IndexNotFoundException(concreteIndex));
|
||||
|
|
|
@ -28,7 +28,6 @@ import org.elasticsearch.action.support.WriteRequest;
|
|||
import org.elasticsearch.action.support.WriteResponse;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
|
@ -61,11 +60,10 @@ public abstract class TransportWriteAction<
|
|||
|
||||
protected TransportWriteAction(Settings settings, String actionName, TransportService transportService,
|
||||
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
|
||||
ShardStateAction shardStateAction, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader<Request> request,
|
||||
ShardStateAction shardStateAction, ActionFilters actionFilters, Writeable.Reader<Request> request,
|
||||
Writeable.Reader<ReplicaRequest> replicaRequest, String executor, boolean forceExecutionOnPrimary) {
|
||||
super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
|
||||
indexNameExpressionResolver, request, replicaRequest, executor, true, forceExecutionOnPrimary);
|
||||
request, replicaRequest, executor, true, forceExecutionOnPrimary);
|
||||
}
|
||||
|
||||
/** Syncs operation result to the translog or throws a shard not available failure */
|
||||
|
|
|
@ -28,7 +28,6 @@ import org.elasticsearch.action.support.replication.ReplicationRequest;
|
|||
import org.elasticsearch.action.support.replication.ReplicationResponse;
|
||||
import org.elasticsearch.action.support.replication.TransportReplicationAction;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
|
@ -64,8 +63,7 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction<
|
|||
final IndicesService indicesService,
|
||||
final ThreadPool threadPool,
|
||||
final ShardStateAction shardStateAction,
|
||||
final ActionFilters actionFilters,
|
||||
final IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
final ActionFilters actionFilters) {
|
||||
super(
|
||||
settings,
|
||||
ACTION_NAME,
|
||||
|
@ -75,7 +73,6 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction<
|
|||
threadPool,
|
||||
shardStateAction,
|
||||
actionFilters,
|
||||
indexNameExpressionResolver,
|
||||
Request::new,
|
||||
Request::new,
|
||||
ThreadPool.Names.MANAGEMENT);
|
||||
|
|
|
@ -31,7 +31,6 @@ import org.elasticsearch.action.support.replication.ReplicationRequest;
|
|||
import org.elasticsearch.action.support.replication.ReplicationResponse;
|
||||
import org.elasticsearch.action.support.replication.TransportReplicationAction;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
|
@ -77,8 +76,7 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi
|
|||
final IndicesService indicesService,
|
||||
final ThreadPool threadPool,
|
||||
final ShardStateAction shardStateAction,
|
||||
final ActionFilters actionFilters,
|
||||
final IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
final ActionFilters actionFilters) {
|
||||
super(
|
||||
settings,
|
||||
ACTION_NAME,
|
||||
|
@ -88,7 +86,6 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi
|
|||
threadPool,
|
||||
shardStateAction,
|
||||
actionFilters,
|
||||
indexNameExpressionResolver,
|
||||
Request::new,
|
||||
Request::new,
|
||||
ThreadPool.Names.MANAGEMENT);
|
||||
|
|
|
@ -33,7 +33,6 @@ import org.elasticsearch.action.support.replication.ReplicationResponse;
|
|||
import org.elasticsearch.action.support.replication.TransportWriteAction;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
|
@ -74,8 +73,7 @@ public class RetentionLeaseSyncAction extends
|
|||
final IndicesService indicesService,
|
||||
final ThreadPool threadPool,
|
||||
final ShardStateAction shardStateAction,
|
||||
final ActionFilters actionFilters,
|
||||
final IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
final ActionFilters actionFilters) {
|
||||
super(
|
||||
settings,
|
||||
ACTION_NAME,
|
||||
|
@ -85,7 +83,6 @@ public class RetentionLeaseSyncAction extends
|
|||
threadPool,
|
||||
shardStateAction,
|
||||
actionFilters,
|
||||
indexNameExpressionResolver,
|
||||
RetentionLeaseSyncAction.Request::new,
|
||||
RetentionLeaseSyncAction.Request::new,
|
||||
ThreadPool.Names.MANAGEMENT, false);
|
||||
|
|
|
@ -33,7 +33,6 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
|||
import org.elasticsearch.cluster.block.ClusterBlock;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
|
||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
|
@ -119,7 +118,7 @@ public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase {
|
|||
|
||||
ShardStateAction shardStateAction = new ShardStateAction(clusterService, transportService, null, null, threadPool);
|
||||
action = new TransportVerifyShardBeforeCloseAction(Settings.EMPTY, transportService, clusterService, mock(IndicesService.class),
|
||||
mock(ThreadPool.class), shardStateAction, mock(ActionFilters.class), mock(IndexNameExpressionResolver.class));
|
||||
mock(ThreadPool.class), shardStateAction, mock(ActionFilters.class));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -27,7 +27,6 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
|||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.coordination.NoMasterBlockService;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
|
@ -143,9 +142,8 @@ public class TransportResyncReplicationActionTests extends ESTestCase {
|
|||
final IndicesService indexServices = mock(IndicesService.class);
|
||||
when(indexServices.indexServiceSafe(eq(index))).thenReturn(indexService);
|
||||
|
||||
final IndexNameExpressionResolver resolver = new IndexNameExpressionResolver();
|
||||
final TransportResyncReplicationAction action = new TransportResyncReplicationAction(Settings.EMPTY, transportService,
|
||||
clusterService, indexServices, threadPool, shardStateAction, new ActionFilters(new HashSet<>()), resolver);
|
||||
clusterService, indexServices, threadPool, shardStateAction, new ActionFilters(new HashSet<>()));
|
||||
|
||||
assertThat(action.globalBlockLevel(), nullValue());
|
||||
assertThat(action.indexBlockLevel(), nullValue());
|
||||
|
|
|
@ -26,7 +26,6 @@ import org.elasticsearch.action.ActionType;
|
|||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
|
@ -108,9 +107,9 @@ public class TransportReplicationActionRetryOnClosedNodeIT extends ESIntegTestCa
|
|||
@Inject
|
||||
public TestAction(Settings settings, TransportService transportService, ClusterService clusterService,
|
||||
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
ActionFilters actionFilters) {
|
||||
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
|
||||
indexNameExpressionResolver, Request::new, Request::new, ThreadPool.Names.GENERIC);
|
||||
Request::new, Request::new, ThreadPool.Names.GENERIC);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -40,7 +40,6 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
|
|||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.AllocationId;
|
||||
|
@ -235,10 +234,12 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
|
||||
setState(clusterService, ClusterStateCreationUtils.stateWithActivePrimary("index", true, 0));
|
||||
|
||||
ShardId shardId = new ShardId(clusterService.state().metaData().index("index").getIndex(), 0);
|
||||
|
||||
{
|
||||
setStateWithBlock(clusterService, nonRetryableBlock, globalBlock);
|
||||
|
||||
Request request = globalBlock ? new Request(NO_SHARD_ID) : new Request(NO_SHARD_ID).index("index");
|
||||
Request request = new Request(shardId);
|
||||
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
|
||||
ReplicationTask task = maybeTask();
|
||||
|
||||
|
@ -253,7 +254,7 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
{
|
||||
setStateWithBlock(clusterService, retryableBlock, globalBlock);
|
||||
|
||||
Request requestWithTimeout = (globalBlock ? new Request(NO_SHARD_ID) : new Request(NO_SHARD_ID).index("index")).timeout("5ms");
|
||||
Request requestWithTimeout = (globalBlock ? new Request(shardId) : new Request(shardId)).timeout("5ms");
|
||||
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
|
||||
ReplicationTask task = maybeTask();
|
||||
|
||||
|
@ -269,7 +270,7 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
{
|
||||
setStateWithBlock(clusterService, retryableBlock, globalBlock);
|
||||
|
||||
Request request = globalBlock ? new Request(NO_SHARD_ID) : new Request(NO_SHARD_ID).index("index");
|
||||
Request request = new Request(shardId);
|
||||
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
|
||||
ReplicationTask task = maybeTask();
|
||||
|
||||
|
@ -1245,7 +1246,7 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
ThreadPool threadPool, IndicesService indicesService) {
|
||||
super(settings, actionName, transportService, clusterService, indicesService, threadPool,
|
||||
shardStateAction,
|
||||
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(),
|
||||
new ActionFilters(new HashSet<>()),
|
||||
Request::new, Request::new, ThreadPool.Names.SAME);
|
||||
}
|
||||
|
||||
|
@ -1267,11 +1268,6 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
request.processedOnReplicas.incrementAndGet();
|
||||
return new ReplicaResult();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean resolveIndex() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private IndicesService mockIndicesService(ClusterService clusterService) {
|
||||
|
|
|
@ -30,7 +30,6 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
|
|||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
|
||||
|
@ -418,7 +417,7 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe
|
|||
SetOnce<Boolean> executedOnPrimary) {
|
||||
super(settings, actionName, transportService, clusterService, mockIndicesService(shardId, executedOnPrimary, primary, replica),
|
||||
threadPool, shardStateAction,
|
||||
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(), Request::new, Request::new, ThreadPool.Names.SAME);
|
||||
new ActionFilters(new HashSet<>()), Request::new, Request::new, ThreadPool.Names.SAME);
|
||||
this.shardId = Objects.requireNonNull(shardId);
|
||||
this.primary = Objects.requireNonNull(primary);
|
||||
assertEquals(shardId, primary.shardId());
|
||||
|
|
|
@ -31,7 +31,6 @@ import org.elasticsearch.client.transport.NoNodeAvailableException;
|
|||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
|
@ -399,8 +398,7 @@ public class TransportWriteActionTests extends ESTestCase {
|
|||
super(Settings.EMPTY, "internal:test",
|
||||
new TransportService(Settings.EMPTY, mock(Transport.class), null, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
|
||||
x -> null, null, Collections.emptySet()), null, null, null, null,
|
||||
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(), TestRequest::new,
|
||||
TestRequest::new, ThreadPool.Names.SAME, false);
|
||||
new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false);
|
||||
this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary;
|
||||
this.withDocumentFailureOnReplica = withDocumentFailureOnReplica;
|
||||
}
|
||||
|
@ -409,8 +407,7 @@ public class TransportWriteActionTests extends ESTestCase {
|
|||
ClusterService clusterService, ShardStateAction shardStateAction, ThreadPool threadPool) {
|
||||
super(settings, actionName, transportService, clusterService,
|
||||
mockIndicesService(clusterService), threadPool, shardStateAction,
|
||||
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(),
|
||||
TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false);
|
||||
new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false);
|
||||
this.withDocumentFailureOnPrimary = false;
|
||||
this.withDocumentFailureOnReplica = false;
|
||||
}
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.elasticsearch.index.seqno;
|
|||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.ActionTestUtils;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
|
@ -110,8 +109,7 @@ public class GlobalCheckpointSyncActionTests extends ESTestCase {
|
|||
indicesService,
|
||||
threadPool,
|
||||
shardStateAction,
|
||||
new ActionFilters(Collections.emptySet()),
|
||||
new IndexNameExpressionResolver());
|
||||
new ActionFilters(Collections.emptySet()));
|
||||
final GlobalCheckpointSyncAction.Request primaryRequest = new GlobalCheckpointSyncAction.Request(indexShard.shardId());
|
||||
if (randomBoolean()) {
|
||||
action.shardOperationOnPrimary(primaryRequest, indexShard, ActionTestUtils.assertNoFailureListener(r -> {}));
|
||||
|
|
|
@ -27,7 +27,6 @@ import org.elasticsearch.action.support.ActionTestUtils;
|
|||
import org.elasticsearch.action.support.replication.ReplicationResponse;
|
||||
import org.elasticsearch.action.support.replication.TransportReplicationAction;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -121,8 +120,7 @@ public class RetentionLeaseBackgroundSyncActionTests extends ESTestCase {
|
|||
indicesService,
|
||||
threadPool,
|
||||
shardStateAction,
|
||||
new ActionFilters(Collections.emptySet()),
|
||||
new IndexNameExpressionResolver());
|
||||
new ActionFilters(Collections.emptySet()));
|
||||
final RetentionLeases retentionLeases = mock(RetentionLeases.class);
|
||||
final RetentionLeaseBackgroundSyncAction.Request request =
|
||||
new RetentionLeaseBackgroundSyncAction.Request(indexShard.shardId(), retentionLeases);
|
||||
|
@ -159,8 +157,7 @@ public class RetentionLeaseBackgroundSyncActionTests extends ESTestCase {
|
|||
indicesService,
|
||||
threadPool,
|
||||
shardStateAction,
|
||||
new ActionFilters(Collections.emptySet()),
|
||||
new IndexNameExpressionResolver());
|
||||
new ActionFilters(Collections.emptySet()));
|
||||
final RetentionLeases retentionLeases = mock(RetentionLeases.class);
|
||||
final RetentionLeaseBackgroundSyncAction.Request request =
|
||||
new RetentionLeaseBackgroundSyncAction.Request(indexShard.shardId(), retentionLeases);
|
||||
|
@ -201,8 +198,7 @@ public class RetentionLeaseBackgroundSyncActionTests extends ESTestCase {
|
|||
indicesService,
|
||||
threadPool,
|
||||
shardStateAction,
|
||||
new ActionFilters(Collections.emptySet()),
|
||||
new IndexNameExpressionResolver()) {
|
||||
new ActionFilters(Collections.emptySet())) {
|
||||
|
||||
@Override
|
||||
protected void doExecute(Task task, Request request, ActionListener<ReplicationResponse> listener) {
|
||||
|
@ -265,8 +261,7 @@ public class RetentionLeaseBackgroundSyncActionTests extends ESTestCase {
|
|||
indicesService,
|
||||
threadPool,
|
||||
shardStateAction,
|
||||
new ActionFilters(Collections.emptySet()),
|
||||
new IndexNameExpressionResolver());
|
||||
new ActionFilters(Collections.emptySet()));
|
||||
|
||||
assertNull(action.indexBlockLevel());
|
||||
}
|
||||
|
|
|
@ -25,7 +25,6 @@ import org.elasticsearch.action.support.ActionFilters;
|
|||
import org.elasticsearch.action.support.ActionTestUtils;
|
||||
import org.elasticsearch.action.support.replication.TransportWriteAction;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
|
@ -112,8 +111,7 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
|
|||
indicesService,
|
||||
threadPool,
|
||||
shardStateAction,
|
||||
new ActionFilters(Collections.emptySet()),
|
||||
new IndexNameExpressionResolver());
|
||||
new ActionFilters(Collections.emptySet()));
|
||||
final RetentionLeases retentionLeases = mock(RetentionLeases.class);
|
||||
final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases);
|
||||
action.shardOperationOnPrimary(request, indexShard,
|
||||
|
@ -149,8 +147,7 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
|
|||
indicesService,
|
||||
threadPool,
|
||||
shardStateAction,
|
||||
new ActionFilters(Collections.emptySet()),
|
||||
new IndexNameExpressionResolver());
|
||||
new ActionFilters(Collections.emptySet()));
|
||||
final RetentionLeases retentionLeases = mock(RetentionLeases.class);
|
||||
final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases);
|
||||
|
||||
|
@ -191,8 +188,7 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
|
|||
indicesService,
|
||||
threadPool,
|
||||
shardStateAction,
|
||||
new ActionFilters(Collections.emptySet()),
|
||||
new IndexNameExpressionResolver()) {
|
||||
new ActionFilters(Collections.emptySet())) {
|
||||
|
||||
@Override
|
||||
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
|
||||
|
@ -251,8 +247,7 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
|
|||
indicesService,
|
||||
threadPool,
|
||||
shardStateAction,
|
||||
new ActionFilters(Collections.emptySet()),
|
||||
new IndexNameExpressionResolver());
|
||||
new ActionFilters(Collections.emptySet()));
|
||||
|
||||
assertNull(action.indexBlockLevel());
|
||||
}
|
||||
|
|
|
@ -191,7 +191,7 @@ public class ClusterStateChanges {
|
|||
};
|
||||
|
||||
TransportVerifyShardBeforeCloseAction transportVerifyShardBeforeCloseAction = new TransportVerifyShardBeforeCloseAction(SETTINGS,
|
||||
transportService, clusterService, indicesService, threadPool, null, actionFilters, indexNameExpressionResolver);
|
||||
transportService, clusterService, indicesService, threadPool, null, actionFilters);
|
||||
MetaDataIndexStateService indexStateService = new MetaDataIndexStateService(clusterService, allocationService,
|
||||
metaDataIndexUpgradeService, indicesService, threadPool, transportVerifyShardBeforeCloseAction);
|
||||
MetaDataDeleteIndexService deleteIndexService = new MetaDataDeleteIndexService(SETTINGS, clusterService, allocationService);
|
||||
|
|
|
@ -1128,8 +1128,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|||
indicesService,
|
||||
threadPool,
|
||||
shardStateAction,
|
||||
actionFilters,
|
||||
indexNameExpressionResolver)),
|
||||
actionFilters)),
|
||||
new GlobalCheckpointSyncAction(
|
||||
settings,
|
||||
transportService,
|
||||
|
@ -1137,8 +1136,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|||
indicesService,
|
||||
threadPool,
|
||||
shardStateAction,
|
||||
actionFilters,
|
||||
indexNameExpressionResolver),
|
||||
actionFilters),
|
||||
new RetentionLeaseSyncAction(
|
||||
settings,
|
||||
transportService,
|
||||
|
@ -1146,8 +1144,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|||
indicesService,
|
||||
threadPool,
|
||||
shardStateAction,
|
||||
actionFilters,
|
||||
indexNameExpressionResolver),
|
||||
actionFilters),
|
||||
new RetentionLeaseBackgroundSyncAction(
|
||||
settings,
|
||||
transportService,
|
||||
|
@ -1155,9 +1152,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|||
indicesService,
|
||||
threadPool,
|
||||
shardStateAction,
|
||||
actionFilters,
|
||||
indexNameExpressionResolver));
|
||||
Map<ActionType, TransportAction> actions = new HashMap<>();
|
||||
actionFilters));
|
||||
Map<ActionType, TransportAction> actions = new HashMap<>();
|
||||
final MetaDataCreateIndexService metaDataCreateIndexService = new MetaDataCreateIndexService(settings, clusterService,
|
||||
indicesService,
|
||||
allocationService, new AliasValidator(), environment, indexScopedSettings,
|
||||
|
@ -1170,9 +1166,9 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|||
));
|
||||
final MappingUpdatedAction mappingUpdatedAction = new MappingUpdatedAction(settings, clusterSettings);
|
||||
mappingUpdatedAction.setClient(client);
|
||||
final TransportShardBulkAction transportShardBulkAction = new TransportShardBulkAction(settings, transportService,
|
||||
final TransportShardBulkAction transportShardBulkAction = new TransportShardBulkAction(settings, transportService,
|
||||
clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction, new UpdateHelper(scriptService),
|
||||
actionFilters, indexNameExpressionResolver);
|
||||
actionFilters);
|
||||
actions.put(BulkAction.INSTANCE,
|
||||
new TransportBulkAction(threadPool, transportService, clusterService,
|
||||
new IngestService(
|
||||
|
|
|
@ -12,7 +12,6 @@ import org.elasticsearch.action.ActionListener;
|
|||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.replication.TransportWriteAction;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
|
@ -44,8 +43,7 @@ public class TransportBulkShardOperationsAction
|
|||
final IndicesService indicesService,
|
||||
final ThreadPool threadPool,
|
||||
final ShardStateAction shardStateAction,
|
||||
final ActionFilters actionFilters,
|
||||
final IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
final ActionFilters actionFilters) {
|
||||
super(
|
||||
settings,
|
||||
BulkShardOperationsAction.NAME,
|
||||
|
@ -55,7 +53,6 @@ public class TransportBulkShardOperationsAction
|
|||
threadPool,
|
||||
shardStateAction,
|
||||
actionFilters,
|
||||
indexNameExpressionResolver,
|
||||
BulkShardOperationsRequest::new,
|
||||
BulkShardOperationsRequest::new,
|
||||
ThreadPool.Names.WRITE, false);
|
||||
|
|
Loading…
Reference in New Issue