Simplify the Translog constructor by always expecting an existing translog (#28676)

Currently the Translog constructor is capable both of opening an existing translog and creating a
new one (deleting existing files). This PR separates these two into separate code paths. The
constructors opens files and a dedicated static methods creates an empty translog.
This commit is contained in:
Boaz Leskes 2018-02-15 09:24:09 +01:00 committed by GitHub
parent fc406c9a5a
commit beb55d148a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 227 additions and 197 deletions

View File

@ -98,7 +98,7 @@ which returns something similar to:
"translog_uuid" : "hnOG3xFcTDeoI_kvvvOdNA",
"history_uuid" : "XP7KDJGiS1a2fHYiFL5TXQ",
"local_checkpoint" : "-1",
"translog_generation" : "1",
"translog_generation" : "2",
"max_seq_no" : "-1",
"sync_id" : "AVvFY-071siAOuFGEO9P", <1>
"max_unsafe_auto_id_timestamp" : "-1"

View File

@ -15,7 +15,7 @@ setup:
- do:
indices.stats:
metric: [ translog ]
- set: { indices.test.primaries.translog.size_in_bytes: empty_size }
- set: { indices.test.primaries.translog.size_in_bytes: creation_size }
- do:
index:
@ -27,9 +27,11 @@ setup:
- do:
indices.stats:
metric: [ translog ]
- gt: { indices.test.primaries.translog.size_in_bytes: $empty_size }
- gt: { indices.test.primaries.translog.size_in_bytes: $creation_size }
- match: { indices.test.primaries.translog.operations: 1 }
- gt: { indices.test.primaries.translog.uncommitted_size_in_bytes: $empty_size }
# we can't check this yet as creation size will contain two empty translog generations. A single
# non empty generation with one op may be smaller or larger than that.
# - gt: { indices.test.primaries.translog.uncommitted_size_in_bytes: $creation_size }
- match: { indices.test.primaries.translog.uncommitted_operations: 1 }
- do:
@ -39,9 +41,10 @@ setup:
- do:
indices.stats:
metric: [ translog ]
- gt: { indices.test.primaries.translog.size_in_bytes: $empty_size }
- gt: { indices.test.primaries.translog.size_in_bytes: $creation_size }
- match: { indices.test.primaries.translog.operations: 1 }
- match: { indices.test.primaries.translog.uncommitted_size_in_bytes: $empty_size }
## creation translog size has some overhead due to an initial empty generation that will be trimmed later
- lt: { indices.test.primaries.translog.uncommitted_size_in_bytes: $creation_size }
- match: { indices.test.primaries.translog.uncommitted_operations: 0 }
- do:
@ -59,7 +62,8 @@ setup:
- do:
indices.stats:
metric: [ translog ]
- match: { indices.test.primaries.translog.size_in_bytes: $empty_size }
## creation translog size has some overhead due to an initial empty generation that will be trimmed later
- lte: { indices.test.primaries.translog.size_in_bytes: $creation_size }
- match: { indices.test.primaries.translog.operations: 0 }
- match: { indices.test.primaries.translog.uncommitted_size_in_bytes: $empty_size }
- lte: { indices.test.primaries.translog.uncommitted_size_in_bytes: $creation_size }
- match: { indices.test.primaries.translog.uncommitted_operations: 0 }

View File

@ -480,13 +480,18 @@ public class InternalEngine extends Engine {
private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy translogDeletionPolicy, LongSupplier globalCheckpointSupplier) throws IOException {
assert openMode != null;
final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
String translogUUID = null;
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
translogUUID = loadTranslogUUIDFromLastCommit();
// We expect that this shard already exists, so it must already have an existing translog else something is badly wrong!
if (translogUUID == null) {
throw new IndexFormatTooOldException("translog", "translog has no generation nor a UUID - this might be an index from a previous version consider upgrading to N-1 first");
}
final String translogUUID;
switch (openMode) {
case CREATE_INDEX_AND_TRANSLOG:
case OPEN_INDEX_CREATE_TRANSLOG:
translogUUID =
Translog.createEmptyTranslog(translogConfig.getTranslogPath(), globalCheckpointSupplier.getAsLong(), shardId);
break;
case OPEN_INDEX_AND_TRANSLOG:
translogUUID = loadTranslogUUIDFromLastCommit();
break;
default:
throw new AssertionError("Unknown openMode " + openMode);
}
return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier);
}

View File

