diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java new file mode 100644 index 00000000000..ab895dd7af8 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java @@ -0,0 +1,126 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.close; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.replication.ReplicationRequest; +import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.action.support.replication.TransportReplicationAction; +import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.block.ClusterBlock; +import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaDataIndexStateService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +public class TransportVerifyShardBeforeCloseAction extends TransportReplicationAction< + TransportVerifyShardBeforeCloseAction.ShardCloseRequest, TransportVerifyShardBeforeCloseAction.ShardCloseRequest, ReplicationResponse> { + + public static final String NAME = CloseIndexAction.NAME + "[s]"; + private static final ClusterBlock EXPECTED_BLOCK = MetaDataIndexStateService.INDEX_CLOSED_BLOCK; + + @Inject + public TransportVerifyShardBeforeCloseAction(final Settings settings, final TransportService transportService, + final ClusterService clusterService, final IndicesService indicesService, + final ThreadPool threadPool, final ShardStateAction stateAction, + final ActionFilters actionFilters, final IndexNameExpressionResolver resolver) { + super(settings, NAME, transportService, clusterService, indicesService, threadPool, stateAction, actionFilters, resolver, + ShardCloseRequest::new, ShardCloseRequest::new, ThreadPool.Names.MANAGEMENT); + } + + @Override + protected ReplicationResponse newResponseInstance() { + return new ReplicationResponse(); + } + + @Override + protected void acquirePrimaryOperationPermit(final IndexShard primary, + final ShardCloseRequest request, + final ActionListener onAcquired) { + primary.acquireAllPrimaryOperationsPermits(onAcquired, request.timeout()); + } + + @Override + protected void acquireReplicaOperationPermit(final IndexShard replica, + final ShardCloseRequest request, + final ActionListener onAcquired, + final long primaryTerm, + final long globalCheckpoint, + final long maxSeqNoOfUpdateOrDeletes) { + replica.acquireAllReplicaOperationsPermits(primaryTerm, globalCheckpoint, maxSeqNoOfUpdateOrDeletes, onAcquired, request.timeout()); + } + + @Override + protected PrimaryResult shardOperationOnPrimary(final ShardCloseRequest shardRequest, + final IndexShard primary) throws Exception { + executeShardOperation(primary); + return new PrimaryResult<>(shardRequest, new ReplicationResponse()); + } + + @Override + protected ReplicaResult shardOperationOnReplica(final ShardCloseRequest shardRequest, final IndexShard replica) throws Exception { + executeShardOperation(replica); + return new ReplicaResult(); + } + + private void executeShardOperation(final IndexShard indexShard) { + final ShardId shardId = indexShard.shardId(); + if (indexShard.getActiveOperationsCount() != 0) { + throw new IllegalStateException("On-going operations in progress while checking index shard " + shardId + " before closing"); + } + + final ClusterBlocks clusterBlocks = clusterService.state().blocks(); + if (clusterBlocks.hasIndexBlock(shardId.getIndexName(), EXPECTED_BLOCK) == false) { + throw new IllegalStateException("Index shard " + shardId + " must be blocked by " + EXPECTED_BLOCK + " before closing"); + } + + final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo(); + if (indexShard.getGlobalCheckpoint() != maxSeqNo) { + throw new IllegalStateException("Global checkpoint [" + indexShard.getGlobalCheckpoint() + + "] mismatches maximum sequence number [" + maxSeqNo + "] on index shard " + shardId); + } + indexShard.flush(new FlushRequest()); + logger.debug("{} shard is ready for closing", shardId); + } + + public static class ShardCloseRequest extends ReplicationRequest { + + ShardCloseRequest(){ + } + + public ShardCloseRequest(final ShardId shardId) { + super(shardId); + } + + @Override + public String toString() { + return "close shard {" + shardId + "}"; + } + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java new file mode 100644 index 00000000000..ae1a0b87689 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java @@ -0,0 +1,122 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.close; + +import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.seqno.SeqNoStats; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.junit.Before; + +import static org.elasticsearch.cluster.metadata.MetaDataIndexStateService.INDEX_CLOSED_BLOCK; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase { + + private IndexShard indexShard; + private TransportVerifyShardBeforeCloseAction action; + private ClusterService clusterService; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + + indexShard = mock(IndexShard.class); + when(indexShard.getActiveOperationsCount()).thenReturn(0); + when(indexShard.getGlobalCheckpoint()).thenReturn(0L); + when(indexShard.seqNoStats()).thenReturn(new SeqNoStats(0L, 0L, 0L)); + + final ShardId shardId = new ShardId("index", "_na_", randomIntBetween(0, 3)); + when(indexShard.shardId()).thenReturn(shardId); + + clusterService = mock(ClusterService.class); + when(clusterService.state()).thenReturn(new ClusterState.Builder(new ClusterName("test")) + .blocks(ClusterBlocks.builder().addIndexBlock("index", INDEX_CLOSED_BLOCK).build()).build()); + + action = new TransportVerifyShardBeforeCloseAction(Settings.EMPTY, mock(TransportService.class), clusterService, + mock(IndicesService.class), mock(ThreadPool.class), mock(ShardStateAction.class), mock(ActionFilters.class), + mock(IndexNameExpressionResolver.class)); + } + + private void executeOnPrimaryOrReplica() throws Exception { + final TransportVerifyShardBeforeCloseAction.ShardCloseRequest request = + new TransportVerifyShardBeforeCloseAction.ShardCloseRequest(indexShard.shardId()); + if (randomBoolean()) { + assertNotNull(action.shardOperationOnPrimary(request, indexShard)); + } else { + assertNotNull(action.shardOperationOnPrimary(request, indexShard)); + } + } + + public void testOperationSuccessful() throws Exception { + executeOnPrimaryOrReplica(); + verify(indexShard, times(1)).flush(any(FlushRequest.class)); + } + + public void testOperationFailsWithOnGoingOps() { + when(indexShard.getActiveOperationsCount()).thenReturn(randomIntBetween(1, 10)); + + IllegalStateException exception = expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica); + assertThat(exception.getMessage(), + equalTo("On-going operations in progress while checking index shard " + indexShard.shardId() + " before closing")); + verify(indexShard, times(0)).flush(any(FlushRequest.class)); + } + + public void testOperationFailsWithNoBlock() { + when(clusterService.state()).thenReturn(new ClusterState.Builder(new ClusterName("test")).build()); + + IllegalStateException exception = expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica); + assertThat(exception.getMessage(), + equalTo("Index shard " + indexShard.shardId() + " must be blocked by " + INDEX_CLOSED_BLOCK + " before closing")); + verify(indexShard, times(0)).flush(any(FlushRequest.class)); + } + + public void testOperationFailsWithGlobalCheckpointNotCaughtUp() { + final long maxSeqNo = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, Long.MAX_VALUE); + final long localCheckpoint = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, maxSeqNo); + final long globalCheckpoint = randomValueOtherThan(maxSeqNo, + () -> randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, localCheckpoint)); + when(indexShard.seqNoStats()).thenReturn(new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint)); + when(indexShard.getGlobalCheckpoint()).thenReturn(globalCheckpoint); + + IllegalStateException exception = expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica); + assertThat(exception.getMessage(), equalTo("Global checkpoint [" + globalCheckpoint + "] mismatches maximum sequence number [" + + maxSeqNo + "] on index shard " + indexShard.shardId())); + verify(indexShard, times(0)).flush(any(FlushRequest.class)); + } +}