diff --git a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java
index 86db2dae2da..b384e45f56d 100644
--- a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java
+++ b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java
@@ -27,6 +27,7 @@ import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.codecs.DocValuesFormat;
 import org.apache.lucene.codecs.PostingsFormat;
 import org.apache.lucene.document.LatLonDocValuesField;
+import org.apache.lucene.document.LongPoint;
 import org.apache.lucene.document.NumericDocValuesField;
 import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.DirectoryReader;
@@ -42,6 +43,7 @@ import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.NoMergePolicy;
+import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.SegmentCommitInfo;
 import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.index.SegmentReader;
@@ -82,6 +84,7 @@ import org.elasticsearch.common.util.iterable.Iterables;
 import org.elasticsearch.index.analysis.AnalyzerScope;
 import org.elasticsearch.index.analysis.NamedAnalyzer;
 import org.elasticsearch.index.fielddata.IndexFieldData;
+import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 
 import java.io.IOException;
 import java.text.ParseException;
@@ -91,6 +94,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.function.LongConsumer;
 
 public class Lucene {
     public static final String LATEST_DOC_VALUES_FORMAT = "Lucene70";
@@ -968,4 +972,41 @@ public class Lucene {
     public static NumericDocValuesField newSoftDeletesField() {
         return new NumericDocValuesField(SOFT_DELETES_FIELD, 1);
     }
+
+    /**
+     * Scans sequence numbers (i.e., {@link SeqNoFieldMapper#NAME}) between {@code fromSeqNo} (inclusive) and {@code toSeqNo} (inclusive)
+     * in the provided directory reader. This method invokes the callback {@code onNewSeqNo} whenever a sequence number value is found.
+     *
+     * @param directoryReader the directory reader to scan
+     * @param fromSeqNo       the lower bound of a range of seq_no to scan (inclusive)
+     * @param toSeqNo         the upper bound of a range of seq_no to scan (inclusive)
+     * @param onNewSeqNo      the callback to be called whenever a new valid sequence number is found
+     * @throws IOException           if the underlying reader fails while the range query is executed or doc values are read
+     * @throws IllegalStateException if a document matched by the range query has no seq_no doc value
+     */
+    public static void scanSeqNosInReader(DirectoryReader directoryReader, long fromSeqNo, long toSeqNo,
+                                          LongConsumer onNewSeqNo) throws IOException {
+        final DirectoryReader reader = Lucene.wrapAllDocsLive(directoryReader);
+        final IndexSearcher searcher = new IndexSearcher(reader);
+        searcher.setQueryCache(null);
+        final Query query = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo);
+        final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f);
+        for (LeafReaderContext leaf : reader.leaves()) {
+            final Scorer scorer = weight.scorer(leaf);
+            if (scorer == null) {
+                continue;
+            }
+            final DocIdSetIterator docIdSetIterator = scorer.iterator();
+            final NumericDocValues seqNoDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
+            int docId;
+            while ((docId = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
+                if (seqNoDocValues == null || seqNoDocValues.advanceExact(docId) == false) {
+                    throw new IllegalStateException("seq_no doc_values not found for doc_id=" + docId);
+                }
+                final long seqNo = seqNoDocValues.longValue();
+                assert fromSeqNo <= seqNo && seqNo <= toSeqNo : "from_seq_no=" + fromSeqNo + " seq_no=" + seqNo + " to_seq_no=" + toSeqNo;
+                onNewSeqNo.accept(seqNo);
+            }
+        }
+    }
 }
diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 38ba0257491..2930fce4c02 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -102,6 +102,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BiFunction;
 import java.util.function.LongSupplier;
