From 9ddea539f509165a4cd9a713d240b9ddedb1ea08 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 16 Jun 2017 09:09:51 +0200 Subject: [PATCH] Introduce translog size and age based retention policies (#25147) This PR extends the TranslogDeletionPolicy to allow keeping the translog files longer than what is needed for recovery from lucene. Specifically, we allow specifying the total size of the files and their maximum age (i.e., keep up to 512MB but no longer than 12 hours). This will allow making ops based recoveries more common. Note that the default size and age still set to 0, maintaining current behavior. This is needed as the other components in the system are not yet ready for a longer translog retention. I will adapt those in follow up PRs. Relates to #10708 --- .../common/settings/IndexScopedSettings.java | 2 + .../elasticsearch/index/IndexSettings.java | 43 ++++ .../index/engine/InternalEngine.java | 9 +- .../index/translog/BaseTranslogReader.java | 5 + .../index/translog/Translog.java | 2 +- .../translog/TranslogDeletionPolicy.java | 82 ++++++- .../index/translog/TranslogReader.java | 6 +- .../index/translog/TranslogWriter.java | 28 +-- .../engine/CombinedDeletionPolicyTests.java | 3 +- .../index/engine/InternalEngineTests.java | 5 +- .../translog/TranslogDeletionPolicyTests.java | 214 ++++++++++++++++++ .../index/translog/TranslogTests.java | 71 ++++-- 12 files changed, 418 insertions(+), 52 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java diff --git a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index 9fcafcea3b2..ae4cf6cd41a 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -127,6 +127,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings { EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING, IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING, IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING, + IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING, + IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING, IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY, FieldMapper.IGNORE_MALFORMED_SETTING, FieldMapper.COERCE_SETTING, diff --git a/core/src/main/java/org/elasticsearch/index/IndexSettings.java b/core/src/main/java/org/elasticsearch/index/IndexSettings.java index 2764ffd38cc..43ddb09e61f 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/core/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -37,6 +37,7 @@ import org.elasticsearch.node.Node; import java.util.Locale; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.function.Function; /** @@ -111,6 +112,24 @@ public final class IndexSettings { Setting.byteSizeSetting("index.translog.flush_threshold_size", new ByteSizeValue(512, ByteSizeUnit.MB), Property.Dynamic, Property.IndexScope); + /** + * Controls how long translog files that are no longer needed for persistence reasons + * will be kept around before being deleted. A longer retention policy is useful to increase + * the chance of ops based recoveries. + **/ + public static final Setting INDEX_TRANSLOG_RETENTION_AGE_SETTING = + Setting.timeSetting("index.translog.retention.age", TimeValue.timeValueMillis(-1), TimeValue.timeValueMillis(-1), Property.Dynamic, + Property.IndexScope); + + /** + * Controls how many translog files that are no longer needed for persistence reasons + * will be kept around before being deleted. Keeping more files is useful to increase + * the chance of ops based recoveries. + **/ + public static final Setting INDEX_TRANSLOG_RETENTION_SIZE_SETTING = + Setting.byteSizeSetting("index.translog.retention.size", new ByteSizeValue(-1, ByteSizeUnit.MB), Property.Dynamic, + Property.IndexScope); + /** * The maximum size of a translog generation. This is independent of the maximum size of * translog operations that have not been flushed. @@ -168,6 +187,8 @@ public final class IndexSettings { private final TimeValue syncInterval; private volatile TimeValue refreshInterval; private volatile ByteSizeValue flushThresholdSize; + private volatile TimeValue translogRetentionAge; + private volatile ByteSizeValue translogRetentionSize; private volatile ByteSizeValue generationThresholdSize; private final MergeSchedulerConfig mergeSchedulerConfig; private final MergePolicyConfig mergePolicyConfig; @@ -265,6 +286,8 @@ public final class IndexSettings { syncInterval = INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.get(settings); refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING); flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING); + translogRetentionAge = scopedSettings.get(INDEX_TRANSLOG_RETENTION_AGE_SETTING); + translogRetentionSize = scopedSettings.get(INDEX_TRANSLOG_RETENTION_SIZE_SETTING); generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING); mergeSchedulerConfig = new MergeSchedulerConfig(this); gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis(); @@ -302,6 +325,8 @@ public final class IndexSettings { scopedSettings.addSettingsUpdateConsumer( INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING, this::setGenerationThresholdSize); + scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_AGE_SETTING, this::setTranslogRetentionAge); + scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_SIZE_SETTING, this::setTranslogRetentionSize); scopedSettings.addSettingsUpdateConsumer(INDEX_REFRESH_INTERVAL_SETTING, this::setRefreshInterval); scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners); scopedSettings.addSettingsUpdateConsumer(MAX_SLICES_PER_SCROLL, this::setMaxSlicesPerScroll); @@ -311,6 +336,14 @@ public final class IndexSettings { this.flushThresholdSize = byteSizeValue; } + private void setTranslogRetentionSize(ByteSizeValue byteSizeValue) { + this.translogRetentionSize = byteSizeValue; + } + + private void setTranslogRetentionAge(TimeValue age) { + this.translogRetentionAge = age; + } + private void setGenerationThresholdSize(final ByteSizeValue generationThresholdSize) { this.generationThresholdSize = generationThresholdSize; } @@ -469,6 +502,16 @@ public final class IndexSettings { */ public ByteSizeValue getFlushThresholdSize() { return flushThresholdSize; } + /** + * Returns the transaction log retention size which controls how much of the translog is kept around to allow for ops based recoveries + */ + public ByteSizeValue getTranslogRetentionSize() { return translogRetentionSize; } + + /** + * Returns the transaction log retention age which controls the maximum age (time from creation) that translog files will be kept around + */ + public TimeValue getTranslogRetentionAge() { return translogRetentionAge; } + /** * Returns the generation threshold size. As sequence numbers can cause multiple generations to * be preserved for rollback purposes, we want to keep the size of individual generations from diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index f84f76b537e..6d10a029099 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -150,7 +150,10 @@ public class InternalEngine extends Engine { } this.uidField = engineConfig.getIndexSettings().isSingleType() ? IdFieldMapper.NAME : UidFieldMapper.NAME; this.versionMap = new LiveVersionMap(); - final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(); + final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy( + engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(), + engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis() + ); this.deletionPolicy = new CombinedDeletionPolicy( new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()), translogDeletionPolicy, openMode); store.incRef(); @@ -1854,6 +1857,10 @@ public class InternalEngine extends Engine { // the setting will be re-interpreted if it's set to true this.maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE); } + final TranslogDeletionPolicy translogDeletionPolicy = translog.getDeletionPolicy(); + final IndexSettings indexSettings = engineConfig.getIndexSettings(); + translogDeletionPolicy.setRetentionAgeInMillis(indexSettings.getTranslogRetentionAge().getMillis()); + translogDeletionPolicy.setRetentionSizeInBytes(indexSettings.getTranslogRetentionSize().getBytes()); } public MergeStats getMergeStats() { diff --git a/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java b/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java index 6f392c195fd..7f8b7f3fb2c 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java +++ b/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.ByteBufferStreamInput; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; +import java.nio.file.Files; import java.nio.file.Path; /** @@ -121,4 +122,8 @@ public abstract class BaseTranslogReader implements Comparable= getMinFileGeneration() : "deletion policy requires a minReferenceGen of [" + minReferencedGen + "] but the lowest gen available is [" + getMinFileGeneration() + "]"; diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java index 84f61a642cc..732b38fcedf 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java @@ -21,13 +21,17 @@ package org.elasticsearch.index.translog; import org.apache.lucene.util.Counter; +import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; public class TranslogDeletionPolicy { - /** Records how many views are held against each - * translog generation */ + /** + * Records how many views are held against each + * translog generation + */ private final Map translogRefCounts = new HashMap<>(); /** @@ -36,14 +40,31 @@ public class TranslogDeletionPolicy { */ private long minTranslogGenerationForRecovery = 1; + private long retentionSizeInBytes; + + private long retentionAgeInMillis; + + public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis) { + this.retentionSizeInBytes = retentionSizeInBytes; + this.retentionAgeInMillis = retentionAgeInMillis; + } + public synchronized void setMinTranslogGenerationForRecovery(long newGen) { if (newGen < minTranslogGenerationForRecovery) { throw new IllegalArgumentException("minTranslogGenerationForRecovery can't go backwards. new [" + newGen + "] current [" + - minTranslogGenerationForRecovery+ "]"); + minTranslogGenerationForRecovery + "]"); } minTranslogGenerationForRecovery = newGen; } + public synchronized void setRetentionSizeInBytes(long bytes) { + retentionSizeInBytes = bytes; + } + + public synchronized void setRetentionAgeInMillis(long ageInMillis) { + retentionAgeInMillis = ageInMillis; + } + /** * acquires the basis generation for a new view. Any translog generation above, and including, the returned generation * will not be deleted until a corresponding call to {@link #releaseTranslogGenView(long)} is called. @@ -74,10 +95,59 @@ public class TranslogDeletionPolicy { /** * returns the minimum translog generation that is still required by the system. Any generation below * the returned value may be safely deleted + * + * @param readers current translog readers + * @param writer current translog writer */ - synchronized long minTranslogGenRequired() { - long viewRefs = translogRefCounts.keySet().stream().reduce(Math::min).orElse(Long.MAX_VALUE); - return Math.min(viewRefs, minTranslogGenerationForRecovery); + synchronized long minTranslogGenRequired(List readers, TranslogWriter writer) throws IOException { + long minByView = getMinTranslogGenRequiredByViews(); + long minByAge = getMinTranslogGenByAge(readers, writer, retentionAgeInMillis, currentTime()); + long minBySize = getMinTranslogGenBySize(readers, writer, retentionSizeInBytes); + final long minByAgeAndSize; + if (minBySize == Long.MIN_VALUE && minByAge == Long.MIN_VALUE) { + // both size and age are disabled; + minByAgeAndSize = Long.MAX_VALUE; + } else { + minByAgeAndSize = Math.max(minByAge, minBySize); + } + return Math.min(minByAgeAndSize, Math.min(minByView, minTranslogGenerationForRecovery)); + } + + static long getMinTranslogGenBySize(List readers, TranslogWriter writer, long retentionSizeInBytes) { + if (retentionSizeInBytes >= 0) { + long totalSize = writer.sizeInBytes(); + long minGen = writer.getGeneration(); + for (int i = readers.size() - 1; i >= 0 && totalSize < retentionSizeInBytes; i--) { + final TranslogReader reader = readers.get(i); + totalSize += reader.sizeInBytes(); + minGen = reader.getGeneration(); + } + return minGen; + } else { + return Long.MIN_VALUE; + } + } + + static long getMinTranslogGenByAge(List readers, TranslogWriter writer, long maxRetentionAgeInMillis, long now) + throws IOException { + if (maxRetentionAgeInMillis >= 0) { + for (TranslogReader reader: readers) { + if (now - reader.getLastModifiedTime() <= maxRetentionAgeInMillis) { + return reader.getGeneration(); + } + } + return writer.getGeneration(); + } else { + return Long.MIN_VALUE; + } + } + + protected long currentTime() { + return System.currentTimeMillis(); + } + + private long getMinTranslogGenRequiredByViews() { + return translogRefCounts.keySet().stream().reduce(Math::min).orElse(Long.MAX_VALUE); } /** returns the translog generation that will be used as a basis of a future store/peer recovery */ diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java index 9057207501c..46439afead1 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java @@ -116,7 +116,7 @@ public class TranslogReader extends BaseTranslogReader implements Closeable { throw new IllegalStateException("pre-2.0 translog found [" + path + "]"); case TranslogWriter.VERSION_CHECKPOINTS: assert path.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX) : "new file ends with old suffix: " + path; - assert checkpoint.numOps >= 0 : "expected at least 0 operatin but got: " + checkpoint.numOps; + assert checkpoint.numOps >= 0 : "expected at least 0 operation but got: " + checkpoint.numOps; assert checkpoint.offset <= channel.size() : "checkpoint is inconsistent with channel length: " + channel.size() + " " + checkpoint; int len = headerStream.readInt(); if (len > channel.size()) { @@ -130,8 +130,8 @@ public class TranslogReader extends BaseTranslogReader implements Closeable { throw new TranslogCorruptedException("expected shard UUID " + uuidBytes + " but got: " + ref + " this translog file belongs to a different translog. path:" + path); } - final long firstOperationOffset = - ref.length + CodecUtil.headerLength(TranslogWriter.TRANSLOG_CODEC) + Integer.BYTES; + final long firstOperationOffset; + firstOperationOffset = ref.length + CodecUtil.headerLength(TranslogWriter.TRANSLOG_CODEC) + Integer.BYTES; return new TranslogReader(checkpoint, channel, path, firstOperationOffset); default: diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index d637c9da79f..2c0bd0c7d89 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -88,6 +88,9 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { final ByteSizeValue bufferSize, final LongSupplier globalCheckpointSupplier, LongSupplier minTranslogGenerationSupplier) throws IOException { super(initialCheckpoint.generation, channel, path, channel.position()); + assert initialCheckpoint.offset == channel.position() : + "initial checkpoint offset [" + initialCheckpoint.offset + "] is different than current channel poistion [" + + channel.position() + "]"; this.shardId = shardId; this.channelFactory = channelFactory; this.minTranslogGenerationSupplier = minTranslogGenerationSupplier; @@ -116,18 +119,12 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { out.writeBytes(ref.bytes, ref.offset, ref.length); } - public static TranslogWriter create( - ShardId shardId, - String translogUUID, - long fileGeneration, - Path file, - ChannelFactory channelFactory, - ByteSizeValue bufferSize, - final LongSupplier globalCheckpointSupplier, - final long initialMinTranslogGen, - final LongSupplier minTranslogGenerationSupplier) throws IOException { + public static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, ChannelFactory channelFactory, + ByteSizeValue bufferSize, final LongSupplier globalCheckpointSupplier, + final long initialMinTranslogGen, final LongSupplier minTranslogGenerationSupplier) + throws IOException { final BytesRef ref = new BytesRef(translogUUID); - final int headerLength = getHeaderLength(ref.length); + final int firstOperationOffset = getHeaderLength(ref.length); final FileChannel channel = channelFactory.open(file); try { // This OutputStreamDataOutput is intentionally not closed because @@ -135,12 +132,11 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { final OutputStreamDataOutput out = new OutputStreamDataOutput(java.nio.channels.Channels.newOutputStream(channel)); writeHeader(out, ref); channel.force(true); - final Checkpoint checkpoint = - Checkpoint.emptyTranslogCheckpoint(headerLength, fileGeneration, globalCheckpointSupplier.getAsLong(), - initialMinTranslogGen); + final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(firstOperationOffset, fileGeneration, + globalCheckpointSupplier.getAsLong(), initialMinTranslogGen); writeCheckpoint(channelFactory, file.getParent(), checkpoint); - return new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize, globalCheckpointSupplier, - minTranslogGenerationSupplier); + return new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize, + globalCheckpointSupplier, minTranslogGenerationSupplier); } catch (Exception exception) { // if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that // file exists we remove it. We only apply this logic to the checkpoint.generation+1 any other file with a higher generation is an error condition diff --git a/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index d21273a7b03..d1eef05c2ef 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -39,7 +40,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase { public void testPassThrough() throws IOException { SnapshotDeletionPolicy indexDeletionPolicy = mock(SnapshotDeletionPolicy.class); - CombinedDeletionPolicy combinedDeletionPolicy = new CombinedDeletionPolicy(indexDeletionPolicy, new TranslogDeletionPolicy(), + CombinedDeletionPolicy combinedDeletionPolicy = new CombinedDeletionPolicy(indexDeletionPolicy, createTranslogDeletionPolicy(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG); List commitList = new ArrayList<>(); long count = randomIntBetween(1, 3); diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 16e746a67f7..af18781dfa6 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -180,6 +180,7 @@ import static org.elasticsearch.index.engine.Engine.Operation.Origin.LOCAL_TRANS import static org.elasticsearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; +import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.everyItem; @@ -336,7 +337,7 @@ public class InternalEngineTests extends ESTestCase { protected Translog createTranslog(Path translogPath) throws IOException { TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE); - return new Translog(translogConfig, null, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); + return new Translog(translogConfig, null, createTranslogDeletionPolicy(INDEX_SETTINGS), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); } protected InternalEngine createEngine(Store store, Path translogPath) throws IOException { @@ -2795,7 +2796,7 @@ public class InternalEngineTests extends ESTestCase { Translog translog = new Translog( new TranslogConfig(shardId, createTempDir(), INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), - null, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); + null, createTranslogDeletionPolicy(INDEX_SETTINGS), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); translog.add(new Translog.Index("test", "SomeBogusId", 0, "{}".getBytes(Charset.forName("UTF-8")))); assertEquals(generation.translogFileGeneration, translog.currentFileGeneration()); translog.close(); diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java new file mode 100644 index 00000000000..3ed595543f8 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java @@ -0,0 +1,214 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.translog; + +import org.apache.lucene.store.ByteArrayDataOutput; +import org.apache.lucene.util.IOUtils; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESTestCase; +import org.mockito.Mockito; + +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; + + +public class TranslogDeletionPolicyTests extends ESTestCase { + + public static TranslogDeletionPolicy createTranslogDeletionPolicy() { + return new TranslogDeletionPolicy( + IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes(), + IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getDefault(Settings.EMPTY).getMillis() + ); + } + + public static TranslogDeletionPolicy createTranslogDeletionPolicy(IndexSettings indexSettings) { + return new TranslogDeletionPolicy(indexSettings.getTranslogRetentionSize().getBytes(), + indexSettings.getTranslogRetentionAge().getMillis()); + } + + public void testNoRetention() throws IOException { + long now = System.currentTimeMillis(); + Tuple, TranslogWriter> readersAndWriter = createReadersAndWriter(now); + List allGens = new ArrayList<>(readersAndWriter.v1()); + allGens.add(readersAndWriter.v2()); + try { + TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, 0, 0); + assertMinGenRequired(deletionPolicy, readersAndWriter, 1L); + final int committedReader = randomIntBetween(0, allGens.size() - 1); + final long committedGen = allGens.get(committedReader).generation; + deletionPolicy.setMinTranslogGenerationForRecovery(committedGen); + assertMinGenRequired(deletionPolicy, readersAndWriter, committedGen); + } finally { + IOUtils.close(readersAndWriter.v1()); + IOUtils.close(readersAndWriter.v2()); + } + } + + public void testBytesRetention() throws IOException { + long now = System.currentTimeMillis(); + Tuple, TranslogWriter> readersAndWriter = createReadersAndWriter(now); + List allGens = new ArrayList<>(readersAndWriter.v1()); + allGens.add(readersAndWriter.v2()); + try { + final int selectedReader = randomIntBetween(0, allGens.size() - 1); + final long selectedGeneration = allGens.get(selectedReader).generation; + long size = allGens.stream().skip(selectedReader).map(BaseTranslogReader::sizeInBytes).reduce(Long::sum).get(); + assertThat(TranslogDeletionPolicy.getMinTranslogGenBySize(readersAndWriter.v1(), readersAndWriter.v2(), size), + equalTo(selectedGeneration)); + assertThat(TranslogDeletionPolicy.getMinTranslogGenBySize(readersAndWriter.v1(), readersAndWriter.v2(), -1), + equalTo(Long.MIN_VALUE)); + } finally { + IOUtils.close(readersAndWriter.v1()); + IOUtils.close(readersAndWriter.v2()); + } + } + + public void testAgeRetention() throws IOException { + long now = System.currentTimeMillis(); + Tuple, TranslogWriter> readersAndWriter = createReadersAndWriter(now); + List allGens = new ArrayList<>(readersAndWriter.v1()); + allGens.add(readersAndWriter.v2()); + try { + final int selectedReader = randomIntBetween(0, allGens.size() - 1); + final long selectedGeneration = allGens.get(selectedReader).generation; + long maxAge = now - allGens.get(selectedReader).getLastModifiedTime(); + assertThat(TranslogDeletionPolicy.getMinTranslogGenByAge(readersAndWriter.v1(), readersAndWriter.v2(), maxAge, now), + equalTo(selectedGeneration)); + assertThat(TranslogDeletionPolicy.getMinTranslogGenByAge(readersAndWriter.v1(), readersAndWriter.v2(), -1, now), + equalTo(Long.MIN_VALUE)); + } finally { + IOUtils.close(readersAndWriter.v1()); + IOUtils.close(readersAndWriter.v2()); + } + } + + /** + * Tests that age trumps size but recovery trumps both. + */ + public void testRetentionHierarchy() throws IOException { + long now = System.currentTimeMillis(); + Tuple, TranslogWriter> readersAndWriter = createReadersAndWriter(now); + List allGens = new ArrayList<>(readersAndWriter.v1()); + allGens.add(readersAndWriter.v2()); + try { + TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, Long.MAX_VALUE, Long.MAX_VALUE); + deletionPolicy.setMinTranslogGenerationForRecovery(Long.MAX_VALUE); + int selectedReader = randomIntBetween(0, allGens.size() - 1); + final long selectedGenerationByAge = allGens.get(selectedReader).generation; + long maxAge = now - allGens.get(selectedReader).getLastModifiedTime(); + selectedReader = randomIntBetween(0, allGens.size() - 1); + final long selectedGenerationBySize = allGens.get(selectedReader).generation; + long size = allGens.stream().skip(selectedReader).map(BaseTranslogReader::sizeInBytes).reduce(Long::sum).get(); + selectedReader = randomIntBetween(0, allGens.size() - 1); + long committedGen = allGens.get(selectedReader).generation; + deletionPolicy.setRetentionAgeInMillis(maxAge); + deletionPolicy.setRetentionSizeInBytes(size); + assertMinGenRequired(deletionPolicy, readersAndWriter, Math.max(selectedGenerationByAge, selectedGenerationBySize)); + // make a new policy as committed gen can't go backwards (for now) + deletionPolicy = new MockDeletionPolicy(now, size, maxAge); + deletionPolicy.setMinTranslogGenerationForRecovery(committedGen); + assertMinGenRequired(deletionPolicy, readersAndWriter, + Math.min(committedGen, Math.max(selectedGenerationByAge, selectedGenerationBySize))); + long viewGen = deletionPolicy.acquireTranslogGenForView(); + selectedReader = randomIntBetween(selectedReader, allGens.size() - 1); + committedGen = allGens.get(selectedReader).generation; + deletionPolicy.setMinTranslogGenerationForRecovery(committedGen); + assertMinGenRequired(deletionPolicy, readersAndWriter, + Math.min( + Math.min(committedGen, viewGen), + Math.max(selectedGenerationByAge, selectedGenerationBySize))); + // disable age + deletionPolicy.setRetentionAgeInMillis(-1); + assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(Math.min(committedGen, viewGen), selectedGenerationBySize)); + // disable size + deletionPolicy.setRetentionAgeInMillis(maxAge); + deletionPolicy.setRetentionSizeInBytes(-1); + assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(Math.min(committedGen, viewGen), selectedGenerationByAge)); + // disable both + deletionPolicy.setRetentionAgeInMillis(-1); + deletionPolicy.setRetentionSizeInBytes(-1); + assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(committedGen, viewGen)); + } finally { + IOUtils.close(readersAndWriter.v1()); + IOUtils.close(readersAndWriter.v2()); + } + + } + + private void assertMinGenRequired(TranslogDeletionPolicy deletionPolicy, Tuple, TranslogWriter> readersAndWriter, + long expectedGen) throws IOException { + assertThat(deletionPolicy.minTranslogGenRequired(readersAndWriter.v1(), readersAndWriter.v2()), equalTo(expectedGen)); + } + + private Tuple, TranslogWriter> createReadersAndWriter(final long now) throws IOException { + final Path tempDir = createTempDir(); + Files.createFile(tempDir.resolve(Translog.CHECKPOINT_FILE_NAME)); + TranslogWriter writer = null; + List readers = new ArrayList<>(); + final int numberOfReaders = randomIntBetween(0, 10); + for (long gen = 1; gen <= numberOfReaders + 1; gen++) { + if (writer != null) { + final TranslogReader reader = Mockito.spy(writer.closeIntoReader()); + Mockito.doReturn(writer.getLastModifiedTime()).when(reader).getLastModifiedTime(); + readers.add(reader); + } + writer = TranslogWriter.create(new ShardId("index", "uuid", 0), "translog_uuid", gen, + tempDir.resolve(Translog.getFilename(gen)), FileChannel::open, TranslogConfig.DEFAULT_BUFFER_SIZE, () -> 1L, 1L, () -> 1L + ); + writer = Mockito.spy(writer); + Mockito.doReturn(now - (numberOfReaders - gen + 1) * 1000).when(writer).getLastModifiedTime(); + + byte[] bytes = new byte[4]; + ByteArrayDataOutput out = new ByteArrayDataOutput(bytes); + + for (int ops = randomIntBetween(0, 20); ops > 0; ops--) { + out.reset(bytes); + out.writeInt(ops); + writer.add(new BytesArray(bytes), ops); + } + } + return new Tuple<>(readers, writer); + } + + private static class MockDeletionPolicy extends TranslogDeletionPolicy { + + long now; + + MockDeletionPolicy(long now, long retentionSizeInBytes, long maxRetentionAgeInMillis) { + super(retentionSizeInBytes, maxRetentionAgeInMillis); + this.now = now; + } + + @Override + protected long currentTime() { + return now; + } + } +} diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 4fe97919c38..21bc1a14bc5 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -106,6 +106,7 @@ import java.util.stream.LongStream; import static com.carrotsearch.randomizedtesting.RandomizedTest.randomLongBetween; import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE; +import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; @@ -141,7 +142,8 @@ public class TranslogTests extends ESTestCase { } protected Translog createTranslog(TranslogConfig config, String translogUUID) throws IOException { - return new Translog(config, translogUUID, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); + return new Translog(config, translogUUID, createTranslogDeletionPolicy(config.getIndexSettings()), + () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); } private void markCurrentGenAsCommitted(Translog translog) throws IOException { @@ -157,11 +159,6 @@ public class TranslogTests extends ESTestCase { final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); deletionPolicy.setMinTranslogGenerationForRecovery(genToCommit); translog.trimUnreferencedReaders(); - if (deletionPolicy.pendingViewsCount() == 0) { - assertThat(deletionPolicy.minTranslogGenRequired(), equalTo(genToCommit)); - } - // we may have some views closed concurrently causing the deletion policy to increase it's minTranslogGenRequired - assertThat(translog.getMinFileGeneration(), lessThanOrEqualTo(deletionPolicy.minTranslogGenRequired())); } @Override @@ -186,7 +183,9 @@ public class TranslogTests extends ESTestCase { private Translog create(Path path) throws IOException { globalCheckpoint = new AtomicLong(SequenceNumbersService.UNASSIGNED_SEQ_NO); - return new Translog(getTranslogConfig(path), null, new TranslogDeletionPolicy(), () -> globalCheckpoint.get()); + final TranslogConfig translogConfig = getTranslogConfig(path); + final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(translogConfig.getIndexSettings()); + return new Translog(translogConfig, null, deletionPolicy, () -> globalCheckpoint.get()); } private TranslogConfig getTranslogConfig(final Path path) { @@ -1104,7 +1103,12 @@ public class TranslogTests extends ESTestCase { } writer.sync(); final Checkpoint writerCheckpoint = writer.getCheckpoint(); - try (TranslogReader reader = writer.closeIntoReader()) { + TranslogReader reader = writer.closeIntoReader(); + try { + if (randomBoolean()) { + reader.close(); + reader = translog.openReader(reader.path(), writerCheckpoint); + } for (int i = 0; i < numOps; i++) { final ByteBuffer buffer = ByteBuffer.allocate(4); reader.readBytes(buffer, reader.getFirstOperationOffset() + 4 * i); @@ -1114,6 +1118,8 @@ public class TranslogTests extends ESTestCase { } final Checkpoint readerCheckpoint = reader.getCheckpoint(); assertThat(readerCheckpoint, equalTo(writerCheckpoint)); + } finally { + IOUtils.close(reader); } } } @@ -1376,7 +1382,7 @@ public class TranslogTests extends ESTestCase { final String foreignTranslog = randomRealisticUnicodeOfCodepointLengthBetween(1, translogGeneration.translogUUID.length()); try { - new Translog(config, foreignTranslog, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); + new Translog(config, foreignTranslog, createTranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); fail("translog doesn't belong to this UUID"); } catch (TranslogCorruptedException ex) { @@ -1602,7 +1608,7 @@ public class TranslogTests extends ESTestCase { Path tempDir = createTempDir(); final FailSwitch fail = new FailSwitch(); TranslogConfig config = getTranslogConfig(tempDir); - Translog translog = getFailableTranslog(fail, config, false, true, null, new TranslogDeletionPolicy()); + Translog translog = getFailableTranslog(fail, config, false, true, null, createTranslogDeletionPolicy()); LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer boarders regularly translog.add(new Translog.Index("test", "1", 0, lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8")))); fail.failAlways(); @@ -1697,7 +1703,7 @@ public class TranslogTests extends ESTestCase { iterator.remove(); } } - try (Translog tlog = new Translog(config, translogUUID, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { + try (Translog tlog = new Translog(config, translogUUID, createTranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { Translog.Snapshot snapshot = tlog.newSnapshot(); if (writtenOperations.size() != snapshot.totalOperations()) { for (int i = 0; i < threadCount; i++) { @@ -1740,7 +1746,7 @@ public class TranslogTests extends ESTestCase { // engine blows up, after committing the above generation translog.close(); TranslogConfig config = translog.getConfig(); - final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); + final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1); deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration); translog = new Translog(config, translog.getTranslogUUID(), deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); assertThat(translog.getMinFileGeneration(), equalTo(1L)); @@ -1789,7 +1795,7 @@ public class TranslogTests extends ESTestCase { // expected... } } - final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); + final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1); deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration); try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { // we don't know when things broke exactly @@ -1803,7 +1809,7 @@ public class TranslogTests extends ESTestCase { } private Translog getFailableTranslog(FailSwitch fail, final TranslogConfig config) throws IOException { - return getFailableTranslog(fail, config, randomBoolean(), false, null, new TranslogDeletionPolicy()); + return getFailableTranslog(fail, config, randomBoolean(), false, null, createTranslogDeletionPolicy()); } private static class FailSwitch { @@ -1965,7 +1971,7 @@ public class TranslogTests extends ESTestCase { translog.add(new Translog.Index("test", "boom", 0, "boom".getBytes(Charset.forName("UTF-8")))); translog.close(); try { - new Translog(config, translog.getTranslogUUID(), new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO) { + new Translog(config, translog.getTranslogUUID(), createTranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO) { @Override protected TranslogWriter createWriter(long fileGeneration) throws IOException { throw new MockDirectoryWrapper.FakeIOException(); @@ -2083,7 +2089,7 @@ public class TranslogTests extends ESTestCase { String generationUUID = null; try { boolean committing = false; - final Translog failableTLog = getFailableTranslog(fail, config, randomBoolean(), false, generationUUID, new TranslogDeletionPolicy()); + final Translog failableTLog = getFailableTranslog(fail, config, randomBoolean(), false, generationUUID, createTranslogDeletionPolicy()); try { LineFileDocs lineFileDocs = new LineFileDocs(random()); //writes pretty big docs so we cross buffer boarders regularly for (int opsAdded = 0; opsAdded < numOps; opsAdded++) { @@ -2142,7 +2148,7 @@ public class TranslogTests extends ESTestCase { // now randomly open this failing tlog again just to make sure we can also recover from failing during recovery if (randomBoolean()) { try { - TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); + TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(); deletionPolicy.setMinTranslogGenerationForRecovery(minGenForRecovery); IOUtils.close(getFailableTranslog(fail, config, randomBoolean(), false, generationUUID, deletionPolicy)); } catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) { @@ -2153,7 +2159,7 @@ public class TranslogTests extends ESTestCase { } fail.failNever(); // we don't wanna fail here but we might since we write a new checkpoint and create a new tlog file - TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); + TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(); deletionPolicy.setMinTranslogGenerationForRecovery(minGenForRecovery); try (Translog translog = new Translog(config, generationUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { Translog.Snapshot snapshot = translog.newSnapshot(); @@ -2218,7 +2224,7 @@ public class TranslogTests extends ESTestCase { translog.rollGeneration(); TranslogConfig config = translog.getConfig(); final String translogUUID = translog.getTranslogUUID(); - final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); + final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(config.getIndexSettings()); translog.close(); translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); translog.add(new Translog.Index("test", "2", 1, new byte[]{2})); @@ -2293,7 +2299,14 @@ public class TranslogTests extends ESTestCase { assertEquals("my_id", serializedDelete.id()); } - public void testRollGeneration() throws IOException { + public void testRollGeneration() throws Exception { + // make sure we keep some files around + final boolean longRetention = randomBoolean(); + if (longRetention) { + translog.getDeletionPolicy().setRetentionAgeInMillis(3600 * 1000); + } else { + translog.getDeletionPolicy().setRetentionAgeInMillis(-1); + } final long generation = translog.currentFileGeneration(); final int rolls = randomIntBetween(1, 16); int totalOperations = 0; @@ -2316,8 +2329,22 @@ public class TranslogTests extends ESTestCase { commit(translog, generation + rolls); assertThat(translog.currentFileGeneration(), equalTo(generation + rolls )); assertThat(translog.totalOperations(), equalTo(0)); - for (int i = 0; i < rolls; i++) { - assertFileDeleted(translog, generation + i); + if (longRetention) { + for (int i = 0; i <= rolls; i++) { + assertFileIsPresent(translog, generation + i); + } + translog.getDeletionPolicy().setRetentionAgeInMillis(randomBoolean() ? 100 : -1); + assertBusy(() -> { + translog.trimUnreferencedReaders(); + for (int i = 0; i < rolls; i++) { + assertFileDeleted(translog, generation + i); + } + }); + } else { + // immediate cleanup + for (int i = 0; i < rolls; i++) { + assertFileDeleted(translog, generation + i); + } } assertFileIsPresent(translog, generation + rolls); }