diff --git a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java index 0368ea90f5e..918a8f4181f 100644 --- a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ccr; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.bytes.BytesArray; @@ -33,7 +34,9 @@ import java.util.stream.StreamSupport; import static java.util.Collections.singletonMap; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.nullValue; public class LocalIndexFollowingIT extends CcrSingleNodeTestCase { @@ -204,6 +207,50 @@ public class LocalIndexFollowingIT extends CcrSingleNodeTestCase { }); } + public void testChangeLeaderIndex() throws Exception { + final String settings = getIndexSettings(1, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + + // First, let index-1 is writable and index-2 follows index-1 + assertAcked(client().admin().indices().prepareCreate("index-1").setSource(settings, XContentType.JSON)); + ensureGreen("index-1"); + int numDocs = between(1, 100); + for (int i = 0; i < numDocs; i++) { + client().prepareIndex("index-1", "_doc").setSource("{}", XContentType.JSON).get(); + } + client().execute(PutFollowAction.INSTANCE, getPutFollowRequest("index-1", "index-2")).get(); + assertBusy(() -> assertThat(client().prepareSearch("index-2").get().getHits().getTotalHits().value, equalTo((long) numDocs))); + + // Then switch index-1 to be a follower of index-0 + assertAcked(client().admin().indices().prepareCreate("index-0").setSource(settings, XContentType.JSON)); + final int newDocs; + if (randomBoolean()) { + newDocs = randomIntBetween(0, numDocs); + } else { + newDocs = numDocs + randomIntBetween(1, 100); + } + for (int i = 0; i < newDocs; i++) { + client().prepareIndex("index-0", "_doc").setSource("{}", XContentType.JSON).get(); + } + if (randomBoolean()) { + client().admin().indices().prepareFlush("index-0").get(); + } + assertAcked(client().admin().indices().prepareClose("index-1")); + client().execute(PutFollowAction.INSTANCE, getPutFollowRequest("index-0", "index-1")).get(); + + // index-2 should detect that the leader index has changed + assertBusy(() -> { + FollowStatsAction.StatsRequest statsRequest = new FollowStatsAction.StatsRequest(); + statsRequest.setIndices(new String[]{"index-2"}); + FollowStatsAction.StatsResponses resp = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet(); + assertThat(resp.getStatsResponses(), hasSize(1)); + FollowStatsAction.StatsResponse stats = resp.getStatsResponses().get(0); + assertNotNull(stats.status().getFatalException()); + Throwable unwrapped = ExceptionsHelper.unwrap(stats.status().getFatalException(), IllegalStateException.class); + assertNotNull(unwrapped); + assertThat(unwrapped.getMessage(), containsString("unexpected history uuid")); + }); + } + public static String getIndexSettings(final int numberOfShards, final int numberOfReplicas, final Map additionalIndexSettings) throws IOException { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index 2b2d7a35367..ea563baa7f8 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -447,7 +447,7 @@ public class ShardChangesAction extends ActionType listener.onFailure(new IndexNotFoundException(shardId.getIndex())); return; } - + checkHistoryUUID(indexShard, request.expectedHistoryUUID); final long mappingVersion = indexMetadata.getMappingVersion(); final long settingsVersion = indexMetadata.getSettingsVersion(); final long aliasesVersion = indexMetadata.getAliasesVersion(); @@ -493,6 +493,14 @@ public class ShardChangesAction extends ActionType static final Translog.Operation[] EMPTY_OPERATIONS_ARRAY = new Translog.Operation[0]; + private static void checkHistoryUUID(IndexShard indexShard, String expectedHistoryUUID) { + final String historyUUID = indexShard.getHistoryUUID(); + if (historyUUID.equals(expectedHistoryUUID) == false) { + throw new IllegalStateException( + "unexpected history uuid, expected [" + expectedHistoryUUID + "], actual [" + historyUUID + "]"); + } + } + /** * Returns at most the specified maximum number of operations from the specified from sequence number. This method will never return * operations above the specified global checkpoint. @@ -519,11 +527,7 @@ public class ShardChangesAction extends ActionType if (indexShard.state() != IndexShardState.STARTED) { throw new IndexShardNotStartedException(indexShard.shardId(), indexShard.state()); } - final String historyUUID = indexShard.getHistoryUUID(); - if (historyUUID.equals(expectedHistoryUUID) == false) { - throw new IllegalStateException("unexpected history uuid, expected [" + expectedHistoryUUID + "], actual [" + - historyUUID + "]"); - } + checkHistoryUUID(indexShard, expectedHistoryUUID); if (fromSeqNo > globalCheckpoint) { throw new IllegalStateException( "not exposing operations from [" + fromSeqNo + "] greater than the global checkpoint [" + globalCheckpoint + "]");