+import java.util.function.Supplier;
 import java.util.stream.Stream;
 
 public class InternalEngine extends Engine {
@@ -189,7 +190,6 @@ public class InternalEngine extends Engine {
                 translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier());
                 assert translog.getGeneration() != null;
                 this.translog = translog;
-                this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
                 this.softDeleteEnabled = engineConfig.getIndexSettings().isSoftDeleteEnabled();
                 this.softDeletesPolicy = newSoftDeletesPolicy();
                 this.combinedDeletionPolicy =
@@ -223,6 +223,8 @@ public class InternalEngine extends Engine {
             for (ReferenceManager.RefreshListener listener: engineConfig.getInternalRefreshListener()) {
                 this.internalSearcherManager.addListener(listener);
             }
+            this.localCheckpointTracker = createLocalCheckpointTracker(engineConfig, lastCommittedSegmentInfos, logger,
+                () -> acquireSearcher("create_local_checkpoint_tracker", SearcherScope.INTERNAL), localCheckpointTrackerSupplier);
             this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getCheckpoint());
             this.internalSearcherManager.addListener(lastRefreshedCheckpointListener);
             success = true;
@@ -238,16 +240,35 @@ public class InternalEngine extends Engine {
         logger.trace("created new InternalEngine");
     }
 
-    private LocalCheckpointTracker createLocalCheckpointTracker(
-        BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) throws IOException {
-        final long maxSeqNo;
-        final long localCheckpoint;
-        final SequenceNumbers.CommitInfo seqNoStats =
-            SequenceNumbers.loadSeqNoInfoFromLuceneCommit(store.readLastCommittedSegmentsInfo().userData.entrySet());
-        maxSeqNo = seqNoStats.maxSeqNo;
-        localCheckpoint = seqNoStats.localCheckpoint;
-        logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint);
-        return localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint);
+    /**
+     * Creates the {@link LocalCheckpointTracker} from the max_seq_no and local checkpoint stored in the given commit. If soft-deletes
+     * are enabled and the local checkpoint is below max_seq_no, every seq_no found in Lucene between them is also marked as completed.
+     *
+     * @throws EngineCreationFailureException if an {@link IOException} is thrown while the tracker is being built
+     */
+    private static LocalCheckpointTracker createLocalCheckpointTracker(EngineConfig engineConfig, SegmentInfos lastCommittedSegmentInfos,
+        Logger logger, Supplier<Searcher> searcherSupplier, BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) {
+        try {
+            final SequenceNumbers.CommitInfo seqNoStats =
+                SequenceNumbers.loadSeqNoInfoFromLuceneCommit(lastCommittedSegmentInfos.userData.entrySet());
+            final long maxSeqNo = seqNoStats.maxSeqNo;
+            final long localCheckpoint = seqNoStats.localCheckpoint;
+            logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint);
+            final LocalCheckpointTracker tracker = localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint);
+            // Operations that are optimized using max_seq_no_of_updates optimization must not be processed twice; otherwise, they will
+            // create duplicates in Lucene. To avoid this we check the LocalCheckpointTracker to see if an operation was already processed.
+            // Thus, we need to restore the LocalCheckpointTracker bit by bit to ensure the consistency between LocalCheckpointTracker and
+            // Lucene index. This is not the only solution since we can bootstrap max_seq_no_of_updates with max_seq_no of the commit to
+            // disable the MSU optimization during recovery. Here we prefer to maintain the consistency of LocalCheckpointTracker.
+            if (localCheckpoint < maxSeqNo && engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
+                try (Searcher searcher = searcherSupplier.get()) {
+                    Lucene.scanSeqNosInReader(searcher.getDirectoryReader(), localCheckpoint + 1, maxSeqNo, tracker::markSeqNoAsCompleted);
+                }
+            }
+            return tracker;
+        } catch (IOException ex) {
+            throw new EngineCreationFailureException(engineConfig.getShardId(), "failed to create local checkpoint tracker", ex);
+        }
     }
 
     private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException {
@@ -665,6 +686,8 @@ public class InternalEngine extends Engine {
                 } else if (op.seqNo() > docAndSeqNo.seqNo) {
                     status = OpVsLuceneDocStatus.OP_NEWER;
                 } else if (op.seqNo() == docAndSeqNo.seqNo) {
+                    assert localCheckpointTracker.contains(op.seqNo()) || softDeleteEnabled == false :
+                        "local checkpoint tracker is not updated seq_no=" + op.seqNo() + " id=" + op.id();
                     // load term to tie break
                     final long existingTerm = VersionsAndSeqNoResolver.loadPrimaryTerm(docAndSeqNo, op.uid().field());
                     if (op.primaryTerm() > existingTerm) {
diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index 26c2453a271..f36476a9685 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -5081,6 +5081,80 @@ public class InternalEngineTests extends EngineTestCase {
         }
     }
 
+    /**
+     * Verifies that an engine opened without translog recovery restores a tracker containing exactly the ops of the safe commit.
+     */
+    public void testRebuildLocalCheckpointTracker() throws Exception {
+        Settings.Builder settings = Settings.builder()
+            .put(defaultSettings.getSettings())
+            .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
+        final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build();
+        final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData);
+        final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
+        Path translogPath = createTempDir();
+        int numOps = scaledRandomIntBetween(1, 500);
+        List<Engine.Operation> operations = new ArrayList<>();
+        for (int i = 0; i < numOps; i++) {
+            long seqNo = i;
+            final ParsedDocument doc = EngineTestCase.createParsedDoc(Integer.toString(between(1, 100)), null);
+            if (randomBoolean()) {
+                operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, seqNo, primaryTerm.get(),
+                    i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(), -1, true));
+            } else if (randomBoolean()) {
+                operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), seqNo, primaryTerm.get(),
+                    i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis()));
+            } else {
+                operations.add(new Engine.NoOp(seqNo, primaryTerm.get(), Engine.Operation.Origin.REPLICA,
+                    threadPool.relativeTimeInMillis(), "test-" + i));
+            }
+        }
+        Randomness.shuffle(operations);
+        List<List<Engine.Operation>> commits = new ArrayList<>();
+        commits.add(new ArrayList<>());
+        try (Store store = createStore()) {
+            EngineConfig config = config(indexSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get);
+            try (InternalEngine engine = createEngine(config)) {
+                List<Engine.Operation> flushedOperations = new ArrayList<>();
+                for (Engine.Operation op : operations) {
+                    flushedOperations.add(op);
+                    if (op instanceof Engine.Index) {
+                        engine.index((Engine.Index) op);
+                    } else if (op instanceof Engine.Delete) {
+                        engine.delete((Engine.Delete) op);
+                    } else {
+                        engine.noOp((Engine.NoOp) op);
+                    }
+                    if (randomInt(100) < 10) {
+                        engine.refresh("test");
+                    }
+                    if (randomInt(100) < 5) {
+                        engine.flush();
+                        commits.add(new ArrayList<>(flushedOperations));
+                        globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
+                    }
+                }
+                globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
+                engine.syncTranslog();
+            }
+            trimUnsafeCommits(config);
+            List<Engine.Operation> safeCommit = null;
+            for (int i = commits.size() - 1; i >= 0; i--) {
+                if (commits.get(i).stream().allMatch(op -> op.seqNo() <= globalCheckpoint.get())) {
+                    safeCommit = commits.get(i);
+                    break;
+                }
+            }
+            assertThat(safeCommit, notNullValue());
+            try (InternalEngine engine = new InternalEngine(config)) { // do not recover from translog
+                final LocalCheckpointTracker tracker = engine.getLocalCheckpointTracker();
+                for (Engine.Operation op : operations) {
+                    assertThat("seq_no=" + op.seqNo() + " max_seq_no=" + tracker.getMaxSeqNo() + " checkpoint=" + tracker.getCheckpoint(),
+                        tracker.contains(op.seqNo()), equalTo(safeCommit.contains(op)));
+                }
+            }
+        }
+    }
+
     static void trimUnsafeCommits(EngineConfig config) throws IOException {
         final Store store = config.getStore();
         final TranslogConfig translogConfig = config.getTranslogConfig();
diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
index e32161af7fe..05dac1ee324 100644
--- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
+++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
@@ -25,6 +25,7 @@ import org.apache.lucene.index.IndexableField;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.bulk.BulkShardRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -61,6 +62,7 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.hamcrest.Matchers.anyOf;
@@ -681,6 +683,53 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
         }
     }
 
+    /**
+     * Recovers a new replica while indexing, deleting and flushing run concurrently, then checks it converges to the primary's docs.
+     */
+    public void testAddNewReplicas() throws Exception {
+        try (ReplicationGroup shards = createGroup(between(0, 1))) {
+            shards.startAll();
+            Thread[] threads = new Thread[between(1, 3)];
+            AtomicBoolean isStopped = new AtomicBoolean();
+            boolean appendOnly = randomBoolean();
+            AtomicInteger docId = new AtomicInteger();
+            for (int i = 0; i < threads.length; i++) {
+                threads[i] = new Thread(() -> {
+                    while (isStopped.get() == false) {
+                        try {
+                            if (appendOnly) {
+                                String id = randomBoolean() ? Integer.toString(docId.incrementAndGet()) : null;
+                                shards.index(new IndexRequest(index.getName(), "type", id).source("{}", XContentType.JSON));
+                            } else if (frequently()) {
+                                String id = Integer.toString(frequently() ? docId.incrementAndGet() : between(0, 10));
+                                shards.index(new IndexRequest(index.getName(), "type", id).source("{}", XContentType.JSON));
+                            } else {
+                                String id = Integer.toString(between(0, docId.get()));
+                                shards.delete(new DeleteRequest(index.getName(), "type", id));
+                            }
+                            if (randomInt(100) < 10) {
+                                shards.getPrimary().flush(new FlushRequest());
+                            }
+                        } catch (Exception ex) {
+                            throw new AssertionError(ex);
+                        }
+                    }
+                });
+                threads[i].start();
+            }
+            assertBusy(() -> assertThat(docId.get(), greaterThanOrEqualTo(50)));
+            shards.getPrimary().sync();
+            IndexShard newReplica = shards.addReplica();
+            shards.recoverReplica(newReplica);
+            assertBusy(() -> assertThat(docId.get(), greaterThanOrEqualTo(100)));
+            isStopped.set(true);
+            for (Thread thread : threads) {
+                thread.join();
+            }
+            assertBusy(() -> assertThat(getDocIdAndSeqNos(newReplica), equalTo(getDocIdAndSeqNos(shards.getPrimary()))));
+        }
+    }
+
     public static class BlockingTarget extends RecoveryTarget {
 
         private final CountDownLatch recoveryBlocked;
diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java
index fc6f0563e96..8a1ad645c36 100644
--- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java
@@ -691,6 +691,67 @@ public class IndexFollowingIT extends CCRIntegTestCase {
         assertThat(e.getMessage(), equalTo("unknown cluster alias [another_cluster]"));
     }
 
+    /**
+     * Adds a replica to the follower index while the leader is being written to, then checks both indices hold the same doc count.
+     */
+    public void testAddNewReplicasOnFollower() throws Exception {
+        int numberOfReplicas = between(0, 1);
+        String leaderIndexSettings = getIndexSettings(1, numberOfReplicas,
+            singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
+        assertAcked(leaderClient().admin().indices().prepareCreate("leader-index").setSource(leaderIndexSettings, XContentType.JSON));
+        PutFollowAction.Request follow = follow("leader-index", "follower-index");
+        followerClient().execute(PutFollowAction.INSTANCE, follow).get();
+        getFollowerCluster().ensureAtLeastNumDataNodes(numberOfReplicas + between(2, 3));
+        ensureFollowerGreen("follower-index");
+        AtomicBoolean stopped = new AtomicBoolean();
+        AtomicInteger docID = new AtomicInteger();
+        boolean appendOnly = randomBoolean();
+        Thread indexingOnLeader = new Thread(() -> {
+            while (stopped.get() == false) {
+                try {
+                    if (appendOnly) {
+                        String id = Integer.toString(docID.incrementAndGet());
+                        leaderClient().prepareIndex("leader-index", "doc", id).setSource("{\"f\":" + id + "}", XContentType.JSON).get();
+                    } else if (frequently()) {
+                        String id = Integer.toString(frequently() ? docID.incrementAndGet() : between(0, 100));
+                        leaderClient().prepareIndex("leader-index", "doc", id).setSource("{\"f\":" + id + "}", XContentType.JSON).get();
+                    } else {
+                        String id = Integer.toString(between(0, docID.get()));
+                        leaderClient().prepareDelete("leader-index", "doc", id).get();
+                    }
+                } catch (Exception ex) {
+                    throw new AssertionError(ex);
+                }
+            }
+        });
+        indexingOnLeader.start();
+        Thread flushingOnFollower = new Thread(() -> {
+            while (stopped.get() == false) {
+                try {
+                    if (rarely()) {
+                        followerClient().admin().indices().prepareFlush("follower-index").get();
+                    }
+                    if (rarely()) {
+                        followerClient().admin().indices().prepareRefresh("follower-index").get();
+                    }
+                } catch (Exception ex) {
+                    throw new AssertionError(ex);
+                }
+            }
+        });
+        flushingOnFollower.start();
+        atLeastDocsIndexed(followerClient(), "follower-index", 50);
+        followerClient().admin().indices().prepareUpdateSettings("follower-index")
+            .setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas + 1).build()).get();
+        ensureFollowerGreen("follower-index");
+        atLeastDocsIndexed(followerClient(), "follower-index", 100);
+        stopped.set(true);
+        flushingOnFollower.join();
+        indexingOnLeader.join();
+        assertSameDocCount("leader-index", "follower-index");
+        unfollowIndex("follower-index");
+    }
+
     private CheckedRunnable<Exception> assertTask(final int numberOfPrimaryShards, final Map<ShardId, Long> numDocsPerShard) {
         return () -> {
             final ClusterState clusterState = followerClient().admin().cluster().prepareState().get().getState();
diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java
index 7b95252c866..ad9aebade8f 100644
--- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java
@@ -11,6 +11,7 @@ import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.support.PlainActionFuture;
@@ -31,6 +32,7 @@ import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardTestCase;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.indices.recovery.RecoveryTarget;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.ccr.CcrSettings;
 import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest;
@@ -40,12 +42,14 @@ import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine;
 import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
@@ -281,6 +285,44 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
         }
     }
 
