Merge pull request #15573 from s1monw/drop_settings_01

Simplify translog-based flush settings
This commit is contained in:
Simon Willnauer 2015-12-21 15:45:56 +01:00
commit b56e19a00c
11 changed files with 49 additions and 65 deletions

View File

@ -182,9 +182,7 @@ public class ClusterModule extends AbstractModule {
registerIndexDynamicSetting(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, Validator.DOUBLE_GTE_2);
registerIndexDynamicSetting(MergePolicyConfig.INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT, Validator.NON_NEGATIVE_DOUBLE);
registerIndexDynamicSetting(MergePolicyConfig.INDEX_COMPOUND_FORMAT, Validator.EMPTY);
registerIndexDynamicSetting(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, Validator.INTEGER);
registerIndexDynamicSetting(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, Validator.BYTES_SIZE);
registerIndexDynamicSetting(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, Validator.EMPTY);
registerIndexDynamicSetting(TranslogConfig.INDEX_TRANSLOG_DURABILITY, Validator.EMPTY);
registerIndexDynamicSetting(IndicesWarmer.INDEX_WARMER_ENABLED, Validator.EMPTY);
registerIndexDynamicSetting(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED, Validator.BOOLEAN);

View File

@ -188,18 +188,14 @@ public class IndexShard extends AbstractIndexShardComponent {
private final ShardEventListener shardEventListener = new ShardEventListener();
private volatile boolean flushOnClose = true;
private volatile int flushThresholdOperations;
private volatile ByteSizeValue flushThresholdSize;
private volatile boolean disableFlush;
/**
* Index setting to control if a flush is executed before engine is closed
* This setting is realtime updateable.
*/
public static final String INDEX_FLUSH_ON_CLOSE = "index.flush_on_close";
public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS = "index.translog.flush_threshold_ops";
public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE = "index.translog.flush_threshold_size";
public static final String INDEX_TRANSLOG_DISABLE_FLUSH = "index.translog.disable_flush";
/** If we see no indexing operations after this much time for a given shard, we consider that shard inactive (default: 5 minutes). */
public static final String INDEX_SHARD_INACTIVE_TIME_SETTING = "index.shard.inactive_time";
private static final String INDICES_INACTIVE_TIME_SETTING = "indices.memory.shard_inactive_time";
@ -270,9 +266,7 @@ public class IndexShard extends AbstractIndexShardComponent {
}
this.engineConfig = newEngineConfig(translogConfig, cachingPolicy);
this.flushThresholdOperations = settings.getAsInt(INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, settings.getAsInt("index.translog.flush_threshold", Integer.MAX_VALUE));
this.flushThresholdSize = settings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(512, ByteSizeUnit.MB));
this.disableFlush = settings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, false);
this.indexShardOperationCounter = new IndexShardOperationCounter(logger, shardId);
this.provider = provider;
this.searcherWrapper = indexSearcherWrapper;
@ -1136,17 +1130,15 @@ public class IndexShard extends AbstractIndexShardComponent {
* Otherwise <code>false</code>.
*/
boolean shouldFlush() {
if (disableFlush == false) {
Engine engine = getEngineOrNull();
if (engine != null) {
try {
Translog translog = engine.getTranslog();
return translog.totalOperations() > flushThresholdOperations || translog.sizeInBytes() > flushThresholdSize.bytes();
return translog.sizeInBytes() > flushThresholdSize.bytes();
} catch (AlreadyClosedException | EngineClosedException ex) {
// that's fine we are already close - no need to flush
}
}
}
return false;
}
@ -1156,21 +1148,11 @@ public class IndexShard extends AbstractIndexShardComponent {
if (state() == IndexShardState.CLOSED) { // no need to update anything if we are closed
return;
}
int flushThresholdOperations = settings.getAsInt(INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, this.flushThresholdOperations);
if (flushThresholdOperations != this.flushThresholdOperations) {
logger.info("updating flush_threshold_ops from [{}] to [{}]", this.flushThresholdOperations, flushThresholdOperations);
this.flushThresholdOperations = flushThresholdOperations;
}
ByteSizeValue flushThresholdSize = settings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, this.flushThresholdSize);
if (!flushThresholdSize.equals(this.flushThresholdSize)) {
logger.info("updating flush_threshold_size from [{}] to [{}]", this.flushThresholdSize, flushThresholdSize);
this.flushThresholdSize = flushThresholdSize;
}
boolean disableFlush = settings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, this.disableFlush);
if (disableFlush != this.disableFlush) {
logger.info("updating disable_flush from [{}] to [{}]", this.disableFlush, disableFlush);
this.disableFlush = disableFlush;
}
final EngineConfig config = engineConfig;
final boolean flushOnClose = settings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, this.flushOnClose);

View File

