diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index 9738cb034e8..cd7e03144b3 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -199,7 +199,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust this.trimTranslogTask = new AsyncTrimTranslogTask(this); this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this); this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this); - rescheduleFsyncTask(indexSettings.getTranslogDurability()); + updateFsyncTaskIfNecessary(); } public int numberOfShards() { @@ -640,8 +640,6 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust @Override public synchronized void updateMetaData(final IndexMetaData currentIndexMetaData, final IndexMetaData newIndexMetaData) { - final Translog.Durability oldTranslogDurability = indexSettings.getTranslogDurability(); - final boolean updateIndexMetaData = indexSettings.updateIndexMetaData(newIndexMetaData); if (Assertions.ENABLED @@ -693,20 +691,23 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust }); rescheduleRefreshTasks(); } - final Translog.Durability durability = indexSettings.getTranslogDurability(); - if (durability != oldTranslogDurability) { - rescheduleFsyncTask(durability); - } + updateFsyncTaskIfNecessary(); } } - private void rescheduleFsyncTask(Translog.Durability durability) { - try { - if (fsyncTask != null) { - fsyncTask.close(); + private void updateFsyncTaskIfNecessary() { + if (indexSettings.getTranslogDurability() == Translog.Durability.REQUEST) { + try { + if (fsyncTask != null) { + fsyncTask.close(); + } + } finally { + fsyncTask = null; } - } finally { - fsyncTask = durability == Translog.Durability.REQUEST ? null : new AsyncTranslogFSync(this); + } else if (fsyncTask == null) { + fsyncTask = new AsyncTranslogFSync(this); + } else { + fsyncTask.updateIfNeeded(); } } @@ -868,6 +869,13 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust indexService.maybeFSyncTranslogs(); } + void updateIfNeeded() { + final TimeValue newInterval = indexService.getIndexSettings().getTranslogSyncInterval(); + if (newInterval.equals(getInterval()) == false) { + setInterval(newInterval); + } + } + @Override public String toString() { return "translog_sync"; diff --git a/server/src/main/java/org/elasticsearch/index/IndexSettings.java b/server/src/main/java/org/elasticsearch/index/IndexSettings.java index 33215564305..d4cc38f0b95 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/server/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -62,7 +62,7 @@ public final class IndexSettings { Setting.boolSetting("index.query.parse.allow_unmapped_fields", true, Property.IndexScope); public static final Setting INDEX_TRANSLOG_SYNC_INTERVAL_SETTING = Setting.timeSetting("index.translog.sync_interval", TimeValue.timeValueSeconds(5), TimeValue.timeValueMillis(100), - Property.IndexScope); + Property.Dynamic, Property.IndexScope); public static final Setting INDEX_SEARCH_IDLE_AFTER = Setting.timeSetting("index.search.idle.after", TimeValue.timeValueSeconds(30), TimeValue.timeValueMinutes(0), Property.IndexScope, Property.Dynamic); @@ -316,7 +316,7 @@ public final class IndexSettings { private final boolean queryStringAllowLeadingWildcard; private final boolean defaultAllowUnmappedFields; private volatile Translog.Durability durability; - private final TimeValue syncInterval; + private volatile TimeValue syncInterval; private volatile TimeValue refreshInterval; private volatile ByteSizeValue flushThresholdSize; private volatile TimeValue translogRetentionAge; @@ -501,6 +501,7 @@ public final class IndexSettings { MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING, mergeSchedulerConfig::setMaxThreadAndMergeCount); scopedSettings.addSettingsUpdateConsumer(MergeSchedulerConfig.AUTO_THROTTLE_SETTING, mergeSchedulerConfig::setAutoThrottle); scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_DURABILITY_SETTING, this::setTranslogDurability); + scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_SYNC_INTERVAL_SETTING, this::setTranslogSyncInterval); scopedSettings.addSettingsUpdateConsumer(MAX_RESULT_WINDOW_SETTING, this::setMaxResultWindow); scopedSettings.addSettingsUpdateConsumer(MAX_INNER_RESULT_WINDOW_SETTING, this::setMaxInnerResultWindow); scopedSettings.addSettingsUpdateConsumer(MAX_ADJACENCY_MATRIX_FILTERS_SETTING, this::setMaxAdjacencyMatrixFilters); @@ -701,6 +702,10 @@ public final class IndexSettings { return syncInterval; } + public void setTranslogSyncInterval(TimeValue translogSyncInterval) { + this.syncInterval = translogSyncInterval; + } + /** * Returns this interval in which the shards of this index are asynchronously refreshed. {@code -1} means async refresh is disabled. */ diff --git a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java index e5e554818c0..2d4030a51ce 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.index; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.TopDocs; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Strings; import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.Settings; @@ -320,9 +321,7 @@ public class IndexServiceTests extends ESSingleNodeTestCase { assertTrue(indexService.getRefreshTask().mustReschedule()); client().prepareIndex("test", "test", "1").setSource("{\"foo\": \"bar\"}", XContentType.JSON).get(); IndexShard shard = indexService.getShard(0); - assertBusy(() -> { - assertFalse(shard.isSyncNeeded()); - }); + assertBusy(() -> assertFalse(shard.isSyncNeeded())); } public void testRescheduleAsyncFsync() throws Exception { @@ -394,4 +393,39 @@ public class IndexServiceTests extends ESSingleNodeTestCase { assertEquals("failed to parse value [0ms] for setting [index.translog.sync_interval], must be >= [100ms]", ex.getMessage()); } } + + public void testUpdateSyncIntervalDynamically() { + Settings settings = Settings.builder() + .put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "10s") // very often :) + .build(); + IndexService indexService = createIndex("test", settings); + ensureGreen("test"); + assertNull(indexService.getFsyncTask()); + + Settings.Builder builder = Settings.builder().put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "5s") + .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC.name()); + + client() + .admin() + .indices() + .prepareUpdateSettings("test") + .setSettings(builder) + .get(); + + assertNotNull(indexService.getFsyncTask()); + assertTrue(indexService.getFsyncTask().mustReschedule()); + + IndexMetaData indexMetaData = client().admin().cluster().prepareState().execute().actionGet().getState().metaData().index("test"); + assertEquals("5s", indexMetaData.getSettings().get(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey())); + + client().admin().indices().prepareClose("test").get(); + client() + .admin() + .indices() + .prepareUpdateSettings("test") + .setSettings(Settings.builder().put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "20s")) + .get(); + indexMetaData = client().admin().cluster().prepareState().execute().actionGet().getState().metaData().index("test"); + assertEquals("20s", indexMetaData.getSettings().get(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey())); + } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java index 1d595e7c95b..cf35387c6ec 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java @@ -402,6 +402,7 @@ public class TransportResumeFollowAction extends TransportMasterNodeAction