Remove obsolete resolving logic from TRA (#49685)

This stems from a time where index requests were directly forwarded to
TransportReplicationAction. Nowadays they are wrapped in a BulkShardRequest, and this logic is
obsolete.

In contrast to prior PR (#49647), this PR also fixes (see b3697cc) a situation where the previous
index expression logic had an interesting side effect. For bulk requests (which had resolveIndex
= false), the reroute phase was waiting for the index to appear in case where it was not present,
and for all other replication requests (resolveIndex = true) it would right away throw an
IndexNotFoundException while resolving the name and exit. With #49647, every replication
request was now waiting for the index to appear, which was problematic when the given index
had just been deleted (e.g. deleting a follower index while it's still receiving requests from the
leader, where these requests would now wait up to a minute for the index to appear). This PR
now adds b3697cc on top of that prior PR to make sure to reestablish some of the prior behavior
where the reroute phase waits for the bulk request for the index to appear. That logic was in
place to ensure that when an index was created and not all nodes had learned about it yet, that
the bulk would not fail somewhere in the reroute phase. This is now only restricted to the
situation where the current node has an older cluster state than the one that coordinated the
bulk request (which checks that the index is present). This also means that when an index is
deleted, we will no longer unnecessarily wait up to the timeout for the index o appear, and
instead fail the request.

Closes #20279
This commit is contained in:
Yannick Welsch 2019-11-29 11:01:24 +01:00
parent 4edb2e7bb6
commit c2d316a22f
27 changed files with 151 additions and 155 deletions

View File

@ -31,7 +31,6 @@ import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -58,8 +57,8 @@ public class TransportVerifyShardBeforeCloseAction extends TransportReplicationA
public TransportVerifyShardBeforeCloseAction(final Settings settings, final TransportService transportService,
final ClusterService clusterService, final IndicesService indicesService,
final ThreadPool threadPool, final ShardStateAction stateAction,
final ActionFilters actionFilters, final IndexNameExpressionResolver resolver) {
super(settings, NAME, transportService, clusterService, indicesService, threadPool, stateAction, actionFilters, resolver,
final ActionFilters actionFilters) {
super(settings, NAME, transportService, clusterService, indicesService, threadPool, stateAction, actionFilters,
ShardRequest::new, ShardRequest::new, ThreadPool.Names.MANAGEMENT);
}

View File

@ -24,7 +24,6 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -44,9 +43,9 @@ public class TransportShardFlushAction
@Inject
public TransportShardFlushAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
ActionFilters actionFilters) {
super(settings, NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
actionFilters, indexNameExpressionResolver, ShardFlushRequest::new, ShardFlushRequest::new, ThreadPool.Names.FLUSH);
actionFilters, ShardFlushRequest::new, ShardFlushRequest::new, ThreadPool.Names.FLUSH);
}
@Override

View File

@ -25,7 +25,6 @@ import org.elasticsearch.action.support.replication.BasicReplicationRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -46,9 +45,9 @@ public class TransportShardRefreshAction
@Inject
public TransportShardRefreshAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
ActionFilters actionFilters) {
super(settings, NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, BasicReplicationRequest::new, BasicReplicationRequest::new, ThreadPool.Names.REFRESH);
BasicReplicationRequest::new, BasicReplicationRequest::new, ThreadPool.Names.REFRESH);
}
@Override

View File

