Merge pull request #13918 from mikemccand/immediate_shard_active
When shard becomes active again, immediately increase its indexing buffer instead of waiting for up to 30 seconds while indexing with a tiny (500 KB) indexing buffer.
This commit is contained in:
commit
9688e86b38
|
@ -16,6 +16,7 @@
|
||||||
* specific language governing permissions and limitations
|
* specific language governing permissions and limitations
|
||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.elasticsearch.index;
|
package org.elasticsearch.index;
|
||||||
|
|
||||||
import org.elasticsearch.common.Nullable;
|
import org.elasticsearch.common.Nullable;
|
||||||
|
@ -34,6 +35,7 @@ import org.elasticsearch.index.termvectors.TermVectorsService;
|
||||||
import org.elasticsearch.indices.IndicesLifecycle;
|
import org.elasticsearch.indices.IndicesLifecycle;
|
||||||
import org.elasticsearch.indices.IndicesWarmer;
|
import org.elasticsearch.indices.IndicesWarmer;
|
||||||
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
|
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
|
||||||
|
import org.elasticsearch.indices.memory.IndexingMemoryController;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -58,9 +60,10 @@ public final class IndexServicesProvider {
|
||||||
private final EngineFactory factory;
|
private final EngineFactory factory;
|
||||||
private final BigArrays bigArrays;
|
private final BigArrays bigArrays;
|
||||||
private final IndexSearcherWrapper indexSearcherWrapper;
|
private final IndexSearcherWrapper indexSearcherWrapper;
|
||||||
|
private final IndexingMemoryController indexingMemoryController;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public IndexServicesProvider(IndicesLifecycle indicesLifecycle, ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, IndicesQueryCache indicesQueryCache, CodecService codecService, TermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, @Nullable IndicesWarmer warmer, SimilarityService similarityService, EngineFactory factory, BigArrays bigArrays, @Nullable IndexSearcherWrapper indexSearcherWrapper) {
|
public IndexServicesProvider(IndicesLifecycle indicesLifecycle, ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, IndicesQueryCache indicesQueryCache, CodecService codecService, TermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, @Nullable IndicesWarmer warmer, SimilarityService similarityService, EngineFactory factory, BigArrays bigArrays, @Nullable IndexSearcherWrapper indexSearcherWrapper, IndexingMemoryController indexingMemoryController) {
|
||||||
this.indicesLifecycle = indicesLifecycle;
|
this.indicesLifecycle = indicesLifecycle;
|
||||||
this.threadPool = threadPool;
|
this.threadPool = threadPool;
|
||||||
this.mapperService = mapperService;
|
this.mapperService = mapperService;
|
||||||
|
@ -76,6 +79,7 @@ public final class IndexServicesProvider {
|
||||||
this.factory = factory;
|
this.factory = factory;
|
||||||
this.bigArrays = bigArrays;
|
this.bigArrays = bigArrays;
|
||||||
this.indexSearcherWrapper = indexSearcherWrapper;
|
this.indexSearcherWrapper = indexSearcherWrapper;
|
||||||
|
this.indexingMemoryController = indexingMemoryController;
|
||||||
}
|
}
|
||||||
|
|
||||||
public IndicesLifecycle getIndicesLifecycle() {
|
public IndicesLifecycle getIndicesLifecycle() {
|
||||||
|
@ -134,5 +138,11 @@ public final class IndexServicesProvider {
|
||||||
return bigArrays;
|
return bigArrays;
|
||||||
}
|
}
|
||||||
|
|
||||||
public IndexSearcherWrapper getIndexSearcherWrapper() { return indexSearcherWrapper; }
|
public IndexSearcherWrapper getIndexSearcherWrapper() {
|
||||||
|
return indexSearcherWrapper;
|
||||||
|
}
|
||||||
|
|
||||||
|
public IndexingMemoryController getIndexingMemoryController() {
|
||||||
|
return indexingMemoryController;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -365,6 +365,9 @@ public abstract class Engine implements Closeable {
|
||||||
stats.addIndexWriterMaxMemoryInBytes(0);
|
stats.addIndexWriterMaxMemoryInBytes(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** How much heap Lucene's IndexWriter is using */
|
||||||
|
abstract public long indexWriterRAMBytesUsed();
|
||||||
|
|
||||||
protected Segment[] getSegmentInfo(SegmentInfos lastCommittedSegmentInfos, boolean verbose) {
|
protected Segment[] getSegmentInfo(SegmentInfos lastCommittedSegmentInfos, boolean verbose) {
|
||||||
ensureOpen();
|
ensureOpen();
|
||||||
Map<String, Segment> segments = new HashMap<>();
|
Map<String, Segment> segments = new HashMap<>();
|
||||||
|
|
|
@ -40,6 +40,7 @@ import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
|
||||||
import org.elasticsearch.index.store.Store;
|
import org.elasticsearch.index.store.Store;
|
||||||
import org.elasticsearch.index.translog.TranslogConfig;
|
import org.elasticsearch.index.translog.TranslogConfig;
|
||||||
import org.elasticsearch.indices.IndicesWarmer;
|
import org.elasticsearch.indices.IndicesWarmer;
|
||||||
|
import org.elasticsearch.indices.memory.IndexingMemoryController;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
@ -107,8 +108,6 @@ public final class EngineConfig {
|
||||||
|
|
||||||
public static final TimeValue DEFAULT_REFRESH_INTERVAL = new TimeValue(1, TimeUnit.SECONDS);
|
public static final TimeValue DEFAULT_REFRESH_INTERVAL = new TimeValue(1, TimeUnit.SECONDS);
|
||||||
public static final TimeValue DEFAULT_GC_DELETES = TimeValue.timeValueSeconds(60);
|
public static final TimeValue DEFAULT_GC_DELETES = TimeValue.timeValueSeconds(60);
|
||||||
public static final ByteSizeValue DEFAULT_INDEX_BUFFER_SIZE = new ByteSizeValue(64, ByteSizeUnit.MB);
|
|
||||||
public static final ByteSizeValue INACTIVE_SHARD_INDEXING_BUFFER = ByteSizeValue.parseBytesSizeValue("500kb", "INACTIVE_SHARD_INDEXING_BUFFER");
|
|
||||||
|
|
||||||
public static final String DEFAULT_VERSION_MAP_SIZE = "25%";
|
public static final String DEFAULT_VERSION_MAP_SIZE = "25%";
|
||||||
|
|
||||||
|
@ -139,7 +138,8 @@ public final class EngineConfig {
|
||||||
this.failedEngineListener = failedEngineListener;
|
this.failedEngineListener = failedEngineListener;
|
||||||
this.compoundOnFlush = indexSettings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush);
|
this.compoundOnFlush = indexSettings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush);
|
||||||
codecName = indexSettings.get(EngineConfig.INDEX_CODEC_SETTING, EngineConfig.DEFAULT_CODEC_NAME);
|
codecName = indexSettings.get(EngineConfig.INDEX_CODEC_SETTING, EngineConfig.DEFAULT_CODEC_NAME);
|
||||||
indexingBufferSize = DEFAULT_INDEX_BUFFER_SIZE;
|
// We start up inactive and rely on IndexingMemoryController to give us our fair share once we start indexing:
|
||||||
|
indexingBufferSize = IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER;
|
||||||
gcDeletesInMillis = indexSettings.getAsTime(INDEX_GC_DELETES_SETTING, EngineConfig.DEFAULT_GC_DELETES).millis();
|
gcDeletesInMillis = indexSettings.getAsTime(INDEX_GC_DELETES_SETTING, EngineConfig.DEFAULT_GC_DELETES).millis();
|
||||||
versionMapSizeSetting = indexSettings.get(INDEX_VERSION_MAP_SIZE, DEFAULT_VERSION_MAP_SIZE);
|
versionMapSizeSetting = indexSettings.get(INDEX_VERSION_MAP_SIZE, DEFAULT_VERSION_MAP_SIZE);
|
||||||
updateVersionMapSize();
|
updateVersionMapSize();
|
||||||
|
|
|
@ -825,6 +825,11 @@ public class InternalEngine extends Engine {
|
||||||
stats.addIndexWriterMaxMemoryInBytes((long) (indexWriter.getConfig().getRAMBufferSizeMB() * 1024 * 1024));
|
stats.addIndexWriterMaxMemoryInBytes((long) (indexWriter.getConfig().getRAMBufferSizeMB() * 1024 * 1024));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long indexWriterRAMBytesUsed() {
|
||||||
|
return indexWriter.ramBytesUsed();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<Segment> segments(boolean verbose) {
|
public List<Segment> segments(boolean verbose) {
|
||||||
try (ReleasableLock lock = readLock.acquire()) {
|
try (ReleasableLock lock = readLock.acquire()) {
|
||||||
|
|
|
@ -240,4 +240,9 @@ public class ShadowEngine extends Engine {
|
||||||
return lastCommittedSegmentInfos;
|
return lastCommittedSegmentInfos;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long indexWriterRAMBytesUsed() {
|
||||||
|
// No IndexWriter
|
||||||
|
throw new UnsupportedOperationException("ShadowEngine has no IndexWriter");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -42,6 +42,7 @@ import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||||
import org.elasticsearch.common.logging.ESLogger;
|
import org.elasticsearch.common.logging.ESLogger;
|
||||||
|
import org.elasticsearch.common.logging.support.LoggerMessageFormat;
|
||||||
import org.elasticsearch.common.lucene.Lucene;
|
import org.elasticsearch.common.lucene.Lucene;
|
||||||
import org.elasticsearch.common.metrics.MeanMetric;
|
import org.elasticsearch.common.metrics.MeanMetric;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
@ -83,8 +84,8 @@ import org.elasticsearch.index.settings.IndexSettings;
|
||||||
import org.elasticsearch.index.settings.IndexSettingsService;
|
import org.elasticsearch.index.settings.IndexSettingsService;
|
||||||
import org.elasticsearch.index.similarity.SimilarityService;
|
import org.elasticsearch.index.similarity.SimilarityService;
|
||||||
import org.elasticsearch.index.snapshots.IndexShardRepository;
|
import org.elasticsearch.index.snapshots.IndexShardRepository;
|
||||||
import org.elasticsearch.index.store.Store;
|
|
||||||
import org.elasticsearch.index.store.Store.MetadataSnapshot;
|
import org.elasticsearch.index.store.Store.MetadataSnapshot;
|
||||||
|
import org.elasticsearch.index.store.Store;
|
||||||
import org.elasticsearch.index.store.StoreFileMetaData;
|
import org.elasticsearch.index.store.StoreFileMetaData;
|
||||||
import org.elasticsearch.index.store.StoreStats;
|
import org.elasticsearch.index.store.StoreStats;
|
||||||
import org.elasticsearch.index.suggest.stats.ShardSuggestMetric;
|
import org.elasticsearch.index.suggest.stats.ShardSuggestMetric;
|
||||||
|
@ -99,6 +100,7 @@ import org.elasticsearch.index.warmer.WarmerStats;
|
||||||
import org.elasticsearch.indices.IndicesWarmer;
|
import org.elasticsearch.indices.IndicesWarmer;
|
||||||
import org.elasticsearch.indices.InternalIndicesLifecycle;
|
import org.elasticsearch.indices.InternalIndicesLifecycle;
|
||||||
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
|
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
|
||||||
|
import org.elasticsearch.indices.memory.IndexingMemoryController;
|
||||||
import org.elasticsearch.indices.recovery.RecoveryFailedException;
|
import org.elasticsearch.indices.recovery.RecoveryFailedException;
|
||||||
import org.elasticsearch.indices.recovery.RecoveryState;
|
import org.elasticsearch.indices.recovery.RecoveryState;
|
||||||
import org.elasticsearch.percolator.PercolatorService;
|
import org.elasticsearch.percolator.PercolatorService;
|
||||||
|
@ -117,6 +119,7 @@ import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
|
||||||
public class IndexShard extends AbstractIndexShardComponent implements IndexSettingsService.Listener {
|
public class IndexShard extends AbstractIndexShardComponent implements IndexSettingsService.Listener {
|
||||||
|
|
||||||
private final ThreadPool threadPool;
|
private final ThreadPool threadPool;
|
||||||
|
@ -189,6 +192,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
|
||||||
|
|
||||||
private final IndexSearcherWrapper searcherWrapper;
|
private final IndexSearcherWrapper searcherWrapper;
|
||||||
|
|
||||||
|
/** True if this shard is still indexing (recently) and false if we've been idle for long enough (as periodically checked by {@link
|
||||||
|
* IndexingMemoryController}). */
|
||||||
|
private final AtomicBoolean active = new AtomicBoolean();
|
||||||
|
|
||||||
|
private volatile long lastWriteNS;
|
||||||
|
private final IndexingMemoryController indexingMemoryController;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public IndexShard(ShardId shardId, @IndexSettings Settings indexSettings, ShardPath path, Store store, IndexServicesProvider provider) {
|
public IndexShard(ShardId shardId, @IndexSettings Settings indexSettings, ShardPath path, Store store, IndexServicesProvider provider) {
|
||||||
super(shardId, indexSettings);
|
super(shardId, indexSettings);
|
||||||
|
@ -241,11 +251,16 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
|
||||||
this.flushThresholdSize = indexSettings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(512, ByteSizeUnit.MB));
|
this.flushThresholdSize = indexSettings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(512, ByteSizeUnit.MB));
|
||||||
this.disableFlush = indexSettings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, false);
|
this.disableFlush = indexSettings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, false);
|
||||||
this.indexShardOperationCounter = new IndexShardOperationCounter(logger, shardId);
|
this.indexShardOperationCounter = new IndexShardOperationCounter(logger, shardId);
|
||||||
|
this.indexingMemoryController = provider.getIndexingMemoryController();
|
||||||
|
|
||||||
this.searcherWrapper = provider.getIndexSearcherWrapper();
|
this.searcherWrapper = provider.getIndexSearcherWrapper();
|
||||||
this.percolatorQueriesRegistry = new PercolatorQueriesRegistry(shardId, indexSettings, queryParserService, indexingService, mapperService, indexFieldDataService);
|
this.percolatorQueriesRegistry = new PercolatorQueriesRegistry(shardId, indexSettings, queryParserService, indexingService, mapperService, indexFieldDataService);
|
||||||
if (mapperService.hasMapping(PercolatorService.TYPE_NAME)) {
|
if (mapperService.hasMapping(PercolatorService.TYPE_NAME)) {
|
||||||
percolatorQueriesRegistry.enableRealTimePercolator();
|
percolatorQueriesRegistry.enableRealTimePercolator();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// We start up inactive
|
||||||
|
active.set(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Store store() {
|
public Store store() {
|
||||||
|
@ -447,7 +462,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
|
||||||
* updated.
|
* updated.
|
||||||
*/
|
*/
|
||||||
public boolean index(Engine.Index index) {
|
public boolean index(Engine.Index index) {
|
||||||
writeAllowed(index.origin());
|
ensureWriteAllowed(index);
|
||||||
|
markLastWrite(index);
|
||||||
index = indexingService.preIndex(index);
|
index = indexingService.preIndex(index);
|
||||||
final boolean created;
|
final boolean created;
|
||||||
try {
|
try {
|
||||||
|
@ -471,7 +487,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
|
||||||
}
|
}
|
||||||
|
|
||||||
public void delete(Engine.Delete delete) {
|
public void delete(Engine.Delete delete) {
|
||||||
writeAllowed(delete.origin());
|
ensureWriteAllowed(delete);
|
||||||
|
markLastWrite(delete);
|
||||||
delete = indexingService.preDelete(delete);
|
delete = indexingService.preDelete(delete);
|
||||||
try {
|
try {
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
|
@ -881,7 +898,24 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void writeAllowed(Engine.Operation.Origin origin) throws IllegalIndexShardStateException {
|
/** Returns timestamp of last indexing operation */
|
||||||
|
public long getLastWriteNS() {
|
||||||
|
return lastWriteNS;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Records timestamp of the last write operation, possibly switching {@code active} to true if we were inactive. */
|
||||||
|
private void markLastWrite(Engine.Operation op) {
|
||||||
|
lastWriteNS = op.startTime();
|
||||||
|
if (active.getAndSet(true) == false) {
|
||||||
|
// We are currently inactive, but a new write operation just showed up, so we now notify IMC
|
||||||
|
// to wake up and fix our indexing buffer. We could do this async instead, but cost should
|
||||||
|
// be low, and it's rare this happens.
|
||||||
|
indexingMemoryController.forceCheck();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void ensureWriteAllowed(Engine.Operation op) throws IllegalIndexShardStateException {
|
||||||
|
Engine.Operation.Origin origin = op.origin();
|
||||||
IndexShardState state = this.state; // one time volatile read
|
IndexShardState state = this.state; // one time volatile read
|
||||||
|
|
||||||
if (origin == Engine.Operation.Origin.PRIMARY) {
|
if (origin == Engine.Operation.Origin.PRIMARY) {
|
||||||
|
@ -943,6 +977,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
|
||||||
this.failedEngineListener.delegates.add(failedEngineListener);
|
this.failedEngineListener.delegates.add(failedEngineListener);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Change the indexing and translog buffer sizes. If {@code IndexWriter} is currently using more than
|
||||||
|
* the new buffering indexing size then we do a refresh to free up the heap. */
|
||||||
public void updateBufferSize(ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) {
|
public void updateBufferSize(ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) {
|
||||||
|
|
||||||
final EngineConfig config = engineConfig;
|
final EngineConfig config = engineConfig;
|
||||||
|
@ -961,28 +997,51 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
|
||||||
// so we push changes these changes down to IndexWriter:
|
// so we push changes these changes down to IndexWriter:
|
||||||
engine.onSettingsChanged();
|
engine.onSettingsChanged();
|
||||||
|
|
||||||
if (shardIndexingBufferSize == EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER) {
|
long iwBytesUsed = engine.indexWriterRAMBytesUsed();
|
||||||
// it's inactive: make sure we do a refresh / full IW flush in this case, since the memory
|
|
||||||
// changes only after a "data" change has happened to the writer
|
String message = LoggerMessageFormat.format("updating index_buffer_size from [{}] to [{}]; IndexWriter now using [{}] bytes",
|
||||||
// the index writer lazily allocates memory and a refresh will clean it all up.
|
preValue, shardIndexingBufferSize, iwBytesUsed);
|
||||||
logger.debug("updating index_buffer_size from [{}] to (inactive) [{}]", preValue, shardIndexingBufferSize);
|
|
||||||
|
if (iwBytesUsed > shardIndexingBufferSize.bytes()) {
|
||||||
|
// our allowed buffer was changed to less than we are currently using; we ask IW to refresh
|
||||||
|
// so it clears its buffers (otherwise it won't clear until the next indexing/delete op)
|
||||||
|
logger.debug(message + "; now refresh to clear IndexWriter memory");
|
||||||
|
|
||||||
|
// TODO: should IW have an API to move segments to disk, but not refresh? Its flush method is protected...
|
||||||
try {
|
try {
|
||||||
refresh("update index buffer");
|
refresh("update index buffer");
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
logger.warn("failed to refresh after setting shard to inactive", e);
|
logger.warn("failed to refresh after decreasing index buffer", e);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
logger.debug("updating index_buffer_size from [{}] to [{}]", preValue, shardIndexingBufferSize);
|
logger.debug(message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
engine.getTranslog().updateBuffer(shardTranslogBufferSize);
|
engine.getTranslog().updateBuffer(shardTranslogBufferSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void markAsInactive() {
|
/** Called by {@link IndexingMemoryController} to check whether more than {@code inactiveTimeNS} has passed since the last
|
||||||
updateBufferSize(EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER, TranslogConfig.INACTIVE_SHARD_TRANSLOG_BUFFER);
|
* indexing operation, and become inactive (reducing indexing and translog buffers to tiny values) if so. This returns true
|
||||||
|
* if the shard is inactive. */
|
||||||
|
public boolean checkIdle(long inactiveTimeNS) {
|
||||||
|
if (System.nanoTime() - lastWriteNS >= inactiveTimeNS) {
|
||||||
|
boolean wasActive = active.getAndSet(false);
|
||||||
|
if (wasActive) {
|
||||||
|
updateBufferSize(IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER);
|
||||||
|
logger.debug("shard is now inactive");
|
||||||
indicesLifecycle.onShardInactive(this);
|
indicesLifecycle.onShardInactive(this);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return active.get() == false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Returns {@code true} if this shard is active (has seen indexing ops in the last {@link
|
||||||
|
* IndexingMemoryController#SHARD_INACTIVE_TIME_SETTING} (default 5 minutes), else {@code false}. */
|
||||||
|
public boolean getActive() {
|
||||||
|
return active.get();
|
||||||
|
}
|
||||||
|
|
||||||
public final boolean isFlushOnClose() {
|
public final boolean isFlushOnClose() {
|
||||||
return flushOnClose;
|
return flushOnClose;
|
||||||
|
|
|
@ -18,6 +18,8 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.index.shard;
|
package org.elasticsearch.index.shard;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.index.IndexServicesProvider;
|
import org.elasticsearch.index.IndexServicesProvider;
|
||||||
|
@ -28,8 +30,6 @@ import org.elasticsearch.index.settings.IndexSettings;
|
||||||
import org.elasticsearch.index.store.Store;
|
import org.elasticsearch.index.store.Store;
|
||||||
import org.elasticsearch.index.translog.TranslogStats;
|
import org.elasticsearch.index.translog.TranslogStats;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* ShadowIndexShard extends {@link IndexShard} to add file synchronization
|
* ShadowIndexShard extends {@link IndexShard} to add file synchronization
|
||||||
* from the primary when a flush happens. It also ensures that a replica being
|
* from the primary when a flush happens. It also ensures that a replica being
|
||||||
|
|
|
@ -27,6 +27,7 @@ import org.elasticsearch.common.util.BigArrays;
|
||||||
import org.elasticsearch.index.settings.IndexSettings;
|
import org.elasticsearch.index.settings.IndexSettings;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.index.translog.Translog.TranslogGeneration;
|
import org.elasticsearch.index.translog.Translog.TranslogGeneration;
|
||||||
|
import org.elasticsearch.indices.memory.IndexingMemoryController;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
|
@ -42,7 +43,6 @@ public final class TranslogConfig {
|
||||||
public static final String INDEX_TRANSLOG_FS_TYPE = "index.translog.fs.type";
|
public static final String INDEX_TRANSLOG_FS_TYPE = "index.translog.fs.type";
|
||||||
public static final String INDEX_TRANSLOG_BUFFER_SIZE = "index.translog.fs.buffer_size";
|
public static final String INDEX_TRANSLOG_BUFFER_SIZE = "index.translog.fs.buffer_size";
|
||||||
public static final String INDEX_TRANSLOG_SYNC_INTERVAL = "index.translog.sync_interval";
|
public static final String INDEX_TRANSLOG_SYNC_INTERVAL = "index.translog.sync_interval";
|
||||||
public static final ByteSizeValue INACTIVE_SHARD_TRANSLOG_BUFFER = ByteSizeValue.parseBytesSizeValue("1kb", "INACTIVE_SHARD_TRANSLOG_BUFFER");
|
|
||||||
|
|
||||||
private final TimeValue syncInterval;
|
private final TimeValue syncInterval;
|
||||||
private final BigArrays bigArrays;
|
private final BigArrays bigArrays;
|
||||||
|
@ -73,7 +73,7 @@ public final class TranslogConfig {
|
||||||
this.threadPool = threadPool;
|
this.threadPool = threadPool;
|
||||||
this.bigArrays = bigArrays;
|
this.bigArrays = bigArrays;
|
||||||
this.type = TranslogWriter.Type.fromString(indexSettings.get(INDEX_TRANSLOG_FS_TYPE, TranslogWriter.Type.BUFFERED.name()));
|
this.type = TranslogWriter.Type.fromString(indexSettings.get(INDEX_TRANSLOG_FS_TYPE, TranslogWriter.Type.BUFFERED.name()));
|
||||||
this.bufferSize = (int) indexSettings.getAsBytesSize(INDEX_TRANSLOG_BUFFER_SIZE, ByteSizeValue.parseBytesSizeValue("64k", INDEX_TRANSLOG_BUFFER_SIZE)).bytes(); // Not really interesting, updated by IndexingMemoryController...
|
this.bufferSize = (int) indexSettings.getAsBytesSize(INDEX_TRANSLOG_BUFFER_SIZE, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER).bytes(); // Not really interesting, updated by IndexingMemoryController...
|
||||||
|
|
||||||
syncInterval = indexSettings.getAsTime(INDEX_TRANSLOG_SYNC_INTERVAL, TimeValue.timeValueSeconds(5));
|
syncInterval = indexSettings.getAsTime(INDEX_TRANSLOG_SYNC_INTERVAL, TimeValue.timeValueSeconds(5));
|
||||||
if (syncInterval.millis() > 0 && threadPool != null) {
|
if (syncInterval.millis() > 0 && threadPool != null) {
|
||||||
|
|
|
@ -29,12 +29,10 @@ import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.util.concurrent.FutureUtils;
|
import org.elasticsearch.common.util.concurrent.FutureUtils;
|
||||||
import org.elasticsearch.index.IndexService;
|
import org.elasticsearch.index.IndexService;
|
||||||
import org.elasticsearch.index.engine.EngineClosedException;
|
import org.elasticsearch.index.engine.EngineClosedException;
|
||||||
import org.elasticsearch.index.engine.EngineConfig;
|
|
||||||
import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
|
import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
|
||||||
import org.elasticsearch.index.shard.IndexShard;
|
import org.elasticsearch.index.shard.IndexShard;
|
||||||
import org.elasticsearch.index.shard.IndexShardState;
|
import org.elasticsearch.index.shard.IndexShardState;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.index.translog.Translog;
|
|
||||||
import org.elasticsearch.indices.IndicesService;
|
import org.elasticsearch.indices.IndicesService;
|
||||||
import org.elasticsearch.monitor.jvm.JvmInfo;
|
import org.elasticsearch.monitor.jvm.JvmInfo;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
@ -42,9 +40,6 @@ import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.ScheduledFuture;
|
import java.util.concurrent.ScheduledFuture;
|
||||||
|
|
||||||
/**
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
public class IndexingMemoryController extends AbstractLifecycleComponent<IndexingMemoryController> {
|
public class IndexingMemoryController extends AbstractLifecycleComponent<IndexingMemoryController> {
|
||||||
|
|
||||||
/** How much heap (% or bytes) we will share across all actively indexing shards on this node (default: 10%). */
|
/** How much heap (% or bytes) we will share across all actively indexing shards on this node (default: 10%). */
|
||||||
|
@ -83,6 +78,12 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
|
||||||
/** How frequently we check shards to find inactive ones (default: 30 seconds). */
|
/** How frequently we check shards to find inactive ones (default: 30 seconds). */
|
||||||
public static final String SHARD_INACTIVE_INTERVAL_TIME_SETTING = "indices.memory.interval";
|
public static final String SHARD_INACTIVE_INTERVAL_TIME_SETTING = "indices.memory.interval";
|
||||||
|
|
||||||
|
/** Once a shard becomes inactive, we reduce the {@code IndexWriter} buffer to this value (500 KB) to let active shards use the heap instead. */
|
||||||
|
public static final ByteSizeValue INACTIVE_SHARD_INDEXING_BUFFER = ByteSizeValue.parseBytesSizeValue("500kb", "INACTIVE_SHARD_INDEXING_BUFFER");
|
||||||
|
|
||||||
|
/** Once a shard becomes inactive, we reduce the {@code Translog} buffer to this value (1 KB) to let active shards use the heap instead. */
|
||||||
|
public static final ByteSizeValue INACTIVE_SHARD_TRANSLOG_BUFFER = ByteSizeValue.parseBytesSizeValue("1kb", "INACTIVE_SHARD_TRANSLOG_BUFFER");
|
||||||
|
|
||||||
private final ThreadPool threadPool;
|
private final ThreadPool threadPool;
|
||||||
private final IndicesService indicesService;
|
private final IndicesService indicesService;
|
||||||
|
|
||||||
|
@ -164,7 +165,6 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
|
||||||
|
|
||||||
this.statusChecker = new ShardsIndicesStatusChecker();
|
this.statusChecker = new ShardsIndicesStatusChecker();
|
||||||
|
|
||||||
|
|
||||||
logger.debug("using indexing buffer size [{}], with {} [{}], {} [{}], {} [{}], {} [{}]",
|
logger.debug("using indexing buffer size [{}], with {} [{}], {} [{}], {} [{}], {} [{}]",
|
||||||
this.indexingBuffer,
|
this.indexingBuffer,
|
||||||
MIN_SHARD_INDEX_BUFFER_SIZE_SETTING, this.minShardIndexBufferSize,
|
MIN_SHARD_INDEX_BUFFER_SIZE_SETTING, this.minShardIndexBufferSize,
|
||||||
|
@ -175,7 +175,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doStart() {
|
protected void doStart() {
|
||||||
// its fine to run it on the scheduler thread, no busy work
|
// it's fine to run it on the scheduler thread, no busy work
|
||||||
this.scheduler = threadPool.scheduleWithFixedDelay(statusChecker, interval);
|
this.scheduler = threadPool.scheduleWithFixedDelay(statusChecker, interval);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -240,6 +240,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** set new indexing and translog buffers on this shard. this may cause the shard to refresh to free up heap. */
|
||||||
protected void updateShardBuffers(ShardId shardId, ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) {
|
protected void updateShardBuffers(ShardId shardId, ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) {
|
||||||
final IndexShard shard = getShard(shardId);
|
final IndexShard shard = getShard(shardId);
|
||||||
if (shard != null) {
|
if (shard != null) {
|
||||||
|
@ -255,105 +256,86 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** returns {@link IndexShard#getActive} if the shard exists, else null */
|
||||||
/** returns the current translog status (generation id + ops) for the given shard id. Returns null if unavailable. */
|
protected Boolean getShardActive(ShardId shardId) {
|
||||||
protected ShardIndexingStatus getTranslogStatus(ShardId shardId) {
|
|
||||||
final IndexShard indexShard = getShard(shardId);
|
final IndexShard indexShard = getShard(shardId);
|
||||||
if (indexShard == null) {
|
if (indexShard == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
final Translog translog;
|
return indexShard.getActive();
|
||||||
try {
|
|
||||||
translog = indexShard.getTranslog();
|
|
||||||
} catch (EngineClosedException e) {
|
|
||||||
// not ready yet to be checked for activity
|
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ShardIndexingStatus status = new ShardIndexingStatus();
|
/** check if any shards active status changed, now. */
|
||||||
status.translogId = translog.currentFileGeneration();
|
public void forceCheck() {
|
||||||
status.translogNumberOfOperations = translog.totalOperations();
|
|
||||||
return status;
|
|
||||||
}
|
|
||||||
|
|
||||||
// used for tests
|
|
||||||
void forceCheck() {
|
|
||||||
statusChecker.run();
|
statusChecker.run();
|
||||||
}
|
}
|
||||||
|
|
||||||
class ShardsIndicesStatusChecker implements Runnable {
|
class ShardsIndicesStatusChecker implements Runnable {
|
||||||
|
|
||||||
private final Map<ShardId, ShardIndexingStatus> shardsIndicesStatus = new HashMap<>();
|
// True if the shard was active last time we checked
|
||||||
|
private final Map<ShardId,Boolean> shardWasActive = new HashMap<>();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public synchronized void run() {
|
||||||
EnumSet<ShardStatusChangeType> changes = purgeDeletedAndClosedShards();
|
EnumSet<ShardStatusChangeType> changes = purgeDeletedAndClosedShards();
|
||||||
|
|
||||||
final List<ShardId> activeToInactiveIndexingShards = new ArrayList<>();
|
updateShardStatuses(changes);
|
||||||
final int activeShards = updateShardStatuses(changes, activeToInactiveIndexingShards);
|
|
||||||
for (ShardId indexShard : activeToInactiveIndexingShards) {
|
|
||||||
markShardAsInactive(indexShard);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (changes.isEmpty() == false) {
|
if (changes.isEmpty() == false) {
|
||||||
// Something changed: recompute indexing buffers:
|
// Something changed: recompute indexing buffers:
|
||||||
calcAndSetShardBuffers(activeShards, "[" + changes + "]");
|
calcAndSetShardBuffers("[" + changes + "]");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* goes through all existing shards and check whether the changes their active status
|
* goes through all existing shards and check whether there are changes in their active status
|
||||||
*
|
|
||||||
* @return the current count of active shards
|
|
||||||
*/
|
*/
|
||||||
private int updateShardStatuses(EnumSet<ShardStatusChangeType> changes, List<ShardId> activeToInactiveIndexingShards) {
|
private void updateShardStatuses(EnumSet<ShardStatusChangeType> changes) {
|
||||||
int activeShards = 0;
|
|
||||||
for (ShardId shardId : availableShards()) {
|
for (ShardId shardId : availableShards()) {
|
||||||
|
|
||||||
final ShardIndexingStatus currentStatus = getTranslogStatus(shardId);
|
// Is the shard active now?
|
||||||
|
Boolean isActive = getShardActive(shardId);
|
||||||
|
|
||||||
if (currentStatus == null) {
|
if (isActive == null) {
|
||||||
// shard was closed..
|
// shard was closed..
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
ShardIndexingStatus status = shardsIndicesStatus.get(shardId);
|
// Was the shard active last time we checked?
|
||||||
if (status == null) {
|
Boolean wasActive = shardWasActive.get(shardId);
|
||||||
status = currentStatus;
|
|
||||||
shardsIndicesStatus.put(shardId, status);
|
if (wasActive == null) {
|
||||||
|
// First time we are seeing this shard
|
||||||
|
shardWasActive.put(shardId, isActive);
|
||||||
changes.add(ShardStatusChangeType.ADDED);
|
changes.add(ShardStatusChangeType.ADDED);
|
||||||
} else {
|
} else if (isActive) {
|
||||||
final boolean lastActiveIndexing = status.activeIndexing;
|
// Shard is active now
|
||||||
status.updateWith(currentTimeInNanos(), currentStatus, inactiveTime.nanos());
|
if (wasActive == false) {
|
||||||
if (lastActiveIndexing && (status.activeIndexing == false)) {
|
// Shard became active itself, since we last checked (due to new indexing op arriving)
|
||||||
activeToInactiveIndexingShards.add(shardId);
|
|
||||||
changes.add(ShardStatusChangeType.BECAME_INACTIVE);
|
|
||||||
logger.debug("marking shard {} as inactive (inactive_time[{}]) indexing wise, setting size to [{}]",
|
|
||||||
shardId,
|
|
||||||
inactiveTime, EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER);
|
|
||||||
} else if ((lastActiveIndexing == false) && status.activeIndexing) {
|
|
||||||
changes.add(ShardStatusChangeType.BECAME_ACTIVE);
|
changes.add(ShardStatusChangeType.BECAME_ACTIVE);
|
||||||
logger.debug("marking shard {} as active indexing wise", shardId);
|
logger.debug("marking shard {} as active indexing wise", shardId);
|
||||||
|
shardWasActive.put(shardId, true);
|
||||||
|
} else if (checkIdle(shardId, inactiveTime.nanos()) == Boolean.TRUE) {
|
||||||
|
// Make shard inactive now
|
||||||
|
changes.add(ShardStatusChangeType.BECAME_INACTIVE);
|
||||||
|
logger.debug("marking shard {} as inactive (inactive_time[{}]) indexing wise",
|
||||||
|
shardId,
|
||||||
|
inactiveTime);
|
||||||
|
shardWasActive.put(shardId, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (status.activeIndexing) {
|
|
||||||
activeShards++;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return activeShards;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* purge any existing statuses that are no longer updated
|
* purge any existing statuses that are no longer updated
|
||||||
*
|
*
|
||||||
* @return true if any change
|
* @return the changes applied
|
||||||
*/
|
*/
|
||||||
private EnumSet<ShardStatusChangeType> purgeDeletedAndClosedShards() {
|
private EnumSet<ShardStatusChangeType> purgeDeletedAndClosedShards() {
|
||||||
EnumSet<ShardStatusChangeType> changes = EnumSet.noneOf(ShardStatusChangeType.class);
|
EnumSet<ShardStatusChangeType> changes = EnumSet.noneOf(ShardStatusChangeType.class);
|
||||||
|
|
||||||
Iterator<ShardId> statusShardIdIterator = shardsIndicesStatus.keySet().iterator();
|
Iterator<ShardId> statusShardIdIterator = shardWasActive.keySet().iterator();
|
||||||
while (statusShardIdIterator.hasNext()) {
|
while (statusShardIdIterator.hasNext()) {
|
||||||
ShardId shardId = statusShardIdIterator.next();
|
ShardId shardId = statusShardIdIterator.next();
|
||||||
if (shardAvailable(shardId) == false) {
|
if (shardAvailable(shardId) == false) {
|
||||||
|
@ -364,12 +346,25 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
|
||||||
return changes;
|
return changes;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void calcAndSetShardBuffers(int activeShards, String reason) {
|
private void calcAndSetShardBuffers(String reason) {
|
||||||
if (activeShards == 0) {
|
|
||||||
|
// Count how many shards are now active:
|
||||||
|
int activeShardCount = 0;
|
||||||
|
for (Map.Entry<ShardId,Boolean> ent : shardWasActive.entrySet()) {
|
||||||
|
if (ent.getValue()) {
|
||||||
|
activeShardCount++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: we could be smarter here by taking into account how RAM the IndexWriter on each shard
|
||||||
|
// is actually using (using IW.ramBytesUsed), so that small indices (e.g. Marvel) would not
|
||||||
|
// get the same indexing buffer as large indices. But it quickly gets tricky...
|
||||||
|
if (activeShardCount == 0) {
|
||||||
logger.debug("no active shards (reason={})", reason);
|
logger.debug("no active shards (reason={})", reason);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
ByteSizeValue shardIndexingBufferSize = new ByteSizeValue(indexingBuffer.bytes() / activeShards);
|
|
||||||
|
ByteSizeValue shardIndexingBufferSize = new ByteSizeValue(indexingBuffer.bytes() / activeShardCount);
|
||||||
if (shardIndexingBufferSize.bytes() < minShardIndexBufferSize.bytes()) {
|
if (shardIndexingBufferSize.bytes() < minShardIndexBufferSize.bytes()) {
|
||||||
shardIndexingBufferSize = minShardIndexBufferSize;
|
shardIndexingBufferSize = minShardIndexBufferSize;
|
||||||
}
|
}
|
||||||
|
@ -377,7 +372,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
|
||||||
shardIndexingBufferSize = maxShardIndexBufferSize;
|
shardIndexingBufferSize = maxShardIndexBufferSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
ByteSizeValue shardTranslogBufferSize = new ByteSizeValue(translogBuffer.bytes() / activeShards);
|
ByteSizeValue shardTranslogBufferSize = new ByteSizeValue(translogBuffer.bytes() / activeShardCount);
|
||||||
if (shardTranslogBufferSize.bytes() < minShardTranslogBufferSize.bytes()) {
|
if (shardTranslogBufferSize.bytes() < minShardTranslogBufferSize.bytes()) {
|
||||||
shardTranslogBufferSize = minShardTranslogBufferSize;
|
shardTranslogBufferSize = minShardTranslogBufferSize;
|
||||||
}
|
}
|
||||||
|
@ -385,11 +380,12 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
|
||||||
shardTranslogBufferSize = maxShardTranslogBufferSize;
|
shardTranslogBufferSize = maxShardTranslogBufferSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.debug("recalculating shard indexing buffer (reason={}), total is [{}] with [{}] active shards, each shard set to indexing=[{}], translog=[{}]", reason, indexingBuffer, activeShards, shardIndexingBufferSize, shardTranslogBufferSize);
|
logger.debug("recalculating shard indexing buffer (reason={}), total is [{}] with [{}] active shards, each shard set to indexing=[{}], translog=[{}]", reason, indexingBuffer, activeShardCount, shardIndexingBufferSize, shardTranslogBufferSize);
|
||||||
for (ShardId shardId : availableShards()) {
|
|
||||||
ShardIndexingStatus status = shardsIndicesStatus.get(shardId);
|
for (Map.Entry<ShardId,Boolean> ent : shardWasActive.entrySet()) {
|
||||||
if (status == null || status.activeIndexing) {
|
if (ent.getValue()) {
|
||||||
updateShardBuffers(shardId, shardIndexingBufferSize, shardTranslogBufferSize);
|
// This shard is active
|
||||||
|
updateShardBuffers(ent.getKey(), shardIndexingBufferSize, shardTranslogBufferSize);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -399,13 +395,14 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
|
||||||
return System.nanoTime();
|
return System.nanoTime();
|
||||||
}
|
}
|
||||||
|
|
||||||
// update inactive indexing buffer size
|
/** ask this shard to check now whether it is inactive, and reduces its indexing and translog buffers if so. returns Boolean.TRUE if
|
||||||
protected void markShardAsInactive(ShardId shardId) {
|
* it did deactive, Boolean.FALSE if it did not, and null if the shard is unknown */
|
||||||
|
protected Boolean checkIdle(ShardId shardId, long inactiveTimeNS) {
|
||||||
String ignoreReason = null;
|
String ignoreReason = null;
|
||||||
final IndexShard shard = getShard(shardId);
|
final IndexShard shard = getShard(shardId);
|
||||||
if (shard != null) {
|
if (shard != null) {
|
||||||
try {
|
try {
|
||||||
shard.markAsInactive();
|
return shard.checkIdle(inactiveTimeNS);
|
||||||
} catch (EngineClosedException e) {
|
} catch (EngineClosedException e) {
|
||||||
// ignore
|
// ignore
|
||||||
ignoreReason = "EngineClosedException";
|
ignoreReason = "EngineClosedException";
|
||||||
|
@ -419,47 +416,10 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
|
||||||
if (ignoreReason != null) {
|
if (ignoreReason != null) {
|
||||||
logger.trace("ignore [{}] while marking shard {} as inactive", ignoreReason, shardId);
|
logger.trace("ignore [{}] while marking shard {} as inactive", ignoreReason, shardId);
|
||||||
}
|
}
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static enum ShardStatusChangeType {
|
private static enum ShardStatusChangeType {
|
||||||
ADDED, DELETED, BECAME_ACTIVE, BECAME_INACTIVE
|
ADDED, DELETED, BECAME_ACTIVE, BECAME_INACTIVE
|
||||||
}
|
}
|
||||||
|
|
||||||
static class ShardIndexingStatus {
|
|
||||||
long translogId = -1;
|
|
||||||
long translogNumberOfOperations = -1;
|
|
||||||
boolean activeIndexing = true;
|
|
||||||
long idleSinceNanoTime = -1; // contains the first time we saw this shard with no operations done on it
|
|
||||||
|
|
||||||
|
|
||||||
/** update status based on a new sample. updates all internal variables */
|
|
||||||
public void updateWith(long currentNanoTime, ShardIndexingStatus current, long inactiveNanoInterval) {
|
|
||||||
final boolean idle = (translogId == current.translogId && translogNumberOfOperations == current.translogNumberOfOperations);
|
|
||||||
if (activeIndexing && idle) {
|
|
||||||
// no indexing activity detected.
|
|
||||||
if (idleSinceNanoTime < 0) {
|
|
||||||
// first time we see this, start the clock.
|
|
||||||
idleSinceNanoTime = currentNanoTime;
|
|
||||||
} else if ((currentNanoTime - idleSinceNanoTime) > inactiveNanoInterval) {
|
|
||||||
// shard is inactive. mark it as such.
|
|
||||||
activeIndexing = false;
|
|
||||||
}
|
|
||||||
} else if (activeIndexing == false // we weren't indexing before
|
|
||||||
&& idle == false // but we do now
|
|
||||||
&& current.translogNumberOfOperations > 0 // but only if we're really sure - see note bellow
|
|
||||||
) {
|
|
||||||
// since we sync flush once a shard becomes inactive, the translog id can change, however that
|
|
||||||
// doesn't mean the an indexing operation has happened. Note that if we're really unlucky and a flush happens
|
|
||||||
// immediately after an indexing operation we may not become active immediately. The following
|
|
||||||
// indexing operation will mark the shard as active, so it's OK. If that one doesn't come, we might as well stay
|
|
||||||
// inactive
|
|
||||||
|
|
||||||
activeIndexing = true;
|
|
||||||
idleSinceNanoTime = -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
translogId = current.translogId;
|
|
||||||
translogNumberOfOperations = current.translogNumberOfOperations;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -339,7 +339,8 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
||||||
client().prepareIndex("test", "test").setSource("{}").get();
|
client().prepareIndex("test", "test").setSource("{}").get();
|
||||||
ensureGreen("test");
|
ensureGreen("test");
|
||||||
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
|
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
|
||||||
indicesService.indexService("test").getShardOrNull(0).markAsInactive();
|
Boolean result = indicesService.indexService("test").getShardOrNull(0).checkIdle(0);
|
||||||
|
assertEquals(Boolean.TRUE, result);
|
||||||
assertBusy(new Runnable() { // should be very very quick
|
assertBusy(new Runnable() { // should be very very quick
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
@ -954,7 +955,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
IndexServicesProvider newProvider = new IndexServicesProvider(indexServices.getIndicesLifecycle(), indexServices.getThreadPool(), indexServices.getMapperService(), indexServices.getQueryParserService(), indexServices.getIndexCache(), indexServices.getIndexAliasesService(), indexServices.getIndicesQueryCache(), indexServices.getCodecService(), indexServices.getTermVectorsService(), indexServices.getIndexFieldDataService(), indexServices.getWarmer(), indexServices.getSimilarityService(), indexServices.getFactory(), indexServices.getBigArrays(), wrapper);
|
IndexServicesProvider newProvider = new IndexServicesProvider(indexServices.getIndicesLifecycle(), indexServices.getThreadPool(), indexServices.getMapperService(), indexServices.getQueryParserService(), indexServices.getIndexCache(), indexServices.getIndexAliasesService(), indexServices.getIndicesQueryCache(), indexServices.getCodecService(), indexServices.getTermVectorsService(), indexServices.getIndexFieldDataService(), indexServices.getWarmer(), indexServices.getSimilarityService(), indexServices.getFactory(), indexServices.getBigArrays(), wrapper, indexServices.getIndexingMemoryController());
|
||||||
IndexShard newShard = new IndexShard(shard.shardId(), shard.indexSettings, shard.shardPath(), shard.store(), newProvider);
|
IndexShard newShard = new IndexShard(shard.shardId(), shard.indexSettings, shard.shardPath(), shard.store(), newProvider);
|
||||||
|
|
||||||
ShardRoutingHelper.reinit(routing);
|
ShardRoutingHelper.reinit(routing);
|
||||||
|
|
|
@ -82,7 +82,7 @@ public class IndexingMemoryControllerIT extends ESIntegTestCase {
|
||||||
index("test1", "type", "1", "f", 1);
|
index("test1", "type", "1", "f", 1);
|
||||||
|
|
||||||
// make shard the shard buffer was set to inactive size
|
// make shard the shard buffer was set to inactive size
|
||||||
final ByteSizeValue inactiveBuffer = EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER;
|
final ByteSizeValue inactiveBuffer = IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER;
|
||||||
if (awaitBusy(() -> getIWBufferSize("test1") == inactiveBuffer.bytes()) == false) {
|
if (awaitBusy(() -> getIWBufferSize("test1") == inactiveBuffer.bytes()) == false) {
|
||||||
fail("failed to update shard indexing buffer size for test1 index to [" + inactiveBuffer + "]; got: " + getIWBufferSize("test1"));
|
fail("failed to update shard indexing buffer size for test1 index to [" + inactiveBuffer + "]; got: " + getIWBufferSize("test1"));
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,13 +22,17 @@ import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
import org.elasticsearch.index.engine.EngineConfig;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
|
import org.elasticsearch.index.translog.TranslogConfig;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
import static org.hamcrest.Matchers.not;
|
import static org.hamcrest.Matchers.not;
|
||||||
|
@ -39,44 +43,28 @@ public class IndexingMemoryControllerTests extends ESTestCase {
|
||||||
|
|
||||||
final static ByteSizeValue INACTIVE = new ByteSizeValue(-1);
|
final static ByteSizeValue INACTIVE = new ByteSizeValue(-1);
|
||||||
|
|
||||||
final Map<ShardId, Long> translogIds = new HashMap<>();
|
|
||||||
final Map<ShardId, Long> translogOps = new HashMap<>();
|
|
||||||
|
|
||||||
final Map<ShardId, ByteSizeValue> indexingBuffers = new HashMap<>();
|
final Map<ShardId, ByteSizeValue> indexingBuffers = new HashMap<>();
|
||||||
final Map<ShardId, ByteSizeValue> translogBuffers = new HashMap<>();
|
final Map<ShardId, ByteSizeValue> translogBuffers = new HashMap<>();
|
||||||
|
|
||||||
|
final Map<ShardId, Long> lastIndexTimeNanos = new HashMap<>();
|
||||||
|
final Set<ShardId> activeShards = new HashSet<>();
|
||||||
|
|
||||||
long currentTimeSec = TimeValue.timeValueNanos(System.nanoTime()).seconds();
|
long currentTimeSec = TimeValue.timeValueNanos(System.nanoTime()).seconds();
|
||||||
|
|
||||||
public MockController(Settings settings) {
|
public MockController(Settings settings) {
|
||||||
super(Settings.builder()
|
super(Settings.builder()
|
||||||
.put(SHARD_INACTIVE_INTERVAL_TIME_SETTING, "200h") // disable it
|
.put(SHARD_INACTIVE_INTERVAL_TIME_SETTING, "200h") // disable it
|
||||||
.put(SHARD_INACTIVE_TIME_SETTING, "0s") // immediate
|
.put(SHARD_INACTIVE_TIME_SETTING, "1ms") // nearly immediate
|
||||||
.put(settings)
|
.put(settings)
|
||||||
.build(),
|
.build(),
|
||||||
null, null, 100 * 1024 * 1024); // fix jvm mem size to 100mb
|
null, null, 100 * 1024 * 1024); // fix jvm mem size to 100mb
|
||||||
}
|
}
|
||||||
|
|
||||||
public void incTranslog(ShardId shard1, int id, int ops) {
|
|
||||||
setTranslog(shard1, translogIds.get(shard1) + id, translogOps.get(shard1) + ops);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setTranslog(ShardId id, long translogId, long ops) {
|
|
||||||
translogIds.put(id, translogId);
|
|
||||||
translogOps.put(id, ops);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void deleteShard(ShardId id) {
|
public void deleteShard(ShardId id) {
|
||||||
translogIds.remove(id);
|
|
||||||
translogOps.remove(id);
|
|
||||||
indexingBuffers.remove(id);
|
indexingBuffers.remove(id);
|
||||||
translogBuffers.remove(id);
|
translogBuffers.remove(id);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void assertActive(ShardId id) {
|
|
||||||
assertThat(indexingBuffers.get(id), not(equalTo(INACTIVE)));
|
|
||||||
assertThat(translogBuffers.get(id), not(equalTo(INACTIVE)));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void assertBuffers(ShardId id, ByteSizeValue indexing, ByteSizeValue translog) {
|
public void assertBuffers(ShardId id, ByteSizeValue indexing, ByteSizeValue translog) {
|
||||||
assertThat(indexingBuffers.get(id), equalTo(indexing));
|
assertThat(indexingBuffers.get(id), equalTo(indexing));
|
||||||
assertThat(translogBuffers.get(id), equalTo(translog));
|
assertThat(translogBuffers.get(id), equalTo(translog));
|
||||||
|
@ -94,29 +82,17 @@ public class IndexingMemoryControllerTests extends ESTestCase {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected List<ShardId> availableShards() {
|
protected List<ShardId> availableShards() {
|
||||||
return new ArrayList<>(translogIds.keySet());
|
return new ArrayList<>(indexingBuffers.keySet());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean shardAvailable(ShardId shardId) {
|
protected boolean shardAvailable(ShardId shardId) {
|
||||||
return translogIds.containsKey(shardId);
|
return indexingBuffers.containsKey(shardId);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void markShardAsInactive(ShardId shardId) {
|
protected Boolean getShardActive(ShardId shardId) {
|
||||||
indexingBuffers.put(shardId, INACTIVE);
|
return activeShards.contains(shardId);
|
||||||
translogBuffers.put(shardId, INACTIVE);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected ShardIndexingStatus getTranslogStatus(ShardId shardId) {
|
|
||||||
if (!shardAvailable(shardId)) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
ShardIndexingStatus status = new ShardIndexingStatus();
|
|
||||||
status.translogId = translogIds.get(shardId);
|
|
||||||
status.translogNumberOfOperations = translogOps.get(shardId);
|
|
||||||
return status;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -125,12 +101,34 @@ public class IndexingMemoryControllerTests extends ESTestCase {
|
||||||
translogBuffers.put(shardId, shardTranslogBufferSize);
|
translogBuffers.put(shardId, shardTranslogBufferSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Boolean checkIdle(ShardId shardId, long inactiveTimeNS) {
|
||||||
|
Long ns = lastIndexTimeNanos.get(shardId);
|
||||||
|
if (ns == null) {
|
||||||
|
return null;
|
||||||
|
} else if (currentTimeInNanos() - ns >= inactiveTimeNS) {
|
||||||
|
indexingBuffers.put(shardId, INACTIVE);
|
||||||
|
translogBuffers.put(shardId, INACTIVE);
|
||||||
|
activeShards.remove(shardId);
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void incrementTimeSec(int sec) {
|
public void incrementTimeSec(int sec) {
|
||||||
currentTimeSec += sec;
|
currentTimeSec += sec;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void simulateFlush(ShardId shard) {
|
public void simulateIndexing(ShardId shardId) {
|
||||||
setTranslog(shard, translogIds.get(shard) + 1, 0);
|
lastIndexTimeNanos.put(shardId, currentTimeInNanos());
|
||||||
|
if (indexingBuffers.containsKey(shardId) == false) {
|
||||||
|
// First time we are seeing this shard; start it off with inactive buffers as IndexShard does:
|
||||||
|
indexingBuffers.put(shardId, IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER);
|
||||||
|
translogBuffers.put(shardId, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER);
|
||||||
|
}
|
||||||
|
activeShards.add(shardId);
|
||||||
|
forceCheck();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -139,14 +137,12 @@ public class IndexingMemoryControllerTests extends ESTestCase {
|
||||||
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb")
|
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb")
|
||||||
.put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "100kb").build());
|
.put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "100kb").build());
|
||||||
final ShardId shard1 = new ShardId("test", 1);
|
final ShardId shard1 = new ShardId("test", 1);
|
||||||
controller.setTranslog(shard1, randomInt(10), randomInt(10));
|
controller.simulateIndexing(shard1);
|
||||||
controller.forceCheck();
|
|
||||||
controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K
|
controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K
|
||||||
|
|
||||||
// add another shard
|
// add another shard
|
||||||
final ShardId shard2 = new ShardId("test", 2);
|
final ShardId shard2 = new ShardId("test", 2);
|
||||||
controller.setTranslog(shard2, randomInt(10), randomInt(10));
|
controller.simulateIndexing(shard2);
|
||||||
controller.forceCheck();
|
|
||||||
controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
|
controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
|
||||||
controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
|
controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
|
||||||
|
|
||||||
|
@ -161,8 +157,7 @@ public class IndexingMemoryControllerTests extends ESTestCase {
|
||||||
|
|
||||||
// add a new one
|
// add a new one
|
||||||
final ShardId shard3 = new ShardId("test", 3);
|
final ShardId shard3 = new ShardId("test", 3);
|
||||||
controller.setTranslog(shard3, randomInt(10), randomInt(10));
|
controller.simulateIndexing(shard3);
|
||||||
controller.forceCheck();
|
|
||||||
controller.assertBuffers(shard3, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K
|
controller.assertBuffers(shard3, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -174,48 +169,42 @@ public class IndexingMemoryControllerTests extends ESTestCase {
|
||||||
.build());
|
.build());
|
||||||
|
|
||||||
final ShardId shard1 = new ShardId("test", 1);
|
final ShardId shard1 = new ShardId("test", 1);
|
||||||
controller.setTranslog(shard1, 0, 0);
|
controller.simulateIndexing(shard1);
|
||||||
final ShardId shard2 = new ShardId("test", 2);
|
final ShardId shard2 = new ShardId("test", 2);
|
||||||
controller.setTranslog(shard2, 0, 0);
|
controller.simulateIndexing(shard2);
|
||||||
controller.forceCheck();
|
|
||||||
controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
|
controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
|
||||||
controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
|
controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
|
||||||
|
|
||||||
// index into both shards, move the clock and see that they are still active
|
// index into both shards, move the clock and see that they are still active
|
||||||
controller.setTranslog(shard1, randomInt(2), randomInt(2) + 1);
|
controller.simulateIndexing(shard1);
|
||||||
controller.setTranslog(shard2, randomInt(2) + 1, randomInt(2));
|
controller.simulateIndexing(shard2);
|
||||||
// the controller doesn't know when the ops happened, so even if this is more
|
|
||||||
// than the inactive time the shard is still marked as active
|
|
||||||
controller.incrementTimeSec(10);
|
controller.incrementTimeSec(10);
|
||||||
controller.forceCheck();
|
controller.forceCheck();
|
||||||
controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
|
|
||||||
controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
|
|
||||||
|
|
||||||
// index into one shard only, see other shard is made inactive correctly
|
// both shards now inactive
|
||||||
controller.incTranslog(shard1, randomInt(2), randomInt(2) + 1);
|
controller.assertInActive(shard1);
|
||||||
controller.forceCheck(); // register what happened with the controller (shard is still active)
|
controller.assertInActive(shard2);
|
||||||
controller.incrementTimeSec(3); // increment but not enough
|
|
||||||
controller.forceCheck();
|
|
||||||
controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
|
|
||||||
controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
|
|
||||||
|
|
||||||
controller.incrementTimeSec(3); // increment some more
|
// index into one shard only, see it becomes active
|
||||||
|
controller.simulateIndexing(shard1);
|
||||||
|
controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB));
|
||||||
|
controller.assertInActive(shard2);
|
||||||
|
|
||||||
|
controller.incrementTimeSec(3); // increment but not enough to become inactive
|
||||||
controller.forceCheck();
|
controller.forceCheck();
|
||||||
controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB));
|
controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB));
|
||||||
controller.assertInActive(shard2);
|
controller.assertInActive(shard2);
|
||||||
|
|
||||||
if (randomBoolean()) {
|
controller.incrementTimeSec(3); // increment some more
|
||||||
// once a shard gets inactive it will be synced flushed and a new translog generation will be made
|
|
||||||
controller.simulateFlush(shard2);
|
|
||||||
controller.forceCheck();
|
controller.forceCheck();
|
||||||
|
controller.assertInActive(shard1);
|
||||||
controller.assertInActive(shard2);
|
controller.assertInActive(shard2);
|
||||||
}
|
|
||||||
|
|
||||||
// index some and shard becomes immediately active
|
// index some and shard becomes immediately active
|
||||||
controller.incTranslog(shard2, randomInt(2), 1 + randomInt(2)); // we must make sure translog ops is never 0
|
controller.simulateIndexing(shard2);
|
||||||
controller.forceCheck();
|
controller.assertInActive(shard1);
|
||||||
controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
|
controller.assertBuffers(shard2, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB));
|
||||||
controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testMinShardBufferSizes() {
|
public void testMinShardBufferSizes() {
|
||||||
|
@ -273,10 +262,9 @@ public class IndexingMemoryControllerTests extends ESTestCase {
|
||||||
|
|
||||||
protected void assertTwoActiveShards(MockController controller, ByteSizeValue indexBufferSize, ByteSizeValue translogBufferSize) {
|
protected void assertTwoActiveShards(MockController controller, ByteSizeValue indexBufferSize, ByteSizeValue translogBufferSize) {
|
||||||
final ShardId shard1 = new ShardId("test", 1);
|
final ShardId shard1 = new ShardId("test", 1);
|
||||||
controller.setTranslog(shard1, 0, 0);
|
controller.simulateIndexing(shard1);
|
||||||
final ShardId shard2 = new ShardId("test", 2);
|
final ShardId shard2 = new ShardId("test", 2);
|
||||||
controller.setTranslog(shard2, 0, 0);
|
controller.simulateIndexing(shard2);
|
||||||
controller.forceCheck();
|
|
||||||
controller.assertBuffers(shard1, indexBufferSize, translogBufferSize);
|
controller.assertBuffers(shard1, indexBufferSize, translogBufferSize);
|
||||||
controller.assertBuffers(shard2, indexBufferSize, translogBufferSize);
|
controller.assertBuffers(shard2, indexBufferSize, translogBufferSize);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue