Revert "Remove obsolete resolving logic from TRA (#49647)"

This reverts commit 0827ea2175.
This commit is contained in:
Yannick Welsch 2019-11-28 13:12:07 +01:00
parent 0827ea2175
commit 04e9cbd6eb
22 changed files with 121 additions and 50 deletions

View File

@ -31,6 +31,7 @@ import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -57,8 +58,8 @@ public class TransportVerifyShardBeforeCloseAction extends TransportReplicationA
public TransportVerifyShardBeforeCloseAction(final Settings settings, final TransportService transportService,
final ClusterService clusterService, final IndicesService indicesService,
final ThreadPool threadPool, final ShardStateAction stateAction,
final ActionFilters actionFilters) {
super(settings, NAME, transportService, clusterService, indicesService, threadPool, stateAction, actionFilters,
final ActionFilters actionFilters, final IndexNameExpressionResolver resolver) {
super(settings, NAME, transportService, clusterService, indicesService, threadPool, stateAction, actionFilters, resolver,
ShardRequest::new, ShardRequest::new, ThreadPool.Names.MANAGEMENT);
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -43,9 +44,9 @@ public class TransportShardFlushAction
@Inject
public TransportShardFlushAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters) {
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
actionFilters, ShardFlushRequest::new, ShardFlushRequest::new, ThreadPool.Names.FLUSH);
actionFilters, indexNameExpressionResolver, ShardFlushRequest::new, ShardFlushRequest::new, ThreadPool.Names.FLUSH);
}
@Override

View File

@ -25,6 +25,7 @@ import org.elasticsearch.action.support.replication.BasicReplicationRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -45,9 +46,9 @@ public class TransportShardRefreshAction
@Inject
public TransportShardRefreshAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters) {
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
BasicReplicationRequest::new, BasicReplicationRequest::new, ThreadPool.Names.REFRESH);
indexNameExpressionResolver, BasicReplicationRequest::new, BasicReplicationRequest::new, ThreadPool.Names.REFRESH);
}
@Override

View File

@ -43,6 +43,7 @@ import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
@ -90,9 +91,10 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
@Inject
public TransportShardBulkAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters) {
MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE, false);
indexNameExpressionResolver, BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE, false);
this.updateHelper = updateHelper;
this.mappingUpdatedAction = mappingUpdatedAction;
}
@ -107,6 +109,11 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
return new BulkShardResponse(in);
}
@Override
protected boolean resolveIndex() {
return false;
}
@Override
protected void shardOperationOnPrimary(BulkShardRequest request, IndexShard primary,
ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener) {

View File

@ -27,6 +27,7 @@ import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
@ -54,9 +55,10 @@ public class TransportResyncReplicationAction extends TransportWriteAction<Resyn
@Inject
public TransportResyncReplicationAction(Settings settings, TransportService transportService,
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
ShardStateAction shardStateAction, ActionFilters actionFilters) {
ShardStateAction shardStateAction, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE,
indexNameExpressionResolver, ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE,
true /* we should never reject resync because of thread pool capacity on primary */);
}

View File