@ -36,6 +36,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.IndexSettings;
@ -45,6 +46,7 @@ import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import java.io.Closeable;
import java.io.EOFException;
@ -132,23 +134,19 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
* translog file referenced by this generation. The translog creation will fail if this generation can't be opened.
*
* @param config the configuration of this translog
* @param expectedTranslogUUID the translog uuid to open, null for a new translog
* @param translogUUID the translog uuid to open, null for a new translog
* @param deletionPolicy an instance of {@link TranslogDeletionPolicy} that controls when a translog file can be safely
* deleted
* @param globalCheckpointSupplier a supplier for the global checkpoint
*/
public Translog(
final TranslogConfig config, final String expectedTranslogUUID, TranslogDeletionPolicy deletionPolicy,
final TranslogConfig config, final String translogUUID, TranslogDeletionPolicy deletionPolicy,
final LongSupplier globalCheckpointSupplier) throws IOException {
super(config.getShardId(), config.getIndexSettings());
this.config = config;
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.deletionPolicy = deletionPolicy;
if (expectedTranslogUUID == null) {
translogUUID = UUIDs.randomBase64UUID();
} else {
translogUUID = expectedTranslogUUID;
}
this.translogUUID = translogUUID;
bigArrays = config.getBigArrays();
ReadWriteLock rwl = new ReentrantReadWriteLock();
readLock = new ReleasableLock(rwl.readLock());
@ -157,53 +155,38 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
Files.createDirectories(this.location);
try {
if (expectedTranslogUUID != null) {
final Checkpoint checkpoint = readCheckpoint(location);
final Path nextTranslogFile = location.resolve(getFilename(checkpoint.generation + 1));
final Path currentCheckpointFile = location.resolve(getCommitCheckpointFileName(checkpoint.generation));
// this is special handling for error condition when we create a new writer but we fail to bake
// the newly written file (generation+1) into the checkpoint. This is still a valid state
// we just need to cleanup before we continue
// we hit this before and then blindly deleted the new generation even though we managed to bake it in and then hit this:
// https://discuss.elastic.co/t/cannot-recover-index-because-of-missing-tanslog-files/38336 as an example
//
// For this to happen we must have already copied the translog.ckp file into translog-gen.ckp so we first check if that file exists
// if not we don't even try to clean it up and wait until we fail creating it
assert Files.exists(nextTranslogFile) == false || Files.size(nextTranslogFile) <= TranslogWriter.getHeaderLength(expectedTranslogUUID) : "unexpected translog file: [" + nextTranslogFile + "]";
if (Files.exists(currentCheckpointFile) // current checkpoint is already copied
&& Files.deleteIfExists(nextTranslogFile)) { // delete it and log a warning
logger.warn("deleted previously created, but not yet committed, next generation [{}]. This can happen due to a tragic exception when creating a new generation", nextTranslogFile.getFileName());
final Checkpoint checkpoint = readCheckpoint(location);
final Path nextTranslogFile = location.resolve(getFilename(checkpoint.generation + 1));
final Path currentCheckpointFile = location.resolve(getCommitCheckpointFileName(checkpoint.generation));
// this is special handling for error condition when we create a new writer but we fail to bake
// the newly written file (generation+1) into the checkpoint. This is still a valid state
// we just need to cleanup before we continue
// we hit this before and then blindly deleted the new generation even though we managed to bake it in and then hit this:
// https://discuss.elastic.co/t/cannot-recover-index-because-of-missing-tanslog-files/38336 as an example
//
// For this to happen we must have already copied the translog.ckp file into translog-gen.ckp so we first check if that file exists
// if not we don't even try to clean it up and wait until we fail creating it
assert Files.exists(nextTranslogFile) == false || Files.size(nextTranslogFile) <= TranslogWriter.getHeaderLength(translogUUID) : "unexpected translog file: [" + nextTranslogFile + "]";
if (Files.exists(currentCheckpointFile) // current checkpoint is already copied
&& Files.deleteIfExists(nextTranslogFile)) { // delete it and log a warning
logger.warn("deleted previously created, but not yet committed, next generation [{}]. This can happen due to a tragic exception when creating a new generation", nextTranslogFile.getFileName());
}
this.readers.addAll(recoverFromFiles(checkpoint));
if (readers.isEmpty()) {
throw new IllegalStateException("at least one reader must be recovered");
}
boolean success = false;
current = null;
try {
current = createWriter(checkpoint.generation + 1, getMinFileGeneration(), checkpoint.globalCheckpoint);
success = true;
} finally {
// we have to close all the recovered ones otherwise we leak file handles here
// for instance if we have a lot of tlog and we can't create the writer we keep on holding
// on to all the uncommitted tlog files if we don't close
if (success == false) {
IOUtils.closeWhileHandlingException(readers);
}
this.readers.addAll(recoverFromFiles(checkpoint));
if (readers.isEmpty()) {
throw new IllegalStateException("at least one reader must be recovered");
}
boolean success = false;
current = null;
try {
current = createWriter(checkpoint.generation + 1, getMinFileGeneration(), checkpoint.globalCheckpoint);
success = true;
} finally {
// we have to close all the recovered ones otherwise we leak file handles here
// for instance if we have a lot of tlog and we can't create the writer we keep on holding
// on to all the uncommitted tlog files if we don't close
if (success == false) {
IOUtils.closeWhileHandlingException(readers);
}
}
} else {
IOUtils.rm(location);
// start from whatever generation lucene points to
final long generation = deletionPolicy.getMinTranslogGenerationForRecovery();
logger.debug("wipe translog location - creating new translog, starting generation [{}]", generation);
Files.createDirectories(location);
final long initialGlobalCheckpoint = globalCheckpointSupplier.getAsLong();
final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(0, generation, initialGlobalCheckpoint, generation);
final Path checkpointFile = location.resolve(CHECKPOINT_FILE_NAME);
Checkpoint.write(getChannelFactory(), checkpointFile, checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
IOUtils.fsync(checkpointFile, false);
current = createWriter(generation, generation, initialGlobalCheckpoint);
readers.clear();
}
} catch (Exception e) {
// close the opened translog files if we fail to create a new translog...
@ -409,9 +392,9 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
return Stream.concat(readers.stream(), Stream.of(current))
.filter(r -> r.getGeneration() >= minGeneration)
.mapToInt(BaseTranslogReader::totalOperations)
.sum();
.filter(r -> r.getGeneration() >= minGeneration)
.mapToInt(BaseTranslogReader::totalOperations)
.sum();
}
}
@ -432,9 +415,9 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
return Stream.concat(readers.stream(), Stream.of(current))
.filter(r -> r.getGeneration() >= minGeneration)
.mapToLong(BaseTranslogReader::sizeInBytes)
.sum();
.filter(r -> r.getGeneration() >= minGeneration)
.mapToLong(BaseTranslogReader::sizeInBytes)
.sum();
}
}
@ -588,8 +571,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
"Min referenced generation is [" + getMinFileGeneration() + "]");
}
TranslogSnapshot[] snapshots = Stream.concat(readers.stream(), Stream.of(current))
.filter(reader -> reader.getGeneration() >= minGeneration)
.map(BaseTranslogReader::newSnapshot).toArray(TranslogSnapshot[]::new);
.filter(reader -> reader.getGeneration() >= minGeneration)
.map(BaseTranslogReader::newSnapshot).toArray(TranslogSnapshot[]::new);
return newMultiSnapshot(snapshots);
}
}
@ -626,8 +609,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
private Stream<? extends BaseTranslogReader> readersAboveMinSeqNo(long minSeqNo) {
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread() :
"callers of readersAboveMinSeqNo must hold a lock: readLock ["
+ readLock.isHeldByCurrentThread() + "], writeLock [" + readLock.isHeldByCurrentThread() + "]";
"callers of readersAboveMinSeqNo must hold a lock: readLock ["
+ readLock.isHeldByCurrentThread() + "], writeLock [" + readLock.isHeldByCurrentThread() + "]";
return Stream.concat(readers.stream(), Stream.of(current))
.filter(reader -> {
final long maxSeqNo = reader.getCheckpoint().maxSeqNo;
@ -1113,14 +1096,14 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
Index index = (Index) o;
if (version != index.version ||
seqNo != index.seqNo ||
primaryTerm != index.primaryTerm ||
id.equals(index.id) == false ||
type.equals(index.type) == false ||
versionType != index.versionType ||
autoGeneratedIdTimestamp != index.autoGeneratedIdTimestamp ||
source.equals(index.source) == false) {
return false;
seqNo != index.seqNo ||
primaryTerm != index.primaryTerm ||
id.equals(index.id) == false ||
type.equals(index.type) == false ||
versionType != index.versionType ||
autoGeneratedIdTimestamp != index.autoGeneratedIdTimestamp ||
source.equals(index.source) == false) {
return false;
}
if (routing != null ? !routing.equals(index.routing) : index.routing != null) {
return false;
@ -1293,10 +1276,10 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
Delete delete = (Delete) o;
return version == delete.version &&
seqNo == delete.seqNo &&
primaryTerm == delete.primaryTerm &&
uid.equals(delete.uid) &&
versionType == delete.versionType;
seqNo == delete.seqNo &&
primaryTerm == delete.primaryTerm &&
uid.equals(delete.uid) &&
versionType == delete.versionType;
}
@Override
@ -1421,7 +1404,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
long readChecksum = in.readInt() & 0xFFFF_FFFFL;
if (readChecksum != expectedChecksum) {
throw new TranslogCorruptedException("translog stream is corrupted, expected: 0x" +
Long.toHexString(expectedChecksum) + ", got: 0x" + Long.toHexString(readChecksum));
Long.toHexString(expectedChecksum) + ", got: 0x" + Long.toHexString(readChecksum));
}
}
@ -1543,7 +1526,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
final Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME);
assert Checkpoint.read(checkpoint).generation == current.getGeneration();
final Path generationCheckpoint =
location.resolve(getCommitCheckpointFileName(current.getGeneration()));
location.resolve(getCommitCheckpointFileName(current.getGeneration()));
Files.copy(checkpoint, generationCheckpoint);
IOUtils.fsync(generationCheckpoint, false);
IOUtils.fsync(generationCheckpoint.getParent(), true);
@ -1728,4 +1711,26 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
List<TranslogReader> getReaders() {
return readers;
}
public static String createEmptyTranslog(final Path location, final long initialGlobalCheckpoint, final ShardId shardId)
throws IOException {
final ChannelFactory channelFactory = FileChannel::open;
return createEmptyTranslog(location, initialGlobalCheckpoint, shardId, channelFactory);
}
static String createEmptyTranslog(Path location, long initialGlobalCheckpoint, ShardId shardId, ChannelFactory channelFactory) throws IOException {
IOUtils.rm(location);
Files.createDirectories(location);
final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(0, 1, initialGlobalCheckpoint, 1);
final Path checkpointFile = location.resolve(CHECKPOINT_FILE_NAME);
Checkpoint.write(channelFactory, checkpointFile, checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
IOUtils.fsync(checkpointFile, false);
final String translogUUID = UUIDs.randomBase64UUID();
TranslogWriter writer = TranslogWriter.create(shardId, translogUUID, 1, location.resolve(getFilename(1)), channelFactory,
new ByteSizeValue(10), 1, initialGlobalCheckpoint,
() -> { throw new UnsupportedOperationException(); }, () -> { throw new UnsupportedOperationException(); }
);
writer.close();
return translogUUID;
}
}

