Merge pull request #16257 from s1monw/no_fsync_on_every_operation

Remove the ability to fsync on every operation and only schedule fsync task if really needed
This commit is contained in:
Simon Willnauer 2016-01-27 15:48:30 +01:00
commit 1df7d4d5b5
9 changed files with 67 additions and 31 deletions

View File

@ -108,7 +108,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
private final IndexingSlowLog slowLog; private final IndexingSlowLog slowLog;
private final IndexingOperationListener[] listeners; private final IndexingOperationListener[] listeners;
private volatile AsyncRefreshTask refreshTask; private volatile AsyncRefreshTask refreshTask;
private final AsyncTranslogFSync fsyncTask; private volatile AsyncTranslogFSync fsyncTask;
private final SearchSlowLog searchSlowLog; private final SearchSlowLog searchSlowLog;
public IndexService(IndexSettings indexSettings, NodeEnvironment nodeEnv, public IndexService(IndexSettings indexSettings, NodeEnvironment nodeEnv,
@ -147,13 +147,9 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
this.listeners[0] = slowLog; this.listeners[0] = slowLog;
System.arraycopy(listenersIn, 0, this.listeners, 1, listenersIn.length); System.arraycopy(listenersIn, 0, this.listeners, 1, listenersIn.length);
// kick off async ops for the first shard in this index // kick off async ops for the first shard in this index
if (this.indexSettings.getTranslogSyncInterval().millis() != 0) {
this.fsyncTask = new AsyncTranslogFSync(this);
} else {
this.fsyncTask = null;
}
this.refreshTask = new AsyncRefreshTask(this); this.refreshTask = new AsyncRefreshTask(this);
searchSlowLog = new SearchSlowLog(indexSettings); searchSlowLog = new SearchSlowLog(indexSettings);
rescheduleFsyncTask(indexSettings.getTranslogDurability());
} }
public int numberOfShards() { public int numberOfShards() {
@ -565,6 +561,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
} }
public synchronized void updateMetaData(final IndexMetaData metadata) { public synchronized void updateMetaData(final IndexMetaData metadata) {
final Translog.Durability oldTranslogDurability = indexSettings.getTranslogDurability();
if (indexSettings.updateIndexMetaData(metadata)) { if (indexSettings.updateIndexMetaData(metadata)) {
for (final IndexShard shard : this.shards.values()) { for (final IndexShard shard : this.shards.values()) {
try { try {
@ -576,6 +573,20 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
if (refreshTask.getInterval().equals(indexSettings.getRefreshInterval()) == false) { if (refreshTask.getInterval().equals(indexSettings.getRefreshInterval()) == false) {
rescheduleRefreshTasks(); rescheduleRefreshTasks();
} }
final Translog.Durability durability = indexSettings.getTranslogDurability();
if (durability != oldTranslogDurability) {
rescheduleFsyncTask(durability);
}
}
}
private void rescheduleFsyncTask(Translog.Durability durability) {
try {
if (fsyncTask != null) {
fsyncTask.close();
}
} finally {
fsyncTask = durability == Translog.Durability.REQUEST ? null : new AsyncTranslogFSync(this);
} }
} }

View File

@ -55,7 +55,7 @@ public final class IndexSettings {
public static final Setting<Boolean> QUERY_STRING_ANALYZE_WILDCARD = Setting.boolSetting("indices.query.query_string.analyze_wildcard", false, false, Setting.Scope.CLUSTER); public static final Setting<Boolean> QUERY_STRING_ANALYZE_WILDCARD = Setting.boolSetting("indices.query.query_string.analyze_wildcard", false, false, Setting.Scope.CLUSTER);
public static final Setting<Boolean> QUERY_STRING_ALLOW_LEADING_WILDCARD = Setting.boolSetting("indices.query.query_string.allowLeadingWildcard", true, false, Setting.Scope.CLUSTER); public static final Setting<Boolean> QUERY_STRING_ALLOW_LEADING_WILDCARD = Setting.boolSetting("indices.query.query_string.allowLeadingWildcard", true, false, Setting.Scope.CLUSTER);
public static final Setting<Boolean> ALLOW_UNMAPPED = Setting.boolSetting("index.query.parse.allow_unmapped_fields", true, false, Setting.Scope.INDEX); public static final Setting<Boolean> ALLOW_UNMAPPED = Setting.boolSetting("index.query.parse.allow_unmapped_fields", true, false, Setting.Scope.INDEX);
public static final Setting<TimeValue> INDEX_TRANSLOG_SYNC_INTERVAL_SETTING = Setting.timeSetting("index.translog.sync_interval", TimeValue.timeValueSeconds(5), false, Setting.Scope.INDEX); public static final Setting<TimeValue> INDEX_TRANSLOG_SYNC_INTERVAL_SETTING = Setting.timeSetting("index.translog.sync_interval", TimeValue.timeValueSeconds(5), TimeValue.timeValueMillis(100), false, Setting.Scope.INDEX);
public static final Setting<Translog.Durability> INDEX_TRANSLOG_DURABILITY_SETTING = new Setting<>("index.translog.durability", Translog.Durability.REQUEST.name(), (value) -> Translog.Durability.valueOf(value.toUpperCase(Locale.ROOT)), true, Setting.Scope.INDEX); public static final Setting<Translog.Durability> INDEX_TRANSLOG_DURABILITY_SETTING = new Setting<>("index.translog.durability", Translog.Durability.REQUEST.name(), (value) -> Translog.Durability.valueOf(value.toUpperCase(Locale.ROOT)), true, Setting.Scope.INDEX);
public static final Setting<Boolean> INDEX_WARMER_ENABLED_SETTING = Setting.boolSetting("index.warmer.enabled", true, true, Setting.Scope.INDEX); public static final Setting<Boolean> INDEX_WARMER_ENABLED_SETTING = Setting.boolSetting("index.warmer.enabled", true, true, Setting.Scope.INDEX);
public static final Setting<Boolean> INDEX_TTL_DISABLE_PURGE_SETTING = Setting.boolSetting("index.ttl.disable_purge", false, true, Setting.Scope.INDEX); public static final Setting<Boolean> INDEX_TTL_DISABLE_PURGE_SETTING = Setting.boolSetting("index.ttl.disable_purge", false, true, Setting.Scope.INDEX);

View File

@ -429,9 +429,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
try (ReleasableLock lock = readLock.acquire()) { try (ReleasableLock lock = readLock.acquire()) {
ensureOpen(); ensureOpen();
Location location = current.add(bytes); Location location = current.add(bytes);
if (config.isSyncOnEachOperation()) {
current.sync();
}
assert assertBytesAtLocation(location, bytes); assert assertBytesAtLocation(location, bytes);
return location; return location;
} }

View File

@ -65,13 +65,6 @@ public final class TranslogConfig {
this.bigArrays = bigArrays; this.bigArrays = bigArrays;
} }
/**
* Returns <code>true</code> iff each low level operation shoudl be fsynced
*/
public boolean isSyncOnEachOperation() {
return indexSettings.getTranslogSyncInterval().millis() == 0;
}
/** /**
* Returns the index indexSettings * Returns the index indexSettings
*/ */

View File

@ -39,6 +39,7 @@ import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException; import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -264,7 +265,7 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
} }
public void testFsyncTaskIsRunning() throws IOException { public void testFsyncTaskIsRunning() throws IOException {
IndexService indexService = createIndex("test", Settings.EMPTY); IndexService indexService = createIndex("test", Settings.builder().put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC).build());
IndexService.AsyncTranslogFSync fsyncTask = indexService.getFsyncTask(); IndexService.AsyncTranslogFSync fsyncTask = indexService.getFsyncTask();
assertNotNull(fsyncTask); assertNotNull(fsyncTask);
assertEquals(5000, fsyncTask.getInterval().millis()); assertEquals(5000, fsyncTask.getInterval().millis());
@ -274,6 +275,9 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
indexService.close("simon says", false); indexService.close("simon says", false);
assertFalse(fsyncTask.isScheduled()); assertFalse(fsyncTask.isScheduled());
assertTrue(fsyncTask.isClosed()); assertTrue(fsyncTask.isClosed());
indexService = createIndex("test1", Settings.EMPTY);
assertNull(indexService.getFsyncTask());
} }
public void testRefreshActuallyWorks() throws Exception { public void testRefreshActuallyWorks() throws Exception {
@ -307,7 +311,7 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
public void testAsyncFsyncActuallyWorks() throws Exception { public void testAsyncFsyncActuallyWorks() throws Exception {
Settings settings = Settings.builder() Settings settings = Settings.builder()
.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "10ms") // very often :) .put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "100ms") // very often :)
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC) .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC)
.build(); .build();
IndexService indexService = createIndex("test", settings); IndexService indexService = createIndex("test", settings);
@ -320,11 +324,43 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
}); });
} }
public void testNoFsyncTaskIfDisabled() { public void testRescheduleAsyncFsync() throws Exception {
Settings settings = Settings.builder()
.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "100ms") // very often :)
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.REQUEST)
.build();
IndexService indexService = createIndex("test", settings);
ensureGreen("test");
assertNull(indexService.getFsyncTask());
IndexMetaData metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC)).build();
indexService.updateMetaData(metaData);
assertNotNull(indexService.getFsyncTask());
assertTrue(indexService.getRefreshTask().mustReschedule());
client().prepareIndex("test", "test", "1").setSource("{\"foo\": \"bar\"}").get();
IndexShard shard = indexService.getShard(0);
assertBusy(() -> {
assertFalse(shard.getTranslog().syncNeeded());
});
metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.REQUEST)).build();
indexService.updateMetaData(metaData);
assertNull(indexService.getFsyncTask());
metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC)).build();
indexService.updateMetaData(metaData);
assertNotNull(indexService.getFsyncTask());
}
public void testIllegalFsyncInterval() {
Settings settings = Settings.builder() Settings settings = Settings.builder()
.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "0ms") // disable .put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "0ms") // disable
.build(); .build();
IndexService indexService = createIndex("test", settings); try {
assertNull(indexService.getFsyncTask()); createIndex("test", settings);
fail();
} catch (IllegalArgumentException ex) {
assertEquals("Failed to parse value [0ms] for setting [index.translog.sync_interval] must be >= 100ms", ex.getMessage());
}
} }
} }

View File

@ -1393,7 +1393,6 @@ public class TranslogTests extends ESTestCase {
Path tempDir = createTempDir(); Path tempDir = createTempDir();
final FailSwitch fail = new FailSwitch(); final FailSwitch fail = new FailSwitch();
TranslogConfig config = getTranslogConfig(tempDir); TranslogConfig config = getTranslogConfig(tempDir);
assumeFalse("this won't work if we sync on any op", config.isSyncOnEachOperation());
Translog translog = getFailableTranslog(fail, config, false, true); Translog translog = getFailableTranslog(fail, config, false, true);
LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer boarders regularly LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer boarders regularly
translog.add(new Translog.Index("test", "1", lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8")))); translog.add(new Translog.Index("test", "1", lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))));

View File

@ -37,8 +37,8 @@ The data in the transaction log is only persisted to disk when the translog is
++fsync++ed and committed. In the event of hardware failure, any data written ++fsync++ed and committed. In the event of hardware failure, any data written
since the previous translog commit will be lost. since the previous translog commit will be lost.
By default, Elasticsearch ++fsync++s and commits the translog every 5 seconds By default, Elasticsearch ++fsync++s and commits the translog every 5 seconds if `index.translog.durability` is set
and at the end of every <<docs-index_,index>>, <<docs-delete,delete>>, to `async` or if set to `request` (default) at the end of every <<docs-index_,index>>, <<docs-delete,delete>>,
<<docs-update,update>>, or <<docs-bulk,bulk>> request. In fact, Elasticsearch <<docs-update,update>>, or <<docs-bulk,bulk>> request. In fact, Elasticsearch
will only report success of an index, delete, update, or bulk request to the will only report success of an index, delete, update, or bulk request to the
client after the transaction log has been successfully ++fsync++ed and committed client after the transaction log has been successfully ++fsync++ed and committed
@ -50,7 +50,7 @@ control the behaviour of the transaction log:
`index.translog.sync_interval`:: `index.translog.sync_interval`::
How often the translog is ++fsync++ed to disk and committed, regardless of How often the translog is ++fsync++ed to disk and committed, regardless of
write operations. Defaults to `5s`. write operations. Defaults to `5s`. Values less than `100ms` are not allowed.
`index.translog.durability`:: `index.translog.durability`::
+ +

View File

@ -233,6 +233,10 @@ The `index.translog.flush_threshold_ops` setting is not supported anymore. In or
growth use `index.translog.flush_threshold_size` instead. Changing the translog type with `index.translog.fs.type` is not supported growth use `index.translog.flush_threshold_size` instead. Changing the translog type with `index.translog.fs.type` is not supported
anymore, the `buffered` implementation is now the only available option and uses a fixed `8kb` buffer. anymore, the `buffered` implementation is now the only available option and uses a fixed `8kb` buffer.
The translog by default is fsynced on a request basis such that the ability to fsync on every operation is not necessary anymore. In-fact it can
be a performance bottleneck and it's trappy since it enabled by a special value set on `index.translog.sync_interval`. `index.translog.sync_interval`
now doesn't accept a value less than `100ms` which prevents fsyncing too often if async durability is enabled. The special value `0` is not supported anymore.
==== Request Cache Settings ==== Request Cache Settings
The deprecated settings `index.cache.query.enable` and `indices.cache.query.size` have been removed and are replaced with The deprecated settings `index.cache.query.enable` and `indices.cache.query.size` have been removed and are replaced with

View File

@ -527,12 +527,8 @@ public abstract class ESIntegTestCase extends ESTestCase {
} }
if (random.nextBoolean()) { if (random.nextBoolean()) {
if (rarely(random)) {
builder.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), 0); // 0 has special meaning to sync each op
} else {
builder.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), RandomInts.randomIntBetween(random, 100, 5000), TimeUnit.MILLISECONDS); builder.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), RandomInts.randomIntBetween(random, 100, 5000), TimeUnit.MILLISECONDS);
} }
}
return builder; return builder;
} }