diff --git a/.idea/dictionaries/kimchy.xml b/.idea/dictionaries/kimchy.xml
index 34ee9ef2a3c..e11817adc75 100644
--- a/.idea/dictionaries/kimchy.xml
+++ b/.idea/dictionaries/kimchy.xml
@@ -14,6 +14,7 @@
booleans
bytebuffer
cacheable
+ calc
camelcase
canonicalhost
checksum
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java
index 8533db0d06c..11a45eae6c5 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -42,6 +42,8 @@ import javax.annotation.Nullable;
@ThreadSafe
public interface Engine extends IndexShardComponent, CloseableComponent {
+ void indexingBuffer(ByteSizeValue indexingBufferSize);
+
/**
* Starts the Engine.
*
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java
index b43a1a5df44..e10c3a7b68c 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java
@@ -60,7 +60,7 @@ import static org.elasticsearch.common.util.concurrent.resource.AcquirableResour
*/
public class RobinEngine extends AbstractIndexShardComponent implements Engine, ScheduledRefreshableEngine {
- private final ByteSizeValue ramBufferSize;
+ private volatile ByteSizeValue indexingBufferSize;
private final TimeValue refreshInterval;
@@ -105,7 +105,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
Preconditions.checkNotNull(deletionPolicy, "Snapshot deletion policy must be provided to the engine");
Preconditions.checkNotNull(translog, "Translog must be provided to the engine");
- this.ramBufferSize = componentSettings.getAsBytesSize("ram_buffer_size", new ByteSizeValue(64, ByteSizeUnit.MB));
+ this.indexingBufferSize = componentSettings.getAsBytesSize("indexing_buffer_size", new ByteSizeValue(64, ByteSizeUnit.MB));
this.refreshInterval = componentSettings.getAsTime("refresh_interval", timeValueSeconds(1));
this.termIndexInterval = componentSettings.getAsInt("term_index_interval", IndexWriter.DEFAULT_TERM_INDEX_INTERVAL);
@@ -129,12 +129,20 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
}
}
+ @Override public void indexingBuffer(ByteSizeValue indexingBufferSize) {
+ this.indexingBufferSize = indexingBufferSize;
+ IndexWriter indexWriter = this.indexWriter;
+ if (indexWriter != null) {
+ indexWriter.setRAMBufferSizeMB(indexingBufferSize.mbFrac());
+ }
+ }
+
@Override public void start() throws EngineException {
if (indexWriter != null) {
throw new EngineAlreadyStartedException(shardId);
}
if (logger.isDebugEnabled()) {
- logger.debug("Starting engine with ram_buffer_size[" + ramBufferSize + "], refresh_interval[" + refreshInterval + "]");
+ logger.debug("Starting engine with ram_buffer_size[" + indexingBufferSize + "], refresh_interval[" + refreshInterval + "]");
}
try {
this.indexWriter = createWriter();
@@ -477,7 +485,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
indexWriter.setMergeScheduler(mergeScheduler.newMergeScheduler());
indexWriter.setMergePolicy(mergePolicyProvider.newMergePolicy(indexWriter));
indexWriter.setSimilarity(similarityService.defaultIndexSimilarity());
- indexWriter.setRAMBufferSizeMB(ramBufferSize.mbFrac());
+ indexWriter.setRAMBufferSizeMB(indexingBufferSize.mbFrac());
indexWriter.setTermIndexInterval(termIndexInterval);
} catch (IOException e) {
safeClose(indexWriter);
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesModule.java
index e01bc2a1399..2811979b263 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesModule.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesModule.java
@@ -23,10 +23,12 @@ import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.analysis.IndicesAnalysisService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
+import org.elasticsearch.indices.memory.IndexingMemoryBufferController;
+import org.elasticsearch.indices.memory.IndicesMemoryCleaner;
import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;
/**
- * @author kimchy (Shay Banon)
+ * @author kimchy (shay.banon)
*/
public class IndicesModule extends AbstractModule {
@@ -42,6 +44,7 @@ public class IndicesModule extends AbstractModule {
bind(RecoveryThrottler.class).asEagerSingleton();
bind(IndicesClusterStateService.class).asEagerSingleton();
bind(IndicesMemoryCleaner.class).asEagerSingleton();
+ bind(IndexingMemoryBufferController.class).asEagerSingleton();
bind(IndicesAnalysisService.class).asEagerSingleton();
}
}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryBufferController.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryBufferController.java
new file mode 100644
index 00000000000..1ed25ee7743
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryBufferController.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.indices.memory;
+
+import org.elasticsearch.common.component.AbstractComponent;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.index.service.IndexService;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.shard.service.IndexShard;
+import org.elasticsearch.index.shard.service.InternalIndexShard;
+import org.elasticsearch.indices.IndicesLifecycle;
+import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.monitor.jvm.JvmInfo;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class IndexingMemoryBufferController extends AbstractComponent {
+
+ private final ByteSizeValue indexingBuffer;
+
+ private final IndicesService indicesService;
+
+ private final Listener listener = new Listener();
+
+ @Inject public IndexingMemoryBufferController(Settings settings, IndicesService indicesService) {
+ super(settings);
+ this.indicesService = indicesService;
+
+ String indexingBuffer = componentSettings.get("index_buffer_size", "40%");
+ if (indexingBuffer.endsWith("%")) {
+ double percent = Double.parseDouble(indexingBuffer.substring(0, indexingBuffer.length() - 1));
+ this.indexingBuffer = new ByteSizeValue((long) (((double) JvmInfo.jvmInfo().mem().heapMax().bytes()) * (percent / 100)));
+ } else {
+ this.indexingBuffer = ByteSizeValue.parseBytesSizeValue(indexingBuffer, null);
+ }
+
+ indicesService.indicesLifecycle().addListener(listener);
+ }
+
+ private class Listener extends IndicesLifecycle.Listener {
+
+ @Override public void afterIndexShardCreated(IndexShard indexShard) {
+ calcAndSetShardIndexingBuffer("created_shard[" + indexShard.shardId().index().name() + "][" + indexShard.shardId().id() + "]");
+ }
+
+ @Override public void afterIndexShardClosed(ShardId shardId, boolean delete) {
+ calcAndSetShardIndexingBuffer("removed_shard[" + shardId.index().name() + "][" + shardId.id() + "]");
+ }
+
+ private void calcAndSetShardIndexingBuffer(String reason) {
+ int shardsCount = countShards();
+ if (shardsCount == 0) {
+ return;
+ }
+ ByteSizeValue shardIndexingBuffer = calcShardIndexingBuffer(shardsCount);
+ if (shardIndexingBuffer == null) {
+ return;
+ }
+ logger.debug("recalculating shard indexing buffer (reason={}), total is [{}] with [{}] shards, each shard set to [{}]", reason, indexingBuffer, shardsCount, shardIndexingBuffer);
+ for (IndexService indexService : indicesService) {
+ for (IndexShard indexShard : indexService) {
+ ((InternalIndexShard) indexShard).engine().indexingBuffer(shardIndexingBuffer);
+ }
+ }
+ }
+
+ private ByteSizeValue calcShardIndexingBuffer(int shardsCount) {
+ return new ByteSizeValue(indexingBuffer.bytes() / shardsCount);
+ }
+
+ private int countShards() {
+ int shardsCount = 0;
+ for (IndexService indexService : indicesService) {
+ for (IndexShard indexShard : indexService) {
+ shardsCount++;
+ }
+ }
+ return shardsCount;
+ }
+ }
+}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesMemoryCleaner.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/memory/IndicesMemoryCleaner.java
similarity index 98%
rename from modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesMemoryCleaner.java
rename to modules/elasticsearch/src/main/java/org/elasticsearch/indices/memory/IndicesMemoryCleaner.java
index 7362f0c8b07..9590a40bdfc 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesMemoryCleaner.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/memory/IndicesMemoryCleaner.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.elasticsearch.indices;
+package org.elasticsearch.indices.memory;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
@@ -34,6 +34,7 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.indices.IndicesService;
import java.util.ArrayList;
import java.util.Collections;
@@ -43,7 +44,7 @@ import java.util.Set;
import static org.elasticsearch.common.collect.Sets.*;
/**
- * @author kimchy (Shay Banon)
+ * @author kimchy (shay.banon)
*/
public class IndicesMemoryCleaner extends AbstractComponent {
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/monitor/memory/alpha/AlphaMemoryMonitor.java b/modules/elasticsearch/src/main/java/org/elasticsearch/monitor/memory/alpha/AlphaMemoryMonitor.java
index 32b98c3c3db..01bf4c63a5c 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/monitor/memory/alpha/AlphaMemoryMonitor.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/monitor/memory/alpha/AlphaMemoryMonitor.java
@@ -26,7 +26,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.indices.IndicesMemoryCleaner;
+import org.elasticsearch.indices.memory.IndicesMemoryCleaner;
import org.elasticsearch.monitor.memory.MemoryMonitor;
import org.elasticsearch.threadpool.ThreadPool;