Do not trim unsafe commits when open readonly engine (#41041)

Today we always trim unsafe commits (whose max_seq_no >= global
checkpoint) before starting a read-write or read-only engine. This is
mandatory for read-write engines because they must start with the safe
commit. This is also fine for read-only engines since most of the cases
we should have exactly one commit after closing an index (trimming is a
noop). However, this is dangerous for following indices which might have
more than one commits when they are being closed.

With this change, we move the trimming logic to the ctor of InternalEngine
so we won't trim anything if we are going to open a read-only engine.
This commit is contained in:
Nhat Nguyen 2019-04-11 15:15:00 -04:00
parent f7e590ce0d
commit aa0c957a4a
6 changed files with 80 additions and 89 deletions

View File

@ -78,6 +78,7 @@ import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogCorruptedException;
@ -87,6 +88,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
@ -185,10 +187,10 @@ public class InternalEngine extends Engine {
boolean success = false;
try {
this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().relativeTimeInMillis();
mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
throttle = new IndexThrottle();
try {
trimUnsafeCommits(engineConfig);
translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier());
assert translog.getGeneration() != null;
this.translog = translog;
@ -2754,4 +2756,13 @@ public class InternalEngine extends Engine {
final long maxSeqNo = SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo());
advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNo);
}
private static void trimUnsafeCommits(EngineConfig engineConfig) throws IOException {
final Store store = engineConfig.getStore();
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
final Path translogPath = engineConfig.getTranslogConfig().getTranslogPath();
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogPath, translogUUID);
final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogPath, translogUUID);
store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, engineConfig.getIndexSettings().getIndexVersionCreated());
}
}

View File

