Simplify translog-based flush settings

This commit removes `index.translog.flush_threshold_ops` and `index.translog.disable_flush`
in favor of `index.translog.flush_threshold_size`. The number of operations is meaningless by itself and
can easily be turned into a size value with knowledge of the data. Disabling the flush is only useful in
tests and we can set the size value to a really high value. If users really need to do this they can
also apply a very high value like `1PB`.
This commit is contained in:
Simon Willnauer 2015-12-21 15:15:00 +01:00
parent eb64a81d05
commit afc1cc19af
10 changed files with 44 additions and 65 deletions

View File

@ -182,9 +182,7 @@ public class ClusterModule extends AbstractModule {
registerIndexDynamicSetting(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, Validator.DOUBLE_GTE_2);
registerIndexDynamicSetting(MergePolicyConfig.INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT, Validator.NON_NEGATIVE_DOUBLE);
registerIndexDynamicSetting(MergePolicyConfig.INDEX_COMPOUND_FORMAT, Validator.EMPTY);
registerIndexDynamicSetting(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, Validator.INTEGER);
registerIndexDynamicSetting(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, Validator.BYTES_SIZE);
registerIndexDynamicSetting(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, Validator.EMPTY);
registerIndexDynamicSetting(TranslogConfig.INDEX_TRANSLOG_DURABILITY, Validator.EMPTY);
registerIndexDynamicSetting(IndicesWarmer.INDEX_WARMER_ENABLED, Validator.EMPTY);
registerIndexDynamicSetting(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED, Validator.BOOLEAN);

View File

@ -188,18 +188,14 @@ public class IndexShard extends AbstractIndexShardComponent {
private final ShardEventListener shardEventListener = new ShardEventListener();
private volatile boolean flushOnClose = true;
private volatile int flushThresholdOperations;
private volatile ByteSizeValue flushThresholdSize;
private volatile boolean disableFlush;
/**
* Index setting to control if a flush is executed before engine is closed
* This setting is realtime updateable.
*/
public static final String INDEX_FLUSH_ON_CLOSE = "index.flush_on_close";
public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS = "index.translog.flush_threshold_ops";
public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE = "index.translog.flush_threshold_size";
public static final String INDEX_TRANSLOG_DISABLE_FLUSH = "index.translog.disable_flush";
/** If we see no indexing operations after this much time for a given shard, we consider that shard inactive (default: 5 minutes). */
public static final String INDEX_SHARD_INACTIVE_TIME_SETTING = "index.shard.inactive_time";
private static final String INDICES_INACTIVE_TIME_SETTING = "indices.memory.shard_inactive_time";
@ -270,9 +266,7 @@ public class IndexShard extends AbstractIndexShardComponent {
}
this.engineConfig = newEngineConfig(translogConfig, cachingPolicy);
this.flushThresholdOperations = settings.getAsInt(INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, settings.getAsInt("index.translog.flush_threshold", Integer.MAX_VALUE));
this.flushThresholdSize = settings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(512, ByteSizeUnit.MB));
this.disableFlush = settings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, false);
this.indexShardOperationCounter = new IndexShardOperationCounter(logger, shardId);
this.provider = provider;
this.searcherWrapper = indexSearcherWrapper;
@ -1136,15 +1130,13 @@ public class IndexShard extends AbstractIndexShardComponent {
* Otherwise <code>false</code>.
*/
boolean shouldFlush() {
if (disableFlush == false) {
Engine engine = getEngineOrNull();
if (engine != null) {
try {
Translog translog = engine.getTranslog();
return translog.totalOperations() > flushThresholdOperations || translog.sizeInBytes() > flushThresholdSize.bytes();
} catch (AlreadyClosedException | EngineClosedException ex) {
// that's fine we are already close - no need to flush
}
Engine engine = getEngineOrNull();
if (engine != null) {
try {
Translog translog = engine.getTranslog();
return translog.sizeInBytes() > flushThresholdSize.bytes();
} catch (AlreadyClosedException | EngineClosedException ex) {
// that's fine we are already close - no need to flush
}
}
return false;
@ -1156,21 +1148,11 @@ public class IndexShard extends AbstractIndexShardComponent {
if (state() == IndexShardState.CLOSED) { // no need to update anything if we are closed
return;
}
int flushThresholdOperations = settings.getAsInt(INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, this.flushThresholdOperations);
if (flushThresholdOperations != this.flushThresholdOperations) {
logger.info("updating flush_threshold_ops from [{}] to [{}]", this.flushThresholdOperations, flushThresholdOperations);
this.flushThresholdOperations = flushThresholdOperations;
}
ByteSizeValue flushThresholdSize = settings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, this.flushThresholdSize);
if (!flushThresholdSize.equals(this.flushThresholdSize)) {
logger.info("updating flush_threshold_size from [{}] to [{}]", this.flushThresholdSize, flushThresholdSize);
this.flushThresholdSize = flushThresholdSize;
}
boolean disableFlush = settings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, this.disableFlush);
if (disableFlush != this.disableFlush) {
logger.info("updating disable_flush from [{}] to [{}]", this.disableFlush, disableFlush);
this.disableFlush = disableFlush;
}
final EngineConfig config = engineConfig;
final boolean flushOnClose = settings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, this.flushOnClose);

View File

@ -874,7 +874,7 @@ public class GetActionIT extends ESIntegTestCase {
public void testUngeneratedFieldsThatAreNeverStored() throws IOException {
String createIndexSource = "{\n" +
" \"settings\": {\n" +
" \"index.translog.disable_flush\": true,\n" +
" \"index.translog.flush_threshold_size\": \"1pb\",\n" +
" \"refresh_interval\": \"-1\"\n" +
" },\n" +
" \"mappings\": {\n" +
@ -913,7 +913,7 @@ public class GetActionIT extends ESIntegTestCase {
public void testUngeneratedFieldsThatAreAlwaysStored() throws IOException {
String createIndexSource = "{\n" +
" \"settings\": {\n" +
" \"index.translog.disable_flush\": true,\n" +
" \"index.translog.flush_threshold_size\": \"1pb\",\n" +
" \"refresh_interval\": \"-1\"\n" +
" },\n" +
" \"mappings\": {\n" +
@ -983,7 +983,7 @@ public class GetActionIT extends ESIntegTestCase {
String storedString = stored ? "yes" : "no";
String createIndexSource = "{\n" +
" \"settings\": {\n" +
" \"index.translog.disable_flush\": true,\n" +
" \"index.translog.flush_threshold_size\": \"1pb\",\n" +
" \"refresh_interval\": \"-1\",\n" +
" \"" + IndexMetaData.SETTING_VERSION_CREATED + "\": " + Version.V_1_4_2.id + "\n" +
" },\n" +
@ -1008,7 +1008,7 @@ public class GetActionIT extends ESIntegTestCase {
public void testUngeneratedFieldsNotPartOfSourceStored() throws IOException {
String createIndexSource = "{\n" +
" \"settings\": {\n" +
" \"index.translog.disable_flush\": true,\n" +
" \"index.translog.flush_threshold_size\": \"1pb\",\n" +
" \"refresh_interval\": \"-1\"\n" +
" },\n" +
" \"mappings\": {\n" +
@ -1074,7 +1074,7 @@ public class GetActionIT extends ESIntegTestCase {
String storedString = stored ? "yes" : "no";
String createIndexSource = "{\n" +
" \"settings\": {\n" +
" \"index.translog.disable_flush\": true,\n" +
" \"index.translog.flush_threshold_size\": \"1pb\",\n" +
" \"refresh_interval\": \"-1\",\n" +
" \"" + IndexMetaData.SETTING_VERSION_CREATED + "\": " + Version.V_1_4_2.id + "\n" +
" },\n" +
@ -1126,7 +1126,7 @@ public class GetActionIT extends ESIntegTestCase {
String storedString = stored ? "yes" : "no";
String createIndexSource = "{\n" +
" \"settings\": {\n" +
" \"index.translog.disable_flush\": true,\n" +
" \"index.translog.flush_threshold_size\": \"1pb\",\n" +
" \"refresh_interval\": \"-1\",\n" +
" \"" + IndexMetaData.SETTING_VERSION_CREATED + "\": " + Version.V_1_4_2.id + "\n" +
" },\n" +

View File

@ -34,6 +34,8 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShadowIndexShard;
import org.elasticsearch.index.translog.TranslogStats;
@ -179,7 +181,7 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
Settings idxSettings = Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
.put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, true)
.put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB))
.put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)

View File

@ -697,7 +697,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
IndexService test = indicesService.indexService("test");
IndexShard shard = test.getShardOrNull(0);
assertFalse(shard.shouldFlush());
client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, 1).build()).get();
client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.BYTES)).build()).get();
client().prepareIndex("test", "test", "0").setSource("{}").setRefresh(randomBoolean()).get();
assertFalse(shard.shouldFlush());
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, new ParseContext.Document(), new BytesArray(new byte[]{1}), null);
@ -713,8 +713,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
shard.getEngine().getTranslog().sync();
long size = shard.getEngine().getTranslog().sizeInBytes();
logger.info("--> current translog size: [{}] num_ops [{}] generation [{}]", shard.getEngine().getTranslog().sizeInBytes(), shard.getEngine().getTranslog().totalOperations(), shard.getEngine().getTranslog().getGeneration());
client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, 1000)
.put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(size, ByteSizeUnit.BYTES))
client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(size, ByteSizeUnit.BYTES))
.build()).get();
client().prepareDelete("test", "test", "2").get();
logger.info("--> translog size after delete: [{}] num_ops [{}] generation [{}]", shard.getEngine().getTranslog().sizeInBytes(), shard.getEngine().getTranslog().totalOperations(), shard.getEngine().getTranslog().getGeneration());
@ -732,7 +731,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
IndexService test = indicesService.indexService("test");
final IndexShard shard = test.getShardOrNull(0);
assertFalse(shard.shouldFlush());
client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, 1).build()).get();
client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.BYTES)).build()).get();
client().prepareIndex("test", "test", "0").setSource("{}").setRefresh(randomBoolean()).get();
assertFalse(shard.shouldFlush());
final AtomicBoolean running = new AtomicBoolean(true);

View File

@ -47,6 +47,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.gateway.PrimaryShardAllocator;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
@ -142,7 +143,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "1")
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MockFSIndexStore.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
.put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files
.put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)) // no translog based flush - it might change the .liv / segments.N files
.put("indices.recovery.concurrent_streams", 10)
));
ensureGreen();
@ -247,7 +248,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MockFSIndexStore.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
.put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files
.put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)) // no translog based flush - it might change the .liv / segments.N files
.put("indices.recovery.concurrent_streams", 10)
));
ensureGreen();
@ -473,7 +474,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0") // no replicas for this test
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MockFSIndexStore.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
.put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files
.put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)) // no translog based flush - it might change the .liv / segments.N files
.put("indices.recovery.concurrent_streams", 10)
));
ensureGreen();
@ -528,7 +529,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, cluster().numDataNodes() - 1)
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MockFSIndexStore.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
.put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files
.put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)) // no translog based flush - it might change the .liv / segments.N files
.put("indices.recovery.concurrent_streams", 10)
));
ensureGreen();

View File

@ -29,6 +29,9 @@ import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.monitor.fs.FsInfo;
@ -167,4 +170,16 @@ public class CorruptedTranslogIT extends ESIntegTestCase {
}
assertThat("no file corrupted", fileToCorrupt, notNullValue());
}
/** Disables translog flushing for the specified index */
private static void disableTranslogFlush(String index) {
Settings settings = Settings.builder().put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)).build();
client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get();
}
/** Enables translog flushing for the specified index */
private static void enableTranslogFlush(String index) {
Settings settings = Settings.builder().put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(512, ByteSizeUnit.MB)).build();
client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get();
}
}

