[Close Index API] Add TransportShardCloseAction for pre-closing verifications (#36249)
This pull request adds the TransportShardCloseAction which is a transport replication action that acquires all index shard permits for its execution. This action will be used in the future by the MetaDataIndexStateService in a new index closing process, where we need to execute some sanity checks before closing an index. The action executes the following verifications on the primary and replicas: * there is no other on going operation active on the shard * the data node holding the shard knows that the index is blocked for writes * the shard's max sequence number is equal to the global checkpoint When the verifications are done and successful, the shard is flushed. Relates #33888
This commit is contained in:
parent
97259f0efc
commit
3ca885e9ac
|
@ -0,0 +1,126 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.action.admin.indices.close;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.replication.ReplicationRequest;
|
||||
import org.elasticsearch.action.support.replication.ReplicationResponse;
|
||||
import org.elasticsearch.action.support.replication.TransportReplicationAction;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.block.ClusterBlock;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
public class TransportVerifyShardBeforeCloseAction extends TransportReplicationAction<
|
||||
TransportVerifyShardBeforeCloseAction.ShardCloseRequest, TransportVerifyShardBeforeCloseAction.ShardCloseRequest, ReplicationResponse> {
|
||||
|
||||
public static final String NAME = CloseIndexAction.NAME + "[s]";
|
||||
private static final ClusterBlock EXPECTED_BLOCK = MetaDataIndexStateService.INDEX_CLOSED_BLOCK;
|
||||
|
||||
@Inject
|
||||
public TransportVerifyShardBeforeCloseAction(final Settings settings, final TransportService transportService,
|
||||
final ClusterService clusterService, final IndicesService indicesService,
|
||||
final ThreadPool threadPool, final ShardStateAction stateAction,
|
||||
final ActionFilters actionFilters, final IndexNameExpressionResolver resolver) {
|
||||
super(settings, NAME, transportService, clusterService, indicesService, threadPool, stateAction, actionFilters, resolver,
|
||||
ShardCloseRequest::new, ShardCloseRequest::new, ThreadPool.Names.MANAGEMENT);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ReplicationResponse newResponseInstance() {
|
||||
return new ReplicationResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void acquirePrimaryOperationPermit(final IndexShard primary,
|
||||
final ShardCloseRequest request,
|
||||
final ActionListener<Releasable> onAcquired) {
|
||||
primary.acquireAllPrimaryOperationsPermits(onAcquired, request.timeout());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void acquireReplicaOperationPermit(final IndexShard replica,
|
||||
final ShardCloseRequest request,
|
||||
final ActionListener<Releasable> onAcquired,
|
||||
final long primaryTerm,
|
||||
final long globalCheckpoint,
|
||||
final long maxSeqNoOfUpdateOrDeletes) {
|
||||
replica.acquireAllReplicaOperationsPermits(primaryTerm, globalCheckpoint, maxSeqNoOfUpdateOrDeletes, onAcquired, request.timeout());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PrimaryResult<ShardCloseRequest, ReplicationResponse> shardOperationOnPrimary(final ShardCloseRequest shardRequest,
|
||||
final IndexShard primary) throws Exception {
|
||||
executeShardOperation(primary);
|
||||
return new PrimaryResult<>(shardRequest, new ReplicationResponse());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ReplicaResult shardOperationOnReplica(final ShardCloseRequest shardRequest, final IndexShard replica) throws Exception {
|
||||
executeShardOperation(replica);
|
||||
return new ReplicaResult();
|
||||
}
|
||||
|
||||
private void executeShardOperation(final IndexShard indexShard) {
|
||||
final ShardId shardId = indexShard.shardId();
|
||||
if (indexShard.getActiveOperationsCount() != 0) {
|
||||
throw new IllegalStateException("On-going operations in progress while checking index shard " + shardId + " before closing");
|
||||
}
|
||||
|
||||
final ClusterBlocks clusterBlocks = clusterService.state().blocks();
|
||||
if (clusterBlocks.hasIndexBlock(shardId.getIndexName(), EXPECTED_BLOCK) == false) {
|
||||
throw new IllegalStateException("Index shard " + shardId + " must be blocked by " + EXPECTED_BLOCK + " before closing");
|
||||
}
|
||||
|
||||
final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo();
|
||||
if (indexShard.getGlobalCheckpoint() != maxSeqNo) {
|
||||
throw new IllegalStateException("Global checkpoint [" + indexShard.getGlobalCheckpoint()
|
||||
+ "] mismatches maximum sequence number [" + maxSeqNo + "] on index shard " + shardId);
|
||||
}
|
||||
indexShard.flush(new FlushRequest());
|
||||
logger.debug("{} shard is ready for closing", shardId);
|
||||
}
|
||||
|
||||
public static class ShardCloseRequest extends ReplicationRequest<ShardCloseRequest> {
|
||||
|
||||
ShardCloseRequest(){
|
||||
}
|
||||
|
||||
public ShardCloseRequest(final ShardId shardId) {
|
||||
super(shardId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "close shard {" + shardId + "}";
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,122 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.action.admin.indices.close;
|
||||
|
||||
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.seqno.SeqNoStats;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.junit.Before;
|
||||
|
||||
import static org.elasticsearch.cluster.metadata.MetaDataIndexStateService.INDEX_CLOSED_BLOCK;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase {
|
||||
|
||||
private IndexShard indexShard;
|
||||
private TransportVerifyShardBeforeCloseAction action;
|
||||
private ClusterService clusterService;
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
|
||||
indexShard = mock(IndexShard.class);
|
||||
when(indexShard.getActiveOperationsCount()).thenReturn(0);
|
||||
when(indexShard.getGlobalCheckpoint()).thenReturn(0L);
|
||||
when(indexShard.seqNoStats()).thenReturn(new SeqNoStats(0L, 0L, 0L));
|
||||
|
||||
final ShardId shardId = new ShardId("index", "_na_", randomIntBetween(0, 3));
|
||||
when(indexShard.shardId()).thenReturn(shardId);
|
||||
|
||||
clusterService = mock(ClusterService.class);
|
||||
when(clusterService.state()).thenReturn(new ClusterState.Builder(new ClusterName("test"))
|
||||
.blocks(ClusterBlocks.builder().addIndexBlock("index", INDEX_CLOSED_BLOCK).build()).build());
|
||||
|
||||
action = new TransportVerifyShardBeforeCloseAction(Settings.EMPTY, mock(TransportService.class), clusterService,
|
||||
mock(IndicesService.class), mock(ThreadPool.class), mock(ShardStateAction.class), mock(ActionFilters.class),
|
||||
mock(IndexNameExpressionResolver.class));
|
||||
}
|
||||
|
||||
private void executeOnPrimaryOrReplica() throws Exception {
|
||||
final TransportVerifyShardBeforeCloseAction.ShardCloseRequest request =
|
||||
new TransportVerifyShardBeforeCloseAction.ShardCloseRequest(indexShard.shardId());
|
||||
if (randomBoolean()) {
|
||||
assertNotNull(action.shardOperationOnPrimary(request, indexShard));
|
||||
} else {
|
||||
assertNotNull(action.shardOperationOnPrimary(request, indexShard));
|
||||
}
|
||||
}
|
||||
|
||||
public void testOperationSuccessful() throws Exception {
|
||||
executeOnPrimaryOrReplica();
|
||||
verify(indexShard, times(1)).flush(any(FlushRequest.class));
|
||||
}
|
||||
|
||||
public void testOperationFailsWithOnGoingOps() {
|
||||
when(indexShard.getActiveOperationsCount()).thenReturn(randomIntBetween(1, 10));
|
||||
|
||||
IllegalStateException exception = expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica);
|
||||
assertThat(exception.getMessage(),
|
||||
equalTo("On-going operations in progress while checking index shard " + indexShard.shardId() + " before closing"));
|
||||
verify(indexShard, times(0)).flush(any(FlushRequest.class));
|
||||
}
|
||||
|
||||
public void testOperationFailsWithNoBlock() {
|
||||
when(clusterService.state()).thenReturn(new ClusterState.Builder(new ClusterName("test")).build());
|
||||
|
||||
IllegalStateException exception = expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica);
|
||||
assertThat(exception.getMessage(),
|
||||
equalTo("Index shard " + indexShard.shardId() + " must be blocked by " + INDEX_CLOSED_BLOCK + " before closing"));
|
||||
verify(indexShard, times(0)).flush(any(FlushRequest.class));
|
||||
}
|
||||
|
||||
public void testOperationFailsWithGlobalCheckpointNotCaughtUp() {
|
||||
final long maxSeqNo = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, Long.MAX_VALUE);
|
||||
final long localCheckpoint = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, maxSeqNo);
|
||||
final long globalCheckpoint = randomValueOtherThan(maxSeqNo,
|
||||
() -> randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, localCheckpoint));
|
||||
when(indexShard.seqNoStats()).thenReturn(new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint));
|
||||
when(indexShard.getGlobalCheckpoint()).thenReturn(globalCheckpoint);
|
||||
|
||||
IllegalStateException exception = expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica);
|
||||
assertThat(exception.getMessage(), equalTo("Global checkpoint [" + globalCheckpoint + "] mismatches maximum sequence number ["
|
||||
+ maxSeqNo + "] on index shard " + indexShard.shardId()));
|
||||
verify(indexShard, times(0)).flush(any(FlushRequest.class));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue