[Close Index API] Propagate tasks ids between Freeze, Close and Verify Shard actions (#36630)

This pull request changes the Freeze Index and Close Index actions so 
that these actions always requires a Task. The task's id is then propagated 
from the Freeze action to the Close action, and then to the Verify shard action. 
This way it is possible to track which Freeze task initiates the closing of an index, 
and which consecutive verifiy shard are executed for the index closing.
This commit is contained in:
Tanguy Leroux 2019-01-07 09:43:50 +01:00 committed by GitHub
parent bd2af2c400
commit 19593884ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 42 additions and 22 deletions

View File

@ -25,7 +25,13 @@ import org.elasticsearch.cluster.ack.IndicesClusterStateUpdateRequest;
*/
public class CloseIndexClusterStateUpdateRequest extends IndicesClusterStateUpdateRequest<CloseIndexClusterStateUpdateRequest> {
public CloseIndexClusterStateUpdateRequest() {
private final long taskId;
public CloseIndexClusterStateUpdateRequest(final long taskId) {
this.taskId = taskId;
}
public long taskId() {
return taskId;
}
}

View File

@ -99,13 +99,19 @@ public class TransportCloseIndexAction extends TransportMasterNodeAction<CloseIn
@Override
protected void masterOperation(final CloseIndexRequest request, final ClusterState state,
final ActionListener<AcknowledgedResponse> listener) {
throw new UnsupportedOperationException("The task parameter is required");
}
@Override
protected void masterOperation(final Task task, final CloseIndexRequest request, final ClusterState state,
final ActionListener<AcknowledgedResponse> listener) throws Exception {
final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request);
if (concreteIndices == null || concreteIndices.length == 0) {
listener.onResponse(new AcknowledgedResponse(true));
return;
}
final CloseIndexClusterStateUpdateRequest closeRequest = new CloseIndexClusterStateUpdateRequest()
final CloseIndexClusterStateUpdateRequest closeRequest = new CloseIndexClusterStateUpdateRequest(task.getId())
.ackTimeout(request.timeout())
.masterNodeTimeout(request.masterNodeTimeout())
.indices(concreteIndices);

View File

@ -37,6 +37,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -141,8 +142,9 @@ public class TransportVerifyShardBeforeCloseAction extends TransportReplicationA
ShardRequest(){
}
public ShardRequest(final ShardId shardId) {
public ShardRequest(final ShardId shardId, final TaskId parentTaskId) {
super(shardId);
setParentTask(parentTaskId);
}
@Override

View File

@ -46,7 +46,6 @@ import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.ValidationException;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
@ -63,6 +62,7 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
@ -120,9 +120,6 @@ public class MetaDataIndexStateService {
throw new IllegalArgumentException("Index name is required");
}
final TimeValue timeout = request.ackTimeout();
final TimeValue masterTimeout = request.masterNodeTimeout();
clusterService.submitStateUpdateTask("add-block-index-to-close " + Arrays.toString(concreteIndices),
new ClusterStateUpdateTask(Priority.URGENT) {
@ -141,7 +138,7 @@ public class MetaDataIndexStateService {
} else {
assert blockedIndices.isEmpty() == false : "List of blocked indices is empty but cluster state was changed";
threadPool.executor(ThreadPool.Names.MANAGEMENT)
.execute(new WaitForClosedBlocksApplied(blockedIndices, timeout,
.execute(new WaitForClosedBlocksApplied(blockedIndices, request,
ActionListener.wrap(closedBlocksResults ->
clusterService.submitStateUpdateTask("close-indices", new ClusterStateUpdateTask(Priority.URGENT) {
@Override
@ -176,7 +173,7 @@ public class MetaDataIndexStateService {
@Override
public TimeValue timeout() {
return masterTimeout;
return request.masterNodeTimeout();
}
}
);
@ -246,18 +243,18 @@ public class MetaDataIndexStateService {
class WaitForClosedBlocksApplied extends AbstractRunnable {
private final Set<Index> blockedIndices;
private final @Nullable TimeValue timeout;
private final CloseIndexClusterStateUpdateRequest request;
private final ActionListener<Map<Index, AcknowledgedResponse>> listener;
private WaitForClosedBlocksApplied(final Set<Index> blockedIndices,
final @Nullable TimeValue timeout,
final CloseIndexClusterStateUpdateRequest request,
final ActionListener<Map<Index, AcknowledgedResponse>> listener) {
if (blockedIndices == null || blockedIndices.isEmpty()) {
throw new IllegalArgumentException("Cannot wait for closed block to be applied to null or empty list of blocked indices");
}
this.blockedIndices = blockedIndices;
this.request = request;
this.listener = listener;
this.timeout = timeout;
}
@Override
@ -271,7 +268,7 @@ public class MetaDataIndexStateService {
final CountDown countDown = new CountDown(blockedIndices.size());
final ClusterState state = clusterService.state();
for (Index blockedIndex : blockedIndices) {
waitForShardsReadyForClosing(blockedIndex, state, timeout, response -> {
waitForShardsReadyForClosing(blockedIndex, state, response -> {
results.put(blockedIndex, response);
if (countDown.countDown()) {
listener.onResponse(unmodifiableMap(results));
@ -280,7 +277,7 @@ public class MetaDataIndexStateService {
}
}
private void waitForShardsReadyForClosing(final Index index, final ClusterState state, @Nullable final TimeValue timeout,
private void waitForShardsReadyForClosing(final Index index, final ClusterState state,
final Consumer<AcknowledgedResponse> onResponse) {
final IndexMetaData indexMetaData = state.metaData().index(index);
if (indexMetaData == null) {
@ -302,7 +299,7 @@ public class MetaDataIndexStateService {
for (IntObjectCursor<IndexShardRoutingTable> shard : shards) {
final IndexShardRoutingTable shardRoutingTable = shard.value;
final ShardId shardId = shardRoutingTable.shardId();
sendVerifyShardBeforeCloseRequest(shardRoutingTable, timeout, new NotifyOnceListener<ReplicationResponse>() {
sendVerifyShardBeforeCloseRequest(shardRoutingTable, new NotifyOnceListener<ReplicationResponse>() {
@Override
public void innerOnResponse(final ReplicationResponse replicationResponse) {
ReplicationResponse.ShardInfo shardInfo = replicationResponse.getShardInfo();
@ -326,7 +323,7 @@ public class MetaDataIndexStateService {
}
}
private void sendVerifyShardBeforeCloseRequest(final IndexShardRoutingTable shardRoutingTable, @Nullable final TimeValue timeout,
private void sendVerifyShardBeforeCloseRequest(final IndexShardRoutingTable shardRoutingTable,
final ActionListener<ReplicationResponse> listener) {
final ShardId shardId = shardRoutingTable.shardId();
if (shardRoutingTable.primaryShard().unassigned()) {
@ -336,10 +333,11 @@ public class MetaDataIndexStateService {
listener.onResponse(response);
return;
}
final TaskId parentTaskId = new TaskId(clusterService.localNode().getId(), request.taskId());
final TransportVerifyShardBeforeCloseAction.ShardRequest shardRequest =
new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId);
if (timeout != null) {
shardRequest.timeout(timeout);
new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, parentTaskId);
if (request.ackTimeout() != null) {
shardRequest.timeout(request.ackTimeout());
}
// TODO propagate a task id from the parent CloseIndexRequest to the ShardCloseRequests
transportVerifyShardBeforeCloseAction.execute(shardRequest, listener);

View File

@ -43,6 +43,7 @@ import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.TestThreadPool;
@ -130,7 +131,7 @@ public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase {
private void executeOnPrimaryOrReplica() throws Exception {
final TransportVerifyShardBeforeCloseAction.ShardRequest request =
new TransportVerifyShardBeforeCloseAction.ShardRequest(indexShard.shardId());
new TransportVerifyShardBeforeCloseAction.ShardRequest(indexShard.shardId(), new TaskId("_node_id", randomNonNegativeLong()));
if (randomBoolean()) {
assertNotNull(action.shardOperationOnPrimary(request, indexShard));
} else {
@ -204,7 +205,8 @@ public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase {
assertThat(replicationGroup.getUnavailableInSyncShards().size(), greaterThan(0));
final PlainActionFuture<PrimaryResult> listener = new PlainActionFuture<>();
TransportVerifyShardBeforeCloseAction.ShardRequest request = new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId);
TransportVerifyShardBeforeCloseAction.ShardRequest request =
new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, new TaskId(clusterService.localNode().getId(), 0L));
ReplicationOperation.Replicas<TransportVerifyShardBeforeCloseAction.ShardRequest> proxy = action.newReplicasProxy(primaryTerm);
ReplicationOperation<TransportVerifyShardBeforeCloseAction.ShardRequest,
TransportVerifyShardBeforeCloseAction.ShardRequest, PrimaryResult> operation =

View File

@ -109,13 +109,19 @@ public final class TransportFreezeIndexAction extends
@Override
protected void masterOperation(FreezeRequest request, ClusterState state, ActionListener<FreezeResponse> listener) {
throw new UnsupportedOperationException("The task parameter is required");
}
@Override
protected void masterOperation(Task task, TransportFreezeIndexAction.FreezeRequest request, ClusterState state,
ActionListener<TransportFreezeIndexAction.FreezeResponse> listener) throws Exception {
final Index[] concreteIndices = resolveIndices(request, state);
if (concreteIndices.length == 0) {
listener.onResponse(new FreezeResponse(true, true));
return;
}
final CloseIndexClusterStateUpdateRequest closeRequest = new CloseIndexClusterStateUpdateRequest()
final CloseIndexClusterStateUpdateRequest closeRequest = new CloseIndexClusterStateUpdateRequest(task.getId())
.ackTimeout(request.timeout())
.masterNodeTimeout(request.masterNodeTimeout())
.indices(concreteIndices);