Add test for `PutFollowAction` on a closed index (#38236)
This is related to #35975. Currently when an index falls behind a leader it encounters a fatal exception. This commit adds a test for that scenario. Additionally, it tests that the user can stop following, close the follower index, and put follow again. After the indexing is re-bootstrapped, it will recover the documents it lost in normal following operations.
This commit is contained in:
parent
c3cdf84c04
commit
5a33816c86
|
@ -8,6 +8,9 @@ package org.elasticsearch.xpack.ccr;
|
|||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.ResourceAlreadyExistsException;
|
||||
import org.elasticsearch.ResourceNotFoundException;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
||||
|
@ -16,10 +19,13 @@ import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
|
|||
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
|
||||
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
|
||||
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
|
||||
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
|
||||
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
|
||||
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
|
||||
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
|
||||
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
|
||||
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
|
||||
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
|
||||
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
|
||||
|
@ -53,6 +59,7 @@ import org.elasticsearch.index.IndexSettings;
|
|||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.snapshots.SnapshotRestoreException;
|
||||
import org.elasticsearch.tasks.TaskInfo;
|
||||
import org.elasticsearch.transport.NoSuchRemoteClusterException;
|
||||
import org.elasticsearch.xpack.CcrIntegTestCase;
|
||||
|
@ -75,6 +82,7 @@ import java.util.List;
|
|||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
@ -86,6 +94,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
|||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
@ -947,6 +956,98 @@ public class IndexFollowingIT extends CcrIntegTestCase {
|
|||
assertThat(hasFollowIndexBeenClosedChecker.getAsBoolean(), is(true));
|
||||
}
|
||||
|
||||
public void testMustCloseIndexAndPauseToRestartWithPutFollowing() throws Exception {
|
||||
final int numberOfPrimaryShards = randomIntBetween(1, 3);
|
||||
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1),
|
||||
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
|
||||
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
|
||||
ensureLeaderYellow("index1");
|
||||
|
||||
final PutFollowAction.Request followRequest = putFollow("index1", "index2");
|
||||
PutFollowAction.Response response = followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
|
||||
assertTrue(response.isFollowIndexCreated());
|
||||
assertTrue(response.isFollowIndexShardsAcked());
|
||||
assertTrue(response.isIndexFollowingStarted());
|
||||
|
||||
final PutFollowAction.Request followRequest2 = putFollow("index1", "index2");
|
||||
expectThrows(SnapshotRestoreException.class,
|
||||
() -> followerClient().execute(PutFollowAction.INSTANCE, followRequest2).actionGet());
|
||||
|
||||
followerClient().admin().indices().prepareClose("index2").get();
|
||||
expectThrows(ResourceAlreadyExistsException.class,
|
||||
() -> followerClient().execute(PutFollowAction.INSTANCE, followRequest2).actionGet());
|
||||
}
|
||||
|
||||
public void testIndexFallBehind() throws Exception {
|
||||
final int numberOfPrimaryShards = randomIntBetween(1, 3);
|
||||
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1),
|
||||
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
|
||||
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
|
||||
ensureLeaderYellow("index1");
|
||||
|
||||
final int numDocs = randomIntBetween(2, 64);
|
||||
logger.info("Indexing [{}] docs as first batch", numDocs);
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
|
||||
leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
|
||||
}
|
||||
|
||||
final PutFollowAction.Request followRequest = putFollow("index1", "index2");
|
||||
PutFollowAction.Response response = followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
|
||||
assertTrue(response.isFollowIndexCreated());
|
||||
assertTrue(response.isFollowIndexShardsAcked());
|
||||
assertTrue(response.isIndexFollowingStarted());
|
||||
|
||||
assertIndexFullyReplicatedToFollower("index1", "index2");
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
assertBusy(assertExpectedDocumentRunnable(i));
|
||||
}
|
||||
|
||||
pauseFollow("index2");
|
||||
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i * 2);
|
||||
leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
|
||||
}
|
||||
leaderClient().prepareDelete("index1", "doc", "1").get();
|
||||
leaderClient().admin().indices().refresh(new RefreshRequest("index1")).actionGet();
|
||||
leaderClient().admin().indices().flush(new FlushRequest("index1").force(true)).actionGet();
|
||||
ForceMergeRequest forceMergeRequest = new ForceMergeRequest("index1");
|
||||
forceMergeRequest.maxNumSegments(1);
|
||||
leaderClient().admin().indices().forceMerge(forceMergeRequest).actionGet();
|
||||
|
||||
followerClient().execute(ResumeFollowAction.INSTANCE, followRequest.getFollowRequest()).get();
|
||||
|
||||
assertBusy(() -> {
|
||||
List<ShardFollowNodeTaskStatus> statuses = getFollowTaskStatuses("index2");
|
||||
Set<ResourceNotFoundException> exceptions = statuses.stream()
|
||||
.map(ShardFollowNodeTaskStatus::getFatalException)
|
||||
.filter(Objects::nonNull)
|
||||
.map(ExceptionsHelper::unwrapCause)
|
||||
.filter(e -> e instanceof ResourceNotFoundException)
|
||||
.map(e -> (ResourceNotFoundException) e)
|
||||
.filter(e -> e.getMetadataKeys().contains("es.requested_operations_missing"))
|
||||
.collect(Collectors.toSet());
|
||||
assertThat(exceptions.size(), greaterThan(0));
|
||||
});
|
||||
|
||||
followerClient().admin().indices().prepareClose("index2").get();
|
||||
pauseFollow("index2");
|
||||
|
||||
|
||||
final PutFollowAction.Request followRequest2 = putFollow("index1", "index2");
|
||||
PutFollowAction.Response response2 = followerClient().execute(PutFollowAction.INSTANCE, followRequest2).get();
|
||||
assertTrue(response2.isFollowIndexCreated());
|
||||
assertTrue(response2.isFollowIndexShardsAcked());
|
||||
assertTrue(response2.isIndexFollowingStarted());
|
||||
|
||||
ensureFollowerGreen("index2");
|
||||
assertIndexFullyReplicatedToFollower("index1", "index2");
|
||||
for (int i = 2; i < numDocs; i++) {
|
||||
assertBusy(assertExpectedDocumentRunnable(i, i * 2));
|
||||
}
|
||||
}
|
||||
|
||||
private long getFollowTaskSettingsVersion(String followerIndex) {
|
||||
long settingsVersion = -1L;
|
||||
for (ShardFollowNodeTaskStatus status : getFollowTaskStatuses(followerIndex)) {
|
||||
|
@ -1032,9 +1133,13 @@ public class IndexFollowingIT extends CcrIntegTestCase {
|
|||
}
|
||||
|
||||
private CheckedRunnable<Exception> assertExpectedDocumentRunnable(final int value) {
|
||||
return assertExpectedDocumentRunnable(value, value);
|
||||
}
|
||||
|
||||
private CheckedRunnable<Exception> assertExpectedDocumentRunnable(final int key, final int value) {
|
||||
return () -> {
|
||||
final GetResponse getResponse = followerClient().prepareGet("index2", "doc", Integer.toString(value)).get();
|
||||
assertTrue("Doc with id [" + value + "] is missing", getResponse.isExists());
|
||||
final GetResponse getResponse = followerClient().prepareGet("index2", "doc", Integer.toString(key)).get();
|
||||
assertTrue("Doc with id [" + key + "] is missing", getResponse.isExists());
|
||||
assertTrue((getResponse.getSource().containsKey("f")));
|
||||
assertThat(getResponse.getSource().get("f"), equalTo(value));
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue