From dc212de822e7b3675ddb3950014829f092ff6d4e Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 11 Feb 2019 17:34:17 +0100 Subject: [PATCH] Specialize pre-closing checks for engine implementations (#38702) (#38722) The Close Index API has been refactored in 6.7.0 and it now performs pre-closing sanity checks on shards before an index is closed: the maximum sequence number must be equals to the global checkpoint. While this is a strong requirement for regular shards, we identified the need to relax this check in the case of CCR following shards. The following shards are not in charge of managing the max sequence number or global checkpoint, which are pulled from a leader shard. They also fetch and process batches of operations from the leader in an unordered way, potentially leaving gaps in the history of ops. If the following shard lags a lot it's possible that the global checkpoint and max seq number never get in sync, preventing the following shard to be closed and a new PUT Follow action to be issued on this shard (which is our recommended way to resume/restart a CCR following). This commit allows each Engine implementation to define the specific verification it must perform before closing the index. In order to allow following/frozen/closed shards to be closed whatever the max seq number or global checkpoint are, the FollowingEngine and ReadOnlyEngine do not perform any check before the index is closed. Co-authored-by: Martijn van Groningen --- ...TransportVerifyShardBeforeCloseAction.java | 8 +- .../elasticsearch/index/engine/Engine.java | 14 +++ .../index/engine/ReadOnlyEngine.java | 10 ++ .../elasticsearch/index/shard/IndexShard.java | 9 ++ ...portVerifyShardBeforeCloseActionTests.java | 24 ++--- .../index/engine/ReadOnlyEngineTests.java | 21 +++++ .../ccr/index/engine/FollowingEngine.java | 8 ++ .../xpack/ccr/CloseFollowerIndexIT.java | 91 +++++++++++++++++++ .../index/engine/FollowingEngineTests.java | 19 ++++ 9 files changed, 183 insertions(+), 21 deletions(-) create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CloseFollowerIndexIT.java diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java index 2c3d178db88..9ae7d065dd9 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java @@ -108,13 +108,7 @@ public class TransportVerifyShardBeforeCloseAction extends TransportReplicationA if (clusterBlocks.hasIndexBlock(shardId.getIndexName(), request.clusterBlock()) == false) { throw new IllegalStateException("Index shard " + shardId + " must be blocked by " + request.clusterBlock() + " before closing"); } - - final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo(); - if (indexShard.getGlobalCheckpoint() != maxSeqNo) { - throw new IllegalStateException("Global checkpoint [" + indexShard.getGlobalCheckpoint() - + "] mismatches maximum sequence number [" + maxSeqNo + "] on index shard " + shardId); - } - + indexShard.verifyShardBeforeIndexClosing(); indexShard.flush(new FlushRequest().force(true)); logger.trace("{} shard is ready for closing", shardId); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index e450e93e9d3..b79bb079d94 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -265,6 +265,20 @@ public abstract class Engine implements Closeable { return new DocsStats(numDocs, numDeletedDocs, sizeInBytes); } + /** + * Performs the pre-closing checks on the {@link Engine}. + * + * @throws IllegalStateException if the sanity checks failed + */ + public void verifyEngineBeforeIndexClosing() throws IllegalStateException { + final long globalCheckpoint = engineConfig.getGlobalCheckpointSupplier().getAsLong(); + final long maxSeqNo = getSeqNoStats(globalCheckpoint).getMaxSeqNo(); + if (globalCheckpoint != maxSeqNo) { + throw new IllegalStateException("Global checkpoint [" + globalCheckpoint + + "] mismatches maximum sequence number [" + maxSeqNo + "] on index shard " + shardId); + } + } + /** * A throttling class that can be activated, causing the * {@code acquireThrottle} method to block on a lock when throttling diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 5c09708b62c..a33f6f8fe27 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -142,6 +142,16 @@ public class ReadOnlyEngine extends Engine { } } + @Override + public void verifyEngineBeforeIndexClosing() throws IllegalStateException { + // the value of the global checkpoint is verified when the read-only engine is opened, + // and it is not expected to change during the lifecycle of the engine. We could also + // check this value before closing the read-only engine but if something went wrong + // and the global checkpoint is not in-sync with the max. sequence number anymore, + // checking the value here again would prevent the read-only engine to be closed and + // reopened as an internal engine, which would be the path to fix the issue. + } + protected final DirectoryReader wrapReader(DirectoryReader reader, Function readerWrapperFunction) throws IOException { reader = ElasticsearchDirectoryReader.wrap(reader, engineConfig.getShardId()); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 22976af581b..1ea894e7aed 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -3092,4 +3092,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl getEngine().advanceMaxSeqNoOfUpdatesOrDeletes(seqNo); assert seqNo <= getMaxSeqNoOfUpdatesOrDeletes() : getMaxSeqNoOfUpdatesOrDeletes() + " < " + seqNo; } + + /** + * Performs the pre-closing checks on the {@link IndexShard}. + * + * @throws IllegalStateException if the sanity checks failed + */ + public void verifyShardBeforeIndexClosing() throws IllegalStateException { + getEngine().verifyEngineBeforeIndexClosing(); + } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java index 1b192edfda6..687b0168070 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java @@ -40,8 +40,6 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.seqno.SeqNoStats; -import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ReplicationGroup; import org.elasticsearch.index.shard.ShardId; @@ -73,6 +71,7 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -100,8 +99,6 @@ public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase { indexShard = mock(IndexShard.class); when(indexShard.getActiveOperationsCount()).thenReturn(0); - when(indexShard.getGlobalCheckpoint()).thenReturn(0L); - when(indexShard.seqNoStats()).thenReturn(new SeqNoStats(0L, 0L, 0L)); final ShardId shardId = new ShardId("index", "_na_", randomIntBetween(0, 3)); when(indexShard.shardId()).thenReturn(shardId); @@ -174,17 +171,16 @@ public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase { verify(indexShard, times(0)).flush(any(FlushRequest.class)); } - public void testOperationFailsWithGlobalCheckpointNotCaughtUp() { - final long maxSeqNo = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, Long.MAX_VALUE); - final long localCheckpoint = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, maxSeqNo); - final long globalCheckpoint = randomValueOtherThan(maxSeqNo, - () -> randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, localCheckpoint)); - when(indexShard.seqNoStats()).thenReturn(new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint)); - when(indexShard.getGlobalCheckpoint()).thenReturn(globalCheckpoint); + public void testVerifyShardBeforeIndexClosing() throws Exception { + executeOnPrimaryOrReplica(); + verify(indexShard, times(1)).verifyShardBeforeIndexClosing(); + verify(indexShard, times(1)).flush(any(FlushRequest.class)); + } - IllegalStateException exception = expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica); - assertThat(exception.getMessage(), equalTo("Global checkpoint [" + globalCheckpoint + "] mismatches maximum sequence number [" - + maxSeqNo + "] on index shard " + indexShard.shardId())); + public void testVerifyShardBeforeIndexClosingFailed() { + doThrow(new IllegalStateException("test")).when(indexShard).verifyShardBeforeIndexClosing(); + expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica); + verify(indexShard, times(1)).verifyShardBeforeIndexClosing(); verify(indexShard, times(0)).flush(any(FlushRequest.class)); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java index b345afe9b8f..87bf9b4c3de 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java @@ -189,4 +189,25 @@ public class ReadOnlyEngineTests extends EngineTestCase { } } } + + /** + * Test that {@link ReadOnlyEngine#verifyEngineBeforeIndexClosing()} never fails + * whatever the value of the global checkpoint to check is. + */ + public void testVerifyShardBeforeIndexClosingIsNoOp() throws IOException { + IOUtils.close(engine, store); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); + store.createEmpty(Version.CURRENT.luceneVersion); + try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity())) { + globalCheckpoint.set(randomNonNegativeLong()); + try { + readOnlyEngine.verifyEngineBeforeIndexClosing(); + } catch (final IllegalStateException e) { + fail("Read-only engine pre-closing verifications failed"); + } + } + } + } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index 23157c17781..c779b491d58 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -195,4 +195,12 @@ public final class FollowingEngine extends InternalEngine { public long getNumberOfOptimizedIndexing() { return numOfOptimizedIndexing.count(); } + + @Override + public void verifyEngineBeforeIndexClosing() throws IllegalStateException { + // the value of the global checkpoint is not verified when the following engine is closed, + // allowing it to be closed even in the case where all operations have not been fetched and + // processed from the leader and the operations history has gaps. This way the following + // engine can be closed and reopened in order to bootstrap the follower index again. + } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CloseFollowerIndexIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CloseFollowerIndexIT.java new file mode 100644 index 00000000000..0551d30c2e7 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CloseFollowerIndexIT.java @@ -0,0 +1,91 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr; + +import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; +import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlock; +import org.elasticsearch.cluster.metadata.MetaDataIndexStateService; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.xpack.CcrIntegTestCase; +import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import static java.util.Collections.singletonMap; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class CloseFollowerIndexIT extends CcrIntegTestCase { + + public void testCloseAndReopenFollowerIndex() throws Exception { + final String leaderIndexSettings = getIndexSettings(1, 1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); + ensureLeaderYellow("index1"); + + PutFollowAction.Request followRequest = new PutFollowAction.Request(); + followRequest.setRemoteCluster("leader_cluster"); + followRequest.setLeaderIndex("index1"); + followRequest.setFollowerIndex("index2"); + followRequest.getParameters().setMaxRetryDelay(TimeValue.timeValueMillis(10)); + followRequest.getParameters().setReadPollTimeout(TimeValue.timeValueMillis(10)); + followRequest.getParameters().setMaxReadRequestSize(new ByteSizeValue(1)); + followRequest.getParameters().setMaxOutstandingReadRequests(128); + followRequest.waitForActiveShards(ActiveShardCount.DEFAULT); + + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + ensureFollowerGreen("index2"); + + AtomicBoolean isRunning = new AtomicBoolean(true); + int numThreads = 4; + Thread[] threads = new Thread[numThreads]; + for (int i = 0; i < numThreads; i++) { + threads[i] = new Thread(() -> { + while (isRunning.get()) { + leaderClient().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get(); + } + }); + threads[i].start(); + } + + atLeastDocsIndexed(followerClient(), "index2", 32); + AcknowledgedResponse response = followerClient().admin().indices().close(new CloseIndexRequest("index2")).get(); + assertThat(response.isAcknowledged(), is(true)); + + ClusterState clusterState = followerClient().admin().cluster().prepareState().get().getState(); + List blocks = new ArrayList<>(clusterState.getBlocks().indices().get("index2")); + assertThat(blocks.size(), equalTo(1)); + assertThat(blocks.get(0).id(), equalTo(MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID)); + + isRunning.set(false); + for (Thread thread : threads) { + thread.join(); + } + assertAcked(followerClient().admin().indices().open(new OpenIndexRequest("index2")).get()); + + refresh(leaderClient(), "index1"); + SearchRequest leaderSearchRequest = new SearchRequest("index1"); + leaderSearchRequest.source().trackTotalHits(true); + long leaderIndexDocs = leaderClient().search(leaderSearchRequest).actionGet().getHits().getTotalHits().value; + assertBusy(() -> { + refresh(followerClient(), "index2"); + SearchRequest followerSearchRequest = new SearchRequest("index2"); + followerSearchRequest.source().trackTotalHits(true); + long followerIndexDocs = followerClient().search(followerSearchRequest).actionGet().getHits().getTotalHits().value; + assertThat(followerIndexDocs, equalTo(leaderIndexDocs)); + }); + } +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index df406a4c09a..67d31ff3900 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -640,4 +640,23 @@ public class FollowingEngineTests extends ESTestCase { } } } + + /** + * Test that {@link FollowingEngine#verifyEngineBeforeIndexClosing()} never fails + * whatever the value of the global checkpoint to check is. + */ + public void testVerifyShardBeforeIndexClosingIsNoOp() throws IOException { + final long seqNo = randomIntBetween(0, Integer.MAX_VALUE); + runIndexTest( + seqNo, + Engine.Operation.Origin.PRIMARY, + (followingEngine, index) -> { + globalCheckpoint.set(randomNonNegativeLong()); + try { + followingEngine.verifyEngineBeforeIndexClosing(); + } catch (final IllegalStateException e) { + fail("Following engine pre-closing verifications failed"); + } + }); + } }