Indexing Buffer: Automatically inactivate unindexed into shards and lower their indexing buffer size, closes #821.
This commit is contained in:
parent
09d3b882f0
commit
3ff35d42b5
|
@ -73,7 +73,7 @@ public class TranslogService extends AbstractIndexShardComponent {
|
|||
|
||||
this.flushThresholdOperations = componentSettings.getAsInt("flush_threshold_ops", componentSettings.getAsInt("flush_threshold", 5000));
|
||||
this.flushThresholdSize = componentSettings.getAsBytesSize("flush_threshold_size", new ByteSizeValue(200, ByteSizeUnit.MB));
|
||||
this.flushThresholdPeriod = componentSettings.getAsTime("flush_threshold_period", TimeValue.timeValueMinutes(60));
|
||||
this.flushThresholdPeriod = componentSettings.getAsTime("flush_threshold_period", TimeValue.timeValueMinutes(30));
|
||||
this.interval = componentSettings.getAsTime("interval", timeValueMillis(5000));
|
||||
|
||||
logger.debug("interval [{}], flush_threshold_ops [{}], flush_threshold_size [{}], flush_threshold_period [{}]", interval, flushThresholdOperations, flushThresholdSize, flushThresholdPeriod);
|
||||
|
|
|
@ -19,35 +19,57 @@
|
|||
|
||||
package org.elasticsearch.indices.memory;
|
||||
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.common.collect.Maps;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.service.IndexService;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.shard.service.IndexShard;
|
||||
import org.elasticsearch.index.shard.service.InternalIndexShard;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.indices.IndicesLifecycle;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.monitor.jvm.JvmInfo;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class IndexingMemoryBufferController extends AbstractComponent {
|
||||
public class IndexingMemoryBufferController extends AbstractLifecycleComponent<IndexingMemoryBufferController> {
|
||||
|
||||
private final ByteSizeValue indexingBuffer;
|
||||
|
||||
private final ByteSizeValue minShardIndexBufferSize;
|
||||
private final ByteSizeValue maxShardIndexBufferSize;
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
private final IndicesService indicesService;
|
||||
|
||||
|
||||
private final ByteSizeValue indexingBuffer;
|
||||
|
||||
private final ByteSizeValue inactiveShardIndexBufferSize;
|
||||
private final ByteSizeValue minShardIndexBufferSize;
|
||||
private final ByteSizeValue maxShardIndexBufferSize;
|
||||
|
||||
private final TimeValue inactiveTime;
|
||||
private final TimeValue interval;
|
||||
|
||||
private final Listener listener = new Listener();
|
||||
|
||||
@Inject public IndexingMemoryBufferController(Settings settings, IndicesService indicesService) {
|
||||
private final Map<ShardId, ShardIndexingStatus> shardsIndicesStatus = Maps.newHashMap();
|
||||
|
||||
private volatile ScheduledFuture scheduler;
|
||||
|
||||
private final Object mutex = new Object();
|
||||
|
||||
@Inject public IndexingMemoryBufferController(Settings settings, ThreadPool threadPool, IndicesService indicesService) {
|
||||
super(settings);
|
||||
this.threadPool = threadPool;
|
||||
this.indicesService = indicesService;
|
||||
|
||||
ByteSizeValue indexingBuffer;
|
||||
|
@ -69,60 +91,147 @@ public class IndexingMemoryBufferController extends AbstractComponent {
|
|||
}
|
||||
|
||||
this.indexingBuffer = indexingBuffer;
|
||||
this.inactiveShardIndexBufferSize = componentSettings.getAsBytesSize("inactive_shard_index_buffer_size", new ByteSizeValue(1, ByteSizeUnit.MB));
|
||||
this.minShardIndexBufferSize = componentSettings.getAsBytesSize("min_shard_index_buffer_size", new ByteSizeValue(4, ByteSizeUnit.MB));
|
||||
// LUCENE MONITOR: Based on this thread, currently (based on Mike), having a large buffer does not make a lot of sense: https://issues.apache.org/jira/browse/LUCENE-2324?focusedCommentId=13005155&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13005155
|
||||
this.maxShardIndexBufferSize = componentSettings.getAsBytesSize("max_shard_index_buffer_size", new ByteSizeValue(512, ByteSizeUnit.MB));
|
||||
|
||||
logger.debug("using index_buffer_size [{}], with min_shard_index_buffer_size [{}], max_shard_index_buffer_size [{}]", this.indexingBuffer, this.minShardIndexBufferSize, this.maxShardIndexBufferSize);
|
||||
this.inactiveTime = componentSettings.getAsTime("shard_inactive_time", TimeValue.timeValueMinutes(30));
|
||||
// we need to have this relatively small to move a shard from inactive to active fast (enough)
|
||||
this.interval = componentSettings.getAsTime("interval", TimeValue.timeValueSeconds(30));
|
||||
|
||||
logger.debug("using index_buffer_size [{}], with min_shard_index_buffer_size [{}], max_shard_index_buffer_size [{}], inactive_shard_index_buffer_size [{}], shard_inactive_time [{}]", this.indexingBuffer, this.minShardIndexBufferSize, this.maxShardIndexBufferSize, this.inactiveShardIndexBufferSize, this.inactiveTime);
|
||||
|
||||
indicesService.indicesLifecycle().addListener(listener);
|
||||
}
|
||||
|
||||
private class Listener extends IndicesLifecycle.Listener {
|
||||
@Override protected void doStart() throws ElasticSearchException {
|
||||
indicesService.indicesLifecycle().addListener(listener);
|
||||
// its fine to run it on the scheduler thread, no busy work
|
||||
this.scheduler = threadPool.scheduleWithFixedDelay(new ShardsIndicesStatusChecker(), interval);
|
||||
}
|
||||
|
||||
@Override protected void doStop() throws ElasticSearchException {
|
||||
indicesService.indicesLifecycle().removeListener(listener);
|
||||
if (scheduler != null) {
|
||||
scheduler.cancel(false);
|
||||
scheduler = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override protected void doClose() throws ElasticSearchException {
|
||||
}
|
||||
|
||||
class ShardsIndicesStatusChecker implements Runnable {
|
||||
@Override public void run() {
|
||||
synchronized (mutex) {
|
||||
boolean activeInactiveStatusChanges = false;
|
||||
long time = System.currentTimeMillis();
|
||||
for (IndexService indexService : indicesService) {
|
||||
for (IndexShard indexShard : indexService) {
|
||||
Translog translog = ((InternalIndexShard) indexShard).translog();
|
||||
ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId());
|
||||
if (status == null) { // not added yet
|
||||
continue;
|
||||
}
|
||||
// check if it is deemed to be inactive (sam translogId and numberOfOperations over a long period of time)
|
||||
if (status.translogId == translog.currentId() && translog.numberOfOperations() == 0) {
|
||||
if (status.time == -1) { // first time
|
||||
status.time = time;
|
||||
}
|
||||
// inactive?
|
||||
if (!status.inactive) {
|
||||
if ((time - status.time) > inactiveTime.millis()) {
|
||||
// inactive for this amount of time, mark it
|
||||
status.inactive = true;
|
||||
activeInactiveStatusChanges = true;
|
||||
logger.debug("marking shard [{}][{}] as inactive (inactive_time[{}]), setting size to [{}]", indexShard.shardId().index().name(), indexShard.shardId().id(), inactiveTime, inactiveShardIndexBufferSize);
|
||||
((InternalIndexShard) indexShard).engine().updateIndexingBufferSize(inactiveShardIndexBufferSize);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (status.inactive) {
|
||||
status.inactive = false;
|
||||
activeInactiveStatusChanges = true;
|
||||
logger.debug("marking shard [{}][{}] as active", indexShard.shardId().index().name(), indexShard.shardId().id());
|
||||
}
|
||||
status.time = -1;
|
||||
}
|
||||
status.translogId = translog.currentId();
|
||||
status.translogNumberOfOperations = translog.numberOfOperations();
|
||||
}
|
||||
}
|
||||
if (activeInactiveStatusChanges) {
|
||||
calcAndSetShardIndexingBuffer("shards became active/inactive");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class Listener extends IndicesLifecycle.Listener {
|
||||
|
||||
@Override public void afterIndexShardCreated(IndexShard indexShard) {
|
||||
calcAndSetShardIndexingBuffer("created_shard[" + indexShard.shardId().index().name() + "][" + indexShard.shardId().id() + "]");
|
||||
synchronized (mutex) {
|
||||
calcAndSetShardIndexingBuffer("created_shard[" + indexShard.shardId().index().name() + "][" + indexShard.shardId().id() + "]");
|
||||
shardsIndicesStatus.put(indexShard.shardId(), new ShardIndexingStatus());
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void afterIndexShardClosed(ShardId shardId, boolean delete) {
|
||||
calcAndSetShardIndexingBuffer("removed_shard[" + shardId.index().name() + "][" + shardId.id() + "]");
|
||||
synchronized (mutex) {
|
||||
calcAndSetShardIndexingBuffer("removed_shard[" + shardId.index().name() + "][" + shardId.id() + "]");
|
||||
shardsIndicesStatus.remove(shardId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void calcAndSetShardIndexingBuffer(String reason) {
|
||||
int shardsCount = countShards();
|
||||
if (shardsCount == 0) {
|
||||
return;
|
||||
}
|
||||
ByteSizeValue shardIndexingBufferSize = calcShardIndexingBuffer(shardsCount);
|
||||
if (shardIndexingBufferSize == null) {
|
||||
return;
|
||||
}
|
||||
if (shardIndexingBufferSize.bytes() < minShardIndexBufferSize.bytes()) {
|
||||
shardIndexingBufferSize = minShardIndexBufferSize;
|
||||
}
|
||||
if (shardIndexingBufferSize.bytes() > maxShardIndexBufferSize.bytes()) {
|
||||
shardIndexingBufferSize = maxShardIndexBufferSize;
|
||||
}
|
||||
logger.debug("recalculating shard indexing buffer (reason={}), total is [{}] with [{}] shards, each shard set to [{}]", reason, indexingBuffer, shardsCount, shardIndexingBufferSize);
|
||||
for (IndexService indexService : indicesService) {
|
||||
for (IndexShard indexShard : indexService) {
|
||||
|
||||
private void calcAndSetShardIndexingBuffer(String reason) {
|
||||
int shardsCount = countShards();
|
||||
if (shardsCount == 0) {
|
||||
return;
|
||||
}
|
||||
ByteSizeValue shardIndexingBufferSize = calcShardIndexingBuffer(shardsCount);
|
||||
if (shardIndexingBufferSize == null) {
|
||||
return;
|
||||
}
|
||||
if (shardIndexingBufferSize.bytes() < minShardIndexBufferSize.bytes()) {
|
||||
shardIndexingBufferSize = minShardIndexBufferSize;
|
||||
}
|
||||
if (shardIndexingBufferSize.bytes() > maxShardIndexBufferSize.bytes()) {
|
||||
shardIndexingBufferSize = maxShardIndexBufferSize;
|
||||
}
|
||||
logger.debug("recalculating shard indexing buffer (reason={}), total is [{}] with [{}] active shards, each shard set to [{}]", reason, indexingBuffer, shardsCount, shardIndexingBufferSize);
|
||||
for (IndexService indexService : indicesService) {
|
||||
for (IndexShard indexShard : indexService) {
|
||||
ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId());
|
||||
if (status == null || !status.inactive) {
|
||||
((InternalIndexShard) indexShard).engine().updateIndexingBufferSize(shardIndexingBufferSize);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private ByteSizeValue calcShardIndexingBuffer(int shardsCount) {
|
||||
return new ByteSizeValue(indexingBuffer.bytes() / shardsCount);
|
||||
}
|
||||
private ByteSizeValue calcShardIndexingBuffer(int shardsCount) {
|
||||
return new ByteSizeValue(indexingBuffer.bytes() / shardsCount);
|
||||
}
|
||||
|
||||
private int countShards() {
|
||||
int shardsCount = 0;
|
||||
for (IndexService indexService : indicesService) {
|
||||
for (IndexShard indexShard : indexService) {
|
||||
private int countShards() {
|
||||
int shardsCount = 0;
|
||||
for (IndexService indexService : indicesService) {
|
||||
for (IndexShard indexShard : indexService) {
|
||||
ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId());
|
||||
if (status == null || !status.inactive) {
|
||||
shardsCount++;
|
||||
}
|
||||
}
|
||||
return shardsCount;
|
||||
}
|
||||
return shardsCount;
|
||||
}
|
||||
|
||||
static class ShardIndexingStatus {
|
||||
long translogId = -1;
|
||||
int translogNumberOfOperations = -1;
|
||||
boolean inactive = false;
|
||||
long time = -1; // contains the first time we saw this shard with no operations done on it
|
||||
}
|
||||
}
|
||||
|
|
|
@ -57,6 +57,7 @@ import org.elasticsearch.http.HttpServerModule;
|
|||
import org.elasticsearch.indices.IndicesModule;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
|
||||
import org.elasticsearch.indices.memory.IndexingMemoryBufferController;
|
||||
import org.elasticsearch.jmx.JmxModule;
|
||||
import org.elasticsearch.jmx.JmxService;
|
||||
import org.elasticsearch.monitor.MonitorModule;
|
||||
|
@ -168,6 +169,7 @@ public final class InternalNode implements Node {
|
|||
}
|
||||
|
||||
injector.getInstance(IndicesService.class).start();
|
||||
injector.getInstance(IndexingMemoryBufferController.class).start();
|
||||
injector.getInstance(IndicesClusterStateService.class).start();
|
||||
injector.getInstance(RiversManager.class).start();
|
||||
injector.getInstance(ClusterService.class).start();
|
||||
|
@ -204,6 +206,7 @@ public final class InternalNode implements Node {
|
|||
// stop any changes happening as a result of cluster state changes
|
||||
injector.getInstance(IndicesClusterStateService.class).stop();
|
||||
// we close indices first, so operations won't be allowed on it
|
||||
injector.getInstance(IndexingMemoryBufferController.class).stop();
|
||||
injector.getInstance(IndicesService.class).stop();
|
||||
// sleep a bit to let operations finish with indices service
|
||||
// try {
|
||||
|
@ -252,6 +255,7 @@ public final class InternalNode implements Node {
|
|||
stopWatch.stop().start("indices_cluster");
|
||||
injector.getInstance(IndicesClusterStateService.class).close();
|
||||
stopWatch.stop().start("indices");
|
||||
injector.getInstance(IndexingMemoryBufferController.class).close();
|
||||
injector.getInstance(IndicesService.class).close();
|
||||
stopWatch.stop().start("routing");
|
||||
injector.getInstance(RoutingService.class).close();
|
||||
|
|
Loading…
Reference in New Issue