This commit is contained in:
Michael McCandless 2015-10-14 05:41:41 -04:00 committed by mikemccand
parent 077a401c28
commit b3357f09fe
17 changed files with 174 additions and 761 deletions

View File

@ -231,7 +231,6 @@ public class ClusterModule extends AbstractModule {
registerIndexDynamicSetting(EngineConfig.INDEX_COMPOUND_ON_FLUSH, Validator.BOOLEAN);
registerIndexDynamicSetting(EngineConfig.INDEX_GC_DELETES_SETTING, Validator.TIME);
registerIndexDynamicSetting(IndexShard.INDEX_FLUSH_ON_CLOSE, Validator.BOOLEAN);
registerIndexDynamicSetting(EngineConfig.INDEX_VERSION_MAP_SIZE, Validator.BYTES_SIZE_OR_PERCENTAGE);
registerIndexDynamicSetting(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_WARN, Validator.TIME);
registerIndexDynamicSetting(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_INFO, Validator.TIME);
registerIndexDynamicSetting(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_DEBUG, Validator.TIME);
@ -324,4 +323,4 @@ public class ClusterModule extends AbstractModule {

View File

@ -125,7 +125,8 @@ public class MetaDataIndexUpgradeService extends AbstractComponent {
/** All known time settings for an index. */
public static final Set<String> INDEX_TIME_SETTINGS = unmodifiableSet(newHashSet(

View File

@ -361,8 +361,8 @@ public abstract class Engine implements Closeable {
/** How much heap Lucene's IndexWriter is using */
abstract public long indexWriterRAMBytesUsed();
/** How much heap is used that would be freed by a refresh */
abstract public long indexBufferRAMBytesUsed();
protected Segment[] getSegmentInfo(SegmentInfos lastCommittedSegmentInfos, boolean verbose) {
@ -460,7 +460,7 @@ public abstract class Engine implements Closeable {
* Refreshes the engine for new search operations to reflect the latest
* Synchronously refreshes the engine for new search operations to reflect the latest
* changes.
public abstract void refresh(String source) throws EngineException;

View File

@ -54,9 +54,7 @@ public final class EngineConfig {
private final ShardId shardId;
private final TranslogRecoveryPerformer translogRecoveryPerformer;
private final Settings indexSettings;
private volatile ByteSizeValue indexingBufferSize;
private volatile ByteSizeValue versionMapSize;
private volatile String versionMapSizeSetting;
private final ByteSizeValue indexingBufferSize;
private volatile boolean compoundOnFlush = true;
private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis();
private volatile boolean enableGcDeletes = true;
@ -96,21 +94,17 @@ public final class EngineConfig {
public static final String INDEX_CODEC_SETTING = "index.codec";
* The maximum size the version map should grow to before issuing a refresh. Can be an absolute value or a percentage of
* the current index memory buffer (defaults to 25%)
* Index setting to control the index buffer size.
* This setting is <b>not</b> realtime updateable.
public static final String INDEX_VERSION_MAP_SIZE = "index.version_map_size";
public static final String INDEX_BUFFER_SIZE_SETTING = "index.buffer_size";
/** if set to true the engine will start even if the translog id in the commit point can not be found */
public static final String INDEX_FORCE_NEW_TRANSLOG = "index.engine.force_new_translog";
public static final TimeValue DEFAULT_REFRESH_INTERVAL = new TimeValue(1, TimeUnit.SECONDS);
public static final TimeValue DEFAULT_GC_DELETES = TimeValue.timeValueSeconds(60);
public static final String DEFAULT_VERSION_MAP_SIZE = "25%";
private static final String DEFAULT_CODEC_NAME = "default";
private TranslogConfig translogConfig;
private boolean create = false;
@ -136,13 +130,12 @@ public final class EngineConfig {
this.similarity = similarity;
this.codecService = codecService;
this.failedEngineListener = failedEngineListener;
this.compoundOnFlush = indexSettings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush);
codecName = indexSettings.get(EngineConfig.INDEX_CODEC_SETTING, EngineConfig.DEFAULT_CODEC_NAME);
// We start up inactive and rely on IndexingMemoryController to give us our fair share once we start indexing:
indexingBufferSize = IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER;
gcDeletesInMillis = indexSettings.getAsTime(INDEX_GC_DELETES_SETTING, EngineConfig.DEFAULT_GC_DELETES).millis();
versionMapSizeSetting = indexSettings.get(INDEX_VERSION_MAP_SIZE, DEFAULT_VERSION_MAP_SIZE);
this.compoundOnFlush = indexSettings.getAsBoolean(INDEX_COMPOUND_ON_FLUSH, compoundOnFlush);
codecName = indexSettings.get(INDEX_CODEC_SETTING, DEFAULT_CODEC_NAME);
// We tell IndexWriter to use large heap, but IndexingMemoryController checks periodically and refreshes the most heap-consuming
// shards when total indexing heap usage is too high:
indexingBufferSize = indexSettings.getAsBytesSize(INDEX_BUFFER_SIZE_SETTING, new ByteSizeValue(256, ByteSizeUnit.MB));
gcDeletesInMillis = indexSettings.getAsTime(INDEX_GC_DELETES_SETTING, DEFAULT_GC_DELETES).millis();
this.translogRecoveryPerformer = translogRecoveryPerformer;
this.forceNewTranslog = indexSettings.getAsBoolean(INDEX_FORCE_NEW_TRANSLOG, false);
this.queryCache = queryCache;
@ -150,51 +143,11 @@ public final class EngineConfig {
this.translogConfig = translogConfig;
/** updates {@link #versionMapSize} based on current setting and {@link #indexingBufferSize} */
private void updateVersionMapSize() {
if (versionMapSizeSetting.endsWith("%")) {
double percent = Double.parseDouble(versionMapSizeSetting.substring(0, versionMapSizeSetting.length() - 1));
versionMapSize = new ByteSizeValue((long) ((double) indexingBufferSize.bytes() * (percent / 100)));
} else {
versionMapSize = ByteSizeValue.parseBytesSizeValue(versionMapSizeSetting, INDEX_VERSION_MAP_SIZE);
* Settings the version map size that should trigger a refresh. See {@link #INDEX_VERSION_MAP_SIZE} for details.
public void setVersionMapSizeSetting(String versionMapSizeSetting) {
this.versionMapSizeSetting = versionMapSizeSetting;
* current setting for the version map size that should trigger a refresh. See {@link #INDEX_VERSION_MAP_SIZE} for details.
public String getVersionMapSizeSetting() {
return versionMapSizeSetting;
/** if true the engine will start even if the translog id in the commit point can not be found */
public boolean forceNewTranslog() {
return forceNewTranslog;
* returns the size of the version map that should trigger a refresh
public ByteSizeValue getVersionMapSize() {
return versionMapSize;
* Sets the indexing buffer
public void setIndexingBufferSize(ByteSizeValue indexingBufferSize) {
this.indexingBufferSize = indexingBufferSize;
* Enables / disables gc deletes

View File

@ -101,6 +101,8 @@ public class InternalEngine extends Engine {
private volatile SegmentInfos lastCommittedSegmentInfos;
private volatile boolean refreshing;
private final IndexThrottle throttle;
public InternalEngine(EngineConfig engineConfig, boolean skipInitialTranslogRecovery) throws EngineException {
@ -295,7 +297,6 @@ public class InternalEngine extends Engine {
private void updateIndexWriterSettings() {
try {
final LiveIndexWriterConfig iwc = indexWriter.getConfig();
} catch (AlreadyClosedException ex) {
// ignore
@ -346,7 +347,6 @@ public class InternalEngine extends Engine {
maybeFailEngine("index", t);
throw new IndexFailedEngineException(shardId, index.type(),, t);
return created;
@ -411,33 +411,6 @@ public class InternalEngine extends Engine {
* Forces a refresh if the versionMap is using too much RAM
private void checkVersionMapRefresh() {
if (versionMap.ramBytesUsedForRefresh() > config().getVersionMapSize().bytes() && versionMapRefreshPending.getAndSet(true) == false) {
try {
if (isClosed.get()) {
// no point...
// Now refresh to clear versionMap:
engineConfig.getThreadPool().executor(ThreadPool.Names.REFRESH).execute(new Runnable() {
public void run() {
try {
} catch (EngineClosedException ex) {
// ignore
} catch (EsRejectedExecutionException ex) {
// that is fine too.. we might be shutting down
public void delete(Delete delete) throws EngineException {
try (ReleasableLock lock = readLock.acquire()) {
@ -450,7 +423,6 @@ public class InternalEngine extends Engine {
private void maybePruneDeletedTombstones() {
@ -516,6 +488,7 @@ public class InternalEngine extends Engine {
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
try (ReleasableLock lock = readLock.acquire()) {
refreshing = true;
} catch (AlreadyClosedException e) {
@ -525,6 +498,8 @@ public class InternalEngine extends Engine {
} catch (Throwable t) {
failEngine("refresh failed", t);
throw new RefreshFailedEngineException(shardId, t);
} finally {
refreshing = false;
// TODO: maybe we should just put a scheduled job in threadPool?
@ -782,8 +757,12 @@ public class InternalEngine extends Engine {
public long indexWriterRAMBytesUsed() {
return indexWriter.ramBytesUsed();
public long indexBufferRAMBytesUsed() {
if (refreshing) {
return 0;
} else {
return indexWriter.ramBytesUsed() + versionMap.ramBytesUsedForRefresh();
@ -1098,8 +1077,6 @@ public class InternalEngine extends Engine {
public void onSettingsChanged() {
// config().getVersionMapSize() may have changed:
// config().isEnableGcDeletes() or config.getGcDeletesInMillis() may have changed:

View File

@ -234,8 +234,8 @@ public class ShadowEngine extends Engine {
public long indexWriterRAMBytesUsed() {
// No IndexWriter
public long indexBufferRAMBytesUsed() {
// No IndexWriter nor version map
throw new UnsupportedOperationException("ShadowEngine has no IndexWriter");

View File

@ -189,11 +189,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
private final IndexSearcherWrapper searcherWrapper;
/** True if this shard is still indexing (recently) and false if we've been idle for long enough (as periodically checked by {@link
* IndexingMemoryController}). */
private final AtomicBoolean active = new AtomicBoolean();
private volatile long lastWriteNS;
private final IndexingMemoryController indexingMemoryController;
@ -253,9 +248,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
if (mapperService.hasMapping(PercolatorService.TYPE_NAME)) {
// We start up inactive
public Store store() {
@ -458,7 +450,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
public boolean index(Engine.Index index) {
index = indexingService.preIndex(index);
final boolean created;
try {
@ -483,7 +474,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
public void delete(Engine.Delete delete) {
delete = indexingService.preDelete(delete);
try {
if (logger.isTraceEnabled()) {
@ -893,22 +883,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
/** Returns timestamp of last indexing operation */
public long getLastWriteNS() {
return lastWriteNS;
/** Records timestamp of the last write operation, possibly switching {@code active} to true if we were inactive. */
private void markLastWrite(Engine.Operation op) {
lastWriteNS = op.startTime();
if (active.getAndSet(true) == false) {
// We are currently inactive, but a new write operation just showed up, so we now notify IMC
// to wake up and fix our indexing buffer. We could do this async instead, but cost should
// be low, and it's rare this happens.
private void ensureWriteAllowed(Engine.Operation op) throws IllegalIndexShardStateException {
Engine.Operation.Origin origin = op.origin();
IndexShardState state = this.state; // one time volatile read
@ -972,70 +946,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
/** Change the indexing and translog buffer sizes. If {@code IndexWriter} is currently using more than
* the new buffering indexing size then we do a refresh to free up the heap. */
public void updateBufferSize(ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) {
final EngineConfig config = engineConfig;
final ByteSizeValue preValue = config.getIndexingBufferSize();
public long getIndexBufferRAMBytesUsed() {
Engine engine = getEngineOrNull();
if (engine == null) {
logger.debug("updateBufferSize: engine is closed; skipping");
return 0;
// update engine if it is already started.
if (preValue.bytes() != shardIndexingBufferSize.bytes()) {
// so we push changes these changes down to IndexWriter:
long iwBytesUsed = engine.indexWriterRAMBytesUsed();
String message = LoggerMessageFormat.format("updating index_buffer_size from [{}] to [{}]; IndexWriter now using [{}] bytes",
preValue, shardIndexingBufferSize, iwBytesUsed);
if (iwBytesUsed > shardIndexingBufferSize.bytes()) {
// our allowed buffer was changed to less than we are currently using; we ask IW to refresh
// so it clears its buffers (otherwise it won't clear until the next indexing/delete op)
logger.debug(message + "; now refresh to clear IndexWriter memory");
// TODO: should IW have an API to move segments to disk, but not refresh? Its flush method is protected...
try {
refresh("update index buffer");
} catch (Throwable e) {
logger.warn("failed to refresh after decreasing index buffer", e);
} else {
/** Called by {@link IndexingMemoryController} to check whether more than {@code inactiveTimeNS} has passed since the last
* indexing operation, and become inactive (reducing indexing and translog buffers to tiny values) if so. This returns true
* if the shard is inactive. */
public boolean checkIdle(long inactiveTimeNS) {
if (System.nanoTime() - lastWriteNS >= inactiveTimeNS) {
boolean wasActive = active.getAndSet(false);
if (wasActive) {
updateBufferSize(IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER);
logger.debug("shard is now inactive");
return active.get() == false;
/** Returns {@code true} if this shard is active (has seen indexing ops in the last {@link
* IndexingMemoryController#SHARD_INACTIVE_TIME_SETTING} (default 5 minutes), else {@code false}. */
public boolean getActive() {
return active.get();
return engine.indexBufferRAMBytesUsed();
public final boolean isFlushOnClose() {
@ -1163,10 +1079,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
change = true;
final String versionMapSize = settings.get(EngineConfig.INDEX_VERSION_MAP_SIZE, config.getVersionMapSizeSetting());
if (config.getVersionMapSizeSetting().equals(versionMapSize) == false) {
final int maxThreadCount = settings.getAsInt(MergeSchedulerConfig.MAX_THREAD_COUNT, mergeSchedulerConfig.getMaxThreadCount());
if (maxThreadCount != mergeSchedulerConfig.getMaxThreadCount()) {
@ -1219,6 +1131,24 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
return percolatorQueriesRegistry.stats();
* Asynchronously refreshes the engine for new search operations to reflect the latest
* changes.
public void refreshAsync(final String reason) {
// nocommit this really is async???
engineConfig.getThreadPool().executor(ThreadPool.Names.REFRESH).execute(new Runnable() {
public void run() {
try {
} catch (EngineClosedException ex) {
// ignore
class EngineRefresher implements Runnable {
public void run() {

View File

@ -116,18 +116,6 @@ public final class BufferingTranslogWriter extends TranslogWriter {
public void updateBufferSize(int bufferSize) {
try (ReleasableLock lock = writeLock.acquire()) {
if (this.buffer.length != bufferSize) {
this.buffer = new byte[bufferSize];
} catch (IOException e) {
throw new TranslogException(shardId, "failed to flush", e);
class WrapperOutputStream extends OutputStream {

View File

@ -252,13 +252,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
throw new IllegalArgumentException("can't parse id from file: " + fileName);
public void updateBuffer(ByteSizeValue bufferSize) {
try (ReleasableLock lock = writeLock.acquire()) {
boolean isOpen() {
return closed.get() == false;
@ -335,7 +328,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
TranslogWriter createWriter(long fileGeneration) throws IOException {
TranslogWriter newFile;
try {
newFile = TranslogWriter.create(config.getType(), shardId, translogUUID, fileGeneration, location.resolve(getFilename(fileGeneration)), new OnCloseRunnable(), config.getBufferSize());
newFile = TranslogWriter.create(config.getType(), shardId, translogUUID, fileGeneration, location.resolve(getFilename(fileGeneration)), new OnCloseRunnable(), config.getBufferSizeBytes());
} catch (IOException e) {
throw new TranslogException(shardId, "failed to create new translog file", e);

View File

@ -48,7 +48,7 @@ public final class TranslogConfig {
private final BigArrays bigArrays;
private final ThreadPool threadPool;
private final boolean syncOnEachOperation;
private volatile int bufferSize;
private final int bufferSizeBytes;
private volatile TranslogGeneration translogGeneration;
private volatile Translog.Durabilty durabilty = Translog.Durabilty.REQUEST;
private volatile TranslogWriter.Type type;
@ -73,7 +73,7 @@ public final class TranslogConfig {
this.threadPool = threadPool;
this.bigArrays = bigArrays;
this.type = TranslogWriter.Type.fromString(indexSettings.get(INDEX_TRANSLOG_FS_TYPE,;
this.bufferSize = (int) indexSettings.getAsBytesSize(INDEX_TRANSLOG_BUFFER_SIZE, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER).bytes(); // Not really interesting, updated by IndexingMemoryController...
this.bufferSizeBytes = (int) indexSettings.getAsBytesSize(INDEX_TRANSLOG_BUFFER_SIZE, IndexingMemoryController.SHARD_TRANSLOG_BUFFER).bytes();
syncInterval = indexSettings.getAsTime(INDEX_TRANSLOG_SYNC_INTERVAL, TimeValue.timeValueSeconds(5));
if (syncInterval.millis() > 0 && threadPool != null) {
@ -130,15 +130,8 @@ public final class TranslogConfig {
* Retruns the current translog buffer size.
public int getBufferSize() {
return bufferSize;
* Sets the current buffer size - for setting a live setting use {@link Translog#updateBuffer(ByteSizeValue)}
public void setBufferSize(int bufferSize) {
this.bufferSize = bufferSize;
public int getBufferSizeBytes() {
return bufferSizeBytes;

View File

@ -80,7 +80,7 @@ public class TranslogWriter extends TranslogReader {
writeCheckpoint(headerLength, 0, file.getParent(), fileGeneration, StandardOpenOption.WRITE);
final TranslogWriter writer = type.create(shardId, fileGeneration, new ChannelReference(file, fileGeneration, channel, onClose), bufferSize);
return writer;
} catch (Throwable throwable){
} catch (Throwable throwable) {
try {
Files.delete(file); // remove the file as well

View File

@ -19,6 +19,9 @@
package org.elasticsearch.indices.memory;
import java.util.*;
import java.util.concurrent.ScheduledFuture;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
@ -37,9 +40,6 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.*;
import java.util.concurrent.ScheduledFuture;
public class IndexingMemoryController extends AbstractLifecycleComponent<IndexingMemoryController> {
/** How much heap (% or bytes) we will share across all actively indexing shards on this node (default: 10%). */
@ -51,51 +51,17 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
/** Only applies when <code>indices.memory.index_buffer_size</code> is a %, to set a ceiling on the actual size in bytes (default: not set). */
public static final String MAX_INDEX_BUFFER_SIZE_SETTING = "indices.memory.max_index_buffer_size";
/** Sets a floor on the per-shard index buffer size (default: 4 MB). */
public static final String MIN_SHARD_INDEX_BUFFER_SIZE_SETTING = "indices.memory.min_shard_index_buffer_size";
/** How frequently we check indexing memory usage (default: 5 seconds). */
public static final String SHARD_MEMORY_INTERVAL_TIME_SETTING = "indices.memory.interval";
/** Sets a ceiling on the per-shard index buffer size (default: 512 MB). */
public static final String MAX_SHARD_INDEX_BUFFER_SIZE_SETTING = "indices.memory.max_shard_index_buffer_size";
/** How much heap (% or bytes) we will share across all actively indexing shards for the translog buffer (default: 1%). */
public static final String TRANSLOG_BUFFER_SIZE_SETTING = "indices.memory.translog_buffer_size";
/** Only applies when <code>indices.memory.translog_buffer_size</code> is a %, to set a floor on the actual size in bytes (default: 256 KB). */
public static final String MIN_TRANSLOG_BUFFER_SIZE_SETTING = "indices.memory.min_translog_buffer_size";
/** Only applies when <code>indices.memory.translog_buffer_size</code> is a %, to set a ceiling on the actual size in bytes (default: not set). */
public static final String MAX_TRANSLOG_BUFFER_SIZE_SETTING = "indices.memory.max_translog_buffer_size";
/** Sets a floor on the per-shard translog buffer size (default: 2 KB). */
public static final String MIN_SHARD_TRANSLOG_BUFFER_SIZE_SETTING = "indices.memory.min_shard_translog_buffer_size";
/** Sets a ceiling on the per-shard translog buffer size (default: 64 KB). */
public static final String MAX_SHARD_TRANSLOG_BUFFER_SIZE_SETTING = "indices.memory.max_shard_translog_buffer_size";
/** If we see no indexing operations after this much time for a given shard, we consider that shard inactive (default: 5 minutes). */
public static final String SHARD_INACTIVE_TIME_SETTING = "indices.memory.shard_inactive_time";
/** How frequently we check shards to find inactive ones (default: 30 seconds). */
public static final String SHARD_INACTIVE_INTERVAL_TIME_SETTING = "indices.memory.interval";
/** Once a shard becomes inactive, we reduce the {@code IndexWriter} buffer to this value (500 KB) to let active shards use the heap instead. */
public static final ByteSizeValue INACTIVE_SHARD_INDEXING_BUFFER = ByteSizeValue.parseBytesSizeValue("500kb", "INACTIVE_SHARD_INDEXING_BUFFER");
/** Once a shard becomes inactive, we reduce the {@code Translog} buffer to this value (1 KB) to let active shards use the heap instead. */
public static final ByteSizeValue INACTIVE_SHARD_TRANSLOG_BUFFER = ByteSizeValue.parseBytesSizeValue("1kb", "INACTIVE_SHARD_TRANSLOG_BUFFER");
/** Hardwired translog buffer size */
public static final ByteSizeValue SHARD_TRANSLOG_BUFFER = ByteSizeValue.parseBytesSizeValue("32kb", "SHARD_TRANSLOG_BUFFER");
private final ThreadPool threadPool;
private final IndicesService indicesService;
private final ByteSizeValue indexingBuffer;
private final ByteSizeValue minShardIndexBufferSize;
private final ByteSizeValue maxShardIndexBufferSize;
private final ByteSizeValue translogBuffer;
private final ByteSizeValue minShardTranslogBufferSize;
private final ByteSizeValue maxShardTranslogBufferSize;
private final TimeValue inactiveTime;
private final TimeValue interval;
private volatile ScheduledFuture scheduler;
@ -134,43 +100,13 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
indexingBuffer = ByteSizeValue.parseBytesSizeValue(indexingBufferSetting, INDEX_BUFFER_SIZE_SETTING);
this.indexingBuffer = indexingBuffer;
this.minShardIndexBufferSize = this.settings.getAsBytesSize(MIN_SHARD_INDEX_BUFFER_SIZE_SETTING, new ByteSizeValue(4, ByteSizeUnit.MB));
// LUCENE MONITOR: Based on this thread, currently (based on Mike), having a large buffer does not make a lot of sense:
this.maxShardIndexBufferSize = this.settings.getAsBytesSize(MAX_SHARD_INDEX_BUFFER_SIZE_SETTING, new ByteSizeValue(512, ByteSizeUnit.MB));
ByteSizeValue translogBuffer;
String translogBufferSetting = this.settings.get(TRANSLOG_BUFFER_SIZE_SETTING, "1%");
if (translogBufferSetting.endsWith("%")) {
double percent = Double.parseDouble(translogBufferSetting.substring(0, translogBufferSetting.length() - 1));
translogBuffer = new ByteSizeValue((long) (((double) jvmMemoryInBytes) * (percent / 100)));
ByteSizeValue minTranslogBuffer = this.settings.getAsBytesSize(MIN_TRANSLOG_BUFFER_SIZE_SETTING, new ByteSizeValue(256, ByteSizeUnit.KB));
ByteSizeValue maxTranslogBuffer = this.settings.getAsBytesSize(MAX_TRANSLOG_BUFFER_SIZE_SETTING, null);
if (translogBuffer.bytes() < minTranslogBuffer.bytes()) {
translogBuffer = minTranslogBuffer;
if (maxTranslogBuffer != null && translogBuffer.bytes() > maxTranslogBuffer.bytes()) {
translogBuffer = maxTranslogBuffer;
} else {
translogBuffer = ByteSizeValue.parseBytesSizeValue(translogBufferSetting, TRANSLOG_BUFFER_SIZE_SETTING);
this.translogBuffer = translogBuffer;
this.minShardTranslogBufferSize = this.settings.getAsBytesSize(MIN_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, new ByteSizeValue(2, ByteSizeUnit.KB));
this.maxShardTranslogBufferSize = this.settings.getAsBytesSize(MAX_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, new ByteSizeValue(64, ByteSizeUnit.KB));
this.inactiveTime = this.settings.getAsTime(SHARD_INACTIVE_TIME_SETTING, TimeValue.timeValueMinutes(5));
// we need to have this relatively small to move a shard from inactive to active fast (enough)
this.interval = this.settings.getAsTime(SHARD_INACTIVE_INTERVAL_TIME_SETTING, TimeValue.timeValueSeconds(30));
// we need to have this relatively small to free up heap quickly enough
this.interval = this.settings.getAsTime(SHARD_MEMORY_INTERVAL_TIME_SETTING, TimeValue.timeValueSeconds(5));
this.statusChecker = new ShardsIndicesStatusChecker();
logger.debug("using indexing buffer size [{}], with {} [{}], {} [{}], {} [{}], {} [{}]",
logger.debug("using indexing buffer size [{}] with {} [{}]", this.indexingBuffer, SHARD_MEMORY_INTERVAL_TIME_SETTING, this.interval);
@ -197,15 +133,6 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
return indexingBuffer;
* returns the current budget for the total amount of translog buffers of
* active shards on this node
public ByteSizeValue translogBufferSize() {
return translogBuffer;
protected List<ShardId> availableShards() {
ArrayList<ShardId> list = new ArrayList<>();
@ -224,6 +151,24 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
return shardAvailable(getShard(shardId));
/** returns how much heap this shard is using for its indexing buffer */
protected long getIndexBufferRAMBytesUsed(ShardId shardId) {
IndexShard shard = getShard(shardId);
if (shard == null) {
return 0;
return shard.getIndexBufferRAMBytesUsed();
/** ask this shard to refresh, in the background, to free up heap */
public void refreshShardAsync(ShardId shardId) {
IndexShard shard = getShard(shardId);
if (shard != null) {
/** returns true if shard exists and is availabe for updates */
protected boolean shardAvailable(@Nullable IndexShard shard) {
// shadow replica doesn't have an indexing buffer
@ -240,186 +185,60 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
return null;
/** set new indexing and translog buffers on this shard. this may cause the shard to refresh to free up heap. */
protected void updateShardBuffers(ShardId shardId, ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) {
final IndexShard shard = getShard(shardId);
if (shard != null) {
try {
shard.updateBufferSize(shardIndexingBufferSize, shardTranslogBufferSize);
} catch (EngineClosedException e) {
// ignore
} catch (FlushNotAllowedEngineException e) {
// ignore
} catch (Exception e) {
logger.warn("failed to set shard {} index buffer to [{}]", shardId, shardIndexingBufferSize);
/** returns {@link IndexShard#getActive} if the shard exists, else null */
protected Boolean getShardActive(ShardId shardId) {
final IndexShard indexShard = getShard(shardId);
if (indexShard == null) {
return null;
return indexShard.getActive();
/** check if any shards active status changed, now. */
public void forceCheck() {;
class ShardsIndicesStatusChecker implements Runnable {
static class ShardAndBytesUsed implements Comparable<ShardAndBytesUsed> {
final long bytesUsed;
final ShardId shardId;
// True if the shard was active last time we checked
private final Map<ShardId,Boolean> shardWasActive = new HashMap<>();
public ShardAndBytesUsed(long bytesUsed, ShardId shardId) {
this.bytesUsed = bytesUsed;
this.shardId = shardId;
public int compareTo(ShardAndBytesUsed other) {
// Sort larger shards first:
return, bytesUsed);
class ShardsIndicesStatusChecker implements Runnable {
public synchronized void run() {
EnumSet<ShardStatusChangeType> changes = purgeDeletedAndClosedShards();
if (changes.isEmpty() == false) {
// Something changed: recompute indexing buffers:
calcAndSetShardBuffers("[" + changes + "]");
* goes through all existing shards and check whether there are changes in their active status
private void updateShardStatuses(EnumSet<ShardStatusChangeType> changes) {
// Fast check to sum up how much heap all shards' indexing buffers are using now:
long totalBytesUsed = 0;
for (ShardId shardId : availableShards()) {
// Is the shard active now?
Boolean isActive = getShardActive(shardId);
if (isActive == null) {
// shard was closed..
long shardBytesUsed = getIndexBufferRAMBytesUsed(shardId);
if (shardBytesUsed > 0) {
totalBytesUsed += shardBytesUsed;
// Was the shard active last time we checked?
Boolean wasActive = shardWasActive.get(shardId);
if (totalBytesUsed > indexingBuffer.bytes()) {
// OK we are using too much; make a queue and ask largest shard(s) to refresh:
logger.debug("now refreshing some shards: total indexing bytes used [{}] vs index_buffer_size [{}]", new ByteSizeValue(totalBytesUsed), indexingBuffer);
if (wasActive == null) {
// First time we are seeing this shard
shardWasActive.put(shardId, isActive);
} else if (isActive) {
// Shard is active now
if (wasActive == false) {
// Shard became active itself, since we last checked (due to new indexing op arriving)
logger.debug("marking shard {} as active indexing wise", shardId);
shardWasActive.put(shardId, true);
} else if (checkIdle(shardId, inactiveTime.nanos()) == Boolean.TRUE) {
// Make shard inactive now
logger.debug("marking shard {} as inactive (inactive_time[{}]) indexing wise",
shardWasActive.put(shardId, false);
PriorityQueue<ShardAndBytesUsed> queue = new PriorityQueue<>();
for (ShardId shardId : availableShards()) {
long shardBytesUsed = getIndexBufferRAMBytesUsed(shardId);
if (shardBytesUsed > 0) {
queue.add(new ShardAndBytesUsed(shardBytesUsed, shardId));
* purge any existing statuses that are no longer updated
* @return the changes applied
private EnumSet<ShardStatusChangeType> purgeDeletedAndClosedShards() {
EnumSet<ShardStatusChangeType> changes = EnumSet.noneOf(ShardStatusChangeType.class);
Iterator<ShardId> statusShardIdIterator = shardWasActive.keySet().iterator();
while (statusShardIdIterator.hasNext()) {
ShardId shardId =;
if (shardAvailable(shardId) == false) {
return changes;
private void calcAndSetShardBuffers(String reason) {
// Count how many shards are now active:
int activeShardCount = 0;
for (Map.Entry<ShardId,Boolean> ent : shardWasActive.entrySet()) {
if (ent.getValue()) {
// TODO: we could be smarter here by taking into account how RAM the IndexWriter on each shard
// is actually using (using IW.ramBytesUsed), so that small indices (e.g. Marvel) would not
// get the same indexing buffer as large indices. But it quickly gets tricky...
if (activeShardCount == 0) {
logger.debug("no active shards (reason={})", reason);
ByteSizeValue shardIndexingBufferSize = new ByteSizeValue(indexingBuffer.bytes() / activeShardCount);
if (shardIndexingBufferSize.bytes() < minShardIndexBufferSize.bytes()) {
shardIndexingBufferSize = minShardIndexBufferSize;
if (shardIndexingBufferSize.bytes() > maxShardIndexBufferSize.bytes()) {
shardIndexingBufferSize = maxShardIndexBufferSize;
ByteSizeValue shardTranslogBufferSize = new ByteSizeValue(translogBuffer.bytes() / activeShardCount);
if (shardTranslogBufferSize.bytes() < minShardTranslogBufferSize.bytes()) {
shardTranslogBufferSize = minShardTranslogBufferSize;
if (shardTranslogBufferSize.bytes() > maxShardTranslogBufferSize.bytes()) {
shardTranslogBufferSize = maxShardTranslogBufferSize;
logger.debug("recalculating shard indexing buffer (reason={}), total is [{}] with [{}] active shards, each shard set to indexing=[{}], translog=[{}]", reason, indexingBuffer, activeShardCount, shardIndexingBufferSize, shardTranslogBufferSize);
for (Map.Entry<ShardId,Boolean> ent : shardWasActive.entrySet()) {
if (ent.getValue()) {
// This shard is active
updateShardBuffers(ent.getKey(), shardIndexingBufferSize, shardTranslogBufferSize);
while (totalBytesUsed > indexingBuffer.bytes() && queue.isEmpty() == false) {
ShardAndBytesUsed largest = queue.poll();
logger.debug("refresh shard [{}] to free up its [{}] indexing buffer", largest.shardId, new ByteSizeValue(largest.bytesUsed));
totalBytesUsed -= largest.bytesUsed;
protected long currentTimeInNanos() {
return System.nanoTime();
/** ask this shard to check now whether it is inactive, and reduces its indexing and translog buffers if so. returns Boolean.TRUE if
* it did deactive, Boolean.FALSE if it did not, and null if the shard is unknown */
protected Boolean checkIdle(ShardId shardId, long inactiveTimeNS) {
String ignoreReason = null;
final IndexShard shard = getShard(shardId);
if (shard != null) {
try {
return shard.checkIdle(inactiveTimeNS);
} catch (EngineClosedException e) {
// ignore
ignoreReason = "EngineClosedException";
} catch (FlushNotAllowedEngineException e) {
// ignore
ignoreReason = "FlushNotAllowedEngineException";
} else {
ignoreReason = "shard not found";
if (ignoreReason != null) {
logger.trace("ignore [{}] while marking shard {} as inactive", ignoreReason, shardId);
return null;
private static enum ShardStatusChangeType {

View File

@ -44,8 +44,6 @@ public class InternalEngineSettingsTests extends ESSingleNodeTestCase {
long indexBufferSize = engine.config().getIndexingBufferSize().bytes();
long versionMapSize = engine.config().getVersionMapSize().bytes();
assertThat(versionMapSize, equalTo((long) (indexBufferSize * 0.25)));
final int iters = between(1, 20);
for (int i = 0; i < iters; i++) {
@ -55,15 +53,9 @@ public class InternalEngineSettingsTests extends ESSingleNodeTestCase {
// the full long range here else the assert below fails:
long gcDeletes = random().nextLong() & (Long.MAX_VALUE >> 11);
boolean versionMapAsPercent = randomBoolean();
double versionMapPercent = randomIntBetween(0, 100);
long versionMapSizeInMB = randomIntBetween(10, 20);
String versionMapString = versionMapAsPercent ? versionMapPercent + "%" : versionMapSizeInMB + "mb";
Settings build = Settings.builder()
.put(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush)
.put(EngineConfig.INDEX_GC_DELETES_SETTING, gcDeletes, TimeUnit.MILLISECONDS)
.put(EngineConfig.INDEX_VERSION_MAP_SIZE, versionMapString)
assertEquals(gcDeletes, build.getAsTime(EngineConfig.INDEX_GC_DELETES_SETTING, null).millis());
@ -77,12 +69,6 @@ public class InternalEngineSettingsTests extends ESSingleNodeTestCase {
assertEquals(engine.getGcDeletesInMillis(), gcDeletes);
indexBufferSize = engine.config().getIndexingBufferSize().bytes();
versionMapSize = engine.config().getVersionMapSize().bytes();
if (versionMapAsPercent) {
assertThat(versionMapSize, equalTo((long) (indexBufferSize * (versionMapPercent / 100))));
} else {
assertThat(versionMapSize, equalTo(1024 * 1024 * versionMapSizeInMB));
Settings settings = Settings.builder()
@ -107,37 +93,5 @@ public class InternalEngineSettingsTests extends ESSingleNodeTestCase {
assertEquals(engine.getGcDeletesInMillis(), 1000);
settings = Settings.builder()
.put(EngineConfig.INDEX_VERSION_MAP_SIZE, "sdfasfd")
try {
fail("settings update didn't fail, but should have");
} catch (IllegalArgumentException e) {
// good
settings = Settings.builder()
.put(EngineConfig.INDEX_VERSION_MAP_SIZE, "-12%")
try {
fail("settings update didn't fail, but should have");
} catch (IllegalArgumentException e) {
// good
settings = Settings.builder()
.put(EngineConfig.INDEX_VERSION_MAP_SIZE, "130%")
try {
fail("settings update didn't fail, but should have");
} catch (IllegalArgumentException e) {
// good

View File

@ -444,7 +444,7 @@ public class InternalEngineTests extends ESTestCase {
public void testSegmentsWithMergeFlag() throws Exception {
try (Store store = createStore();
Engine engine = createEngine(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), new TieredMergePolicy())) {
Engine engine = createEngine(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), new TieredMergePolicy())) {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
Engine.Index index = new Engine.Index(newUid("1"), doc);
@ -779,7 +779,7 @@ public class InternalEngineTests extends ESTestCase {
public void testSyncedFlush() throws IOException {
try (Store store = createStore();
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings),
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings),
new LogByteSizeMergePolicy()), false)) {
final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
@ -985,7 +985,7 @@ public class InternalEngineTests extends ESTestCase {
public void testForceMerge() throws IOException {
try (Store store = createStore();
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings),
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings),
new LogByteSizeMergePolicy()), false)) { // use log MP here we test some behavior in ESMP
int numDocs = randomIntBetween(10, 100);
for (int i = 0; i < numDocs; i++) {
@ -1435,7 +1435,7 @@ public class InternalEngineTests extends ESTestCase {
public void testEnableGcDeletes() throws Exception {
try (Store store = createStore();
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), newMergePolicy()), false)) {
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), newMergePolicy()), false)) {
// Add document
@ -1562,10 +1562,11 @@ public class InternalEngineTests extends ESTestCase {
// #10312
public void testDeletesAloneCanTriggerRefresh() throws Exception {
Settings settings = Settings.builder()
.put(EngineConfig.INDEX_BUFFER_SIZE_SETTING, "1kb").build();
try (Store store = createStore();
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), newMergePolicy()),
false)) {
engine.config().setIndexingBufferSize(new ByteSizeValue(1, ByteSizeUnit.KB));
Engine engine = new InternalEngine(config(settings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), newMergePolicy()), false)) {
for (int i = 0; i < 100; i++) {
String id = Integer.toString(i);
ParsedDocument doc = testParsedDocument(id, id, "test", null, -1, -1, testDocument(), B_1, null);

View File

@ -339,8 +339,6 @@ public class IndexShardTests extends ESSingleNodeTestCase {
client().prepareIndex("test", "test").setSource("{}").get();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
Boolean result = indicesService.indexService("test").getShardOrNull(0).checkIdle(0);
assertEquals(Boolean.TRUE, result);
assertBusy(new Runnable() { // should be very very quick
public void run() {

View File

@ -1,104 +0,0 @@
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.elasticsearch.indices.memory;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.Test;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class IndexingMemoryControllerIT extends ESIntegTestCase {
private long getIWBufferSize(String indexName) {
return client().admin().indices().prepareStats(indexName).get().getTotal().getSegments().getIndexWriterMaxMemoryInBytes();
public void testIndexBufferPushedToEngine() throws InterruptedException {
createNode(Settings.builder().put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, "100000h",
IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "32mb",
IndexShard.INDEX_REFRESH_INTERVAL, "-1").build());
// Create two active indices, sharing 32 MB indexing buffer:
prepareCreate("test3").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).get();
prepareCreate("test4").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).get();
index("test3", "type", "1", "f", 1);
index("test4", "type", "1", "f", 1);
// .. then make sure we really pushed the update (16 MB for each) down to the IndexWriter, even if refresh nor flush occurs:
if (awaitBusy(() -> getIWBufferSize("test3") == 16*1024*1024) == false) {
fail("failed to update shard indexing buffer size for test3 index to 16 MB; got: " + getIWBufferSize("test3"));
if (awaitBusy(() -> getIWBufferSize("test4") == 16*1024*1024) == false) {
fail("failed to update shard indexing buffer size for test4 index to 16 MB; got: " + getIWBufferSize("test4"));
if (awaitBusy(() -> getIWBufferSize("test3") == 32 * 1024 * 1024) == false) {
fail("failed to update shard indexing buffer size for test3 index to 32 MB; got: " + getIWBufferSize("test4"));
public void testInactivePushedToShard() throws InterruptedException {
createNode(Settings.builder().put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, "100ms",
IndexingMemoryController.SHARD_INACTIVE_INTERVAL_TIME_SETTING, "100ms",
IndexShard.INDEX_REFRESH_INTERVAL, "-1").build());
// Create two active indices, sharing 32 MB indexing buffer:
prepareCreate("test1").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).get();
index("test1", "type", "1", "f", 1);
// make shard the shard buffer was set to inactive size
final ByteSizeValue inactiveBuffer = IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER;
if (awaitBusy(() -> getIWBufferSize("test1") == inactiveBuffer.bytes()) == false) {
fail("failed to update shard indexing buffer size for test1 index to [" + inactiveBuffer + "]; got: " + getIWBufferSize("test1"));
private void createNode(Settings settings) {
.put(ClusterName.SETTING, "IndexingMemoryControllerIT")
.put("", "IndexingMemoryControllerIT")
.put(EsExecutors.PROCESSORS, 1) // limit the number of threads created
.put("http.enabled", false)
.put(InternalSettingsPreparer.IGNORE_SYSTEM_PROPERTIES_SETTING, true) // make sure we get what we set :)
.put(IndexingMemoryController.SHARD_INACTIVE_INTERVAL_TIME_SETTING, "100ms")

View File

@ -41,115 +41,80 @@ public class IndexingMemoryControllerTests extends ESTestCase {
static class MockController extends IndexingMemoryController {
final static ByteSizeValue INACTIVE = new ByteSizeValue(-1);
final Map<ShardId, ByteSizeValue> indexingBuffers = new HashMap<>();
final Map<ShardId, ByteSizeValue> translogBuffers = new HashMap<>();
final Map<ShardId, Long> lastIndexTimeNanos = new HashMap<>();
final Set<ShardId> activeShards = new HashSet<>();
long currentTimeSec = TimeValue.timeValueNanos(System.nanoTime()).seconds();
final Map<ShardId, Long> indexBufferRAMBytesUsed = new HashMap<>();
public MockController(Settings settings) {
.put(SHARD_INACTIVE_INTERVAL_TIME_SETTING, "200h") // disable it
.put(SHARD_INACTIVE_TIME_SETTING, "1ms") // nearly immediate
.put(SHARD_MEMORY_INTERVAL_TIME_SETTING, "200h") // disable it
null, null, 100 * 1024 * 1024); // fix jvm mem size to 100mb
public void deleteShard(ShardId id) {
public void assertBuffers(ShardId id, ByteSizeValue indexing, ByteSizeValue translog) {
assertThat(indexingBuffers.get(id), equalTo(indexing));
assertThat(translogBuffers.get(id), equalTo(translog));
public void assertInActive(ShardId id) {
assertThat(indexingBuffers.get(id), equalTo(INACTIVE));
assertThat(translogBuffers.get(id), equalTo(INACTIVE));
protected long currentTimeInNanos() {
return TimeValue.timeValueSeconds(currentTimeSec).nanos();
public void deleteShard(ShardId shardId) {
protected List<ShardId> availableShards() {
return new ArrayList<>(indexingBuffers.keySet());
return new ArrayList<>(indexBufferRAMBytesUsed.keySet());
protected boolean shardAvailable(ShardId shardId) {
return indexingBuffers.containsKey(shardId);
return indexBufferRAMBytesUsed.containsKey(shardId);
protected Boolean getShardActive(ShardId shardId) {
return activeShards.contains(shardId);
protected void updateShardBuffers(ShardId shardId, ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) {
indexingBuffers.put(shardId, shardIndexingBufferSize);
translogBuffers.put(shardId, shardTranslogBufferSize);
protected Boolean checkIdle(ShardId shardId, long inactiveTimeNS) {
Long ns = lastIndexTimeNanos.get(shardId);
if (ns == null) {
return null;
} else if (currentTimeInNanos() - ns >= inactiveTimeNS) {
indexingBuffers.put(shardId, INACTIVE);
translogBuffers.put(shardId, INACTIVE);
return true;
protected long getIndexBufferRAMBytesUsed(ShardId shardId) {
Long used = indexBufferRAMBytesUsed.get(shardId);
if (used == null) {
return 0;
} else {
return false;
return used;
public void incrementTimeSec(int sec) {
currentTimeSec += sec;
public void refreshShardAsync(ShardId shardId) {
indexBufferRAMBytesUsed.put(shardId, 0L);
public void assertBuffer(ShardId shardId, ByteSizeValue expected) {
Long actual = indexBufferRAMBytesUsed.get(shardId);
assertEquals(expected.bytes(), actual.longValue());
public void simulateIndexing(ShardId shardId) {
lastIndexTimeNanos.put(shardId, currentTimeInNanos());
if (indexingBuffers.containsKey(shardId) == false) {
// First time we are seeing this shard; start it off with inactive buffers as IndexShard does:
indexingBuffers.put(shardId, IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER);
translogBuffers.put(shardId, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER);
Long bytes = indexBufferRAMBytesUsed.get(shardId);
if (bytes == null) {
bytes = 0L;
// Each doc we index takes up a megabyte!
bytes += 1024*1024;
indexBufferRAMBytesUsed.put(shardId, bytes);
public void testShardAdditionAndRemoval() {
MockController controller = new MockController(Settings.builder()
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb")
.put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "100kb").build());
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "4mb").build());
final ShardId shard1 = new ShardId("test", 1);
controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K
controller.assertBuffer(shard1, new ByteSizeValue(1, ByteSizeUnit.MB));
// add another shard
final ShardId shard2 = new ShardId("test", 2);
controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
controller.assertBuffer(shard1, new ByteSizeValue(1, ByteSizeUnit.MB));
controller.assertBuffer(shard2, new ByteSizeValue(1, ByteSizeUnit.MB));
// remove first shard
controller.assertBuffers(shard2, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K
controller.assertBuffer(shard2, new ByteSizeValue(1, ByteSizeUnit.MB));
// remove second shard
@ -158,116 +123,62 @@ public class IndexingMemoryControllerTests extends ESTestCase {
// add a new one
final ShardId shard3 = new ShardId("test", 3);
controller.assertBuffers(shard3, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K
controller.assertBuffer(shard3, new ByteSizeValue(1, ByteSizeUnit.MB));
public void testActiveInactive() {
MockController controller = new MockController(Settings.builder()
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb")
.put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "100kb")
.put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, "5s")
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "5mb")
final ShardId shard1 = new ShardId("test", 1);
final ShardId shard2 = new ShardId("test", 2);
controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
controller.assertBuffer(shard1, new ByteSizeValue(1, ByteSizeUnit.MB));
controller.assertBuffer(shard2, new ByteSizeValue(1, ByteSizeUnit.MB));
// index into both shards, move the clock and see that they are still active
controller.assertBuffer(shard1, new ByteSizeValue(2, ByteSizeUnit.MB));
controller.assertBuffer(shard2, new ByteSizeValue(2, ByteSizeUnit.MB));
// both shards now inactive
// index into one shard only, see it becomes active
// index into one shard only, hits the 5mb limit, so shard1 is refreshed
controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB));
controller.assertBuffer(shard1, new ByteSizeValue(0, ByteSizeUnit.MB));
controller.assertBuffer(shard2, new ByteSizeValue(2, ByteSizeUnit.MB));
controller.incrementTimeSec(3); // increment but not enough to become inactive
controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB));
controller.incrementTimeSec(3); // increment some more
// index some and shard becomes immediately active
controller.assertBuffers(shard2, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB));
public void testMinShardBufferSizes() {
MockController controller = new MockController(Settings.builder()
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb")
.put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "50kb")
.put(IndexingMemoryController.MIN_SHARD_INDEX_BUFFER_SIZE_SETTING, "6mb")
.put(IndexingMemoryController.MIN_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, "40kb").build());
assertTwoActiveShards(controller, new ByteSizeValue(6, ByteSizeUnit.MB), new ByteSizeValue(40, ByteSizeUnit.KB));
public void testMaxShardBufferSizes() {
MockController controller = new MockController(Settings.builder()
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb")
.put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "50kb")
.put(IndexingMemoryController.MAX_SHARD_INDEX_BUFFER_SIZE_SETTING, "3mb")
.put(IndexingMemoryController.MAX_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, "10kb").build());
assertTwoActiveShards(controller, new ByteSizeValue(3, ByteSizeUnit.MB), new ByteSizeValue(10, ByteSizeUnit.KB));
controller.assertBuffer(shard2, new ByteSizeValue(4, ByteSizeUnit.MB));
// shard2 used up the full 5 mb and is now cleared:
controller.assertBuffer(shard2, new ByteSizeValue(0, ByteSizeUnit.MB));
public void testRelativeBufferSizes() {
MockController controller = new MockController(Settings.builder()
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "50%")
.put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "0.5%")
assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(50, ByteSizeUnit.MB)));
assertThat(controller.translogBufferSize(), equalTo(new ByteSizeValue(512, ByteSizeUnit.KB)));
public void testMinBufferSizes() {
MockController controller = new MockController(Settings.builder()
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "0.001%")
.put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "0.001%")
.put(IndexingMemoryController.MIN_INDEX_BUFFER_SIZE_SETTING, "6mb")
.put(IndexingMemoryController.MIN_TRANSLOG_BUFFER_SIZE_SETTING, "512kb").build());
.put(IndexingMemoryController.MIN_INDEX_BUFFER_SIZE_SETTING, "6mb").build());
assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(6, ByteSizeUnit.MB)));
assertThat(controller.translogBufferSize(), equalTo(new ByteSizeValue(512, ByteSizeUnit.KB)));
public void testMaxBufferSizes() {
MockController controller = new MockController(Settings.builder()
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "90%")
.put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "90%")
.put(IndexingMemoryController.MAX_INDEX_BUFFER_SIZE_SETTING, "6mb")
.put(IndexingMemoryController.MAX_TRANSLOG_BUFFER_SIZE_SETTING, "512kb").build());
.put(IndexingMemoryController.MAX_INDEX_BUFFER_SIZE_SETTING, "6mb").build());
assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(6, ByteSizeUnit.MB)));
assertThat(controller.translogBufferSize(), equalTo(new ByteSizeValue(512, ByteSizeUnit.KB)));
protected void assertTwoActiveShards(MockController controller, ByteSizeValue indexBufferSize, ByteSizeValue translogBufferSize) {
final ShardId shard1 = new ShardId("test", 1);
final ShardId shard2 = new ShardId("test", 2);
controller.assertBuffers(shard1, indexBufferSize, translogBufferSize);
controller.assertBuffers(shard2, indexBufferSize, translogBufferSize);