This commit is contained in:
Michael McCandless 2015-10-07 10:18:52 -04:00 committed by mikemccand
parent 934827d25f
commit 7f435e2f4f
5 changed files with 20 additions and 19 deletions

View File

@ -288,7 +288,6 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
indicesLifecycle.afterIndexShardCreated(indexShard);
settingsService.addListener(indexShard);
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();
indexServicesProvider.getIndexingMemoryController().forceCheck();
success = true;
return indexShard;
} catch (IOException e) {

View File

@ -40,6 +40,7 @@ import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.memory.IndexingMemoryController;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.TimeUnit;
@ -107,7 +108,6 @@ public final class EngineConfig {
public static final TimeValue DEFAULT_REFRESH_INTERVAL = new TimeValue(1, TimeUnit.SECONDS);
public static final TimeValue DEFAULT_GC_DELETES = TimeValue.timeValueSeconds(60);
public static final ByteSizeValue DEFAULT_INDEX_BUFFER_SIZE = new ByteSizeValue(64, ByteSizeUnit.MB);
public static final String DEFAULT_VERSION_MAP_SIZE = "25%";
@ -138,7 +138,8 @@ public final class EngineConfig {
this.failedEngineListener = failedEngineListener;
this.compoundOnFlush = indexSettings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush);
codecName = indexSettings.get(EngineConfig.INDEX_CODEC_SETTING, EngineConfig.DEFAULT_CODEC_NAME);
indexingBufferSize = DEFAULT_INDEX_BUFFER_SIZE;
// We start up inactive and rely on IndexingMemoryController to give us our fair share once we start indexing:
indexingBufferSize = IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER;
gcDeletesInMillis = indexSettings.getAsTime(INDEX_GC_DELETES_SETTING, EngineConfig.DEFAULT_GC_DELETES).millis();
versionMapSizeSetting = indexSettings.get(INDEX_VERSION_MAP_SIZE, DEFAULT_VERSION_MAP_SIZE);
updateVersionMapSize();

View File

@ -259,9 +259,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
if (mapperService.hasMapping(PercolatorService.TYPE_NAME)) {
percolatorQueriesRegistry.enableRealTimePercolator();
}
lastWriteNS = System.nanoTime();
active.set(true);
// We start up inactive
active.set(false);
}
public Store store() {
@ -1057,16 +1057,18 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
/** Called by {@link IndexingMemoryController} to check whether more than {@code inactiveTimeNS} has passed since the last
* indexing operation, and become inactive (reducing indexing and translog buffers to tiny values) if so. This returns true
* if the shard did in fact become inactive, else false. */
* if the shard is inactive. */
public boolean checkIdle(long inactiveTimeNS) {
if (System.nanoTime() - lastWriteNS >= inactiveTimeNS && active.getAndSet(false)) {
updateBufferSize(IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER);
logger.debug("shard is now inactive");
indicesLifecycle.onShardInactive(this);
return true;
if (System.nanoTime() - lastWriteNS >= inactiveTimeNS) {
boolean wasActive = active.getAndSet(false);
if (wasActive) {
updateBufferSize(IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER);
logger.debug("shard is now inactive");
indicesLifecycle.onShardInactive(this);
}
}
return false;
return active.get() == false;
}
/** Returns {@code true} if this shard is active (has seen indexing ops in the last {@link

View File

@ -27,6 +27,7 @@ import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog.TranslogGeneration;
import org.elasticsearch.indices.memory.IndexingMemoryController;
import org.elasticsearch.threadpool.ThreadPool;
import java.nio.file.Path;
@ -43,8 +44,6 @@ public final class TranslogConfig {
public static final String INDEX_TRANSLOG_BUFFER_SIZE = "index.translog.fs.buffer_size";
public static final String INDEX_TRANSLOG_SYNC_INTERVAL = "index.translog.sync_interval";
public static final ByteSizeValue DEFAULT_SHARD_TRANSLOG_BUFFER_SIZE = ByteSizeValue.parseBytesSizeValue("64k", INDEX_TRANSLOG_BUFFER_SIZE);
private final TimeValue syncInterval;
private final BigArrays bigArrays;
private final ThreadPool threadPool;
@ -74,7 +73,7 @@ public final class TranslogConfig {
this.threadPool = threadPool;
this.bigArrays = bigArrays;
this.type = TranslogWriter.Type.fromString(indexSettings.get(INDEX_TRANSLOG_FS_TYPE, TranslogWriter.Type.BUFFERED.name()));
this.bufferSize = (int) indexSettings.getAsBytesSize(INDEX_TRANSLOG_BUFFER_SIZE, DEFAULT_SHARD_TRANSLOG_BUFFER_SIZE).bytes(); // Not really interesting, updated by IndexingMemoryController...
this.bufferSize = (int) indexSettings.getAsBytesSize(INDEX_TRANSLOG_BUFFER_SIZE, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER).bytes(); // Not really interesting, updated by IndexingMemoryController...
syncInterval = indexSettings.getAsTime(INDEX_TRANSLOG_SYNC_INTERVAL, TimeValue.timeValueSeconds(5));
if (syncInterval.millis() > 0 && threadPool != null) {

View File

@ -123,9 +123,9 @@ public class IndexingMemoryControllerTests extends ESTestCase {
public void simulateIndexing(ShardId shardId) {
lastIndexTimeNanos.put(shardId, currentTimeInNanos());
if (indexingBuffers.containsKey(shardId) == false) {
// First time we are indexing into this shard; start it off with default indexing buffer:
indexingBuffers.put(shardId, EngineConfig.DEFAULT_INDEX_BUFFER_SIZE);
translogBuffers.put(shardId, TranslogConfig.DEFAULT_SHARD_TRANSLOG_BUFFER_SIZE);
// First time we are seeing this shard; start it off with inactive buffers as IndexShard does:
indexingBuffers.put(shardId, IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER);
translogBuffers.put(shardId, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER);
}
activeShards.add(shardId);
forceCheck();