diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index ea8161c1589..5ebe13577f4 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -678,12 +678,6 @@ public abstract class Engine implements Closeable { */ public abstract void waitForOpsToComplete(long seqNo) throws InterruptedException; - /** - * Reset the local checkpoint in the tracker to the given local checkpoint - * @param localCheckpoint the new checkpoint to be set - */ - public abstract void resetLocalCheckpoint(long localCheckpoint); - /** * @return a {@link SeqNoStats} object, using local state and the supplied global checkpoint */ @@ -1165,11 +1159,16 @@ public abstract class Engine implements Closeable { PRIMARY, REPLICA, PEER_RECOVERY, - LOCAL_TRANSLOG_RECOVERY; + LOCAL_TRANSLOG_RECOVERY, + LOCAL_RESET; public boolean isRecovery() { return this == PEER_RECOVERY || this == LOCAL_TRANSLOG_RECOVERY; } + + boolean isFromTranslog() { + return this == LOCAL_TRANSLOG_RECOVERY || this == LOCAL_RESET; + } } public Origin origin() { diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index e8f5e415908..52dd4d3fcd0 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -729,6 +729,7 @@ public class InternalEngine extends Engine { : "version: " + index.version() + " type: " + index.versionType(); return true; case LOCAL_TRANSLOG_RECOVERY: + case LOCAL_RESET: assert index.isRetry(); return true; // allow to optimize in order to update the max safe time stamp default: @@ -827,7 +828,7 @@ public class InternalEngine extends Engine { indexResult = new IndexResult( plan.versionForIndexing, getPrimaryTerm(), plan.seqNoForIndexing, plan.currentNotFoundOrDeleted); } - if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { + if (index.origin().isFromTranslog() == false) { final Translog.Location location; if (indexResult.getResultType() == Result.Type.SUCCESS) { location = translog.add(new Translog.Index(index, indexResult)); @@ -1167,7 +1168,7 @@ public class InternalEngine extends Engine { deleteResult = new DeleteResult( plan.versionOfDeletion, getPrimaryTerm(), plan.seqNoOfDeletion, plan.currentlyDeleted == false); } - if (delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { + if (delete.origin().isFromTranslog() == false) { final Translog.Location location; if (deleteResult.getResultType() == Result.Type.SUCCESS) { location = translog.add(new Translog.Delete(delete, deleteResult)); @@ -1405,7 +1406,7 @@ public class InternalEngine extends Engine { } } final NoOpResult noOpResult = failure != null ? new NoOpResult(getPrimaryTerm(), noOp.seqNo(), failure) : new NoOpResult(getPrimaryTerm(), noOp.seqNo()); - if (noOp.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { + if (noOp.origin().isFromTranslog() == false) { final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason())); noOpResult.setTranslogLocation(location); } @@ -2324,11 +2325,6 @@ public class InternalEngine extends Engine { localCheckpointTracker.waitForOpsToComplete(seqNo); } - @Override - public void resetLocalCheckpoint(long localCheckpoint) { - localCheckpointTracker.resetCheckpoint(localCheckpoint); - } - @Override public SeqNoStats getSeqNoStats(long globalCheckpoint) { return localCheckpointTracker.getStats(globalCheckpoint); diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index a55987d0a00..b958bd84b76 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -257,10 +257,6 @@ public final class ReadOnlyEngine extends Engine { public void waitForOpsToComplete(long seqNo) { } - @Override - public void resetLocalCheckpoint(long newCheckpoint) { - } - @Override public SeqNoStats getSeqNoStats(long globalCheckpoint) { return new SeqNoStats(seqNoStats.getMaxSeqNo(), seqNoStats.getLocalCheckpoint(), globalCheckpoint); diff --git a/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java index cd33c1bf046..9fad96940b8 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java @@ -109,6 +109,7 @@ public class LocalCheckpointTracker { * @param checkpoint the local checkpoint to reset this tracker to */ public synchronized void resetCheckpoint(final long checkpoint) { + // TODO: remove this method as after we restore the local history on promotion. assert checkpoint != SequenceNumbers.UNASSIGNED_SEQ_NO; assert checkpoint <= this.checkpoint; processedSeqNo.clear(); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index bceb106aeef..4bb56c8b0d3 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -163,7 +163,6 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; import static org.elasticsearch.index.mapper.SourceToParse.source; -import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard { @@ -1273,16 +1272,18 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return result; } - // package-private for testing - int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOException { - recoveryState.getTranslog().totalOperations(snapshot.totalOperations()); - recoveryState.getTranslog().totalOperationsOnStart(snapshot.totalOperations()); + /** + * Replays translog operations from the provided translog {@code snapshot} to the current engine using the given {@code origin}. + * The callback {@code onOperationRecovered} is notified after each translog operation is replayed successfully. + */ + int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operation.Origin origin, + Runnable onOperationRecovered) throws IOException { int opsRecovered = 0; Translog.Operation operation; while ((operation = snapshot.next()) != null) { try { logger.trace("[translog] recover op {}", operation); - Engine.Result result = applyTranslogOperation(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY); + Engine.Result result = applyTranslogOperation(operation, origin); switch (result.getResultType()) { case FAILURE: throw result.getFailure(); @@ -1295,7 +1296,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } opsRecovered++; - recoveryState.getTranslog().incrementRecoveredOperations(); + onOperationRecovered.run(); } catch (Exception e) { if (ExceptionsHelper.status(e) == RestStatus.BAD_REQUEST) { // mainly for MapperParsingException and Failure to detect xcontent @@ -1313,8 +1314,15 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * Operations from the translog will be replayed to bring lucene up to date. **/ public void openEngineAndRecoverFromTranslog() throws IOException { + final RecoveryState.Translog translogRecoveryStats = recoveryState.getTranslog(); + final Engine.TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> { + translogRecoveryStats.totalOperations(snapshot.totalOperations()); + translogRecoveryStats.totalOperationsOnStart(snapshot.totalOperations()); + return runTranslogRecovery(engine, snapshot, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY, + translogRecoveryStats::incrementRecoveredOperations); + }; innerOpenEngineAndTranslog(); - getEngine().recoverFromTranslog(this::runTranslogRecovery, Long.MAX_VALUE); + getEngine().recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE); } /** @@ -1352,11 +1360,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY); final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID); replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint"); - - assertMaxUnsafeAutoIdInCommit(); - - final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID); - store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, config.getIndexSettings().getIndexVersionCreated()); + trimUnsafeCommits(); createNewEngine(config); verifyNotClosed(); @@ -1367,6 +1371,15 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage(); } + private void trimUnsafeCommits() throws IOException { + assert currentEngineReference.get() == null : "engine is running"; + final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY); + final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID); + final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID); + assertMaxUnsafeAutoIdInCommit(); + store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, indexSettings.getIndexVersionCreated()); + } + private boolean assertSequenceNumbersInCommit() throws IOException { final Map userData = SegmentInfos.readLatestCommit(store.directory()).getUserData(); assert userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) : "commit point doesn't contains a local checkpoint"; @@ -1463,7 +1476,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl if (origin == Engine.Operation.Origin.PRIMARY) { assert assertPrimaryMode(); } else { - assert origin == Engine.Operation.Origin.REPLICA; + assert origin == Engine.Operation.Origin.REPLICA || origin == Engine.Operation.Origin.LOCAL_RESET; assert assertReplicationTarget(); } if (writeAllowedStates.contains(state) == false) { @@ -2166,9 +2179,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl private Engine createNewEngine(EngineConfig config) { synchronized (mutex) { - if (state == IndexShardState.CLOSED) { - throw new AlreadyClosedException(shardId + " can't create engine - shard is closed"); - } + verifyNotClosed(); assert this.currentEngineReference.get() == null; Engine engine = newEngine(config); onNewEngine(engine); // call this before we pass the memory barrier otherwise actions that happen @@ -2314,19 +2325,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl bumpPrimaryTerm(opPrimaryTerm, () -> { updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition"); final long currentGlobalCheckpoint = getGlobalCheckpoint(); - final long localCheckpoint; - if (currentGlobalCheckpoint == UNASSIGNED_SEQ_NO) { - localCheckpoint = NO_OPS_PERFORMED; + final long maxSeqNo = seqNoStats().getMaxSeqNo(); + logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]", + opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo); + if (currentGlobalCheckpoint < maxSeqNo) { + resetEngineToGlobalCheckpoint(); } else { - localCheckpoint = currentGlobalCheckpoint; + getEngine().rollTranslogGeneration(); } - logger.trace( - "detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]", - opPrimaryTerm, - getLocalCheckpoint(), - localCheckpoint); - getEngine().resetLocalCheckpoint(localCheckpoint); - getEngine().rollTranslogGeneration(); }); } } @@ -2687,4 +2693,26 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } }; } + + /** + * Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint. + */ + void resetEngineToGlobalCheckpoint() throws IOException { + assert getActiveOperationsCount() == 0 : "Ongoing writes [" + getActiveOperations() + "]"; + sync(); // persist the global checkpoint to disk + final long globalCheckpoint = getGlobalCheckpoint(); + final Engine newEngine; + synchronized (mutex) { + verifyNotClosed(); + IOUtils.close(currentEngineReference.getAndSet(null)); + trimUnsafeCommits(); + newEngine = createNewEngine(newEngineConfig()); + active.set(true); + } + final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery( + engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> { + // TODO: add a dedicate recovery stats for the reset translog + }); + newEngine.recoverFromTranslog(translogRunner, globalCheckpoint); + } } diff --git a/server/src/test/java/org/elasticsearch/discovery/AbstractDisruptionTestCase.java b/server/src/test/java/org/elasticsearch/discovery/AbstractDisruptionTestCase.java index ac2f2b0d4f3..c0b01eb5ec5 100644 --- a/server/src/test/java/org/elasticsearch/discovery/AbstractDisruptionTestCase.java +++ b/server/src/test/java/org/elasticsearch/discovery/AbstractDisruptionTestCase.java @@ -111,6 +111,7 @@ public abstract class AbstractDisruptionTestCase extends ESIntegTestCase { super.beforeIndexDeletion(); internalCluster().assertConsistentHistoryBetweenTranslogAndLuceneIndex(); assertSeqNos(); + assertSameDocIdsOnShards(); } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 39132d805b2..8f9d90154f8 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -4087,7 +4087,7 @@ public class InternalEngineTests extends EngineTestCase { final long currentLocalCheckpoint = actualEngine.getLocalCheckpoint(); final long resetLocalCheckpoint = randomIntBetween(Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), Math.toIntExact(currentLocalCheckpoint)); - actualEngine.resetLocalCheckpoint(resetLocalCheckpoint); + actualEngine.getLocalCheckpointTracker().resetCheckpoint(resetLocalCheckpoint); completedSeqNos.clear(); actualEngine.restoreLocalCheckpointFromTranslog(); final Set intersection = new HashSet<>(expectedCompletedSeqNos); diff --git a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java index 4a5b89351bd..4080dd33d53 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java @@ -27,7 +27,7 @@ import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.store.Store; import java.io.IOException; -import java.util.Set; +import java.util.List; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; @@ -43,7 +43,7 @@ public class ReadOnlyEngineTests extends EngineTestCase { EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); int numDocs = scaledRandomIntBetween(10, 1000); final SeqNoStats lastSeqNoStats; - final Set lastDocIds; + final List lastDocIds; try (InternalEngine engine = createEngine(config)) { Engine.Get get = null; for (int i = 0; i < numDocs; i++) { diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index e471874f6d6..f2cdfbf8fc5 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -519,18 +519,14 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase shards.promoteReplicaToPrimary(replica2).get(); logger.info("--> Recover replica3 from replica2"); recoverReplica(replica3, replica2, true); - try (Translog.Snapshot snapshot = getTranslog(replica3).newSnapshot()) { + try (Translog.Snapshot snapshot = replica3.getHistoryOperations("test", 0)) { assertThat(snapshot.totalOperations(), equalTo(initDocs + 1)); final List expectedOps = new ArrayList<>(initOperations); expectedOps.add(op2); assertThat(snapshot, containsOperationsInAnyOrder(expectedOps)); assertThat("Peer-recovery should not send overridden operations", snapshot.skippedOperations(), equalTo(0)); } - // TODO: We should assert the content of shards in the ReplicationGroup. - // Without rollback replicas(current implementation), we don't have the same content across shards: - // - replica1 has {doc1} - // - replica2 has {doc1, doc2} - // - replica3 can have either {doc2} only if operation-based recovery or {doc1, doc2} if file-based recovery + shards.assertAllEqual(initDocs + 1); } } diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 28122665e9b..a73d7385d9d 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -55,10 +55,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; @@ -306,14 +304,6 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty())); assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(uncommittedOpsOnPrimary)); } - - // roll back the extra ops in the replica - shards.removeReplica(replica); - replica.close("resync", false); - replica.store().close(); - newReplica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId()); - shards.recoverReplica(newReplica); - shards.assertAllEqual(totalDocs); // Make sure that flushing on a recovering shard is ok. shards.flush(); shards.assertAllEqual(totalDocs); @@ -406,31 +396,14 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC indexOnReplica(bulkShardRequest, shards, justReplica); } - logger.info("--> seqNo primary {} replica {}", oldPrimary.seqNoStats(), newPrimary.seqNoStats()); - - logger.info("--> resyncing replicas"); + logger.info("--> resyncing replicas seqno_stats primary {} replica {}", oldPrimary.seqNoStats(), newPrimary.seqNoStats()); PrimaryReplicaSyncer.ResyncTask task = shards.promoteReplicaToPrimary(newPrimary).get(); if (syncedGlobalCheckPoint) { assertEquals(extraDocs, task.getResyncedOperations()); } else { assertThat(task.getResyncedOperations(), greaterThanOrEqualTo(extraDocs)); } - List replicas = shards.getReplicas(); - - // check all docs on primary are available on replica - Set primaryIds = getShardDocUIDs(newPrimary); - assertThat(primaryIds.size(), equalTo(initialDocs + extraDocs)); - for (IndexShard replica : replicas) { - Set replicaIds = getShardDocUIDs(replica); - Set temp = new HashSet<>(primaryIds); - temp.removeAll(replicaIds); - assertThat(replica.routingEntry() + " is missing docs", temp, empty()); - temp = new HashSet<>(replicaIds); - temp.removeAll(primaryIds); - // yeah, replica has more docs as there is no Lucene roll back on it - assertThat(replica.routingEntry() + " has to have extra docs", temp, - extraDocsToBeTrimmed > 0 ? not(empty()) : empty()); - } + shards.assertAllEqual(initialDocs + extraDocs); // check translog on replica is trimmed int translogOperations = 0; diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 7f37846d3f0..d6b80a6f6d7 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -106,6 +106,7 @@ import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreStats; import org.elasticsearch.index.translog.TestTranslog; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.index.translog.TranslogStats; import org.elasticsearch.index.translog.TranslogTests; import org.elasticsearch.indices.IndicesQueryCache; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; @@ -181,6 +182,7 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; /** * Simple unit-test IndexShard related operations. @@ -945,28 +947,24 @@ public class IndexShardTests extends IndexShardTestCase { resyncLatch.await(); assertThat(indexShard.getLocalCheckpoint(), equalTo(maxSeqNo)); assertThat(indexShard.seqNoStats().getMaxSeqNo(), equalTo(maxSeqNo)); - - closeShards(indexShard); + closeShard(indexShard, false); } - public void testThrowBackLocalCheckpointOnReplica() throws IOException, InterruptedException { + public void testRollbackReplicaEngineOnPromotion() throws IOException, InterruptedException { final IndexShard indexShard = newStartedShard(false); // most of the time this is large enough that most of the time there will be at least one gap final int operations = 1024 - scaledRandomIntBetween(0, 1024); indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED)); - final long globalCheckpointOnReplica = - randomIntBetween( - Math.toIntExact(SequenceNumbers.UNASSIGNED_SEQ_NO), - Math.toIntExact(indexShard.getLocalCheckpoint())); + final long globalCheckpointOnReplica = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint()); indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica, "test"); - - final int globalCheckpoint = - randomIntBetween( - Math.toIntExact(SequenceNumbers.UNASSIGNED_SEQ_NO), - Math.toIntExact(indexShard.getLocalCheckpoint())); + final long globalCheckpoint = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint()); + Set docsBelowGlobalCheckpoint = getShardDocUIDs(indexShard).stream() + .filter(id -> Long.parseLong(id) <= Math.max(globalCheckpointOnReplica, globalCheckpoint)).collect(Collectors.toSet()); final CountDownLatch latch = new CountDownLatch(1); + final boolean shouldRollback = Math.max(globalCheckpoint, globalCheckpointOnReplica) < indexShard.seqNoStats().getMaxSeqNo(); + final Engine beforeRollbackEngine = indexShard.getEngine(); indexShard.acquireReplicaOperationPermit( indexShard.pendingPrimaryTerm + 1, globalCheckpoint, @@ -985,18 +983,21 @@ public class IndexShardTests extends IndexShardTestCase { ThreadPool.Names.SAME, ""); latch.await(); - if (globalCheckpointOnReplica == SequenceNumbers.UNASSIGNED_SEQ_NO - && globalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO) { + if (globalCheckpointOnReplica == SequenceNumbers.UNASSIGNED_SEQ_NO && globalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO) { assertThat(indexShard.getLocalCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED)); } else { assertThat(indexShard.getLocalCheckpoint(), equalTo(Math.max(globalCheckpoint, globalCheckpointOnReplica))); } - + assertThat(getShardDocUIDs(indexShard), equalTo(docsBelowGlobalCheckpoint)); + if (shouldRollback) { + assertThat(indexShard.getEngine(), not(sameInstance(beforeRollbackEngine))); + } else { + assertThat(indexShard.getEngine(), sameInstance(beforeRollbackEngine)); + } // ensure that after the local checkpoint throw back and indexing again, the local checkpoint advances final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(indexShard.getLocalCheckpoint())); assertThat(indexShard.getLocalCheckpoint(), equalTo((long) result.localCheckpoint)); - - closeShards(indexShard); + closeShard(indexShard, false); } public void testConcurrentTermIncreaseOnReplicaShard() throws BrokenBarrierException, InterruptedException, IOException { @@ -1880,13 +1881,17 @@ public class IndexShardTests extends IndexShardTestCase { SourceToParse.source(indexName, "_doc", "doc-1", new BytesArray("{}"), XContentType.JSON)); flushShard(shard); assertThat(getShardDocUIDs(shard), containsInAnyOrder("doc-0", "doc-1")); - // Simulate resync (without rollback): Noop #1, index #2 - acquireReplicaOperationPermitBlockingly(shard, shard.pendingPrimaryTerm + 1); + // Here we try to increase term (i.e. a new primary is promoted) without rolling back a replica so we can keep stale operations + // in the index commit; then verify that a recovery from store (started with the safe commit) will remove all stale operations. + shard.pendingPrimaryTerm++; + shard.operationPrimaryTerm++; + shard.getEngine().rollTranslogGeneration(); shard.markSeqNoAsNoop(1, "test"); shard.applyIndexOperationOnReplica(2, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, SourceToParse.source(indexName, "_doc", "doc-2", new BytesArray("{}"), XContentType.JSON)); flushShard(shard); assertThat(getShardDocUIDs(shard), containsInAnyOrder("doc-0", "doc-1", "doc-2")); + closeShard(shard, false); // Recovering from store should discard doc #1 final ShardRouting replicaRouting = shard.routingEntry(); IndexShard newShard = reinitShard(shard, @@ -2249,10 +2254,11 @@ public class IndexShardTests extends IndexShardTestCase { null)); primary.recoverFromStore(); + primary.recoveryState().getTranslog().totalOperations(snapshot.totalOperations()); + primary.recoveryState().getTranslog().totalOperationsOnStart(snapshot.totalOperations()); primary.state = IndexShardState.RECOVERING; // translog recovery on the next line would otherwise fail as we are in POST_RECOVERY - primary.runTranslogRecovery(primary.getEngine(), snapshot); - assertThat(primary.recoveryState().getTranslog().totalOperationsOnStart(), equalTo(numTotalEntries)); - assertThat(primary.recoveryState().getTranslog().totalOperations(), equalTo(numTotalEntries)); + primary.runTranslogRecovery(primary.getEngine(), snapshot, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY, + primary.recoveryState().getTranslog()::incrementRecoveredOperations); assertThat(primary.recoveryState().getTranslog().recoveredOperations(), equalTo(numTotalEntries - numCorruptEntries)); closeShards(primary); @@ -2865,6 +2871,9 @@ public class IndexShardTests extends IndexShardTestCase { } else { gap = true; } + if (rarely()) { + indexShard.flush(new FlushRequest()); + } } assert localCheckpoint == indexShard.getLocalCheckpoint(); assert !gap || (localCheckpoint != max); @@ -3402,4 +3411,19 @@ public class IndexShardTests extends IndexShardTestCase { closeShards(shard); } + + public void testResetEngine() throws Exception { + IndexShard shard = newStartedShard(false); + indexOnReplicaWithGaps(shard, between(0, 1000), Math.toIntExact(shard.getLocalCheckpoint())); + final long globalCheckpoint = randomLongBetween(shard.getGlobalCheckpoint(), shard.getLocalCheckpoint()); + shard.updateGlobalCheckpointOnReplica(globalCheckpoint, "test"); + Set docBelowGlobalCheckpoint = getShardDocUIDs(shard).stream() + .filter(id -> Long.parseLong(id) <= globalCheckpoint).collect(Collectors.toSet()); + TranslogStats translogStats = shard.translogStats(); + shard.resetEngineToGlobalCheckpoint(); + assertThat(getShardDocUIDs(shard), equalTo(docBelowGlobalCheckpoint)); + assertThat(shard.seqNoStats().getMaxSeqNo(), equalTo(globalCheckpoint)); + assertThat(shard.translogStats().estimatedNumberOfOperations(), equalTo(translogStats.estimatedNumberOfOperations())); + closeShard(shard, false); + } } diff --git a/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java index cb93d803bb7..8d0f1845be6 100644 --- a/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java +++ b/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java @@ -103,6 +103,7 @@ public class RelocationIT extends ESIntegTestCase { protected void beforeIndexDeletion() throws Exception { super.beforeIndexDeletion(); assertSeqNos(); + assertSameDocIdsOnShards(); } public void testSimpleRelocationNoIndexing() { diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/DocIdSeqNoAndTerm.java b/test/framework/src/main/java/org/elasticsearch/index/engine/DocIdSeqNoAndTerm.java new file mode 100644 index 00000000000..b24a010c1a0 --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/DocIdSeqNoAndTerm.java @@ -0,0 +1,66 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.engine; + + +import java.util.Objects; + +/** A tuple of document id, sequence number and primary term of a document */ +public final class DocIdSeqNoAndTerm { + private final String id; + private final long seqNo; + private final long primaryTerm; + + public DocIdSeqNoAndTerm(String id, long seqNo, long primaryTerm) { + this.id = id; + this.seqNo = seqNo; + this.primaryTerm = primaryTerm; + } + + public String getId() { + return id; + } + + public long getSeqNo() { + return seqNo; + } + + public long getPrimaryTerm() { + return primaryTerm; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DocIdSeqNoAndTerm that = (DocIdSeqNoAndTerm) o; + return Objects.equals(id, that.id) && seqNo == that.seqNo && primaryTerm == that.primaryTerm; + } + + @Override + public int hashCode() { + return Objects.hash(id, seqNo, primaryTerm); + } + + @Override + public String toString() { + return "DocIdSeqNoAndTerm{" + "id='" + id + " seqNo=" + seqNo + " primaryTerm=" + primaryTerm + "}"; + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 283a7b13753..f9377afe6ed 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -33,6 +33,7 @@ import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.LiveIndexWriterConfig; import org.apache.lucene.index.MergePolicy; +import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; @@ -95,11 +96,10 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -775,26 +775,41 @@ public abstract class EngineTestCase extends ESTestCase { } /** - * Gets all docId from the given engine. + * Gets a collection of tuples of docId, sequence number, and primary term of all live documents in the provided engine. */ - public static Set getDocIds(Engine engine, boolean refresh) throws IOException { + public static List getDocIds(Engine engine, boolean refresh) throws IOException { if (refresh) { engine.refresh("test_get_doc_ids"); } try (Engine.Searcher searcher = engine.acquireSearcher("test_get_doc_ids")) { - Set ids = new HashSet<>(); + List docs = new ArrayList<>(); for (LeafReaderContext leafContext : searcher.reader().leaves()) { LeafReader reader = leafContext.reader(); + NumericDocValues seqNoDocValues = reader.getNumericDocValues(SeqNoFieldMapper.NAME); + NumericDocValues primaryTermDocValues = reader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME); Bits liveDocs = reader.getLiveDocs(); for (int i = 0; i < reader.maxDoc(); i++) { if (liveDocs == null || liveDocs.get(i)) { Document uuid = reader.document(i, Collections.singleton(IdFieldMapper.NAME)); BytesRef binaryID = uuid.getBinaryValue(IdFieldMapper.NAME); - ids.add(Uid.decodeId(Arrays.copyOfRange(binaryID.bytes, binaryID.offset, binaryID.offset + binaryID.length))); + String id = Uid.decodeId(Arrays.copyOfRange(binaryID.bytes, binaryID.offset, binaryID.offset + binaryID.length)); + final long primaryTerm; + if (primaryTermDocValues.advanceExact(i)) { + primaryTerm = primaryTermDocValues.longValue(); + } else { + primaryTerm = 0; // non-root documents of a nested document. + } + if (seqNoDocValues.advanceExact(i) == false) { + throw new AssertionError("seqNoDocValues not found for doc[" + i + "] id[" + id + "]"); + } + final long seqNo = seqNoDocValues.longValue(); + docs.add(new DocIdSeqNoAndTerm(id, seqNo, primaryTerm)); } } } - return ids; + docs.sort(Comparator.comparing(DocIdSeqNoAndTerm::getId) + .thenComparingLong(DocIdSeqNoAndTerm::getSeqNo).thenComparingLong(DocIdSeqNoAndTerm::getPrimaryTerm)); + return docs; } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index ca2156144b3..a9e715a1129 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -49,6 +49,7 @@ import org.elasticsearch.index.MapperTestUtils; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.cache.IndexCache; import org.elasticsearch.index.cache.query.DisabledQueryCache; +import org.elasticsearch.index.engine.DocIdSeqNoAndTerm; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.EngineTestCase; @@ -82,12 +83,14 @@ import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; import java.util.function.Consumer; +import java.util.stream.Collectors; import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting; import static org.hamcrest.Matchers.contains; @@ -451,15 +454,20 @@ public abstract class IndexShardTestCase extends ESTestCase { closeShards(Arrays.asList(shards)); } + protected void closeShard(IndexShard shard, boolean assertConsistencyBetweenTranslogAndLucene) throws IOException { + try { + if (assertConsistencyBetweenTranslogAndLucene) { + assertConsistentHistoryBetweenTranslogAndLucene(shard); + } + } finally { + IOUtils.close(() -> shard.close("test", false), shard.store()); + } + } + protected void closeShards(Iterable shards) throws IOException { for (IndexShard shard : shards) { if (shard != null) { - try { - assertConsistentHistoryBetweenTranslogAndLucene(shard); - shard.close("test", false); - } finally { - IOUtils.close(shard.store()); - } + closeShard(shard, true); } } } @@ -635,7 +643,11 @@ public abstract class IndexShardTestCase extends ESTestCase { return result; } - protected Set getShardDocUIDs(final IndexShard shard) throws IOException { + public static Set getShardDocUIDs(final IndexShard shard) throws IOException { + return getDocIdAndSeqNos(shard).stream().map(DocIdSeqNoAndTerm::getId).collect(Collectors.toSet()); + } + + public static List getDocIdAndSeqNos(final IndexShard shard) throws IOException { return EngineTestCase.getDocIds(shard.getEngine(), true); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 68a862c109d..52ed2205ab5 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -125,6 +125,7 @@ import org.elasticsearch.index.MergePolicyConfig; import org.elasticsearch.index.MergeSchedulerConfig; import org.elasticsearch.index.MockEngineFactoryPlugin; import org.elasticsearch.index.codec.CodecService; +import org.elasticsearch.index.engine.DocIdSeqNoAndTerm; import org.elasticsearch.index.engine.Segment; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.MapperService; @@ -132,6 +133,7 @@ import org.elasticsearch.index.mapper.MockFieldFilterPlugin; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesQueryCache; import org.elasticsearch.indices.IndicesRequestCache; @@ -2380,6 +2382,49 @@ public abstract class ESIntegTestCase extends ESTestCase { }); } + /** + * Asserts that all shards with the same shardId should have document Ids. + */ + public void assertSameDocIdsOnShards() throws Exception { + assertBusy(() -> { + ClusterState state = client().admin().cluster().prepareState().get().getState(); + for (ObjectObjectCursor indexRoutingTable : state.routingTable().indicesRouting()) { + for (IntObjectCursor indexShardRoutingTable : indexRoutingTable.value.shards()) { + ShardRouting primaryShardRouting = indexShardRoutingTable.value.primaryShard(); + if (primaryShardRouting == null || primaryShardRouting.assignedToNode() == false) { + continue; + } + DiscoveryNode primaryNode = state.nodes().get(primaryShardRouting.currentNodeId()); + IndexShard primaryShard = internalCluster().getInstance(IndicesService.class, primaryNode.getName()) + .indexServiceSafe(primaryShardRouting.index()).getShard(primaryShardRouting.id()); + final List docsOnPrimary; + try { + docsOnPrimary = IndexShardTestCase.getDocIdAndSeqNos(primaryShard); + } catch (AlreadyClosedException ex) { + continue; + } + for (ShardRouting replicaShardRouting : indexShardRoutingTable.value.replicaShards()) { + if (replicaShardRouting.assignedToNode() == false) { + continue; + } + DiscoveryNode replicaNode = state.nodes().get(replicaShardRouting.currentNodeId()); + IndexShard replicaShard = internalCluster().getInstance(IndicesService.class, replicaNode.getName()) + .indexServiceSafe(replicaShardRouting.index()).getShard(replicaShardRouting.id()); + final List docsOnReplica; + try { + docsOnReplica = IndexShardTestCase.getDocIdAndSeqNos(replicaShard); + } catch (AlreadyClosedException ex) { + continue; + } + assertThat("out of sync shards: primary=[" + primaryShardRouting + "] num_docs_on_primary=[" + docsOnPrimary.size() + + "] vs replica=[" + replicaShardRouting + "] num_docs_on_replica=[" + docsOnReplica.size() + "]", + docsOnReplica, equalTo(docsOnPrimary)); + } + } + } + }); + } + public static boolean inFipsJvm() { return Security.getProviders()[0].getName().toLowerCase(Locale.ROOT).contains("fips"); }