Trim translog for closed indices (#43156)

Today when an index is closed all its shards are forced flushed
but the translog files are left around. As explained in #42445
we'd like to trim the translog for closed indices in order to
consume less disk space. This commit reuses the existing
AsyncTrimTranslogTask task and reenables it for closed indices.

At the time the task is executed, we should have the guarantee
that nothing holds the translog files that are going to be removed.
It also leaves a short period of time (10 min) during which translog
files of a recently closed index are still present on disk. This could
 also help in some cases where the closed index is reopened
shortly after being closed (in order to update an index setting
for example).

Relates to #42445
This commit is contained in:
Tanguy Leroux 2019-06-28 16:18:54 +02:00
parent 7ca69db83f
commit f02cbe9e40
4 changed files with 153 additions and 5 deletions

View File

@ -944,6 +944,11 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
.getSettings().getAsTime(INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING, TimeValue.timeValueMinutes(10)));
}
@Override
protected boolean mustReschedule() {
return indexService.closed.get() == false;
}
@Override
protected void runInternal() {
indexService.maybeTrimTranslog();
@ -1035,8 +1040,8 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
return fsyncTask;
}
AsyncGlobalCheckpointTask getGlobalCheckpointTask() {
return globalCheckpointTask;
AsyncTrimTranslogTask getTrimTranslogTask() { // for tests
return trimTranslogTask;
}
/**

View File

@ -27,16 +27,24 @@ import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
/**
* NoOpEngine is an engine implementation that does nothing but the bare minimum
* required in order to have an engine. All attempts to do something (search,
* index, get), throw {@link UnsupportedOperationException}.
* index, get), throw {@link UnsupportedOperationException}. However, NoOpEngine
* allows to trim any existing translog files through the usage of the
* {{@link #trimUnreferencedTranslogFiles()}} method.
*/
public final class NoOpEngine extends ReadOnlyEngine {
@ -116,4 +124,52 @@ public final class NoOpEngine extends ReadOnlyEngine {
return super.segmentsStats(includeSegmentFileSizes, includeUnloadedSegments);
}
}
/**
* This implementation will trim existing translog files using a {@link TranslogDeletionPolicy}
* that retains nothing but the last translog generation from safe commit.
*/
@Override
public void trimUnreferencedTranslogFiles() {
final Store store = this.engineConfig.getStore();
store.incRef();
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
final List<IndexCommit> commits = DirectoryReader.listCommits(store.directory());
if (commits.size() == 1) {
final Map<String, String> commitUserData = getLastCommittedSegmentInfos().getUserData();
final String translogUuid = commitUserData.get(Translog.TRANSLOG_UUID_KEY);
if (translogUuid == null) {
throw new IllegalStateException("commit doesn't contain translog unique id");
}
if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) {
throw new IllegalStateException("commit doesn't contain translog generation id");
}
final long lastCommitGeneration = Long.parseLong(commitUserData.get(Translog.TRANSLOG_GENERATION_KEY));
final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
final long minTranslogGeneration = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUuid);
if (minTranslogGeneration < lastCommitGeneration) {
// a translog deletion policy that retains nothing but the last translog generation from safe commit
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(-1, -1);
translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastCommitGeneration);
translogDeletionPolicy.setMinTranslogGenerationForRecovery(lastCommitGeneration);
try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy,
engineConfig.getGlobalCheckpointSupplier(), engineConfig.getPrimaryTermSupplier(), seqNo -> {})) {
translog.trimUnreferencedReaders();
}
}
}
} catch (final Exception e) {
try {
failEngine("translog trimming failed", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw new EngineException(shardId, "failed to trim translog", e);
} finally {
store.decRef();
}
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.index;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.TopDocs;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.compress.CompressedXContent;
@ -42,12 +43,15 @@ import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.index.shard.IndexShardTestCase.getEngine;
import static org.elasticsearch.test.InternalSettingsPlugin.TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.core.IsEqual.equalTo;
@ -370,7 +374,7 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
.build();
IndexService indexService = createIndex("test", settings);
ensureGreen("test");
assertTrue(indexService.getRefreshTask().mustReschedule());
assertTrue(indexService.getTrimTranslogTask().mustReschedule());
client().prepareIndex("test", "test", "1").setSource("{\"foo\": \"bar\"}", XContentType.JSON).get();
client().admin().indices().prepareFlush("test").get();
client().admin().indices().prepareUpdateSettings("test")
@ -382,6 +386,48 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
assertBusy(() -> assertThat(IndexShardTestCase.getTranslog(shard).totalOperations(), equalTo(0)));
}
public void testAsyncTranslogTrimTaskOnClosedIndex() throws Exception {
final String indexName = "test";
IndexService indexService = createIndex(indexName, Settings.builder()
.put(TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING.getKey(), "100ms")
.build());
Translog translog = IndexShardTestCase.getTranslog(indexService.getShard(0));
final Path translogPath = translog.getConfig().getTranslogPath();
final String translogUuid = translog.getTranslogUUID();
final int numDocs = scaledRandomIntBetween(10, 100);
for (int i = 0; i < numDocs; i++) {
client().prepareIndex().setIndex(indexName).setId(String.valueOf(i)).setSource("{\"foo\": \"bar\"}", XContentType.JSON).get();
if (randomBoolean()) {
client().admin().indices().prepareFlush(indexName).get();
}
}
assertThat(translog.totalOperations(), equalTo(numDocs));
assertThat(translog.stats().estimatedNumberOfOperations(), equalTo(numDocs));
assertAcked(client().admin().indices().prepareClose("test").setWaitForActiveShards(ActiveShardCount.DEFAULT));
indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(indexService.index());
assertTrue(indexService.getTrimTranslogTask().mustReschedule());
final long lastCommitedTranslogGeneration;
try (Engine.IndexCommitRef indexCommitRef = getEngine(indexService.getShard(0)).acquireLastIndexCommit(false)) {
Map<String, String> lastCommittedUserData = indexCommitRef.getIndexCommit().getUserData();
lastCommitedTranslogGeneration = Long.parseLong(lastCommittedUserData.get(Translog.TRANSLOG_GENERATION_KEY));
}
assertBusy(() -> {
long minTranslogGen = Translog.readMinTranslogGeneration(translogPath, translogUuid);
assertThat(minTranslogGen, equalTo(lastCommitedTranslogGeneration));
});
assertAcked(client().admin().indices().prepareOpen("test").setWaitForActiveShards(ActiveShardCount.DEFAULT));
indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(indexService.index());
translog = IndexShardTestCase.getTranslog(indexService.getShard(0));
assertThat(translog.totalOperations(), equalTo(0));
assertThat(translog.stats().estimatedNumberOfOperations(), equalTo(0));
}
public void testIllegalFsyncInterval() {
Settings settings = Settings.builder()
.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "0ms") // disable

View File

@ -35,6 +35,7 @@ import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
import org.elasticsearch.test.IndexSettingsModule;
@ -42,6 +43,7 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import static org.hamcrest.Matchers.equalTo;
@ -83,7 +85,7 @@ public class NoOpEngineTests extends EngineTestCase {
tracker.updateLocalCheckpoint(allocationId.getId(), i);
}
flushAndTrimTranslog(engine);
engine.flush(true, true);
long localCheckpoint = engine.getPersistedLocalCheckpoint();
long maxSeqNo = engine.getSeqNoStats(100L).getMaxSeqNo();
@ -159,6 +161,45 @@ public class NoOpEngineTests extends EngineTestCase {
}
}
public void testTrimUnreferencedTranslogFiles() throws Exception {
final ReplicationTracker tracker = (ReplicationTracker) engine.config().getGlobalCheckpointSupplier();
ShardRouting routing = TestShardRouting.newShardRouting("test", shardId.id(), "node",
null, true, ShardRoutingState.STARTED, allocationId);
IndexShardRoutingTable table = new IndexShardRoutingTable.Builder(shardId).addShard(routing).build();
tracker.updateFromMaster(1L, Collections.singleton(allocationId.getId()), table);
tracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
final int numDocs = scaledRandomIntBetween(10, 3000);
for (int i = 0; i < numDocs; i++) {
engine.index(indexForDoc(createParsedDoc(Integer.toString(i), null)));
if (rarely()) {
engine.flush();
}
tracker.updateLocalCheckpoint(allocationId.getId(), i);
}
engine.flush(true, true);
final String translogUuid = engine.getTranslog().getTranslogUUID();
final long minFileGeneration = engine.getTranslog().getMinFileGeneration();
final long currentFileGeneration = engine.getTranslog().currentFileGeneration();
engine.close();
final NoOpEngine noOpEngine = new NoOpEngine(noOpConfig(INDEX_SETTINGS, store, primaryTranslogDir, tracker));
final Path translogPath = noOpEngine.config().getTranslogConfig().getTranslogPath();
final long lastCommitedTranslogGeneration;
try (Engine.IndexCommitRef indexCommitRef = noOpEngine.acquireLastIndexCommit(false)) {
Map<String, String> lastCommittedUserData = indexCommitRef.getIndexCommit().getUserData();
lastCommitedTranslogGeneration = Long.parseLong(lastCommittedUserData.get(Translog.TRANSLOG_GENERATION_KEY));
assertThat(lastCommitedTranslogGeneration, equalTo(currentFileGeneration));
}
assertThat(Translog.readMinTranslogGeneration(translogPath, translogUuid), equalTo(minFileGeneration));
noOpEngine.trimUnreferencedTranslogFiles();
assertThat(Translog.readMinTranslogGeneration(translogPath, translogUuid), equalTo(lastCommitedTranslogGeneration));
noOpEngine.close();
}
private void flushAndTrimTranslog(final InternalEngine engine) {
engine.flush(true, true);
final TranslogDeletionPolicy deletionPolicy = engine.getTranslog().getDeletionPolicy();