From f02cbe9e402d0f96c001ed80b6a10bdcc863e3f7 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 28 Jun 2019 16:18:54 +0200 Subject: [PATCH] Trim translog for closed indices (#43156) Today when an index is closed all its shards are forced flushed but the translog files are left around. As explained in #42445 we'd like to trim the translog for closed indices in order to consume less disk space. This commit reuses the existing AsyncTrimTranslogTask task and reenables it for closed indices. At the time the task is executed, we should have the guarantee that nothing holds the translog files that are going to be removed. It also leaves a short period of time (10 min) during which translog files of a recently closed index are still present on disk. This could also help in some cases where the closed index is reopened shortly after being closed (in order to update an index setting for example). Relates to #42445 --- .../org/elasticsearch/index/IndexService.java | 9 ++- .../index/engine/NoOpEngine.java | 58 ++++++++++++++++++- .../index/IndexServiceTests.java | 48 ++++++++++++++- .../index/engine/NoOpEngineTests.java | 43 +++++++++++++- 4 files changed, 153 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index d09dcd02b89..2e27423cbcd 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -944,6 +944,11 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust .getSettings().getAsTime(INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING, TimeValue.timeValueMinutes(10))); } + @Override + protected boolean mustReschedule() { + return indexService.closed.get() == false; + } + @Override protected void runInternal() { indexService.maybeTrimTranslog(); @@ -1035,8 +1040,8 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust return fsyncTask; } - AsyncGlobalCheckpointTask getGlobalCheckpointTask() { - return globalCheckpointTask; + AsyncTrimTranslogTask getTrimTranslogTask() { // for tests + return trimTranslogTask; } /** diff --git a/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java b/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java index 7f474d1be24..007a13351df 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java @@ -27,16 +27,24 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.SegmentReader; import org.apache.lucene.store.Directory; import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.util.concurrent.ReleasableLock; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.index.translog.TranslogConfig; +import org.elasticsearch.index.translog.TranslogDeletionPolicy; import java.io.IOException; import java.io.UncheckedIOException; import java.util.List; +import java.util.Map; import java.util.function.Function; /** * NoOpEngine is an engine implementation that does nothing but the bare minimum * required in order to have an engine. All attempts to do something (search, - * index, get), throw {@link UnsupportedOperationException}. + * index, get), throw {@link UnsupportedOperationException}. However, NoOpEngine + * allows to trim any existing translog files through the usage of the + * {{@link #trimUnreferencedTranslogFiles()}} method. */ public final class NoOpEngine extends ReadOnlyEngine { @@ -116,4 +124,52 @@ public final class NoOpEngine extends ReadOnlyEngine { return super.segmentsStats(includeSegmentFileSizes, includeUnloadedSegments); } } + + /** + * This implementation will trim existing translog files using a {@link TranslogDeletionPolicy} + * that retains nothing but the last translog generation from safe commit. + */ + @Override + public void trimUnreferencedTranslogFiles() { + final Store store = this.engineConfig.getStore(); + store.incRef(); + try (ReleasableLock lock = readLock.acquire()) { + ensureOpen(); + final List commits = DirectoryReader.listCommits(store.directory()); + if (commits.size() == 1) { + final Map commitUserData = getLastCommittedSegmentInfos().getUserData(); + final String translogUuid = commitUserData.get(Translog.TRANSLOG_UUID_KEY); + if (translogUuid == null) { + throw new IllegalStateException("commit doesn't contain translog unique id"); + } + if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) { + throw new IllegalStateException("commit doesn't contain translog generation id"); + } + final long lastCommitGeneration = Long.parseLong(commitUserData.get(Translog.TRANSLOG_GENERATION_KEY)); + final TranslogConfig translogConfig = engineConfig.getTranslogConfig(); + final long minTranslogGeneration = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUuid); + + if (minTranslogGeneration < lastCommitGeneration) { + // a translog deletion policy that retains nothing but the last translog generation from safe commit + final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(-1, -1); + translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastCommitGeneration); + translogDeletionPolicy.setMinTranslogGenerationForRecovery(lastCommitGeneration); + + try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy, + engineConfig.getGlobalCheckpointSupplier(), engineConfig.getPrimaryTermSupplier(), seqNo -> {})) { + translog.trimUnreferencedReaders(); + } + } + } + } catch (final Exception e) { + try { + failEngine("translog trimming failed", e); + } catch (Exception inner) { + e.addSuppressed(inner); + } + throw new EngineException(shardId, "failed to trim translog", e); + } finally { + store.decRef(); + } + } } diff --git a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java index 2d4030a51ce..5f1081f731b 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.index; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.TopDocs; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Strings; import org.elasticsearch.common.compress.CompressedXContent; @@ -42,12 +43,15 @@ import org.elasticsearch.test.InternalSettingsPlugin; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; +import java.nio.file.Path; import java.util.Collection; import java.util.Collections; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import static org.elasticsearch.index.shard.IndexShardTestCase.getEngine; import static org.elasticsearch.test.InternalSettingsPlugin.TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.core.IsEqual.equalTo; @@ -370,7 +374,7 @@ public class IndexServiceTests extends ESSingleNodeTestCase { .build(); IndexService indexService = createIndex("test", settings); ensureGreen("test"); - assertTrue(indexService.getRefreshTask().mustReschedule()); + assertTrue(indexService.getTrimTranslogTask().mustReschedule()); client().prepareIndex("test", "test", "1").setSource("{\"foo\": \"bar\"}", XContentType.JSON).get(); client().admin().indices().prepareFlush("test").get(); client().admin().indices().prepareUpdateSettings("test") @@ -382,6 +386,48 @@ public class IndexServiceTests extends ESSingleNodeTestCase { assertBusy(() -> assertThat(IndexShardTestCase.getTranslog(shard).totalOperations(), equalTo(0))); } + public void testAsyncTranslogTrimTaskOnClosedIndex() throws Exception { + final String indexName = "test"; + IndexService indexService = createIndex(indexName, Settings.builder() + .put(TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING.getKey(), "100ms") + .build()); + + Translog translog = IndexShardTestCase.getTranslog(indexService.getShard(0)); + final Path translogPath = translog.getConfig().getTranslogPath(); + final String translogUuid = translog.getTranslogUUID(); + + final int numDocs = scaledRandomIntBetween(10, 100); + for (int i = 0; i < numDocs; i++) { + client().prepareIndex().setIndex(indexName).setId(String.valueOf(i)).setSource("{\"foo\": \"bar\"}", XContentType.JSON).get(); + if (randomBoolean()) { + client().admin().indices().prepareFlush(indexName).get(); + } + } + assertThat(translog.totalOperations(), equalTo(numDocs)); + assertThat(translog.stats().estimatedNumberOfOperations(), equalTo(numDocs)); + assertAcked(client().admin().indices().prepareClose("test").setWaitForActiveShards(ActiveShardCount.DEFAULT)); + + indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(indexService.index()); + assertTrue(indexService.getTrimTranslogTask().mustReschedule()); + + final long lastCommitedTranslogGeneration; + try (Engine.IndexCommitRef indexCommitRef = getEngine(indexService.getShard(0)).acquireLastIndexCommit(false)) { + Map lastCommittedUserData = indexCommitRef.getIndexCommit().getUserData(); + lastCommitedTranslogGeneration = Long.parseLong(lastCommittedUserData.get(Translog.TRANSLOG_GENERATION_KEY)); + } + assertBusy(() -> { + long minTranslogGen = Translog.readMinTranslogGeneration(translogPath, translogUuid); + assertThat(minTranslogGen, equalTo(lastCommitedTranslogGeneration)); + }); + + assertAcked(client().admin().indices().prepareOpen("test").setWaitForActiveShards(ActiveShardCount.DEFAULT)); + + indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(indexService.index()); + translog = IndexShardTestCase.getTranslog(indexService.getShard(0)); + assertThat(translog.totalOperations(), equalTo(0)); + assertThat(translog.stats().estimatedNumberOfOperations(), equalTo(0)); + } + public void testIllegalFsyncInterval() { Settings settings = Settings.builder() .put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "0ms") // disable diff --git a/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java index 6f74ac23a8e..f45eab0e057 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java @@ -35,6 +35,7 @@ import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.DocsStats; import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogDeletionPolicy; import org.elasticsearch.test.IndexSettingsModule; @@ -42,6 +43,7 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.nio.file.Path; import java.util.Collections; +import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import static org.hamcrest.Matchers.equalTo; @@ -83,7 +85,7 @@ public class NoOpEngineTests extends EngineTestCase { tracker.updateLocalCheckpoint(allocationId.getId(), i); } - flushAndTrimTranslog(engine); + engine.flush(true, true); long localCheckpoint = engine.getPersistedLocalCheckpoint(); long maxSeqNo = engine.getSeqNoStats(100L).getMaxSeqNo(); @@ -159,6 +161,45 @@ public class NoOpEngineTests extends EngineTestCase { } } + public void testTrimUnreferencedTranslogFiles() throws Exception { + final ReplicationTracker tracker = (ReplicationTracker) engine.config().getGlobalCheckpointSupplier(); + ShardRouting routing = TestShardRouting.newShardRouting("test", shardId.id(), "node", + null, true, ShardRoutingState.STARTED, allocationId); + IndexShardRoutingTable table = new IndexShardRoutingTable.Builder(shardId).addShard(routing).build(); + tracker.updateFromMaster(1L, Collections.singleton(allocationId.getId()), table); + tracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); + + final int numDocs = scaledRandomIntBetween(10, 3000); + for (int i = 0; i < numDocs; i++) { + engine.index(indexForDoc(createParsedDoc(Integer.toString(i), null))); + if (rarely()) { + engine.flush(); + } + tracker.updateLocalCheckpoint(allocationId.getId(), i); + } + engine.flush(true, true); + + final String translogUuid = engine.getTranslog().getTranslogUUID(); + final long minFileGeneration = engine.getTranslog().getMinFileGeneration(); + final long currentFileGeneration = engine.getTranslog().currentFileGeneration(); + engine.close(); + + final NoOpEngine noOpEngine = new NoOpEngine(noOpConfig(INDEX_SETTINGS, store, primaryTranslogDir, tracker)); + final Path translogPath = noOpEngine.config().getTranslogConfig().getTranslogPath(); + + final long lastCommitedTranslogGeneration; + try (Engine.IndexCommitRef indexCommitRef = noOpEngine.acquireLastIndexCommit(false)) { + Map lastCommittedUserData = indexCommitRef.getIndexCommit().getUserData(); + lastCommitedTranslogGeneration = Long.parseLong(lastCommittedUserData.get(Translog.TRANSLOG_GENERATION_KEY)); + assertThat(lastCommitedTranslogGeneration, equalTo(currentFileGeneration)); + } + + assertThat(Translog.readMinTranslogGeneration(translogPath, translogUuid), equalTo(minFileGeneration)); + noOpEngine.trimUnreferencedTranslogFiles(); + assertThat(Translog.readMinTranslogGeneration(translogPath, translogUuid), equalTo(lastCommitedTranslogGeneration)); + noOpEngine.close(); + } + private void flushAndTrimTranslog(final InternalEngine engine) { engine.flush(true, true); final TranslogDeletionPolicy deletionPolicy = engine.getTranslog().getDeletionPolicy();