@ -118,6 +118,11 @@ public class BulkShardRequest extends ReplicatedWriteRequest<BulkShardRequest> {
return stringBuilder.toString();
}
@Override
protected BulkShardRequest routedBasedOnClusterVersion(long routedBasedOnClusterVersion) {
return super.routedBasedOnClusterVersion(routedBasedOnClusterVersion);
}
@Override
public void onRetry() {
for (BulkItemRequest item : items) {

View File

@ -490,6 +490,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
requests.toArray(new BulkItemRequest[requests.size()]));
bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
bulkShardRequest.timeout(bulkRequest.timeout());
bulkShardRequest.routedBasedOnClusterVersion(clusterState.version());
if (task != null) {
bulkShardRequest.setParentTask(nodeId, task.getId());
}

View File

@ -43,7 +43,6 @@ import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
@ -91,10 +90,9 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
@Inject
public TransportShardBulkAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE, false);
BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE, false);
this.updateHelper = updateHelper;
this.mappingUpdatedAction = mappingUpdatedAction;
}
@ -109,11 +107,6 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
return new BulkShardResponse(in);
}
@Override
protected boolean resolveIndex() {
return false;
}
@Override
protected void shardOperationOnPrimary(BulkShardRequest request, IndexShard primary,
ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener) {

View File

@ -27,7 +27,6 @@ import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
@ -55,10 +54,9 @@ public class TransportResyncReplicationAction extends TransportWriteAction<Resyn
@Inject
public TransportResyncReplicationAction(Settings settings, TransportService transportService,
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
ShardStateAction shardStateAction, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
ShardStateAction shardStateAction, ActionFilters actionFilters) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE,
ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE,
true /* we should never reject resync because of thread pool capacity on primary */);
}

View File

