From 90ca5b1fdeede0aa83d51892ff7ff63ff715333a Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 19 Oct 2018 12:38:06 -0400 Subject: [PATCH] Fill LocalCheckpointTracker with Lucene commit (#34474) Today we rely on the LocalCheckpointTracker to ensure no duplicate when enabling optimization using max_seq_no_of_updates. The problem is that the LocalCheckpointTracker is not fully reloaded when opening an engine with an out-of-order index commit. Suppose the starting commit has seq#0 and seq#2, then the current LocalCheckpointTracker would return "false" when asking if seq#2 was processed before although seq#2 in the commit. This change scans the existing sequence numbers in the starting commit, then marks these as completed in the LocalCheckpointTracker to ensure the consistent state between LocalCheckpointTracker and Lucene commit. --- .../elasticsearch/common/lucene/Lucene.java | 39 ++++++++++ .../index/engine/InternalEngine.java | 39 +++++++--- .../index/engine/InternalEngineTests.java | 71 +++++++++++++++++++ .../RecoveryDuringReplicationTests.java | 46 ++++++++++++ .../xpack/ccr/IndexFollowingIT.java | 58 +++++++++++++++ .../ShardFollowTaskReplicationTests.java | 39 ++++++++++ 6 files changed, 281 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java index 86db2dae2da..b384e45f56d 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java @@ -27,6 +27,7 @@ import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.codecs.DocValuesFormat; import org.apache.lucene.codecs.PostingsFormat; import org.apache.lucene.document.LatLonDocValuesField; +import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.DirectoryReader; @@ -42,6 +43,7 @@ import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NoMergePolicy; +import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SegmentReader; @@ -82,6 +84,7 @@ import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.index.analysis.AnalyzerScope; import org.elasticsearch.index.analysis.NamedAnalyzer; import org.elasticsearch.index.fielddata.IndexFieldData; +import org.elasticsearch.index.mapper.SeqNoFieldMapper; import java.io.IOException; import java.text.ParseException; @@ -91,6 +94,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.function.LongConsumer; public class Lucene { public static final String LATEST_DOC_VALUES_FORMAT = "Lucene70"; @@ -968,4 +972,39 @@ public class Lucene { public static NumericDocValuesField newSoftDeletesField() { return new NumericDocValuesField(SOFT_DELETES_FIELD, 1); } + + /** + * Scans sequence numbers (i.e., {@link SeqNoFieldMapper#NAME}) between {@code fromSeqNo}(inclusive) and {@code toSeqNo}(inclusive) + * in the provided directory reader. This method invokes the callback {@code onNewSeqNo} whenever a sequence number value is found. + * + * @param directoryReader the directory reader to scan + * @param fromSeqNo the lower bound of a range of seq_no to scan (inclusive) + * @param toSeqNo the upper bound of a range of seq_no to scan (inclusive) + * @param onNewSeqNo the callback to be called whenever a new valid sequence number is found + */ + public static void scanSeqNosInReader(DirectoryReader directoryReader, long fromSeqNo, long toSeqNo, + LongConsumer onNewSeqNo) throws IOException { + final DirectoryReader reader = Lucene.wrapAllDocsLive(directoryReader); + final IndexSearcher searcher = new IndexSearcher(reader); + searcher.setQueryCache(null); + final Query query = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo); + final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f); + for (LeafReaderContext leaf : reader.leaves()) { + final Scorer scorer = weight.scorer(leaf); + if (scorer == null) { + continue; + } + final DocIdSetIterator docIdSetIterator = scorer.iterator(); + final NumericDocValues seqNoDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME); + int docId; + while ((docId = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { + if (seqNoDocValues == null || seqNoDocValues.advanceExact(docId) == false) { + throw new IllegalStateException("seq_no doc_values not found for doc_id=" + docId); + } + final long seqNo = seqNoDocValues.longValue(); + assert fromSeqNo <= seqNo && seqNo <= toSeqNo : "from_seq_no=" + fromSeqNo + " seq_no=" + seqNo + " to_seq_no=" + toSeqNo; + onNewSeqNo.accept(seqNo); + } + } + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 38ba0257491..2930fce4c02 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -102,6 +102,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; import java.util.function.LongSupplier; +import java.util.function.Supplier; import java.util.stream.Stream; public class InternalEngine extends Engine { @@ -189,7 +190,6 @@ public class InternalEngine extends Engine { translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier()); assert translog.getGeneration() != null; this.translog = translog; - this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier); this.softDeleteEnabled = engineConfig.getIndexSettings().isSoftDeleteEnabled(); this.softDeletesPolicy = newSoftDeletesPolicy(); this.combinedDeletionPolicy = @@ -223,6 +223,8 @@ public class InternalEngine extends Engine { for (ReferenceManager.RefreshListener listener: engineConfig.getInternalRefreshListener()) { this.internalSearcherManager.addListener(listener); } + this.localCheckpointTracker = createLocalCheckpointTracker(engineConfig, lastCommittedSegmentInfos, logger, + () -> acquireSearcher("create_local_checkpoint_tracker", SearcherScope.INTERNAL), localCheckpointTrackerSupplier); this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getCheckpoint()); this.internalSearcherManager.addListener(lastRefreshedCheckpointListener); success = true; @@ -238,16 +240,29 @@ public class InternalEngine extends Engine { logger.trace("created new InternalEngine"); } - private LocalCheckpointTracker createLocalCheckpointTracker( - BiFunction localCheckpointTrackerSupplier) throws IOException { - final long maxSeqNo; - final long localCheckpoint; - final SequenceNumbers.CommitInfo seqNoStats = - SequenceNumbers.loadSeqNoInfoFromLuceneCommit(store.readLastCommittedSegmentsInfo().userData.entrySet()); - maxSeqNo = seqNoStats.maxSeqNo; - localCheckpoint = seqNoStats.localCheckpoint; - logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint); - return localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint); + private static LocalCheckpointTracker createLocalCheckpointTracker(EngineConfig engineConfig, SegmentInfos lastCommittedSegmentInfos, + Logger logger, Supplier searcherSupplier, BiFunction localCheckpointTrackerSupplier) { + try { + final SequenceNumbers.CommitInfo seqNoStats = + SequenceNumbers.loadSeqNoInfoFromLuceneCommit(lastCommittedSegmentInfos.userData.entrySet()); + final long maxSeqNo = seqNoStats.maxSeqNo; + final long localCheckpoint = seqNoStats.localCheckpoint; + logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint); + final LocalCheckpointTracker tracker = localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint); + // Operations that are optimized using max_seq_no_of_updates optimization must not be processed twice; otherwise, they will + // create duplicates in Lucene. To avoid this we check the LocalCheckpointTracker to see if an operation was already processed. + // Thus, we need to restore the LocalCheckpointTracker bit by bit to ensure the consistency between LocalCheckpointTracker and + // Lucene index. This is not the only solution since we can bootstrap max_seq_no_of_updates with max_seq_no of the commit to + // disable the MSU optimization during recovery. Here we prefer to maintain the consistency of LocalCheckpointTracker. + if (localCheckpoint < maxSeqNo && engineConfig.getIndexSettings().isSoftDeleteEnabled()) { + try (Searcher searcher = searcherSupplier.get()) { + Lucene.scanSeqNosInReader(searcher.getDirectoryReader(), localCheckpoint + 1, maxSeqNo, tracker::markSeqNoAsCompleted); + } + } + return tracker; + } catch (IOException ex) { + throw new EngineCreationFailureException(engineConfig.getShardId(), "failed to create local checkpoint tracker", ex); + } } private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException { @@ -665,6 +680,8 @@ public class InternalEngine extends Engine { } else if (op.seqNo() > docAndSeqNo.seqNo) { status = OpVsLuceneDocStatus.OP_NEWER; } else if (op.seqNo() == docAndSeqNo.seqNo) { + assert localCheckpointTracker.contains(op.seqNo()) || softDeleteEnabled == false : + "local checkpoint tracker is not updated seq_no=" + op.seqNo() + " id=" + op.id(); // load term to tie break final long existingTerm = VersionsAndSeqNoResolver.loadPrimaryTerm(docAndSeqNo, op.uid().field()); if (op.primaryTerm() > existingTerm) { diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 26c2453a271..f36476a9685 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -5081,6 +5081,77 @@ public class InternalEngineTests extends EngineTestCase { } } + public void testRebuildLocalCheckpointTracker() throws Exception { + Settings.Builder settings = Settings.builder() + .put(defaultSettings.getSettings()) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true); + final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build(); + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + Path translogPath = createTempDir(); + int numOps = scaledRandomIntBetween(1, 500); + List operations = new ArrayList<>(); + for (int i = 0; i < numOps; i++) { + long seqNo = i; + final ParsedDocument doc = EngineTestCase.createParsedDoc(Integer.toString(between(1, 100)), null); + if (randomBoolean()) { + operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, seqNo, primaryTerm.get(), + i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(), -1, true)); + } else if (randomBoolean()) { + operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), seqNo, primaryTerm.get(), + i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis())); + } else { + operations.add(new Engine.NoOp(seqNo, primaryTerm.get(), Engine.Operation.Origin.REPLICA, + threadPool.relativeTimeInMillis(), "test-" + i)); + } + } + Randomness.shuffle(operations); + List> commits = new ArrayList<>(); + commits.add(new ArrayList<>()); + try (Store store = createStore()) { + EngineConfig config = config(indexSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get); + try (InternalEngine engine = createEngine(config)) { + List flushedOperations = new ArrayList<>(); + for (Engine.Operation op : operations) { + flushedOperations.add(op); + if (op instanceof Engine.Index) { + engine.index((Engine.Index) op); + } else if (op instanceof Engine.Delete) { + engine.delete((Engine.Delete) op); + } else { + engine.noOp((Engine.NoOp) op); + } + if (randomInt(100) < 10) { + engine.refresh("test"); + } + if (randomInt(100) < 5) { + engine.flush(); + commits.add(new ArrayList<>(flushedOperations)); + globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint())); + } + } + globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint())); + engine.syncTranslog(); + } + trimUnsafeCommits(config); + List safeCommit = null; + for (int i = commits.size() - 1; i >= 0; i--) { + if (commits.get(i).stream().allMatch(op -> op.seqNo() <= globalCheckpoint.get())) { + safeCommit = commits.get(i); + break; + } + } + assertThat(safeCommit, notNullValue()); + try (InternalEngine engine = new InternalEngine(config)) { // do not recover from translog + final LocalCheckpointTracker tracker = engine.getLocalCheckpointTracker(); + for (Engine.Operation op : operations) { + assertThat("seq_no=" + op.seqNo() + " max_seq_no=" + tracker.getMaxSeqNo() + " checkpoint=" + tracker.getCheckpoint(), + tracker.contains(op.seqNo()), equalTo(safeCommit.contains(op))); + } + } + } + } + static void trimUnsafeCommits(EngineConfig config) throws IOException { final Store store = config.getStore(); final TranslogConfig translogConfig = config.getTranslogConfig(); diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index e32161af7fe..05dac1ee324 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -25,6 +25,7 @@ import org.apache.lucene.index.IndexableField; import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.bulk.BulkShardRequest; +import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -61,6 +62,7 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.Matchers.anyOf; @@ -681,6 +683,50 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC } } + public void testAddNewReplicas() throws Exception { + try (ReplicationGroup shards = createGroup(between(0, 1))) { + shards.startAll(); + Thread[] threads = new Thread[between(1, 3)]; + AtomicBoolean isStopped = new AtomicBoolean(); + boolean appendOnly = randomBoolean(); + AtomicInteger docId = new AtomicInteger(); + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(() -> { + while (isStopped.get() == false) { + try { + if (appendOnly) { + String id = randomBoolean() ? Integer.toString(docId.incrementAndGet()) : null; + shards.index(new IndexRequest(index.getName(), "type", id).source("{}", XContentType.JSON)); + } else if (frequently()) { + String id = Integer.toString(frequently() ? docId.incrementAndGet() : between(0, 10)); + shards.index(new IndexRequest(index.getName(), "type", id).source("{}", XContentType.JSON)); + } else { + String id = Integer.toString(between(0, docId.get())); + shards.delete(new DeleteRequest(index.getName(), "type", id)); + } + if (randomInt(100) < 10) { + shards.getPrimary().flush(new FlushRequest()); + } + } catch (Exception ex) { + throw new AssertionError(ex); + } + } + }); + threads[i].start(); + } + assertBusy(() -> assertThat(docId.get(), greaterThanOrEqualTo(50))); + shards.getPrimary().sync(); + IndexShard newReplica = shards.addReplica(); + shards.recoverReplica(newReplica); + assertBusy(() -> assertThat(docId.get(), greaterThanOrEqualTo(100))); + isStopped.set(true); + for (Thread thread : threads) { + thread.join(); + } + assertBusy(() -> assertThat(getDocIdAndSeqNos(newReplica), equalTo(getDocIdAndSeqNos(shards.getPrimary())))); + } + } + public static class BlockingTarget extends RecoveryTarget { private final CountDownLatch recoveryBlocked; diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index fc6f0563e96..8a1ad645c36 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -691,6 +691,64 @@ public class IndexFollowingIT extends CCRIntegTestCase { assertThat(e.getMessage(), equalTo("unknown cluster alias [another_cluster]")); } + public void testAddNewReplicasOnFollower() throws Exception { + int numberOfReplicas = between(0, 1); + String leaderIndexSettings = getIndexSettings(1, numberOfReplicas, + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate("leader-index").setSource(leaderIndexSettings, XContentType.JSON)); + PutFollowAction.Request follow = follow("leader-index", "follower-index"); + followerClient().execute(PutFollowAction.INSTANCE, follow).get(); + getFollowerCluster().ensureAtLeastNumDataNodes(numberOfReplicas + between(2, 3)); + ensureFollowerGreen("follower-index"); + AtomicBoolean stopped = new AtomicBoolean(); + AtomicInteger docID = new AtomicInteger(); + boolean appendOnly = randomBoolean(); + Thread indexingOnLeader = new Thread(() -> { + while (stopped.get() == false) { + try { + if (appendOnly) { + String id = Integer.toString(docID.incrementAndGet()); + leaderClient().prepareIndex("leader-index", "doc", id).setSource("{\"f\":" + id + "}", XContentType.JSON).get(); + } else if (frequently()) { + String id = Integer.toString(frequently() ? docID.incrementAndGet() : between(0, 100)); + leaderClient().prepareIndex("leader-index", "doc", id).setSource("{\"f\":" + id + "}", XContentType.JSON).get(); + } else { + String id = Integer.toString(between(0, docID.get())); + leaderClient().prepareDelete("leader-index", "doc", id).get(); + } + } catch (Exception ex) { + throw new AssertionError(ex); + } + } + }); + indexingOnLeader.start(); + Thread flushingOnFollower = new Thread(() -> { + while (stopped.get() == false) { + try { + if (rarely()) { + followerClient().admin().indices().prepareFlush("follower-index").get(); + } + if (rarely()) { + followerClient().admin().indices().prepareRefresh("follower-index").get(); + } + } catch (Exception ex) { + throw new AssertionError(ex); + } + } + }); + flushingOnFollower.start(); + atLeastDocsIndexed(followerClient(), "follower-index", 50); + followerClient().admin().indices().prepareUpdateSettings("follower-index") + .setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas + 1).build()).get(); + ensureFollowerGreen("follower-index"); + atLeastDocsIndexed(followerClient(), "follower-index", 100); + stopped.set(true); + flushingOnFollower.join(); + indexingOnLeader.join(); + assertSameDocCount("leader-index", "follower-index"); + unfollowIndex("follower-index"); + } + private CheckedRunnable assertTask(final int numberOfPrimaryShards, final Map numDocsPerShard) { return () -> { final ClusterState clusterState = followerClient().admin().cluster().prepareState().get().getState(); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 7b95252c866..ad9aebade8f 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.support.PlainActionFuture; @@ -31,6 +32,7 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.indices.recovery.RecoveryTarget; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest; @@ -40,12 +42,14 @@ import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine; import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -281,6 +285,41 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest } } + public void testAddNewFollowingReplica() throws Exception { + final byte[] source = "{}".getBytes(StandardCharsets.UTF_8); + final int numDocs = between(1, 100); + final List operations = new ArrayList<>(numDocs); + for (int i = 0; i < numDocs; i++) { + operations.add(new Translog.Index("type", Integer.toString(i), i, primaryTerm, 0, source, null, -1)); + } + Future recoveryFuture = null; + try (ReplicationGroup group = createFollowGroup(between(0, 1))) { + group.startAll(); + while (operations.isEmpty() == false) { + List bulkOps = randomSubsetOf(between(1, operations.size()), operations); + operations.removeAll(bulkOps); + BulkShardOperationsRequest bulkRequest = new BulkShardOperationsRequest(group.getPrimary().shardId(), + group.getPrimary().getHistoryUUID(), bulkOps, -1); + new CCRAction(bulkRequest, new PlainActionFuture<>(), group).execute(); + if (randomInt(100) < 10) { + group.getPrimary().flush(new FlushRequest()); + } + if (recoveryFuture == null && (randomInt(100) < 10 || operations.isEmpty())) { + group.getPrimary().sync(); + IndexShard newReplica = group.addReplica(); + // We need to recover the replica async to release the main thread for the following task to fill missing + // operations between the local checkpoint and max_seq_no which the recovering replica is waiting for. + recoveryFuture = group.asyncRecoverReplica(newReplica, + (shard, sourceNode) -> new RecoveryTarget(shard, sourceNode, recoveryListener, l -> {}) {}); + } + } + if (recoveryFuture != null) { + recoveryFuture.get(); + } + group.assertAllEqual(numDocs); + } + } + @Override protected ReplicationGroup createGroup(int replicas, Settings settings) throws IOException { Settings newSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)