Use lastSyncedGlobalCheckpoint in deletion policy (#27826)

Today we use the in-memory global checkpoint from SequenceNumbersService
to clean up unneeded commit points, however the latest global checkpoint
may haven't fsynced to the disk yet. If the translog checkpoint fsync
failed and we already use a higher global checkpoint to clean up commit
points, then we may have removed a safe commit which we try to keep for
recovery.

This commit updates the deletion policy using lastSyncedGlobalCheckpoint
from Translog rather the in memory global checkpoint.

Relates #27606
This commit is contained in:
Nhat Nguyen 2017-12-16 11:03:31 -05:00 committed by GitHub
parent 43ff38c5da
commit 4f62b51c87
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 29 additions and 20 deletions

View File

@ -182,8 +182,11 @@ public class InternalEngine extends Engine {
final SeqNoStats seqNoStats = loadSeqNoStats(openMode);
logger.trace("recovered [{}]", seqNoStats);
this.seqNoService = seqNoServiceSupplier.apply(engineConfig, seqNoStats);
translog = openTranslog(engineConfig, translogDeletionPolicy, seqNoService::getGlobalCheckpoint);
assert translog.getGeneration() != null;
this.translog = translog;
this.snapshotDeletionPolicy = new SnapshotDeletionPolicy(
new CombinedDeletionPolicy(openMode, translogDeletionPolicy, seqNoService::getGlobalCheckpoint)
new CombinedDeletionPolicy(openMode, translogDeletionPolicy, translog::getLastSyncedGlobalCheckpoint)
);
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG);
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
@ -195,9 +198,6 @@ public class InternalEngine extends Engine {
historyUUID = loadOrGenerateHistoryUUID(writer, engineConfig.getForceNewHistoryUUID());
Objects.requireNonNull(historyUUID, "history uuid should not be null");
indexWriter = writer;
translog = openTranslog(engineConfig, writer, translogDeletionPolicy, () -> seqNoService.getGlobalCheckpoint());
assert translog.getGeneration() != null;
this.translog = translog;
updateWriterOnOpen();
} catch (IOException | TranslogCorruptedException e) {
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
@ -437,12 +437,12 @@ public class InternalEngine extends Engine {
translog.trimUnreferencedReaders();
}
private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer, TranslogDeletionPolicy translogDeletionPolicy, LongSupplier globalCheckpointSupplier) throws IOException {
private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy translogDeletionPolicy, LongSupplier globalCheckpointSupplier) throws IOException {
assert openMode != null;
final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
String translogUUID = null;
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
translogUUID = loadTranslogUUIDFromCommit(writer);
translogUUID = loadTranslogUUIDFromLastCommit();
// We expect that this shard already exists, so it must already have an existing translog else something is badly wrong!
if (translogUUID == null) {
throw new IndexFormatTooOldException("translog", "translog has no generation nor a UUID - this might be an index from a previous version consider upgrading to N-1 first");
@ -492,14 +492,13 @@ public class InternalEngine extends Engine {
}
/**
* Reads the current stored translog ID from the IW commit data. If the id is not found, recommits the current
* translog id into lucene and returns null.
* Reads the current stored translog ID from the last commit data.
*/
@Nullable
private String loadTranslogUUIDFromCommit(IndexWriter writer) throws IOException {
// commit on a just opened writer will commit even if there are no changes done to it
// we rely on that for the commit data translog id key
final Map<String, String> commitUserData = commitDataAsMap(writer);
private String loadTranslogUUIDFromLastCommit() throws IOException {
assert openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG :
"Only reuse existing translogUUID with OPEN_INDEX_AND_TRANSLOG; openMode = [" + openMode + "]";
final Map<String, String> commitUserData = store.readLastCommittedSegmentsInfo().getUserData();
if (commitUserData.containsKey(Translog.TRANSLOG_UUID_KEY)) {
if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) {
throw new IllegalStateException("commit doesn't contain translog generation id");

View File

@ -168,7 +168,6 @@ import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
@ -4263,29 +4262,40 @@ public class InternalEngineTests extends EngineTestCase {
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), randomFrom("-1", "512b", "1gb")));
indexSettings.updateIndexMetaData(builder.build());
final Path translogPath = createTempDir();
store = createStore();
try (InternalEngine engine
= createEngine(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, seqNoServiceSupplier)) {
final EngineConfig engineConfig = config(indexSettings, store, translogPath, NoMergePolicy.INSTANCE, null);
try (Engine engine = new InternalEngine(engineConfig, seqNoServiceSupplier) {
@Override
protected void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException {
// Advance the global checkpoint during the flush to create a lag between a persisted global checkpoint in the translog
// (this value is visible to the deletion policy) and an in memory global checkpoint in the SequenceNumbersService.
if (rarely()) {
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), seqNoService().getLocalCheckpoint()));
}
super.commitIndexWriter(writer, translog, syncId);
}
}){
int numDocs = scaledRandomIntBetween(10, 100);
for (int docId = 0; docId < numDocs; docId++) {
ParseContext.Document document = testDocumentWithTextField();
document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE));
engine.index(indexForDoc(testParsedDocument(Integer.toString(docId), null, document, B_1, null)));
if (frequently()) {
globalCheckpoint.set(randomIntBetween(
Math.toIntExact(engine.seqNoService().getGlobalCheckpoint()),
Math.toIntExact(engine.seqNoService().getLocalCheckpoint())));
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.seqNoService().getLocalCheckpoint()));
engine.getTranslog().sync();
}
if (frequently()) {
final long lastSyncedGlobalCheckpoint = Translog.readGlobalCheckpoint(translogPath);
engine.flush(randomBoolean(), true);
final List<IndexCommit> commits = DirectoryReader.listCommits(store.directory());
// Keep only one safe commit as the oldest commit.
final IndexCommit safeCommit = commits.get(0);
assertThat(Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)),
lessThanOrEqualTo(globalCheckpoint.get()));
lessThanOrEqualTo(lastSyncedGlobalCheckpoint));
for (int i = 1; i < commits.size(); i++) {
assertThat(Long.parseLong(commits.get(i).getUserData().get(SequenceNumbers.MAX_SEQ_NO)),
greaterThan(globalCheckpoint.get()));
greaterThan(lastSyncedGlobalCheckpoint));
}
// Make sure we keep all translog operations after the local checkpoint of the safe commit.
long localCheckpointFromSafeCommit = Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));