Add remote recovery to ShardFollowTaskReplicationTests (#39007)

We simulate remote recovery in ShardFollowTaskReplicationTests by bootstrapping the follower with the safe commit of the leader.

Relates #35975
Nhat Nguyen 2019-02-18 09:10:44 -05:00
parent 1efb01661c
commit 2947ccf5c3
2 changed files with 297 additions and 217 deletions
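
For orientation, the bootstrap step boils down to copying the leader's safe commit into the follower's store before the follow task starts. A condensed, hypothetical sketch of that step (the real logic is the recoverPrimary override added to ShardFollowTaskReplicationTests below; shard routing changes, RecoveryState setup, and error handling are omitted):

// Hypothetical helper illustrating the bootstrap step; "leader" and "follower" are
// IndexShard instances from the test harness. The full version is the recoverPrimary
// override added in this commit.
static void bootstrapFollowerFromLeaderSafeCommit(IndexShard leader, IndexShard follower) throws IOException {
    // Discard whatever the follower currently has on disk.
    Lucene.cleanLuceneIndex(follower.store().directory());
    // Copy every file of the leader's safe commit into the follower's store.
    try (Engine.IndexCommitRef safeCommit = leader.acquireSafeIndexCommit()) {
        Store.MetadataSnapshot sourceSnapshot = leader.store().getMetadata(safeCommit.getIndexCommit());
        for (StoreFileMetaData md : sourceSnapshot) {
            follower.store().directory().copyFrom(leader.store().directory(), md.name(), md.name(), IOContext.DEFAULT);
        }
    }
}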

ESIndexLevelReplicationTestCase.java

@@ -267,9 +267,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
}
public void startPrimary() throws IOException {
final DiscoveryNode pNode = getDiscoveryNode(primary.routingEntry().currentNodeId());
primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(), pNode, null));
primary.recoverFromStore();
recoverPrimary(primary);
HashSet<String> activeIds = new HashSet<>();
activeIds.addAll(activeIds());
activeIds.add(primary.routingEntry().allocationId().getId());
@@ -302,6 +300,11 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
updateAllocationIDsOnPrimary();
}
protected synchronized void recoverPrimary(IndexShard primary) {
final DiscoveryNode pNode = getDiscoveryNode(primary.routingEntry().currentNodeId());
primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(), pNode, null));
primary.recoverFromStore();
}
public synchronized IndexShard addReplicaWithExistingPath(final ShardPath shardPath, final String nodeId) throws IOException {
final ShardRouting shardRouting = TestShardRouting.newShardRouting(

ShardFollowTaskReplicationTests.java

@@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ccr.action;
import com.carrotsearch.hppc.LongHashSet;
import com.carrotsearch.hppc.LongSet;
import org.apache.lucene.store.IOContext;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
@@ -17,9 +18,14 @@ import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -31,9 +37,16 @@ import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.RestoreOnlyRepository;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest;
@@ -57,6 +70,8 @@ import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.stream.Collectors;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
@@ -64,233 +79,241 @@ import static org.hamcrest.Matchers.nullValue;
public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTestCase {
public void testSimpleCcrReplication() throws Exception {
try (ReplicationGroup leaderGroup = createGroup(randomInt(2));
ReplicationGroup followerGroup = createFollowGroup(randomInt(2))) {
try (ReplicationGroup leaderGroup = createLeaderGroup(randomInt(2))) {
leaderGroup.startAll();
int docCount = leaderGroup.appendDocs(randomInt(64));
leaderGroup.assertAllEqual(docCount);
followerGroup.startAll();
ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup);
final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats();
final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats();
shardFollowTask.start(
try (ReplicationGroup followerGroup = createFollowGroup(leaderGroup, randomInt(2))) {
int docCount = leaderGroup.appendDocs(randomInt(64));
leaderGroup.assertAllEqual(docCount);
followerGroup.startAll();
ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup);
final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats();
final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats();
shardFollowTask.start(
followerGroup.getPrimary().getHistoryUUID(),
leaderSeqNoStats.getGlobalCheckpoint(),
leaderSeqNoStats.getMaxSeqNo(),
followerSeqNoStats.getGlobalCheckpoint(),
followerSeqNoStats.getMaxSeqNo());
docCount += leaderGroup.appendDocs(randomInt(128));
leaderGroup.syncGlobalCheckpoint();
leaderGroup.assertAllEqual(docCount);
Set<String> indexedDocIds = getShardDocUIDs(leaderGroup.getPrimary());
assertBusy(() -> {
assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint()));
followerGroup.assertAllEqual(indexedDocIds.size());
});
for (IndexShard shard : followerGroup) {
assertThat(((FollowingEngine) (getEngine(shard))).getNumberOfOptimizedIndexing(), equalTo((long) docCount));
docCount += leaderGroup.appendDocs(randomInt(128));
leaderGroup.syncGlobalCheckpoint();
leaderGroup.assertAllEqual(docCount);
Set<String> indexedDocIds = getShardDocUIDs(leaderGroup.getPrimary());
assertBusy(() -> {
assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint()));
followerGroup.assertAllEqual(indexedDocIds.size());
});
for (IndexShard shard : followerGroup) {
assertThat(((FollowingEngine) (getEngine(shard))).getNumberOfOptimizedIndexing(), equalTo((long) docCount));
}
// Deletes should be replicated to the follower
List<String> deleteDocIds = randomSubsetOf(indexedDocIds);
for (String deleteId : deleteDocIds) {
BulkItemResponse resp = leaderGroup.delete(new DeleteRequest(index.getName(), "type", deleteId));
assertThat(resp.getResponse().getResult(), equalTo(DocWriteResponse.Result.DELETED));
}
leaderGroup.syncGlobalCheckpoint();
assertBusy(() -> {
assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint()));
followerGroup.assertAllEqual(indexedDocIds.size() - deleteDocIds.size());
});
shardFollowTask.markAsCompleted();
assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup, true);
}
// Deletes should be replicated to the follower
List<String> deleteDocIds = randomSubsetOf(indexedDocIds);
for (String deleteId : deleteDocIds) {
BulkItemResponse resp = leaderGroup.delete(new DeleteRequest(index.getName(), "type", deleteId));
assertThat(resp.getResponse().getResult(), equalTo(DocWriteResponse.Result.DELETED));
}
leaderGroup.syncGlobalCheckpoint();
assertBusy(() -> {
assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint()));
followerGroup.assertAllEqual(indexedDocIds.size() - deleteDocIds.size());
});
shardFollowTask.markAsCompleted();
assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup, true);
}
}
public void testAddRemoveShardOnLeader() throws Exception {
try (ReplicationGroup leaderGroup = createGroup(1 + randomInt(1));
ReplicationGroup followerGroup = createFollowGroup(randomInt(2))) {
try (ReplicationGroup leaderGroup = createLeaderGroup(1 + randomInt(1))) {
leaderGroup.startAll();
followerGroup.startAll();
ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup);
final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats();
final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats();
shardFollowTask.start(
try (ReplicationGroup followerGroup = createFollowGroup(leaderGroup, randomInt(2))) {
followerGroup.startAll();
ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup);
final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats();
final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats();
shardFollowTask.start(
followerGroup.getPrimary().getHistoryUUID(),
leaderSeqNoStats.getGlobalCheckpoint(),
leaderSeqNoStats.getMaxSeqNo(),
followerSeqNoStats.getGlobalCheckpoint(),
followerSeqNoStats.getMaxSeqNo());
int batches = between(0, 10);
int docCount = 0;
boolean hasPromotion = false;
for (int i = 0; i < batches; i++) {
docCount += leaderGroup.indexDocs(between(1, 5));
if (leaderGroup.getReplicas().isEmpty() == false && randomInt(100) < 5) {
IndexShard closingReplica = randomFrom(leaderGroup.getReplicas());
leaderGroup.removeReplica(closingReplica);
closingReplica.close("test", false);
closingReplica.store().close();
} else if (leaderGroup.getReplicas().isEmpty() == false && rarely()) {
IndexShard newPrimary = randomFrom(leaderGroup.getReplicas());
leaderGroup.promoteReplicaToPrimary(newPrimary).get();
hasPromotion = true;
} else if (randomInt(100) < 5) {
leaderGroup.addReplica();
leaderGroup.startReplicas(1);
int batches = between(0, 10);
int docCount = 0;
boolean hasPromotion = false;
for (int i = 0; i < batches; i++) {
docCount += leaderGroup.indexDocs(between(1, 5));
if (leaderGroup.getReplicas().isEmpty() == false && randomInt(100) < 5) {
IndexShard closingReplica = randomFrom(leaderGroup.getReplicas());
leaderGroup.removeReplica(closingReplica);
closingReplica.close("test", false);
closingReplica.store().close();
} else if (leaderGroup.getReplicas().isEmpty() == false && rarely()) {
IndexShard newPrimary = randomFrom(leaderGroup.getReplicas());
leaderGroup.promoteReplicaToPrimary(newPrimary).get();
hasPromotion = true;
} else if (randomInt(100) < 5) {
leaderGroup.addReplica();
leaderGroup.startReplicas(1);
}
leaderGroup.syncGlobalCheckpoint();
}
leaderGroup.syncGlobalCheckpoint();
leaderGroup.assertAllEqual(docCount);
assertThat(shardFollowTask.getFailure(), nullValue());
int expectedDoc = docCount;
assertBusy(() -> followerGroup.assertAllEqual(expectedDoc));
shardFollowTask.markAsCompleted();
assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup, hasPromotion == false);
}
leaderGroup.assertAllEqual(docCount);
assertThat(shardFollowTask.getFailure(), nullValue());
int expectedDoc = docCount;
assertBusy(() -> followerGroup.assertAllEqual(expectedDoc));
shardFollowTask.markAsCompleted();
assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup, hasPromotion == false);
}
}
public void testChangeLeaderHistoryUUID() throws Exception {
try (ReplicationGroup leaderGroup = createGroup(0);
ReplicationGroup followerGroup = createFollowGroup(0)) {
leaderGroup.startAll();
int docCount = leaderGroup.appendDocs(randomInt(64));
leaderGroup.assertAllEqual(docCount);
followerGroup.startAll();
ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup);
final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats();
final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats();
shardFollowTask.start(
followerGroup.getPrimary().getHistoryUUID(),
leaderSeqNoStats.getGlobalCheckpoint(),
leaderSeqNoStats.getMaxSeqNo(),
followerSeqNoStats.getGlobalCheckpoint(),
followerSeqNoStats.getMaxSeqNo());
leaderGroup.syncGlobalCheckpoint();
leaderGroup.assertAllEqual(docCount);
Set<String> indexedDocIds = getShardDocUIDs(leaderGroup.getPrimary());
assertBusy(() -> {
assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint()));
followerGroup.assertAllEqual(indexedDocIds.size());
});
try (ReplicationGroup leaderGroup = createLeaderGroup(0)) {
try (ReplicationGroup followerGroup = createFollowGroup(leaderGroup, 0)) {
leaderGroup.startAll();
int docCount = leaderGroup.appendDocs(randomInt(64));
leaderGroup.assertAllEqual(docCount);
followerGroup.startAll();
ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup);
final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats();
final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats();
shardFollowTask.start(
followerGroup.getPrimary().getHistoryUUID(),
leaderSeqNoStats.getGlobalCheckpoint(),
leaderSeqNoStats.getMaxSeqNo(),
followerSeqNoStats.getGlobalCheckpoint(),
followerSeqNoStats.getMaxSeqNo());
leaderGroup.syncGlobalCheckpoint();
leaderGroup.assertAllEqual(docCount);
Set<String> indexedDocIds = getShardDocUIDs(leaderGroup.getPrimary());
assertBusy(() -> {
assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint()));
followerGroup.assertAllEqual(indexedDocIds.size());
});
String oldHistoryUUID = leaderGroup.getPrimary().getHistoryUUID();
leaderGroup.reinitPrimaryShard();
leaderGroup.getPrimary().store().bootstrapNewHistory();
recoverShardFromStore(leaderGroup.getPrimary());
String newHistoryUUID = leaderGroup.getPrimary().getHistoryUUID();
String oldHistoryUUID = leaderGroup.getPrimary().getHistoryUUID();
leaderGroup.reinitPrimaryShard();
leaderGroup.getPrimary().store().bootstrapNewHistory();
recoverShardFromStore(leaderGroup.getPrimary());
String newHistoryUUID = leaderGroup.getPrimary().getHistoryUUID();
// force the global checkpoint on the leader to advance
leaderGroup.appendDocs(64);
// force the global checkpoint on the leader to advance
leaderGroup.appendDocs(64);
assertBusy(() -> {
assertThat(shardFollowTask.isStopped(), is(true));
ElasticsearchException failure = shardFollowTask.getStatus().getFatalException();
assertThat(failure.getRootCause().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID +
"], actual [" + newHistoryUUID + "]"));
});
assertBusy(() -> {
assertThat(shardFollowTask.isStopped(), is(true));
ElasticsearchException failure = shardFollowTask.getStatus().getFatalException();
assertThat(failure.getRootCause().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID +
"], actual [" + newHistoryUUID + "]"));
});
}
}
}
public void testChangeFollowerHistoryUUID() throws Exception {
try (ReplicationGroup leaderGroup = createGroup(0);
ReplicationGroup followerGroup = createFollowGroup(0)) {
try (ReplicationGroup leaderGroup = createLeaderGroup(0)) {
leaderGroup.startAll();
int docCount = leaderGroup.appendDocs(randomInt(64));
leaderGroup.assertAllEqual(docCount);
followerGroup.startAll();
ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup);
final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats();
final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats();
shardFollowTask.start(
followerGroup.getPrimary().getHistoryUUID(),
leaderSeqNoStats.getGlobalCheckpoint(),
leaderSeqNoStats.getMaxSeqNo(),
followerSeqNoStats.getGlobalCheckpoint(),
followerSeqNoStats.getMaxSeqNo());
leaderGroup.syncGlobalCheckpoint();
leaderGroup.assertAllEqual(docCount);
Set<String> indexedDocIds = getShardDocUIDs(leaderGroup.getPrimary());
assertBusy(() -> {
assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint()));
followerGroup.assertAllEqual(indexedDocIds.size());
});
try (ReplicationGroup followerGroup = createFollowGroup(leaderGroup, 0)) {
int docCount = leaderGroup.appendDocs(randomInt(64));
leaderGroup.assertAllEqual(docCount);
followerGroup.startAll();
ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup);
final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats();
final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats();
shardFollowTask.start(
followerGroup.getPrimary().getHistoryUUID(),
leaderSeqNoStats.getGlobalCheckpoint(),
leaderSeqNoStats.getMaxSeqNo(),
followerSeqNoStats.getGlobalCheckpoint(),
followerSeqNoStats.getMaxSeqNo());
leaderGroup.syncGlobalCheckpoint();
leaderGroup.assertAllEqual(docCount);
Set<String> indexedDocIds = getShardDocUIDs(leaderGroup.getPrimary());
assertBusy(() -> {
assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint()));
followerGroup.assertAllEqual(indexedDocIds.size());
});
String oldHistoryUUID = followerGroup.getPrimary().getHistoryUUID();
followerGroup.reinitPrimaryShard();
followerGroup.getPrimary().store().bootstrapNewHistory();
recoverShardFromStore(followerGroup.getPrimary());
String newHistoryUUID = followerGroup.getPrimary().getHistoryUUID();
String oldHistoryUUID = followerGroup.getPrimary().getHistoryUUID();
followerGroup.reinitPrimaryShard();
followerGroup.getPrimary().store().bootstrapNewHistory();
recoverShardFromStore(followerGroup.getPrimary());
String newHistoryUUID = followerGroup.getPrimary().getHistoryUUID();
// force the global checkpoint on the leader to advance
leaderGroup.appendDocs(64);
// force the global checkpoint on the leader to advance
leaderGroup.appendDocs(64);
assertBusy(() -> {
assertThat(shardFollowTask.isStopped(), is(true));
ElasticsearchException failure = shardFollowTask.getStatus().getFatalException();
assertThat(failure.getRootCause().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID +
"], actual [" + newHistoryUUID + "], shard is likely restored from snapshot or force allocated"));
});
assertBusy(() -> {
assertThat(shardFollowTask.isStopped(), is(true));
ElasticsearchException failure = shardFollowTask.getStatus().getFatalException();
assertThat(failure.getRootCause().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID +
"], actual [" + newHistoryUUID + "], shard is likely restored from snapshot or force allocated"));
});
}
}
}
public void testRetryBulkShardOperations() throws Exception {
try (ReplicationGroup leaderGroup = createGroup(between(0, 1));
ReplicationGroup followerGroup = createFollowGroup(between(1, 3))) {
try (ReplicationGroup leaderGroup = createLeaderGroup(between(0, 1))) {
leaderGroup.startAll();
followerGroup.startAll();
leaderGroup.appendDocs(between(10, 100));
leaderGroup.refresh("test");
for (int numNoOps = between(1, 10), i = 0; i < numNoOps; i++) {
long seqNo = leaderGroup.getPrimary().seqNoStats().getMaxSeqNo() + 1;
Engine.NoOp noOp = new Engine.NoOp(seqNo, leaderGroup.getPrimary().getOperationPrimaryTerm(),
Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(), "test-" + i);
for (IndexShard shard : leaderGroup) {
getEngine(shard).noOp(noOp);
}
}
for (String deleteId : randomSubsetOf(IndexShardTestCase.getShardDocUIDs(leaderGroup.getPrimary()))) {
BulkItemResponse resp = leaderGroup.delete(new DeleteRequest("test", "type", deleteId));
assertThat(resp.getFailure(), nullValue());
}
leaderGroup.syncGlobalCheckpoint();
IndexShard leadingPrimary = leaderGroup.getPrimary();
// Simulates a scenario where some bulk requests are completed on the primary and replicated to some (but not all)
// replicas of the follower, but the primary of the follower crashed before these requests completed.
for (int numBulks = between(1, 5), i = 0; i < numBulks; i++) {
long fromSeqNo = randomLongBetween(0, leadingPrimary.getGlobalCheckpoint());
long toSeqNo = randomLongBetween(fromSeqNo, leadingPrimary.getGlobalCheckpoint());
int numOps = Math.toIntExact(toSeqNo + 1 - fromSeqNo);
Translog.Operation[] ops = ShardChangesAction.getOperations(leadingPrimary, leadingPrimary.getGlobalCheckpoint(),
fromSeqNo, numOps, leadingPrimary.getHistoryUUID(), new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES));
IndexShard followingPrimary = followerGroup.getPrimary();
TransportWriteAction.WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> primaryResult =
TransportBulkShardOperationsAction.shardOperationOnPrimary(followingPrimary.shardId(),
followingPrimary.getHistoryUUID(), Arrays.asList(ops), leadingPrimary.getMaxSeqNoOfUpdatesOrDeletes(),
followingPrimary, logger);
for (IndexShard replica : randomSubsetOf(followerGroup.getReplicas())) {
final PlainActionFuture<Releasable> permitFuture = new PlainActionFuture<>();
replica.acquireReplicaOperationPermit(followingPrimary.getOperationPrimaryTerm(),
followingPrimary.getGlobalCheckpoint(), followingPrimary.getMaxSeqNoOfUpdatesOrDeletes(),
permitFuture, ThreadPool.Names.SAME, primaryResult);
try (Releasable ignored = permitFuture.get()) {
TransportBulkShardOperationsAction.shardOperationOnReplica(primaryResult.replicaRequest(), replica, logger);
try (ReplicationGroup followerGroup = createFollowGroup(leaderGroup, between(1, 3))) {
followerGroup.startAll();
leaderGroup.appendDocs(between(10, 100));
leaderGroup.refresh("test");
for (int numNoOps = between(1, 10), i = 0; i < numNoOps; i++) {
long seqNo = leaderGroup.getPrimary().seqNoStats().getMaxSeqNo() + 1;
Engine.NoOp noOp = new Engine.NoOp(seqNo, leaderGroup.getPrimary().getOperationPrimaryTerm(),
Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(), "test-" + i);
for (IndexShard shard : leaderGroup) {
getEngine(shard).noOp(noOp);
}
}
}
// A follow-task retries these requests while the primary-replica resync is happening on the follower.
followerGroup.promoteReplicaToPrimary(randomFrom(followerGroup.getReplicas()));
ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup);
SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats();
shardFollowTask.start(followerGroup.getPrimary().getHistoryUUID(), leadingPrimary.getGlobalCheckpoint(),
leadingPrimary.getMaxSeqNoOfUpdatesOrDeletes(), followerSeqNoStats.getGlobalCheckpoint(), followerSeqNoStats.getMaxSeqNo());
try {
assertBusy(() -> {
assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leadingPrimary.getGlobalCheckpoint()));
assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup, true);
});
} finally {
shardFollowTask.markAsCompleted();
for (String deleteId : randomSubsetOf(IndexShardTestCase.getShardDocUIDs(leaderGroup.getPrimary()))) {
BulkItemResponse resp = leaderGroup.delete(new DeleteRequest("test", "type", deleteId));
assertThat(resp.getFailure(), nullValue());
}
leaderGroup.syncGlobalCheckpoint();
IndexShard leadingPrimary = leaderGroup.getPrimary();
// Simulates a scenario where some bulk requests are completed on the primary and replicated to some (but not all)
// replicas of the follower, but the primary of the follower crashed before these requests completed.
for (int numBulks = between(1, 5), i = 0; i < numBulks; i++) {
long fromSeqNo = randomLongBetween(0, leadingPrimary.getGlobalCheckpoint());
long toSeqNo = randomLongBetween(fromSeqNo, leadingPrimary.getGlobalCheckpoint());
int numOps = Math.toIntExact(toSeqNo + 1 - fromSeqNo);
Translog.Operation[] ops = ShardChangesAction.getOperations(leadingPrimary, leadingPrimary.getGlobalCheckpoint(),
fromSeqNo, numOps, leadingPrimary.getHistoryUUID(), new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES));
IndexShard followingPrimary = followerGroup.getPrimary();
TransportWriteAction.WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> primaryResult =
TransportBulkShardOperationsAction.shardOperationOnPrimary(followingPrimary.shardId(),
followingPrimary.getHistoryUUID(), Arrays.asList(ops), leadingPrimary.getMaxSeqNoOfUpdatesOrDeletes(),
followingPrimary, logger);
for (IndexShard replica : randomSubsetOf(followerGroup.getReplicas())) {
final PlainActionFuture<Releasable> permitFuture = new PlainActionFuture<>();
replica.acquireReplicaOperationPermit(followingPrimary.getOperationPrimaryTerm(),
followingPrimary.getGlobalCheckpoint(), followingPrimary.getMaxSeqNoOfUpdatesOrDeletes(),
permitFuture, ThreadPool.Names.SAME, primaryResult);
try (Releasable ignored = permitFuture.get()) {
TransportBulkShardOperationsAction.shardOperationOnReplica(primaryResult.replicaRequest(), replica, logger);
}
}
}
// A follow-task retries these requests while the primary-replica resync is happening on the follower.
followerGroup.promoteReplicaToPrimary(randomFrom(followerGroup.getReplicas()));
ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup);
SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats();
shardFollowTask.start(followerGroup.getPrimary().getHistoryUUID(),
leadingPrimary.getGlobalCheckpoint(),
leadingPrimary.getMaxSeqNoOfUpdatesOrDeletes(),
followerSeqNoStats.getGlobalCheckpoint(),
followerSeqNoStats.getMaxSeqNo());
try {
assertBusy(() -> {
assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leadingPrimary.getGlobalCheckpoint()));
assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup, true);
});
} finally {
shardFollowTask.markAsCompleted();
}
}
}
}
@@ -303,7 +326,17 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
operations.add(new Translog.Index("type", Integer.toString(i), i, primaryTerm, 0, source, null, -1));
}
Future<Void> recoveryFuture = null;
try (ReplicationGroup group = createFollowGroup(between(0, 1))) {
Settings settings = Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(between(1, 1000), ByteSizeUnit.KB))
.build();
IndexMetaData indexMetaData = buildIndexMetaData(between(0, 1), settings, indexMapping);
try (ReplicationGroup group = new ReplicationGroup(indexMetaData) {
@Override
protected EngineFactory getEngineFactory(ShardRouting routing) {
return new FollowingEngineFactory();
}
}) {
group.startAll();
while (operations.isEmpty() == false) {
List<Translog.Operation> bulkOps = randomSubsetOf(between(1, operations.size()), operations);
@@ -330,35 +363,79 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
}
}
@Override
protected ReplicationGroup createGroup(int replicas, Settings settings) throws IOException {
Settings newSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, replicas)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 10000)
.put(settings)
.build();
if (CcrSettings.CCR_FOLLOWING_INDEX_SETTING.get(newSettings)) {
IndexMetaData metaData = buildIndexMetaData(replicas, newSettings, indexMapping);
return new ReplicationGroup(metaData) {
@Override
protected EngineFactory getEngineFactory(ShardRouting routing) {
return new FollowingEngineFactory();
public void testSimpleRemoteRecovery() throws Exception {
try (ReplicationGroup leader = createLeaderGroup(between(0, 1))) {
leader.startAll();
leader.appendDocs(between(0, 100));
leader.flush();
leader.syncGlobalCheckpoint();
try (ReplicationGroup follower = createFollowGroup(leader, 0)) {
follower.startAll();
ShardFollowNodeTask followTask = createShardFollowTask(leader, follower);
followTask.start(
follower.getPrimary().getHistoryUUID(),
leader.getPrimary().getGlobalCheckpoint(),
leader.getPrimary().seqNoStats().getMaxSeqNo(),
follower.getPrimary().getGlobalCheckpoint(),
follower.getPrimary().seqNoStats().getMaxSeqNo()
);
leader.appendDocs(between(0, 100));
if (randomBoolean()) {
follower.recoverReplica(follower.addReplica());
}
};
} else {
return super.createGroup(replicas, newSettings);
assertBusy(() -> assertConsistentHistoryBetweenLeaderAndFollower(leader, follower, false));
followTask.markAsCompleted();
}
}
}
private ReplicationGroup createFollowGroup(int replicas) throws IOException {
Settings.Builder settingsBuilder = Settings.builder();
settingsBuilder.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)
private ReplicationGroup createLeaderGroup(int replicas) throws IOException {
Settings settings = Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(between(1, 1000), ByteSizeUnit.KB));
return createGroup(replicas, settingsBuilder.build());
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 10000)
.build();
return createGroup(replicas, settings);
}
private ReplicationGroup createFollowGroup(ReplicationGroup leaderGroup, int replicas) throws IOException {
Settings settings = Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(between(1, 1000), ByteSizeUnit.KB))
.build();
IndexMetaData indexMetaData = buildIndexMetaData(replicas, settings, indexMapping);
return new ReplicationGroup(indexMetaData) {
@Override
protected EngineFactory getEngineFactory(ShardRouting routing) {
return new FollowingEngineFactory();
}
@Override
protected synchronized void recoverPrimary(IndexShard primary) {
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
Snapshot snapshot = new Snapshot("foo", new SnapshotId("bar", UUIDs.randomBase64UUID()));
ShardRouting routing = ShardRoutingHelper.newWithRestoreSource(primary.routingEntry(),
new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, Version.CURRENT, "test"));
primary.markAsRecovering("remote recovery from leader", new RecoveryState(routing, localNode, null));
primary.restoreFromRepository(new RestoreOnlyRepository(index.getName()) {
@Override
public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version,
IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
try {
IndexShard leader = leaderGroup.getPrimary();
Lucene.cleanLuceneIndex(primary.store().directory());
try (Engine.IndexCommitRef sourceCommit = leader.acquireSafeIndexCommit()) {
Store.MetadataSnapshot sourceSnapshot = leader.store().getMetadata(sourceCommit.getIndexCommit());
for (StoreFileMetaData md : sourceSnapshot) {
primary.store().directory().copyFrom(
leader.store().directory(), md.name(), md.name(), IOContext.DEFAULT);
}
}
} catch (Exception ex) {
throw new AssertionError(ex);
}
}
});
}
};
}
private ShardFollowNodeTask createShardFollowTask(ReplicationGroup leaderGroup, ReplicationGroup followerGroup) {
@@ -483,7 +560,7 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
final List<Tuple<String, Long>> docAndSeqNosOnLeader = getDocIdAndSeqNos(leader.getPrimary()).stream()
.map(d -> Tuple.tuple(d.getId(), d.getSeqNo())).collect(Collectors.toList());
final Set<Tuple<Long, Translog.Operation.Type>> operationsOnLeader = new HashSet<>();
try (Translog.Snapshot snapshot = leader.getPrimary().getHistoryOperations("test", 0)) {
try (Translog.Snapshot snapshot = leader.getPrimary().newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) {
Translog.Operation op;
while ((op = snapshot.next()) != null) {
operationsOnLeader.add(Tuple.tuple(op.seqNo(), op.opType()));
@@ -497,13 +574,13 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
.map(d -> Tuple.tuple(d.getId(), d.getSeqNo())).collect(Collectors.toList());
assertThat(docAndSeqNosOnFollower, equalTo(docAndSeqNosOnLeader));
final Set<Tuple<Long, Translog.Operation.Type>> operationsOnFollower = new HashSet<>();
try (Translog.Snapshot snapshot = followingShard.getHistoryOperations("test", 0)) {
try (Translog.Snapshot snapshot = followingShard.newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) {
Translog.Operation op;
while ((op = snapshot.next()) != null) {
operationsOnFollower.add(Tuple.tuple(op.seqNo(), op.opType()));
}
}
assertThat(operationsOnFollower, equalTo(operationsOnLeader));
assertThat(followingShard.routingEntry().toString(), operationsOnFollower, equalTo(operationsOnLeader));
}
}