Encapsulate Translog in Engine (#31220)

This removes the abstract `getTranslog` method in `Engine`, instead leaving it
to the abstract implementations of the other methods that use the translog. This
allows future Engines not to have a Translog, as instead they must implement the
methods that use the translog pieces to return necessary values.
This commit is contained in:
Lee Hinman 2018-06-11 09:44:50 -06:00 committed by GitHub
parent 99e04582de
commit c064b507df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 62 additions and 40 deletions

View File

@ -565,18 +565,10 @@ public abstract class Engine implements Closeable {
EXTERNAL, INTERNAL
}
/**
* Returns the translog associated with this engine.
* Prefer to keep the translog package-private, so that an engine can control all accesses to the translog.
*/
abstract Translog getTranslog();
/**
* Checks if the underlying storage sync is required.
*/
public boolean isTranslogSyncNeeded() {
return getTranslog().syncNeeded();
}
public abstract boolean isTranslogSyncNeeded();
/**
* Ensures that all locations in the given stream have been written to the underlying storage.
@ -585,35 +577,25 @@ public abstract class Engine implements Closeable {
public abstract void syncTranslog() throws IOException;
public Closeable acquireTranslogRetentionLock() {
return getTranslog().acquireRetentionLock();
}
public abstract Closeable acquireTranslogRetentionLock();
/**
* Creates a new translog snapshot from this engine for reading translog operations whose seq# at least the provided seq#.
* The caller has to close the returned snapshot after finishing the reading.
*/
public Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException {
return getTranslog().newSnapshotFromMinSeqNo(minSeqNo);
}
public abstract Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException;
/**
* Returns the estimated number of translog operations in this engine whose seq# at least the provided seq#.
*/
public int estimateTranslogOperationsFromMinSeq(long minSeqNo) {
return getTranslog().estimateTotalOperationsFromMinSeq(minSeqNo);
}
public abstract int estimateTranslogOperationsFromMinSeq(long minSeqNo);
public TranslogStats getTranslogStats() {
return getTranslog().stats();
}
public abstract TranslogStats getTranslogStats();
/**
* Returns the last location that the translog of this engine has written into.
*/
public Translog.Location getTranslogLastWriteLocation() {
return getTranslog().getLastWriteLocation();
}
public abstract Translog.Location getTranslogLastWriteLocation();
protected final void ensureOpen(Exception suppressed) {
if (isClosed.get()) {
@ -661,9 +643,7 @@ public abstract class Engine implements Closeable {
/**
* Returns the latest global checkpoint value that has been persisted in the underlying storage (i.e. translog's checkpoint)
*/
public long getLastSyncedGlobalCheckpoint() {
return getTranslog().getLastSyncedGlobalCheckpoint();
}
public abstract long getLastSyncedGlobalCheckpoint();
/**
* Global stats on segments.
@ -935,9 +915,7 @@ public abstract class Engine implements Closeable {
*
* @return {@code true} if the current generation should be rolled to a new generation
*/
public boolean shouldRollTranslogGeneration() {
return getTranslog().shouldRollGeneration();
}
public abstract boolean shouldRollTranslogGeneration();
/**
* Rolls the translog generation and cleans unneeded.

View File

@ -73,8 +73,10 @@ import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogCorruptedException;
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
@ -422,12 +424,17 @@ public class InternalEngine extends Engine {
return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier, engineConfig.getPrimaryTermSupplier());
}
@Override
// Package private for testing purposes only
Translog getTranslog() {
ensureOpen();
return translog;
}
@Override
public boolean isTranslogSyncNeeded() {
return getTranslog().syncNeeded();
}
@Override
public boolean ensureTranslogSynced(Stream<Translog.Location> locations) throws IOException {
final boolean synced = translog.ensureSynced(locations);
@ -443,6 +450,31 @@ public class InternalEngine extends Engine {
revisitIndexDeletionPolicyOnTranslogSynced();
}
@Override
public Closeable acquireTranslogRetentionLock() {
return getTranslog().acquireRetentionLock();
}
@Override
public Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException {
return getTranslog().newSnapshotFromMinSeqNo(minSeqNo);
}
@Override
public int estimateTranslogOperationsFromMinSeq(long minSeqNo) {
return getTranslog().estimateTotalOperationsFromMinSeq(minSeqNo);
}
@Override
public TranslogStats getTranslogStats() {
return getTranslog().stats();
}
@Override
public Translog.Location getTranslogLastWriteLocation() {
return getTranslog().getLastWriteLocation();
}
private void revisitIndexDeletionPolicyOnTranslogSynced() throws IOException {
if (combinedDeletionPolicy.hasUnreferencedCommits()) {
indexWriter.deleteUnusedFiles();
@ -1570,6 +1602,11 @@ public class InternalEngine extends Engine {
}
}
@Override
public boolean shouldRollTranslogGeneration() {
return getTranslog().shouldRollGeneration();
}
@Override
public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException {
try (ReleasableLock lock = readLock.acquire()) {
@ -2191,6 +2228,11 @@ public class InternalEngine extends Engine {
return localCheckpointTracker;
}
@Override
public long getLastSyncedGlobalCheckpoint() {
return getTranslog().getLastSyncedGlobalCheckpoint();
}
@Override
public long getLocalCheckpoint() {
return localCheckpointTracker.getCheckpoint();

View File

@ -731,7 +731,7 @@ public class InternalEngineTests extends EngineTestCase {
super.commitIndexWriter(writer, translog, syncId);
}
};
assertThat(recoveringEngine.getTranslog().stats().getUncommittedOperations(), equalTo(docs));
assertThat(getTranslog(recoveringEngine).stats().getUncommittedOperations(), equalTo(docs));
recoveringEngine.recoverFromTranslog();
assertTrue(committed.get());
} finally {
@ -758,7 +758,7 @@ public class InternalEngineTests extends EngineTestCase {
final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null);
initialEngine.index(indexForDoc(doc));
if (rarely()) {
initialEngine.getTranslog().rollGeneration();
getTranslog(initialEngine).rollGeneration();
} else if (rarely()) {
initialEngine.flush();
}
@ -3983,14 +3983,14 @@ public class InternalEngineTests extends EngineTestCase {
assertEquals(checkpointOnReplica, replicaEngine.getLocalCheckpoint());
trimUnsafeCommits(copy(replicaEngine.config(), globalCheckpoint::get));
recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get));
assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().stats().getUncommittedOperations());
assertEquals(numDocsOnReplica, getTranslog(recoveringEngine).stats().getUncommittedOperations());
recoveringEngine.recoverFromTranslog();
assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
assertEquals(checkpointOnReplica, recoveringEngine.getLocalCheckpoint());
assertEquals((maxSeqIDOnReplica + 1) - numDocsOnReplica, recoveringEngine.fillSeqNoGaps(2));
// now snapshot the tlog and ensure the primary term is updated
try (Translog.Snapshot snapshot = recoveringEngine.getTranslog().newSnapshot()) {
try (Translog.Snapshot snapshot = getTranslog(recoveringEngine).newSnapshot()) {
assertTrue((maxSeqIDOnReplica + 1) - numDocsOnReplica <= snapshot.totalOperations());
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
@ -4005,7 +4005,7 @@ public class InternalEngineTests extends EngineTestCase {
assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpoint());
if ((flushed = randomBoolean())) {
globalCheckpoint.set(recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
recoveringEngine.getTranslog().sync();
getTranslog(recoveringEngine).sync();
recoveringEngine.flush(true, true);
}
}
@ -4018,7 +4018,7 @@ public class InternalEngineTests extends EngineTestCase {
trimUnsafeCommits(replicaEngine.config());
recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get));
if (flushed) {
assertThat(recoveringEngine.getTranslog().stats().getUncommittedOperations(), equalTo(0));
assertThat(recoveringEngine.getTranslogStats().getUncommittedOperations(), equalTo(0));
}
recoveringEngine.recoverFromTranslog();
assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
@ -4221,7 +4221,7 @@ public class InternalEngineTests extends EngineTestCase {
engine.index(indexForDoc(testParsedDocument(Integer.toString(docId), null, document, B_1, null)));
if (frequently()) {
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
engine.getTranslog().sync();
engine.syncTranslog();
}
if (frequently()) {
final long lastSyncedGlobalCheckpoint = Translog.readGlobalCheckpoint(translogPath, translogUUID);
@ -4243,7 +4243,7 @@ public class InternalEngineTests extends EngineTestCase {
}
// Make sure we keep all translog operations after the local checkpoint of the safe commit.
long localCheckpointFromSafeCommit = Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
try (Translog.Snapshot snapshot = engine.getTranslog().newSnapshot()) {
try (Translog.Snapshot snapshot = getTranslog(engine).newSnapshot()) {
assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(localCheckpointFromSafeCommit + 1, docId));
}
}

View File

@ -507,6 +507,8 @@ public abstract class EngineTestCase extends ESTestCase {
* Exposes a translog associated with the given engine for testing purpose.
*/
public static Translog getTranslog(Engine engine) {
return engine.getTranslog();
assert engine instanceof InternalEngine : "only InternalEngines have translogs, got: " + engine.getClass();
InternalEngine internalEngine = (InternalEngine) engine;
return internalEngine.getTranslog();
}
}