CCR should check historyUUID in every read request (#66220)

Today, CCR only checks the historyUUID of the leader shard when it has
operations to replicate. If the follower shard is already in-sync with
the leader shard, then CCR won't detect if the historyUUID of the leader
shard has been changed. While this is not an issue, it can annoy users
in the following situation:

The follower index is in-sync with the leader index

Users restore the leader index from snapshots

CCR won't detect the issue and report ok in its stats API

CCR suddenly stops working when users start indexing to the leader index

This commit makes sure that we always check historyUUID in every
read-request so we can detect and report the issue as soon as possible.

Backport of #65841
This commit is contained in:
Nhat Nguyen 2020-12-12 12:25:13 -05:00 committed by GitHub
parent 9bf6d055eb
commit 84d4e5bcb1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 57 additions and 6 deletions

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ccr; package org.elasticsearch.xpack.ccr;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
@ -33,7 +34,9 @@ import java.util.stream.StreamSupport;
import static java.util.Collections.singletonMap; import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
public class LocalIndexFollowingIT extends CcrSingleNodeTestCase { public class LocalIndexFollowingIT extends CcrSingleNodeTestCase {
@ -204,6 +207,50 @@ public class LocalIndexFollowingIT extends CcrSingleNodeTestCase {
}); });
} }
public void testChangeLeaderIndex() throws Exception {
final String settings = getIndexSettings(1, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
// First, let index-1 is writable and index-2 follows index-1
assertAcked(client().admin().indices().prepareCreate("index-1").setSource(settings, XContentType.JSON));
ensureGreen("index-1");
int numDocs = between(1, 100);
for (int i = 0; i < numDocs; i++) {
client().prepareIndex("index-1", "_doc").setSource("{}", XContentType.JSON).get();
}
client().execute(PutFollowAction.INSTANCE, getPutFollowRequest("index-1", "index-2")).get();
assertBusy(() -> assertThat(client().prepareSearch("index-2").get().getHits().getTotalHits().value, equalTo((long) numDocs)));
// Then switch index-1 to be a follower of index-0
assertAcked(client().admin().indices().prepareCreate("index-0").setSource(settings, XContentType.JSON));
final int newDocs;
if (randomBoolean()) {
newDocs = randomIntBetween(0, numDocs);
} else {
newDocs = numDocs + randomIntBetween(1, 100);
}
for (int i = 0; i < newDocs; i++) {
client().prepareIndex("index-0", "_doc").setSource("{}", XContentType.JSON).get();
}
if (randomBoolean()) {
client().admin().indices().prepareFlush("index-0").get();
}
assertAcked(client().admin().indices().prepareClose("index-1"));
client().execute(PutFollowAction.INSTANCE, getPutFollowRequest("index-0", "index-1")).get();
// index-2 should detect that the leader index has changed
assertBusy(() -> {
FollowStatsAction.StatsRequest statsRequest = new FollowStatsAction.StatsRequest();
statsRequest.setIndices(new String[]{"index-2"});
FollowStatsAction.StatsResponses resp = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
assertThat(resp.getStatsResponses(), hasSize(1));
FollowStatsAction.StatsResponse stats = resp.getStatsResponses().get(0);
assertNotNull(stats.status().getFatalException());
Throwable unwrapped = ExceptionsHelper.unwrap(stats.status().getFatalException(), IllegalStateException.class);
assertNotNull(unwrapped);
assertThat(unwrapped.getMessage(), containsString("unexpected history uuid"));
});
}
public static String getIndexSettings(final int numberOfShards, public static String getIndexSettings(final int numberOfShards,
final int numberOfReplicas, final int numberOfReplicas,
final Map<String, String> additionalIndexSettings) throws IOException { final Map<String, String> additionalIndexSettings) throws IOException {

View File

@ -447,7 +447,7 @@ public class ShardChangesAction extends ActionType<ShardChangesAction.Response>
listener.onFailure(new IndexNotFoundException(shardId.getIndex())); listener.onFailure(new IndexNotFoundException(shardId.getIndex()));
return; return;
} }
checkHistoryUUID(indexShard, request.expectedHistoryUUID);
final long mappingVersion = indexMetadata.getMappingVersion(); final long mappingVersion = indexMetadata.getMappingVersion();
final long settingsVersion = indexMetadata.getSettingsVersion(); final long settingsVersion = indexMetadata.getSettingsVersion();
final long aliasesVersion = indexMetadata.getAliasesVersion(); final long aliasesVersion = indexMetadata.getAliasesVersion();
@ -493,6 +493,14 @@ public class ShardChangesAction extends ActionType<ShardChangesAction.Response>
static final Translog.Operation[] EMPTY_OPERATIONS_ARRAY = new Translog.Operation[0]; static final Translog.Operation[] EMPTY_OPERATIONS_ARRAY = new Translog.Operation[0];
private static void checkHistoryUUID(IndexShard indexShard, String expectedHistoryUUID) {
final String historyUUID = indexShard.getHistoryUUID();
if (historyUUID.equals(expectedHistoryUUID) == false) {
throw new IllegalStateException(
"unexpected history uuid, expected [" + expectedHistoryUUID + "], actual [" + historyUUID + "]");
}
}
/** /**
* Returns at most the specified maximum number of operations from the specified from sequence number. This method will never return * Returns at most the specified maximum number of operations from the specified from sequence number. This method will never return
* operations above the specified global checkpoint. * operations above the specified global checkpoint.
@ -519,11 +527,7 @@ public class ShardChangesAction extends ActionType<ShardChangesAction.Response>
if (indexShard.state() != IndexShardState.STARTED) { if (indexShard.state() != IndexShardState.STARTED) {
throw new IndexShardNotStartedException(indexShard.shardId(), indexShard.state()); throw new IndexShardNotStartedException(indexShard.shardId(), indexShard.state());
} }
final String historyUUID = indexShard.getHistoryUUID(); checkHistoryUUID(indexShard, expectedHistoryUUID);
if (historyUUID.equals(expectedHistoryUUID) == false) {
throw new IllegalStateException("unexpected history uuid, expected [" + expectedHistoryUUID + "], actual [" +
historyUUID + "]");
}
if (fromSeqNo > globalCheckpoint) { if (fromSeqNo > globalCheckpoint) {
throw new IllegalStateException( throw new IllegalStateException(
"not exposing operations from [" + fromSeqNo + "] greater than the global checkpoint [" + globalCheckpoint + "]"); "not exposing operations from [" + fromSeqNo + "] greater than the global checkpoint [" + globalCheckpoint + "]");