Exposed engine must have all ops below gcp during rollback (#36159)
Today we expose a new engine immediately during Lucene rollback. The new engine is started with a safe commit which might not include all acknowledged operation. With this change, we won't expose the new engine until it has recovered from the local translog. Note that this solution is not complete since it's able to reserve only acknowledged operations before the global checkpoint. This is because we replay translog up to the global checkpoint during rollback. A per-doc Lucene rollback would solve this issue entirely. Relates #32867
This commit is contained in:
parent
d41cf6ac9f
commit
902d6f579a
|
@ -81,6 +81,7 @@ import org.elasticsearch.index.engine.Engine.GetResult;
|
|||
import org.elasticsearch.index.engine.EngineConfig;
|
||||
import org.elasticsearch.index.engine.EngineException;
|
||||
import org.elasticsearch.index.engine.EngineFactory;
|
||||
import org.elasticsearch.index.engine.ReadOnlyEngine;
|
||||
import org.elasticsearch.index.engine.RefreshFailedEngineException;
|
||||
import org.elasticsearch.index.engine.Segment;
|
||||
import org.elasticsearch.index.engine.SegmentsStats;
|
||||
|
@ -155,6 +156,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
@ -686,20 +688,20 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
public Engine.IndexResult applyIndexOperationOnPrimary(long version, VersionType versionType, SourceToParse sourceToParse,
|
||||
long autoGeneratedTimestamp, boolean isRetry) throws IOException {
|
||||
assert versionType.validateVersionForWrites(version);
|
||||
return applyIndexOperation(UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, versionType, autoGeneratedTimestamp,
|
||||
return applyIndexOperation(getEngine(), UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, versionType, autoGeneratedTimestamp,
|
||||
isRetry, Engine.Operation.Origin.PRIMARY, sourceToParse);
|
||||
}
|
||||
|
||||
public Engine.IndexResult applyIndexOperationOnReplica(long seqNo, long version, long autoGeneratedTimeStamp,
|
||||
boolean isRetry, SourceToParse sourceToParse)
|
||||
throws IOException {
|
||||
return applyIndexOperation(seqNo, operationPrimaryTerm, version, null, autoGeneratedTimeStamp, isRetry,
|
||||
return applyIndexOperation(getEngine(), seqNo, operationPrimaryTerm, version, null, autoGeneratedTimeStamp, isRetry,
|
||||
Engine.Operation.Origin.REPLICA, sourceToParse);
|
||||
}
|
||||
|
||||
private Engine.IndexResult applyIndexOperation(long seqNo, long opPrimaryTerm, long version, @Nullable VersionType versionType,
|
||||
long autoGeneratedTimeStamp, boolean isRetry, Engine.Operation.Origin origin,
|
||||
SourceToParse sourceToParse) throws IOException {
|
||||
private Engine.IndexResult applyIndexOperation(Engine engine, long seqNo, long opPrimaryTerm, long version,
|
||||
@Nullable VersionType versionType, long autoGeneratedTimeStamp, boolean isRetry,
|
||||
Engine.Operation.Origin origin, SourceToParse sourceToParse) throws IOException {
|
||||
assert opPrimaryTerm <= this.operationPrimaryTerm: "op term [ " + opPrimaryTerm + " ] > shard term [" + this.operationPrimaryTerm
|
||||
+ "]";
|
||||
ensureWriteAllowed(origin);
|
||||
|
@ -721,7 +723,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
return new Engine.IndexResult(e, version, opPrimaryTerm, seqNo);
|
||||
}
|
||||
|
||||
return index(getEngine(), operation);
|
||||
return index(engine, operation);
|
||||
}
|
||||
|
||||
public static Engine.Index prepareIndex(DocumentMapperForType docMapper, Version indexCreatedVersion, SourceToParse source, long seqNo,
|
||||
|
@ -755,17 +757,17 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
}
|
||||
|
||||
public Engine.NoOpResult markSeqNoAsNoop(long seqNo, String reason) throws IOException {
|
||||
return markSeqNoAsNoop(seqNo, operationPrimaryTerm, reason, Engine.Operation.Origin.REPLICA);
|
||||
return markSeqNoAsNoop(getEngine(), seqNo, operationPrimaryTerm, reason, Engine.Operation.Origin.REPLICA);
|
||||
}
|
||||
|
||||
private Engine.NoOpResult markSeqNoAsNoop(long seqNo, long opPrimaryTerm, String reason,
|
||||
private Engine.NoOpResult markSeqNoAsNoop(Engine engine, long seqNo, long opPrimaryTerm, String reason,
|
||||
Engine.Operation.Origin origin) throws IOException {
|
||||
assert opPrimaryTerm <= this.operationPrimaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.operationPrimaryTerm
|
||||
+ "]";
|
||||
long startTime = System.nanoTime();
|
||||
ensureWriteAllowed(origin);
|
||||
final Engine.NoOp noOp = new Engine.NoOp(seqNo, opPrimaryTerm, origin, startTime, reason);
|
||||
return noOp(getEngine(), noOp);
|
||||
return noOp(engine, noOp);
|
||||
}
|
||||
|
||||
private Engine.NoOpResult noOp(Engine engine, Engine.NoOp noOp) {
|
||||
|
@ -787,15 +789,15 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
public Engine.DeleteResult applyDeleteOperationOnPrimary(long version, String type, String id, VersionType versionType)
|
||||
throws IOException {
|
||||
assert versionType.validateVersionForWrites(version);
|
||||
return applyDeleteOperation(UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, type, id, versionType,
|
||||
return applyDeleteOperation(getEngine(), UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, type, id, versionType,
|
||||
Engine.Operation.Origin.PRIMARY);
|
||||
}
|
||||
|
||||
public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long version, String type, String id) throws IOException {
|
||||
return applyDeleteOperation(seqNo, operationPrimaryTerm, version, type, id, null, Engine.Operation.Origin.REPLICA);
|
||||
return applyDeleteOperation(getEngine(), seqNo, operationPrimaryTerm, version, type, id, null, Engine.Operation.Origin.REPLICA);
|
||||
}
|
||||
|
||||
private Engine.DeleteResult applyDeleteOperation(long seqNo, long opPrimaryTerm, long version, String type, String id,
|
||||
private Engine.DeleteResult applyDeleteOperation(Engine engine, long seqNo, long opPrimaryTerm, long version, String type, String id,
|
||||
@Nullable VersionType versionType, Engine.Operation.Origin origin) throws IOException {
|
||||
assert opPrimaryTerm <= this.operationPrimaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.operationPrimaryTerm
|
||||
+ "]";
|
||||
|
@ -826,7 +828,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
final Term uid = new Term(IdFieldMapper.NAME, Uid.encodeId(id));
|
||||
final Engine.Delete delete = prepareDelete(type, id, uid, seqNo, opPrimaryTerm, version,
|
||||
versionType, origin);
|
||||
return delete(getEngine(), delete);
|
||||
return delete(engine, delete);
|
||||
}
|
||||
|
||||
private Engine.Delete prepareDelete(String type, String id, Term uid, long seqNo, long primaryTerm, long version,
|
||||
|
@ -1265,6 +1267,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
}
|
||||
|
||||
public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine.Operation.Origin origin) throws IOException {
|
||||
return applyTranslogOperation(getEngine(), operation, origin);
|
||||
}
|
||||
|
||||
private Engine.Result applyTranslogOperation(Engine engine, Translog.Operation operation,
|
||||
Engine.Operation.Origin origin) throws IOException {
|
||||
// If a translog op is replayed on the primary (eg. ccr), we need to use external instead of null for its version type.
|
||||
final VersionType versionType = (origin == Engine.Operation.Origin.PRIMARY) ? VersionType.EXTERNAL : null;
|
||||
final Engine.Result result;
|
||||
|
@ -1273,19 +1280,19 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
final Translog.Index index = (Translog.Index) operation;
|
||||
// we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all
|
||||
// autoGeneratedID docs that are coming from the primary are updated correctly.
|
||||
result = applyIndexOperation(index.seqNo(), index.primaryTerm(), index.version(),
|
||||
result = applyIndexOperation(engine, index.seqNo(), index.primaryTerm(), index.version(),
|
||||
versionType, index.getAutoGeneratedIdTimestamp(), true, origin,
|
||||
source(shardId.getIndexName(), index.type(), index.id(), index.source(),
|
||||
XContentHelper.xContentType(index.source())).routing(index.routing()));
|
||||
break;
|
||||
case DELETE:
|
||||
final Translog.Delete delete = (Translog.Delete) operation;
|
||||
result = applyDeleteOperation(delete.seqNo(), delete.primaryTerm(), delete.version(), delete.type(), delete.id(),
|
||||
result = applyDeleteOperation(engine, delete.seqNo(), delete.primaryTerm(), delete.version(), delete.type(), delete.id(),
|
||||
versionType, origin);
|
||||
break;
|
||||
case NO_OP:
|
||||
final Translog.NoOp noOp = (Translog.NoOp) operation;
|
||||
result = markSeqNoAsNoop(noOp.seqNo(), noOp.primaryTerm(), noOp.reason(), origin);
|
||||
result = markSeqNoAsNoop(engine, noOp.seqNo(), noOp.primaryTerm(), noOp.reason(), origin);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalStateException("No operation defined for [" + operation + "]");
|
||||
|
@ -1304,7 +1311,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
while ((operation = snapshot.next()) != null) {
|
||||
try {
|
||||
logger.trace("[translog] recover op {}", operation);
|
||||
Engine.Result result = applyTranslogOperation(operation, origin);
|
||||
Engine.Result result = applyTranslogOperation(engine, operation, origin);
|
||||
switch (result.getResultType()) {
|
||||
case FAILURE:
|
||||
throw result.getFailure();
|
||||
|
@ -1384,18 +1391,26 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
|
||||
replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint");
|
||||
trimUnsafeCommits();
|
||||
|
||||
createNewEngine(config);
|
||||
synchronized (mutex) {
|
||||
verifyNotClosed();
|
||||
// We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive,
|
||||
// we still give sync'd flush a chance to run:
|
||||
assert currentEngineReference.get() == null : "engine is running";
|
||||
// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
|
||||
final Engine newEngine = engineFactory.newReadWriteEngine(config);
|
||||
onNewEngine(newEngine);
|
||||
currentEngineReference.set(newEngine);
|
||||
// We set active because we are now writing operations to the engine; this way,
|
||||
// if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
|
||||
active.set(true);
|
||||
}
|
||||
// time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
|
||||
// which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
|
||||
onSettingsChanged();
|
||||
assertSequenceNumbersInCommit();
|
||||
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
|
||||
}
|
||||
|
||||
private void trimUnsafeCommits() throws IOException {
|
||||
assert currentEngineReference.get() == null : "engine is running";
|
||||
assert currentEngineReference.get() == null || currentEngineReference.get() instanceof ReadOnlyEngine : "a write engine is running";
|
||||
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
|
||||
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
|
||||
final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID);
|
||||
|
@ -2224,31 +2239,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
}
|
||||
}
|
||||
|
||||
private Engine createNewEngine(EngineConfig config) {
|
||||
synchronized (mutex) {
|
||||
verifyNotClosed();
|
||||
assert this.currentEngineReference.get() == null;
|
||||
Engine engine = newEngine(config);
|
||||
onNewEngine(engine); // call this before we pass the memory barrier otherwise actions that happen
|
||||
// inside the callback are not visible. This one enforces happens-before
|
||||
this.currentEngineReference.set(engine);
|
||||
}
|
||||
|
||||
// time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during which
|
||||
// settings changes could possibly have happened, so here we forcefully push any config changes to the new engine:
|
||||
Engine engine = getEngineOrNull();
|
||||
|
||||
// engine could perhaps be null if we were e.g. concurrently closed:
|
||||
if (engine != null) {
|
||||
engine.onSettingsChanged();
|
||||
}
|
||||
return engine;
|
||||
}
|
||||
|
||||
protected Engine newEngine(EngineConfig config) {
|
||||
return engineFactory.newReadWriteEngine(config);
|
||||
}
|
||||
|
||||
private static void persistMetadata(
|
||||
final ShardPath shardPath,
|
||||
final IndexSettings indexSettings,
|
||||
|
@ -2852,14 +2842,26 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
void resetEngineToGlobalCheckpoint() throws IOException {
|
||||
assert getActiveOperationsCount() == 0 : "Ongoing writes [" + getActiveOperations() + "]";
|
||||
sync(); // persist the global checkpoint to disk
|
||||
final long globalCheckpoint = getGlobalCheckpoint();
|
||||
final Engine newEngine;
|
||||
final SeqNoStats seqNoStats = seqNoStats();
|
||||
final TranslogStats translogStats = translogStats();
|
||||
// flush to make sure the latest commit, which will be opened by the read-only engine, includes all operations.
|
||||
flush(new FlushRequest().waitIfOngoing(true));
|
||||
synchronized (mutex) {
|
||||
verifyNotClosed();
|
||||
IOUtils.close(currentEngineReference.getAndSet(null));
|
||||
// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
|
||||
final Engine readOnlyEngine = new ReadOnlyEngine(newEngineConfig(), seqNoStats, translogStats, false, Function.identity());
|
||||
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
|
||||
}
|
||||
|
||||
Engine newEngine = null;
|
||||
try {
|
||||
final long globalCheckpoint = getGlobalCheckpoint();
|
||||
trimUnsafeCommits();
|
||||
newEngine = createNewEngine(newEngineConfig());
|
||||
active.set(true);
|
||||
synchronized (mutex) {
|
||||
verifyNotClosed();
|
||||
// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
|
||||
newEngine = engineFactory.newReadWriteEngine(newEngineConfig());
|
||||
onNewEngine(newEngine);
|
||||
}
|
||||
newEngine.advanceMaxSeqNoOfUpdatesOrDeletes(globalCheckpoint);
|
||||
final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery(
|
||||
|
@ -2867,6 +2869,20 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
// TODO: add a dedicate recovery stats for the reset translog
|
||||
});
|
||||
newEngine.recoverFromTranslog(translogRunner, globalCheckpoint);
|
||||
synchronized (mutex) {
|
||||
verifyNotClosed();
|
||||
IOUtils.close(currentEngineReference.getAndSet(newEngine));
|
||||
// We set active because we are now writing operations to the engine; this way,
|
||||
// if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
|
||||
active.set(true);
|
||||
newEngine = null;
|
||||
}
|
||||
// time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
|
||||
// which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
|
||||
onSettingsChanged();
|
||||
} finally {
|
||||
IOUtils.close(newEngine);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.index.replication;
|
|||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.lucene.index.IndexWriter;
|
||||
import org.apache.lucene.index.IndexableField;
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
|
||||
import org.elasticsearch.action.bulk.BulkShardRequest;
|
||||
|
@ -37,9 +38,11 @@ import org.elasticsearch.common.xcontent.XContentType;
|
|||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.engine.DocIdSeqNoAndTerm;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.engine.EngineConfig;
|
||||
import org.elasticsearch.index.engine.EngineFactory;
|
||||
import org.elasticsearch.index.engine.EngineTestCase;
|
||||
import org.elasticsearch.index.engine.InternalEngineFactory;
|
||||
import org.elasticsearch.index.engine.InternalEngineTests;
|
||||
import org.elasticsearch.index.mapper.SourceToParse;
|
||||
|
@ -64,13 +67,16 @@ import java.util.concurrent.Future;
|
|||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.hamcrest.Matchers.anyOf;
|
||||
import static org.hamcrest.Matchers.both;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.everyItem;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.isIn;
|
||||
import static org.hamcrest.Matchers.lessThan;
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
|
@ -727,6 +733,64 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
|
|||
}
|
||||
}
|
||||
|
||||
public void testRollbackOnPromotion() throws Exception {
|
||||
try (ReplicationGroup shards = createGroup(between(2, 3))) {
|
||||
shards.startAll();
|
||||
IndexShard newPrimary = randomFrom(shards.getReplicas());
|
||||
int initDocs = shards.indexDocs(randomInt(100));
|
||||
int inFlightOpsOnNewPrimary = 0;
|
||||
int inFlightOps = scaledRandomIntBetween(10, 200);
|
||||
for (int i = 0; i < inFlightOps; i++) {
|
||||
String id = "extra-" + i;
|
||||
IndexRequest primaryRequest = new IndexRequest(index.getName(), "type", id).source("{}", XContentType.JSON);
|
||||
BulkShardRequest replicationRequest = indexOnPrimary(primaryRequest, shards.getPrimary());
|
||||
for (IndexShard replica : shards.getReplicas()) {
|
||||
if (randomBoolean()) {
|
||||
indexOnReplica(replicationRequest, shards, replica);
|
||||
if (replica == newPrimary) {
|
||||
inFlightOpsOnNewPrimary++;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
shards.syncGlobalCheckpoint();
|
||||
}
|
||||
if (rarely()) {
|
||||
shards.flush();
|
||||
}
|
||||
}
|
||||
shards.refresh("test");
|
||||
List<DocIdSeqNoAndTerm> docsBelowGlobalCheckpoint = EngineTestCase.getDocIds(getEngine(newPrimary), randomBoolean())
|
||||
.stream().filter(doc -> doc.getSeqNo() <= newPrimary.getGlobalCheckpoint()).collect(Collectors.toList());
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
final AtomicBoolean done = new AtomicBoolean();
|
||||
Thread thread = new Thread(() -> {
|
||||
List<IndexShard> replicas = new ArrayList<>(shards.getReplicas());
|
||||
replicas.remove(newPrimary);
|
||||
latch.countDown();
|
||||
while (done.get() == false) {
|
||||
try {
|
||||
List<DocIdSeqNoAndTerm> exposedDocs = EngineTestCase.getDocIds(getEngine(randomFrom(replicas)), randomBoolean());
|
||||
assertThat(docsBelowGlobalCheckpoint, everyItem(isIn(exposedDocs)));
|
||||
assertThat(randomFrom(replicas).getLocalCheckpoint(), greaterThanOrEqualTo(initDocs - 1L));
|
||||
} catch (AlreadyClosedException ignored) {
|
||||
// replica swaps engine during rollback
|
||||
} catch (Exception e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
thread.start();
|
||||
latch.await();
|
||||
shards.promoteReplicaToPrimary(newPrimary).get();
|
||||
shards.assertAllEqual(initDocs + inFlightOpsOnNewPrimary);
|
||||
int moreDocsAfterRollback = shards.indexDocs(scaledRandomIntBetween(1, 20));
|
||||
shards.assertAllEqual(initDocs + inFlightOpsOnNewPrimary + moreDocsAfterRollback);
|
||||
done.set(true);
|
||||
thread.join();
|
||||
}
|
||||
}
|
||||
|
||||
public static class BlockingTarget extends RecoveryTarget {
|
||||
|
||||
private final CountDownLatch recoveryBlocked;
|
||||
|
|
|
@ -83,6 +83,7 @@ import org.elasticsearch.env.NodeEnvironment;
|
|||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.engine.CommitStats;
|
||||
import org.elasticsearch.index.engine.DocIdSeqNoAndTerm;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.engine.Engine.DeleteResult;
|
||||
import org.elasticsearch.index.engine.EngineException;
|
||||
|
@ -175,13 +176,16 @@ import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN;
|
|||
import static org.elasticsearch.test.hamcrest.RegexMatcher.matches;
|
||||
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.either;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.everyItem;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.hasKey;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.hasToString;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.isIn;
|
||||
import static org.hamcrest.Matchers.lessThan;
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
|
@ -906,7 +910,10 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
} else {
|
||||
assertTrue(onResponse.get());
|
||||
assertNull(onFailure.get());
|
||||
assertThat(getTranslog(indexShard).getGeneration().translogFileGeneration, equalTo(translogGen + 1));
|
||||
assertThat(getTranslog(indexShard).getGeneration().translogFileGeneration,
|
||||
// if rollback happens we roll translog twice: one when we flush a commit before opening a read-only engine
|
||||
// and one after replaying translog (upto the global checkpoint); otherwise we roll translog once.
|
||||
either(equalTo(translogGen + 1)).or(equalTo(translogGen + 2)));
|
||||
assertThat(indexShard.getLocalCheckpoint(), equalTo(expectedLocalCheckpoint));
|
||||
assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint));
|
||||
}
|
||||
|
@ -3592,11 +3599,35 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
Set<String> docBelowGlobalCheckpoint = getShardDocUIDs(shard).stream()
|
||||
.filter(id -> Long.parseLong(id) <= globalCheckpoint).collect(Collectors.toSet());
|
||||
TranslogStats translogStats = shard.translogStats();
|
||||
AtomicBoolean done = new AtomicBoolean();
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
Thread thread = new Thread(() -> {
|
||||
latch.countDown();
|
||||
int hitClosedExceptions = 0;
|
||||
while (done.get() == false) {
|
||||
try {
|
||||
List<String> exposedDocIds = EngineTestCase.getDocIds(getEngine(shard), rarely())
|
||||
.stream().map(DocIdSeqNoAndTerm::getId).collect(Collectors.toList());
|
||||
assertThat("every operations before the global checkpoint must be reserved",
|
||||
docBelowGlobalCheckpoint, everyItem(isIn(exposedDocIds)));
|
||||
} catch (AlreadyClosedException ignored) {
|
||||
hitClosedExceptions++;
|
||||
} catch (IOException e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
}
|
||||
// engine reference was switched twice: current read/write engine -> ready-only engine -> new read/write engine
|
||||
assertThat(hitClosedExceptions, lessThanOrEqualTo(2));
|
||||
});
|
||||
thread.start();
|
||||
latch.await();
|
||||
shard.resetEngineToGlobalCheckpoint();
|
||||
assertThat(getShardDocUIDs(shard), equalTo(docBelowGlobalCheckpoint));
|
||||
assertThat(shard.seqNoStats().getMaxSeqNo(), equalTo(globalCheckpoint));
|
||||
assertThat(shard.translogStats().estimatedNumberOfOperations(), equalTo(translogStats.estimatedNumberOfOperations()));
|
||||
assertThat(shard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(globalCheckpoint));
|
||||
done.set(true);
|
||||
thread.join();
|
||||
closeShard(shard, false);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue