Clean up commits when global checkpoint advanced (#28140)

Today we keep multiple index commits based on the current global
checkpoint, but we only clean up unneeded index commits when a new
index commit is created. However, old index commits can be released
earlier, as soon as the global checkpoint has advanced far enough.
This commit makes the engine revisit the index deletion policy
whenever a newly persisted global checkpoint value has advanced far
enough to release older commits.

Relates #10708
Nhat Nguyen 2018-01-18 15:45:06 -05:00 committed by GitHub
parent a6a57a71d3
commit 9db9bd52f7
10 changed files with 119 additions and 14 deletions
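
At its core the change adds one cheap check: once the persisted global checkpoint reaches the max_seq_no recorded in the most recent index commit, that commit becomes the safe commit and every older commit can be released without waiting for the next flush. A minimal, self-contained sketch of that condition follows (the class name, parameters, and map-based commit representation are illustrative stand-ins; the real logic is CombinedDeletionPolicy#hasUnreferencedCommits in the diff below):

import java.util.Map;
import java.util.function.LongSupplier;

// Sketch only: a commit is represented here by the user data written at commit time.
final class UnreferencedCommitsSketch {

    // Example: commits with max_seq_no 5, 12 and 20 are all kept while the global
    // checkpoint is 15; once it reaches 20, the commits at 5 and 12 are unreferenced.
    static boolean hasUnreferencedCommits(boolean lastCommitIsAlreadySafe,
                                          Map<String, String> lastCommitUserData,
                                          LongSupplier persistedGlobalCheckpoint) {
        if (lastCommitIsAlreadySafe) {
            return false; // older commits were already released the last time the policy ran
        }
        final String maxSeqNo = lastCommitUserData.get("max_seq_no");
        if (maxSeqNo == null) {
            return false; // commit predates sequence numbers - keep everything
        }
        // the last commit becomes the safe commit once the persisted global checkpoint
        // covers its max_seq_no, so everything older than it is no longer referenced
        return persistedGlobalCheckpoint.getAsLong() >= Long.parseLong(maxSeqNo);
    }
}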

@@ -700,7 +700,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
try {
Translog translog = shard.getTranslog();
if (translog.syncNeeded()) {
translog.sync();
shard.sync();
}
} catch (AlreadyClosedException ex) {
// fine - continue;

@@ -47,8 +47,8 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
private final LongSupplier globalCheckpointSupplier;
private final IndexCommit startingCommit;
private final ObjectIntHashMap<IndexCommit> snapshottedCommits; // Number of snapshots held against each commit point.
private IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint.
private IndexCommit lastCommit; // the most recent commit point
private volatile IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint.
private volatile IndexCommit lastCommit; // the most recent commit point
CombinedDeletionPolicy(EngineConfig.OpenMode openMode, TranslogDeletionPolicy translogDeletionPolicy,
LongSupplier globalCheckpointSupplier, IndexCommit startingCommit) {
@@ -214,6 +214,21 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
return 0;
}
/**
* Checks if the deletion policy can release some index commits with the latest global checkpoint.
*/
boolean hasUnreferencedCommits() throws IOException {
final IndexCommit lastCommit = this.lastCommit;
if (safeCommit != lastCommit) { // Race condition can happen but harmless
if (lastCommit.getUserData().containsKey(SequenceNumbers.MAX_SEQ_NO)) {
final long maxSeqNoFromLastCommit = Long.parseLong(lastCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO));
// We can clean up the current safe commit if the last commit is safe
return globalCheckpointSupplier.getAsLong() >= maxSeqNoFromLastCommit;
}
}
return false;
}
/**
* A wrapper of an index commit that prevents it from being deleted.
*/

@@ -91,6 +91,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.stream.Stream;
public abstract class Engine implements Closeable {
@@ -549,6 +550,13 @@ public abstract class Engine implements Closeable {
/** returns the translog for this engine */
public abstract Translog getTranslog();
/**
* Ensures that all locations in the given stream have been written to the underlying storage.
*/
public abstract boolean ensureTranslogSynced(Stream<Translog.Location> locations) throws IOException;
public abstract void syncTranslog() throws IOException;
protected void ensureOpen() {
if (isClosed.get()) {
throw new AlreadyClosedException(shardId + " engine is closed", failedEngine.get());

@@ -31,7 +31,6 @@ import org.apache.lucene.index.LiveIndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ReferenceManager;
@@ -94,6 +93,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.LongSupplier;
import java.util.stream.Stream;
public class InternalEngine extends Engine {
@@ -520,6 +520,27 @@ public class InternalEngine extends Engine {
return translog;
}
@Override
public boolean ensureTranslogSynced(Stream<Translog.Location> locations) throws IOException {
final boolean synced = translog.ensureSynced(locations);
if (synced) {
revisitIndexDeletionPolicyOnTranslogSynced();
}
return synced;
}
@Override
public void syncTranslog() throws IOException {
translog.sync();
revisitIndexDeletionPolicyOnTranslogSynced();
}
private void revisitIndexDeletionPolicyOnTranslogSynced() throws IOException {
if (combinedDeletionPolicy.hasUnreferencedCommits()) {
indexWriter.deleteUnusedFiles();
}
}
@Override
public String getHistoryUUID() {
return historyUUID;
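
For background on why the single indexWriter.deleteUnusedFiles() call above is enough: deleteUnusedFiles prompts the writer to re-run its configured IndexDeletionPolicy over the existing commit points, so a policy that consults the current global checkpoint can release commits outside of a flush. A stripped-down, standalone policy in that spirit is sketched below; it assumes every commit carries a "max_seq_no" user-data entry and is not the CombinedDeletionPolicy the engine actually uses.

import java.io.IOException;
import java.util.List;
import java.util.function.LongSupplier;

import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexDeletionPolicy;

// Keeps the newest commit whose max_seq_no is covered by the global checkpoint
// (the "safe" commit) plus everything after it; older commits are deleted.
final class CheckpointAwareDeletionPolicy extends IndexDeletionPolicy {
    private final LongSupplier globalCheckpoint;

    CheckpointAwareDeletionPolicy(LongSupplier globalCheckpoint) {
        this.globalCheckpoint = globalCheckpoint;
    }

    @Override
    public void onInit(List<? extends IndexCommit> commits) throws IOException {
        if (commits.isEmpty() == false) {
            onCommit(commits);
        }
    }

    @Override
    public void onCommit(List<? extends IndexCommit> commits) throws IOException {
        int keepFrom = 0; // commits are ordered oldest first; keep from the safe commit onwards
        for (int i = commits.size() - 1; i >= 0; i--) {
            final long maxSeqNo = Long.parseLong(commits.get(i).getUserData().get("max_seq_no"));
            if (maxSeqNo <= globalCheckpoint.getAsLong()) {
                keepFrom = i;
                break;
            }
        }
        for (int i = 0; i < keepFrom; i++) {
            commits.get(i).delete(); // mark the commit for deletion by the writer
        }
    }
}

Paired with the syncTranslog and ensureTranslogSynced hooks above, such a policy gets a chance to drop stale commits every time a translog sync persists a global checkpoint that has moved past the previous safe commit.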

@@ -130,10 +130,9 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction<
}
private void maybeSyncTranslog(final IndexShard indexShard) throws IOException {
final Translog translog = indexShard.getTranslog();
if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST &&
translog.getLastSyncedGlobalCheckpoint() < indexShard.getGlobalCheckpoint()) {
indexShard.getTranslog().sync();
indexShard.getTranslog().getLastSyncedGlobalCheckpoint() < indexShard.getGlobalCheckpoint()) {
indexShard.sync();
}
}

@@ -2316,8 +2316,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
@Override
protected void write(List<Tuple<Translog.Location, Consumer<Exception>>> candidates) throws IOException {
try {
final Engine engine = getEngine();
engine.getTranslog().ensureSynced(candidates.stream().map(Tuple::v1));
getEngine().ensureTranslogSynced(candidates.stream().map(Tuple::v1));
} catch (AlreadyClosedException ex) {
// that's fine since we already synced everything on engine close - this also is conform with the methods
// documentation
@@ -2342,9 +2341,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
translogSyncProcessor.put(location, syncListener);
}
public final void sync() throws IOException {
public void sync() throws IOException {
verifyNotClosed();
getEngine().getTranslog().sync();
getEngine().syncTranslog();
}
/**

@@ -244,6 +244,44 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
equalTo(Long.parseLong(startingCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))));
}
public void testCheckUnreferencedCommits() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
final UUID translogUUID = UUID.randomUUID();
final TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, null);
final List<IndexCommit> commitList = new ArrayList<>();
int totalCommits = between(2, 20);
long lastMaxSeqNo = between(1, 1000);
long lastTranslogGen = between(1, 50);
for (int i = 0; i < totalCommits; i++) {
lastMaxSeqNo += between(1, 10000);
lastTranslogGen += between(1, 100);
commitList.add(mockIndexCommit(lastMaxSeqNo, translogUUID, lastTranslogGen));
}
IndexCommit safeCommit = randomFrom(commitList);
globalCheckpoint.set(Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)));
indexPolicy.onCommit(commitList);
if (safeCommit == commitList.get(commitList.size() - 1)) {
// Safe commit is the last commit - no need to clean up
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastTranslogGen));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen));
assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false));
} else {
// Advanced but not enough
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), lastMaxSeqNo - 1));
assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false));
// Advanced enough
globalCheckpoint.set(randomLongBetween(lastMaxSeqNo, Long.MAX_VALUE));
assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(true));
indexPolicy.onCommit(commitList);
// Safe commit is the last commit - no need to clean up
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastTranslogGen));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen));
assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false));
}
}
IndexCommit mockIndexCommit(long maxSeqNo, UUID translogUUID, long translogGen) throws IOException {
final Map<String, String> userData = new HashMap<>();
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));

@@ -4414,4 +4414,29 @@ public class InternalEngineTests extends EngineTestCase {
assertThat(userData.get(Translog.TRANSLOG_GENERATION_KEY), equalTo("1"));
}
}
public void testCleanUpCommitsWhenGlobalCheckpointAdvanced() throws Exception {
IOUtils.close(engine, store);
store = createStore();
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) {
final int numDocs = scaledRandomIntBetween(10, 100);
for (int docId = 0; docId < numDocs; docId++) {
index(engine, docId);
if (frequently()) {
engine.flush(randomBoolean(), randomBoolean());
}
}
engine.flush(false, randomBoolean());
List<IndexCommit> commits = DirectoryReader.listCommits(store.directory());
// Global checkpoint advanced but not enough - all commits are kept.
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpointTracker().getCheckpoint() - 1));
engine.syncTranslog();
assertThat(DirectoryReader.listCommits(store.directory()), equalTo(commits));
// Global checkpoint advanced enough - only the last commit is kept.
globalCheckpoint.set(randomLongBetween(engine.getLocalCheckpointTracker().getCheckpoint(), Long.MAX_VALUE));
engine.syncTranslog();
assertThat(DirectoryReader.listCommits(store.directory()), contains(commits.get(commits.size() - 1)));
}
}
}

@@ -657,7 +657,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
@Override
protected void performOnReplica(final GlobalCheckpointSyncAction.Request request, final IndexShard replica) throws IOException {
replica.getTranslog().sync();
replica.sync();
}
}

@@ -123,9 +123,9 @@ public class GlobalCheckpointSyncActionTests extends ESTestCase {
}
if (durability == Translog.Durability.ASYNC || lastSyncedGlobalCheckpoint == globalCheckpoint) {
verify(translog, never()).sync();
verify(indexShard, never()).sync();
} else {
verify(translog).sync();
verify(indexShard).sync();
}
}