View File

@ -109,8 +109,8 @@ import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.RootObjectMapper;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.ShardId;
@ -1021,25 +1021,25 @@ public class InternalEngineTests extends EngineTestCase {
}
engine.flush();
assertThat(engine.getTranslog().currentFileGeneration(), equalTo(2L));
assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync ? 2L : 1L));
assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(2L));
assertThat(engine.getTranslog().currentFileGeneration(), equalTo(3L));
assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync ? 3L : 2L));
assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(3L));
engine.flush();
assertThat(engine.getTranslog().currentFileGeneration(), equalTo(2L));
assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync ? 2L : 1L));
assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(2L));
assertThat(engine.getTranslog().currentFileGeneration(), equalTo(3L));
assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync ? 3L : 2L));
assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(3L));
engine.flush(true, true);
assertThat(engine.getTranslog().currentFileGeneration(), equalTo(3L));
assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync ? 3L : 1L));
assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(3L));
assertThat(engine.getTranslog().currentFileGeneration(), equalTo(4L));
assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync ? 4L : 2L));
assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(4L));
globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint());
engine.flush(true, true);
assertThat(engine.getTranslog().currentFileGeneration(), equalTo(4L));
assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(4L));
assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(4L));
assertThat(engine.getTranslog().currentFileGeneration(), equalTo(5L));
assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(5L));
assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(5L));
}
public void testSyncedFlush() throws IOException {
@ -2611,9 +2611,11 @@ public class InternalEngineTests extends EngineTestCase {
Translog.TranslogGeneration generation = engine.getTranslog().getGeneration();
engine.close();
final Path badTranslogLog = createTempDir();
final String badUUID = Translog.createEmptyTranslog(badTranslogLog, SequenceNumbers.NO_OPS_PERFORMED, shardId);
Translog translog = new Translog(
new TranslogConfig(shardId, createTempDir(), INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE),
null, createTranslogDeletionPolicy(INDEX_SETTINGS), () -> SequenceNumbers.UNASSIGNED_SEQ_NO);
new TranslogConfig(shardId, badTranslogLog, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE),
badUUID, createTranslogDeletionPolicy(INDEX_SETTINGS), () -> SequenceNumbers.NO_OPS_PERFORMED);
translog.add(new Translog.Index("test", "SomeBogusId", 0, "{}".getBytes(Charset.forName("UTF-8"))));
assertEquals(generation.translogFileGeneration, translog.currentFileGeneration());
translog.close();
@ -2835,7 +2837,7 @@ public class InternalEngineTests extends EngineTestCase {
globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint());
expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog());
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals("2", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
}
}
@ -2846,14 +2848,14 @@ public class InternalEngineTests extends EngineTestCase {
assertTrue(engine.isRecovering());
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
if (i == 0) {
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals("2", userData.get(Translog.TRANSLOG_GENERATION_KEY));
} else {
assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals("4", userData.get(Translog.TRANSLOG_GENERATION_KEY));
}
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.recoverFromTranslog();
userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals("4", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
}
}
@ -2862,10 +2864,10 @@ public class InternalEngineTests extends EngineTestCase {
{
try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG))) {
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals("2", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog());
assertEquals(1, engine.getTranslog().currentFileGeneration());
assertEquals(2, engine.getTranslog().currentFileGeneration());
assertEquals(0L, engine.getTranslog().uncommittedOperations());
}
}
@ -2875,11 +2877,11 @@ public class InternalEngineTests extends EngineTestCase {
for (int i = 0; i < 2; i++) {
try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) {
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals("2", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.recoverFromTranslog();
userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("no changes - nothing to commit", "1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals("no changes - nothing to commit", "2", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
}
}
@ -4421,7 +4423,7 @@ public class InternalEngineTests extends EngineTestCase {
assertThat(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY), equalTo(lastCommit.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)));
// Translog tags should be fresh.
assertThat(userData.get(Translog.TRANSLOG_UUID_KEY), not(equalTo(lastCommit.get(Translog.TRANSLOG_UUID_KEY))));
assertThat(userData.get(Translog.TRANSLOG_GENERATION_KEY), equalTo("1"));
assertThat(userData.get(Translog.TRANSLOG_GENERATION_KEY), equalTo("2"));
}
}