@ -874,7 +874,7 @@ public class GetActionIT extends ESIntegTestCase {
public void testUngeneratedFieldsThatAreNeverStored() throws IOException {
String createIndexSource = "{\n" +
" \"settings\": {\n" +
" \"index.translog.disable_flush\": true,\n" +
" \"index.translog.flush_threshold_size\": \"1pb\",\n" +
" \"refresh_interval\": \"-1\"\n" +
" },\n" +
" \"mappings\": {\n" +
@ -913,7 +913,7 @@ public class GetActionIT extends ESIntegTestCase {
public void testUngeneratedFieldsThatAreAlwaysStored() throws IOException {
String createIndexSource = "{\n" +
" \"settings\": {\n" +
" \"index.translog.disable_flush\": true,\n" +
" \"index.translog.flush_threshold_size\": \"1pb\",\n" +
" \"refresh_interval\": \"-1\"\n" +
" },\n" +
" \"mappings\": {\n" +
@ -983,7 +983,7 @@ public class GetActionIT extends ESIntegTestCase {
String storedString = stored ? "yes" : "no";
String createIndexSource = "{\n" +
" \"settings\": {\n" +
" \"index.translog.disable_flush\": true,\n" +
" \"index.translog.flush_threshold_size\": \"1pb\",\n" +
" \"refresh_interval\": \"-1\",\n" +
" \"" + IndexMetaData.SETTING_VERSION_CREATED + "\": " + Version.V_1_4_2.id + "\n" +
" },\n" +
@ -1008,7 +1008,7 @@ public class GetActionIT extends ESIntegTestCase {
public void testUngeneratedFieldsNotPartOfSourceStored() throws IOException {
String createIndexSource = "{\n" +
" \"settings\": {\n" +
" \"index.translog.disable_flush\": true,\n" +
" \"index.translog.flush_threshold_size\": \"1pb\",\n" +
" \"refresh_interval\": \"-1\"\n" +
" },\n" +
" \"mappings\": {\n" +
@ -1074,7 +1074,7 @@ public class GetActionIT extends ESIntegTestCase {
String storedString = stored ? "yes" : "no";
String createIndexSource = "{\n" +
" \"settings\": {\n" +
" \"index.translog.disable_flush\": true,\n" +
" \"index.translog.flush_threshold_size\": \"1pb\",\n" +
" \"refresh_interval\": \"-1\",\n" +
" \"" + IndexMetaData.SETTING_VERSION_CREATED + "\": " + Version.V_1_4_2.id + "\n" +
" },\n" +
@ -1126,7 +1126,7 @@ public class GetActionIT extends ESIntegTestCase {
String storedString = stored ? "yes" : "no";
String createIndexSource = "{\n" +
" \"settings\": {\n" +
" \"index.translog.disable_flush\": true,\n" +
" \"index.translog.flush_threshold_size\": \"1pb\",\n" +
" \"refresh_interval\": \"-1\",\n" +
" \"" + IndexMetaData.SETTING_VERSION_CREATED + "\": " + Version.V_1_4_2.id + "\n" +
" },\n" +

View File

@ -34,6 +34,8 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShadowIndexShard;
import org.elasticsearch.index.translog.TranslogStats;
@ -179,7 +181,7 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
Settings idxSettings = Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
.put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, true)
.put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB))
.put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)

View File

@ -697,7 +697,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
IndexService test = indicesService.indexService("test");
IndexShard shard = test.getShardOrNull(0);
assertFalse(shard.shouldFlush());
client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, 1).build()).get();
client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(133 /* size of the operation + header&footer*/, ByteSizeUnit.BYTES)).build()).get();
client().prepareIndex("test", "test", "0").setSource("{}").setRefresh(randomBoolean()).get();
assertFalse(shard.shouldFlush());
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, new ParseContext.Document(), new BytesArray(new byte[]{1}), null);
@ -713,8 +713,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
shard.getEngine().getTranslog().sync();
long size = shard.getEngine().getTranslog().sizeInBytes();
logger.info("--> current translog size: [{}] num_ops [{}] generation [{}]", shard.getEngine().getTranslog().sizeInBytes(), shard.getEngine().getTranslog().totalOperations(), shard.getEngine().getTranslog().getGeneration());
client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, 1000)
.put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(size, ByteSizeUnit.BYTES))
client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(size, ByteSizeUnit.BYTES))
.build()).get();
client().prepareDelete("test", "test", "2").get();
logger.info("--> translog size after delete: [{}] num_ops [{}] generation [{}]", shard.getEngine().getTranslog().sizeInBytes(), shard.getEngine().getTranslog().totalOperations(), shard.getEngine().getTranslog().getGeneration());
@ -732,7 +731,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
IndexService test = indicesService.indexService("test");
final IndexShard shard = test.getShardOrNull(0);
assertFalse(shard.shouldFlush());
client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, 1).build()).get();
client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(133/* size of the operation + header&footer*/, ByteSizeUnit.BYTES)).build()).get();
client().prepareIndex("test", "test", "0").setSource("{}").setRefresh(randomBoolean()).get();
assertFalse(shard.shouldFlush());
final AtomicBoolean running = new AtomicBoolean(true);

