Use Merge Throtteling by default on node level.

Merge Throtteling is one of the most recommended settings and crucial in the
RealTime indexing case. We should set the default to a reasonable setting
that allows folks to index in a production index and don't see large merge
peaks by default. The default is set to 20 MB/sec on the node level.

Closes #3033
This commit is contained in:
Simon Willnauer 2013-05-14 10:48:58 +02:00
parent 09fb2264d0
commit 6624949501
2 changed files with 22 additions and 22 deletions

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.service.IndexService;
@ -91,10 +92,10 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
this.nodeSettingsService = nodeSettingsService;
this.indicesService = indicesService;
this.clusterService = clusterService;
this.rateLimitingType = componentSettings.get("throttle.type", "none");
// we limit with 20MB / sec by default with a default type set to merge sice 0.90.1
this.rateLimitingType = componentSettings.get("throttle.type", StoreRateLimiting.Type.MERGE.name());
rateLimiting.setType(rateLimitingType);
this.rateLimitingThrottle = componentSettings.getAsBytesSize("throttle.max_bytes_per_sec", new ByteSizeValue(0));
this.rateLimitingThrottle = componentSettings.getAsBytesSize("throttle.max_bytes_per_sec", new ByteSizeValue(20, ByteSizeUnit.MB));
rateLimiting.setMaxRate(rateLimitingThrottle);
logger.debug("using indices.store.throttle.type [{}], with index.store.throttle.max_bytes_per_sec [{}]", rateLimitingType, rateLimitingThrottle);

View File

@ -74,42 +74,42 @@ public class SimpleDistributorTests extends AbstractNodesTests {
createIndexWithStoreType("node1", "test", "niofs", "least_used");
String storeString = getStoreDirectory("node1", "test", 0).toString();
logger.info(storeString);
assertThat(storeString, startsWith("store(least_used[niofs(" + dataPath1 ));
assertThat(storeString, containsString("), niofs(" + dataPath2));
assertThat(storeString, endsWith(")])"));
assertThat(storeString, startsWith("store(least_used[rate_limited(niofs(" + dataPath1 ));
assertThat(storeString, containsString("), rate_limited(niofs(" + dataPath2));
assertThat(storeString, endsWith(", type=MERGE, rate=20.0)])"));
createIndexWithStoreType("node1", "test", "niofs", "random");
storeString = getStoreDirectory("node1", "test", 0).toString();
logger.info(storeString);
assertThat(storeString, startsWith("store(random[niofs(" + dataPath1 ));
assertThat(storeString, containsString("), niofs(" + dataPath2));
assertThat(storeString, endsWith(")])"));
assertThat(storeString, startsWith("store(random[rate_limited(niofs(" + dataPath1 ));
assertThat(storeString, containsString("), rate_limited(niofs(" + dataPath2));
assertThat(storeString, endsWith(", type=MERGE, rate=20.0)])"));
createIndexWithStoreType("node1", "test", "mmapfs", "least_used");
storeString = getStoreDirectory("node1", "test", 0).toString();
logger.info(storeString);
assertThat(storeString, startsWith("store(least_used[mmapfs(" + dataPath1));
assertThat(storeString, containsString("), mmapfs(" + dataPath2));
assertThat(storeString, endsWith(")])"));
assertThat(storeString, startsWith("store(least_used[rate_limited(mmapfs(" + dataPath1));
assertThat(storeString, containsString("), rate_limited(mmapfs(" + dataPath2));
assertThat(storeString, endsWith(", type=MERGE, rate=20.0)])"));
createIndexWithStoreType("node1", "test", "simplefs", "least_used");
storeString = getStoreDirectory("node1", "test", 0).toString();
logger.info(storeString);
assertThat(storeString, startsWith("store(least_used[simplefs(" + dataPath1));
assertThat(storeString, containsString("), simplefs(" + dataPath2));
assertThat(storeString, endsWith(")])"));
assertThat(storeString, startsWith("store(least_used[rate_limited(simplefs(" + dataPath1));
assertThat(storeString, containsString("), rate_limited(simplefs(" + dataPath2));
assertThat(storeString, endsWith(", type=MERGE, rate=20.0)])"));
createIndexWithStoreType("node1", "test", "memory", "least_used");
storeString = getStoreDirectory("node1", "test", 0).toString();
logger.info(storeString);
assertThat(storeString, equalTo("store(least_used[byte_buffer])"));
createIndexWithRateLimitingStoreType("node1", "test", "niofs", "least_used", "5mb");
createIndexWithoutRateLimitingStoreType("node1", "test", "niofs", "least_used");
storeString = getStoreDirectory("node1", "test", 0).toString();
logger.info(storeString);
assertThat(storeString, startsWith("store(least_used[rate_limited(niofs(" + dataPath1 ));
assertThat(storeString, containsString("), rate_limited(niofs(" + dataPath2));
assertThat(storeString, endsWith(", type=MERGE, rate=5.0)])"));
assertThat(storeString, startsWith("store(least_used[niofs(" + dataPath1 ));
assertThat(storeString, containsString("), niofs(" + dataPath2));
assertThat(storeString, endsWith(")])"));
}
private void createIndexWithStoreType(String nodeId, String index, String storeType, String distributor) {
@ -130,7 +130,7 @@ public class SimpleDistributorTests extends AbstractNodesTests {
.setTimeout(TimeValue.timeValueSeconds(5)).execute().actionGet().isTimedOut(), equalTo(false));
}
private void createIndexWithRateLimitingStoreType(String nodeId, String index, String storeType, String distributor, String limit) {
private void createIndexWithoutRateLimitingStoreType(String nodeId, String index, String storeType, String distributor) {
try {
client(nodeId).admin().indices().prepareDelete(index).execute().actionGet();
} catch (IndexMissingException ex) {
@ -140,8 +140,7 @@ public class SimpleDistributorTests extends AbstractNodesTests {
.setSettings(settingsBuilder()
.put("index.store.distributor", distributor)
.put("index.store.type", storeType)
.put("index.store.throttle.type", "merge")
.put("index.store.throttle.max_bytes_per_sec", limit)
.put("index.store.throttle.type", "none")
.put("index.number_of_replicas", 0)
.put("index.number_of_shards", 1)
)