diff --git a/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java b/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java index e081e31e636..47029247a8e 100644 --- a/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java +++ b/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java @@ -20,7 +20,6 @@ package org.elasticsearch.indices.memory; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; @@ -37,16 +36,12 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.shard.service.InternalIndexShard; import org.elasticsearch.index.translog.Translog; -import org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.threadpool.ThreadPool; -import java.util.EnumSet; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.atomic.AtomicBoolean; /** * @@ -66,16 +61,9 @@ public class IndexingMemoryController extends AbstractLifecycleComponent shardsIndicesStatus = Maps.newHashMap(); private volatile ScheduledFuture scheduler; - private final Object mutex = new Object(); - private static final EnumSet CAN_UPDATE_INDEX_BUFFER_STATES = EnumSet.of(IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.RELOCATED); @Inject @@ -137,14 +125,12 @@ public class IndexingMemoryController extends AbstractLifecycleComponent shardsIndicesStatus = new HashMap<>(); + + @Override public void run() { - synchronized (mutex) { - boolean activeInactiveStatusChanges = false; - List activeToInactiveIndexingShards = Lists.newArrayList(); - List inactiveToActiveIndexingShards = Lists.newArrayList(); - for (IndexService indexService : indicesService) { - for (IndexShard indexShard : indexService) { - long time = threadPool.estimatedTimeInMillis(); - Translog translog = ((InternalIndexShard) indexShard).translog(); - ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId()); - if (status == null) { // not added yet + EnumSet changes = EnumSet.noneOf(ShardStatusChangeType.class); + + changes.addAll(purgeDeletedAndClosedShards()); + + final List activeToInactiveIndexingShards = Lists.newArrayList(); + final int activeShards = updateShardStatuses(changes, activeToInactiveIndexingShards); + for (IndexShard indexShard : activeToInactiveIndexingShards) { + // update inactive indexing buffer size + try { + ((InternalIndexShard) indexShard).engine().updateIndexingBufferSize(Engine.INACTIVE_SHARD_INDEXING_BUFFER); + ((InternalIndexShard) indexShard).translog().updateBuffer(Translog.INACTIVE_SHARD_TRANSLOG_BUFFER); + } catch (EngineClosedException e) { + // ignore + } catch (FlushNotAllowedEngineException e) { + // ignore + } + } + if (!changes.isEmpty()) { + calcAndSetShardBuffers(activeShards, "[" + changes + "]"); + } + } + + /** + * goes through all existing shards and check whether the changes their active status + * + * @return the current count of active shards + */ + private int updateShardStatuses(EnumSet changes, List activeToInactiveIndexingShards) { + int activeShards = 0; + for (IndexService indexService : indicesService) { + for (IndexShard indexShard : indexService) { + + if (!CAN_UPDATE_INDEX_BUFFER_STATES.contains(indexShard.state())) { + // not ready to be updated yet. + continue; + } + + final long time = threadPool.estimatedTimeInMillis(); + + Translog translog = ((InternalIndexShard) indexShard).translog(); + ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId()); + if (status == null) { + status = new ShardIndexingStatus(); + shardsIndicesStatus.put(indexShard.shardId(), status); + changes.add(ShardStatusChangeType.ADDED); + } + // check if it is deemed to be inactive (sam translogId and numberOfOperations over a long period of time) + if (status.translogId == translog.currentId() && translog.estimatedNumberOfOperations() == 0) { + if (status.time == -1) { // first time + status.time = time; + } + // inactive? + if (status.activeIndexing) { + // mark it as inactive only if enough time has passed and there are no ongoing merges going on... + if ((time - status.time) > inactiveTime.millis() && indexShard.mergeStats().getCurrent() == 0) { + // inactive for this amount of time, mark it + activeToInactiveIndexingShards.add(indexShard); + status.activeIndexing = false; + changes.add(ShardStatusChangeType.BECAME_INACTIVE); + logger.debug("marking shard [{}][{}] as inactive (inactive_time[{}]) indexing wise, setting size to [{}]", indexShard.shardId().index().name(), indexShard.shardId().id(), inactiveTime, Engine.INACTIVE_SHARD_INDEXING_BUFFER); + } + } + } else { + if (!status.activeIndexing) { + status.activeIndexing = true; + changes.add(ShardStatusChangeType.BECAME_ACTIVE); + logger.debug("marking shard [{}][{}] as active indexing wise", indexShard.shardId().index().name(), indexShard.shardId().id()); + } + status.time = -1; + } + status.translogId = translog.currentId(); + status.translogNumberOfOperations = translog.estimatedNumberOfOperations(); + + if (status.activeIndexing) { + activeShards++; + } + } + } + return activeShards; + } + + /** + * purge any existing statuses that are no longer updated + * + * @return true if any change + */ + private EnumSet purgeDeletedAndClosedShards() { + EnumSet changes = EnumSet.noneOf(ShardStatusChangeType.class); + + Iterator statusShardIdIterator = shardsIndicesStatus.keySet().iterator(); + while (statusShardIdIterator.hasNext()) { + ShardId statusShardId = statusShardIdIterator.next(); + IndexService indexService = indicesService.indexService(statusShardId.getIndex()); + boolean remove = false; + try { + if (indexService == null) { + remove = true; + continue; + } + IndexShard indexShard = indexService.shard(statusShardId.id()); + if (indexShard == null) { + remove = true; + continue; + } + remove = !CAN_UPDATE_INDEX_BUFFER_STATES.contains(indexShard.state()); + + } finally { + if (remove) { + changes.add(ShardStatusChangeType.DELETED); + statusShardIdIterator.remove(); + } + } + } + return changes; + } + + private void calcAndSetShardBuffers(int activeShards, String reason) { + if (activeShards == 0) { + return; + } + ByteSizeValue shardIndexingBufferSize = new ByteSizeValue(indexingBuffer.bytes() / activeShards); + if (shardIndexingBufferSize.bytes() < minShardIndexBufferSize.bytes()) { + shardIndexingBufferSize = minShardIndexBufferSize; + } + if (shardIndexingBufferSize.bytes() > maxShardIndexBufferSize.bytes()) { + shardIndexingBufferSize = maxShardIndexBufferSize; + } + + ByteSizeValue shardTranslogBufferSize = new ByteSizeValue(translogBuffer.bytes() / activeShards); + if (shardTranslogBufferSize.bytes() < minShardTranslogBufferSize.bytes()) { + shardTranslogBufferSize = minShardTranslogBufferSize; + } + if (shardTranslogBufferSize.bytes() > maxShardTranslogBufferSize.bytes()) { + shardTranslogBufferSize = maxShardTranslogBufferSize; + } + + logger.debug("recalculating shard indexing buffer (reason={}), total is [{}] with [{}] active shards, each shard set to indexing=[{}], translog=[{}]", reason, indexingBuffer, activeShards, shardIndexingBufferSize, shardTranslogBufferSize); + for (IndexService indexService : indicesService) { + for (IndexShard indexShard : indexService) { + IndexShardState state = indexShard.state(); + if (!CAN_UPDATE_INDEX_BUFFER_STATES.contains(state)) { + logger.trace("shard [{}] is not yet ready for index buffer update. index shard state: [{}]", indexShard.shardId(), state); + continue; + } + ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId()); + if (status == null || status.activeIndexing) { + try { + ((InternalIndexShard) indexShard).engine().updateIndexingBufferSize(shardIndexingBufferSize); + ((InternalIndexShard) indexShard).translog().updateBuffer(shardTranslogBufferSize); + } catch (EngineClosedException e) { + // ignore continue; + } catch (FlushNotAllowedEngineException e) { + // ignore + continue; + } catch (Exception e) { + logger.warn("failed to set shard {} index buffer to [{}]", indexShard.shardId(), shardIndexingBufferSize); } - // check if it is deemed to be inactive (sam translogId and numberOfOperations over a long period of time) - if (status.translogId == translog.currentId() && translog.estimatedNumberOfOperations() == 0) { - if (status.time == -1) { // first time - status.time = time; - } - // inactive? - if (!status.inactiveIndexing) { - // mark it as inactive only if enough time has passed and there are no ongoing merges going on... - if ((time - status.time) > inactiveTime.millis() && indexShard.mergeStats().getCurrent() == 0) { - // inactive for this amount of time, mark it - activeToInactiveIndexingShards.add(indexShard); - status.inactiveIndexing = true; - activeInactiveStatusChanges = true; - logger.debug("marking shard [{}][{}] as inactive (inactive_time[{}]) indexing wise, setting size to [{}]", indexShard.shardId().index().name(), indexShard.shardId().id(), inactiveTime, Engine.INACTIVE_SHARD_INDEXING_BUFFER); - } - } - } else { - if (status.inactiveIndexing) { - inactiveToActiveIndexingShards.add(indexShard); - status.inactiveIndexing = false; - activeInactiveStatusChanges = true; - logger.debug("marking shard [{}][{}] as active indexing wise", indexShard.shardId().index().name(), indexShard.shardId().id()); - } - status.time = -1; - } - status.translogId = translog.currentId(); - status.translogNumberOfOperations = translog.estimatedNumberOfOperations(); - } - } - for (IndexShard indexShard : activeToInactiveIndexingShards) { - // update inactive indexing buffer size - try { - ((InternalIndexShard) indexShard).engine().updateIndexingBufferSize(Engine.INACTIVE_SHARD_INDEXING_BUFFER); - ((InternalIndexShard) indexShard).translog().updateBuffer(Translog.INACTIVE_SHARD_TRANSLOG_BUFFER); - } catch (EngineClosedException e) { - // ignore - } catch (FlushNotAllowedEngineException e) { - // ignore - } - } - boolean shardsRecoveredOrDeleted = IndexingMemoryController.this.shardsRecoveredOrDeleted.compareAndSet(true, false); - if (shardsRecoveredOrDeleted || activeInactiveStatusChanges) { - calcAndSetShardBuffers("active/inactive[" + activeInactiveStatusChanges + "] recovered/deleted[" + shardsRecoveredOrDeleted + "]"); - } - } - } - } - - class Listener extends IndicesLifecycle.Listener { - - @Override - public void afterIndexShardPostRecovery(IndexShard indexShard) { - synchronized (mutex) { - shardsIndicesStatus.put(indexShard.shardId(), new ShardIndexingStatus()); - shardsRecoveredOrDeleted.set(true); - } - } - - @Override - public void afterIndexShardClosed(ShardId shardId) { - synchronized (mutex) { - shardsIndicesStatus.remove(shardId); - shardsRecoveredOrDeleted.set(true); - } - } - } - - - private void calcAndSetShardBuffers(String reason) { - int shardsCount = countActiveShards(); - if (shardsCount == 0) { - return; - } - ByteSizeValue shardIndexingBufferSize = new ByteSizeValue(indexingBuffer.bytes() / shardsCount); - if (shardIndexingBufferSize.bytes() < minShardIndexBufferSize.bytes()) { - shardIndexingBufferSize = minShardIndexBufferSize; - } - if (shardIndexingBufferSize.bytes() > maxShardIndexBufferSize.bytes()) { - shardIndexingBufferSize = maxShardIndexBufferSize; - } - - ByteSizeValue shardTranslogBufferSize = new ByteSizeValue(translogBuffer.bytes() / shardsCount); - if (shardTranslogBufferSize.bytes() < minShardTranslogBufferSize.bytes()) { - shardTranslogBufferSize = minShardTranslogBufferSize; - } - if (shardTranslogBufferSize.bytes() > maxShardTranslogBufferSize.bytes()) { - shardTranslogBufferSize = maxShardTranslogBufferSize; - } - - logger.debug("recalculating shard indexing buffer (reason={}), total is [{}] with [{}] active shards, each shard set to indexing=[{}], translog=[{}]", reason, indexingBuffer, shardsCount, shardIndexingBufferSize, shardTranslogBufferSize); - for (IndexService indexService : indicesService) { - for (IndexShard indexShard : indexService) { - IndexShardState state = indexShard.state(); - if (!CAN_UPDATE_INDEX_BUFFER_STATES.contains(state)) { - logger.trace("shard [{}] is not yet ready for index buffer update. index shard state: [{}]", indexShard.shardId(), state); - continue; - } - ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId()); - if (status == null || !status.inactiveIndexing) { - try { - ((InternalIndexShard) indexShard).engine().updateIndexingBufferSize(shardIndexingBufferSize); - ((InternalIndexShard) indexShard).translog().updateBuffer(shardTranslogBufferSize); - } catch (EngineClosedException e) { - // ignore - continue; - } catch (FlushNotAllowedEngineException e) { - // ignore - continue; - } catch (Exception e) { - logger.warn("failed to set shard {} index buffer to [{}]", indexShard.shardId(), shardIndexingBufferSize); } } } } } - private int countActiveShards() { - int shardsCount = 0; - for (IndexService indexService : indicesService) { - for (IndexShard indexShard : indexService) { - ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId()); - if (status == null || !status.inactiveIndexing) { - shardsCount++; - } - } - } - return shardsCount; + private static enum ShardStatusChangeType { + ADDED, DELETED, BECAME_ACTIVE, BECAME_INACTIVE } + static class ShardIndexingStatus { long translogId = -1; int translogNumberOfOperations = -1; - boolean inactiveIndexing = false; + boolean activeIndexing = true; long time = -1; // contains the first time we saw this shard with no operations done on it } } diff --git a/src/test/java/org/elasticsearch/indices/memory/IndexMemoryControllerTests.java b/src/test/java/org/elasticsearch/indices/memory/IndexMemoryControllerTests.java deleted file mode 100644 index 349f723cda9..00000000000 --- a/src/test/java/org/elasticsearch/indices/memory/IndexMemoryControllerTests.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.indices.memory; - -import com.google.common.base.Predicate; -import org.elasticsearch.common.settings.ImmutableSettings; -import org.elasticsearch.index.engine.internal.InternalEngine; -import org.elasticsearch.index.shard.service.InternalIndexShard; -import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.test.ElasticsearchIntegrationTest; -import org.junit.Test; - - -@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numDataNodes = 0) -public class IndexMemoryControllerTests extends ElasticsearchIntegrationTest { - - @Test - public void testIndexBufferSizeUpdateAfterShardCreation() throws InterruptedException { - - internalCluster().startNode(ImmutableSettings.builder() - .put("http.enabled", "false") - .put("discovery.type", "local") - .put("indices.memory.interval", "1s") - ); - - client().admin().indices().prepareCreate("test1") - .setSettings(ImmutableSettings.builder() - .put("number_of_shards", 1) - .put("number_of_replicas", 0) - ).get(); - - ensureGreen(); - - final InternalIndexShard shard1 = (InternalIndexShard) internalCluster().getInstance(IndicesService.class).indexService("test1").shard(0); - - client().admin().indices().prepareCreate("test2") - .setSettings(ImmutableSettings.builder() - .put("number_of_shards", 1) - .put("number_of_replicas", 0) - ).get(); - - ensureGreen(); - - final InternalIndexShard shard2 = (InternalIndexShard) internalCluster().getInstance(IndicesService.class).indexService("test2").shard(0); - final long expectedShardSize = internalCluster().getInstance(IndexingMemoryController.class).indexingBufferSize().bytes() / 2; - - boolean success = awaitBusy(new Predicate() { - @Override - public boolean apply(Object input) { - return ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() <= expectedShardSize && - ((InternalEngine) shard2.engine()).indexingBufferSize().bytes() <= expectedShardSize; - } - }); - - if (!success) { - fail("failed to update shard indexing buffer size. expected [" + expectedShardSize + "] shard1 [" + - ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() + "] shard2 [" + - ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() + "]" - ); - } - - } -} diff --git a/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java b/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java new file mode 100644 index 00000000000..c16d75ed530 --- /dev/null +++ b/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java @@ -0,0 +1,155 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.memory; + +import com.google.common.base.Predicate; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.internal.InternalEngine; +import org.elasticsearch.index.shard.service.InternalIndexShard; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.junit.Test; + + +@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numDataNodes = 0) +public class IndexingMemoryControllerTests extends ElasticsearchIntegrationTest { + + @Test + public void testIndexBufferSizeUpdateAfterCreationRemoval() throws InterruptedException { + + createNode(ImmutableSettings.EMPTY); + + prepareCreate("test1").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).get(); + + ensureGreen(); + + final InternalIndexShard shard1 = (InternalIndexShard) internalCluster().getInstance(IndicesService.class).indexService("test1").shard(0); + + prepareCreate("test2").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).get(); + + ensureGreen(); + + final InternalIndexShard shard2 = (InternalIndexShard) internalCluster().getInstance(IndicesService.class).indexService("test2").shard(0); + final long expected1ShardSize = internalCluster().getInstance(IndexingMemoryController.class).indexingBufferSize().bytes(); + final long expected2ShardsSize = expected1ShardSize / 2; + + boolean success = awaitBusy(new Predicate() { + @Override + public boolean apply(Object input) { + return ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() <= expected2ShardsSize && + ((InternalEngine) shard2.engine()).indexingBufferSize().bytes() <= expected2ShardsSize; + } + }); + + if (!success) { + fail("failed to update shard indexing buffer size. expected [" + expected2ShardsSize + "] shard1 [" + + ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() + "] shard2 [" + + ((InternalEngine) shard2.engine()).indexingBufferSize().bytes() + "]" + ); + } + + client().admin().indices().prepareDelete("test2").get(); + success = awaitBusy(new Predicate() { + @Override + public boolean apply(Object input) { + return ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() >= expected1ShardSize; + } + }); + + if (!success) { + fail("failed to update shard indexing buffer size after deleting shards. expected [" + expected1ShardSize + "] got [" + + ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() + "]" + ); + } + + } + + @Test + public void testIndexBufferSizeUpdateInactiveShard() throws InterruptedException { + + createNode(ImmutableSettings.builder().put("indices.memory.shard_inactive_time", "100ms").build()); + + prepareCreate("test1").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).get(); + + ensureGreen(); + + final InternalIndexShard shard1 = (InternalIndexShard) internalCluster().getInstance(IndicesService.class).indexService("test1").shard(0); + boolean success = awaitBusy(new Predicate() { + @Override + public boolean apply(Object input) { + return ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() == Engine.INACTIVE_SHARD_INDEXING_BUFFER.bytes(); + } + }); + if (!success) { + fail("failed to update shard indexing buffer size due to inactive state. expected [" + Engine.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" + + ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() + "]" + ); + } + + index("test1", "type", "1", "f", 1); + + success = awaitBusy(new Predicate() { + @Override + public boolean apply(Object input) { + return ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() > Engine.INACTIVE_SHARD_INDEXING_BUFFER.bytes(); + } + }); + if (!success) { + fail("failed to update shard indexing buffer size due to inactive state. expected something larger then [" + Engine.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" + + ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() + "]" + ); + } + + flush(); // clean translogs + + success = awaitBusy(new Predicate() { + @Override + public boolean apply(Object input) { + return ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() == Engine.INACTIVE_SHARD_INDEXING_BUFFER.bytes(); + } + }); + if (!success) { + fail("failed to update shard indexing buffer size due to inactive state. expected [" + Engine.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" + + ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() + "]" + ); + } + } + + private void createNode(Settings settings) { + internalCluster().startNode(ImmutableSettings.builder() + .put(ClusterName.SETTING, "IndexingMemoryControllerTests") + .put("node.name", "IndexingMemoryControllerTests") + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(EsExecutors.PROCESSORS, 1) // limit the number of threads created + .put("http.enabled", false) + .put("index.store.type", "ram") + .put("config.ignore_system_properties", true) // make sure we get what we set :) + .put("gateway.type", "none") + .put("indices.memory.interval", "100ms") + .put(settings) + ); + } +}