diff --git a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index 9fcafcea3b2..ae4cf6cd41a 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -127,6 +127,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings { EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING, IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING, IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING, + IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING, + IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING, IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY, FieldMapper.IGNORE_MALFORMED_SETTING, FieldMapper.COERCE_SETTING, diff --git a/core/src/main/java/org/elasticsearch/index/IndexSettings.java b/core/src/main/java/org/elasticsearch/index/IndexSettings.java index 2764ffd38cc..43ddb09e61f 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/core/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -37,6 +37,7 @@ import org.elasticsearch.node.Node; import java.util.Locale; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.function.Function; /** @@ -111,6 +112,24 @@ public final class IndexSettings { Setting.byteSizeSetting("index.translog.flush_threshold_size", new ByteSizeValue(512, ByteSizeUnit.MB), Property.Dynamic, Property.IndexScope); + /** + * Controls how long translog files that are no longer needed for persistence reasons + * will be kept around before being deleted. A longer retention policy is useful to increase + * the chance of ops based recoveries. + **/ + public static final Setting INDEX_TRANSLOG_RETENTION_AGE_SETTING = + Setting.timeSetting("index.translog.retention.age", TimeValue.timeValueMillis(-1), TimeValue.timeValueMillis(-1), Property.Dynamic, + Property.IndexScope); + + /** + * Controls how many translog files that are no longer needed for persistence reasons + * will be kept around before being deleted. Keeping more files is useful to increase + * the chance of ops based recoveries. + **/ + public static final Setting INDEX_TRANSLOG_RETENTION_SIZE_SETTING = + Setting.byteSizeSetting("index.translog.retention.size", new ByteSizeValue(-1, ByteSizeUnit.MB), Property.Dynamic, + Property.IndexScope); + /** * The maximum size of a translog generation. This is independent of the maximum size of * translog operations that have not been flushed. @@ -168,6 +187,8 @@ public final class IndexSettings { private final TimeValue syncInterval; private volatile TimeValue refreshInterval; private volatile ByteSizeValue flushThresholdSize; + private volatile TimeValue translogRetentionAge; + private volatile ByteSizeValue translogRetentionSize; private volatile ByteSizeValue generationThresholdSize; private final MergeSchedulerConfig mergeSchedulerConfig; private final MergePolicyConfig mergePolicyConfig; @@ -265,6 +286,8 @@ public final class IndexSettings { syncInterval = INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.get(settings); refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING); flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING); + translogRetentionAge = scopedSettings.get(INDEX_TRANSLOG_RETENTION_AGE_SETTING); + translogRetentionSize = scopedSettings.get(INDEX_TRANSLOG_RETENTION_SIZE_SETTING); generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING); mergeSchedulerConfig = new MergeSchedulerConfig(this); gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis(); @@ -302,6 +325,8 @@ public final class IndexSettings { scopedSettings.addSettingsUpdateConsumer( INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING, this::setGenerationThresholdSize); + scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_AGE_SETTING, this::setTranslogRetentionAge); + scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_SIZE_SETTING, this::setTranslogRetentionSize); scopedSettings.addSettingsUpdateConsumer(INDEX_REFRESH_INTERVAL_SETTING, this::setRefreshInterval); scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners); scopedSettings.addSettingsUpdateConsumer(MAX_SLICES_PER_SCROLL, this::setMaxSlicesPerScroll); @@ -311,6 +336,14 @@ public final class IndexSettings { this.flushThresholdSize = byteSizeValue; } + private void setTranslogRetentionSize(ByteSizeValue byteSizeValue) { + this.translogRetentionSize = byteSizeValue; + } + + private void setTranslogRetentionAge(TimeValue age) { + this.translogRetentionAge = age; + } + private void setGenerationThresholdSize(final ByteSizeValue generationThresholdSize) { this.generationThresholdSize = generationThresholdSize; } @@ -469,6 +502,16 @@ public final class IndexSettings { */ public ByteSizeValue getFlushThresholdSize() { return flushThresholdSize; } + /** + * Returns the transaction log retention size which controls how much of the translog is kept around to allow for ops based recoveries + */ + public ByteSizeValue getTranslogRetentionSize() { return translogRetentionSize; } + + /** + * Returns the transaction log retention age which controls the maximum age (time from creation) that translog files will be kept around + */ + public TimeValue getTranslogRetentionAge() { return translogRetentionAge; } + /** * Returns the generation threshold size. As sequence numbers can cause multiple generations to * be preserved for rollback purposes, we want to keep the size of individual generations from diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index f84f76b537e..6d10a029099 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -150,7 +150,10 @@ public class InternalEngine extends Engine { } this.uidField = engineConfig.getIndexSettings().isSingleType() ? IdFieldMapper.NAME : UidFieldMapper.NAME; this.versionMap = new LiveVersionMap(); - final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(); + final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy( + engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(), + engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis() + ); this.deletionPolicy = new CombinedDeletionPolicy( new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()), translogDeletionPolicy, openMode); store.incRef(); @@ -1854,6 +1857,10 @@ public class InternalEngine extends Engine { // the setting will be re-interpreted if it's set to true this.maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE); } + final TranslogDeletionPolicy translogDeletionPolicy = translog.getDeletionPolicy(); + final IndexSettings indexSettings = engineConfig.getIndexSettings(); + translogDeletionPolicy.setRetentionAgeInMillis(indexSettings.getTranslogRetentionAge().getMillis()); + translogDeletionPolicy.setRetentionSizeInBytes(indexSettings.getTranslogRetentionSize().getBytes()); } public MergeStats getMergeStats() { diff --git a/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java b/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java index 6f392c195fd..7f8b7f3fb2c 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java +++ b/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.ByteBufferStreamInput; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; +import java.nio.file.Files; import java.nio.file.Path; /** @@ -121,4 +122,8 @@ public abstract class BaseTranslogReader implements Comparable= getMinFileGeneration() : "deletion policy requires a minReferenceGen of [" + minReferencedGen + "] but the lowest gen available is [" + getMinFileGeneration() + "]"; diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java index 84f61a642cc..732b38fcedf 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java @@ -21,13 +21,17 @@ package org.elasticsearch.index.translog; import org.apache.lucene.util.Counter; +import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; public class TranslogDeletionPolicy { - /** Records how many views are held against each - * translog generation */ + /** + * Records how many views are held against each + * translog generation + */ private final Map translogRefCounts = new HashMap<>(); /** @@ -36,14 +40,31 @@ public class TranslogDeletionPolicy { */ private long minTranslogGenerationForRecovery = 1; + private long retentionSizeInBytes; + + private long retentionAgeInMillis; + + public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis) { + this.retentionSizeInBytes = retentionSizeInBytes; + this.retentionAgeInMillis = retentionAgeInMillis; + } + public synchronized void setMinTranslogGenerationForRecovery(long newGen) { if (newGen < minTranslogGenerationForRecovery) { throw new IllegalArgumentException("minTranslogGenerationForRecovery can't go backwards. new [" + newGen + "] current [" + - minTranslogGenerationForRecovery+ "]"); + minTranslogGenerationForRecovery + "]"); } minTranslogGenerationForRecovery = newGen; } + public synchronized void setRetentionSizeInBytes(long bytes) { + retentionSizeInBytes = bytes; + } + + public synchronized void setRetentionAgeInMillis(long ageInMillis) { + retentionAgeInMillis = ageInMillis; + } + /** * acquires the basis generation for a new view. Any translog generation above, and including, the returned generation * will not be deleted until a corresponding call to {@link #releaseTranslogGenView(long)} is called. @@ -74,10 +95,59 @@ public class TranslogDeletionPolicy { /** * returns the minimum translog generation that is still required by the system. Any generation below * the returned value may be safely deleted + * + * @param readers current translog readers + * @param writer current translog writer */ - synchronized long minTranslogGenRequired() { - long viewRefs = translogRefCounts.keySet().stream().reduce(Math::min).orElse(Long.MAX_VALUE); - return Math.min(viewRefs, minTranslogGenerationForRecovery); + synchronized long minTranslogGenRequired(List readers, TranslogWriter writer) throws IOException { + long minByView = getMinTranslogGenRequiredByViews(); + long minByAge = getMinTranslogGenByAge(readers, writer, retentionAgeInMillis, currentTime()); + long minBySize = getMinTranslogGenBySize(readers, writer, retentionSizeInBytes); + final long minByAgeAndSize; + if (minBySize == Long.MIN_VALUE && minByAge == Long.MIN_VALUE) { + // both size and age are disabled; + minByAgeAndSize = Long.MAX_VALUE; + } else { + minByAgeAndSize = Math.max(minByAge, minBySize); + } + return Math.min(minByAgeAndSize, Math.min(minByView, minTranslogGenerationForRecovery)); + } + + static long getMinTranslogGenBySize(List readers, TranslogWriter writer, long retentionSizeInBytes) { + if (retentionSizeInBytes >= 0) { + long totalSize = writer.sizeInBytes(); + long minGen = writer.getGeneration(); + for (int i = readers.size() - 1; i >= 0 && totalSize < retentionSizeInBytes; i--) { + final TranslogReader reader = readers.get(i); + totalSize += reader.sizeInBytes(); + minGen = reader.getGeneration(); + } + return minGen; + } else { + return Long.MIN_VALUE; + } + } + + static long getMinTranslogGenByAge(List readers, TranslogWriter writer, long maxRetentionAgeInMillis, long now) + throws IOException { + if (maxRetentionAgeInMillis >= 0) { + for (TranslogReader reader: readers) { + if (now - reader.getLastModifiedTime() <= maxRetentionAgeInMillis) { + return reader.getGeneration(); + } + } + return writer.getGeneration(); + } else { + return Long.MIN_VALUE; + } + } + + protected long currentTime() { + return System.currentTimeMillis(); + } + + private long getMinTranslogGenRequiredByViews() { + return translogRefCounts.keySet().stream().reduce(Math::min).orElse(Long.MAX_VALUE); } /** returns the translog generation that will be used as a basis of a future store/peer recovery */ diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java index 9057207501c..46439afead1 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java @@ -116,7 +116,7 @@ public class TranslogReader extends BaseTranslogReader implements Closeable { throw new IllegalStateException("pre-2.0 translog found [" + path + "]"); case TranslogWriter.VERSION_CHECKPOINTS: assert path.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX) : "new file ends with old suffix: " + path; - assert checkpoint.numOps >= 0 : "expected at least 0 operatin but got: " + checkpoint.numOps; + assert checkpoint.numOps >= 0 : "expected at least 0 operation but got: " + checkpoint.numOps; assert checkpoint.offset <= channel.size() : "checkpoint is inconsistent with channel length: " + channel.size() + " " + checkpoint; int len = headerStream.readInt(); if (len > channel.size()) { @@ -130,8 +130,8 @@ public class TranslogReader extends BaseTranslogReader implements Closeable { throw new TranslogCorruptedException("expected shard UUID " + uuidBytes + " but got: " + ref + " this translog file belongs to a different translog. path:" + path); } - final long firstOperationOffset = - ref.length + CodecUtil.headerLength(TranslogWriter.TRANSLOG_CODEC) + Integer.BYTES; + final long firstOperationOffset; + firstOperationOffset = ref.length + CodecUtil.headerLength(TranslogWriter.TRANSLOG_CODEC) + Integer.BYTES; return new TranslogReader(checkpoint, channel, path, firstOperationOffset); default: diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index d637c9da79f..2c0bd0c7d89 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -88,6 +88,9 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { final ByteSizeValue bufferSize, final LongSupplier globalCheckpointSupplier, LongSupplier minTranslogGenerationSupplier) throws IOException { super(initialCheckpoint.generation, channel, path, channel.position()); + assert initialCheckpoint.offset == channel.position() : + "initial checkpoint offset [" + initialCheckpoint.offset + "] is different than current channel poistion [" + + channel.position() + "]"; this.shardId = shardId; this.channelFactory = channelFactory; this.minTranslogGenerationSupplier = minTranslogGenerationSupplier; @@ -116,18 +119,12 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { out.writeBytes(ref.bytes, ref.offset, ref.length); } - public static TranslogWriter create( - ShardId shardId, - String translogUUID, - long fileGeneration, - Path file, - ChannelFactory channelFactory, - ByteSizeValue bufferSize, - final LongSupplier globalCheckpointSupplier, - final long initialMinTranslogGen, - final LongSupplier minTranslogGenerationSupplier) throws IOException { + public static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, ChannelFactory channelFactory, + ByteSizeValue bufferSize, final LongSupplier globalCheckpointSupplier, + final long initialMinTranslogGen, final LongSupplier minTranslogGenerationSupplier) + throws IOException { final BytesRef ref = new BytesRef(translogUUID); - final int headerLength = getHeaderLength(ref.length); + final int firstOperationOffset = getHeaderLength(ref.length); final FileChannel channel = channelFactory.open(file); try { // This OutputStreamDataOutput is intentionally not closed because @@ -135,12 +132,11 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { final OutputStreamDataOutput out = new OutputStreamDataOutput(java.nio.channels.Channels.newOutputStream(channel)); writeHeader(out, ref); channel.force(true); - final Checkpoint checkpoint = - Checkpoint.emptyTranslogCheckpoint(headerLength, fileGeneration, globalCheckpointSupplier.getAsLong(), - initialMinTranslogGen); + final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(firstOperationOffset, fileGeneration, + globalCheckpointSupplier.getAsLong(), initialMinTranslogGen); writeCheckpoint(channelFactory, file.getParent(), checkpoint); - return new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize, globalCheckpointSupplier, - minTranslogGenerationSupplier); + return new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize, + globalCheckpointSupplier, minTranslogGenerationSupplier); } catch (Exception exception) { // if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that // file exists we remove it. We only apply this logic to the checkpoint.generation+1 any other file with a higher generation is an error condition diff --git a/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index d21273a7b03..d1eef05c2ef 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -39,7 +40,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase { public void testPassThrough() throws IOException { SnapshotDeletionPolicy indexDeletionPolicy = mock(SnapshotDeletionPolicy.class); - CombinedDeletionPolicy combinedDeletionPolicy = new CombinedDeletionPolicy(indexDeletionPolicy, new TranslogDeletionPolicy(), + CombinedDeletionPolicy combinedDeletionPolicy = new CombinedDeletionPolicy(indexDeletionPolicy, createTranslogDeletionPolicy(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG); List commitList = new ArrayList<>(); long count = randomIntBetween(1, 3); diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 16e746a67f7..af18781dfa6 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -180,6 +180,7 @@ import static org.elasticsearch.index.engine.Engine.Operation.Origin.LOCAL_TRANS import static org.elasticsearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; +import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.everyItem; @@ -336,7 +337,7 @@ public class InternalEngineTests extends ESTestCase { protected Translog createTranslog(Path translogPath) throws IOException { TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE); - return new Translog(translogConfig, null, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); + return new Translog(translogConfig, null, createTranslogDeletionPolicy(INDEX_SETTINGS), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); } protected InternalEngine createEngine(Store store, Path translogPath) throws IOException { @@ -2795,7 +2796,7 @@ public class InternalEngineTests extends ESTestCase { Translog translog = new Translog( new TranslogConfig(shardId, createTempDir(), INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), - null, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); + null, createTranslogDeletionPolicy(INDEX_SETTINGS), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); translog.add(new Translog.Index("test", "SomeBogusId", 0, "{}".getBytes(Charset.forName("UTF-8")))); assertEquals(generation.translogFileGeneration, translog.currentFileGeneration()); translog.close(); diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java new file mode 100644 index 00000000000..3ed595543f8 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java @@ -0,0 +1,214 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.translog; + +import org.apache.lucene.store.ByteArrayDataOutput; +import org.apache.lucene.util.IOUtils; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESTestCase; +import org.mockito.Mockito; + +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; + + +public class TranslogDeletionPolicyTests extends ESTestCase { + + public static TranslogDeletionPolicy createTranslogDeletionPolicy() { + return new TranslogDeletionPolicy( + IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes(), + IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getDefault(Settings.EMPTY).getMillis() + ); + } + + public static TranslogDeletionPolicy createTranslogDeletionPolicy(IndexSettings indexSettings) { + return new TranslogDeletionPolicy(indexSettings.getTranslogRetentionSize().getBytes(), + indexSettings.getTranslogRetentionAge().getMillis()); + } + + public void testNoRetention() throws IOException { + long now = System.currentTimeMillis(); + Tuple, TranslogWriter> readersAndWriter = createReadersAndWriter(now); + List allGens = new ArrayList<>(readersAndWriter.v1()); + allGens.add(readersAndWriter.v2()); + try { + TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, 0, 0); + assertMinGenRequired(deletionPolicy, readersAndWriter, 1L); + final int committedReader = randomIntBetween(0, allGens.size() - 1); + final long committedGen = allGens.get(committedReader).generation; + deletionPolicy.setMinTranslogGenerationForRecovery(committedGen); + assertMinGenRequired(deletionPolicy, readersAndWriter, committedGen); + } finally { + IOUtils.close(readersAndWriter.v1()); + IOUtils.close(readersAndWriter.v2()); + } + } + + public void testBytesRetention() throws IOException { + long now = System.currentTimeMillis(); + Tuple, TranslogWriter> readersAndWriter = createReadersAndWriter(now); + List allGens = new ArrayList<>(readersAndWriter.v1()); + allGens.add(readersAndWriter.v2()); + try { + final int selectedReader = randomIntBetween(0, allGens.size() - 1); + final long selectedGeneration = allGens.get(selectedReader).generation; + long size = allGens.stream().skip(selectedReader).map(BaseTranslogReader::sizeInBytes).reduce(Long::sum).get(); + assertThat(TranslogDeletionPolicy.getMinTranslogGenBySize(readersAndWriter.v1(), readersAndWriter.v2(), size), + equalTo(selectedGeneration)); + assertThat(TranslogDeletionPolicy.getMinTranslogGenBySize(readersAndWriter.v1(), readersAndWriter.v2(), -1), + equalTo(Long.MIN_VALUE)); + } finally { + IOUtils.close(readersAndWriter.v1()); + IOUtils.close(readersAndWriter.v2()); + } + } + + public void testAgeRetention() throws IOException { + long now = System.currentTimeMillis(); + Tuple, TranslogWriter> readersAndWriter = createReadersAndWriter(now); + List allGens = new ArrayList<>(readersAndWriter.v1()); + allGens.add(readersAndWriter.v2()); + try { + final int selectedReader = randomIntBetween(0, allGens.size() - 1); + final long selectedGeneration = allGens.get(selectedReader).generation; + long maxAge = now - allGens.get(selectedReader).getLastModifiedTime(); + assertThat(TranslogDeletionPolicy.getMinTranslogGenByAge(readersAndWriter.v1(), readersAndWriter.v2(), maxAge, now), + equalTo(selectedGeneration)); + assertThat(TranslogDeletionPolicy.getMinTranslogGenByAge(readersAndWriter.v1(), readersAndWriter.v2(), -1, now), + equalTo(Long.MIN_VALUE)); + } finally { + IOUtils.close(readersAndWriter.v1()); + IOUtils.close(readersAndWriter.v2()); + } + } + + /** + * Tests that age trumps size but recovery trumps both. + */ + public void testRetentionHierarchy() throws IOException { + long now = System.currentTimeMillis(); + Tuple, TranslogWriter> readersAndWriter = createReadersAndWriter(now); + List allGens = new ArrayList<>(readersAndWriter.v1()); + allGens.add(readersAndWriter.v2()); + try { + TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, Long.MAX_VALUE, Long.MAX_VALUE); + deletionPolicy.setMinTranslogGenerationForRecovery(Long.MAX_VALUE); + int selectedReader = randomIntBetween(0, allGens.size() - 1); + final long selectedGenerationByAge = allGens.get(selectedReader).generation; + long maxAge = now - allGens.get(selectedReader).getLastModifiedTime(); + selectedReader = randomIntBetween(0, allGens.size() - 1); + final long selectedGenerationBySize = allGens.get(selectedReader).generation; + long size = allGens.stream().skip(selectedReader).map(BaseTranslogReader::sizeInBytes).reduce(Long::sum).get(); + selectedReader = randomIntBetween(0, allGens.size() - 1); + long committedGen = allGens.get(selectedReader).generation; + deletionPolicy.setRetentionAgeInMillis(maxAge); + deletionPolicy.setRetentionSizeInBytes(size); + assertMinGenRequired(deletionPolicy, readersAndWriter, Math.max(selectedGenerationByAge, selectedGenerationBySize)); + // make a new policy as committed gen can't go backwards (for now) + deletionPolicy = new MockDeletionPolicy(now, size, maxAge); + deletionPolicy.setMinTranslogGenerationForRecovery(committedGen); + assertMinGenRequired(deletionPolicy, readersAndWriter, + Math.min(committedGen, Math.max(selectedGenerationByAge, selectedGenerationBySize))); + long viewGen = deletionPolicy.acquireTranslogGenForView(); + selectedReader = randomIntBetween(selectedReader, allGens.size() - 1); + committedGen = allGens.get(selectedReader).generation; + deletionPolicy.setMinTranslogGenerationForRecovery(committedGen); + assertMinGenRequired(deletionPolicy, readersAndWriter, + Math.min( + Math.min(committedGen, viewGen), + Math.max(selectedGenerationByAge, selectedGenerationBySize))); + // disable age + deletionPolicy.setRetentionAgeInMillis(-1); + assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(Math.min(committedGen, viewGen), selectedGenerationBySize)); + // disable size + deletionPolicy.setRetentionAgeInMillis(maxAge); + deletionPolicy.setRetentionSizeInBytes(-1); + assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(Math.min(committedGen, viewGen), selectedGenerationByAge)); + // disable both + deletionPolicy.setRetentionAgeInMillis(-1); + deletionPolicy.setRetentionSizeInBytes(-1); + assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(committedGen, viewGen)); + } finally { + IOUtils.close(readersAndWriter.v1()); + IOUtils.close(readersAndWriter.v2()); + } + + } + + private void assertMinGenRequired(TranslogDeletionPolicy deletionPolicy, Tuple, TranslogWriter> readersAndWriter, + long expectedGen) throws IOException { + assertThat(deletionPolicy.minTranslogGenRequired(readersAndWriter.v1(), readersAndWriter.v2()), equalTo(expectedGen)); + } + + private Tuple, TranslogWriter> createReadersAndWriter(final long now) throws IOException { + final Path tempDir = createTempDir(); + Files.createFile(tempDir.resolve(Translog.CHECKPOINT_FILE_NAME)); + TranslogWriter writer = null; + List readers = new ArrayList<>(); + final int numberOfReaders = randomIntBetween(0, 10); + for (long gen = 1; gen <= numberOfReaders + 1; gen++) { + if (writer != null) { + final TranslogReader reader = Mockito.spy(writer.closeIntoReader()); + Mockito.doReturn(writer.getLastModifiedTime()).when(reader).getLastModifiedTime(); + readers.add(reader); + } + writer = TranslogWriter.create(new ShardId("index", "uuid", 0), "translog_uuid", gen, + tempDir.resolve(Translog.getFilename(gen)), FileChannel::open, TranslogConfig.DEFAULT_BUFFER_SIZE, () -> 1L, 1L, () -> 1L + ); + writer = Mockito.spy(writer); + Mockito.doReturn(now - (numberOfReaders - gen + 1) * 1000).when(writer).getLastModifiedTime(); + + byte[] bytes = new byte[4]; + ByteArrayDataOutput out = new ByteArrayDataOutput(bytes); + + for (int ops = randomIntBetween(0, 20); ops > 0; ops--) { + out.reset(bytes); + out.writeInt(ops); + writer.add(new BytesArray(bytes), ops); + } + } + return new Tuple<>(readers, writer); + } + + private static class MockDeletionPolicy extends TranslogDeletionPolicy { + + long now; + + MockDeletionPolicy(long now, long retentionSizeInBytes, long maxRetentionAgeInMillis) { + super(retentionSizeInBytes, maxRetentionAgeInMillis); + this.now = now; + } + + @Override + protected long currentTime() { + return now; + } + } +} diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 4fe97919c38..21bc1a14bc5 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -106,6 +106,7 @@ import java.util.stream.LongStream; import static com.carrotsearch.randomizedtesting.RandomizedTest.randomLongBetween; import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE; +import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; @@ -141,7 +142,8 @@ public class TranslogTests extends ESTestCase { } protected Translog createTranslog(TranslogConfig config, String translogUUID) throws IOException { - return new Translog(config, translogUUID, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); + return new Translog(config, translogUUID, createTranslogDeletionPolicy(config.getIndexSettings()), + () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); } private void markCurrentGenAsCommitted(Translog translog) throws IOException { @@ -157,11 +159,6 @@ public class TranslogTests extends ESTestCase { final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); deletionPolicy.setMinTranslogGenerationForRecovery(genToCommit); translog.trimUnreferencedReaders(); - if (deletionPolicy.pendingViewsCount() == 0) { - assertThat(deletionPolicy.minTranslogGenRequired(), equalTo(genToCommit)); - } - // we may have some views closed concurrently causing the deletion policy to increase it's minTranslogGenRequired - assertThat(translog.getMinFileGeneration(), lessThanOrEqualTo(deletionPolicy.minTranslogGenRequired())); } @Override @@ -186,7 +183,9 @@ public class TranslogTests extends ESTestCase { private Translog create(Path path) throws IOException { globalCheckpoint = new AtomicLong(SequenceNumbersService.UNASSIGNED_SEQ_NO); - return new Translog(getTranslogConfig(path), null, new TranslogDeletionPolicy(), () -> globalCheckpoint.get()); + final TranslogConfig translogConfig = getTranslogConfig(path); + final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(translogConfig.getIndexSettings()); + return new Translog(translogConfig, null, deletionPolicy, () -> globalCheckpoint.get()); } private TranslogConfig getTranslogConfig(final Path path) { @@ -1104,7 +1103,12 @@ public class TranslogTests extends ESTestCase { } writer.sync(); final Checkpoint writerCheckpoint = writer.getCheckpoint(); - try (TranslogReader reader = writer.closeIntoReader()) { + TranslogReader reader = writer.closeIntoReader(); + try { + if (randomBoolean()) { + reader.close(); + reader = translog.openReader(reader.path(), writerCheckpoint); + } for (int i = 0; i < numOps; i++) { final ByteBuffer buffer = ByteBuffer.allocate(4); reader.readBytes(buffer, reader.getFirstOperationOffset() + 4 * i); @@ -1114,6 +1118,8 @@ public class TranslogTests extends ESTestCase { } final Checkpoint readerCheckpoint = reader.getCheckpoint(); assertThat(readerCheckpoint, equalTo(writerCheckpoint)); + } finally { + IOUtils.close(reader); } } } @@ -1376,7 +1382,7 @@ public class TranslogTests extends ESTestCase { final String foreignTranslog = randomRealisticUnicodeOfCodepointLengthBetween(1, translogGeneration.translogUUID.length()); try { - new Translog(config, foreignTranslog, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); + new Translog(config, foreignTranslog, createTranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); fail("translog doesn't belong to this UUID"); } catch (TranslogCorruptedException ex) { @@ -1602,7 +1608,7 @@ public class TranslogTests extends ESTestCase { Path tempDir = createTempDir(); final FailSwitch fail = new FailSwitch(); TranslogConfig config = getTranslogConfig(tempDir); - Translog translog = getFailableTranslog(fail, config, false, true, null, new TranslogDeletionPolicy()); + Translog translog = getFailableTranslog(fail, config, false, true, null, createTranslogDeletionPolicy()); LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer boarders regularly translog.add(new Translog.Index("test", "1", 0, lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8")))); fail.failAlways(); @@ -1697,7 +1703,7 @@ public class TranslogTests extends ESTestCase { iterator.remove(); } } - try (Translog tlog = new Translog(config, translogUUID, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { + try (Translog tlog = new Translog(config, translogUUID, createTranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { Translog.Snapshot snapshot = tlog.newSnapshot(); if (writtenOperations.size() != snapshot.totalOperations()) { for (int i = 0; i < threadCount; i++) { @@ -1740,7 +1746,7 @@ public class TranslogTests extends ESTestCase { // engine blows up, after committing the above generation translog.close(); TranslogConfig config = translog.getConfig(); - final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); + final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1); deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration); translog = new Translog(config, translog.getTranslogUUID(), deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); assertThat(translog.getMinFileGeneration(), equalTo(1L)); @@ -1789,7 +1795,7 @@ public class TranslogTests extends ESTestCase { // expected... } } - final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); + final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1); deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration); try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { // we don't know when things broke exactly @@ -1803,7 +1809,7 @@ public class TranslogTests extends ESTestCase { } private Translog getFailableTranslog(FailSwitch fail, final TranslogConfig config) throws IOException { - return getFailableTranslog(fail, config, randomBoolean(), false, null, new TranslogDeletionPolicy()); + return getFailableTranslog(fail, config, randomBoolean(), false, null, createTranslogDeletionPolicy()); } private static class FailSwitch { @@ -1965,7 +1971,7 @@ public class TranslogTests extends ESTestCase { translog.add(new Translog.Index("test", "boom", 0, "boom".getBytes(Charset.forName("UTF-8")))); translog.close(); try { - new Translog(config, translog.getTranslogUUID(), new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO) { + new Translog(config, translog.getTranslogUUID(), createTranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO) { @Override protected TranslogWriter createWriter(long fileGeneration) throws IOException { throw new MockDirectoryWrapper.FakeIOException(); @@ -2083,7 +2089,7 @@ public class TranslogTests extends ESTestCase { String generationUUID = null; try { boolean committing = false; - final Translog failableTLog = getFailableTranslog(fail, config, randomBoolean(), false, generationUUID, new TranslogDeletionPolicy()); + final Translog failableTLog = getFailableTranslog(fail, config, randomBoolean(), false, generationUUID, createTranslogDeletionPolicy()); try { LineFileDocs lineFileDocs = new LineFileDocs(random()); //writes pretty big docs so we cross buffer boarders regularly for (int opsAdded = 0; opsAdded < numOps; opsAdded++) { @@ -2142,7 +2148,7 @@ public class TranslogTests extends ESTestCase { // now randomly open this failing tlog again just to make sure we can also recover from failing during recovery if (randomBoolean()) { try { - TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); + TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(); deletionPolicy.setMinTranslogGenerationForRecovery(minGenForRecovery); IOUtils.close(getFailableTranslog(fail, config, randomBoolean(), false, generationUUID, deletionPolicy)); } catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) { @@ -2153,7 +2159,7 @@ public class TranslogTests extends ESTestCase { } fail.failNever(); // we don't wanna fail here but we might since we write a new checkpoint and create a new tlog file - TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); + TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(); deletionPolicy.setMinTranslogGenerationForRecovery(minGenForRecovery); try (Translog translog = new Translog(config, generationUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { Translog.Snapshot snapshot = translog.newSnapshot(); @@ -2218,7 +2224,7 @@ public class TranslogTests extends ESTestCase { translog.rollGeneration(); TranslogConfig config = translog.getConfig(); final String translogUUID = translog.getTranslogUUID(); - final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); + final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(config.getIndexSettings()); translog.close(); translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); translog.add(new Translog.Index("test", "2", 1, new byte[]{2})); @@ -2293,7 +2299,14 @@ public class TranslogTests extends ESTestCase { assertEquals("my_id", serializedDelete.id()); } - public void testRollGeneration() throws IOException { + public void testRollGeneration() throws Exception { + // make sure we keep some files around + final boolean longRetention = randomBoolean(); + if (longRetention) { + translog.getDeletionPolicy().setRetentionAgeInMillis(3600 * 1000); + } else { + translog.getDeletionPolicy().setRetentionAgeInMillis(-1); + } final long generation = translog.currentFileGeneration(); final int rolls = randomIntBetween(1, 16); int totalOperations = 0; @@ -2316,8 +2329,22 @@ public class TranslogTests extends ESTestCase { commit(translog, generation + rolls); assertThat(translog.currentFileGeneration(), equalTo(generation + rolls )); assertThat(translog.totalOperations(), equalTo(0)); - for (int i = 0; i < rolls; i++) { - assertFileDeleted(translog, generation + i); + if (longRetention) { + for (int i = 0; i <= rolls; i++) { + assertFileIsPresent(translog, generation + i); + } + translog.getDeletionPolicy().setRetentionAgeInMillis(randomBoolean() ? 100 : -1); + assertBusy(() -> { + translog.trimUnreferencedReaders(); + for (int i = 0; i < rolls; i++) { + assertFileDeleted(translog, generation + i); + } + }); + } else { + // immediate cleanup + for (int i = 0; i < rolls; i++) { + assertFileDeleted(translog, generation + i); + } } assertFileIsPresent(translog, generation + rolls); }