From 84d4e5bcb1851d9007b552d13789b233971e7504 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sat, 12 Dec 2020 12:25:13 -0500 Subject: [PATCH] CCR should check historyUUID in every read request (#66220) Today, CCR only checks the historyUUID of the leader shard when it has operations to replicate. If the follower shard is already in-sync with the leader shard, then CCR won't detect if the historyUUID of the leader shard has been changed. While this is not an issue, it can annoy users in the following situation: The follower index is in-sync with the leader index Users restore the leader index from snapshots CCR won't detect the issue and report ok in its stats API CCR suddenly stops working when users start indexing to the leader index This commit makes sure that we always check historyUUID in every read-request so we can detect and report the issue as soon as possible. Backport of #65841 --- .../xpack/ccr/LocalIndexFollowingIT.java | 47 +++++++++++++++++++ .../xpack/ccr/action/ShardChangesAction.java | 16 ++++--- 2 files changed, 57 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java index 0368ea90f5e..918a8f4181f 100644 --- a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ccr; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.bytes.BytesArray; @@ -33,7 +34,9 @@ import java.util.stream.StreamSupport; import static java.util.Collections.singletonMap; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.nullValue; public class LocalIndexFollowingIT extends CcrSingleNodeTestCase { @@ -204,6 +207,50 @@ public class LocalIndexFollowingIT extends CcrSingleNodeTestCase { }); } + public void testChangeLeaderIndex() throws Exception { + final String settings = getIndexSettings(1, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + + // First, let index-1 is writable and index-2 follows index-1 + assertAcked(client().admin().indices().prepareCreate("index-1").setSource(settings, XContentType.JSON)); + ensureGreen("index-1"); + int numDocs = between(1, 100); + for (int i = 0; i < numDocs; i++) { + client().prepareIndex("index-1", "_doc").setSource("{}", XContentType.JSON).get(); + } + client().execute(PutFollowAction.INSTANCE, getPutFollowRequest("index-1", "index-2")).get(); + assertBusy(() -> assertThat(client().prepareSearch("index-2").get().getHits().getTotalHits().value, equalTo((long) numDocs))); + + // Then switch index-1 to be a follower of index-0 + assertAcked(client().admin().indices().prepareCreate("index-0").setSource(settings, XContentType.JSON)); + final int newDocs; + if (randomBoolean()) { + newDocs = randomIntBetween(0, numDocs); + } else { + newDocs = numDocs + randomIntBetween(1, 100); + } + for (int i = 0; i < newDocs; i++) { + client().prepareIndex("index-0", "_doc").setSource("{}", XContentType.JSON).get(); + } + if (randomBoolean()) { + client().admin().indices().prepareFlush("index-0").get(); + } + assertAcked(client().admin().indices().prepareClose("index-1")); + client().execute(PutFollowAction.INSTANCE, getPutFollowRequest("index-0", "index-1")).get(); + + // index-2 should detect that the leader index has changed + assertBusy(() -> { + FollowStatsAction.StatsRequest statsRequest = new FollowStatsAction.StatsRequest(); + statsRequest.setIndices(new String[]{"index-2"}); + FollowStatsAction.StatsResponses resp = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet(); + assertThat(resp.getStatsResponses(), hasSize(1)); + FollowStatsAction.StatsResponse stats = resp.getStatsResponses().get(0); + assertNotNull(stats.status().getFatalException()); + Throwable unwrapped = ExceptionsHelper.unwrap(stats.status().getFatalException(), IllegalStateException.class); + assertNotNull(unwrapped); + assertThat(unwrapped.getMessage(), containsString("unexpected history uuid")); + }); + } + public static String getIndexSettings(final int numberOfShards, final int numberOfReplicas, final Map additionalIndexSettings) throws IOException { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index 2b2d7a35367..ea563baa7f8 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -447,7 +447,7 @@ public class ShardChangesAction extends ActionType listener.onFailure(new IndexNotFoundException(shardId.getIndex())); return; } - + checkHistoryUUID(indexShard, request.expectedHistoryUUID); final long mappingVersion = indexMetadata.getMappingVersion(); final long settingsVersion = indexMetadata.getSettingsVersion(); final long aliasesVersion = indexMetadata.getAliasesVersion(); @@ -493,6 +493,14 @@ public class ShardChangesAction extends ActionType static final Translog.Operation[] EMPTY_OPERATIONS_ARRAY = new Translog.Operation[0]; + private static void checkHistoryUUID(IndexShard indexShard, String expectedHistoryUUID) { + final String historyUUID = indexShard.getHistoryUUID(); + if (historyUUID.equals(expectedHistoryUUID) == false) { + throw new IllegalStateException( + "unexpected history uuid, expected [" + expectedHistoryUUID + "], actual [" + historyUUID + "]"); + } + } + /** * Returns at most the specified maximum number of operations from the specified from sequence number. This method will never return * operations above the specified global checkpoint. @@ -519,11 +527,7 @@ public class ShardChangesAction extends ActionType if (indexShard.state() != IndexShardState.STARTED) { throw new IndexShardNotStartedException(indexShard.shardId(), indexShard.state()); } - final String historyUUID = indexShard.getHistoryUUID(); - if (historyUUID.equals(expectedHistoryUUID) == false) { - throw new IllegalStateException("unexpected history uuid, expected [" + expectedHistoryUUID + "], actual [" + - historyUUID + "]"); - } + checkHistoryUUID(indexShard, expectedHistoryUUID); if (fromSeqNo > globalCheckpoint) { throw new IllegalStateException( "not exposing operations from [" + fromSeqNo + "] greater than the global checkpoint [" + globalCheckpoint + "]");