Add a scheduled translog retention check (#25622)

We currently check whether translog files can be trimmed whenever we create a new translog generation or close a view. However #25294 added a long translog retention period (12h, max 512MB by default), which means translog files should potentially be cleaned up long after there isn't any indexing activity to trigger flushes/the creation of new translog files. We therefore need a scheduled background check to clean up those files once they are no longer needed.

Relates to #10708
This commit is contained in:
Boaz Leskes 2017-07-10 10:28:39 +02:00 committed by GitHub
parent c084542731
commit 09378f48e4
6 changed files with 124 additions and 6 deletions

View File

@ -111,6 +111,11 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final List<SearchOperationListener> searchOperationListeners;
private volatile AsyncRefreshTask refreshTask;
private volatile AsyncTranslogFSync fsyncTask;
// don't convert to Setting<> and register... we only set this in tests and register via a plugin
private final String INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING = "index.translog.retention.check_interval";
private final AsyncTrimTranslogTask trimTranslogTask;
private final ThreadPool threadPool;
private final BigArrays bigArrays;
private final ScriptService scriptService;
@ -177,6 +182,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
this.searchOperationListeners = Collections.unmodifiableList(searchOperationListeners);
// kick off async ops for the first shard in this index
this.refreshTask = new AsyncRefreshTask(this);
this.trimTranslogTask = new AsyncTrimTranslogTask(this);
rescheduleFsyncTask(indexSettings.getTranslogDurability());
}
@ -629,7 +635,6 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
} finally {
refreshTask = new AsyncRefreshTask(this);
}
}
public interface ShardStoreDeleter {
@ -693,6 +698,28 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
}
}
private void maybeTrimTranslog() {
for (IndexShard shard : this.shards.values()) {
switch (shard.state()) {
case CREATED:
case RECOVERING:
case CLOSED:
continue;
case POST_RECOVERY:
case STARTED:
case RELOCATED:
try {
shard.trimTranslog();
} catch (IndexShardClosedException | AlreadyClosedException ex) {
// fine - continue;
}
continue;
default:
throw new IllegalStateException("unknown state: " + shard.state());
}
}
}
abstract static class BaseAsyncTask implements Runnable, Closeable {
protected final IndexService indexService;
protected final ThreadPool threadPool;
@ -837,6 +864,29 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
}
}
final class AsyncTrimTranslogTask extends BaseAsyncTask {
AsyncTrimTranslogTask(IndexService indexService) {
super(indexService, indexService.getIndexSettings()
.getSettings().getAsTime(INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING, TimeValue.timeValueMinutes(10)));
}
@Override
protected void runInternal() {
indexService.maybeTrimTranslog();
}
@Override
protected String getThreadPool() {
return ThreadPool.Names.GENERIC;
}
@Override
public String toString() {
return "trim_translog";
}
}
AsyncRefreshTask getRefreshTask() { // for tests
return refreshTask;
}

View File

@ -803,12 +803,18 @@ public abstract class Engine implements Closeable {
*/
public abstract CommitId flush() throws EngineException;
/**
* checks and removes translog files that no longer need to be retained. See
* {@link org.elasticsearch.index.translog.TranslogDeletionPolicy} for details
*/
public abstract void trimTranslog() throws EngineException;
/**
* Rolls the translog generation and cleans unneeded.
*/
public abstract void rollTranslogGeneration() throws EngineException;
/**
* Force merges to 1 segment
*/

View File

@ -1353,6 +1353,24 @@ public class InternalEngine extends Engine {
}
}
@Override
public void trimTranslog() throws EngineException {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
translog.trimUnreferencedReaders();
} catch (AlreadyClosedException e) {
failOnTragicEvent(e);
throw e;
} catch (Exception e) {
try {
failEngine("translog trimming failed", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw new EngineException(shardId, "failed to trim translog", e);
}
}
private void pruneDeletedTombstones() {
long timeMSec = engineConfig.getThreadPool().relativeTimeInMillis();

View File

@ -988,6 +988,16 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return commitId;
}
/**
* checks and removes translog files that no longer need to be retained. See
* {@link org.elasticsearch.index.translog.TranslogDeletionPolicy} for details
*/
public void trimTranslog() {
verifyNotClosed();
final Engine engine = getEngine();
engine.trimTranslog();
}
/**
* Rolls the tranlog generation and cleans unneeded.
*/

View File

@ -21,7 +21,6 @@ package org.elasticsearch.index;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.TopDocs;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings;
@ -34,19 +33,29 @@ import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.test.InternalSettingsPlugin.TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING;
import static org.hamcrest.core.IsEqual.equalTo;
/** Unit test(s) for IndexService */
public class IndexServiceTests extends ESSingleNodeTestCase {
@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return Collections.singleton(InternalSettingsPlugin.class);
}
public static CompressedXContent filter(QueryBuilder filterBuilder) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder();
filterBuilder.toXContent(builder, ToXContent.EMPTY_PARAMS);
@ -263,6 +272,26 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
assertNotNull(indexService.getFsyncTask());
}
public void testAsyncTranslogTrimActuallyWorks() throws Exception {
Settings settings = Settings.builder()
.put(TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING.getKey(), "100ms") // very often :)
.build();
IndexService indexService = createIndex("test", settings);
ensureGreen("test");
assertTrue(indexService.getRefreshTask().mustReschedule());
client().prepareIndex("test", "test", "1").setSource("{\"foo\": \"bar\"}", XContentType.JSON).get();
client().admin().indices().prepareFlush("test").get();
IndexMetaData metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder()
.put(indexService.getMetaData().getSettings())
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), -1)
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), -1))
.build();
indexService.updateMetaData(metaData);
IndexShard shard = indexService.getShard(0);
assertBusy(() -> assertThat(shard.getTranslog().totalOperations(), equalTo(0)));
}
public void testIllegalFsyncInterval() {
Settings settings = Settings.builder()
.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "0ms") // disable

View File

@ -21,10 +21,12 @@ package org.elasticsearch.test;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.plugins.Plugin;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
public final class InternalSettingsPlugin extends Plugin {
@ -36,10 +38,13 @@ public final class InternalSettingsPlugin extends Plugin {
Setting.boolSetting("index.merge.enabled", true, Property.IndexScope, Property.NodeScope);
public static final Setting<Long> INDEX_CREATION_DATE_SETTING =
Setting.longSetting(IndexMetaData.SETTING_CREATION_DATE, -1, -1, Property.IndexScope, Property.NodeScope);
public static final Setting<TimeValue> TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING =
Setting.timeSetting("index.translog.retention.check_interval", new TimeValue(10, TimeUnit.MINUTES),
new TimeValue(-1, TimeUnit.MILLISECONDS), Property.Dynamic, Property.IndexScope);
@Override
public List<Setting<?>> getSettings() {
return Arrays.asList(VERSION_CREATED, MERGE_ENABLED,
INDEX_CREATION_DATE_SETTING, PROVIDED_NAME_SETTING);
INDEX_CREATION_DATE_SETTING, PROVIDED_NAME_SETTING, TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING);
}
}