From b3357f09feb228c0f9ccc7dc75cadce803337b87 Mon Sep 17 00:00:00 2001 From: Michael McCandless Date: Wed, 14 Oct 2015 05:41:41 -0400 Subject: [PATCH] a start --- .../elasticsearch/cluster/ClusterModule.java | 3 +- .../metadata/MetaDataIndexUpgradeService.java | 3 +- .../elasticsearch/index/engine/Engine.java | 6 +- .../index/engine/EngineConfig.java | 67 +--- .../index/engine/InternalEngine.java | 45 +-- .../index/engine/ShadowEngine.java | 4 +- .../elasticsearch/index/shard/IndexShard.java | 112 ++----- .../translog/BufferingTranslogWriter.java | 12 - .../index/translog/Translog.java | 9 +- .../index/translog/TranslogConfig.java | 15 +- .../index/translog/TranslogWriter.java | 2 +- .../memory/IndexingMemoryController.java | 307 ++++-------------- .../engine/InternalEngineSettingsTests.java | 46 --- .../index/engine/InternalEngineTests.java | 15 +- .../index/shard/IndexShardTests.java | 2 - .../memory/IndexingMemoryControllerIT.java | 104 ------ .../memory/IndexingMemoryControllerTests.java | 183 +++-------- 17 files changed, 174 insertions(+), 761 deletions(-) delete mode 100644 core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerIT.java diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 581bee10369..df1529a1957 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -231,7 +231,6 @@ public class ClusterModule extends AbstractModule { registerIndexDynamicSetting(EngineConfig.INDEX_COMPOUND_ON_FLUSH, Validator.BOOLEAN); registerIndexDynamicSetting(EngineConfig.INDEX_GC_DELETES_SETTING, Validator.TIME); registerIndexDynamicSetting(IndexShard.INDEX_FLUSH_ON_CLOSE, Validator.BOOLEAN); - registerIndexDynamicSetting(EngineConfig.INDEX_VERSION_MAP_SIZE, Validator.BYTES_SIZE_OR_PERCENTAGE); registerIndexDynamicSetting(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_WARN, Validator.TIME); registerIndexDynamicSetting(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_INFO, Validator.TIME); registerIndexDynamicSetting(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_DEBUG, Validator.TIME); @@ -324,4 +323,4 @@ public class ClusterModule extends AbstractModule { bind(NodeMappingRefreshAction.class).asEagerSingleton(); bind(MappingUpdatedAction.class).asEagerSingleton(); } -} \ No newline at end of file +} diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java index cdde49170d4..29a3dcaeb9f 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java @@ -125,7 +125,8 @@ public class MetaDataIndexUpgradeService extends AbstractComponent { "index.store.throttle.max_bytes_per_sec", "index.translog.flush_threshold_size", "index.translog.fs.buffer_size", - "index.version_map_size")); + "index.version_map_size", + "index.buffer_size")); /** All known time settings for an index. */ public static final Set INDEX_TIME_SETTINGS = unmodifiableSet(newHashSet( diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index c07be064489..90640f0695e 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -361,8 +361,8 @@ public abstract class Engine implements Closeable { stats.addIndexWriterMaxMemoryInBytes(0); } - /** How much heap Lucene's IndexWriter is using */ - abstract public long indexWriterRAMBytesUsed(); + /** How much heap is used that would be freed by a refresh */ + abstract public long indexBufferRAMBytesUsed(); protected Segment[] getSegmentInfo(SegmentInfos lastCommittedSegmentInfos, boolean verbose) { ensureOpen(); @@ -460,7 +460,7 @@ public abstract class Engine implements Closeable { } /** - * Refreshes the engine for new search operations to reflect the latest + * Synchronously refreshes the engine for new search operations to reflect the latest * changes. */ public abstract void refresh(String source) throws EngineException; diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index fd4b5daf4ee..e76aa1d38b9 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -54,9 +54,7 @@ public final class EngineConfig { private final ShardId shardId; private final TranslogRecoveryPerformer translogRecoveryPerformer; private final Settings indexSettings; - private volatile ByteSizeValue indexingBufferSize; - private volatile ByteSizeValue versionMapSize; - private volatile String versionMapSizeSetting; + private final ByteSizeValue indexingBufferSize; private volatile boolean compoundOnFlush = true; private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis(); private volatile boolean enableGcDeletes = true; @@ -96,21 +94,17 @@ public final class EngineConfig { public static final String INDEX_CODEC_SETTING = "index.codec"; /** - * The maximum size the version map should grow to before issuing a refresh. Can be an absolute value or a percentage of - * the current index memory buffer (defaults to 25%) + * Index setting to control the index buffer size. + * This setting is not realtime updateable. */ - public static final String INDEX_VERSION_MAP_SIZE = "index.version_map_size"; - + public static final String INDEX_BUFFER_SIZE_SETTING = "index.buffer_size"; /** if set to true the engine will start even if the translog id in the commit point can not be found */ public static final String INDEX_FORCE_NEW_TRANSLOG = "index.engine.force_new_translog"; - public static final TimeValue DEFAULT_REFRESH_INTERVAL = new TimeValue(1, TimeUnit.SECONDS); public static final TimeValue DEFAULT_GC_DELETES = TimeValue.timeValueSeconds(60); - public static final String DEFAULT_VERSION_MAP_SIZE = "25%"; - private static final String DEFAULT_CODEC_NAME = "default"; private TranslogConfig translogConfig; private boolean create = false; @@ -136,13 +130,12 @@ public final class EngineConfig { this.similarity = similarity; this.codecService = codecService; this.failedEngineListener = failedEngineListener; - this.compoundOnFlush = indexSettings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush); - codecName = indexSettings.get(EngineConfig.INDEX_CODEC_SETTING, EngineConfig.DEFAULT_CODEC_NAME); - // We start up inactive and rely on IndexingMemoryController to give us our fair share once we start indexing: - indexingBufferSize = IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER; - gcDeletesInMillis = indexSettings.getAsTime(INDEX_GC_DELETES_SETTING, EngineConfig.DEFAULT_GC_DELETES).millis(); - versionMapSizeSetting = indexSettings.get(INDEX_VERSION_MAP_SIZE, DEFAULT_VERSION_MAP_SIZE); - updateVersionMapSize(); + this.compoundOnFlush = indexSettings.getAsBoolean(INDEX_COMPOUND_ON_FLUSH, compoundOnFlush); + codecName = indexSettings.get(INDEX_CODEC_SETTING, DEFAULT_CODEC_NAME); + // We tell IndexWriter to use large heap, but IndexingMemoryController checks periodically and refreshes the most heap-consuming + // shards when total indexing heap usage is too high: + indexingBufferSize = indexSettings.getAsBytesSize(INDEX_BUFFER_SIZE_SETTING, new ByteSizeValue(256, ByteSizeUnit.MB)); + gcDeletesInMillis = indexSettings.getAsTime(INDEX_GC_DELETES_SETTING, DEFAULT_GC_DELETES).millis(); this.translogRecoveryPerformer = translogRecoveryPerformer; this.forceNewTranslog = indexSettings.getAsBoolean(INDEX_FORCE_NEW_TRANSLOG, false); this.queryCache = queryCache; @@ -150,51 +143,11 @@ public final class EngineConfig { this.translogConfig = translogConfig; } - /** updates {@link #versionMapSize} based on current setting and {@link #indexingBufferSize} */ - private void updateVersionMapSize() { - if (versionMapSizeSetting.endsWith("%")) { - double percent = Double.parseDouble(versionMapSizeSetting.substring(0, versionMapSizeSetting.length() - 1)); - versionMapSize = new ByteSizeValue((long) ((double) indexingBufferSize.bytes() * (percent / 100))); - } else { - versionMapSize = ByteSizeValue.parseBytesSizeValue(versionMapSizeSetting, INDEX_VERSION_MAP_SIZE); - } - } - - /** - * Settings the version map size that should trigger a refresh. See {@link #INDEX_VERSION_MAP_SIZE} for details. - */ - public void setVersionMapSizeSetting(String versionMapSizeSetting) { - this.versionMapSizeSetting = versionMapSizeSetting; - updateVersionMapSize(); - } - - /** - * current setting for the version map size that should trigger a refresh. See {@link #INDEX_VERSION_MAP_SIZE} for details. - */ - public String getVersionMapSizeSetting() { - return versionMapSizeSetting; - } - /** if true the engine will start even if the translog id in the commit point can not be found */ public boolean forceNewTranslog() { return forceNewTranslog; } - /** - * returns the size of the version map that should trigger a refresh - */ - public ByteSizeValue getVersionMapSize() { - return versionMapSize; - } - - /** - * Sets the indexing buffer - */ - public void setIndexingBufferSize(ByteSizeValue indexingBufferSize) { - this.indexingBufferSize = indexingBufferSize; - updateVersionMapSize(); - } - /** * Enables / disables gc deletes * diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 3973b47f3ac..f264af2a0aa 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -101,6 +101,8 @@ public class InternalEngine extends Engine { private volatile SegmentInfos lastCommittedSegmentInfos; + private volatile boolean refreshing; + private final IndexThrottle throttle; public InternalEngine(EngineConfig engineConfig, boolean skipInitialTranslogRecovery) throws EngineException { @@ -295,7 +297,6 @@ public class InternalEngine extends Engine { private void updateIndexWriterSettings() { try { final LiveIndexWriterConfig iwc = indexWriter.getConfig(); - iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().mbFrac()); iwc.setUseCompoundFile(engineConfig.isCompoundOnFlush()); } catch (AlreadyClosedException ex) { // ignore @@ -346,7 +347,6 @@ public class InternalEngine extends Engine { maybeFailEngine("index", t); throw new IndexFailedEngineException(shardId, index.type(), index.id(), t); } - checkVersionMapRefresh(); return created; } @@ -411,33 +411,6 @@ public class InternalEngine extends Engine { } } - /** - * Forces a refresh if the versionMap is using too much RAM - */ - private void checkVersionMapRefresh() { - if (versionMap.ramBytesUsedForRefresh() > config().getVersionMapSize().bytes() && versionMapRefreshPending.getAndSet(true) == false) { - try { - if (isClosed.get()) { - // no point... - return; - } - // Now refresh to clear versionMap: - engineConfig.getThreadPool().executor(ThreadPool.Names.REFRESH).execute(new Runnable() { - @Override - public void run() { - try { - refresh("version_table_full"); - } catch (EngineClosedException ex) { - // ignore - } - } - }); - } catch (EsRejectedExecutionException ex) { - // that is fine too.. we might be shutting down - } - } - } - @Override public void delete(Delete delete) throws EngineException { try (ReleasableLock lock = readLock.acquire()) { @@ -450,7 +423,6 @@ public class InternalEngine extends Engine { } maybePruneDeletedTombstones(); - checkVersionMapRefresh(); } private void maybePruneDeletedTombstones() { @@ -516,6 +488,7 @@ public class InternalEngine extends Engine { // since it flushes the index as well (though, in terms of concurrency, we are allowed to do it) try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); + refreshing = true; searcherManager.maybeRefreshBlocking(); } catch (AlreadyClosedException e) { ensureOpen(); @@ -525,6 +498,8 @@ public class InternalEngine extends Engine { } catch (Throwable t) { failEngine("refresh failed", t); throw new RefreshFailedEngineException(shardId, t); + } finally { + refreshing = false; } // TODO: maybe we should just put a scheduled job in threadPool? @@ -782,8 +757,12 @@ public class InternalEngine extends Engine { } @Override - public long indexWriterRAMBytesUsed() { - return indexWriter.ramBytesUsed(); + public long indexBufferRAMBytesUsed() { + if (refreshing) { + return 0; + } else { + return indexWriter.ramBytesUsed() + versionMap.ramBytesUsedForRefresh(); + } } @Override @@ -1098,8 +1077,6 @@ public class InternalEngine extends Engine { public void onSettingsChanged() { mergeScheduler.refreshConfig(); updateIndexWriterSettings(); - // config().getVersionMapSize() may have changed: - checkVersionMapRefresh(); // config().isEnableGcDeletes() or config.getGcDeletesInMillis() may have changed: maybePruneDeletedTombstones(); } diff --git a/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java b/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java index 921f1167f43..82aee8340fd 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java @@ -234,8 +234,8 @@ public class ShadowEngine extends Engine { } @Override - public long indexWriterRAMBytesUsed() { - // No IndexWriter + public long indexBufferRAMBytesUsed() { + // No IndexWriter nor version map throw new UnsupportedOperationException("ShadowEngine has no IndexWriter"); } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 2ff8c37db3b..292ef01b8d1 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -189,11 +189,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett private final IndexSearcherWrapper searcherWrapper; - /** True if this shard is still indexing (recently) and false if we've been idle for long enough (as periodically checked by {@link - * IndexingMemoryController}). */ - private final AtomicBoolean active = new AtomicBoolean(); - - private volatile long lastWriteNS; private final IndexingMemoryController indexingMemoryController; @Inject @@ -253,9 +248,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett if (mapperService.hasMapping(PercolatorService.TYPE_NAME)) { percolatorQueriesRegistry.enableRealTimePercolator(); } - - // We start up inactive - active.set(false); } public Store store() { @@ -458,7 +450,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett */ public boolean index(Engine.Index index) { ensureWriteAllowed(index); - markLastWrite(index); index = indexingService.preIndex(index); final boolean created; try { @@ -483,7 +474,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett public void delete(Engine.Delete delete) { ensureWriteAllowed(delete); - markLastWrite(delete); delete = indexingService.preDelete(delete); try { if (logger.isTraceEnabled()) { @@ -893,22 +883,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett } } - /** Returns timestamp of last indexing operation */ - public long getLastWriteNS() { - return lastWriteNS; - } - - /** Records timestamp of the last write operation, possibly switching {@code active} to true if we were inactive. */ - private void markLastWrite(Engine.Operation op) { - lastWriteNS = op.startTime(); - if (active.getAndSet(true) == false) { - // We are currently inactive, but a new write operation just showed up, so we now notify IMC - // to wake up and fix our indexing buffer. We could do this async instead, but cost should - // be low, and it's rare this happens. - indexingMemoryController.forceCheck(); - } - } - private void ensureWriteAllowed(Engine.Operation op) throws IllegalIndexShardStateException { Engine.Operation.Origin origin = op.origin(); IndexShardState state = this.state; // one time volatile read @@ -972,70 +946,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett this.failedEngineListener.delegates.add(failedEngineListener); } - /** Change the indexing and translog buffer sizes. If {@code IndexWriter} is currently using more than - * the new buffering indexing size then we do a refresh to free up the heap. */ - public void updateBufferSize(ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) { - - final EngineConfig config = engineConfig; - final ByteSizeValue preValue = config.getIndexingBufferSize(); - - config.setIndexingBufferSize(shardIndexingBufferSize); - + public long getIndexBufferRAMBytesUsed() { Engine engine = getEngineOrNull(); if (engine == null) { - logger.debug("updateBufferSize: engine is closed; skipping"); - return; + return 0; } - - // update engine if it is already started. - if (preValue.bytes() != shardIndexingBufferSize.bytes()) { - // so we push changes these changes down to IndexWriter: - engine.onSettingsChanged(); - - long iwBytesUsed = engine.indexWriterRAMBytesUsed(); - - String message = LoggerMessageFormat.format("updating index_buffer_size from [{}] to [{}]; IndexWriter now using [{}] bytes", - preValue, shardIndexingBufferSize, iwBytesUsed); - - if (iwBytesUsed > shardIndexingBufferSize.bytes()) { - // our allowed buffer was changed to less than we are currently using; we ask IW to refresh - // so it clears its buffers (otherwise it won't clear until the next indexing/delete op) - logger.debug(message + "; now refresh to clear IndexWriter memory"); - - // TODO: should IW have an API to move segments to disk, but not refresh? Its flush method is protected... - try { - refresh("update index buffer"); - } catch (Throwable e) { - logger.warn("failed to refresh after decreasing index buffer", e); - } - } else { - logger.debug(message); - } - } - - engine.getTranslog().updateBuffer(shardTranslogBufferSize); - } - - /** Called by {@link IndexingMemoryController} to check whether more than {@code inactiveTimeNS} has passed since the last - * indexing operation, and become inactive (reducing indexing and translog buffers to tiny values) if so. This returns true - * if the shard is inactive. */ - public boolean checkIdle(long inactiveTimeNS) { - if (System.nanoTime() - lastWriteNS >= inactiveTimeNS) { - boolean wasActive = active.getAndSet(false); - if (wasActive) { - updateBufferSize(IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER); - logger.debug("shard is now inactive"); - indicesLifecycle.onShardInactive(this); - } - } - - return active.get() == false; - } - - /** Returns {@code true} if this shard is active (has seen indexing ops in the last {@link - * IndexingMemoryController#SHARD_INACTIVE_TIME_SETTING} (default 5 minutes), else {@code false}. */ - public boolean getActive() { - return active.get(); + return engine.indexBufferRAMBytesUsed(); } public final boolean isFlushOnClose() { @@ -1163,10 +1079,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett config.setCompoundOnFlush(compoundOnFlush); change = true; } - final String versionMapSize = settings.get(EngineConfig.INDEX_VERSION_MAP_SIZE, config.getVersionMapSizeSetting()); - if (config.getVersionMapSizeSetting().equals(versionMapSize) == false) { - config.setVersionMapSizeSetting(versionMapSize); - } final int maxThreadCount = settings.getAsInt(MergeSchedulerConfig.MAX_THREAD_COUNT, mergeSchedulerConfig.getMaxThreadCount()); if (maxThreadCount != mergeSchedulerConfig.getMaxThreadCount()) { @@ -1219,6 +1131,24 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett return percolatorQueriesRegistry.stats(); } + /** + * Asynchronously refreshes the engine for new search operations to reflect the latest + * changes. + */ + public void refreshAsync(final String reason) { + // nocommit this really is async??? + engineConfig.getThreadPool().executor(ThreadPool.Names.REFRESH).execute(new Runnable() { + @Override + public void run() { + try { + refresh(reason); + } catch (EngineClosedException ex) { + // ignore + } + } + }); + } + class EngineRefresher implements Runnable { @Override public void run() { diff --git a/core/src/main/java/org/elasticsearch/index/translog/BufferingTranslogWriter.java b/core/src/main/java/org/elasticsearch/index/translog/BufferingTranslogWriter.java index 0e84c73f47a..05b5b917879 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/BufferingTranslogWriter.java +++ b/core/src/main/java/org/elasticsearch/index/translog/BufferingTranslogWriter.java @@ -116,18 +116,6 @@ public final class BufferingTranslogWriter extends TranslogWriter { } } - - public void updateBufferSize(int bufferSize) { - try (ReleasableLock lock = writeLock.acquire()) { - if (this.buffer.length != bufferSize) { - flush(); - this.buffer = new byte[bufferSize]; - } - } catch (IOException e) { - throw new TranslogException(shardId, "failed to flush", e); - } - } - class WrapperOutputStream extends OutputStream { @Override diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 4265d611fbf..64c9c456611 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -252,13 +252,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC throw new IllegalArgumentException("can't parse id from file: " + fileName); } - public void updateBuffer(ByteSizeValue bufferSize) { - config.setBufferSize(bufferSize.bytesAsInt()); - try (ReleasableLock lock = writeLock.acquire()) { - current.updateBufferSize(config.getBufferSize()); - } - } - boolean isOpen() { return closed.get() == false; } @@ -335,7 +328,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC TranslogWriter createWriter(long fileGeneration) throws IOException { TranslogWriter newFile; try { - newFile = TranslogWriter.create(config.getType(), shardId, translogUUID, fileGeneration, location.resolve(getFilename(fileGeneration)), new OnCloseRunnable(), config.getBufferSize()); + newFile = TranslogWriter.create(config.getType(), shardId, translogUUID, fileGeneration, location.resolve(getFilename(fileGeneration)), new OnCloseRunnable(), config.getBufferSizeBytes()); } catch (IOException e) { throw new TranslogException(shardId, "failed to create new translog file", e); } diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java index 30ab8144e1e..c831eb5aafb 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java @@ -48,7 +48,7 @@ public final class TranslogConfig { private final BigArrays bigArrays; private final ThreadPool threadPool; private final boolean syncOnEachOperation; - private volatile int bufferSize; + private final int bufferSizeBytes; private volatile TranslogGeneration translogGeneration; private volatile Translog.Durabilty durabilty = Translog.Durabilty.REQUEST; private volatile TranslogWriter.Type type; @@ -73,7 +73,7 @@ public final class TranslogConfig { this.threadPool = threadPool; this.bigArrays = bigArrays; this.type = TranslogWriter.Type.fromString(indexSettings.get(INDEX_TRANSLOG_FS_TYPE, TranslogWriter.Type.BUFFERED.name())); - this.bufferSize = (int) indexSettings.getAsBytesSize(INDEX_TRANSLOG_BUFFER_SIZE, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER).bytes(); // Not really interesting, updated by IndexingMemoryController... + this.bufferSizeBytes = (int) indexSettings.getAsBytesSize(INDEX_TRANSLOG_BUFFER_SIZE, IndexingMemoryController.SHARD_TRANSLOG_BUFFER).bytes(); syncInterval = indexSettings.getAsTime(INDEX_TRANSLOG_SYNC_INTERVAL, TimeValue.timeValueSeconds(5)); if (syncInterval.millis() > 0 && threadPool != null) { @@ -130,15 +130,8 @@ public final class TranslogConfig { /** * Retruns the current translog buffer size. */ - public int getBufferSize() { - return bufferSize; - } - - /** - * Sets the current buffer size - for setting a live setting use {@link Translog#updateBuffer(ByteSizeValue)} - */ - public void setBufferSize(int bufferSize) { - this.bufferSize = bufferSize; + public int getBufferSizeBytes() { + return bufferSizeBytes; } /** diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index 2290dd69d87..dee4c55b06a 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -80,7 +80,7 @@ public class TranslogWriter extends TranslogReader { writeCheckpoint(headerLength, 0, file.getParent(), fileGeneration, StandardOpenOption.WRITE); final TranslogWriter writer = type.create(shardId, fileGeneration, new ChannelReference(file, fileGeneration, channel, onClose), bufferSize); return writer; - } catch (Throwable throwable){ + } catch (Throwable throwable) { IOUtils.closeWhileHandlingException(channel); try { Files.delete(file); // remove the file as well diff --git a/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java b/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java index 90bb4c41a38..aebaeccf483 100644 --- a/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java +++ b/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java @@ -19,6 +19,9 @@ package org.elasticsearch.indices.memory; +import java.util.*; +import java.util.concurrent.ScheduledFuture; + import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; @@ -37,9 +40,6 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.threadpool.ThreadPool; -import java.util.*; -import java.util.concurrent.ScheduledFuture; - public class IndexingMemoryController extends AbstractLifecycleComponent { /** How much heap (% or bytes) we will share across all actively indexing shards on this node (default: 10%). */ @@ -51,51 +51,17 @@ public class IndexingMemoryController extends AbstractLifecycleComponentindices.memory.index_buffer_size is a %, to set a ceiling on the actual size in bytes (default: not set). */ public static final String MAX_INDEX_BUFFER_SIZE_SETTING = "indices.memory.max_index_buffer_size"; - /** Sets a floor on the per-shard index buffer size (default: 4 MB). */ - public static final String MIN_SHARD_INDEX_BUFFER_SIZE_SETTING = "indices.memory.min_shard_index_buffer_size"; + /** How frequently we check indexing memory usage (default: 5 seconds). */ + public static final String SHARD_MEMORY_INTERVAL_TIME_SETTING = "indices.memory.interval"; - /** Sets a ceiling on the per-shard index buffer size (default: 512 MB). */ - public static final String MAX_SHARD_INDEX_BUFFER_SIZE_SETTING = "indices.memory.max_shard_index_buffer_size"; - - /** How much heap (% or bytes) we will share across all actively indexing shards for the translog buffer (default: 1%). */ - public static final String TRANSLOG_BUFFER_SIZE_SETTING = "indices.memory.translog_buffer_size"; - - /** Only applies when indices.memory.translog_buffer_size is a %, to set a floor on the actual size in bytes (default: 256 KB). */ - public static final String MIN_TRANSLOG_BUFFER_SIZE_SETTING = "indices.memory.min_translog_buffer_size"; - - /** Only applies when indices.memory.translog_buffer_size is a %, to set a ceiling on the actual size in bytes (default: not set). */ - public static final String MAX_TRANSLOG_BUFFER_SIZE_SETTING = "indices.memory.max_translog_buffer_size"; - - /** Sets a floor on the per-shard translog buffer size (default: 2 KB). */ - public static final String MIN_SHARD_TRANSLOG_BUFFER_SIZE_SETTING = "indices.memory.min_shard_translog_buffer_size"; - - /** Sets a ceiling on the per-shard translog buffer size (default: 64 KB). */ - public static final String MAX_SHARD_TRANSLOG_BUFFER_SIZE_SETTING = "indices.memory.max_shard_translog_buffer_size"; - - /** If we see no indexing operations after this much time for a given shard, we consider that shard inactive (default: 5 minutes). */ - public static final String SHARD_INACTIVE_TIME_SETTING = "indices.memory.shard_inactive_time"; - - /** How frequently we check shards to find inactive ones (default: 30 seconds). */ - public static final String SHARD_INACTIVE_INTERVAL_TIME_SETTING = "indices.memory.interval"; - - /** Once a shard becomes inactive, we reduce the {@code IndexWriter} buffer to this value (500 KB) to let active shards use the heap instead. */ - public static final ByteSizeValue INACTIVE_SHARD_INDEXING_BUFFER = ByteSizeValue.parseBytesSizeValue("500kb", "INACTIVE_SHARD_INDEXING_BUFFER"); - - /** Once a shard becomes inactive, we reduce the {@code Translog} buffer to this value (1 KB) to let active shards use the heap instead. */ - public static final ByteSizeValue INACTIVE_SHARD_TRANSLOG_BUFFER = ByteSizeValue.parseBytesSizeValue("1kb", "INACTIVE_SHARD_TRANSLOG_BUFFER"); + /** Hardwired translog buffer size */ + public static final ByteSizeValue SHARD_TRANSLOG_BUFFER = ByteSizeValue.parseBytesSizeValue("32kb", "SHARD_TRANSLOG_BUFFER"); private final ThreadPool threadPool; private final IndicesService indicesService; private final ByteSizeValue indexingBuffer; - private final ByteSizeValue minShardIndexBufferSize; - private final ByteSizeValue maxShardIndexBufferSize; - private final ByteSizeValue translogBuffer; - private final ByteSizeValue minShardTranslogBufferSize; - private final ByteSizeValue maxShardTranslogBufferSize; - - private final TimeValue inactiveTime; private final TimeValue interval; private volatile ScheduledFuture scheduler; @@ -134,43 +100,13 @@ public class IndexingMemoryController extends AbstractLifecycleComponent maxTranslogBuffer.bytes()) { - translogBuffer = maxTranslogBuffer; - } - } else { - translogBuffer = ByteSizeValue.parseBytesSizeValue(translogBufferSetting, TRANSLOG_BUFFER_SIZE_SETTING); - } - this.translogBuffer = translogBuffer; - this.minShardTranslogBufferSize = this.settings.getAsBytesSize(MIN_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, new ByteSizeValue(2, ByteSizeUnit.KB)); - this.maxShardTranslogBufferSize = this.settings.getAsBytesSize(MAX_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, new ByteSizeValue(64, ByteSizeUnit.KB)); - - this.inactiveTime = this.settings.getAsTime(SHARD_INACTIVE_TIME_SETTING, TimeValue.timeValueMinutes(5)); - // we need to have this relatively small to move a shard from inactive to active fast (enough) - this.interval = this.settings.getAsTime(SHARD_INACTIVE_INTERVAL_TIME_SETTING, TimeValue.timeValueSeconds(30)); + // we need to have this relatively small to free up heap quickly enough + this.interval = this.settings.getAsTime(SHARD_MEMORY_INTERVAL_TIME_SETTING, TimeValue.timeValueSeconds(5)); this.statusChecker = new ShardsIndicesStatusChecker(); - logger.debug("using indexing buffer size [{}], with {} [{}], {} [{}], {} [{}], {} [{}]", - this.indexingBuffer, - MIN_SHARD_INDEX_BUFFER_SIZE_SETTING, this.minShardIndexBufferSize, - MAX_SHARD_INDEX_BUFFER_SIZE_SETTING, this.maxShardIndexBufferSize, - SHARD_INACTIVE_TIME_SETTING, this.inactiveTime, - SHARD_INACTIVE_INTERVAL_TIME_SETTING, this.interval); + logger.debug("using indexing buffer size [{}] with {} [{}]", this.indexingBuffer, SHARD_MEMORY_INTERVAL_TIME_SETTING, this.interval); } @Override @@ -197,15 +133,6 @@ public class IndexingMemoryController extends AbstractLifecycleComponent availableShards() { ArrayList list = new ArrayList<>(); @@ -224,6 +151,24 @@ public class IndexingMemoryController extends AbstractLifecycleComponent { + final long bytesUsed; + final ShardId shardId; - // True if the shard was active last time we checked - private final Map shardWasActive = new HashMap<>(); + public ShardAndBytesUsed(long bytesUsed, ShardId shardId) { + this.bytesUsed = bytesUsed; + this.shardId = shardId; + } + + @Override + public int compareTo(ShardAndBytesUsed other) { + // Sort larger shards first: + return Long.compare(other.bytesUsed, bytesUsed); + } + } + + class ShardsIndicesStatusChecker implements Runnable { @Override public synchronized void run() { - EnumSet changes = purgeDeletedAndClosedShards(); - updateShardStatuses(changes); - - if (changes.isEmpty() == false) { - // Something changed: recompute indexing buffers: - calcAndSetShardBuffers("[" + changes + "]"); - } - } - - /** - * goes through all existing shards and check whether there are changes in their active status - */ - private void updateShardStatuses(EnumSet changes) { + // Fast check to sum up how much heap all shards' indexing buffers are using now: + long totalBytesUsed = 0; for (ShardId shardId : availableShards()) { - - // Is the shard active now? - Boolean isActive = getShardActive(shardId); - - if (isActive == null) { - // shard was closed.. - continue; + long shardBytesUsed = getIndexBufferRAMBytesUsed(shardId); + if (shardBytesUsed > 0) { + totalBytesUsed += shardBytesUsed; } + } - // Was the shard active last time we checked? - Boolean wasActive = shardWasActive.get(shardId); + if (totalBytesUsed > indexingBuffer.bytes()) { + // OK we are using too much; make a queue and ask largest shard(s) to refresh: + logger.debug("now refreshing some shards: total indexing bytes used [{}] vs index_buffer_size [{}]", new ByteSizeValue(totalBytesUsed), indexingBuffer); - if (wasActive == null) { - // First time we are seeing this shard - shardWasActive.put(shardId, isActive); - changes.add(ShardStatusChangeType.ADDED); - } else if (isActive) { - // Shard is active now - if (wasActive == false) { - // Shard became active itself, since we last checked (due to new indexing op arriving) - changes.add(ShardStatusChangeType.BECAME_ACTIVE); - logger.debug("marking shard {} as active indexing wise", shardId); - shardWasActive.put(shardId, true); - } else if (checkIdle(shardId, inactiveTime.nanos()) == Boolean.TRUE) { - // Make shard inactive now - changes.add(ShardStatusChangeType.BECAME_INACTIVE); - logger.debug("marking shard {} as inactive (inactive_time[{}]) indexing wise", - shardId, - inactiveTime); - shardWasActive.put(shardId, false); + PriorityQueue queue = new PriorityQueue<>(); + for (ShardId shardId : availableShards()) { + long shardBytesUsed = getIndexBufferRAMBytesUsed(shardId); + if (shardBytesUsed > 0) { + queue.add(new ShardAndBytesUsed(shardBytesUsed, shardId)); } } - } - } - /** - * purge any existing statuses that are no longer updated - * - * @return the changes applied - */ - private EnumSet purgeDeletedAndClosedShards() { - EnumSet changes = EnumSet.noneOf(ShardStatusChangeType.class); - - Iterator statusShardIdIterator = shardWasActive.keySet().iterator(); - while (statusShardIdIterator.hasNext()) { - ShardId shardId = statusShardIdIterator.next(); - if (shardAvailable(shardId) == false) { - changes.add(ShardStatusChangeType.DELETED); - statusShardIdIterator.remove(); - } - } - return changes; - } - - private void calcAndSetShardBuffers(String reason) { - - // Count how many shards are now active: - int activeShardCount = 0; - for (Map.Entry ent : shardWasActive.entrySet()) { - if (ent.getValue()) { - activeShardCount++; - } - } - - // TODO: we could be smarter here by taking into account how RAM the IndexWriter on each shard - // is actually using (using IW.ramBytesUsed), so that small indices (e.g. Marvel) would not - // get the same indexing buffer as large indices. But it quickly gets tricky... - if (activeShardCount == 0) { - logger.debug("no active shards (reason={})", reason); - return; - } - - ByteSizeValue shardIndexingBufferSize = new ByteSizeValue(indexingBuffer.bytes() / activeShardCount); - if (shardIndexingBufferSize.bytes() < minShardIndexBufferSize.bytes()) { - shardIndexingBufferSize = minShardIndexBufferSize; - } - if (shardIndexingBufferSize.bytes() > maxShardIndexBufferSize.bytes()) { - shardIndexingBufferSize = maxShardIndexBufferSize; - } - - ByteSizeValue shardTranslogBufferSize = new ByteSizeValue(translogBuffer.bytes() / activeShardCount); - if (shardTranslogBufferSize.bytes() < minShardTranslogBufferSize.bytes()) { - shardTranslogBufferSize = minShardTranslogBufferSize; - } - if (shardTranslogBufferSize.bytes() > maxShardTranslogBufferSize.bytes()) { - shardTranslogBufferSize = maxShardTranslogBufferSize; - } - - logger.debug("recalculating shard indexing buffer (reason={}), total is [{}] with [{}] active shards, each shard set to indexing=[{}], translog=[{}]", reason, indexingBuffer, activeShardCount, shardIndexingBufferSize, shardTranslogBufferSize); - - for (Map.Entry ent : shardWasActive.entrySet()) { - if (ent.getValue()) { - // This shard is active - updateShardBuffers(ent.getKey(), shardIndexingBufferSize, shardTranslogBufferSize); + while (totalBytesUsed > indexingBuffer.bytes() && queue.isEmpty() == false) { + ShardAndBytesUsed largest = queue.poll(); + logger.debug("refresh shard [{}] to free up its [{}] indexing buffer", largest.shardId, new ByteSizeValue(largest.bytesUsed)); + refreshShardAsync(largest.shardId); + totalBytesUsed -= largest.bytesUsed; } } } } - - protected long currentTimeInNanos() { - return System.nanoTime(); - } - - /** ask this shard to check now whether it is inactive, and reduces its indexing and translog buffers if so. returns Boolean.TRUE if - * it did deactive, Boolean.FALSE if it did not, and null if the shard is unknown */ - protected Boolean checkIdle(ShardId shardId, long inactiveTimeNS) { - String ignoreReason = null; - final IndexShard shard = getShard(shardId); - if (shard != null) { - try { - return shard.checkIdle(inactiveTimeNS); - } catch (EngineClosedException e) { - // ignore - ignoreReason = "EngineClosedException"; - } catch (FlushNotAllowedEngineException e) { - // ignore - ignoreReason = "FlushNotAllowedEngineException"; - } - } else { - ignoreReason = "shard not found"; - } - if (ignoreReason != null) { - logger.trace("ignore [{}] while marking shard {} as inactive", ignoreReason, shardId); - } - return null; - } - - private static enum ShardStatusChangeType { - ADDED, DELETED, BECAME_ACTIVE, BECAME_INACTIVE - } } diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineSettingsTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineSettingsTests.java index 1ed022dbefa..1d4eb6159e7 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineSettingsTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineSettingsTests.java @@ -44,8 +44,6 @@ public class InternalEngineSettingsTests extends ESSingleNodeTestCase { // VERSION MAP SIZE long indexBufferSize = engine.config().getIndexingBufferSize().bytes(); - long versionMapSize = engine.config().getVersionMapSize().bytes(); - assertThat(versionMapSize, equalTo((long) (indexBufferSize * 0.25))); final int iters = between(1, 20); for (int i = 0; i < iters; i++) { @@ -55,15 +53,9 @@ public class InternalEngineSettingsTests extends ESSingleNodeTestCase { // the full long range here else the assert below fails: long gcDeletes = random().nextLong() & (Long.MAX_VALUE >> 11); - boolean versionMapAsPercent = randomBoolean(); - double versionMapPercent = randomIntBetween(0, 100); - long versionMapSizeInMB = randomIntBetween(10, 20); - String versionMapString = versionMapAsPercent ? versionMapPercent + "%" : versionMapSizeInMB + "mb"; - Settings build = Settings.builder() .put(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush) .put(EngineConfig.INDEX_GC_DELETES_SETTING, gcDeletes, TimeUnit.MILLISECONDS) - .put(EngineConfig.INDEX_VERSION_MAP_SIZE, versionMapString) .build(); assertEquals(gcDeletes, build.getAsTime(EngineConfig.INDEX_GC_DELETES_SETTING, null).millis()); @@ -77,12 +69,6 @@ public class InternalEngineSettingsTests extends ESSingleNodeTestCase { assertEquals(engine.getGcDeletesInMillis(), gcDeletes); indexBufferSize = engine.config().getIndexingBufferSize().bytes(); - versionMapSize = engine.config().getVersionMapSize().bytes(); - if (versionMapAsPercent) { - assertThat(versionMapSize, equalTo((long) (indexBufferSize * (versionMapPercent / 100)))); - } else { - assertThat(versionMapSize, equalTo(1024 * 1024 * versionMapSizeInMB)); - } } Settings settings = Settings.builder() @@ -107,37 +93,5 @@ public class InternalEngineSettingsTests extends ESSingleNodeTestCase { client().admin().indices().prepareUpdateSettings("foo").setSettings(settings).get(); assertEquals(engine.getGcDeletesInMillis(), 1000); assertTrue(engine.config().isEnableGcDeletes()); - - settings = Settings.builder() - .put(EngineConfig.INDEX_VERSION_MAP_SIZE, "sdfasfd") - .build(); - try { - client().admin().indices().prepareUpdateSettings("foo").setSettings(settings).get(); - fail("settings update didn't fail, but should have"); - } catch (IllegalArgumentException e) { - // good - } - - settings = Settings.builder() - .put(EngineConfig.INDEX_VERSION_MAP_SIZE, "-12%") - .build(); - try { - client().admin().indices().prepareUpdateSettings("foo").setSettings(settings).get(); - fail("settings update didn't fail, but should have"); - } catch (IllegalArgumentException e) { - // good - } - - settings = Settings.builder() - .put(EngineConfig.INDEX_VERSION_MAP_SIZE, "130%") - .build(); - try { - client().admin().indices().prepareUpdateSettings("foo").setSettings(settings).get(); - fail("settings update didn't fail, but should have"); - } catch (IllegalArgumentException e) { - // good - } } - - } diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 5f6e1db42b9..a224bc87fec 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -444,7 +444,7 @@ public class InternalEngineTests extends ESTestCase { @Test public void testSegmentsWithMergeFlag() throws Exception { try (Store store = createStore(); - Engine engine = createEngine(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), new TieredMergePolicy())) { + Engine engine = createEngine(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), new TieredMergePolicy())) { ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null); Engine.Index index = new Engine.Index(newUid("1"), doc); engine.index(index); @@ -779,7 +779,7 @@ public class InternalEngineTests extends ESTestCase { public void testSyncedFlush() throws IOException { try (Store store = createStore(); - Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), + Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), new LogByteSizeMergePolicy()), false)) { final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20); ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null); @@ -985,7 +985,7 @@ public class InternalEngineTests extends ESTestCase { public void testForceMerge() throws IOException { try (Store store = createStore(); - Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), + Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), new LogByteSizeMergePolicy()), false)) { // use log MP here we test some behavior in ESMP int numDocs = randomIntBetween(10, 100); for (int i = 0; i < numDocs; i++) { @@ -1435,7 +1435,7 @@ public class InternalEngineTests extends ESTestCase { @Test public void testEnableGcDeletes() throws Exception { try (Store store = createStore(); - Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), newMergePolicy()), false)) { + Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), newMergePolicy()), false)) { engine.config().setEnableGcDeletes(false); // Add document @@ -1562,10 +1562,11 @@ public class InternalEngineTests extends ESTestCase { // #10312 @Test public void testDeletesAloneCanTriggerRefresh() throws Exception { + Settings settings = Settings.builder() + .put(defaultSettings) + .put(EngineConfig.INDEX_BUFFER_SIZE_SETTING, "1kb").build(); try (Store store = createStore(); - Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), newMergePolicy()), - false)) { - engine.config().setIndexingBufferSize(new ByteSizeValue(1, ByteSizeUnit.KB)); + Engine engine = new InternalEngine(config(settings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), newMergePolicy()), false)) { for (int i = 0; i < 100; i++) { String id = Integer.toString(i); ParsedDocument doc = testParsedDocument(id, id, "test", null, -1, -1, testDocument(), B_1, null); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index b891219d636..ce3e4e51aba 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -339,8 +339,6 @@ public class IndexShardTests extends ESSingleNodeTestCase { client().prepareIndex("test", "test").setSource("{}").get(); ensureGreen("test"); IndicesService indicesService = getInstanceFromNode(IndicesService.class); - Boolean result = indicesService.indexService("test").getShardOrNull(0).checkIdle(0); - assertEquals(Boolean.TRUE, result); assertBusy(new Runnable() { // should be very very quick @Override public void run() { diff --git a/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerIT.java b/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerIT.java deleted file mode 100644 index e14cc226646..00000000000 --- a/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerIT.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.indices.memory; - -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.index.engine.EngineConfig; -import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.node.internal.InternalSettingsPreparer; -import org.elasticsearch.test.ESIntegTestCase; -import org.junit.Test; - - -@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) -public class IndexingMemoryControllerIT extends ESIntegTestCase { - - private long getIWBufferSize(String indexName) { - return client().admin().indices().prepareStats(indexName).get().getTotal().getSegments().getIndexWriterMaxMemoryInBytes(); - } - - @Test - public void testIndexBufferPushedToEngine() throws InterruptedException { - createNode(Settings.builder().put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, "100000h", - IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "32mb", - IndexShard.INDEX_REFRESH_INTERVAL, "-1").build()); - - // Create two active indices, sharing 32 MB indexing buffer: - prepareCreate("test3").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).get(); - prepareCreate("test4").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).get(); - - ensureGreen(); - - index("test3", "type", "1", "f", 1); - index("test4", "type", "1", "f", 1); - - // .. then make sure we really pushed the update (16 MB for each) down to the IndexWriter, even if refresh nor flush occurs: - if (awaitBusy(() -> getIWBufferSize("test3") == 16*1024*1024) == false) { - fail("failed to update shard indexing buffer size for test3 index to 16 MB; got: " + getIWBufferSize("test3")); - } - if (awaitBusy(() -> getIWBufferSize("test4") == 16*1024*1024) == false) { - fail("failed to update shard indexing buffer size for test4 index to 16 MB; got: " + getIWBufferSize("test4")); - } - - client().admin().indices().prepareDelete("test4").get(); - if (awaitBusy(() -> getIWBufferSize("test3") == 32 * 1024 * 1024) == false) { - fail("failed to update shard indexing buffer size for test3 index to 32 MB; got: " + getIWBufferSize("test4")); - } - - } - - @Test - public void testInactivePushedToShard() throws InterruptedException { - createNode(Settings.builder().put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, "100ms", - IndexingMemoryController.SHARD_INACTIVE_INTERVAL_TIME_SETTING, "100ms", - IndexShard.INDEX_REFRESH_INTERVAL, "-1").build()); - - // Create two active indices, sharing 32 MB indexing buffer: - prepareCreate("test1").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).get(); - - ensureGreen(); - - index("test1", "type", "1", "f", 1); - - // make shard the shard buffer was set to inactive size - final ByteSizeValue inactiveBuffer = IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER; - if (awaitBusy(() -> getIWBufferSize("test1") == inactiveBuffer.bytes()) == false) { - fail("failed to update shard indexing buffer size for test1 index to [" + inactiveBuffer + "]; got: " + getIWBufferSize("test1")); - } - } - - private void createNode(Settings settings) { - internalCluster().startNode(Settings.builder() - .put(ClusterName.SETTING, "IndexingMemoryControllerIT") - .put("node.name", "IndexingMemoryControllerIT") - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) - .put(EsExecutors.PROCESSORS, 1) // limit the number of threads created - .put("http.enabled", false) - .put(InternalSettingsPreparer.IGNORE_SYSTEM_PROPERTIES_SETTING, true) // make sure we get what we set :) - .put(IndexingMemoryController.SHARD_INACTIVE_INTERVAL_TIME_SETTING, "100ms") - .put(settings) - ); - } -} diff --git a/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java b/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java index d3d9e961a61..033d625a26d 100644 --- a/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java @@ -41,115 +41,80 @@ public class IndexingMemoryControllerTests extends ESTestCase { static class MockController extends IndexingMemoryController { - final static ByteSizeValue INACTIVE = new ByteSizeValue(-1); - - final Map indexingBuffers = new HashMap<>(); - final Map translogBuffers = new HashMap<>(); - - final Map lastIndexTimeNanos = new HashMap<>(); - final Set activeShards = new HashSet<>(); - - long currentTimeSec = TimeValue.timeValueNanos(System.nanoTime()).seconds(); + final Map indexBufferRAMBytesUsed = new HashMap<>(); public MockController(Settings settings) { super(Settings.builder() - .put(SHARD_INACTIVE_INTERVAL_TIME_SETTING, "200h") // disable it - .put(SHARD_INACTIVE_TIME_SETTING, "1ms") // nearly immediate + .put(SHARD_MEMORY_INTERVAL_TIME_SETTING, "200h") // disable it .put(settings) .build(), null, null, 100 * 1024 * 1024); // fix jvm mem size to 100mb } - public void deleteShard(ShardId id) { - indexingBuffers.remove(id); - translogBuffers.remove(id); - } - - public void assertBuffers(ShardId id, ByteSizeValue indexing, ByteSizeValue translog) { - assertThat(indexingBuffers.get(id), equalTo(indexing)); - assertThat(translogBuffers.get(id), equalTo(translog)); - } - - public void assertInActive(ShardId id) { - assertThat(indexingBuffers.get(id), equalTo(INACTIVE)); - assertThat(translogBuffers.get(id), equalTo(INACTIVE)); - } - - @Override - protected long currentTimeInNanos() { - return TimeValue.timeValueSeconds(currentTimeSec).nanos(); + public void deleteShard(ShardId shardId) { + indexBufferRAMBytesUsed.remove(shardId); } @Override protected List availableShards() { - return new ArrayList<>(indexingBuffers.keySet()); + return new ArrayList<>(indexBufferRAMBytesUsed.keySet()); } @Override protected boolean shardAvailable(ShardId shardId) { - return indexingBuffers.containsKey(shardId); + return indexBufferRAMBytesUsed.containsKey(shardId); } @Override - protected Boolean getShardActive(ShardId shardId) { - return activeShards.contains(shardId); - } - - @Override - protected void updateShardBuffers(ShardId shardId, ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) { - indexingBuffers.put(shardId, shardIndexingBufferSize); - translogBuffers.put(shardId, shardTranslogBufferSize); - } - - @Override - protected Boolean checkIdle(ShardId shardId, long inactiveTimeNS) { - Long ns = lastIndexTimeNanos.get(shardId); - if (ns == null) { - return null; - } else if (currentTimeInNanos() - ns >= inactiveTimeNS) { - indexingBuffers.put(shardId, INACTIVE); - translogBuffers.put(shardId, INACTIVE); - activeShards.remove(shardId); - return true; + protected long getIndexBufferRAMBytesUsed(ShardId shardId) { + Long used = indexBufferRAMBytesUsed.get(shardId); + if (used == null) { + return 0; } else { - return false; + return used; } } - public void incrementTimeSec(int sec) { - currentTimeSec += sec; + @Override + public void refreshShardAsync(ShardId shardId) { + indexBufferRAMBytesUsed.put(shardId, 0L); + } + + public void assertBuffer(ShardId shardId, ByteSizeValue expected) { + Long actual = indexBufferRAMBytesUsed.get(shardId); + assertEquals(expected.bytes(), actual.longValue()); } public void simulateIndexing(ShardId shardId) { - lastIndexTimeNanos.put(shardId, currentTimeInNanos()); - if (indexingBuffers.containsKey(shardId) == false) { - // First time we are seeing this shard; start it off with inactive buffers as IndexShard does: - indexingBuffers.put(shardId, IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER); - translogBuffers.put(shardId, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER); + Long bytes = indexBufferRAMBytesUsed.get(shardId); + if (bytes == null) { + bytes = 0L; } - activeShards.add(shardId); + // Each doc we index takes up a megabyte! + bytes += 1024*1024; + indexBufferRAMBytesUsed.put(shardId, bytes); forceCheck(); } } public void testShardAdditionAndRemoval() { MockController controller = new MockController(Settings.builder() - .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb") - .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "100kb").build()); + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "4mb").build()); final ShardId shard1 = new ShardId("test", 1); controller.simulateIndexing(shard1); - controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K + controller.assertBuffer(shard1, new ByteSizeValue(1, ByteSizeUnit.MB)); + // add another shard final ShardId shard2 = new ShardId("test", 2); controller.simulateIndexing(shard2); - controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); - controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); + controller.assertBuffer(shard1, new ByteSizeValue(1, ByteSizeUnit.MB)); + controller.assertBuffer(shard2, new ByteSizeValue(1, ByteSizeUnit.MB)); // remove first shard controller.deleteShard(shard1); controller.forceCheck(); - controller.assertBuffers(shard2, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K + controller.assertBuffer(shard2, new ByteSizeValue(1, ByteSizeUnit.MB)); // remove second shard controller.deleteShard(shard2); @@ -158,116 +123,62 @@ public class IndexingMemoryControllerTests extends ESTestCase { // add a new one final ShardId shard3 = new ShardId("test", 3); controller.simulateIndexing(shard3); - controller.assertBuffers(shard3, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K + controller.assertBuffer(shard3, new ByteSizeValue(1, ByteSizeUnit.MB)); } public void testActiveInactive() { MockController controller = new MockController(Settings.builder() - .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb") - .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "100kb") - .put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, "5s") + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "5mb") .build()); final ShardId shard1 = new ShardId("test", 1); controller.simulateIndexing(shard1); final ShardId shard2 = new ShardId("test", 2); controller.simulateIndexing(shard2); - controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); - controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); + controller.assertBuffer(shard1, new ByteSizeValue(1, ByteSizeUnit.MB)); + controller.assertBuffer(shard2, new ByteSizeValue(1, ByteSizeUnit.MB)); - // index into both shards, move the clock and see that they are still active controller.simulateIndexing(shard1); controller.simulateIndexing(shard2); - controller.incrementTimeSec(10); - controller.forceCheck(); + controller.assertBuffer(shard1, new ByteSizeValue(2, ByteSizeUnit.MB)); + controller.assertBuffer(shard2, new ByteSizeValue(2, ByteSizeUnit.MB)); - // both shards now inactive - controller.assertInActive(shard1); - controller.assertInActive(shard2); - - // index into one shard only, see it becomes active + // index into one shard only, hits the 5mb limit, so shard1 is refreshed controller.simulateIndexing(shard1); - controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); - controller.assertInActive(shard2); + controller.assertBuffer(shard1, new ByteSizeValue(0, ByteSizeUnit.MB)); + controller.assertBuffer(shard2, new ByteSizeValue(2, ByteSizeUnit.MB)); - controller.incrementTimeSec(3); // increment but not enough to become inactive - controller.forceCheck(); - controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); - controller.assertInActive(shard2); - - controller.incrementTimeSec(3); // increment some more - controller.forceCheck(); - controller.assertInActive(shard1); - controller.assertInActive(shard2); - - // index some and shard becomes immediately active controller.simulateIndexing(shard2); - controller.assertInActive(shard1); - controller.assertBuffers(shard2, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); - } - - public void testMinShardBufferSizes() { - MockController controller = new MockController(Settings.builder() - .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb") - .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "50kb") - .put(IndexingMemoryController.MIN_SHARD_INDEX_BUFFER_SIZE_SETTING, "6mb") - .put(IndexingMemoryController.MIN_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, "40kb").build()); - - assertTwoActiveShards(controller, new ByteSizeValue(6, ByteSizeUnit.MB), new ByteSizeValue(40, ByteSizeUnit.KB)); - } - - public void testMaxShardBufferSizes() { - MockController controller = new MockController(Settings.builder() - .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb") - .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "50kb") - .put(IndexingMemoryController.MAX_SHARD_INDEX_BUFFER_SIZE_SETTING, "3mb") - .put(IndexingMemoryController.MAX_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, "10kb").build()); - - assertTwoActiveShards(controller, new ByteSizeValue(3, ByteSizeUnit.MB), new ByteSizeValue(10, ByteSizeUnit.KB)); + controller.simulateIndexing(shard2); + controller.assertBuffer(shard2, new ByteSizeValue(4, ByteSizeUnit.MB)); + controller.simulateIndexing(shard2); + // shard2 used up the full 5 mb and is now cleared: + controller.assertBuffer(shard2, new ByteSizeValue(0, ByteSizeUnit.MB)); } public void testRelativeBufferSizes() { MockController controller = new MockController(Settings.builder() .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "50%") - .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "0.5%") .build()); assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(50, ByteSizeUnit.MB))); - assertThat(controller.translogBufferSize(), equalTo(new ByteSizeValue(512, ByteSizeUnit.KB))); } public void testMinBufferSizes() { MockController controller = new MockController(Settings.builder() .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "0.001%") - .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "0.001%") - .put(IndexingMemoryController.MIN_INDEX_BUFFER_SIZE_SETTING, "6mb") - .put(IndexingMemoryController.MIN_TRANSLOG_BUFFER_SIZE_SETTING, "512kb").build()); + .put(IndexingMemoryController.MIN_INDEX_BUFFER_SIZE_SETTING, "6mb").build()); assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(6, ByteSizeUnit.MB))); - assertThat(controller.translogBufferSize(), equalTo(new ByteSizeValue(512, ByteSizeUnit.KB))); } public void testMaxBufferSizes() { MockController controller = new MockController(Settings.builder() .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "90%") - .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "90%") - .put(IndexingMemoryController.MAX_INDEX_BUFFER_SIZE_SETTING, "6mb") - .put(IndexingMemoryController.MAX_TRANSLOG_BUFFER_SIZE_SETTING, "512kb").build()); + .put(IndexingMemoryController.MAX_INDEX_BUFFER_SIZE_SETTING, "6mb").build()); assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(6, ByteSizeUnit.MB))); - assertThat(controller.translogBufferSize(), equalTo(new ByteSizeValue(512, ByteSizeUnit.KB))); } - - protected void assertTwoActiveShards(MockController controller, ByteSizeValue indexBufferSize, ByteSizeValue translogBufferSize) { - final ShardId shard1 = new ShardId("test", 1); - controller.simulateIndexing(shard1); - final ShardId shard2 = new ShardId("test", 2); - controller.simulateIndexing(shard2); - controller.assertBuffers(shard1, indexBufferSize, translogBufferSize); - controller.assertBuffers(shard2, indexBufferSize, translogBufferSize); - - } - }