@ -41,6 +41,7 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
@ -104,6 +105,7 @@ public abstract class TransportReplicationAction<
protected final ClusterService clusterService;
protected final ShardStateAction shardStateAction;
protected final IndicesService indicesService;
protected final IndexNameExpressionResolver indexNameExpressionResolver;
protected final TransportRequestOptions transportOptions;
protected final String executor;
@ -116,17 +118,19 @@ public abstract class TransportReplicationAction<
protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters, Writeable.Reader<Request> requestReader,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader<Request> requestReader,
Writeable.Reader<ReplicaRequest> replicaRequestReader, String executor) {
this(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
requestReader, replicaRequestReader, executor, false, false);
indexNameExpressionResolver, requestReader, replicaRequestReader, executor, false, false);
}
protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters, Writeable.Reader<Request> requestReader,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader<Request> requestReader,
Writeable.Reader<ReplicaRequest> replicaRequestReader, String executor,
boolean syncGlobalCheckpointAfterOperation, boolean forceExecutionOnPrimary) {
super(actionName, actionFilters, transportService.getTaskManager());
@ -135,6 +139,7 @@ public abstract class TransportReplicationAction<
this.clusterService = clusterService;
this.indicesService = indicesService;
this.shardStateAction = shardStateAction;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.executor = executor;
this.transportPrimaryAction = actionName + "[p]";
@ -215,10 +220,21 @@ public abstract class TransportReplicationAction<
return null;
}
/**
* True if provided index should be resolved when resolving request
*/
protected boolean resolveIndex() {
return true;
}
protected TransportRequestOptions transportOptions(Settings settings) {
return TransportRequestOptions.EMPTY;
}
private String concreteIndex(final ClusterState state, final ReplicationRequest request) {
return resolveIndex() ? indexNameExpressionResolver.concreteSingleIndex(state, request).getName() : request.index();
}
private ClusterBlockException blockExceptions(final ClusterState state, final String indexName) {
ClusterBlockLevel globalBlockLevel = globalBlockLevel();
if (globalBlockLevel != null) {
@ -633,7 +649,7 @@ public abstract class TransportReplicationAction<
protected void doRun() {
setPhase(task, "routing");
final ClusterState state = observer.setAndGetObservedState();
final String concreteIndex = request.shardId().getIndexName();
final String concreteIndex = concreteIndex(state, request);
final ClusterBlockException blockException = blockExceptions(state, concreteIndex);
if (blockException != null) {
if (blockException.retryable()) {
@ -643,6 +659,7 @@ public abstract class TransportReplicationAction<
finishAsFailed(blockException);
}
} else {
// request does not have a shardId yet, we need to pass the concrete index to resolve shardId
final IndexMetaData indexMetaData = state.metaData().index(concreteIndex);
if (indexMetaData == null) {
retry(new IndexNotFoundException(concreteIndex));

View File

@ -28,6 +28,7 @@ import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
@ -60,10 +61,11 @@ public abstract class TransportWriteAction<
protected TransportWriteAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
ShardStateAction shardStateAction, ActionFilters actionFilters, Writeable.Reader<Request> request,
ShardStateAction shardStateAction, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader<Request> request,
Writeable.Reader<ReplicaRequest> replicaRequest, String executor, boolean forceExecutionOnPrimary) {
super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
request, replicaRequest, executor, true, forceExecutionOnPrimary);
indexNameExpressionResolver, request, replicaRequest, executor, true, forceExecutionOnPrimary);
}
/** Syncs operation result to the translog or throws a shard not available failure */

View File

@ -28,6 +28,7 @@ import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -63,7 +64,8 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction<
final IndicesService indicesService,
final ThreadPool threadPool,
final ShardStateAction shardStateAction,
final ActionFilters actionFilters) {
final ActionFilters actionFilters,
final IndexNameExpressionResolver indexNameExpressionResolver) {
super(
settings,
ACTION_NAME,
@ -73,6 +75,7 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction<
threadPool,
shardStateAction,
actionFilters,
indexNameExpressionResolver,
Request::new,
Request::new,
ThreadPool.Names.MANAGEMENT);

View File

@ -31,6 +31,7 @@ import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -76,7 +77,8 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi
final IndicesService indicesService,
final ThreadPool threadPool,
final ShardStateAction shardStateAction,
final ActionFilters actionFilters) {
final ActionFilters actionFilters,
final IndexNameExpressionResolver indexNameExpressionResolver) {
super(
settings,
ACTION_NAME,
@ -86,6 +88,7 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi
threadPool,
shardStateAction,
actionFilters,
indexNameExpressionResolver,
Request::new,
Request::new,
ThreadPool.Names.MANAGEMENT);

View File

@ -33,6 +33,7 @@ import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -73,7 +74,8 @@ public class RetentionLeaseSyncAction extends
final IndicesService indicesService,
final ThreadPool threadPool,
final ShardStateAction shardStateAction,
final ActionFilters actionFilters) {
final ActionFilters actionFilters,
final IndexNameExpressionResolver indexNameExpressionResolver) {
super(
settings,
ACTION_NAME,
@ -83,6 +85,7 @@ public class RetentionLeaseSyncAction extends
threadPool,
shardStateAction,
actionFilters,
indexNameExpressionResolver,
RetentionLeaseSyncAction.Request::new,
RetentionLeaseSyncAction.Request::new,
ThreadPool.Names.MANAGEMENT, false);

View File

@ -33,6 +33,7 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -118,7 +119,7 @@ public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase {
ShardStateAction shardStateAction = new ShardStateAction(clusterService, transportService, null, null, threadPool);
action = new TransportVerifyShardBeforeCloseAction(Settings.EMPTY, transportService, clusterService, mock(IndicesService.class),
mock(ThreadPool.class), shardStateAction, mock(ActionFilters.class));
mock(ThreadPool.class), shardStateAction, mock(ActionFilters.class), mock(IndexNameExpressionResolver.class));
}
@Override

View File

@ -27,6 +27,7 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.coordination.NoMasterBlockService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
@ -142,8 +143,9 @@ public class TransportResyncReplicationActionTests extends ESTestCase {
final IndicesService indexServices = mock(IndicesService.class);
when(indexServices.indexServiceSafe(eq(index))).thenReturn(indexService);
final IndexNameExpressionResolver resolver = new IndexNameExpressionResolver();
final TransportResyncReplicationAction action = new TransportResyncReplicationAction(Settings.EMPTY, transportService,
clusterService, indexServices, threadPool, shardStateAction, new ActionFilters(new HashSet<>()));
clusterService, indexServices, threadPool, shardStateAction, new ActionFilters(new HashSet<>()), resolver);
assertThat(action.globalBlockLevel(), nullValue());
assertThat(action.indexBlockLevel(), nullValue());

View File

@ -26,6 +26,7 @@ import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@ -107,9 +108,9 @@ public class TransportReplicationActionRetryOnClosedNodeIT extends ESIntegTestCa
@Inject
public TestAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters) {
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
Request::new, Request::new, ThreadPool.Names.GENERIC);
indexNameExpressionResolver, Request::new, Request::new, ThreadPool.Names.GENERIC);
}
@Override

View File

@ -40,6 +40,7 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.AllocationId;
@ -234,12 +235,10 @@ public class TransportReplicationActionTests extends ESTestCase {
setState(clusterService, ClusterStateCreationUtils.stateWithActivePrimary("index", true, 0));
ShardId shardId = new ShardId(clusterService.state().metaData().index("index").getIndex(), 0);
{
setStateWithBlock(clusterService, nonRetryableBlock, globalBlock);
Request request = new Request(shardId);
Request request = globalBlock ? new Request(NO_SHARD_ID) : new Request(NO_SHARD_ID).index("index");
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask();
@ -254,7 +253,7 @@ public class TransportReplicationActionTests extends ESTestCase {
{
setStateWithBlock(clusterService, retryableBlock, globalBlock);
Request requestWithTimeout = (globalBlock ? new Request(shardId) : new Request(shardId)).timeout("5ms");
Request requestWithTimeout = (globalBlock ? new Request(NO_SHARD_ID) : new Request(NO_SHARD_ID).index("index")).timeout("5ms");
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask();
@ -270,7 +269,7 @@ public class TransportReplicationActionTests extends ESTestCase {
{
setStateWithBlock(clusterService, retryableBlock, globalBlock);
Request request = new Request(shardId);
Request request = globalBlock ? new Request(NO_SHARD_ID) : new Request(NO_SHARD_ID).index("index");
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask();
@ -1246,7 +1245,7 @@ public class TransportReplicationActionTests extends ESTestCase {
ThreadPool threadPool, IndicesService indicesService) {
super(settings, actionName, transportService, clusterService, indicesService, threadPool,
shardStateAction,
new ActionFilters(new HashSet<>()),
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(),
Request::new, Request::new, ThreadPool.Names.SAME);
}
@ -1268,6 +1267,11 @@ public class TransportReplicationActionTests extends ESTestCase {
request.processedOnReplicas.incrementAndGet();
return new ReplicaResult();
}
@Override
protected boolean resolveIndex() {
return false;
}
}
private IndicesService mockIndicesService(ClusterService clusterService) {

View File

@ -30,6 +30,7 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
@ -417,7 +418,7 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe
SetOnce<Boolean> executedOnPrimary) {
super(settings, actionName, transportService, clusterService, mockIndicesService(shardId, executedOnPrimary, primary, replica),
threadPool, shardStateAction,
new ActionFilters(new HashSet<>()), Request::new, Request::new, ThreadPool.Names.SAME);
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(), Request::new, Request::new, ThreadPool.Names.SAME);
this.shardId = Objects.requireNonNull(shardId);
this.primary = Objects.requireNonNull(primary);
assertEquals(shardId, primary.shardId());

View File

@ -31,6 +31,7 @@ import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -398,7 +399,8 @@ public class TransportWriteActionTests extends ESTestCase {
super(Settings.EMPTY, "internal:test",
new TransportService(Settings.EMPTY, mock(Transport.class), null, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
x -> null, null, Collections.emptySet()), null, null, null, null,
new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false);
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(), TestRequest::new,
TestRequest::new, ThreadPool.Names.SAME, false);
this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary;
this.withDocumentFailureOnReplica = withDocumentFailureOnReplica;
}
@ -407,7 +409,8 @@ public class TransportWriteActionTests extends ESTestCase {
ClusterService clusterService, ShardStateAction shardStateAction, ThreadPool threadPool) {
super(settings, actionName, transportService, clusterService,
mockIndicesService(clusterService), threadPool, shardStateAction,
new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false);
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(),
TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false);
this.withDocumentFailureOnPrimary = false;
this.withDocumentFailureOnReplica = false;
}

View File

@ -20,6 +20,7 @@ package org.elasticsearch.index.seqno;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.internal.io.IOUtils;
@ -109,7 +110,8 @@ public class GlobalCheckpointSyncActionTests extends ESTestCase {
indicesService,
threadPool,
shardStateAction,
new ActionFilters(Collections.emptySet()));
new ActionFilters(Collections.emptySet()),
new IndexNameExpressionResolver());
final GlobalCheckpointSyncAction.Request primaryRequest = new GlobalCheckpointSyncAction.Request(indexShard.shardId());
if (randomBoolean()) {
action.shardOperationOnPrimary(primaryRequest, indexShard, ActionTestUtils.assertNoFailureListener(r -> {}));

View File

@ -27,6 +27,7 @@ import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
@ -120,7 +121,8 @@ public class RetentionLeaseBackgroundSyncActionTests extends ESTestCase {
indicesService,
threadPool,
shardStateAction,
new ActionFilters(Collections.emptySet()));
new ActionFilters(Collections.emptySet()),
new IndexNameExpressionResolver());
final RetentionLeases retentionLeases = mock(RetentionLeases.class);
final RetentionLeaseBackgroundSyncAction.Request request =
new RetentionLeaseBackgroundSyncAction.Request(indexShard.shardId(), retentionLeases);
@ -157,7 +159,8 @@ public class RetentionLeaseBackgroundSyncActionTests extends ESTestCase {
indicesService,
threadPool,
shardStateAction,
new ActionFilters(Collections.emptySet()));
new ActionFilters(Collections.emptySet()),
new IndexNameExpressionResolver());
final RetentionLeases retentionLeases = mock(RetentionLeases.class);
final RetentionLeaseBackgroundSyncAction.Request request =
new RetentionLeaseBackgroundSyncAction.Request(indexShard.shardId(), retentionLeases);
@ -198,7 +201,8 @@ public class RetentionLeaseBackgroundSyncActionTests extends ESTestCase {
indicesService,
threadPool,
shardStateAction,
new ActionFilters(Collections.emptySet())) {
new ActionFilters(Collections.emptySet()),
new IndexNameExpressionResolver()) {
@Override
protected void doExecute(Task task, Request request, ActionListener<ReplicationResponse> listener) {
@ -261,7 +265,8 @@ public class RetentionLeaseBackgroundSyncActionTests extends ESTestCase {
indicesService,
threadPool,
shardStateAction,
new ActionFilters(Collections.emptySet()));
new ActionFilters(Collections.emptySet()),
new IndexNameExpressionResolver());
assertNull(action.indexBlockLevel());
}

View File

@ -25,6 +25,7 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.internal.io.IOUtils;
@ -111,7 +112,8 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
indicesService,
threadPool,
shardStateAction,
new ActionFilters(Collections.emptySet()));
new ActionFilters(Collections.emptySet()),
new IndexNameExpressionResolver());
final RetentionLeases retentionLeases = mock(RetentionLeases.class);
final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases);
action.shardOperationOnPrimary(request, indexShard,
@ -147,7 +149,8 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
indicesService,
threadPool,
shardStateAction,
new ActionFilters(Collections.emptySet()));
new ActionFilters(Collections.emptySet()),
new IndexNameExpressionResolver());
final RetentionLeases retentionLeases = mock(RetentionLeases.class);
final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases);
@ -188,7 +191,8 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
indicesService,
threadPool,
shardStateAction,
new ActionFilters(Collections.emptySet())) {
new ActionFilters(Collections.emptySet()),
new IndexNameExpressionResolver()) {
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
@ -247,7 +251,8 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
indicesService,
threadPool,
shardStateAction,
new ActionFilters(Collections.emptySet()));
new ActionFilters(Collections.emptySet()),
new IndexNameExpressionResolver());
assertNull(action.indexBlockLevel());
}

