Introduce translog size and age based retention policies (#25147)

This PR extends the TranslogDeletionPolicy to allow keeping the translog files longer than what is needed for recovery from Lucene. Specifically, we allow specifying the total size of the files and their maximum age (e.g., keep up to 512MB but no longer than 12 hours). This will allow making ops based recoveries more common.

Note that the default size and age are still set to 0, maintaining current behavior. This is needed as the other components in the system are not yet ready for a longer translog retention. I will adapt those in follow-up PRs.

Relates to #10708
This commit is contained in:
Boaz Leskes 2017-06-16 09:09:51 +02:00 committed by GitHub
parent 50db8cb351
commit 9ddea539f5
12 changed files with 418 additions and 52 deletions

View File

@ -127,6 +127,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING,
IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING,
IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING,
IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING,
IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING,
IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY,
FieldMapper.IGNORE_MALFORMED_SETTING,
FieldMapper.COERCE_SETTING,

View File

@ -37,6 +37,7 @@ import org.elasticsearch.node.Node;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
/**
@ -111,6 +112,24 @@ public final class IndexSettings {
Setting.byteSizeSetting("index.translog.flush_threshold_size", new ByteSizeValue(512, ByteSizeUnit.MB), Property.Dynamic,
Property.IndexScope);
/**
* Controls how long translog files that are no longer needed for persistence reasons
* will be kept around before being deleted. A longer retention policy is useful to increase
* the chance of ops based recoveries.
* <p>
* The default is -1 milliseconds. The translog deletion policy is handed this value in
* milliseconds and treats a negative age as "age based retention disabled".
* The setting is dynamic and index scoped.
*/
public static final Setting<TimeValue> INDEX_TRANSLOG_RETENTION_AGE_SETTING =
Setting.timeSetting("index.translog.retention.age", TimeValue.timeValueMillis(-1), TimeValue.timeValueMillis(-1), Property.Dynamic,
Property.IndexScope);
/**
* Controls the total size of translog files that are kept around even though they are no
* longer needed for persistence reasons. Keeping more of the translog is useful to increase
* the chance of ops based recoveries.
* <p>
* The default is negative (-1 MB). The translog deletion policy is handed this value in
* bytes and treats a negative size as "size based retention disabled".
* The setting is dynamic and index scoped.
*/
public static final Setting<ByteSizeValue> INDEX_TRANSLOG_RETENTION_SIZE_SETTING =
Setting.byteSizeSetting("index.translog.retention.size", new ByteSizeValue(-1, ByteSizeUnit.MB), Property.Dynamic,
Property.IndexScope);
/**
* The maximum size of a translog generation. This is independent of the maximum size of
* translog operations that have not been flushed.
@ -168,6 +187,8 @@ public final class IndexSettings {
private final TimeValue syncInterval;
private volatile TimeValue refreshInterval;
private volatile ByteSizeValue flushThresholdSize;
private volatile TimeValue translogRetentionAge;
private volatile ByteSizeValue translogRetentionSize;
private volatile ByteSizeValue generationThresholdSize;
private final MergeSchedulerConfig mergeSchedulerConfig;
private final MergePolicyConfig mergePolicyConfig;
@ -265,6 +286,8 @@ public final class IndexSettings {
syncInterval = INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.get(settings);
refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING);
flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING);
translogRetentionAge = scopedSettings.get(INDEX_TRANSLOG_RETENTION_AGE_SETTING);
translogRetentionSize = scopedSettings.get(INDEX_TRANSLOG_RETENTION_SIZE_SETTING);
generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING);
mergeSchedulerConfig = new MergeSchedulerConfig(this);
gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis();
@ -302,6 +325,8 @@ public final class IndexSettings {
scopedSettings.addSettingsUpdateConsumer(
INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING,
this::setGenerationThresholdSize);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_AGE_SETTING, this::setTranslogRetentionAge);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_SIZE_SETTING, this::setTranslogRetentionSize);
scopedSettings.addSettingsUpdateConsumer(INDEX_REFRESH_INTERVAL_SETTING, this::setRefreshInterval);
scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners);
scopedSettings.addSettingsUpdateConsumer(MAX_SLICES_PER_SCROLL, this::setMaxSlicesPerScroll);
@ -311,6 +336,14 @@ public final class IndexSettings {
this.flushThresholdSize = byteSizeValue;
}
/**
* Stores a new translog retention size. Registered as the settings update consumer for
* {@link #INDEX_TRANSLOG_RETENTION_SIZE_SETTING}.
*/
private void setTranslogRetentionSize(ByteSizeValue byteSizeValue) {
this.translogRetentionSize = byteSizeValue;
}
/**
* Stores a new translog retention age. Registered as the settings update consumer for
* {@link #INDEX_TRANSLOG_RETENTION_AGE_SETTING}.
*/
private void setTranslogRetentionAge(TimeValue age) {
this.translogRetentionAge = age;
}
private void setGenerationThresholdSize(final ByteSizeValue generationThresholdSize) {
this.generationThresholdSize = generationThresholdSize;
}
@ -469,6 +502,16 @@ public final class IndexSettings {
*/
public ByteSizeValue getFlushThresholdSize() { return flushThresholdSize; }
/**
* Returns the transaction log retention size, which controls how much of the translog is kept around
* to allow for ops based recoveries. The value reflects the most recent update of
* {@link #INDEX_TRANSLOG_RETENTION_SIZE_SETTING}.
*/
public ByteSizeValue getTranslogRetentionSize() { return translogRetentionSize; }
/**
* Returns the transaction log retention age, which controls the maximum age (time from creation) that
* translog files will be kept around. The value reflects the most recent update of
* {@link #INDEX_TRANSLOG_RETENTION_AGE_SETTING}.
*/
public TimeValue getTranslogRetentionAge() { return translogRetentionAge; }
/**
* Returns the generation threshold size. As sequence numbers can cause multiple generations to
* be preserved for rollback purposes, we want to keep the size of individual generations from

View File

@ -150,7 +150,10 @@ public class InternalEngine extends Engine {
}
this.uidField = engineConfig.getIndexSettings().isSingleType() ? IdFieldMapper.NAME : UidFieldMapper.NAME;
this.versionMap = new LiveVersionMap();
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy();
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis()
);
this.deletionPolicy = new CombinedDeletionPolicy(
new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()), translogDeletionPolicy, openMode);
store.incRef();
@ -1854,6 +1857,10 @@ public class InternalEngine extends Engine {
// the setting will be re-interpreted if it's set to true
this.maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE);
}
final TranslogDeletionPolicy translogDeletionPolicy = translog.getDeletionPolicy();
final IndexSettings indexSettings = engineConfig.getIndexSettings();
translogDeletionPolicy.setRetentionAgeInMillis(indexSettings.getTranslogRetentionAge().getMillis());
translogDeletionPolicy.setRetentionSizeInBytes(indexSettings.getTranslogRetentionSize().getBytes());
}
public MergeStats getMergeStats() {

View File

@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
/**
@ -121,4 +122,8 @@ public abstract class BaseTranslogReader implements Comparable<BaseTranslogReade
public Path path() {
return path;
}
/**
* Returns the last modified time of the file backing this reader, in milliseconds since the epoch,
* as reported by the file system.
*
* @throws IOException if the file attributes can not be read
*/
public long getLastModifiedTime() throws IOException {
return Files.getLastModifiedTime(path).toMillis();
}
}

View File

@ -1522,7 +1522,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
// we're shutdown potentially on some tragic event, don't delete anything
return;
}
long minReferencedGen = deletionPolicy.minTranslogGenRequired();
long minReferencedGen = deletionPolicy.minTranslogGenRequired(readers, current);
assert minReferencedGen >= getMinFileGeneration() :
"deletion policy requires a minReferenceGen of [" + minReferencedGen + "] but the lowest gen available is ["
+ getMinFileGeneration() + "]";

View File

@ -21,13 +21,17 @@ package org.elasticsearch.index.translog;
import org.apache.lucene.util.Counter;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class TranslogDeletionPolicy {
/** Records how many views are held against each
* translog generation */
/**
* Records how many views are held against each
* translog generation
*/
private final Map<Long, Counter> translogRefCounts = new HashMap<>();
/**
@ -36,14 +40,31 @@ public class TranslogDeletionPolicy {
*/
private long minTranslogGenerationForRecovery = 1;
private long retentionSizeInBytes;
private long retentionAgeInMillis;
/**
* Creates a deletion policy with the given retention limits.
*
* @param retentionSizeInBytes total size of translog files to retain; a negative value disables size based retention
* @param retentionAgeInMillis maximum age of translog files to retain; a negative value disables age based retention
*/
public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis) {
this.retentionSizeInBytes = retentionSizeInBytes;
this.retentionAgeInMillis = retentionAgeInMillis;
}
public synchronized void setMinTranslogGenerationForRecovery(long newGen) {
if (newGen < minTranslogGenerationForRecovery) {
throw new IllegalArgumentException("minTranslogGenerationForRecovery can't go backwards. new [" + newGen + "] current [" +
minTranslogGenerationForRecovery+ "]");
minTranslogGenerationForRecovery + "]");
}
minTranslogGenerationForRecovery = newGen;
}
/**
* Updates the size based retention limit. Synchronized like the methods of this class that read it.
*
* @param bytes total size of translog files to retain; a negative value disables size based retention
*/
public synchronized void setRetentionSizeInBytes(long bytes) {
retentionSizeInBytes = bytes;
}
/**
* Updates the age based retention limit. Synchronized like the methods of this class that read it.
*
* @param ageInMillis maximum age of translog files to retain; a negative value disables age based retention
*/
public synchronized void setRetentionAgeInMillis(long ageInMillis) {
retentionAgeInMillis = ageInMillis;
}
/**
* acquires the basis generation for a new view. Any translog generation above, and including, the returned generation
* will not be deleted until a corresponding call to {@link #releaseTranslogGenView(long)} is called.
@ -74,10 +95,59 @@ public class TranslogDeletionPolicy {
/**
* returns the minimum translog generation that is still required by the system. Any generation below
* the returned value may be safely deleted
*
* @param readers current translog readers
* @param writer current translog writer
*/
synchronized long minTranslogGenRequired() {
long viewRefs = translogRefCounts.keySet().stream().reduce(Math::min).orElse(Long.MAX_VALUE);
return Math.min(viewRefs, minTranslogGenerationForRecovery);
synchronized long minTranslogGenRequired(List<TranslogReader> readers, TranslogWriter writer) throws IOException {
    final long genHeldByViews = getMinTranslogGenRequiredByViews();
    final long genKeptByAge = getMinTranslogGenByAge(readers, writer, retentionAgeInMillis, currentTime());
    final long genKeptBySize = getMinTranslogGenBySize(readers, writer, retentionSizeInBytes);

    // Long.MIN_VALUE marks a disabled retention criterion. If both are disabled, retention
    // places no constraint; otherwise the higher of the two generations applies.
    final boolean retentionDisabled = genKeptByAge == Long.MIN_VALUE && genKeptBySize == Long.MIN_VALUE;
    final long genKeptByRetention = retentionDisabled ? Long.MAX_VALUE : Math.max(genKeptByAge, genKeptBySize);

    // generations pinned by open views or needed for recovery are always kept
    final long genStrictlyRequired = Math.min(genHeldByViews, minTranslogGenerationForRecovery);
    return Math.min(genKeptByRetention, genStrictlyRequired);
}
/**
 * Computes the lowest generation to keep under the size based retention limit.
 * Starting with the writer, the readers are walked from the end of the list towards its start,
 * adding each file's size, until the running total is no longer below {@code retentionSizeInBytes}.
 * The generation of the last file added is returned.
 * NOTE(review): the backwards walk presumes {@code readers} is ordered from oldest to newest
 * generation - confirm against the caller.
 *
 * @param readers              current translog readers
 * @param writer               current translog writer
 * @param retentionSizeInBytes size limit; a negative value disables size based retention
 * @return the lowest generation to keep, or {@link Long#MIN_VALUE} if size based retention is disabled
 */
static long getMinTranslogGenBySize(List<TranslogReader> readers, TranslogWriter writer, long retentionSizeInBytes) {
    if (retentionSizeInBytes < 0) {
        return Long.MIN_VALUE;
    }
    long accumulatedBytes = writer.sizeInBytes();
    long oldestKeptGen = writer.getGeneration();
    int remaining = readers.size();
    while (remaining > 0 && accumulatedBytes < retentionSizeInBytes) {
        remaining--;
        final TranslogReader candidate = readers.get(remaining);
        accumulatedBytes += candidate.sizeInBytes();
        oldestKeptGen = candidate.getGeneration();
    }
    return oldestKeptGen;
}
/**
 * Computes the lowest generation to keep under the age based retention limit.
 * Returns the generation of the first reader (in list order) whose file was last modified no more
 * than {@code maxRetentionAgeInMillis} before {@code now}. If no reader qualifies, the writer's
 * generation is returned.
 *
 * @param readers                 current translog readers
 * @param writer                  current translog writer
 * @param maxRetentionAgeInMillis age limit; a negative value disables age based retention
 * @param now                     the current time in milliseconds
 * @return the lowest generation to keep, or {@link Long#MIN_VALUE} if age based retention is disabled
 * @throws IOException if the last modified time of a reader can not be determined
 */
static long getMinTranslogGenByAge(List<TranslogReader> readers, TranslogWriter writer, long maxRetentionAgeInMillis, long now)
    throws IOException {
    if (maxRetentionAgeInMillis < 0) {
        return Long.MIN_VALUE;
    }
    for (final TranslogReader candidate : readers) {
        final long ageInMillis = now - candidate.getLastModifiedTime();
        if (ageInMillis > maxRetentionAgeInMillis) {
            // too old to be retained by age, check the next reader
            continue;
        }
        return candidate.getGeneration();
    }
    return writer.getGeneration();
}
/**
* Returns the current time in milliseconds, used as "now" when evaluating age based retention.
* Protected so that subclasses can supply a different clock.
*/
protected long currentTime() {
return System.currentTimeMillis();
}
/**
 * Returns the lowest generation that has a view reference count recorded against it,
 * or {@link Long#MAX_VALUE} if no generation is tracked.
 */
private long getMinTranslogGenRequiredByViews() {
    long lowestReferencedGen = Long.MAX_VALUE;
    for (final long referencedGen : translogRefCounts.keySet()) {
        lowestReferencedGen = Math.min(lowestReferencedGen, referencedGen);
    }
    return lowestReferencedGen;
}
/** returns the translog generation that will be used as a basis of a future store/peer recovery */

View File

@ -116,7 +116,7 @@ public class TranslogReader extends BaseTranslogReader implements Closeable {
throw new IllegalStateException("pre-2.0 translog found [" + path + "]");
case TranslogWriter.VERSION_CHECKPOINTS:
assert path.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX) : "new file ends with old suffix: " + path;
assert checkpoint.numOps >= 0 : "expected at least 0 operatin but got: " + checkpoint.numOps;
assert checkpoint.numOps >= 0 : "expected at least 0 operation but got: " + checkpoint.numOps;
assert checkpoint.offset <= channel.size() : "checkpoint is inconsistent with channel length: " + channel.size() + " " + checkpoint;
int len = headerStream.readInt();
if (len > channel.size()) {
@ -130,8 +130,8 @@ public class TranslogReader extends BaseTranslogReader implements Closeable {
throw new TranslogCorruptedException("expected shard UUID " + uuidBytes + " but got: " + ref +
" this translog file belongs to a different translog. path:" + path);
}
final long firstOperationOffset =
ref.length + CodecUtil.headerLength(TranslogWriter.TRANSLOG_CODEC) + Integer.BYTES;
final long firstOperationOffset;
firstOperationOffset = ref.length + CodecUtil.headerLength(TranslogWriter.TRANSLOG_CODEC) + Integer.BYTES;
return new TranslogReader(checkpoint, channel, path, firstOperationOffset);
default:

View File

@ -88,6 +88,9 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
final ByteSizeValue bufferSize,
final LongSupplier globalCheckpointSupplier, LongSupplier minTranslogGenerationSupplier) throws IOException {
super(initialCheckpoint.generation, channel, path, channel.position());
assert initialCheckpoint.offset == channel.position() :
"initial checkpoint offset [" + initialCheckpoint.offset + "] is different than current channel poistion ["
+ channel.position() + "]";
this.shardId = shardId;
this.channelFactory = channelFactory;
this.minTranslogGenerationSupplier = minTranslogGenerationSupplier;
@ -116,18 +119,12 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
out.writeBytes(ref.bytes, ref.offset, ref.length);
}
public static TranslogWriter create(
ShardId shardId,
String translogUUID,
long fileGeneration,
Path file,
ChannelFactory channelFactory,
ByteSizeValue bufferSize,
final LongSupplier globalCheckpointSupplier,
final long initialMinTranslogGen,
final LongSupplier minTranslogGenerationSupplier) throws IOException {
public static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, ChannelFactory channelFactory,
ByteSizeValue bufferSize, final LongSupplier globalCheckpointSupplier,
final long initialMinTranslogGen, final LongSupplier minTranslogGenerationSupplier)
throws IOException {
final BytesRef ref = new BytesRef(translogUUID);
final int headerLength = getHeaderLength(ref.length);
final int firstOperationOffset = getHeaderLength(ref.length);
final FileChannel channel = channelFactory.open(file);
try {
// This OutputStreamDataOutput is intentionally not closed because
@ -135,12 +132,11 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
final OutputStreamDataOutput out = new OutputStreamDataOutput(java.nio.channels.Channels.newOutputStream(channel));
writeHeader(out, ref);
channel.force(true);
final Checkpoint checkpoint =
Checkpoint.emptyTranslogCheckpoint(headerLength, fileGeneration, globalCheckpointSupplier.getAsLong(),
initialMinTranslogGen);
final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(firstOperationOffset, fileGeneration,
globalCheckpointSupplier.getAsLong(), initialMinTranslogGen);
writeCheckpoint(channelFactory, file.getParent(), checkpoint);
return new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize, globalCheckpointSupplier,
minTranslogGenerationSupplier);
return new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize,
globalCheckpointSupplier, minTranslogGenerationSupplier);
} catch (Exception exception) {
// if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that
// file exists we remove it. We only apply this logic to the checkpoint.generation+1 any other file with a higher generation is an error condition

View File

@ -30,6 +30,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -39,7 +40,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
public void testPassThrough() throws IOException {
SnapshotDeletionPolicy indexDeletionPolicy = mock(SnapshotDeletionPolicy.class);
CombinedDeletionPolicy combinedDeletionPolicy = new CombinedDeletionPolicy(indexDeletionPolicy, new TranslogDeletionPolicy(),
CombinedDeletionPolicy combinedDeletionPolicy = new CombinedDeletionPolicy(indexDeletionPolicy, createTranslogDeletionPolicy(),
EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG);
List<IndexCommit> commitList = new ArrayList<>();
long count = randomIntBetween(1, 3);

View File

@ -180,6 +180,7 @@ import static org.elasticsearch.index.engine.Engine.Operation.Origin.LOCAL_TRANS
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
@ -336,7 +337,7 @@ public class InternalEngineTests extends ESTestCase {
protected Translog createTranslog(Path translogPath) throws IOException {
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE);
return new Translog(translogConfig, null, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
return new Translog(translogConfig, null, createTranslogDeletionPolicy(INDEX_SETTINGS), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
}
protected InternalEngine createEngine(Store store, Path translogPath) throws IOException {
@ -2795,7 +2796,7 @@ public class InternalEngineTests extends ESTestCase {
Translog translog = new Translog(
new TranslogConfig(shardId, createTempDir(), INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE),
null, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
null, createTranslogDeletionPolicy(INDEX_SETTINGS), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
translog.add(new Translog.Index("test", "SomeBogusId", 0, "{}".getBytes(Charset.forName("UTF-8"))));
assertEquals(generation.translogFileGeneration, translog.currentFileGeneration());
translog.close();

View File

@ -0,0 +1,214 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.translog;
import org.apache.lucene.store.ByteArrayDataOutput;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.mockito.Mockito;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import static org.hamcrest.Matchers.equalTo;
/**
* Tests for the size and age based retention logic of {@link TranslogDeletionPolicy}. Also provides
* static factory helpers for creating deletion policies.
*/
public class TranslogDeletionPolicyTests extends ESTestCase {
/**
* Creates a deletion policy that uses the default values of the translog retention settings.
*/
public static TranslogDeletionPolicy createTranslogDeletionPolicy() {
return new TranslogDeletionPolicy(
IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes(),
IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getDefault(Settings.EMPTY).getMillis()
);
}
/**
* Creates a deletion policy that uses the translog retention size and age of the given index settings.
*/
public static TranslogDeletionPolicy createTranslogDeletionPolicy(IndexSettings indexSettings) {
return new TranslogDeletionPolicy(indexSettings.getTranslogRetentionSize().getBytes(),
indexSettings.getTranslogRetentionAge().getMillis());
}
/**
* With a retention size and age of 0 the required generation is expected to be exactly the
* generation needed for recovery.
*/
public void testNoRetention() throws IOException {
long now = System.currentTimeMillis();
Tuple<List<TranslogReader>, TranslogWriter> readersAndWriter = createReadersAndWriter(now);
List<BaseTranslogReader> allGens = new ArrayList<>(readersAndWriter.v1());
allGens.add(readersAndWriter.v2());
try {
TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, 0, 0);
// a fresh policy requires generation 1 for recovery
assertMinGenRequired(deletionPolicy, readersAndWriter, 1L);
final int committedReader = randomIntBetween(0, allGens.size() - 1);
final long committedGen = allGens.get(committedReader).generation;
deletionPolicy.setMinTranslogGenerationForRecovery(committedGen);
assertMinGenRequired(deletionPolicy, readersAndWriter, committedGen);
} finally {
IOUtils.close(readersAndWriter.v1());
IOUtils.close(readersAndWriter.v2());
}
}
/**
* Picks a random generation, uses the combined size of it and all later generations as the retention
* size and expects the picked generation to be returned. A negative size must yield Long.MIN_VALUE.
*/
public void testBytesRetention() throws IOException {
long now = System.currentTimeMillis();
Tuple<List<TranslogReader>, TranslogWriter> readersAndWriter = createReadersAndWriter(now);
List<BaseTranslogReader> allGens = new ArrayList<>(readersAndWriter.v1());
allGens.add(readersAndWriter.v2());
try {
final int selectedReader = randomIntBetween(0, allGens.size() - 1);
final long selectedGeneration = allGens.get(selectedReader).generation;
// total size of the selected generation and everything after it
long size = allGens.stream().skip(selectedReader).map(BaseTranslogReader::sizeInBytes).reduce(Long::sum).get();
assertThat(TranslogDeletionPolicy.getMinTranslogGenBySize(readersAndWriter.v1(), readersAndWriter.v2(), size),
equalTo(selectedGeneration));
assertThat(TranslogDeletionPolicy.getMinTranslogGenBySize(readersAndWriter.v1(), readersAndWriter.v2(), -1),
equalTo(Long.MIN_VALUE));
} finally {
IOUtils.close(readersAndWriter.v1());
IOUtils.close(readersAndWriter.v2());
}
}
/**
* Picks a random generation, uses its age relative to now as the maximum retention age and expects
* the picked generation to be returned. A negative age must yield Long.MIN_VALUE.
*/
public void testAgeRetention() throws IOException {
long now = System.currentTimeMillis();
Tuple<List<TranslogReader>, TranslogWriter> readersAndWriter = createReadersAndWriter(now);
List<BaseTranslogReader> allGens = new ArrayList<>(readersAndWriter.v1());
allGens.add(readersAndWriter.v2());
try {
final int selectedReader = randomIntBetween(0, allGens.size() - 1);
final long selectedGeneration = allGens.get(selectedReader).generation;
long maxAge = now - allGens.get(selectedReader).getLastModifiedTime();
assertThat(TranslogDeletionPolicy.getMinTranslogGenByAge(readersAndWriter.v1(), readersAndWriter.v2(), maxAge, now),
equalTo(selectedGeneration));
assertThat(TranslogDeletionPolicy.getMinTranslogGenByAge(readersAndWriter.v1(), readersAndWriter.v2(), -1, now),
equalTo(Long.MIN_VALUE));
} finally {
IOUtils.close(readersAndWriter.v1());
IOUtils.close(readersAndWriter.v2());
}
}
/**
* Tests how the different constraints combine: of the age based and size based generations the
* higher one applies (a disabled criterion is ignored), while the generation required for recovery
* and the generation held by an open view cap the result (the lowest of them wins).
*/
public void testRetentionHierarchy() throws IOException {
long now = System.currentTimeMillis();
Tuple<List<TranslogReader>, TranslogWriter> readersAndWriter = createReadersAndWriter(now);
List<BaseTranslogReader> allGens = new ArrayList<>(readersAndWriter.v1());
allGens.add(readersAndWriter.v2());
try {
TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, Long.MAX_VALUE, Long.MAX_VALUE);
// move the recovery generation out of the way so that only the retention settings matter
deletionPolicy.setMinTranslogGenerationForRecovery(Long.MAX_VALUE);
int selectedReader = randomIntBetween(0, allGens.size() - 1);
final long selectedGenerationByAge = allGens.get(selectedReader).generation;
long maxAge = now - allGens.get(selectedReader).getLastModifiedTime();
selectedReader = randomIntBetween(0, allGens.size() - 1);
final long selectedGenerationBySize = allGens.get(selectedReader).generation;
long size = allGens.stream().skip(selectedReader).map(BaseTranslogReader::sizeInBytes).reduce(Long::sum).get();
selectedReader = randomIntBetween(0, allGens.size() - 1);
long committedGen = allGens.get(selectedReader).generation;
deletionPolicy.setRetentionAgeInMillis(maxAge);
deletionPolicy.setRetentionSizeInBytes(size);
assertMinGenRequired(deletionPolicy, readersAndWriter, Math.max(selectedGenerationByAge, selectedGenerationBySize));
// make a new policy as committed gen can't go backwards (for now)
deletionPolicy = new MockDeletionPolicy(now, size, maxAge);
deletionPolicy.setMinTranslogGenerationForRecovery(committedGen);
assertMinGenRequired(deletionPolicy, readersAndWriter,
Math.min(committedGen, Math.max(selectedGenerationByAge, selectedGenerationBySize)));
long viewGen = deletionPolicy.acquireTranslogGenForView();
// only move the committed generation forward, the policy rejects going backwards
selectedReader = randomIntBetween(selectedReader, allGens.size() - 1);
committedGen = allGens.get(selectedReader).generation;
deletionPolicy.setMinTranslogGenerationForRecovery(committedGen);
assertMinGenRequired(deletionPolicy, readersAndWriter,
Math.min(
Math.min(committedGen, viewGen),
Math.max(selectedGenerationByAge, selectedGenerationBySize)));
// disable age
deletionPolicy.setRetentionAgeInMillis(-1);
assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(Math.min(committedGen, viewGen), selectedGenerationBySize));
// disable size
deletionPolicy.setRetentionAgeInMillis(maxAge);
deletionPolicy.setRetentionSizeInBytes(-1);
assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(Math.min(committedGen, viewGen), selectedGenerationByAge));
// disable both
deletionPolicy.setRetentionAgeInMillis(-1);
deletionPolicy.setRetentionSizeInBytes(-1);
assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(committedGen, viewGen));
} finally {
IOUtils.close(readersAndWriter.v1());
IOUtils.close(readersAndWriter.v2());
}
}
/**
* Asserts that the policy reports the expected minimum required generation for the given readers and writer.
*/
private void assertMinGenRequired(TranslogDeletionPolicy deletionPolicy, Tuple<List<TranslogReader>, TranslogWriter> readersAndWriter,
long expectedGen) throws IOException {
assertThat(deletionPolicy.minTranslogGenRequired(readersAndWriter.v1(), readersAndWriter.v2()), equalTo(expectedGen));
}
/**
* Creates between 0 and 10 readers plus one writer in a temporary directory, with generations
* starting at 1. The last modified times are stubbed through Mockito spies so that consecutive
* generations are 1000 milliseconds apart and the final writer reports {@code now}.
* Each generation receives between 0 and 20 small operations.
*/
private Tuple<List<TranslogReader>, TranslogWriter> createReadersAndWriter(final long now) throws IOException {
final Path tempDir = createTempDir();
Files.createFile(tempDir.resolve(Translog.CHECKPOINT_FILE_NAME));
TranslogWriter writer = null;
List<TranslogReader> readers = new ArrayList<>();
final int numberOfReaders = randomIntBetween(0, 10);
for (long gen = 1; gen <= numberOfReaders + 1; gen++) {
if (writer != null) {
// turn the previous writer into a reader that reports the same (stubbed) modification time
final TranslogReader reader = Mockito.spy(writer.closeIntoReader());
Mockito.doReturn(writer.getLastModifiedTime()).when(reader).getLastModifiedTime();
readers.add(reader);
}
writer = TranslogWriter.create(new ShardId("index", "uuid", 0), "translog_uuid", gen,
tempDir.resolve(Translog.getFilename(gen)), FileChannel::open, TranslogConfig.DEFAULT_BUFFER_SIZE, () -> 1L, 1L, () -> 1L
);
writer = Mockito.spy(writer);
// older generations get an earlier modification time, one second per generation
Mockito.doReturn(now - (numberOfReaders - gen + 1) * 1000).when(writer).getLastModifiedTime();
byte[] bytes = new byte[4];
ByteArrayDataOutput out = new ByteArrayDataOutput(bytes);
for (int ops = randomIntBetween(0, 20); ops > 0; ops--) {
out.reset(bytes);
out.writeInt(ops);
writer.add(new BytesArray(bytes), ops);
}
}
return new Tuple<>(readers, writer);
}
/**
* A deletion policy whose notion of the current time is fixed to the value given at construction.
*/
private static class MockDeletionPolicy extends TranslogDeletionPolicy {
long now;
MockDeletionPolicy(long now, long retentionSizeInBytes, long maxRetentionAgeInMillis) {
super(retentionSizeInBytes, maxRetentionAgeInMillis);
this.now = now;
}
@Override
protected long currentTime() {
return now;
}
}
}

View File

@ -106,6 +106,7 @@ import java.util.stream.LongStream;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomLongBetween;
import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE;
import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
@ -141,7 +142,8 @@ public class TranslogTests extends ESTestCase {
}
protected Translog createTranslog(TranslogConfig config, String translogUUID) throws IOException {
return new Translog(config, translogUUID, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
return new Translog(config, translogUUID, createTranslogDeletionPolicy(config.getIndexSettings()),
() -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
}
private void markCurrentGenAsCommitted(Translog translog) throws IOException {
@ -157,11 +159,6 @@ public class TranslogTests extends ESTestCase {
final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy();
deletionPolicy.setMinTranslogGenerationForRecovery(genToCommit);
translog.trimUnreferencedReaders();
if (deletionPolicy.pendingViewsCount() == 0) {
assertThat(deletionPolicy.minTranslogGenRequired(), equalTo(genToCommit));
}
// we may have some views closed concurrently causing the deletion policy to increase it's minTranslogGenRequired
assertThat(translog.getMinFileGeneration(), lessThanOrEqualTo(deletionPolicy.minTranslogGenRequired()));
}
@Override
@ -186,7 +183,9 @@ public class TranslogTests extends ESTestCase {
private Translog create(Path path) throws IOException {
globalCheckpoint = new AtomicLong(SequenceNumbersService.UNASSIGNED_SEQ_NO);
return new Translog(getTranslogConfig(path), null, new TranslogDeletionPolicy(), () -> globalCheckpoint.get());
final TranslogConfig translogConfig = getTranslogConfig(path);
final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(translogConfig.getIndexSettings());
return new Translog(translogConfig, null, deletionPolicy, () -> globalCheckpoint.get());
}
private TranslogConfig getTranslogConfig(final Path path) {
@ -1104,7 +1103,12 @@ public class TranslogTests extends ESTestCase {
}
writer.sync();
final Checkpoint writerCheckpoint = writer.getCheckpoint();
try (TranslogReader reader = writer.closeIntoReader()) {
TranslogReader reader = writer.closeIntoReader();
try {
if (randomBoolean()) {
reader.close();
reader = translog.openReader(reader.path(), writerCheckpoint);
}
for (int i = 0; i < numOps; i++) {
final ByteBuffer buffer = ByteBuffer.allocate(4);
reader.readBytes(buffer, reader.getFirstOperationOffset() + 4 * i);
@ -1114,6 +1118,8 @@ public class TranslogTests extends ESTestCase {
}
final Checkpoint readerCheckpoint = reader.getCheckpoint();
assertThat(readerCheckpoint, equalTo(writerCheckpoint));
} finally {
IOUtils.close(reader);
}
}
}
@ -1376,7 +1382,7 @@ public class TranslogTests extends ESTestCase {
final String foreignTranslog = randomRealisticUnicodeOfCodepointLengthBetween(1,
translogGeneration.translogUUID.length());
try {
new Translog(config, foreignTranslog, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
new Translog(config, foreignTranslog, createTranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
fail("translog doesn't belong to this UUID");
} catch (TranslogCorruptedException ex) {
@ -1602,7 +1608,7 @@ public class TranslogTests extends ESTestCase {
Path tempDir = createTempDir();
final FailSwitch fail = new FailSwitch();
TranslogConfig config = getTranslogConfig(tempDir);
Translog translog = getFailableTranslog(fail, config, false, true, null, new TranslogDeletionPolicy());
Translog translog = getFailableTranslog(fail, config, false, true, null, createTranslogDeletionPolicy());
LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer borders regularly
translog.add(new Translog.Index("test", "1", 0, lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))));
fail.failAlways();
@ -1697,7 +1703,7 @@ public class TranslogTests extends ESTestCase {
iterator.remove();
}
}
try (Translog tlog = new Translog(config, translogUUID, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
try (Translog tlog = new Translog(config, translogUUID, createTranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
Translog.Snapshot snapshot = tlog.newSnapshot();
if (writtenOperations.size() != snapshot.totalOperations()) {
for (int i = 0; i < threadCount; i++) {
@ -1740,7 +1746,7 @@ public class TranslogTests extends ESTestCase {
// engine blows up, after committing the above generation
translog.close();
TranslogConfig config = translog.getConfig();
final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy();
final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1);
deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration);
translog = new Translog(config, translog.getTranslogUUID(), deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
assertThat(translog.getMinFileGeneration(), equalTo(1L));
@ -1789,7 +1795,7 @@ public class TranslogTests extends ESTestCase {
// expected...
}
}
final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy();
final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1);
deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration);
try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
// we don't know when things broke exactly
@ -1803,7 +1809,7 @@ public class TranslogTests extends ESTestCase {
}
/**
 * Convenience overload that delegates to the six-argument {@code getFailableTranslog}.
 *
 * Passes a random boolean as the third argument, {@code false} as the fourth, {@code null} as the
 * fifth (other call sites in this file pass a generation UUID there), and a deletion policy
 * obtained from {@code createTranslogDeletionPolicy()}.
 *
 * @param fail   the fail switch forwarded to the six-argument overload
 * @param config the translog configuration to open the translog with
 * @return the translog produced by the delegated call
 * @throws IOException if the delegated call throws it
 */
private Translog getFailableTranslog(FailSwitch fail, final TranslogConfig config) throws IOException {
    return getFailableTranslog(fail, config, randomBoolean(), false, null, createTranslogDeletionPolicy());
}
private static class FailSwitch {
@ -1965,7 +1971,7 @@ public class TranslogTests extends ESTestCase {
translog.add(new Translog.Index("test", "boom", 0, "boom".getBytes(Charset.forName("UTF-8"))));
translog.close();
try {
new Translog(config, translog.getTranslogUUID(), new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO) {
new Translog(config, translog.getTranslogUUID(), createTranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO) {
@Override
protected TranslogWriter createWriter(long fileGeneration) throws IOException {
throw new MockDirectoryWrapper.FakeIOException();
@ -2083,7 +2089,7 @@ public class TranslogTests extends ESTestCase {
String generationUUID = null;
try {
boolean committing = false;
final Translog failableTLog = getFailableTranslog(fail, config, randomBoolean(), false, generationUUID, new TranslogDeletionPolicy());
final Translog failableTLog = getFailableTranslog(fail, config, randomBoolean(), false, generationUUID, createTranslogDeletionPolicy());
try {
LineFileDocs lineFileDocs = new LineFileDocs(random()); //writes pretty big docs so we cross buffer borders regularly
for (int opsAdded = 0; opsAdded < numOps; opsAdded++) {
@ -2142,7 +2148,7 @@ public class TranslogTests extends ESTestCase {
// now randomly open this failing tlog again just to make sure we can also recover from failing during recovery
if (randomBoolean()) {
try {
TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy();
TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy();
deletionPolicy.setMinTranslogGenerationForRecovery(minGenForRecovery);
IOUtils.close(getFailableTranslog(fail, config, randomBoolean(), false, generationUUID, deletionPolicy));
} catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) {
@ -2153,7 +2159,7 @@ public class TranslogTests extends ESTestCase {
}
fail.failNever(); // we don't wanna fail here but we might since we write a new checkpoint and create a new tlog file
TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy();
TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy();
deletionPolicy.setMinTranslogGenerationForRecovery(minGenForRecovery);
try (Translog translog = new Translog(config, generationUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
Translog.Snapshot snapshot = translog.newSnapshot();
@ -2218,7 +2224,7 @@ public class TranslogTests extends ESTestCase {
translog.rollGeneration();
TranslogConfig config = translog.getConfig();
final String translogUUID = translog.getTranslogUUID();
final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy();
final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(config.getIndexSettings());
translog.close();
translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
translog.add(new Translog.Index("test", "2", 1, new byte[]{2}));
@ -2293,7 +2299,14 @@ public class TranslogTests extends ESTestCase {
assertEquals("my_id", serializedDelete.id());
}
public void testRollGeneration() throws IOException {
public void testRollGeneration() throws Exception {
// make sure we keep some files around
final boolean longRetention = randomBoolean();
if (longRetention) {
translog.getDeletionPolicy().setRetentionAgeInMillis(3600 * 1000);
} else {
translog.getDeletionPolicy().setRetentionAgeInMillis(-1);
}
final long generation = translog.currentFileGeneration();
final int rolls = randomIntBetween(1, 16);
int totalOperations = 0;
@ -2316,8 +2329,22 @@ public class TranslogTests extends ESTestCase {
commit(translog, generation + rolls);
assertThat(translog.currentFileGeneration(), equalTo(generation + rolls ));
assertThat(translog.totalOperations(), equalTo(0));
for (int i = 0; i < rolls; i++) {
assertFileDeleted(translog, generation + i);
if (longRetention) {
for (int i = 0; i <= rolls; i++) {
assertFileIsPresent(translog, generation + i);
}
translog.getDeletionPolicy().setRetentionAgeInMillis(randomBoolean() ? 100 : -1);
assertBusy(() -> {
translog.trimUnreferencedReaders();
for (int i = 0; i < rolls; i++) {
assertFileDeleted(translog, generation + i);
}
});
} else {
// immediate cleanup
for (int i = 0; i < rolls; i++) {
assertFileDeleted(translog, generation + i);
}
}
assertFileIsPresent(translog, generation + rolls);
}