+    /**
+     * Recovers a new replica of a follow group asynchronously while operations are applied in random bulks, then asserts all shards are equal.
+     */
+    public void testAddNewFollowingReplica() throws Exception {
+        final byte[] source = "{}".getBytes(StandardCharsets.UTF_8);
+        final int numDocs = between(1, 100);
+        final List<Translog.Operation> operations = new ArrayList<>(numDocs);
+        for (int i = 0; i < numDocs; i++) {
+            operations.add(new Translog.Index("type", Integer.toString(i), i, primaryTerm, 0, source, null, -1));
+        }
+        Future<Void> recoveryFuture = null;
+        try (ReplicationGroup group = createFollowGroup(between(0, 1))) {
+            group.startAll();
+            while (operations.isEmpty() == false) {
+                List<Translog.Operation> bulkOps = randomSubsetOf(between(1, operations.size()), operations);
+                operations.removeAll(bulkOps);
+                BulkShardOperationsRequest bulkRequest = new BulkShardOperationsRequest(group.getPrimary().shardId(),
+                    group.getPrimary().getHistoryUUID(), bulkOps, -1);
+                new CCRAction(bulkRequest, new PlainActionFuture<>(), group).execute();
+                if (randomInt(100) < 10) {
+                    group.getPrimary().flush(new FlushRequest());
+                }
+                if (recoveryFuture == null && (randomInt(100) < 10 || operations.isEmpty())) {
+                    group.getPrimary().sync();
+                    IndexShard newReplica = group.addReplica();
+                    // We need to recover the replica async to release the main thread for the following task to fill missing
+                    // operations between the local checkpoint and max_seq_no which the recovering replica is waiting for.
+                    recoveryFuture = group.asyncRecoverReplica(newReplica,
+                        (shard, sourceNode) -> new RecoveryTarget(shard, sourceNode, recoveryListener, l -> {}) {});
+                }
+            }
+            if (recoveryFuture != null) {
+                recoveryFuture.get();
+            }
+            group.assertAllEqual(numDocs);
+        }
+    }
+
     @Override
     protected ReplicationGroup createGroup(int replicas, Settings settings) throws IOException {
         Settings newSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)