Automatic management of indexing buffer size, closes #241.
This commit is contained in:
parent
fdb2eff998
commit
088e0b5a64
|
@ -22,6 +22,7 @@ package org.elasticsearch.indices.memory;
|
|||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.index.service.IndexService;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
@ -38,6 +39,8 @@ public class IndexingMemoryBufferController extends AbstractComponent {
|
|||
|
||||
private final ByteSizeValue indexingBuffer;
|
||||
|
||||
private final ByteSizeValue minShardIndexBufferSize;
|
||||
|
||||
private final IndicesService indicesService;
|
||||
|
||||
private final Listener listener = new Listener();
|
||||
|
@ -53,6 +56,9 @@ public class IndexingMemoryBufferController extends AbstractComponent {
|
|||
} else {
|
||||
this.indexingBuffer = ByteSizeValue.parseBytesSizeValue(indexingBuffer, null);
|
||||
}
|
||||
this.minShardIndexBufferSize = componentSettings.getAsBytesSize("min_shard_index_buffer_size", new ByteSizeValue(4, ByteSizeUnit.MB));
|
||||
|
||||
logger.debug("using index_buffer_size [{}], with min_shard_index_buffer_size [{}]", this.indexingBuffer, this.minShardIndexBufferSize);
|
||||
|
||||
indicesService.indicesLifecycle().addListener(listener);
|
||||
}
|
||||
|
@ -72,14 +78,17 @@ public class IndexingMemoryBufferController extends AbstractComponent {
|
|||
if (shardsCount == 0) {
|
||||
return;
|
||||
}
|
||||
ByteSizeValue shardIndexingBuffer = calcShardIndexingBuffer(shardsCount);
|
||||
if (shardIndexingBuffer == null) {
|
||||
ByteSizeValue shardIndexingBufferSize = calcShardIndexingBuffer(shardsCount);
|
||||
if (shardIndexingBufferSize == null) {
|
||||
return;
|
||||
}
|
||||
logger.debug("recalculating shard indexing buffer (reason={}), total is [{}] with [{}] shards, each shard set to [{}]", reason, indexingBuffer, shardsCount, shardIndexingBuffer);
|
||||
if (shardIndexingBufferSize.bytes() < minShardIndexBufferSize.bytes()) {
|
||||
shardIndexingBufferSize = minShardIndexBufferSize;
|
||||
}
|
||||
logger.debug("recalculating shard indexing buffer (reason={}), total is [{}] with [{}] shards, each shard set to [{}]", reason, indexingBuffer, shardsCount, shardIndexingBufferSize);
|
||||
for (IndexService indexService : indicesService) {
|
||||
for (IndexShard indexShard : indexService) {
|
||||
((InternalIndexShard) indexShard).engine().indexingBuffer(shardIndexingBuffer);
|
||||
((InternalIndexShard) indexShard).engine().indexingBuffer(shardIndexingBufferSize);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue