Reset replica engine to global checkpoint on promotion (#33473)

When a replica starts following a newly promoted primary, it may have
some operations which don't exist on the new primary. Thus we need to
throw those operations to align a replica with the new primary. This can
be done by first resetting an engine from the safe commit, then replaying
the local translog up to the global checkpoint.

Relates #32867
This commit is contained in:
Nhat Nguyen 2018-09-11 22:09:37 -04:00 committed by GitHub
parent 27e07ec859
commit 743327efc2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 274 additions and 121 deletions

View File

@ -678,12 +678,6 @@ public abstract class Engine implements Closeable {
*/
public abstract void waitForOpsToComplete(long seqNo) throws InterruptedException;
/**
* Reset the local checkpoint in the tracker to the given local checkpoint
* @param localCheckpoint the new checkpoint to be set
*/
public abstract void resetLocalCheckpoint(long localCheckpoint);
/**
* @return a {@link SeqNoStats} object, using local state and the supplied global checkpoint
*/
@ -1165,11 +1159,16 @@ public abstract class Engine implements Closeable {
PRIMARY,
REPLICA,
PEER_RECOVERY,
LOCAL_TRANSLOG_RECOVERY;
LOCAL_TRANSLOG_RECOVERY,
LOCAL_RESET;
public boolean isRecovery() {
return this == PEER_RECOVERY || this == LOCAL_TRANSLOG_RECOVERY;
}
boolean isFromTranslog() {
return this == LOCAL_TRANSLOG_RECOVERY || this == LOCAL_RESET;
}
}
public Origin origin() {

View File

@ -729,6 +729,7 @@ public class InternalEngine extends Engine {
: "version: " + index.version() + " type: " + index.versionType();
return true;
case LOCAL_TRANSLOG_RECOVERY:
case LOCAL_RESET:
assert index.isRetry();
return true; // allow to optimize in order to update the max safe time stamp
default:
@ -827,7 +828,7 @@ public class InternalEngine extends Engine {
indexResult = new IndexResult(
plan.versionForIndexing, getPrimaryTerm(), plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
}
if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
if (index.origin().isFromTranslog() == false) {
final Translog.Location location;
if (indexResult.getResultType() == Result.Type.SUCCESS) {
location = translog.add(new Translog.Index(index, indexResult));
@ -1167,7 +1168,7 @@ public class InternalEngine extends Engine {
deleteResult = new DeleteResult(
plan.versionOfDeletion, getPrimaryTerm(), plan.seqNoOfDeletion, plan.currentlyDeleted == false);
}
if (delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
if (delete.origin().isFromTranslog() == false) {
final Translog.Location location;
if (deleteResult.getResultType() == Result.Type.SUCCESS) {
location = translog.add(new Translog.Delete(delete, deleteResult));
@ -1405,7 +1406,7 @@ public class InternalEngine extends Engine {
}
}
final NoOpResult noOpResult = failure != null ? new NoOpResult(getPrimaryTerm(), noOp.seqNo(), failure) : new NoOpResult(getPrimaryTerm(), noOp.seqNo());
if (noOp.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
if (noOp.origin().isFromTranslog() == false) {
final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
noOpResult.setTranslogLocation(location);
}
@ -2324,11 +2325,6 @@ public class InternalEngine extends Engine {
localCheckpointTracker.waitForOpsToComplete(seqNo);
}
@Override
public void resetLocalCheckpoint(long localCheckpoint) {
localCheckpointTracker.resetCheckpoint(localCheckpoint);
}
@Override
public SeqNoStats getSeqNoStats(long globalCheckpoint) {
return localCheckpointTracker.getStats(globalCheckpoint);

View File

@ -257,10 +257,6 @@ public final class ReadOnlyEngine extends Engine {
public void waitForOpsToComplete(long seqNo) {
}
@Override
public void resetLocalCheckpoint(long newCheckpoint) {
}
@Override
public SeqNoStats getSeqNoStats(long globalCheckpoint) {
return new SeqNoStats(seqNoStats.getMaxSeqNo(), seqNoStats.getLocalCheckpoint(), globalCheckpoint);

View File

@ -109,6 +109,7 @@ public class LocalCheckpointTracker {
* @param checkpoint the local checkpoint to reset this tracker to
*/
public synchronized void resetCheckpoint(final long checkpoint) {
// TODO: remove this method as after we restore the local history on promotion.
assert checkpoint != SequenceNumbers.UNASSIGNED_SEQ_NO;
assert checkpoint <= this.checkpoint;
processedSeqNo.clear();

View File

@ -163,7 +163,6 @@ import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static org.elasticsearch.index.mapper.SourceToParse.source;
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard {
@ -1273,16 +1272,18 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return result;
}
// package-private for testing
int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOException {
recoveryState.getTranslog().totalOperations(snapshot.totalOperations());
recoveryState.getTranslog().totalOperationsOnStart(snapshot.totalOperations());
/**
* Replays translog operations from the provided translog {@code snapshot} to the current engine using the given {@code origin}.
* The callback {@code onOperationRecovered} is notified after each translog operation is replayed successfully.
*/
int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operation.Origin origin,
Runnable onOperationRecovered) throws IOException {
int opsRecovered = 0;
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
try {
logger.trace("[translog] recover op {}", operation);
Engine.Result result = applyTranslogOperation(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY);
Engine.Result result = applyTranslogOperation(operation, origin);
switch (result.getResultType()) {
case FAILURE:
throw result.getFailure();
@ -1295,7 +1296,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
opsRecovered++;
recoveryState.getTranslog().incrementRecoveredOperations();
onOperationRecovered.run();
} catch (Exception e) {
if (ExceptionsHelper.status(e) == RestStatus.BAD_REQUEST) {
// mainly for MapperParsingException and Failure to detect xcontent
@ -1313,8 +1314,15 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* Operations from the translog will be replayed to bring lucene up to date.
**/
public void openEngineAndRecoverFromTranslog() throws IOException {
final RecoveryState.Translog translogRecoveryStats = recoveryState.getTranslog();
final Engine.TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> {
translogRecoveryStats.totalOperations(snapshot.totalOperations());
translogRecoveryStats.totalOperationsOnStart(snapshot.totalOperations());
return runTranslogRecovery(engine, snapshot, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY,
translogRecoveryStats::incrementRecoveredOperations);
};
innerOpenEngineAndTranslog();
getEngine().recoverFromTranslog(this::runTranslogRecovery, Long.MAX_VALUE);
getEngine().recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE);
}
/**
@ -1352,11 +1360,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint");
assertMaxUnsafeAutoIdInCommit();
final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID);
store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, config.getIndexSettings().getIndexVersionCreated());
trimUnsafeCommits();
createNewEngine(config);
verifyNotClosed();
@ -1367,6 +1371,15 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
}
private void trimUnsafeCommits() throws IOException {
assert currentEngineReference.get() == null : "engine is running";
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID);
assertMaxUnsafeAutoIdInCommit();
store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, indexSettings.getIndexVersionCreated());
}
private boolean assertSequenceNumbersInCommit() throws IOException {
final Map<String, String> userData = SegmentInfos.readLatestCommit(store.directory()).getUserData();
assert userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) : "commit point doesn't contains a local checkpoint";
@ -1463,7 +1476,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
if (origin == Engine.Operation.Origin.PRIMARY) {
assert assertPrimaryMode();
} else {
assert origin == Engine.Operation.Origin.REPLICA;
assert origin == Engine.Operation.Origin.REPLICA || origin == Engine.Operation.Origin.LOCAL_RESET;
assert assertReplicationTarget();
}
if (writeAllowedStates.contains(state) == false) {
@ -2166,9 +2179,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private Engine createNewEngine(EngineConfig config) {
synchronized (mutex) {
if (state == IndexShardState.CLOSED) {
throw new AlreadyClosedException(shardId + " can't create engine - shard is closed");
}
verifyNotClosed();
assert this.currentEngineReference.get() == null;
Engine engine = newEngine(config);
onNewEngine(engine); // call this before we pass the memory barrier otherwise actions that happen
@ -2314,19 +2325,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
bumpPrimaryTerm(opPrimaryTerm, () -> {
updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition");
final long currentGlobalCheckpoint = getGlobalCheckpoint();
final long localCheckpoint;
if (currentGlobalCheckpoint == UNASSIGNED_SEQ_NO) {
localCheckpoint = NO_OPS_PERFORMED;
final long maxSeqNo = seqNoStats().getMaxSeqNo();
logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]",
opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo);
if (currentGlobalCheckpoint < maxSeqNo) {
resetEngineToGlobalCheckpoint();
} else {
localCheckpoint = currentGlobalCheckpoint;
getEngine().rollTranslogGeneration();
}
logger.trace(
"detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]",
opPrimaryTerm,
getLocalCheckpoint(),
localCheckpoint);
getEngine().resetLocalCheckpoint(localCheckpoint);
getEngine().rollTranslogGeneration();
});
}
}
@ -2687,4 +2693,26 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
};
}
/**
* Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint.
*/
void resetEngineToGlobalCheckpoint() throws IOException {
assert getActiveOperationsCount() == 0 : "Ongoing writes [" + getActiveOperations() + "]";
sync(); // persist the global checkpoint to disk
final long globalCheckpoint = getGlobalCheckpoint();
final Engine newEngine;
synchronized (mutex) {
verifyNotClosed();
IOUtils.close(currentEngineReference.getAndSet(null));
trimUnsafeCommits();
newEngine = createNewEngine(newEngineConfig());
active.set(true);
}
final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery(
engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
// TODO: add a dedicate recovery stats for the reset translog
});
newEngine.recoverFromTranslog(translogRunner, globalCheckpoint);
}
}

View File

@ -111,6 +111,7 @@ public abstract class AbstractDisruptionTestCase extends ESIntegTestCase {
super.beforeIndexDeletion();
internalCluster().assertConsistentHistoryBetweenTranslogAndLuceneIndex();
assertSeqNos();
assertSameDocIdsOnShards();
}
}

View File

@ -4087,7 +4087,7 @@ public class InternalEngineTests extends EngineTestCase {
final long currentLocalCheckpoint = actualEngine.getLocalCheckpoint();
final long resetLocalCheckpoint =
randomIntBetween(Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), Math.toIntExact(currentLocalCheckpoint));
actualEngine.resetLocalCheckpoint(resetLocalCheckpoint);
actualEngine.getLocalCheckpointTracker().resetCheckpoint(resetLocalCheckpoint);
completedSeqNos.clear();
actualEngine.restoreLocalCheckpointFromTranslog();
final Set<Long> intersection = new HashSet<>(expectedCompletedSeqNos);

View File

@ -27,7 +27,7 @@ import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.store.Store;
import java.io.IOException;
import java.util.Set;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
@ -43,7 +43,7 @@ public class ReadOnlyEngineTests extends EngineTestCase {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
int numDocs = scaledRandomIntBetween(10, 1000);
final SeqNoStats lastSeqNoStats;
final Set<String> lastDocIds;
final List<DocIdSeqNoAndTerm> lastDocIds;
try (InternalEngine engine = createEngine(config)) {
Engine.Get get = null;
for (int i = 0; i < numDocs; i++) {

View File

@ -519,18 +519,14 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
shards.promoteReplicaToPrimary(replica2).get();
logger.info("--> Recover replica3 from replica2");
recoverReplica(replica3, replica2, true);
try (Translog.Snapshot snapshot = getTranslog(replica3).newSnapshot()) {
try (Translog.Snapshot snapshot = replica3.getHistoryOperations("test", 0)) {
assertThat(snapshot.totalOperations(), equalTo(initDocs + 1));
final List<Translog.Operation> expectedOps = new ArrayList<>(initOperations);
expectedOps.add(op2);
assertThat(snapshot, containsOperationsInAnyOrder(expectedOps));
assertThat("Peer-recovery should not send overridden operations", snapshot.skippedOperations(), equalTo(0));
}
// TODO: We should assert the content of shards in the ReplicationGroup.
// Without rollback replicas(current implementation), we don't have the same content across shards:
// - replica1 has {doc1}
// - replica2 has {doc1, doc2}
// - replica3 can have either {doc2} only if operation-based recovery or {doc1, doc2} if file-based recovery
shards.assertAllEqual(initDocs + 1);
}
}

View File

@ -55,10 +55,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
@ -306,14 +304,6 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty()));
assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(uncommittedOpsOnPrimary));
}
// roll back the extra ops in the replica
shards.removeReplica(replica);
replica.close("resync", false);
replica.store().close();
newReplica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId());
shards.recoverReplica(newReplica);
shards.assertAllEqual(totalDocs);
// Make sure that flushing on a recovering shard is ok.
shards.flush();
shards.assertAllEqual(totalDocs);
@ -406,31 +396,14 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
indexOnReplica(bulkShardRequest, shards, justReplica);
}
logger.info("--> seqNo primary {} replica {}", oldPrimary.seqNoStats(), newPrimary.seqNoStats());
logger.info("--> resyncing replicas");
logger.info("--> resyncing replicas seqno_stats primary {} replica {}", oldPrimary.seqNoStats(), newPrimary.seqNoStats());
PrimaryReplicaSyncer.ResyncTask task = shards.promoteReplicaToPrimary(newPrimary).get();
if (syncedGlobalCheckPoint) {
assertEquals(extraDocs, task.getResyncedOperations());
} else {
assertThat(task.getResyncedOperations(), greaterThanOrEqualTo(extraDocs));
}
List<IndexShard> replicas = shards.getReplicas();
// check all docs on primary are available on replica
Set<String> primaryIds = getShardDocUIDs(newPrimary);
assertThat(primaryIds.size(), equalTo(initialDocs + extraDocs));
for (IndexShard replica : replicas) {
Set<String> replicaIds = getShardDocUIDs(replica);
Set<String> temp = new HashSet<>(primaryIds);
temp.removeAll(replicaIds);
assertThat(replica.routingEntry() + " is missing docs", temp, empty());
temp = new HashSet<>(replicaIds);
temp.removeAll(primaryIds);
// yeah, replica has more docs as there is no Lucene roll back on it
assertThat(replica.routingEntry() + " has to have extra docs", temp,
extraDocsToBeTrimmed > 0 ? not(empty()) : empty());
}
shards.assertAllEqual(initialDocs + extraDocs);
// check translog on replica is trimmed
int translogOperations = 0;

View File

@ -106,6 +106,7 @@ import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.index.translog.TestTranslog;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.index.translog.TranslogTests;
import org.elasticsearch.indices.IndicesQueryCache;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
@ -181,6 +182,7 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
/**
* Simple unit-test IndexShard related operations.
@ -945,28 +947,24 @@ public class IndexShardTests extends IndexShardTestCase {
resyncLatch.await();
assertThat(indexShard.getLocalCheckpoint(), equalTo(maxSeqNo));
assertThat(indexShard.seqNoStats().getMaxSeqNo(), equalTo(maxSeqNo));
closeShards(indexShard);
closeShard(indexShard, false);
}
public void testThrowBackLocalCheckpointOnReplica() throws IOException, InterruptedException {
public void testRollbackReplicaEngineOnPromotion() throws IOException, InterruptedException {
final IndexShard indexShard = newStartedShard(false);
// most of the time this is large enough that most of the time there will be at least one gap
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED));
final long globalCheckpointOnReplica =
randomIntBetween(
Math.toIntExact(SequenceNumbers.UNASSIGNED_SEQ_NO),
Math.toIntExact(indexShard.getLocalCheckpoint()));
final long globalCheckpointOnReplica = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint());
indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica, "test");
final int globalCheckpoint =
randomIntBetween(
Math.toIntExact(SequenceNumbers.UNASSIGNED_SEQ_NO),
Math.toIntExact(indexShard.getLocalCheckpoint()));
final long globalCheckpoint = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint());
Set<String> docsBelowGlobalCheckpoint = getShardDocUIDs(indexShard).stream()
.filter(id -> Long.parseLong(id) <= Math.max(globalCheckpointOnReplica, globalCheckpoint)).collect(Collectors.toSet());
final CountDownLatch latch = new CountDownLatch(1);
final boolean shouldRollback = Math.max(globalCheckpoint, globalCheckpointOnReplica) < indexShard.seqNoStats().getMaxSeqNo();
final Engine beforeRollbackEngine = indexShard.getEngine();
indexShard.acquireReplicaOperationPermit(
indexShard.pendingPrimaryTerm + 1,
globalCheckpoint,
@ -985,18 +983,21 @@ public class IndexShardTests extends IndexShardTestCase {
ThreadPool.Names.SAME, "");
latch.await();
if (globalCheckpointOnReplica == SequenceNumbers.UNASSIGNED_SEQ_NO
&& globalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO) {
if (globalCheckpointOnReplica == SequenceNumbers.UNASSIGNED_SEQ_NO && globalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO) {
assertThat(indexShard.getLocalCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
} else {
assertThat(indexShard.getLocalCheckpoint(), equalTo(Math.max(globalCheckpoint, globalCheckpointOnReplica)));
}
assertThat(getShardDocUIDs(indexShard), equalTo(docsBelowGlobalCheckpoint));
if (shouldRollback) {
assertThat(indexShard.getEngine(), not(sameInstance(beforeRollbackEngine)));
} else {
assertThat(indexShard.getEngine(), sameInstance(beforeRollbackEngine));
}
// ensure that after the local checkpoint throw back and indexing again, the local checkpoint advances
final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(indexShard.getLocalCheckpoint()));
assertThat(indexShard.getLocalCheckpoint(), equalTo((long) result.localCheckpoint));
closeShards(indexShard);
closeShard(indexShard, false);
}
public void testConcurrentTermIncreaseOnReplicaShard() throws BrokenBarrierException, InterruptedException, IOException {
@ -1880,13 +1881,17 @@ public class IndexShardTests extends IndexShardTestCase {
SourceToParse.source(indexName, "_doc", "doc-1", new BytesArray("{}"), XContentType.JSON));
flushShard(shard);
assertThat(getShardDocUIDs(shard), containsInAnyOrder("doc-0", "doc-1"));
// Simulate resync (without rollback): Noop #1, index #2
acquireReplicaOperationPermitBlockingly(shard, shard.pendingPrimaryTerm + 1);
// Here we try to increase term (i.e. a new primary is promoted) without rolling back a replica so we can keep stale operations
// in the index commit; then verify that a recovery from store (started with the safe commit) will remove all stale operations.
shard.pendingPrimaryTerm++;
shard.operationPrimaryTerm++;
shard.getEngine().rollTranslogGeneration();
shard.markSeqNoAsNoop(1, "test");
shard.applyIndexOperationOnReplica(2, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(indexName, "_doc", "doc-2", new BytesArray("{}"), XContentType.JSON));
flushShard(shard);
assertThat(getShardDocUIDs(shard), containsInAnyOrder("doc-0", "doc-1", "doc-2"));
closeShard(shard, false);
// Recovering from store should discard doc #1
final ShardRouting replicaRouting = shard.routingEntry();
IndexShard newShard = reinitShard(shard,
@ -2249,10 +2254,11 @@ public class IndexShardTests extends IndexShardTestCase {
null));
primary.recoverFromStore();
primary.recoveryState().getTranslog().totalOperations(snapshot.totalOperations());
primary.recoveryState().getTranslog().totalOperationsOnStart(snapshot.totalOperations());
primary.state = IndexShardState.RECOVERING; // translog recovery on the next line would otherwise fail as we are in POST_RECOVERY
primary.runTranslogRecovery(primary.getEngine(), snapshot);
assertThat(primary.recoveryState().getTranslog().totalOperationsOnStart(), equalTo(numTotalEntries));
assertThat(primary.recoveryState().getTranslog().totalOperations(), equalTo(numTotalEntries));
primary.runTranslogRecovery(primary.getEngine(), snapshot, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY,
primary.recoveryState().getTranslog()::incrementRecoveredOperations);
assertThat(primary.recoveryState().getTranslog().recoveredOperations(), equalTo(numTotalEntries - numCorruptEntries));
closeShards(primary);
@ -2865,6 +2871,9 @@ public class IndexShardTests extends IndexShardTestCase {
} else {
gap = true;
}
if (rarely()) {
indexShard.flush(new FlushRequest());
}
}
assert localCheckpoint == indexShard.getLocalCheckpoint();
assert !gap || (localCheckpoint != max);
@ -3402,4 +3411,19 @@ public class IndexShardTests extends IndexShardTestCase {
closeShards(shard);
}
public void testResetEngine() throws Exception {
IndexShard shard = newStartedShard(false);
indexOnReplicaWithGaps(shard, between(0, 1000), Math.toIntExact(shard.getLocalCheckpoint()));
final long globalCheckpoint = randomLongBetween(shard.getGlobalCheckpoint(), shard.getLocalCheckpoint());
shard.updateGlobalCheckpointOnReplica(globalCheckpoint, "test");
Set<String> docBelowGlobalCheckpoint = getShardDocUIDs(shard).stream()
.filter(id -> Long.parseLong(id) <= globalCheckpoint).collect(Collectors.toSet());
TranslogStats translogStats = shard.translogStats();
shard.resetEngineToGlobalCheckpoint();
assertThat(getShardDocUIDs(shard), equalTo(docBelowGlobalCheckpoint));
assertThat(shard.seqNoStats().getMaxSeqNo(), equalTo(globalCheckpoint));
assertThat(shard.translogStats().estimatedNumberOfOperations(), equalTo(translogStats.estimatedNumberOfOperations()));
closeShard(shard, false);
}
}

View File

@ -103,6 +103,7 @@ public class RelocationIT extends ESIntegTestCase {
protected void beforeIndexDeletion() throws Exception {
super.beforeIndexDeletion();
assertSeqNos();
assertSameDocIdsOnShards();
}
public void testSimpleRelocationNoIndexing() {

View File

@ -0,0 +1,66 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import java.util.Objects;
/** A tuple of document id, sequence number and primary term of a document */
public final class DocIdSeqNoAndTerm {
private final String id;
private final long seqNo;
private final long primaryTerm;
public DocIdSeqNoAndTerm(String id, long seqNo, long primaryTerm) {
this.id = id;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
}
public String getId() {
return id;
}
public long getSeqNo() {
return seqNo;
}
public long getPrimaryTerm() {
return primaryTerm;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DocIdSeqNoAndTerm that = (DocIdSeqNoAndTerm) o;
return Objects.equals(id, that.id) && seqNo == that.seqNo && primaryTerm == that.primaryTerm;
}
@Override
public int hashCode() {
return Objects.hash(id, seqNo, primaryTerm);
}
@Override
public String toString() {
return "DocIdSeqNoAndTerm{" + "id='" + id + " seqNo=" + seqNo + " primaryTerm=" + primaryTerm + "}";
}
}

View File

@ -33,6 +33,7 @@ import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.LiveIndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
@ -95,11 +96,10 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -775,26 +775,41 @@ public abstract class EngineTestCase extends ESTestCase {
}
/**
* Gets all docId from the given engine.
* Gets a collection of tuples of docId, sequence number, and primary term of all live documents in the provided engine.
*/
public static Set<String> getDocIds(Engine engine, boolean refresh) throws IOException {
public static List<DocIdSeqNoAndTerm> getDocIds(Engine engine, boolean refresh) throws IOException {
if (refresh) {
engine.refresh("test_get_doc_ids");
}
try (Engine.Searcher searcher = engine.acquireSearcher("test_get_doc_ids")) {
Set<String> ids = new HashSet<>();
List<DocIdSeqNoAndTerm> docs = new ArrayList<>();
for (LeafReaderContext leafContext : searcher.reader().leaves()) {
LeafReader reader = leafContext.reader();
NumericDocValues seqNoDocValues = reader.getNumericDocValues(SeqNoFieldMapper.NAME);
NumericDocValues primaryTermDocValues = reader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
Bits liveDocs = reader.getLiveDocs();
for (int i = 0; i < reader.maxDoc(); i++) {
if (liveDocs == null || liveDocs.get(i)) {
Document uuid = reader.document(i, Collections.singleton(IdFieldMapper.NAME));
BytesRef binaryID = uuid.getBinaryValue(IdFieldMapper.NAME);
ids.add(Uid.decodeId(Arrays.copyOfRange(binaryID.bytes, binaryID.offset, binaryID.offset + binaryID.length)));
String id = Uid.decodeId(Arrays.copyOfRange(binaryID.bytes, binaryID.offset, binaryID.offset + binaryID.length));
final long primaryTerm;
if (primaryTermDocValues.advanceExact(i)) {
primaryTerm = primaryTermDocValues.longValue();
} else {
primaryTerm = 0; // non-root documents of a nested document.
}
if (seqNoDocValues.advanceExact(i) == false) {
throw new AssertionError("seqNoDocValues not found for doc[" + i + "] id[" + id + "]");
}
final long seqNo = seqNoDocValues.longValue();
docs.add(new DocIdSeqNoAndTerm(id, seqNo, primaryTerm));
}
}
}
return ids;
docs.sort(Comparator.comparing(DocIdSeqNoAndTerm::getId)
.thenComparingLong(DocIdSeqNoAndTerm::getSeqNo).thenComparingLong(DocIdSeqNoAndTerm::getPrimaryTerm));
return docs;
}
}

View File

@ -49,6 +49,7 @@ import org.elasticsearch.index.MapperTestUtils;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.cache.query.DisabledQueryCache;
import org.elasticsearch.index.engine.DocIdSeqNoAndTerm;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.EngineTestCase;
@ -82,12 +83,14 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
import static org.hamcrest.Matchers.contains;
@ -451,15 +454,20 @@ public abstract class IndexShardTestCase extends ESTestCase {
closeShards(Arrays.asList(shards));
}
protected void closeShard(IndexShard shard, boolean assertConsistencyBetweenTranslogAndLucene) throws IOException {
try {
if (assertConsistencyBetweenTranslogAndLucene) {
assertConsistentHistoryBetweenTranslogAndLucene(shard);
}
} finally {
IOUtils.close(() -> shard.close("test", false), shard.store());
}
}
protected void closeShards(Iterable<IndexShard> shards) throws IOException {
for (IndexShard shard : shards) {
if (shard != null) {
try {
assertConsistentHistoryBetweenTranslogAndLucene(shard);
shard.close("test", false);
} finally {
IOUtils.close(shard.store());
}
closeShard(shard, true);
}
}
}
@ -635,7 +643,11 @@ public abstract class IndexShardTestCase extends ESTestCase {
return result;
}
protected Set<String> getShardDocUIDs(final IndexShard shard) throws IOException {
public static Set<String> getShardDocUIDs(final IndexShard shard) throws IOException {
return getDocIdAndSeqNos(shard).stream().map(DocIdSeqNoAndTerm::getId).collect(Collectors.toSet());
}
public static List<DocIdSeqNoAndTerm> getDocIdAndSeqNos(final IndexShard shard) throws IOException {
return EngineTestCase.getDocIds(shard.getEngine(), true);
}

View File

@ -125,6 +125,7 @@ import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.index.MergeSchedulerConfig;
import org.elasticsearch.index.MockEngineFactoryPlugin;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.engine.DocIdSeqNoAndTerm;
import org.elasticsearch.index.engine.Segment;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MapperService;
@ -132,6 +133,7 @@ import org.elasticsearch.index.mapper.MockFieldFilterPlugin;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesQueryCache;
import org.elasticsearch.indices.IndicesRequestCache;
@ -2380,6 +2382,49 @@ public abstract class ESIntegTestCase extends ESTestCase {
});
}
/**
* Asserts that all shards with the same shardId should have document Ids.
*/
public void assertSameDocIdsOnShards() throws Exception {
assertBusy(() -> {
ClusterState state = client().admin().cluster().prepareState().get().getState();
for (ObjectObjectCursor<String, IndexRoutingTable> indexRoutingTable : state.routingTable().indicesRouting()) {
for (IntObjectCursor<IndexShardRoutingTable> indexShardRoutingTable : indexRoutingTable.value.shards()) {
ShardRouting primaryShardRouting = indexShardRoutingTable.value.primaryShard();
if (primaryShardRouting == null || primaryShardRouting.assignedToNode() == false) {
continue;
}
DiscoveryNode primaryNode = state.nodes().get(primaryShardRouting.currentNodeId());
IndexShard primaryShard = internalCluster().getInstance(IndicesService.class, primaryNode.getName())
.indexServiceSafe(primaryShardRouting.index()).getShard(primaryShardRouting.id());
final List<DocIdSeqNoAndTerm> docsOnPrimary;
try {
docsOnPrimary = IndexShardTestCase.getDocIdAndSeqNos(primaryShard);
} catch (AlreadyClosedException ex) {
continue;
}
for (ShardRouting replicaShardRouting : indexShardRoutingTable.value.replicaShards()) {
if (replicaShardRouting.assignedToNode() == false) {
continue;
}
DiscoveryNode replicaNode = state.nodes().get(replicaShardRouting.currentNodeId());
IndexShard replicaShard = internalCluster().getInstance(IndicesService.class, replicaNode.getName())
.indexServiceSafe(replicaShardRouting.index()).getShard(replicaShardRouting.id());
final List<DocIdSeqNoAndTerm> docsOnReplica;
try {
docsOnReplica = IndexShardTestCase.getDocIdAndSeqNos(replicaShard);
} catch (AlreadyClosedException ex) {
continue;
}
assertThat("out of sync shards: primary=[" + primaryShardRouting + "] num_docs_on_primary=[" + docsOnPrimary.size()
+ "] vs replica=[" + replicaShardRouting + "] num_docs_on_replica=[" + docsOnReplica.size() + "]",
docsOnReplica, equalTo(docsOnPrimary));
}
}
}
});
}
public static boolean inFipsJvm() {
return Security.getProviders()[0].getName().toLowerCase(Locale.ROOT).contains("fips");
}