InternalEngine should use global checkpoint when committing the translog

relates to #22212
This commit is contained in:
Boaz Leskes 2016-12-18 08:05:59 +01:00
parent 58d73bae74
commit b78f7bc51d
2 changed files with 7 additions and 5 deletions

View File

@ -188,7 +188,7 @@ public class InternalEngine extends Engine {
seqNoService().markSeqNoAsCompleted(seqNoService().getLocalCheckpoint() + 1);
}
indexWriter = writer;
translog = openTranslog(engineConfig, writer, () -> seqNoService().getLocalCheckpoint());
translog = openTranslog(engineConfig, writer, () -> seqNoService().getGlobalCheckpoint());
assert translog.getGeneration() != null;
} catch (IOException | TranslogCorruptedException e) {
throw new EngineCreationFailureException(shardId, "failed to create engine", e);

View File

@ -154,13 +154,12 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@ -1694,8 +1693,10 @@ public class InternalEngineTests extends ESTestCase {
}
}
replicaLocalCheckpoint =
rarely() ? replicaLocalCheckpoint : randomIntBetween(Math.toIntExact(replicaLocalCheckpoint), Math.toIntExact(primarySeqNo));
if (randomInt(10) < 3) {
// only update rarely as we do it every doc
replicaLocalCheckpoint = randomIntBetween(Math.toIntExact(replicaLocalCheckpoint), Math.toIntExact(primarySeqNo));
}
initialEngine.seqNoService().updateLocalCheckpointForShard("primary", initialEngine.seqNoService().getLocalCheckpoint());
initialEngine.seqNoService().updateLocalCheckpointForShard("replica", replicaLocalCheckpoint);
@ -1707,6 +1708,7 @@ public class InternalEngineTests extends ESTestCase {
}
}
logger.info("localcheckpoint {}, global {}", replicaLocalCheckpoint, primarySeqNo);
initialEngine.seqNoService().updateGlobalCheckpointOnPrimary();
globalCheckpoint = initialEngine.seqNoService().getGlobalCheckpoint();