@ -169,7 +169,7 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
* Used to prevent redirect loops, see also {@link TransportReplicationAction.ReroutePhase#doRun()}
*/
@SuppressWarnings("unchecked")
Request routedBasedOnClusterVersion(long routedBasedOnClusterVersion) {
protected Request routedBasedOnClusterVersion(long routedBasedOnClusterVersion) {
this.routedBasedOnClusterVersion = routedBasedOnClusterVersion;
return (Request) this;
}

View File

@ -41,10 +41,8 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
@ -105,7 +103,6 @@ public abstract class TransportReplicationAction<
protected final ClusterService clusterService;
protected final ShardStateAction shardStateAction;
protected final IndicesService indicesService;
protected final IndexNameExpressionResolver indexNameExpressionResolver;
protected final TransportRequestOptions transportOptions;
protected final String executor;
@ -118,19 +115,17 @@ public abstract class TransportReplicationAction<
protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader<Request> requestReader,
ActionFilters actionFilters, Writeable.Reader<Request> requestReader,
Writeable.Reader<ReplicaRequest> replicaRequestReader, String executor) {
this(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, requestReader, replicaRequestReader, executor, false, false);
requestReader, replicaRequestReader, executor, false, false);
}
protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader<Request> requestReader,
ActionFilters actionFilters, Writeable.Reader<Request> requestReader,
Writeable.Reader<ReplicaRequest> replicaRequestReader, String executor,
boolean syncGlobalCheckpointAfterOperation, boolean forceExecutionOnPrimary) {
super(actionName, actionFilters, transportService.getTaskManager());
@ -139,7 +134,6 @@ public abstract class TransportReplicationAction<
this.clusterService = clusterService;
this.indicesService = indicesService;
this.shardStateAction = shardStateAction;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.executor = executor;
this.transportPrimaryAction = actionName + "[p]";
@ -220,21 +214,10 @@ public abstract class TransportReplicationAction<
return null;
}
/**
* True if provided index should be resolved when resolving request
*/
protected boolean resolveIndex() {
return true;
}
protected TransportRequestOptions transportOptions(Settings settings) {
return TransportRequestOptions.EMPTY;
}
private String concreteIndex(final ClusterState state, final ReplicationRequest request) {
return resolveIndex() ? indexNameExpressionResolver.concreteSingleIndex(state, request).getName() : request.index();
}
private ClusterBlockException blockExceptions(final ClusterState state, final String indexName) {
ClusterBlockLevel globalBlockLevel = globalBlockLevel();
if (globalBlockLevel != null) {
@ -649,8 +632,7 @@ public abstract class TransportReplicationAction<
protected void doRun() {
setPhase(task, "routing");
final ClusterState state = observer.setAndGetObservedState();
final String concreteIndex = concreteIndex(state, request);
final ClusterBlockException blockException = blockExceptions(state, concreteIndex);
final ClusterBlockException blockException = blockExceptions(state, request.shardId().getIndexName());
if (blockException != null) {
if (blockException.retryable()) {
logger.trace("cluster is blocked, scheduling a retry", blockException);
@ -659,23 +641,47 @@ public abstract class TransportReplicationAction<
finishAsFailed(blockException);
}
} else {
// request does not have a shardId yet, we need to pass the concrete index to resolve shardId
final IndexMetaData indexMetaData = state.metaData().index(concreteIndex);
final IndexMetaData indexMetaData = state.metaData().index(request.shardId().getIndex());
if (indexMetaData == null) {
retry(new IndexNotFoundException(concreteIndex));
return;
}
if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
throw new IndexClosedException(indexMetaData.getIndex());
// ensure that the cluster state on the node is at least as high as the node that decided that the index was there
if (state.version() < request.routedBasedOnClusterVersion()) {
logger.trace("failed to find index [{}] for request [{}] despite sender thinking it would be here. " +
"Local cluster state version [{}]] is older than on sending node (version [{}]), scheduling a retry...",
request.shardId().getIndex(), request, state.version(), request.routedBasedOnClusterVersion());
retry(new IndexNotFoundException("failed to find index as current cluster state with version [" + state.version() +
"] is stale (expected at least [" + request.routedBasedOnClusterVersion() + "]",
request.shardId().getIndexName()));
return;
} else {
finishAsFailed(new IndexNotFoundException(request.shardId().getIndex()));
return;
}
}
// resolve all derived request fields, so we can route and apply it
resolveRequest(indexMetaData, request);
if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
finishAsFailed(new IndexClosedException(indexMetaData.getIndex()));
return;
}
if (request.waitForActiveShards() == ActiveShardCount.DEFAULT) {
// if the wait for active shard count has not been set in the request,
// resolve it from the index settings
request.waitForActiveShards(indexMetaData.getWaitForActiveShards());
}
assert request.waitForActiveShards() != ActiveShardCount.DEFAULT :
"request waitForActiveShards must be set in resolveRequest";
final ShardRouting primary = primary(state);
if (retryIfUnavailable(state, primary)) {
final ShardRouting primary = state.getRoutingTable().shardRoutingTable(request.shardId()).primaryShard();
if (primary == null || primary.active() == false) {
logger.trace("primary shard [{}] is not yet active, scheduling a retry: action [{}], request [{}], "
+ "cluster state version [{}]", request.shardId(), actionName, request, state.version());
retryBecauseUnavailable(request.shardId(), "primary shard is not active");
return;
}
if (state.nodes().nodeExists(primary.currentNodeId()) == false) {
logger.trace("primary shard [{}] is assigned to an unknown node [{}], scheduling a retry: action [{}], request [{}], "
+ "cluster state version [{}]", request.shardId(), primary.currentNodeId(), actionName, request, state.version());
retryBecauseUnavailable(request.shardId(), "primary shard isn't assigned to a known node.");
return;
}
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
@ -719,27 +725,6 @@ public abstract class TransportReplicationAction<
performAction(node, actionName, false, request);
}
private boolean retryIfUnavailable(ClusterState state, ShardRouting primary) {
if (primary == null || primary.active() == false) {
logger.trace("primary shard [{}] is not yet active, scheduling a retry: action [{}], request [{}], "
+ "cluster state version [{}]", request.shardId(), actionName, request, state.version());
retryBecauseUnavailable(request.shardId(), "primary shard is not active");
return true;
}
if (state.nodes().nodeExists(primary.currentNodeId()) == false) {
logger.trace("primary shard [{}] is assigned to an unknown node [{}], scheduling a retry: action [{}], request [{}], "
+ "cluster state version [{}]", request.shardId(), primary.currentNodeId(), actionName, request, state.version());
retryBecauseUnavailable(request.shardId(), "primary shard isn't assigned to a known node.");
return true;
}
return false;
}
private ShardRouting primary(ClusterState state) {
IndexShardRoutingTable indexShard = state.getRoutingTable().shardRoutingTable(request.shardId());
return indexShard.primaryShard();
}
private void performAction(final DiscoveryNode node, final String action, final boolean isPrimaryAction,
final TransportRequest requestToPerform) {
transportService.sendRequest(node, action, requestToPerform, transportOptions, new TransportResponseHandler<Response>() {

View File

@ -28,7 +28,6 @@ import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
@ -61,11 +60,10 @@ public abstract class TransportWriteAction<
protected TransportWriteAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
ShardStateAction shardStateAction, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader<Request> request,
ShardStateAction shardStateAction, ActionFilters actionFilters, Writeable.Reader<Request> request,
Writeable.Reader<ReplicaRequest> replicaRequest, String executor, boolean forceExecutionOnPrimary) {
super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, request, replicaRequest, executor, true, forceExecutionOnPrimary);
request, replicaRequest, executor, true, forceExecutionOnPrimary);
}
/** Syncs operation result to the translog or throws a shard not available failure */

View File

@ -28,7 +28,6 @@ import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -64,8 +63,7 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction<
final IndicesService indicesService,
final ThreadPool threadPool,
final ShardStateAction shardStateAction,
final ActionFilters actionFilters,
final IndexNameExpressionResolver indexNameExpressionResolver) {
final ActionFilters actionFilters) {
super(
settings,
ACTION_NAME,
@ -75,7 +73,6 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction<
threadPool,
shardStateAction,
actionFilters,
indexNameExpressionResolver,
Request::new,
Request::new,
ThreadPool.Names.MANAGEMENT);

View File

@ -31,7 +31,6 @@ import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -77,8 +76,7 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi
final IndicesService indicesService,
final ThreadPool threadPool,
final ShardStateAction shardStateAction,
final ActionFilters actionFilters,
final IndexNameExpressionResolver indexNameExpressionResolver) {
final ActionFilters actionFilters) {
super(
settings,
ACTION_NAME,
@ -88,7 +86,6 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi
threadPool,
shardStateAction,
actionFilters,
indexNameExpressionResolver,
Request::new,
Request::new,
ThreadPool.Names.MANAGEMENT);

View File

@ -33,7 +33,6 @@ import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -74,8 +73,7 @@ public class RetentionLeaseSyncAction extends
final IndicesService indicesService,
final ThreadPool threadPool,
final ShardStateAction shardStateAction,
final ActionFilters actionFilters,
final IndexNameExpressionResolver indexNameExpressionResolver) {
final ActionFilters actionFilters) {
super(
settings,
ACTION_NAME,
@ -85,7 +83,6 @@ public class RetentionLeaseSyncAction extends
threadPool,
shardStateAction,
actionFilters,
indexNameExpressionResolver,
RetentionLeaseSyncAction.Request::new,
RetentionLeaseSyncAction.Request::new,
ThreadPool.Names.MANAGEMENT, false);

View File

@ -33,7 +33,6 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -119,7 +118,7 @@ public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase {
ShardStateAction shardStateAction = new ShardStateAction(clusterService, transportService, null, null, threadPool);
action = new TransportVerifyShardBeforeCloseAction(Settings.EMPTY, transportService, clusterService, mock(IndicesService.class),
mock(ThreadPool.class), shardStateAction, mock(ActionFilters.class), mock(IndexNameExpressionResolver.class));
mock(ThreadPool.class), shardStateAction, mock(ActionFilters.class));
}
@Override

View File

@ -20,13 +20,16 @@
package org.elasticsearch.action.bulk;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -43,12 +46,19 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.action.DocWriteResponse.Result.CREATED;
import static org.elasticsearch.action.DocWriteResponse.Result.UPDATED;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.StreamsUtils.copyToStringFromClasspath;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.isOneOf;
public class BulkIntegrationIT extends ESIntegTestCase {
@Override
@ -157,4 +167,38 @@ public class BulkIntegrationIT extends ESIntegTestCase {
assertTrue(acknowledgedResponse.isAcknowledged());
}
/** This test ensures that index deletion makes indexing fail quickly, not wait on the index that has disappeared */
public void testDeleteIndexWhileIndexing() throws Exception {
String index = "deleted_while_indexing";
createIndex(index);
AtomicBoolean stopped = new AtomicBoolean();
Thread[] threads = new Thread[between(1, 4)];
AtomicInteger docID = new AtomicInteger();
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
while (stopped.get() == false && docID.get() < 5000) {
String id = Integer.toString(docID.incrementAndGet());
try {
IndexResponse response = client().prepareIndex(index, "_doc").setId(id)
.setSource(Collections.singletonMap("f" + randomIntBetween(1, 10), randomNonNegativeLong()),
XContentType.JSON).get();
assertThat(response.getResult(), isOneOf(CREATED, UPDATED));
logger.info("--> index id={} seq_no={}", response.getId(), response.getSeqNo());
} catch (ElasticsearchException ignore) {
logger.info("--> fail to index id={}", id);
}
}
});
threads[i].start();
}
ensureGreen(index);
assertBusy(() -> assertThat(docID.get(), greaterThanOrEqualTo(1)));
assertAcked(client().admin().indices().prepareDelete(index));
stopped.set(true);
for (Thread thread : threads) {
thread.join(ReplicationRequest.DEFAULT_TIMEOUT.millis() / 2);
assertFalse(thread.isAlive());
}
}
}

View File

@ -27,7 +27,6 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.coordination.NoMasterBlockService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
@ -143,9 +142,8 @@ public class TransportResyncReplicationActionTests extends ESTestCase {
final IndicesService indexServices = mock(IndicesService.class);
when(indexServices.indexServiceSafe(eq(index))).thenReturn(indexService);
final IndexNameExpressionResolver resolver = new IndexNameExpressionResolver();
final TransportResyncReplicationAction action = new TransportResyncReplicationAction(Settings.EMPTY, transportService,
clusterService, indexServices, threadPool, shardStateAction, new ActionFilters(new HashSet<>()), resolver);
clusterService, indexServices, threadPool, shardStateAction, new ActionFilters(new HashSet<>()));
assertThat(action.globalBlockLevel(), nullValue());
assertThat(action.indexBlockLevel(), nullValue());

View File

@ -26,7 +26,6 @@ import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@ -108,9 +107,9 @@ public class TransportReplicationActionRetryOnClosedNodeIT extends ESIntegTestCa
@Inject
public TestAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
ActionFilters actionFilters) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, Request::new, Request::new, ThreadPool.Names.GENERIC);
Request::new, Request::new, ThreadPool.Names.GENERIC);
}
@Override

View File

@ -40,7 +40,6 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.AllocationId;
@ -235,10 +234,12 @@ public class TransportReplicationActionTests extends ESTestCase {
setState(clusterService, ClusterStateCreationUtils.stateWithActivePrimary("index", true, 0));
ShardId shardId = new ShardId(clusterService.state().metaData().index("index").getIndex(), 0);
{
setStateWithBlock(clusterService, nonRetryableBlock, globalBlock);
Request request = globalBlock ? new Request(NO_SHARD_ID) : new Request(NO_SHARD_ID).index("index");
Request request = new Request(shardId);
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask();
@ -253,7 +254,7 @@ public class TransportReplicationActionTests extends ESTestCase {
{
setStateWithBlock(clusterService, retryableBlock, globalBlock);
Request requestWithTimeout = (globalBlock ? new Request(NO_SHARD_ID) : new Request(NO_SHARD_ID).index("index")).timeout("5ms");
Request requestWithTimeout = (globalBlock ? new Request(shardId) : new Request(shardId)).timeout("5ms");
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask();
@ -269,7 +270,7 @@ public class TransportReplicationActionTests extends ESTestCase {
{
setStateWithBlock(clusterService, retryableBlock, globalBlock);
Request request = globalBlock ? new Request(NO_SHARD_ID) : new Request(NO_SHARD_ID).index("index");
Request request = new Request(shardId);
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask();
@ -487,6 +488,7 @@ public class TransportReplicationActionTests extends ESTestCase {
setState(clusterService, state(index, true,
randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED));
logger.debug("--> using initial state:\n{}", clusterService.state());
Request request = new Request(new ShardId("unknown_index", "_na_", 0)).timeout("1ms");
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask();
@ -495,7 +497,21 @@ public class TransportReplicationActionTests extends ESTestCase {
reroutePhase.run();
assertListenerThrows("must throw index not found exception", listener, IndexNotFoundException.class);
assertPhase(task, "failed");
assertFalse(request.isRetrySet.get());
// try again with a request that is based on a newer cluster state, make sure we waited until that
// cluster state for the index to appear
request = new Request(new ShardId("unknown_index", "_na_", 0)).timeout("1ms");
request.routedBasedOnClusterVersion(clusterService.state().version() + 1);
listener = new PlainActionFuture<>();
task = maybeTask();
reroutePhase = action.new ReroutePhase(task, request, listener);
reroutePhase.run();
assertListenerThrows("must throw index not found exception", listener, IndexNotFoundException.class);
assertPhase(task, "failed");
assertTrue(request.isRetrySet.get());
request = new Request(new ShardId(index, "_na_", 10)).timeout("1ms");
listener = new PlainActionFuture<>();
reroutePhase = action.new ReroutePhase(null, request, listener);
@ -513,7 +529,7 @@ public class TransportReplicationActionTests extends ESTestCase {
new CloseIndexRequest(index)));
assertThat(clusterService.state().metaData().indices().get(index).getState(), equalTo(IndexMetaData.State.CLOSE));
logger.debug("--> using initial state:\n{}", clusterService.state());
Request request = new Request(new ShardId("test", "_na_", 0)).timeout("1ms");
Request request = new Request(new ShardId(clusterService.state().metaData().indices().get(index).getIndex(), 0)).timeout("1ms");
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask();
@ -1245,7 +1261,7 @@ public class TransportReplicationActionTests extends ESTestCase {
ThreadPool threadPool, IndicesService indicesService) {
super(settings, actionName, transportService, clusterService, indicesService, threadPool,
shardStateAction,
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(),
new ActionFilters(new HashSet<>()),
Request::new, Request::new, ThreadPool.Names.SAME);
}
@ -1267,11 +1283,6 @@ public class TransportReplicationActionTests extends ESTestCase {
request.processedOnReplicas.incrementAndGet();
return new ReplicaResult();
}
@Override
protected boolean resolveIndex() {
return false;
}
}
private IndicesService mockIndicesService(ClusterService clusterService) {

View File

@ -30,7 +30,6 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
@ -418,7 +417,7 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe
SetOnce<Boolean> executedOnPrimary) {
super(settings, actionName, transportService, clusterService, mockIndicesService(shardId, executedOnPrimary, primary, replica),
threadPool, shardStateAction,
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(), Request::new, Request::new, ThreadPool.Names.SAME);
new ActionFilters(new HashSet<>()), Request::new, Request::new, ThreadPool.Names.SAME);
this.shardId = Objects.requireNonNull(shardId);
this.primary = Objects.requireNonNull(primary);
assertEquals(shardId, primary.shardId());

View File

@ -31,7 +31,6 @@ import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -399,8 +398,7 @@ public class TransportWriteActionTests extends ESTestCase {
super(Settings.EMPTY, "internal:test",
new TransportService(Settings.EMPTY, mock(Transport.class), null, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
x -> null, null, Collections.emptySet()), null, null, null, null,
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(), TestRequest::new,
TestRequest::new, ThreadPool.Names.SAME, false);
new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false);
this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary;
this.withDocumentFailureOnReplica = withDocumentFailureOnReplica;
}
@ -409,8 +407,7 @@ public class TransportWriteActionTests extends ESTestCase {
ClusterService clusterService, ShardStateAction shardStateAction, ThreadPool threadPool) {
super(settings, actionName, transportService, clusterService,
mockIndicesService(clusterService), threadPool, shardStateAction,
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(),
TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false);
new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false);
this.withDocumentFailureOnPrimary = false;
this.withDocumentFailureOnReplica = false;
}

View File

@ -20,7 +20,6 @@ package org.elasticsearch.index.seqno;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.internal.io.IOUtils;
@ -110,8 +109,7 @@ public class GlobalCheckpointSyncActionTests extends ESTestCase {
indicesService,
threadPool,
shardStateAction,
new ActionFilters(Collections.emptySet()),
new IndexNameExpressionResolver());
new ActionFilters(Collections.emptySet()));
final GlobalCheckpointSyncAction.Request primaryRequest = new GlobalCheckpointSyncAction.Request(indexShard.shardId());
if (randomBoolean()) {
action.shardOperationOnPrimary(primaryRequest, indexShard, ActionTestUtils.assertNoFailureListener(r -> {}));

View File

@ -27,7 +27,6 @@ import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
@ -121,8 +120,7 @@ public class RetentionLeaseBackgroundSyncActionTests extends ESTestCase {
indicesService,
threadPool,
shardStateAction,
new ActionFilters(Collections.emptySet()),
new IndexNameExpressionResolver());
new ActionFilters(Collections.emptySet()));
final RetentionLeases retentionLeases = mock(RetentionLeases.class);
final RetentionLeaseBackgroundSyncAction.Request request =
new RetentionLeaseBackgroundSyncAction.Request(indexShard.shardId(), retentionLeases);
@ -159,8 +157,7 @@ public class RetentionLeaseBackgroundSyncActionTests extends ESTestCase {
indicesService,
threadPool,
shardStateAction,
new ActionFilters(Collections.emptySet()),
new IndexNameExpressionResolver());
new ActionFilters(Collections.emptySet()));
final RetentionLeases retentionLeases = mock(RetentionLeases.class);
final RetentionLeaseBackgroundSyncAction.Request request =
new RetentionLeaseBackgroundSyncAction.Request(indexShard.shardId(), retentionLeases);
@ -201,8 +198,7 @@ public class RetentionLeaseBackgroundSyncActionTests extends ESTestCase {
indicesService,
threadPool,
shardStateAction,
new ActionFilters(Collections.emptySet()),
new IndexNameExpressionResolver()) {
new ActionFilters(Collections.emptySet())) {
@Override
protected void doExecute(Task task, Request request, ActionListener<ReplicationResponse> listener) {
@ -265,8 +261,7 @@ public class RetentionLeaseBackgroundSyncActionTests extends ESTestCase {
indicesService,
threadPool,
shardStateAction,
new ActionFilters(Collections.emptySet()),
new IndexNameExpressionResolver());
new ActionFilters(Collections.emptySet()));
assertNull(action.indexBlockLevel());
}

View File

@ -25,7 +25,6 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.internal.io.IOUtils;
@ -112,8 +111,7 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
indicesService,
threadPool,
shardStateAction,
new ActionFilters(Collections.emptySet()),
new IndexNameExpressionResolver());
new ActionFilters(Collections.emptySet()));
final RetentionLeases retentionLeases = mock(RetentionLeases.class);
final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases);
action.shardOperationOnPrimary(request, indexShard,
@ -149,8 +147,7 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
indicesService,
threadPool,
shardStateAction,
new ActionFilters(Collections.emptySet()),
new IndexNameExpressionResolver());
new ActionFilters(Collections.emptySet()));
final RetentionLeases retentionLeases = mock(RetentionLeases.class);
final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases);
@ -191,8 +188,7 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
indicesService,
threadPool,
shardStateAction,
new ActionFilters(Collections.emptySet()),
new IndexNameExpressionResolver()) {
new ActionFilters(Collections.emptySet())) {
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
@ -251,8 +247,7 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
indicesService,
threadPool,
shardStateAction,
new ActionFilters(Collections.emptySet()),
new IndexNameExpressionResolver());
new ActionFilters(Collections.emptySet()));
assertNull(action.indexBlockLevel());
}

View File

@ -191,7 +191,7 @@ public class ClusterStateChanges {
};
TransportVerifyShardBeforeCloseAction transportVerifyShardBeforeCloseAction = new TransportVerifyShardBeforeCloseAction(SETTINGS,
transportService, clusterService, indicesService, threadPool, null, actionFilters, indexNameExpressionResolver);
transportService, clusterService, indicesService, threadPool, null, actionFilters);
MetaDataIndexStateService indexStateService = new MetaDataIndexStateService(clusterService, allocationService,
metaDataIndexUpgradeService, indicesService, threadPool, transportVerifyShardBeforeCloseAction);
MetaDataDeleteIndexService deleteIndexService = new MetaDataDeleteIndexService(SETTINGS, clusterService, allocationService);

View File

@ -1128,8 +1128,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
indicesService,
threadPool,
shardStateAction,
actionFilters,
indexNameExpressionResolver)),
actionFilters)),
new GlobalCheckpointSyncAction(
settings,
transportService,
@ -1137,8 +1136,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
indicesService,
threadPool,
shardStateAction,
actionFilters,
indexNameExpressionResolver),
actionFilters),
new RetentionLeaseSyncAction(
settings,
transportService,
@ -1146,8 +1144,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
indicesService,
threadPool,
shardStateAction,
actionFilters,
indexNameExpressionResolver),
actionFilters),
new RetentionLeaseBackgroundSyncAction(
settings,
transportService,
@ -1155,8 +1152,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
indicesService,
threadPool,
shardStateAction,
actionFilters,
indexNameExpressionResolver));
actionFilters));
Map<ActionType, TransportAction> actions = new HashMap<>();
final MetaDataCreateIndexService metaDataCreateIndexService = new MetaDataCreateIndexService(settings, clusterService,
indicesService,
@ -1172,7 +1168,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
mappingUpdatedAction.setClient(client);
final TransportShardBulkAction transportShardBulkAction = new TransportShardBulkAction(settings, transportService,
clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction, new UpdateHelper(scriptService),
actionFilters, indexNameExpressionResolver);
actionFilters);
actions.put(BulkAction.INSTANCE,
new TransportBulkAction(threadPool, transportService, clusterService,
new IngestService(

View File

@ -12,7 +12,6 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
@ -44,8 +43,7 @@ public class TransportBulkShardOperationsAction
final IndicesService indicesService,
final ThreadPool threadPool,
final ShardStateAction shardStateAction,
final ActionFilters actionFilters,
final IndexNameExpressionResolver indexNameExpressionResolver) {
final ActionFilters actionFilters) {
super(
settings,
BulkShardOperationsAction.NAME,
@ -55,7 +53,6 @@ public class TransportBulkShardOperationsAction
threadPool,
shardStateAction,
actionFilters,
indexNameExpressionResolver,
BulkShardOperationsRequest::new,
BulkShardOperationsRequest::new,
ThreadPool.Names.WRITE, false);

View File

@ -411,7 +411,7 @@ public abstract class CcrIntegTestCase extends ESTestCase {
numNodeTasks++;
}
}
assertThat(numNodeTasks, equalTo(0));
assertThat(listTasksResponse.getTasks().toString(), numNodeTasks, equalTo(0));
}, 30, TimeUnit.SECONDS);
}