@ -30,7 +30,6 @@ import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.Lock;
import org.elasticsearch.Assertions;
import org.elasticsearch.Version;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
@ -102,23 +101,8 @@ public class ReadOnlyEngine extends Engine {
this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory);
this.translogStats = translogStats == null ? new TranslogStats(0, 0, 0, 0, 0) : translogStats;
if (seqNoStats == null) {
seqNoStats = buildSeqNoStats(lastCommittedSegmentInfos);
// Before 8.0 the global checkpoint is not known and up to date when the engine is created after
// peer recovery, so we only check the max seq no / global checkpoint coherency when the global
// checkpoint is different from the unassigned sequence number value.
// In addition to that we only execute the check if the index the engine belongs to has been
// created after the refactoring of the Close Index API and its TransportVerifyShardBeforeCloseAction
// that guarantee that all operations have been flushed to Lucene.
final long globalCheckpoint = engineConfig.getGlobalCheckpointSupplier().getAsLong();
final Version indexVersionCreated = engineConfig.getIndexSettings().getIndexVersionCreated();
if (indexVersionCreated.onOrAfter(Version.V_7_1_0) ||
(globalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO && indexVersionCreated.onOrAfter(Version.V_6_7_0))) {
if (seqNoStats.getMaxSeqNo() != globalCheckpoint) {
assertMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats.getMaxSeqNo(), globalCheckpoint);
throw new IllegalStateException("Maximum sequence number [" + seqNoStats.getMaxSeqNo()
+ "] from last commit does not match global checkpoint [" + globalCheckpoint + "]");
}
}
seqNoStats = buildSeqNoStats(config, lastCommittedSegmentInfos);
ensureMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats);
}
this.seqNoStats = seqNoStats;
this.indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, directory);
@ -138,10 +122,27 @@ public class ReadOnlyEngine extends Engine {
}
}
protected void assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, final long globalCheckpoint) {
if (Assertions.ENABLED) {
assert false : "max seq. no. [" + maxSeqNo + "] does not match [" + globalCheckpoint + "]";
protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(final SeqNoStats seqNoStats) {
// Before 8.0 the global checkpoint is not known and up to date when the engine is created after
// peer recovery, so we only check the max seq no / global checkpoint coherency when the global
// checkpoint is different from the unassigned sequence number value.
// In addition to that we only execute the check if the index the engine belongs to has been
// created after the refactoring of the Close Index API and its TransportVerifyShardBeforeCloseAction
// that guarantee that all operations have been flushed to Lucene.
final Version indexVersionCreated = engineConfig.getIndexSettings().getIndexVersionCreated();
if (indexVersionCreated.onOrAfter(Version.V_8_0_0) ||
(seqNoStats.getGlobalCheckpoint() != SequenceNumbers.UNASSIGNED_SEQ_NO && indexVersionCreated.onOrAfter(Version.V_6_7_0))) {
if (seqNoStats.getMaxSeqNo() != seqNoStats.getGlobalCheckpoint()) {
throw new IllegalStateException("Maximum sequence number [" + seqNoStats.getMaxSeqNo()
+ "] from last commit does not match global checkpoint [" + seqNoStats.getGlobalCheckpoint() + "]");
}
}
assert assertMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats.getMaxSeqNo(), seqNoStats.getMaxSeqNo());
}
protected boolean assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, final long globalCheckpoint) {
assert maxSeqNo == globalCheckpoint : "max seq. no. [" + maxSeqNo + "] does not match [" + globalCheckpoint + "]";
return true;
}
@Override
@ -198,12 +199,12 @@ public class ReadOnlyEngine extends Engine {
}
}
public static SeqNoStats buildSeqNoStats(SegmentInfos infos) {
private static SeqNoStats buildSeqNoStats(EngineConfig config, SegmentInfos infos) {
final SequenceNumbers.CommitInfo seqNoStats =
SequenceNumbers.loadSeqNoInfoFromLuceneCommit(infos.userData.entrySet());
long maxSeqNo = seqNoStats.maxSeqNo;
long localCheckpoint = seqNoStats.localCheckpoint;
return new SeqNoStats(maxSeqNo, localCheckpoint, localCheckpoint);
return new SeqNoStats(maxSeqNo, localCheckpoint, config.getGlobalCheckpointSupplier().getAsLong());
}
@Override

View File

@ -1439,7 +1439,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
assert recoveryState.getRecoverySource().expectEmptyRetentionLeases() == false || getRetentionLeases().leases().isEmpty()
: "expected empty set of retention leases with recovery source [" + recoveryState.getRecoverySource()
+ "] but got " + getRetentionLeases();
trimUnsafeCommits();
synchronized (mutex) {
verifyNotClosed();
assert currentEngineReference.get() == null : "engine is running";
@ -1458,15 +1457,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
}
private void trimUnsafeCommits() throws IOException {
assert currentEngineReference.get() == null || currentEngineReference.get() instanceof ReadOnlyEngine : "a write engine is running";
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID);
assertMaxUnsafeAutoIdInCommit();
store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, indexSettings.getIndexVersionCreated());
}
private boolean assertSequenceNumbersInCommit() throws IOException {
final Map<String, String> userData = SegmentInfos.readLatestCommit(store.directory()).getUserData();
assert userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) : "commit point doesn't contains a local checkpoint";
@ -1474,11 +1464,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
assert userData.containsKey(Engine.HISTORY_UUID_KEY) : "commit point doesn't contains a history uuid";
assert userData.get(Engine.HISTORY_UUID_KEY).equals(getHistoryUUID()) : "commit point history uuid ["
+ userData.get(Engine.HISTORY_UUID_KEY) + "] is different than engine [" + getHistoryUUID() + "]";
return true;
}
private boolean assertMaxUnsafeAutoIdInCommit() throws IOException {
final Map<String, String> userData = SegmentInfos.readLatestCommit(store.directory()).getUserData();
assert userData.containsKey(Engine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID) :
"opening index which was created post 5.5.0 but " + Engine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID
+ " is not found in commit";
@ -3107,8 +3092,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
Engine newEngine = null;
try {
final long globalCheckpoint = getGlobalCheckpoint();
trimUnsafeCommits();
synchronized (mutex) {
assert currentEngineReference.get() instanceof ReadOnlyEngine : "another write engine is running";
verifyNotClosed();
// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
newEngine = engineFactory.newReadWriteEngine(newEngineConfig());

View File

@ -674,7 +674,6 @@ public class InternalEngineTests extends EngineTestCase {
InternalEngine engine = createEngine(store, translog);
engine.close();
trimUnsafeCommits(engine.config());
engine = new InternalEngine(engine.config());
assertTrue(engine.isRecovering());
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
@ -691,7 +690,6 @@ public class InternalEngineTests extends EngineTestCase {
engine.index(indexForDoc(doc));
engine.close();
trimUnsafeCommits(engine.config());
engine = new InternalEngine(engine.config());
expectThrows(IllegalStateException.class, () -> engine.flush(true, true));
assertTrue(engine.isRecovering());
@ -726,7 +724,6 @@ public class InternalEngineTests extends EngineTestCase {
} finally {
IOUtils.close(engine);
}
trimUnsafeCommits(engine.config());
try (Engine recoveringEngine = new InternalEngine(engine.config())) {
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
@ -755,7 +752,6 @@ public class InternalEngineTests extends EngineTestCase {
Engine recoveringEngine = null;
try {
final AtomicBoolean committed = new AtomicBoolean();
trimUnsafeCommits(initialEngine.config());
recoveringEngine = new InternalEngine(initialEngine.config()) {
@Override
@ -798,7 +794,6 @@ public class InternalEngineTests extends EngineTestCase {
}
}
initialEngine.close();
trimUnsafeCommits(initialEngine.config());
recoveringEngine = new InternalEngine(initialEngine.config());
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
@ -834,14 +829,12 @@ public class InternalEngineTests extends EngineTestCase {
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
engine.syncTranslog();
}
trimUnsafeCommits(config);
try (InternalEngine engine = new InternalEngine(config)) {
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertThat(engine.getLocalCheckpoint(), equalTo(maxSeqNo));
assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(maxSeqNo));
}
trimUnsafeCommits(config);
try (InternalEngine engine = new InternalEngine(config)) {
long upToSeqNo = randomLongBetween(globalCheckpoint.get(), maxSeqNo);
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
@ -1259,7 +1252,6 @@ public class InternalEngineTests extends EngineTestCase {
UNASSIGNED_SEQ_NO, shardId, primaryTerm.get());
store.associateIndexWithNewTranslog(translogUUID);
}
trimUnsafeCommits(config);
engine = new InternalEngine(config);
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
@ -1280,7 +1272,6 @@ public class InternalEngineTests extends EngineTestCase {
engine.index(indexForDoc(doc));
EngineConfig config = engine.config();
engine.close();
trimUnsafeCommits(config);
engine = new InternalEngine(config);
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
@ -2379,7 +2370,6 @@ public class InternalEngineTests extends EngineTestCase {
IOUtils.close(initialEngine);
}
trimUnsafeCommits(initialEngine.engineConfig);
try (InternalEngine recoveringEngine = new InternalEngine(initialEngine.config())) {
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
@ -2724,7 +2714,6 @@ public class InternalEngineTests extends EngineTestCase {
// open and recover tlog
{
for (int i = 0; i < 2; i++) {
trimUnsafeCommits(config);
try (InternalEngine engine = new InternalEngine(config)) {
assertTrue(engine.isRecovering());
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
@ -2751,7 +2740,6 @@ public class InternalEngineTests extends EngineTestCase {
Translog.createEmptyTranslog(config.getTranslogConfig().getTranslogPath(),
SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get());
store.associateIndexWithNewTranslog(translogUUID);
trimUnsafeCommits(config);
try (InternalEngine engine = new InternalEngine(config)) {
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
@ -2766,7 +2754,6 @@ public class InternalEngineTests extends EngineTestCase {
// open and recover tlog with empty tlog
{
for (int i = 0; i < 2; i++) {
trimUnsafeCommits(config);
try (InternalEngine engine = new InternalEngine(config)) {
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
@ -2831,7 +2818,6 @@ public class InternalEngineTests extends EngineTestCase {
boolean started = false;
InternalEngine engine = null;
try {
trimUnsafeCommits(config(defaultSettings, store, translogPath, NoMergePolicy.INSTANCE, null));
engine = createEngine(store, translogPath);
started = true;
} catch (EngineException | IOException e) {
@ -2917,7 +2903,6 @@ public class InternalEngineTests extends EngineTestCase {
EngineConfig config = engine.config();
assertVisibleCount(engine, numDocs);
engine.close();
trimUnsafeCommits(config);
try (InternalEngine engine = new InternalEngine(config)) {
engine.skipTranslogRecovery();
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
@ -2960,7 +2945,6 @@ public class InternalEngineTests extends EngineTestCase {
translogHandler.mappingUpdate = dynamicUpdate();
engine.close();
trimUnsafeCommits(copy(engine.config(), inSyncGlobalCheckpointSupplier));
// we need to reuse the engine config unless the parser.mappingModified won't work
engine = new InternalEngine(copy(engine.config(), inSyncGlobalCheckpointSupplier));
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
@ -4093,7 +4077,6 @@ public class InternalEngineTests extends EngineTestCase {
} finally {
IOUtils.close(initialEngine);
}
trimUnsafeCommits(initialEngine.config());
try (Engine recoveringEngine = new InternalEngine(initialEngine.config())) {
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
@ -4199,7 +4182,6 @@ public class InternalEngineTests extends EngineTestCase {
final BiFunction<Long, Long, LocalCheckpointTracker> supplier = (ms, lcp) -> new LocalCheckpointTracker(
maxSeqNo,
localCheckpoint);
trimUnsafeCommits(engine.config());
EngineConfig noopEngineConfig = copy(engine.config(), new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETES_FIELD,
() -> new MatchAllDocsQuery(), engine.config().getMergePolicy()));
noOpEngine = new InternalEngine(noopEngineConfig, supplier) {
@ -4443,7 +4425,6 @@ public class InternalEngineTests extends EngineTestCase {
prevDocs = getDocIds(engine, true);
totalTranslogOps = engine.getTranslog().totalOperations();
}
trimUnsafeCommits(engineConfig);
try (InternalEngine engine = new InternalEngine(engineConfig)) {
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, globalCheckpoint.get());
@ -4490,7 +4471,6 @@ public class InternalEngineTests extends EngineTestCase {
assertEquals(docs - 1, engine.getLocalCheckpoint());
assertEquals(maxSeqIDOnReplica, replicaEngine.getSeqNoStats(-1).getMaxSeqNo());
assertEquals(checkpointOnReplica, replicaEngine.getLocalCheckpoint());
trimUnsafeCommits(copy(replicaEngine.config(), globalCheckpoint::get));
recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get));
assertEquals(numDocsOnReplica, getTranslog(recoveringEngine).stats().getUncommittedOperations());
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
@ -4525,7 +4505,6 @@ public class InternalEngineTests extends EngineTestCase {
// now do it again to make sure we preserve values etc.
try {
trimUnsafeCommits(replicaEngine.config());
recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get));
if (flushed) {
assertThat(recoveringEngine.getTranslogStats().getUncommittedOperations(), equalTo(0));
@ -5401,7 +5380,6 @@ public class InternalEngineTests extends EngineTestCase {
} else {
engine.flushAndClose();
}
trimUnsafeCommits(engine.config());
try (InternalEngine recoveringEngine = new InternalEngine(engine.config())) {
assertThat(recoveringEngine.getMinRetainedSeqNo(), equalTo(lastMinRetainedSeqNo));
}
@ -5556,7 +5534,6 @@ public class InternalEngineTests extends EngineTestCase {
engine.syncTranslog();
docs = getDocIds(engine, true);
}
trimUnsafeCommits(config);
Set<Long> seqNosInSafeCommit = null;
for (int i = commits.size() - 1; i >= 0; i--) {
if (commits.get(i).stream().allMatch(op -> op.seqNo() <= globalCheckpoint.get())) {
@ -5600,7 +5577,6 @@ public class InternalEngineTests extends EngineTestCase {
IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(Settings.builder()
.put(defaultSettings.getSettings()).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false)).build());
EngineConfig config = config(softDeletesDisabled, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get);
trimUnsafeCommits(config);
try (InternalEngine engine = createEngine(config)) {
assertThat(getDocIds(engine, true), equalTo(docs));
}
@ -5620,15 +5596,6 @@ public class InternalEngineTests extends EngineTestCase {
}
}
static void trimUnsafeCommits(EngineConfig config) throws IOException {
final Store store = config.getStore();
final TranslogConfig translogConfig = config.getTranslogConfig();
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID);
store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, config.getIndexSettings().getIndexVersionCreated());
}
void assertLuceneOperations(InternalEngine engine, long expectedAppends, long expectedUpdates, long expectedDeletes) {
String message = "Lucene operations mismatched;" +
" appends [actual:" + engine.getNumDocAppends() + ", expected:" + expectedAppends + "]," +

View File

@ -48,9 +48,6 @@ public class ReadOnlyEngineTests extends EngineTestCase {
try (InternalEngine engine = createEngine(config)) {
Engine.Get get = null;
for (int i = 0; i < numDocs; i++) {
if (rarely()) {
continue; // gap in sequence number
}
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA,
System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
@ -94,7 +91,6 @@ public class ReadOnlyEngineTests extends EngineTestCase {
}
// Close and reopen the main engine
InternalEngineTests.trimUnsafeCommits(config);
try (InternalEngine recoveringEngine = new InternalEngine(config)) {
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
@ -117,17 +113,15 @@ public class ReadOnlyEngineTests extends EngineTestCase {
int numDocs = scaledRandomIntBetween(10, 1000);
try (InternalEngine engine = createEngine(config)) {
for (int i = 0; i < numDocs; i++) {
if (rarely()) {
continue; // gap in sequence number
}
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA,
System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
if (rarely()) {
engine.flush();
}
globalCheckpoint.set(i);
globalCheckpoint.set(engine.getLocalCheckpoint());
}
globalCheckpoint.set(engine.getLocalCheckpoint());
engine.syncTranslog();
engine.flushAndClose();
readOnlyEngine = new ReadOnlyEngine(engine.engineConfig, null , null, true, Function.identity());
@ -141,7 +135,6 @@ public class ReadOnlyEngineTests extends EngineTestCase {
public void testEnsureMaxSeqNoIsEqualToGlobalCheckpoint() throws IOException {
IOUtils.close(engine, store);
Engine readOnlyEngine = null;
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
@ -159,16 +152,15 @@ public class ReadOnlyEngineTests extends EngineTestCase {
engine.flushAndClose();
IllegalStateException exception = expectThrows(IllegalStateException.class,
() -> new ReadOnlyEngine(engine.engineConfig, null, null, true, Function.identity()) {
() -> new ReadOnlyEngine(config, null, null, true, Function.identity()) {
@Override
protected void assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, final long globalCheckpoint) {
protected boolean assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, final long globalCheckpoint) {
// we don't want the assertion to trip in this test
return true;
}
});
assertThat(exception.getMessage(), equalTo("Maximum sequence number [" + maxSeqNo
+ "] from last commit does not match global checkpoint [" + globalCheckpoint.get() + "]"));
} finally {
IOUtils.close(readOnlyEngine);
}
}
}
@ -219,9 +211,6 @@ public class ReadOnlyEngineTests extends EngineTestCase {
int numDocs = scaledRandomIntBetween(10, 1000);
try (InternalEngine engine = createEngine(config)) {
for (int i = 0; i < numDocs; i++) {
if (rarely()) {
continue; // gap in sequence number
}
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA,
System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));

View File

@ -86,6 +86,7 @@ import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.EngineTestCase;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.engine.ReadOnlyEngine;
import org.elasticsearch.index.engine.Segment;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.fielddata.FieldDataStats;
@ -154,6 +155,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@ -3812,4 +3814,40 @@ public class IndexShardTests extends IndexShardTestCase {
indexShard.acquireAllReplicaOperationsPermits(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, listener, timeout);
}
}
public void testDoNotTrimCommitsWhenOpenReadOnlyEngine() throws Exception {
final IndexShard shard = newStartedShard(false, Settings.EMPTY, new InternalEngineFactory());
long numDocs = randomLongBetween(1, 20);
long seqNo = 0;
for (long i = 0; i < numDocs; i++) {
if (rarely()) {
seqNo++; // create gaps in sequence numbers
}
shard.applyIndexOperationOnReplica(seqNo, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
new SourceToParse(shard.shardId.getIndexName(), "_doc", Long.toString(i), new BytesArray("{}"), XContentType.JSON));
shard.updateGlobalCheckpointOnReplica(shard.getLocalCheckpoint(), "test");
if (randomInt(100) < 10) {
shard.flush(new FlushRequest());
}
seqNo++;
}
shard.flush(new FlushRequest());
assertThat(shard.docStats().getCount(), equalTo(numDocs));
final ShardRouting replicaRouting = shard.routingEntry();
ShardRouting readonlyShardRouting = newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), true,
ShardRoutingState.INITIALIZING, RecoverySource.ExistingStoreRecoverySource.INSTANCE);
final IndexShard readonlyShard = reinitShard(shard, readonlyShardRouting,
engineConfig -> new ReadOnlyEngine(engineConfig, null, null, false, Function.identity()) {
@Override
protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(SeqNoStats seqNoStats) {
// just like a following shard, we need to skip this check for now.
}
}
);
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
readonlyShard.markAsRecovering("store", new RecoveryState(readonlyShard.routingEntry(), localNode, null));
assertTrue(readonlyShard.recoverFromStore());
assertThat(readonlyShard.docStats().getCount(), equalTo(numDocs));
closeShards(readonlyShard);
}
}