View File

@ -47,6 +47,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.gateway.PrimaryShardAllocator;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
@ -142,7 +143,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "1")
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MockFSIndexStore.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
.put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files
.put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)) // no translog based flush - it might change the .liv / segments.N files
.put("indices.recovery.concurrent_streams", 10)
));
ensureGreen();
@ -247,7 +248,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MockFSIndexStore.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
.put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files
.put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)) // no translog based flush - it might change the .liv / segments.N files
.put("indices.recovery.concurrent_streams", 10)
));
ensureGreen();
@ -473,7 +474,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0") // no replicas for this test
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MockFSIndexStore.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
.put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files
.put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)) // no translog based flush - it might change the .liv / segments.N files
.put("indices.recovery.concurrent_streams", 10)
));
ensureGreen();
@ -528,7 +529,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, cluster().numDataNodes() - 1)
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MockFSIndexStore.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
.put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files
.put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)) // no translog based flush - it might change the .liv / segments.N files
.put("indices.recovery.concurrent_streams", 10)
));
ensureGreen();

View File

@ -29,6 +29,9 @@ import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.monitor.fs.FsInfo;
@ -167,4 +170,16 @@ public class CorruptedTranslogIT extends ESIntegTestCase {
}
assertThat("no file corrupted", fileToCorrupt, notNullValue());
}
/** Disables translog flushing for the specified index */
private static void disableTranslogFlush(String index) {
Settings settings = Settings.builder().put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)).build();
client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get();
}
/** Enables translog flushing for the specified index */
private static void enableTranslogFlush(String index) {
Settings settings = Settings.builder().put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(512, ByteSizeUnit.MB)).build();
client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get();
}
}

View File

@ -28,7 +28,10 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
@ -144,14 +147,13 @@ public class FlushIT extends ESIntegTestCase {
}
}
@TestLogging("indices:TRACE")
public void testSyncedFlushWithConcurrentIndexing() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(3);
createIndex("test");
client().admin().indices().prepareUpdateSettings("test").setSettings(
Settings.builder().put("index.translog.disable_flush", true).put("index.refresh_interval", -1).put("index.number_of_replicas", internalCluster().numDataNodes() - 1))
Settings.builder().put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)).put("index.refresh_interval", -1).put("index.number_of_replicas", internalCluster().numDataNodes() - 1))
.get();
ensureGreen();
final AtomicBoolean stop = new AtomicBoolean(false);

View File

@ -20,7 +20,6 @@ replaying its operations take a considerable amount of time during recovery.
It is also exposed through an API, though its rarely needed to be performed
manually.
[float]
=== Flush settings
@ -31,10 +30,6 @@ control how often the in-memory buffer is flushed to disk:
Once the translog hits this size, a flush will happen. Defaults to `512mb`.
`index.translog.flush_threshold_ops`::
After how many operations to flush. Defaults to `unlimited`.
[float]
=== Translog settings

View File

@ -200,6 +200,11 @@ If you are using any of these settings please take the time and review their pur
_expert settings_ and should only be used if absolutely necessary. If you have set any of the above setting as persistent
cluster settings please use the settings update API and set their superseded keys accordingly.
==== Translog settings
The `index.translog.flush_threshold_ops` setting is not supported anymore. In order to control flushes based on the transaction log
growth use `index.translog.flush_threshold_size` instead.
[[breaking_30_mapping_changes]]
=== Mapping changes

View File

@ -511,14 +511,11 @@ public abstract class ESIntegTestCase extends ESTestCase {
}
private static Settings.Builder setRandomIndexTranslogSettings(Random random, Settings.Builder builder) {
if (random.nextBoolean()) {
builder.put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, RandomInts.randomIntBetween(random, 1, 10000));
}
if (random.nextBoolean()) {
builder.put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(RandomInts.randomIntBetween(random, 1, 300), ByteSizeUnit.MB));
}
if (random.nextBoolean()) {
builder.put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, random.nextBoolean());
builder.put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)); // just don't flush
}
if (random.nextBoolean()) {
builder.put(TranslogConfig.INDEX_TRANSLOG_DURABILITY, RandomPicks.randomFrom(random, Translog.Durabilty.values()));
@ -1452,18 +1449,6 @@ public abstract class ESIntegTestCase extends ESTestCase {
private AtomicInteger dummmyDocIdGenerator = new AtomicInteger();
/** Disables translog flushing for the specified index */
public static void disableTranslogFlush(String index) {
Settings settings = Settings.builder().put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, true).build();
client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get();
}
/** Enables translog flushing for the specified index */
public static void enableTranslogFlush(String index) {
Settings settings = Settings.builder().put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, false).build();
client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get();
}
/** Disables an index block for the specified index */
public static void disableIndexBlock(String index, String block) {
Settings settings = Settings.builder().put(block, false).build();