Engine: update index buffer size during recovery and allow configuring version map size.

To support real time gets, the engine keeps an in-memory map of recently index docs and their location in the translog. This is needed until documents become visible in Lucene. With 1.3.0, we have improved this map and made tightly integrated with refresh cycles in Lucene in order to keep the memory signature to a bare minimum. On top of that, if the version map grows above a 25% of the index buffer size, we proactively refresh in order to be able to trim the version map back to 0 (see #6363) . In the same release, we have fixed an issue where an update to the indexing buffer could result in an unwanted exception during recovery (#6667) . We solved this by waiting with updating the size of the index buffer until the shard was fully recovered. Sadly this two together can have a negative impact on the speed of translog recovery.

During the second phase of recovery we replay all operations that happened on the shard during the first phase of copying files. In parallel we start indexing new documents into the new created shard. At some point (phase 3 in the recovery), the translog replay starts to send operation which have already been indexed into the shard. The version map is crucial in being able to quickly detect this and skip the replayed operations, without hitting lucene. Sadly #6667 (only updating the index memory buffer once shard is started) means that a shard is using the default 64MB for it's index buffer, and thus only 16MB (25%) for the version map. This much less then the default index buffer size 10% of machine memory (shared between shards).

Since we don't flush anymore when updating the memory buffer, we can remove #6667 and update recovering shards as well. Also, we make the version map max size configurable, with the same default of 25% of the current index buffer.

Closes #10046
This commit is contained in:
Boaz Leskes 2015-03-09 21:13:09 -07:00
parent a596459ed6
commit 97559c0614
9 changed files with 183 additions and 17 deletions

View File

@ -205,6 +205,44 @@ public interface Validator {
}
};
public static final Validator PERCENTAGE = new Validator() {
@Override
public String validate(String setting, String value) {
try {
if (value == null) {
return "the value of " + setting + " can not be null";
}
if (!value.endsWith("%")) {
return "the value [" + value + "] for " + setting + " must end with %";
}
final double asDouble = Double.parseDouble(value.substring(0, value.length() - 1));
if (asDouble < 0.0 || asDouble > 100.0) {
return "the value [" + value + "] for " + setting + " must be a percentage between 0% and 100%";
}
} catch (NumberFormatException ex) {
return ex.getMessage();
}
return null;
}
};
public static final Validator BYTES_SIZE_OR_PERCENTAGE = new Validator() {
@Override
public String validate(String setting, String value) {
String byteSize = BYTES_SIZE.validate(setting, value);
if (byteSize != null) {
String percentage = PERCENTAGE.validate(setting, value);
if (percentage == null) {
return null;
}
return percentage + " or be a valid bytes size value, like [16mb]";
}
return null;
}
};
public static final Validator MEMORY_SIZE = new Validator() {
@Override
public String validate(String setting, String value) {

View File

@ -23,7 +23,6 @@ import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.search.similarities.Similarity;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -51,6 +50,8 @@ import java.util.concurrent.TimeUnit;
public final class EngineConfig {
private final ShardId shardId;
private volatile ByteSizeValue indexingBufferSize;
private volatile ByteSizeValue versionMapSize;
private volatile String versionMapSizeSetting;
private final int indexConcurrency;
private volatile boolean compoundOnFlush = true;
private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis();
@ -109,11 +110,25 @@ public final class EngineConfig {
*/
public static final String INDEX_CODEC_SETTING = "index.codec";
/**
* Index setting to enable / disable checksum checks on merge
* This setting is realtime updateable.
*/
public static final String INDEX_CHECKSUM_ON_MERGE = "index.checksum_on_merge";
/**
* The maximum size the version map should grow to before issuing a refresh. Can be an absolute value or a percentage of
* the current index memory buffer (defaults to 25%)
*/
public static final String INDEX_VERSION_MAP_SIZE = "index.version_map_size";
public static final TimeValue DEFAULT_REFRESH_INTERVAL = new TimeValue(1, TimeUnit.SECONDS);
public static final TimeValue DEFAULT_GC_DELETES = TimeValue.timeValueSeconds(60);
public static final ByteSizeValue DEFAUTL_INDEX_BUFFER_SIZE = new ByteSizeValue(64, ByteSizeUnit.MB);
public static final ByteSizeValue INACTIVE_SHARD_INDEXING_BUFFER = ByteSizeValue.parseBytesSizeValue("500kb");
public static final String DEFAULT_VERSION_MAP_SIZE = "25%";
private static final String DEFAULT_CODEC_NAME = "default";
@ -142,6 +157,41 @@ public final class EngineConfig {
codecName = indexSettings.get(EngineConfig.INDEX_CODEC_SETTING, EngineConfig.DEFAULT_CODEC_NAME);
indexingBufferSize = indexSettings.getAsBytesSize(INDEX_BUFFER_SIZE_SETTING, DEFAUTL_INDEX_BUFFER_SIZE);
gcDeletesInMillis = indexSettings.getAsTime(INDEX_GC_DELETES_SETTING, EngineConfig.DEFAULT_GC_DELETES).millis();
versionMapSizeSetting = indexSettings.get(INDEX_VERSION_MAP_SIZE, DEFAULT_VERSION_MAP_SIZE);
updateVersionMapSize();
}
/** updates {@link #versionMapSize} based on current setting and {@link #indexingBufferSize} */
private void updateVersionMapSize() {
if (versionMapSizeSetting.endsWith("%")) {
double percent = Double.parseDouble(versionMapSizeSetting.substring(0, versionMapSizeSetting.length() - 1));
versionMapSize = new ByteSizeValue((long) (((double) indexingBufferSize.bytes() * (percent / 100))));
} else {
versionMapSize = ByteSizeValue.parseBytesSizeValue(versionMapSizeSetting);
}
}
/**
* Settings the version map size that should trigger a refresh. See {@link #INDEX_VERSION_MAP_SIZE} for details.
*/
public void setVersionMapSizeSetting(String versionMapSizeSetting) {
this.versionMapSizeSetting = versionMapSizeSetting;
updateVersionMapSize();
}
/**
* current setting for the version map size that should trigger a refresh. See {@link #INDEX_VERSION_MAP_SIZE} for details.
*/
public String getVersionMapSizeSetting() {
return versionMapSizeSetting;
}
/**
* returns the size of the version map that should trigger a refresh
*/
public ByteSizeValue getVersionMapSize() {
return versionMapSize;
}
/**
@ -149,6 +199,7 @@ public final class EngineConfig {
*/
public void setIndexingBufferSize(ByteSizeValue indexingBufferSize) {
this.indexingBufferSize = indexingBufferSize;
updateVersionMapSize();
}
/**

View File

@ -357,11 +357,10 @@ public class InternalEngine extends Engine {
}
/**
* Forces a refresh if the versionMap is using too much RAM (currently > 25% of IndexWriter's RAM buffer).
* Forces a refresh if the versionMap is using too much RAM
*/
private void checkVersionMapRefresh() {
// TODO: we force refresh when versionMap is using > 25% of IW's RAM buffer; should we make this separately configurable?
if (versionMap.ramBytesUsedForRefresh() > 0.25 * engineConfig.getIndexingBufferSize().bytes() && versionMapRefreshPending.getAndSet(true) == false) {
if (versionMap.ramBytesUsedForRefresh() > config().getVersionMapSize().bytes() && versionMapRefreshPending.getAndSet(true) == false) {
try {
if (isClosed.get()) {
// no point...

View File

@ -39,9 +39,9 @@ import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.store.support.AbstractIndexStore;
import org.elasticsearch.index.translog.TranslogService;
import org.elasticsearch.index.translog.fs.FsTranslog;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.ttl.IndicesTTLService;
import org.elasticsearch.indices.IndicesWarmer;
/**
*/
@ -86,6 +86,7 @@ public class IndexDynamicSettingsModule extends AbstractModule {
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_COMPOUND_ON_FLUSH, Validator.BOOLEAN);
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_GC_DELETES_SETTING, Validator.TIME);
indexDynamicSettings.addDynamicSetting(IndexShard.INDEX_FLUSH_ON_CLOSE, Validator.BOOLEAN);
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_VERSION_MAP_SIZE, Validator.BYTES_SIZE_OR_PERCENTAGE);
indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_WARN, Validator.TIME);
indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_INFO, Validator.TIME);
indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_DEBUG, Validator.TIME);

View File

@ -46,7 +46,6 @@ import org.elasticsearch.common.Preconditions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lucene.Lucene;
@ -960,7 +959,8 @@ public class IndexShard extends AbstractIndexShardComponent {
public void updateBufferSize(ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) {
ByteSizeValue preValue = config.getIndexingBufferSize();
config.setIndexingBufferSize(shardIndexingBufferSize);
if (preValue.bytes() != shardIndexingBufferSize.bytes()) {
// update engine if it is already started.
if (preValue.bytes() != shardIndexingBufferSize.bytes() && engineUnsafe() != null) {
// its inactive, make sure we do a refresh / full IW flush in this case, since the memory
// changes only after a "data" change has happened to the writer
// the index writer lazily allocates memory and a refresh will clean it all up.
@ -1029,6 +1029,10 @@ public class IndexShard extends AbstractIndexShardComponent {
config.setCompoundOnFlush(compoundOnFlush);
change = true;
}
final String versionMapSize = settings.get(EngineConfig.INDEX_VERSION_MAP_SIZE, config.getVersionMapSizeSetting());
if (config.getVersionMapSizeSetting().equals(versionMapSize) == false) {
config.setVersionMapSizeSetting(versionMapSize);
}
}
if (change) {
refresh("apply settings");
@ -1162,13 +1166,17 @@ public class IndexShard extends AbstractIndexShardComponent {
}
public Engine engine() {
Engine engine = this.currentEngineReference.get();
Engine engine = engineUnsafe();
if (engine == null) {
throw new EngineClosedException(shardId);
}
return engine;
}
protected Engine engineUnsafe() {
return this.currentEngineReference.get();
}
class ShardEngineFailListener implements Engine.FailedEngineListener {
private final CopyOnWriteArrayList<Engine.FailedEngineListener> delegates = new CopyOnWriteArrayList<>();

View File

@ -28,13 +28,13 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.monitor.jvm.JvmInfo;
@ -64,7 +64,8 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
private volatile ScheduledFuture scheduler;
private static final EnumSet<IndexShardState> CAN_UPDATE_INDEX_BUFFER_STATES = EnumSet.of(IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.RELOCATED);
private static final EnumSet<IndexShardState> CAN_UPDATE_INDEX_BUFFER_STATES = EnumSet.of(
IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.RELOCATED);
@Inject
public IndexingMemoryController(Settings settings, ThreadPool threadPool, IndicesService indicesService) {

View File

@ -161,6 +161,7 @@ public class ReplicaRecoveryBenchmark {
long totalRecoveryTime = 0;
long startTime = System.currentTimeMillis();
long[] recoveryTimes = new long[3];
for (int iteration = 0; iteration < 3; iteration++) {
logger.info("--> removing replicas");
client1.admin().indices().prepareUpdateSettings(INDEX_NAME).setSettings(IndexMetaData.SETTING_NUMBER_OF_REPLICAS + ": 0").get();
@ -170,7 +171,9 @@ public class ReplicaRecoveryBenchmark {
client1.admin().cluster().prepareHealth(INDEX_NAME).setWaitForGreenStatus().setTimeout("15m").get();
long recoveryTime = System.currentTimeMillis() - recoveryStart;
totalRecoveryTime += recoveryTime;
recoveryTimes[iteration] = recoveryTime;
logger.info("--> recovery done in [{}]", new TimeValue(recoveryTime));
// sleep some to let things clean up
Thread.sleep(10000);
}
@ -185,7 +188,9 @@ public class ReplicaRecoveryBenchmark {
backgroundLogger.join();
logger.info("average doc/s [{}], average relocation time [{}]", (endDocIndexed - startDocIndexed) * 1000.0 / totalTime, new TimeValue(totalRecoveryTime / 3));
logger.info("average doc/s [{}], average relocation time [{}], taking [{}], [{}], [{}]", (endDocIndexed - startDocIndexed) * 1000.0 / totalTime, new TimeValue(totalRecoveryTime / 3),
TimeValue.timeValueMillis(recoveryTimes[0]), TimeValue.timeValueMillis(recoveryTimes[1]), TimeValue.timeValueMillis(recoveryTimes[2])
);
client1.close();
node1.close();

View File

@ -22,9 +22,7 @@ package org.elasticsearch.cluster.settings;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.*;
/**
*
@ -83,6 +81,24 @@ public class SettingsValidatorTests extends ElasticsearchTestCase {
assertThat(Validator.POSITIVE_INTEGER.validate("", "0"), notNullValue());
assertThat(Validator.POSITIVE_INTEGER.validate("", "-1"), notNullValue());
assertThat(Validator.POSITIVE_INTEGER.validate("", "10.2"), notNullValue());
assertThat(Validator.PERCENTAGE.validate("", "asdasd"), notNullValue());
assertThat(Validator.PERCENTAGE.validate("", "-1"), notNullValue());
assertThat(Validator.PERCENTAGE.validate("", "20"), notNullValue()); // we expect 20%
assertThat(Validator.PERCENTAGE.validate("", "-1%"), notNullValue());
assertThat(Validator.PERCENTAGE.validate("", "101%"), notNullValue());
assertThat(Validator.PERCENTAGE.validate("", "100%"), nullValue());
assertThat(Validator.PERCENTAGE.validate("", "99%"), nullValue());
assertThat(Validator.PERCENTAGE.validate("", "0%"), nullValue());
assertThat(Validator.BYTES_SIZE_OR_PERCENTAGE.validate("", "asdasd"), notNullValue());
assertThat(Validator.BYTES_SIZE_OR_PERCENTAGE.validate("", "20"), nullValue());
assertThat(Validator.BYTES_SIZE_OR_PERCENTAGE.validate("", "20mb"), nullValue());
assertThat(Validator.BYTES_SIZE_OR_PERCENTAGE.validate("", "-1%"), notNullValue());
assertThat(Validator.BYTES_SIZE_OR_PERCENTAGE.validate("", "101%"), notNullValue());
assertThat(Validator.BYTES_SIZE_OR_PERCENTAGE.validate("", "100%"), nullValue());
assertThat(Validator.BYTES_SIZE_OR_PERCENTAGE.validate("", "99%"), nullValue());
assertThat(Validator.BYTES_SIZE_OR_PERCENTAGE.validate("", "0%"), nullValue());
}
@Test

View File

@ -19,13 +19,13 @@
package org.elasticsearch.index.engine;
import org.apache.lucene.index.LiveIndexWriterConfig;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.test.ElasticsearchSingleNodeTest;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
public class InternalEngineSettingsTest extends ElasticsearchSingleNodeTest {
@ -41,14 +41,24 @@ public class InternalEngineSettingsTest extends ElasticsearchSingleNodeTest {
assertThat(engine.getCurrentIndexWriterConfig().getUseCompoundFile(), is(true));
// VERSION MAP SIZE
long indexBufferSize = engine.config().getIndexingBufferSize().bytes();
long versionMapSize = engine.config().getVersionMapSize().bytes();
assertThat(versionMapSize, equalTo((long) (indexBufferSize * 0.25)));
final int iters = between(1, 20);
for (int i = 0; i < iters; i++) {
boolean compoundOnFlush = randomBoolean();
long gcDeletes = Math.max(0, randomLong());
boolean versionMapAsPercent = randomBoolean();
double versionMapPercent = randomIntBetween(0, 100);
long versionMapSizeInMB = randomIntBetween(10, 20);
String versionMapString = versionMapAsPercent ? versionMapPercent + "%" : versionMapSizeInMB + "mb";
Settings build = ImmutableSettings.builder()
.put(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush)
.put(EngineConfig.INDEX_GC_DELETES_SETTING, gcDeletes)
.put(EngineConfig.INDEX_VERSION_MAP_SIZE, versionMapString)
.build();
client().admin().indices().prepareUpdateSettings("foo").setSettings(build).get();
@ -59,6 +69,14 @@ public class InternalEngineSettingsTest extends ElasticsearchSingleNodeTest {
assertEquals(engine.config().getGcDeletesInMillis(), gcDeletes);
assertEquals(engine.getGcDeletesInMillis(), gcDeletes);
indexBufferSize = engine.config().getIndexingBufferSize().bytes();
versionMapSize = engine.config().getVersionMapSize().bytes();
if (versionMapAsPercent) {
assertThat(versionMapSize, equalTo((long) (indexBufferSize * (versionMapPercent / 100))));
} else {
assertThat(versionMapSize, equalTo(1024 * 1024 * versionMapSizeInMB));
}
}
Settings settings = ImmutableSettings.builder()
@ -84,6 +102,35 @@ public class InternalEngineSettingsTest extends ElasticsearchSingleNodeTest {
assertEquals(engine.getGcDeletesInMillis(), 1000);
assertTrue(engine.config().isEnableGcDeletes());
settings = ImmutableSettings.builder()
.put(EngineConfig.INDEX_VERSION_MAP_SIZE, "sdfasfd")
.build();
try {
client().admin().indices().prepareUpdateSettings("foo").setSettings(settings).get();
fail("settings update didn't fail, but should have");
} catch (ElasticsearchIllegalArgumentException e) {
// good
}
settings = ImmutableSettings.builder()
.put(EngineConfig.INDEX_VERSION_MAP_SIZE, "-12%")
.build();
try {
client().admin().indices().prepareUpdateSettings("foo").setSettings(settings).get();
fail("settings update didn't fail, but should have");
} catch (ElasticsearchIllegalArgumentException e) {
// good
}
settings = ImmutableSettings.builder()
.put(EngineConfig.INDEX_VERSION_MAP_SIZE, "130%")
.build();
try {
client().admin().indices().prepareUpdateSettings("foo").setSettings(settings).get();
fail("settings update didn't fail, but should have");
} catch (ElasticsearchIllegalArgumentException e) {
// good
}
}