View File

@ -191,7 +191,7 @@ public class ClusterStateChanges {
};
TransportVerifyShardBeforeCloseAction transportVerifyShardBeforeCloseAction = new TransportVerifyShardBeforeCloseAction(SETTINGS,
transportService, clusterService, indicesService, threadPool, null, actionFilters);
transportService, clusterService, indicesService, threadPool, null, actionFilters, indexNameExpressionResolver);
MetaDataIndexStateService indexStateService = new MetaDataIndexStateService(clusterService, allocationService,
metaDataIndexUpgradeService, indicesService, threadPool, transportVerifyShardBeforeCloseAction);
MetaDataDeleteIndexService deleteIndexService = new MetaDataDeleteIndexService(SETTINGS, clusterService, allocationService);

View File

@ -1128,7 +1128,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
indicesService,
threadPool,
shardStateAction,
actionFilters)),
actionFilters,
indexNameExpressionResolver)),
new GlobalCheckpointSyncAction(
settings,
transportService,
@ -1136,7 +1137,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
indicesService,
threadPool,
shardStateAction,
actionFilters),
actionFilters,
indexNameExpressionResolver),
new RetentionLeaseSyncAction(
settings,
transportService,
@ -1144,7 +1146,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
indicesService,
threadPool,
shardStateAction,
actionFilters),
actionFilters,
indexNameExpressionResolver),
new RetentionLeaseBackgroundSyncAction(
settings,
transportService,
@ -1152,8 +1155,9 @@ public class SnapshotResiliencyTests extends ESTestCase {
indicesService,
threadPool,
shardStateAction,
actionFilters));
Map<ActionType, TransportAction> actions = new HashMap<>();
actionFilters,
indexNameExpressionResolver));
Map<ActionType, TransportAction> actions = new HashMap<>();
final MetaDataCreateIndexService metaDataCreateIndexService = new MetaDataCreateIndexService(settings, clusterService,
indicesService,
allocationService, new AliasValidator(), environment, indexScopedSettings,
@ -1166,9 +1170,9 @@ public class SnapshotResiliencyTests extends ESTestCase {
));
final MappingUpdatedAction mappingUpdatedAction = new MappingUpdatedAction(settings, clusterSettings);
mappingUpdatedAction.setClient(client);
final TransportShardBulkAction transportShardBulkAction = new TransportShardBulkAction(settings, transportService,
final TransportShardBulkAction transportShardBulkAction = new TransportShardBulkAction(settings, transportService,
clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction, new UpdateHelper(scriptService),
actionFilters);
actionFilters, indexNameExpressionResolver);
actions.put(BulkAction.INSTANCE,
new TransportBulkAction(threadPool, transportService, clusterService,
new IngestService(

View File

@ -12,6 +12,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
@ -43,7 +44,8 @@ public class TransportBulkShardOperationsAction
final IndicesService indicesService,
final ThreadPool threadPool,
final ShardStateAction shardStateAction,
final ActionFilters actionFilters) {
final ActionFilters actionFilters,
final IndexNameExpressionResolver indexNameExpressionResolver) {
super(
settings,
BulkShardOperationsAction.NAME,
@ -53,6 +55,7 @@ public class TransportBulkShardOperationsAction
threadPool,
shardStateAction,
actionFilters,
indexNameExpressionResolver,
BulkShardOperationsRequest::new,
BulkShardOperationsRequest::new,
ThreadPool.Names.WRITE, false);