View File

@ -147,11 +147,19 @@ public class TranslogTests extends ESTestCase {
}
protected Translog createTranslog(TranslogConfig config, String translogUUID) throws IOException {
protected Translog createTranslog(TranslogConfig config) throws IOException {
String translogUUID =
Translog.createEmptyTranslog(config.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId);
return new Translog(config, translogUUID, createTranslogDeletionPolicy(config.getIndexSettings()),
() -> SequenceNumbers.UNASSIGNED_SEQ_NO);
() -> SequenceNumbers.NO_OPS_PERFORMED);
}
protected Translog openTranslog(TranslogConfig config, String translogUUID) throws IOException {
return new Translog(config, translogUUID, createTranslogDeletionPolicy(config.getIndexSettings()),
() -> SequenceNumbers.NO_OPS_PERFORMED);
}
private void markCurrentGenAsCommitted(Translog translog) throws IOException {
long genToCommit = translog.currentFileGeneration();
long genToRetain = randomLongBetween(translog.getDeletionPolicy().getMinTranslogGenerationForRecovery(), genToCommit);
@ -194,10 +202,11 @@ public class TranslogTests extends ESTestCase {
}
private Translog create(Path path) throws IOException {
globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final TranslogConfig translogConfig = getTranslogConfig(path);
final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(translogConfig.getIndexSettings());
return new Translog(translogConfig, null, deletionPolicy, () -> globalCheckpoint.get());
final String translogUUID = Translog.createEmptyTranslog(path, SequenceNumbers.NO_OPS_PERFORMED, shardId);
return new Translog(translogConfig, translogUUID, deletionPolicy, () -> globalCheckpoint.get());
}
private TranslogConfig getTranslogConfig(final Path path) {
@ -220,7 +229,7 @@ public class TranslogTests extends ESTestCase {
}
final IndexSettings indexSettings =
IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings);
IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings);
return new TranslogConfig(shardId, path, indexSettings, NON_RECYCLING_INSTANCE, bufferSize);
}
@ -372,39 +381,39 @@ public class TranslogTests extends ESTestCase {
{
final TranslogStats stats = stats();
assertThat(stats.estimatedNumberOfOperations(), equalTo(1));
assertThat(stats.getTranslogSizeInBytes(), equalTo(97L));
assertThat(stats.getTranslogSizeInBytes(), equalTo(140L));
assertThat(stats.getUncommittedOperations(), equalTo(1));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(97L));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(140L));
}
translog.add(new Translog.Delete("test", "2", 1, newUid("2")));
{
final TranslogStats stats = stats();
assertThat(stats.estimatedNumberOfOperations(), equalTo(2));
assertThat(stats.getTranslogSizeInBytes(), equalTo(146L));
assertThat(stats.getTranslogSizeInBytes(), equalTo(189L));
assertThat(stats.getUncommittedOperations(), equalTo(2));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(146L));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(189L));
}
translog.add(new Translog.Delete("test", "3", 2, newUid("3")));
{
final TranslogStats stats = stats();
assertThat(stats.estimatedNumberOfOperations(), equalTo(3));
assertThat(stats.getTranslogSizeInBytes(), equalTo(195L));
assertThat(stats.getTranslogSizeInBytes(), equalTo(238L));
assertThat(stats.getUncommittedOperations(), equalTo(3));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(195L));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(238L));
}
translog.add(new Translog.NoOp(3, 1, randomAlphaOfLength(16)));
{
final TranslogStats stats = stats();
assertThat(stats.estimatedNumberOfOperations(), equalTo(4));
assertThat(stats.getTranslogSizeInBytes(), equalTo(237L));
assertThat(stats.getTranslogSizeInBytes(), equalTo(280L));
assertThat(stats.getUncommittedOperations(), equalTo(4));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(237L));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(280L));
}
final long expectedSizeInBytes = 280L;
final long expectedSizeInBytes = 323L;
translog.rollGeneration();
{
final TranslogStats stats = stats();
@ -521,7 +530,7 @@ public class TranslogTests extends ESTestCase {
}
try (Translog.Snapshot snapshot = translog.newSnapshot();
Translog.Snapshot snapshot1 = translog.newSnapshot()) {
Translog.Snapshot snapshot1 = translog.newSnapshot()) {
assertThat(snapshot, SnapshotMatchers.equalsTo(ops));
assertThat(snapshot.totalOperations(), equalTo(1));
@ -1235,15 +1244,15 @@ public class TranslogTests extends ESTestCase {
translog.close();
if (translogGeneration == null) {
translog = createTranslog(config, null);
translog = createTranslog(config);
assertEquals(0, translog.stats().estimatedNumberOfOperations());
assertEquals(1, translog.currentFileGeneration());
assertEquals(2, translog.currentFileGeneration());
assertFalse(translog.syncNeeded());
try(Translog.Snapshot snapshot = translog.newSnapshot()) {
assertNull(snapshot.next());
}
} else {
translog = new Translog(config, translogGeneration.translogUUID, translog.getDeletionPolicy(), () -> SequenceNumbers.UNASSIGNED_SEQ_NO);
translog = new Translog(config, translogGeneration.translogUUID, translog.getDeletionPolicy(), () -> SequenceNumbers.NO_OPS_PERFORMED);
assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, translog.currentFileGeneration());
assertFalse(translog.syncNeeded());
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(translogGeneration.translogFileGeneration)) {
@ -1269,7 +1278,8 @@ public class TranslogTests extends ESTestCase {
if (op == prepareOp) {
translogGeneration = translog.getGeneration();
translog.rollGeneration();
assertEquals("expected this to be the first commit", 1L, translogGeneration.translogFileGeneration);
assertEquals("expected this to be the first roll (1 gen is on creation, 2 when opened)",
2L, translogGeneration.translogFileGeneration);
assertNotNull(translogGeneration.translogUUID);
}
}
@ -1281,7 +1291,7 @@ public class TranslogTests extends ESTestCase {
TranslogConfig config = translog.getConfig();
final String translogUUID = translog.getTranslogUUID();
final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy();
try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.UNASSIGNED_SEQ_NO)) {
try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED)) {
assertNotNull(translogGeneration);
assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
assertFalse(translog.syncNeeded());
@ -1295,9 +1305,10 @@ public class TranslogTests extends ESTestCase {
}
}
if (randomBoolean()) { // recover twice
try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.UNASSIGNED_SEQ_NO)) {
try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED)) {
assertNotNull(translogGeneration);
assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration());
assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice",
translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration());
assertFalse(translog.syncNeeded());
try (Translog.Snapshot snapshot = new SortedSnapshot(translog.newSnapshot())) {
int upTo = sync ? translogOperations : prepareOp;
@ -1323,7 +1334,8 @@ public class TranslogTests extends ESTestCase {
if (op == prepareOp) {
translogGeneration = translog.getGeneration();
translog.rollGeneration();
assertEquals("expected this to be the first commit", 1L, translogGeneration.translogFileGeneration);
assertEquals("expected this to be the first roll (1 gen is on creation, 2 when opened)",
2L, translogGeneration.translogFileGeneration);
assertNotNull(translogGeneration.translogUUID);
}
}
@ -1339,7 +1351,7 @@ public class TranslogTests extends ESTestCase {
final String translogUUID = translog.getTranslogUUID();
final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy();
try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.UNASSIGNED_SEQ_NO)) {
try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED)) {
assertNotNull(translogGeneration);
assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
assertFalse(translog.syncNeeded());
@ -1354,9 +1366,10 @@ public class TranslogTests extends ESTestCase {
}
if (randomBoolean()) { // recover twice
try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.UNASSIGNED_SEQ_NO)) {
try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED)) {
assertNotNull(translogGeneration);
assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration());
assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice",
translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration());
assertFalse(translog.syncNeeded());
try (Translog.Snapshot snapshot = new SortedSnapshot(translog.newSnapshot())) {
int upTo = sync ? translogOperations : prepareOp;
@ -1381,7 +1394,8 @@ public class TranslogTests extends ESTestCase {
if (op == prepareOp) {
translogGeneration = translog.getGeneration();
translog.rollGeneration();
assertEquals("expected this to be the first commit", 1L, translogGeneration.translogFileGeneration);
assertEquals("expected this to be the first roll (1 gen is on creation, 2 when opened)",
2L, translogGeneration.translogFileGeneration);
assertNotNull(translogGeneration.translogUUID);
}
}
@ -1391,19 +1405,19 @@ public class TranslogTests extends ESTestCase {
TranslogConfig config = translog.getConfig();
Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME);
Checkpoint read = Checkpoint.read(ckp);
Checkpoint corrupted = Checkpoint.emptyTranslogCheckpoint(0, 0, SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
Checkpoint corrupted = Checkpoint.emptyTranslogCheckpoint(0, 0, SequenceNumbers.NO_OPS_PERFORMED, 0);
Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), corrupted, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
final String translogUUID = translog.getTranslogUUID();
final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy();
try (Translog ignored = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.UNASSIGNED_SEQ_NO)) {
try (Translog ignored = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED)) {
fail("corrupted");
} catch (IllegalStateException ex) {
assertEquals("Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=3123, " +
"numOps=55, generation=2, minSeqNo=45, maxSeqNo=99, globalCheckpoint=-2, minTranslogGeneration=1} but got: Checkpoint{offset=0, numOps=0, " +
"generation=0, minSeqNo=-1, maxSeqNo=-1, globalCheckpoint=-2, minTranslogGeneration=0}", ex.getMessage());
assertEquals("Checkpoint file translog-3.ckp already exists but has corrupted content expected: Checkpoint{offset=3123, " +
"numOps=55, generation=3, minSeqNo=45, maxSeqNo=99, globalCheckpoint=-1, minTranslogGeneration=1} but got: Checkpoint{offset=0, numOps=0, " +
"generation=0, minSeqNo=-1, maxSeqNo=-1, globalCheckpoint=-1, minTranslogGeneration=0}", ex.getMessage());
}
Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), read, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.UNASSIGNED_SEQ_NO)) {
try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED)) {
assertNotNull(translogGeneration);
assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
assertFalse(translog.syncNeeded());
@ -1480,12 +1494,12 @@ public class TranslogTests extends ESTestCase {
final String foreignTranslog = randomRealisticUnicodeOfCodepointLengthBetween(1,
translogGeneration.translogUUID.length());
try {
new Translog(config, foreignTranslog, createTranslogDeletionPolicy(), () -> SequenceNumbers.UNASSIGNED_SEQ_NO);
new Translog(config, foreignTranslog, createTranslogDeletionPolicy(), () -> SequenceNumbers.NO_OPS_PERFORMED);
fail("translog doesn't belong to this UUID");
} catch (TranslogCorruptedException ex) {
}
this.translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.UNASSIGNED_SEQ_NO);
this.translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED);
try (Translog.Snapshot snapshot = this.translog.newSnapshotFromGen(translogGeneration.translogFileGeneration)) {
for (int i = firstUncommitted; i < translogOperations; i++) {
Translog.Operation next = snapshot.next();
@ -1671,7 +1685,7 @@ public class TranslogTests extends ESTestCase {
translog.close(); // we are closed
final String translogUUID = translog.getTranslogUUID();
final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy();
try (Translog tlog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.UNASSIGNED_SEQ_NO)) {
try (Translog tlog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED)) {
assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, tlog.currentFileGeneration());
assertFalse(tlog.syncNeeded());
@ -1807,7 +1821,7 @@ public class TranslogTests extends ESTestCase {
}
}
try (Translog tlog =
new Translog(config, translogUUID, createTranslogDeletionPolicy(), () -> SequenceNumbers.UNASSIGNED_SEQ_NO);
new Translog(config, translogUUID, createTranslogDeletionPolicy(), () -> SequenceNumbers.NO_OPS_PERFORMED);
Translog.Snapshot snapshot = tlog.newSnapshot()) {
if (writtenOperations.size() != snapshot.totalOperations()) {
for (int i = 0; i < threadCount; i++) {
@ -1853,7 +1867,7 @@ public class TranslogTests extends ESTestCase {
final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1);
deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(comittedGeneration, Long.MAX_VALUE));
deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration);
translog = new Translog(config, translog.getTranslogUUID(), deletionPolicy, () -> SequenceNumbers.UNASSIGNED_SEQ_NO);
translog = new Translog(config, translog.getTranslogUUID(), deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED);
assertThat(translog.getMinFileGeneration(), equalTo(1L));
// no trimming done yet, just recovered
for (long gen = 1; gen < translog.currentFileGeneration(); gen++) {
@ -1909,7 +1923,7 @@ public class TranslogTests extends ESTestCase {
final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1);
deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(comittedGeneration, Long.MAX_VALUE));
deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration);
try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.UNASSIGNED_SEQ_NO)) {
try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED)) {
// we don't know when things broke exactly
assertThat(translog.getMinFileGeneration(), greaterThanOrEqualTo(1L));
assertThat(translog.getMinFileGeneration(), lessThanOrEqualTo(comittedGeneration));
@ -1957,25 +1971,28 @@ public class TranslogTests extends ESTestCase {
private Translog getFailableTranslog(final FailSwitch fail, final TranslogConfig config, final boolean partialWrites,
final boolean throwUnknownException, String translogUUID,
final TranslogDeletionPolicy deletionPolicy) throws IOException {
return new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.UNASSIGNED_SEQ_NO) {
final ChannelFactory channelFactory = (file, openOption) -> {
FileChannel channel = FileChannel.open(file, openOption);
boolean success = false;
try {
final boolean isCkpFile = file.getFileName().toString().endsWith(".ckp"); // don't do partial writes for checkpoints we rely on the fact that the bytes are written as an atomic operation
ThrowingFileChannel throwingFileChannel = new ThrowingFileChannel(fail, isCkpFile ? false : partialWrites, throwUnknownException, channel);
success = true;
return throwingFileChannel;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(channel);
}
}
};
if (translogUUID == null) {
translogUUID = Translog.createEmptyTranslog(
config.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId, channelFactory);
}
return new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED) {
@Override
ChannelFactory getChannelFactory() {
final ChannelFactory factory = super.getChannelFactory();
return (file, openOption) -> {
FileChannel channel = factory.open(file, openOption);
boolean success = false;
try {
final boolean isCkpFile = file.getFileName().toString().endsWith(".ckp"); // don't do partial writes for checkpoints we rely on the fact that the bytes are written as an atomic operation
ThrowingFileChannel throwingFileChannel = new ThrowingFileChannel(fail, isCkpFile ? false : partialWrites, throwUnknownException, channel);
success = true;
return throwingFileChannel;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(channel);
}
}
};
return channelFactory;
}
@Override
@ -2079,11 +2096,11 @@ public class TranslogTests extends ESTestCase {
public void testFailWhileCreateWriteWithRecoveredTLogs() throws IOException {
Path tempDir = createTempDir();
TranslogConfig config = getTranslogConfig(tempDir);
Translog translog = createTranslog(config, null);
Translog translog = createTranslog(config);
translog.add(new Translog.Index("test", "boom", 0, "boom".getBytes(Charset.forName("UTF-8"))));
translog.close();
try {
new Translog(config, translog.getTranslogUUID(), createTranslogDeletionPolicy(), () -> SequenceNumbers.UNASSIGNED_SEQ_NO) {
new Translog(config, translog.getTranslogUUID(), createTranslogDeletionPolicy(), () -> SequenceNumbers.NO_OPS_PERFORMED) {
@Override
protected TranslogWriter createWriter(long fileGeneration, long initialMinTranslogGen, long initialGlobalCheckpoint)
throws IOException {
@ -2106,7 +2123,7 @@ public class TranslogTests extends ESTestCase {
Checkpoint read = Checkpoint.read(ckp);
Files.copy(ckp, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)));
Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog"));
try (Translog tlog = createTranslog(config, translog.getTranslogUUID());
try (Translog tlog = openTranslog(config, translog.getTranslogUUID());
Translog.Snapshot snapshot = tlog.newSnapshot()) {
assertFalse(tlog.syncNeeded());
@ -2117,7 +2134,7 @@ public class TranslogTests extends ESTestCase {
tlog.add(new Translog.Index("test", "" + 1, 1, Integer.toString(2).getBytes(Charset.forName("UTF-8"))));
}
try (Translog tlog = createTranslog(config, translog.getTranslogUUID());
try (Translog tlog = openTranslog(config, translog.getTranslogUUID());
Translog.Snapshot snapshot = tlog.newSnapshot()) {
assertFalse(tlog.syncNeeded());
@ -2141,7 +2158,7 @@ public class TranslogTests extends ESTestCase {
Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog"));
try {
Translog tlog = new Translog(config, translog.getTranslogUUID(), translog.getDeletionPolicy(), () -> SequenceNumbers.UNASSIGNED_SEQ_NO);
Translog tlog = new Translog(config, translog.getTranslogUUID(), translog.getDeletionPolicy(), () -> SequenceNumbers.NO_OPS_PERFORMED);
fail("file already exists?");
} catch (TranslogException ex) {
// all is well
@ -2163,7 +2180,7 @@ public class TranslogTests extends ESTestCase {
Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog"));
// we add N+1 and N+2 to ensure we only delete the N+1 file and never jump ahead and wipe without the right condition
Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 2) + ".tlog"));
try (Translog tlog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.UNASSIGNED_SEQ_NO)) {
try (Translog tlog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED)) {
assertFalse(tlog.syncNeeded());
try (Translog.Snapshot snapshot = tlog.newSnapshot()) {
for (int i = 0; i < 1; i++) {
@ -2176,7 +2193,7 @@ public class TranslogTests extends ESTestCase {
}
try {
Translog tlog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.UNASSIGNED_SEQ_NO);
Translog tlog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED);
fail("file already exists?");
} catch (TranslogException ex) {
// all is well
@ -2282,7 +2299,11 @@ public class TranslogTests extends ESTestCase {
TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy();
deletionPolicy.setTranslogGenerationOfLastCommit(minGenForRecovery);
deletionPolicy.setMinTranslogGenerationForRecovery(minGenForRecovery);
try (Translog translog = new Translog(config, generationUUID, deletionPolicy, () -> SequenceNumbers.UNASSIGNED_SEQ_NO);
if (generationUUID == null) {
// we never managed to successfully create a translog, make it
generationUUID = Translog.createEmptyTranslog(config.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId);
}
try (Translog translog = new Translog(config, generationUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED);
Translog.Snapshot snapshot = translog.newSnapshotFromGen(minGenForRecovery)) {
assertEquals(syncedDocs.size(), snapshot.totalOperations());
for (int i = 0; i < syncedDocs.size(); i++) {
@ -2347,14 +2368,14 @@ public class TranslogTests extends ESTestCase {
final String translogUUID = translog.getTranslogUUID();
final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(config.getIndexSettings());
translog.close();
translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.UNASSIGNED_SEQ_NO);
translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED);
translog.add(new Translog.Index("test", "2", 1, new byte[]{2}));
translog.rollGeneration();
Closeable lock = translog.acquireRetentionLock();
translog.add(new Translog.Index("test", "3", 2, new byte[]{3}));
translog.close();
IOUtils.close(lock);
translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.UNASSIGNED_SEQ_NO);
translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED);
}
public static Translog.Location randomTranslogLocation() {
@ -2382,7 +2403,7 @@ public class TranslogTests extends ESTestCase {
null);
Engine.Index eIndex = new Engine.Index(newUid(doc), doc, randomSeqNum, randomPrimaryTerm,
1, VersionType.INTERNAL, Origin.PRIMARY, 0, 0, false);
1, VersionType.INTERNAL, Origin.PRIMARY, 0, 0, false);
Engine.IndexResult eIndexResult = new Engine.IndexResult(1, randomSeqNum, true);
Translog.Index index = new Translog.Index(eIndex, eIndexResult);
@ -2393,7 +2414,7 @@ public class TranslogTests extends ESTestCase {
assertEquals(index, serializedIndex);
Engine.Delete eDelete = new Engine.Delete(doc.type(), doc.id(), newUid(doc), randomSeqNum, randomPrimaryTerm,
2, VersionType.INTERNAL, Origin.PRIMARY, 0);
2, VersionType.INTERNAL, Origin.PRIMARY, 0);
Engine.DeleteResult eDeleteResult = new Engine.DeleteResult(2, randomSeqNum, true);
Translog.Delete delete = new Translog.Delete(eDelete, eDeleteResult);

View File

@ -32,7 +32,6 @@ import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
@ -44,7 +43,6 @@ import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.SnapshotMatchers;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import java.util.HashMap;
import java.util.List;
@ -52,7 +50,6 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@ -186,7 +183,6 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
shards.indexDocs(nonFlushedDocs);
IndexShard replica = shards.getReplicas().get(0);
final String translogUUID = replica.getTranslog().getTranslogUUID();
final String historyUUID = replica.getHistoryUUID();
Translog.TranslogGeneration translogGeneration = replica.getTranslog().getGeneration();
shards.removeReplica(replica);
@ -204,13 +200,8 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
final String historyUUIDtoUse = UUIDs.randomBase64UUID(random());
if (randomBoolean()) {
// create a new translog
final TranslogConfig translogConfig =
new TranslogConfig(replica.shardId(), replica.shardPath().resolveTranslog(), replica.indexSettings(),
BigArrays.NON_RECYCLING_INSTANCE);
try (Translog translog = new Translog(translogConfig, null, createTranslogDeletionPolicy(), () -> flushedDocs)) {
translogUUIDtoUse = translog.getTranslogUUID();
translogGenToUse = translog.currentFileGeneration();
}
translogUUIDtoUse = Translog.createEmptyTranslog(replica.shardPath().resolveTranslog(), flushedDocs, replica.shardId());
translogGenToUse = 1;
} else {
translogUUIDtoUse = translogGeneration.translogUUID;
translogGenToUse = translogGeneration.translogFileGeneration;

View File

@ -60,8 +60,8 @@ import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.DirectoryService;
@ -253,7 +253,9 @@ public abstract class EngineTestCase extends ESTestCase {
protected Translog createTranslog(Path translogPath) throws IOException {
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE);
return new Translog(translogConfig, null, createTranslogDeletionPolicy(INDEX_SETTINGS), () -> SequenceNumbers.UNASSIGNED_SEQ_NO);
final String translogUUID = Translog.createEmptyTranslog(translogPath, SequenceNumbers.NO_OPS_PERFORMED, shardId);
return new Translog(translogConfig, translogUUID, createTranslogDeletionPolicy(INDEX_SETTINGS),
() -> SequenceNumbers.NO_OPS_PERFORMED);
}
protected InternalEngine createEngine(Store store, Path translogPath) throws IOException {