Automatic management of indexing buffer size, closes #241.
This commit is contained in:
parent
03ac38fdba
commit
fdb2eff998
|
@ -14,6 +14,7 @@
|
||||||
<w>booleans</w>
|
<w>booleans</w>
|
||||||
<w>bytebuffer</w>
|
<w>bytebuffer</w>
|
||||||
<w>cacheable</w>
|
<w>cacheable</w>
|
||||||
|
<w>calc</w>
|
||||||
<w>camelcase</w>
|
<w>camelcase</w>
|
||||||
<w>canonicalhost</w>
|
<w>canonicalhost</w>
|
||||||
<w>checksum</w>
|
<w>checksum</w>
|
||||||
|
|
|
@ -42,6 +42,8 @@ import javax.annotation.Nullable;
|
||||||
@ThreadSafe
|
@ThreadSafe
|
||||||
public interface Engine extends IndexShardComponent, CloseableComponent {
|
public interface Engine extends IndexShardComponent, CloseableComponent {
|
||||||
|
|
||||||
|
void indexingBuffer(ByteSizeValue indexingBufferSize);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Starts the Engine.
|
* Starts the Engine.
|
||||||
*
|
*
|
||||||
|
|
|
@ -60,7 +60,7 @@ import static org.elasticsearch.common.util.concurrent.resource.AcquirableResour
|
||||||
*/
|
*/
|
||||||
public class RobinEngine extends AbstractIndexShardComponent implements Engine, ScheduledRefreshableEngine {
|
public class RobinEngine extends AbstractIndexShardComponent implements Engine, ScheduledRefreshableEngine {
|
||||||
|
|
||||||
private final ByteSizeValue ramBufferSize;
|
private volatile ByteSizeValue indexingBufferSize;
|
||||||
|
|
||||||
private final TimeValue refreshInterval;
|
private final TimeValue refreshInterval;
|
||||||
|
|
||||||
|
@ -105,7 +105,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
|
||||||
Preconditions.checkNotNull(deletionPolicy, "Snapshot deletion policy must be provided to the engine");
|
Preconditions.checkNotNull(deletionPolicy, "Snapshot deletion policy must be provided to the engine");
|
||||||
Preconditions.checkNotNull(translog, "Translog must be provided to the engine");
|
Preconditions.checkNotNull(translog, "Translog must be provided to the engine");
|
||||||
|
|
||||||
this.ramBufferSize = componentSettings.getAsBytesSize("ram_buffer_size", new ByteSizeValue(64, ByteSizeUnit.MB));
|
this.indexingBufferSize = componentSettings.getAsBytesSize("indexing_buffer_size", new ByteSizeValue(64, ByteSizeUnit.MB));
|
||||||
this.refreshInterval = componentSettings.getAsTime("refresh_interval", timeValueSeconds(1));
|
this.refreshInterval = componentSettings.getAsTime("refresh_interval", timeValueSeconds(1));
|
||||||
this.termIndexInterval = componentSettings.getAsInt("term_index_interval", IndexWriter.DEFAULT_TERM_INDEX_INTERVAL);
|
this.termIndexInterval = componentSettings.getAsInt("term_index_interval", IndexWriter.DEFAULT_TERM_INDEX_INTERVAL);
|
||||||
|
|
||||||
|
@ -129,12 +129,20 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override public void indexingBuffer(ByteSizeValue indexingBufferSize) {
|
||||||
|
this.indexingBufferSize = indexingBufferSize;
|
||||||
|
IndexWriter indexWriter = this.indexWriter;
|
||||||
|
if (indexWriter != null) {
|
||||||
|
indexWriter.setRAMBufferSizeMB(indexingBufferSize.mbFrac());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override public void start() throws EngineException {
|
@Override public void start() throws EngineException {
|
||||||
if (indexWriter != null) {
|
if (indexWriter != null) {
|
||||||
throw new EngineAlreadyStartedException(shardId);
|
throw new EngineAlreadyStartedException(shardId);
|
||||||
}
|
}
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("Starting engine with ram_buffer_size[" + ramBufferSize + "], refresh_interval[" + refreshInterval + "]");
|
logger.debug("Starting engine with ram_buffer_size[" + indexingBufferSize + "], refresh_interval[" + refreshInterval + "]");
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
this.indexWriter = createWriter();
|
this.indexWriter = createWriter();
|
||||||
|
@ -477,7 +485,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
|
||||||
indexWriter.setMergeScheduler(mergeScheduler.newMergeScheduler());
|
indexWriter.setMergeScheduler(mergeScheduler.newMergeScheduler());
|
||||||
indexWriter.setMergePolicy(mergePolicyProvider.newMergePolicy(indexWriter));
|
indexWriter.setMergePolicy(mergePolicyProvider.newMergePolicy(indexWriter));
|
||||||
indexWriter.setSimilarity(similarityService.defaultIndexSimilarity());
|
indexWriter.setSimilarity(similarityService.defaultIndexSimilarity());
|
||||||
indexWriter.setRAMBufferSizeMB(ramBufferSize.mbFrac());
|
indexWriter.setRAMBufferSizeMB(indexingBufferSize.mbFrac());
|
||||||
indexWriter.setTermIndexInterval(termIndexInterval);
|
indexWriter.setTermIndexInterval(termIndexInterval);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
safeClose(indexWriter);
|
safeClose(indexWriter);
|
||||||
|
|
|
@ -23,10 +23,12 @@ import org.elasticsearch.common.inject.AbstractModule;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.indices.analysis.IndicesAnalysisService;
|
import org.elasticsearch.indices.analysis.IndicesAnalysisService;
|
||||||
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
|
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
|
||||||
|
import org.elasticsearch.indices.memory.IndexingMemoryBufferController;
|
||||||
|
import org.elasticsearch.indices.memory.IndicesMemoryCleaner;
|
||||||
import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;
|
import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author kimchy (Shay Banon)
|
* @author kimchy (shay.banon)
|
||||||
*/
|
*/
|
||||||
public class IndicesModule extends AbstractModule {
|
public class IndicesModule extends AbstractModule {
|
||||||
|
|
||||||
|
@ -42,6 +44,7 @@ public class IndicesModule extends AbstractModule {
|
||||||
bind(RecoveryThrottler.class).asEagerSingleton();
|
bind(RecoveryThrottler.class).asEagerSingleton();
|
||||||
bind(IndicesClusterStateService.class).asEagerSingleton();
|
bind(IndicesClusterStateService.class).asEagerSingleton();
|
||||||
bind(IndicesMemoryCleaner.class).asEagerSingleton();
|
bind(IndicesMemoryCleaner.class).asEagerSingleton();
|
||||||
|
bind(IndexingMemoryBufferController.class).asEagerSingleton();
|
||||||
bind(IndicesAnalysisService.class).asEagerSingleton();
|
bind(IndicesAnalysisService.class).asEagerSingleton();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,101 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elastic Search and Shay Banon under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Elastic Search licenses this
|
||||||
|
* file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.indices.memory;
|
||||||
|
|
||||||
|
import org.elasticsearch.common.component.AbstractComponent;
|
||||||
|
import org.elasticsearch.common.inject.Inject;
|
||||||
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
|
import org.elasticsearch.index.service.IndexService;
|
||||||
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
|
import org.elasticsearch.index.shard.service.IndexShard;
|
||||||
|
import org.elasticsearch.index.shard.service.InternalIndexShard;
|
||||||
|
import org.elasticsearch.indices.IndicesLifecycle;
|
||||||
|
import org.elasticsearch.indices.IndicesService;
|
||||||
|
import org.elasticsearch.monitor.jvm.JvmInfo;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author kimchy (shay.banon)
|
||||||
|
*/
|
||||||
|
public class IndexingMemoryBufferController extends AbstractComponent {
|
||||||
|
|
||||||
|
private final ByteSizeValue indexingBuffer;
|
||||||
|
|
||||||
|
private final IndicesService indicesService;
|
||||||
|
|
||||||
|
private final Listener listener = new Listener();
|
||||||
|
|
||||||
|
@Inject public IndexingMemoryBufferController(Settings settings, IndicesService indicesService) {
|
||||||
|
super(settings);
|
||||||
|
this.indicesService = indicesService;
|
||||||
|
|
||||||
|
String indexingBuffer = componentSettings.get("index_buffer_size", "40%");
|
||||||
|
if (indexingBuffer.endsWith("%")) {
|
||||||
|
double percent = Double.parseDouble(indexingBuffer.substring(0, indexingBuffer.length() - 1));
|
||||||
|
this.indexingBuffer = new ByteSizeValue((long) (((double) JvmInfo.jvmInfo().mem().heapMax().bytes()) * (percent / 100)));
|
||||||
|
} else {
|
||||||
|
this.indexingBuffer = ByteSizeValue.parseBytesSizeValue(indexingBuffer, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
indicesService.indicesLifecycle().addListener(listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
private class Listener extends IndicesLifecycle.Listener {
|
||||||
|
|
||||||
|
@Override public void afterIndexShardCreated(IndexShard indexShard) {
|
||||||
|
calcAndSetShardIndexingBuffer("created_shard[" + indexShard.shardId().index().name() + "][" + indexShard.shardId().id() + "]");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void afterIndexShardClosed(ShardId shardId, boolean delete) {
|
||||||
|
calcAndSetShardIndexingBuffer("removed_shard[" + shardId.index().name() + "][" + shardId.id() + "]");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void calcAndSetShardIndexingBuffer(String reason) {
|
||||||
|
int shardsCount = countShards();
|
||||||
|
if (shardsCount == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
ByteSizeValue shardIndexingBuffer = calcShardIndexingBuffer(shardsCount);
|
||||||
|
if (shardIndexingBuffer == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
logger.debug("recalculating shard indexing buffer (reason={}), total is [{}] with [{}] shards, each shard set to [{}]", reason, indexingBuffer, shardsCount, shardIndexingBuffer);
|
||||||
|
for (IndexService indexService : indicesService) {
|
||||||
|
for (IndexShard indexShard : indexService) {
|
||||||
|
((InternalIndexShard) indexShard).engine().indexingBuffer(shardIndexingBuffer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private ByteSizeValue calcShardIndexingBuffer(int shardsCount) {
|
||||||
|
return new ByteSizeValue(indexingBuffer.bytes() / shardsCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
private int countShards() {
|
||||||
|
int shardsCount = 0;
|
||||||
|
for (IndexService indexService : indicesService) {
|
||||||
|
for (IndexShard indexShard : indexService) {
|
||||||
|
shardsCount++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return shardsCount;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -17,7 +17,7 @@
|
||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.elasticsearch.indices;
|
package org.elasticsearch.indices.memory;
|
||||||
|
|
||||||
import org.elasticsearch.common.collect.Tuple;
|
import org.elasticsearch.common.collect.Tuple;
|
||||||
import org.elasticsearch.common.component.AbstractComponent;
|
import org.elasticsearch.common.component.AbstractComponent;
|
||||||
|
@ -34,6 +34,7 @@ import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.index.shard.service.IndexShard;
|
import org.elasticsearch.index.shard.service.IndexShard;
|
||||||
import org.elasticsearch.index.shard.service.InternalIndexShard;
|
import org.elasticsearch.index.shard.service.InternalIndexShard;
|
||||||
import org.elasticsearch.index.translog.Translog;
|
import org.elasticsearch.index.translog.Translog;
|
||||||
|
import org.elasticsearch.indices.IndicesService;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -43,7 +44,7 @@ import java.util.Set;
|
||||||
import static org.elasticsearch.common.collect.Sets.*;
|
import static org.elasticsearch.common.collect.Sets.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author kimchy (Shay Banon)
|
* @author kimchy (shay.banon)
|
||||||
*/
|
*/
|
||||||
public class IndicesMemoryCleaner extends AbstractComponent {
|
public class IndicesMemoryCleaner extends AbstractComponent {
|
||||||
|
|
|
@ -26,7 +26,7 @@ import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.indices.IndicesMemoryCleaner;
|
import org.elasticsearch.indices.memory.IndicesMemoryCleaner;
|
||||||
import org.elasticsearch.monitor.memory.MemoryMonitor;
|
import org.elasticsearch.monitor.memory.MemoryMonitor;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue