From 38d8e3ccc2a8b5df63284fddb0450fead91219d1 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 16 Jul 2014 16:01:03 +0200 Subject: [PATCH] [Infra] remove indicesLifecycle.Listener from IndexingMemoryController The IndexingMemoryController determines the amount of indexing buffer size and translog buffer size each shard should have. It takes memory from inactive shards (indexing wise) and assigns it to other shards. To do so it needs to know about the addition and closing of shards. The current implementation hooks into the indicesService.indicesLifecycle() mechanism to receive call backs, such shard entered the POST_RECOVERY state. Those call backs are typically run on the thread that actually made the change. A mutex was used to synchronize those callbacks with IndexingMemoryController's background thread, which updates the internal engines memory usage on a regular interval. This introduced a dependency between those threads and the locks of the internal engines hosted on the node. In a *very* rare situation (two tests runs locally) this can cause recovery time outs where two nodes are recovering replicas from each other. This commit introduces a a lock free approach that updates the internal data structures during iterations in the background thread. Closes #6892 --- .../memory/IndexingMemoryController.java | 312 +++++++++--------- .../memory/IndexMemoryControllerTests.java | 80 ----- .../memory/IndexingMemoryControllerTests.java | 155 +++++++++ 3 files changed, 320 insertions(+), 227 deletions(-) delete mode 100644 src/test/java/org/elasticsearch/indices/memory/IndexMemoryControllerTests.java create mode 100644 src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java diff --git a/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java b/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java index e081e31e636..47029247a8e 100644 --- a/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java +++ b/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java @@ -20,7 +20,6 @@ package org.elasticsearch.indices.memory; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; @@ -37,16 +36,12 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.shard.service.InternalIndexShard; import org.elasticsearch.index.translog.Translog; -import org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.threadpool.ThreadPool; -import java.util.EnumSet; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.atomic.AtomicBoolean; /** * @@ -66,16 +61,9 @@ public class IndexingMemoryController extends AbstractLifecycleComponent shardsIndicesStatus = Maps.newHashMap(); private volatile ScheduledFuture scheduler; - private final Object mutex = new Object(); - private static final EnumSet CAN_UPDATE_INDEX_BUFFER_STATES = EnumSet.of(IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.RELOCATED); @Inject @@ -137,14 +125,12 @@ public class IndexingMemoryController extends AbstractLifecycleComponent shardsIndicesStatus = new HashMap<>(); + + @Override public void run() { - synchronized (mutex) { - boolean activeInactiveStatusChanges = false; - List activeToInactiveIndexingShards = Lists.newArrayList(); - List inactiveToActiveIndexingShards = Lists.newArrayList(); - for (IndexService indexService : indicesService) { - for (IndexShard indexShard : indexService) { - long time = threadPool.estimatedTimeInMillis(); - Translog translog = ((InternalIndexShard) indexShard).translog(); - ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId()); - if (status == null) { // not added yet + EnumSet changes = EnumSet.noneOf(ShardStatusChangeType.class); + + changes.addAll(purgeDeletedAndClosedShards()); + + final List activeToInactiveIndexingShards = Lists.newArrayList(); + final int activeShards = updateShardStatuses(changes, activeToInactiveIndexingShards); + for (IndexShard indexShard : activeToInactiveIndexingShards) { + // update inactive indexing buffer size + try { + ((InternalIndexShard) indexShard).engine().updateIndexingBufferSize(Engine.INACTIVE_SHARD_INDEXING_BUFFER); + ((InternalIndexShard) indexShard).translog().updateBuffer(Translog.INACTIVE_SHARD_TRANSLOG_BUFFER); + } catch (EngineClosedException e) { + // ignore + } catch (FlushNotAllowedEngineException e) { + // ignore + } + } + if (!changes.isEmpty()) { + calcAndSetShardBuffers(activeShards, "[" + changes + "]"); + } + } + + /** + * goes through all existing shards and check whether the changes their active status + * + * @return the current count of active shards + */ + private int updateShardStatuses(EnumSet changes, List activeToInactiveIndexingShards) { + int activeShards = 0; + for (IndexService indexService : indicesService) { + for (IndexShard indexShard : indexService) { + + if (!CAN_UPDATE_INDEX_BUFFER_STATES.contains(indexShard.state())) { + // not ready to be updated yet. + continue; + } + + final long time = threadPool.estimatedTimeInMillis(); + + Translog translog = ((InternalIndexShard) indexShard).translog(); + ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId()); + if (status == null) { + status = new ShardIndexingStatus(); + shardsIndicesStatus.put(indexShard.shardId(), status); + changes.add(ShardStatusChangeType.ADDED); + } + // check if it is deemed to be inactive (sam translogId and numberOfOperations over a long period of time) + if (status.translogId == translog.currentId() && translog.estimatedNumberOfOperations() == 0) { + if (status.time == -1) { // first time + status.time = time; + } + // inactive? + if (status.activeIndexing) { + // mark it as inactive only if enough time has passed and there are no ongoing merges going on... + if ((time - status.time) > inactiveTime.millis() && indexShard.mergeStats().getCurrent() == 0) { + // inactive for this amount of time, mark it + activeToInactiveIndexingShards.add(indexShard); + status.activeIndexing = false; + changes.add(ShardStatusChangeType.BECAME_INACTIVE); + logger.debug("marking shard [{}][{}] as inactive (inactive_time[{}]) indexing wise, setting size to [{}]", indexShard.shardId().index().name(), indexShard.shardId().id(), inactiveTime, Engine.INACTIVE_SHARD_INDEXING_BUFFER); + } + } + } else { + if (!status.activeIndexing) { + status.activeIndexing = true; + changes.add(ShardStatusChangeType.BECAME_ACTIVE); + logger.debug("marking shard [{}][{}] as active indexing wise", indexShard.shardId().index().name(), indexShard.shardId().id()); + } + status.time = -1; + } + status.translogId = translog.currentId(); + status.translogNumberOfOperations = translog.estimatedNumberOfOperations(); + + if (status.activeIndexing) { + activeShards++; + } + } + } + return activeShards; + } + + /** + * purge any existing statuses that are no longer updated + * + * @return true if any change + */ + private EnumSet purgeDeletedAndClosedShards() { + EnumSet changes = EnumSet.noneOf(ShardStatusChangeType.class); + + Iterator statusShardIdIterator = shardsIndicesStatus.keySet().iterator(); + while (statusShardIdIterator.hasNext()) { + ShardId statusShardId = statusShardIdIterator.next(); + IndexService indexService = indicesService.indexService(statusShardId.getIndex()); + boolean remove = false; + try { + if (indexService == null) { + remove = true; + continue; + } + IndexShard indexShard = indexService.shard(statusShardId.id()); + if (indexShard == null) { + remove = true; + continue; + } + remove = !CAN_UPDATE_INDEX_BUFFER_STATES.contains(indexShard.state()); + + } finally { + if (remove) { + changes.add(ShardStatusChangeType.DELETED); + statusShardIdIterator.remove(); + } + } + } + return changes; + } + + private void calcAndSetShardBuffers(int activeShards, String reason) { + if (activeShards == 0) { + return; + } + ByteSizeValue shardIndexingBufferSize = new ByteSizeValue(indexingBuffer.bytes() / activeShards); + if (shardIndexingBufferSize.bytes() < minShardIndexBufferSize.bytes()) { + shardIndexingBufferSize = minShardIndexBufferSize; + } + if (shardIndexingBufferSize.bytes() > maxShardIndexBufferSize.bytes()) { + shardIndexingBufferSize = maxShardIndexBufferSize; + } + + ByteSizeValue shardTranslogBufferSize = new ByteSizeValue(translogBuffer.bytes() / activeShards); + if (shardTranslogBufferSize.bytes() < minShardTranslogBufferSize.bytes()) { + shardTranslogBufferSize = minShardTranslogBufferSize; + } + if (shardTranslogBufferSize.bytes() > maxShardTranslogBufferSize.bytes()) { + shardTranslogBufferSize = maxShardTranslogBufferSize; + } + + logger.debug("recalculating shard indexing buffer (reason={}), total is [{}] with [{}] active shards, each shard set to indexing=[{}], translog=[{}]", reason, indexingBuffer, activeShards, shardIndexingBufferSize, shardTranslogBufferSize); + for (IndexService indexService : indicesService) { + for (IndexShard indexShard : indexService) { + IndexShardState state = indexShard.state(); + if (!CAN_UPDATE_INDEX_BUFFER_STATES.contains(state)) { + logger.trace("shard [{}] is not yet ready for index buffer update. index shard state: [{}]", indexShard.shardId(), state); + continue; + } + ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId()); + if (status == null || status.activeIndexing) { + try { + ((InternalIndexShard) indexShard).engine().updateIndexingBufferSize(shardIndexingBufferSize); + ((InternalIndexShard) indexShard).translog().updateBuffer(shardTranslogBufferSize); + } catch (EngineClosedException e) { + // ignore continue; + } catch (FlushNotAllowedEngineException e) { + // ignore + continue; + } catch (Exception e) { + logger.warn("failed to set shard {} index buffer to [{}]", indexShard.shardId(), shardIndexingBufferSize); } - // check if it is deemed to be inactive (sam translogId and numberOfOperations over a long period of time) - if (status.translogId == translog.currentId() && translog.estimatedNumberOfOperations() == 0) { - if (status.time == -1) { // first time - status.time = time; - } - // inactive? - if (!status.inactiveIndexing) { - // mark it as inactive only if enough time has passed and there are no ongoing merges going on... - if ((time - status.time) > inactiveTime.millis() && indexShard.mergeStats().getCurrent() == 0) { - // inactive for this amount of time, mark it - activeToInactiveIndexingShards.add(indexShard); - status.inactiveIndexing = true; - activeInactiveStatusChanges = true; - logger.debug("marking shard [{}][{}] as inactive (inactive_time[{}]) indexing wise, setting size to [{}]", indexShard.shardId().index().name(), indexShard.shardId().id(), inactiveTime, Engine.INACTIVE_SHARD_INDEXING_BUFFER); - } - } - } else { - if (status.inactiveIndexing) { - inactiveToActiveIndexingShards.add(indexShard); - status.inactiveIndexing = false; - activeInactiveStatusChanges = true; - logger.debug("marking shard [{}][{}] as active indexing wise", indexShard.shardId().index().name(), indexShard.shardId().id()); - } - status.time = -1; - } - status.translogId = translog.currentId(); - status.translogNumberOfOperations = translog.estimatedNumberOfOperations(); - } - } - for (IndexShard indexShard : activeToInactiveIndexingShards) { - // update inactive indexing buffer size - try { - ((InternalIndexShard) indexShard).engine().updateIndexingBufferSize(Engine.INACTIVE_SHARD_INDEXING_BUFFER); - ((InternalIndexShard) indexShard).translog().updateBuffer(Translog.INACTIVE_SHARD_TRANSLOG_BUFFER); - } catch (EngineClosedException e) { - // ignore - } catch (FlushNotAllowedEngineException e) { - // ignore - } - } - boolean shardsRecoveredOrDeleted = IndexingMemoryController.this.shardsRecoveredOrDeleted.compareAndSet(true, false); - if (shardsRecoveredOrDeleted || activeInactiveStatusChanges) { - calcAndSetShardBuffers("active/inactive[" + activeInactiveStatusChanges + "] recovered/deleted[" + shardsRecoveredOrDeleted + "]"); - } - } - } - } - - class Listener extends IndicesLifecycle.Listener { - - @Override - public void afterIndexShardPostRecovery(IndexShard indexShard) { - synchronized (mutex) { - shardsIndicesStatus.put(indexShard.shardId(), new ShardIndexingStatus()); - shardsRecoveredOrDeleted.set(true); - } - } - - @Override - public void afterIndexShardClosed(ShardId shardId) { - synchronized (mutex) { - shardsIndicesStatus.remove(shardId); - shardsRecoveredOrDeleted.set(true); - } - } - } - - - private void calcAndSetShardBuffers(String reason) { - int shardsCount = countActiveShards(); - if (shardsCount == 0) { - return; - } - ByteSizeValue shardIndexingBufferSize = new ByteSizeValue(indexingBuffer.bytes() / shardsCount); - if (shardIndexingBufferSize.bytes() < minShardIndexBufferSize.bytes()) { - shardIndexingBufferSize = minShardIndexBufferSize; - } - if (shardIndexingBufferSize.bytes() > maxShardIndexBufferSize.bytes()) { - shardIndexingBufferSize = maxShardIndexBufferSize; - } - - ByteSizeValue shardTranslogBufferSize = new ByteSizeValue(translogBuffer.bytes() / shardsCount); - if (shardTranslogBufferSize.bytes() < minShardTranslogBufferSize.bytes()) { - shardTranslogBufferSize = minShardTranslogBufferSize; - } - if (shardTranslogBufferSize.bytes() > maxShardTranslogBufferSize.bytes()) { - shardTranslogBufferSize = maxShardTranslogBufferSize; - } - - logger.debug("recalculating shard indexing buffer (reason={}), total is [{}] with [{}] active shards, each shard set to indexing=[{}], translog=[{}]", reason, indexingBuffer, shardsCount, shardIndexingBufferSize, shardTranslogBufferSize); - for (IndexService indexService : indicesService) { - for (IndexShard indexShard : indexService) { - IndexShardState state = indexShard.state(); - if (!CAN_UPDATE_INDEX_BUFFER_STATES.contains(state)) { - logger.trace("shard [{}] is not yet ready for index buffer update. index shard state: [{}]", indexShard.shardId(), state); - continue; - } - ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId()); - if (status == null || !status.inactiveIndexing) { - try { - ((InternalIndexShard) indexShard).engine().updateIndexingBufferSize(shardIndexingBufferSize); - ((InternalIndexShard) indexShard).translog().updateBuffer(shardTranslogBufferSize); - } catch (EngineClosedException e) { - // ignore - continue; - } catch (FlushNotAllowedEngineException e) { - // ignore - continue; - } catch (Exception e) { - logger.warn("failed to set shard {} index buffer to [{}]", indexShard.shardId(), shardIndexingBufferSize); } } } } } - private int countActiveShards() { - int shardsCount = 0; - for (IndexService indexService : indicesService) { - for (IndexShard indexShard : indexService) { - ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId()); - if (status == null || !status.inactiveIndexing) { - shardsCount++; - } - } - } - return shardsCount; + private static enum ShardStatusChangeType { + ADDED, DELETED, BECAME_ACTIVE, BECAME_INACTIVE } + static class ShardIndexingStatus { long translogId = -1; int translogNumberOfOperations = -1; - boolean inactiveIndexing = false; + boolean activeIndexing = true; long time = -1; // contains the first time we saw this shard with no operations done on it } } diff --git a/src/test/java/org/elasticsearch/indices/memory/IndexMemoryControllerTests.java b/src/test/java/org/elasticsearch/indices/memory/IndexMemoryControllerTests.java deleted file mode 100644 index 349f723cda9..00000000000 --- a/src/test/java/org/elasticsearch/indices/memory/IndexMemoryControllerTests.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.indices.memory; - -import com.google.common.base.Predicate; -import org.elasticsearch.common.settings.ImmutableSettings; -import org.elasticsearch.index.engine.internal.InternalEngine; -import org.elasticsearch.index.shard.service.InternalIndexShard; -import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.test.ElasticsearchIntegrationTest; -import org.junit.Test; - - -@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numDataNodes = 0) -public class IndexMemoryControllerTests extends ElasticsearchIntegrationTest { - - @Test - public void testIndexBufferSizeUpdateAfterShardCreation() throws InterruptedException { - - internalCluster().startNode(ImmutableSettings.builder() - .put("http.enabled", "false") - .put("discovery.type", "local") - .put("indices.memory.interval", "1s") - ); - - client().admin().indices().prepareCreate("test1") - .setSettings(ImmutableSettings.builder() - .put("number_of_shards", 1) - .put("number_of_replicas", 0) - ).get(); - - ensureGreen(); - - final InternalIndexShard shard1 = (InternalIndexShard) internalCluster().getInstance(IndicesService.class).indexService("test1").shard(0); - - client().admin().indices().prepareCreate("test2") - .setSettings(ImmutableSettings.builder() - .put("number_of_shards", 1) - .put("number_of_replicas", 0) - ).get(); - - ensureGreen(); - - final InternalIndexShard shard2 = (InternalIndexShard) internalCluster().getInstance(IndicesService.class).indexService("test2").shard(0); - final long expectedShardSize = internalCluster().getInstance(IndexingMemoryController.class).indexingBufferSize().bytes() / 2; - - boolean success = awaitBusy(new Predicate() { - @Override - public boolean apply(Object input) { - return ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() <= expectedShardSize && - ((InternalEngine) shard2.engine()).indexingBufferSize().bytes() <= expectedShardSize; - } - }); - - if (!success) { - fail("failed to update shard indexing buffer size. expected [" + expectedShardSize + "] shard1 [" + - ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() + "] shard2 [" + - ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() + "]" - ); - } - - } -} diff --git a/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java b/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java new file mode 100644 index 00000000000..c16d75ed530 --- /dev/null +++ b/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java @@ -0,0 +1,155 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.memory; + +import com.google.common.base.Predicate; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.internal.InternalEngine; +import org.elasticsearch.index.shard.service.InternalIndexShard; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.junit.Test; + + +@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numDataNodes = 0) +public class IndexingMemoryControllerTests extends ElasticsearchIntegrationTest { + + @Test + public void testIndexBufferSizeUpdateAfterCreationRemoval() throws InterruptedException { + + createNode(ImmutableSettings.EMPTY); + + prepareCreate("test1").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).get(); + + ensureGreen(); + + final InternalIndexShard shard1 = (InternalIndexShard) internalCluster().getInstance(IndicesService.class).indexService("test1").shard(0); + + prepareCreate("test2").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).get(); + + ensureGreen(); + + final InternalIndexShard shard2 = (InternalIndexShard) internalCluster().getInstance(IndicesService.class).indexService("test2").shard(0); + final long expected1ShardSize = internalCluster().getInstance(IndexingMemoryController.class).indexingBufferSize().bytes(); + final long expected2ShardsSize = expected1ShardSize / 2; + + boolean success = awaitBusy(new Predicate() { + @Override + public boolean apply(Object input) { + return ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() <= expected2ShardsSize && + ((InternalEngine) shard2.engine()).indexingBufferSize().bytes() <= expected2ShardsSize; + } + }); + + if (!success) { + fail("failed to update shard indexing buffer size. expected [" + expected2ShardsSize + "] shard1 [" + + ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() + "] shard2 [" + + ((InternalEngine) shard2.engine()).indexingBufferSize().bytes() + "]" + ); + } + + client().admin().indices().prepareDelete("test2").get(); + success = awaitBusy(new Predicate() { + @Override + public boolean apply(Object input) { + return ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() >= expected1ShardSize; + } + }); + + if (!success) { + fail("failed to update shard indexing buffer size after deleting shards. expected [" + expected1ShardSize + "] got [" + + ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() + "]" + ); + } + + } + + @Test + public void testIndexBufferSizeUpdateInactiveShard() throws InterruptedException { + + createNode(ImmutableSettings.builder().put("indices.memory.shard_inactive_time", "100ms").build()); + + prepareCreate("test1").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).get(); + + ensureGreen(); + + final InternalIndexShard shard1 = (InternalIndexShard) internalCluster().getInstance(IndicesService.class).indexService("test1").shard(0); + boolean success = awaitBusy(new Predicate() { + @Override + public boolean apply(Object input) { + return ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() == Engine.INACTIVE_SHARD_INDEXING_BUFFER.bytes(); + } + }); + if (!success) { + fail("failed to update shard indexing buffer size due to inactive state. expected [" + Engine.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" + + ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() + "]" + ); + } + + index("test1", "type", "1", "f", 1); + + success = awaitBusy(new Predicate() { + @Override + public boolean apply(Object input) { + return ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() > Engine.INACTIVE_SHARD_INDEXING_BUFFER.bytes(); + } + }); + if (!success) { + fail("failed to update shard indexing buffer size due to inactive state. expected something larger then [" + Engine.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" + + ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() + "]" + ); + } + + flush(); // clean translogs + + success = awaitBusy(new Predicate() { + @Override + public boolean apply(Object input) { + return ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() == Engine.INACTIVE_SHARD_INDEXING_BUFFER.bytes(); + } + }); + if (!success) { + fail("failed to update shard indexing buffer size due to inactive state. expected [" + Engine.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" + + ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() + "]" + ); + } + } + + private void createNode(Settings settings) { + internalCluster().startNode(ImmutableSettings.builder() + .put(ClusterName.SETTING, "IndexingMemoryControllerTests") + .put("node.name", "IndexingMemoryControllerTests") + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(EsExecutors.PROCESSORS, 1) // limit the number of threads created + .put("http.enabled", false) + .put("index.store.type", "ram") + .put("config.ignore_system_properties", true) // make sure we get what we set :) + .put("gateway.type", "none") + .put("indices.memory.interval", "100ms") + .put(settings) + ); + } +}