View File

@ -28,7 +28,10 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
@ -144,14 +147,13 @@ public class FlushIT extends ESIntegTestCase {
}
}
@TestLogging("indices:TRACE")
public void testSyncedFlushWithConcurrentIndexing() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(3);
createIndex("test");
client().admin().indices().prepareUpdateSettings("test").setSettings(
Settings.builder().put("index.translog.disable_flush", true).put("index.refresh_interval", -1).put("index.number_of_replicas", internalCluster().numDataNodes() - 1))
Settings.builder().put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)).put("index.refresh_interval", -1).put("index.number_of_replicas", internalCluster().numDataNodes() - 1))
.get();
ensureGreen();
final AtomicBoolean stop = new AtomicBoolean(false);

View File

@ -20,7 +20,6 @@ replaying its operations take a considerable amount of time during recovery.
It is also exposed through an API, though its rarely needed to be performed
manually.
[float]
=== Flush settings
@ -31,10 +30,6 @@ control how often the in-memory buffer is flushed to disk:
Once the translog hits this size, a flush will happen. Defaults to `512mb`.
`index.translog.flush_threshold_ops`::
After how many operations to flush. Defaults to `unlimited`.
[float]
=== Translog settings

View File

@ -511,14 +511,11 @@ public abstract class ESIntegTestCase extends ESTestCase {
}
private static Settings.Builder setRandomIndexTranslogSettings(Random random, Settings.Builder builder) {
if (random.nextBoolean()) {
builder.put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, RandomInts.randomIntBetween(random, 1, 10000));
}
if (random.nextBoolean()) {
builder.put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(RandomInts.randomIntBetween(random, 1, 300), ByteSizeUnit.MB));
}
if (random.nextBoolean()) {
builder.put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, random.nextBoolean());
builder.put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)); // just don't flush
}
if (random.nextBoolean()) {
builder.put(TranslogConfig.INDEX_TRANSLOG_DURABILITY, RandomPicks.randomFrom(random, Translog.Durabilty.values()));
@ -1452,18 +1449,6 @@ public abstract class ESIntegTestCase extends ESTestCase {
private AtomicInteger dummmyDocIdGenerator = new AtomicInteger();
/** Disables translog flushing for the specified index */
public static void disableTranslogFlush(String index) {
Settings settings = Settings.builder().put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, true).build();
client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get();
}
/** Enables translog flushing for the specified index */
public static void enableTranslogFlush(String index) {
Settings settings = Settings.builder().put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, false).build();
client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get();
}
/** Disables an index block for the specified index */
public static void disableIndexBlock(String index, String block) {
Settings settings = Settings.builder().put(block, false).build();