diff --git a/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java b/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java index bb6f85bc0ba..dfbe32df809 100644 --- a/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java +++ b/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java @@ -33,7 +33,6 @@ import org.elasticsearch.index.engine.FlushNotAllowedEngineException; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.threadpool.ThreadPool; @@ -200,23 +199,17 @@ public class IndexingMemoryController extends AbstractLifecycleComponent availableShards() { - ArrayList list = new ArrayList<>(); + protected List availableShards() { + List activeShards = new ArrayList<>(); for (IndexService indexService : indicesService) { - for (IndexShard indexShard : indexService) { - if (shardAvailable(indexShard)) { - list.add(indexShard.shardId()); + for (IndexShard shard : indexService) { + if (shardAvailable(shard)) { + activeShards.add(shard); } } } - return list; - } - - /** returns true if shard exists and is availabe for updates */ - protected boolean shardAvailable(ShardId shardId) { - return shardAvailable(getShard(shardId)); + return activeShards; } /** returns true if shard exists and is availabe for updates */ @@ -225,19 +218,8 @@ public class IndexingMemoryController extends AbstractLifecycleComponent shardWasActive = new HashMap<>(); - @Override public synchronized void run() { - EnumSet changes = purgeDeletedAndClosedShards(); - - updateShardStatuses(changes); - - if (changes.isEmpty() == false) { - // Something changed: recompute indexing buffers: - calcAndSetShardBuffers("[" + changes + "]"); - } + calcAndSetShardBuffers(); } - /** - * goes through all existing shards and check whether there are changes in their active status - */ - private void updateShardStatuses(EnumSet changes) { - for (ShardId shardId : availableShards()) { - - // Is the shard active now? - Boolean isActive = getShardActive(shardId); - - if (isActive == null) { - // shard was closed.. - continue; - } - - // Was the shard active last time we checked? - Boolean wasActive = shardWasActive.get(shardId); - if (wasActive == null) { - // First time we are seeing this shard - shardWasActive.put(shardId, isActive); - changes.add(ShardStatusChangeType.ADDED); - } else if (isActive) { - // Shard is active now - if (wasActive == false) { - // Shard became active itself, since we last checked (due to new indexing op arriving) - changes.add(ShardStatusChangeType.BECAME_ACTIVE); - logger.debug("marking shard {} as active indexing wise", shardId); - shardWasActive.put(shardId, true); - } else if (checkIdle(shardId) == Boolean.TRUE) { - // Make shard inactive now - changes.add(ShardStatusChangeType.BECAME_INACTIVE); - - shardWasActive.put(shardId, false); - } - } - } - } - - /** - * purge any existing statuses that are no longer updated - * - * @return the changes applied - */ - private EnumSet purgeDeletedAndClosedShards() { - EnumSet changes = EnumSet.noneOf(ShardStatusChangeType.class); - - Iterator statusShardIdIterator = shardWasActive.keySet().iterator(); - while (statusShardIdIterator.hasNext()) { - ShardId shardId = statusShardIdIterator.next(); - if (shardAvailable(shardId) == false) { - changes.add(ShardStatusChangeType.DELETED); - statusShardIdIterator.remove(); - } - } - return changes; - } - - private void calcAndSetShardBuffers(String reason) { - - // Count how many shards are now active: - int activeShardCount = 0; - for (Map.Entry ent : shardWasActive.entrySet()) { - if (ent.getValue()) { - activeShardCount++; + private void calcAndSetShardBuffers() { + List availableShards = availableShards(); + List activeShards = new ArrayList<>(); + for (IndexShard shard : availableShards) { + if (!checkIdle(shard)) { + activeShards.add(shard); } } + int activeShardCount = activeShards.size(); // TODO: we could be smarter here by taking into account how RAM the IndexWriter on each shard // is actually using (using IW.ramBytesUsed), so that small indices (e.g. Marvel) would not // get the same indexing buffer as large indices. But it quickly gets tricky... if (activeShardCount == 0) { - logger.debug("no active shards (reason={})", reason); + logger.debug("no active shards"); return; } @@ -372,13 +278,10 @@ public class IndexingMemoryController extends AbstractLifecycleComponent ent : shardWasActive.entrySet()) { - if (ent.getValue()) { - // This shard is active - updateShardBuffers(ent.getKey(), shardIndexingBufferSize, shardTranslogBufferSize); - } + for (IndexShard shard : activeShards) { + updateShardBuffers(shard, shardIndexingBufferSize, shardTranslogBufferSize); } } } @@ -389,14 +292,13 @@ public class IndexingMemoryController extends AbstractLifecycleComponent indexingBuffers = new HashMap<>(); - final Map translogBuffers = new HashMap<>(); + final Map indexingBuffers = new HashMap<>(); + final Map translogBuffers = new HashMap<>(); - final Map lastIndexTimeNanos = new HashMap<>(); - final Set activeShards = new HashSet<>(); + final Map lastIndexTimeNanos = new HashMap<>(); + final Set activeShards = new HashSet<>(); long currentTimeSec = TimeValue.timeValueNanos(System.nanoTime()).seconds(); public MockController(Settings settings) { super(Settings.builder() - .put(SHARD_INACTIVE_INTERVAL_TIME_SETTING, "200h") // disable it - .put(IndexShard.INDEX_SHARD_INACTIVE_TIME_SETTING, "1ms") // nearly immediate - .put(settings) - .build(), - null, null, 100 * 1024 * 1024); // fix jvm mem size to 100mb + .put(SHARD_INACTIVE_INTERVAL_TIME_SETTING, "200h") // disable it + .put(IndexShard.INDEX_SHARD_INACTIVE_TIME_SETTING, "1ms") // nearly immediate + .put(settings) + .build(), + null, null, 100 * 1024 * 1024); // fix jvm mem size to 100mb } - public void deleteShard(ShardId id) { + public void deleteShard(IndexShard id) { indexingBuffers.remove(id); translogBuffers.remove(id); } - public void assertBuffers(ShardId id, ByteSizeValue indexing, ByteSizeValue translog) { + public void assertBuffers(IndexShard id, ByteSizeValue indexing, ByteSizeValue translog) { assertThat(indexingBuffers.get(id), equalTo(indexing)); assertThat(translogBuffers.get(id), equalTo(translog)); } - public void assertInActive(ShardId id) { + public void assertInactive(IndexShard id) { assertThat(indexingBuffers.get(id), equalTo(INACTIVE)); assertThat(translogBuffers.get(id), equalTo(INACTIVE)); } @@ -80,36 +77,31 @@ public class IndexingMemoryControllerTests extends ESTestCase { } @Override - protected List availableShards() { + protected List availableShards() { return new ArrayList<>(indexingBuffers.keySet()); } @Override - protected boolean shardAvailable(ShardId shardId) { - return indexingBuffers.containsKey(shardId); + protected boolean shardAvailable(IndexShard shard) { + return indexingBuffers.containsKey(shard); } @Override - protected Boolean getShardActive(ShardId shardId) { - return activeShards.contains(shardId); + protected void updateShardBuffers(IndexShard shard, ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) { + indexingBuffers.put(shard, shardIndexingBufferSize); + translogBuffers.put(shard, shardTranslogBufferSize); } @Override - protected void updateShardBuffers(ShardId shardId, ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) { - indexingBuffers.put(shardId, shardIndexingBufferSize); - translogBuffers.put(shardId, shardTranslogBufferSize); - } - - @Override - protected Boolean checkIdle(ShardId shardId) { + protected Boolean checkIdle(IndexShard shard) { final TimeValue inactiveTime = settings.getAsTime(IndexShard.INDEX_SHARD_INACTIVE_TIME_SETTING, TimeValue.timeValueMinutes(5)); - Long ns = lastIndexTimeNanos.get(shardId); + Long ns = lastIndexTimeNanos.get(shard); if (ns == null) { return null; } else if (currentTimeInNanos() - ns >= inactiveTime.nanos()) { - indexingBuffers.put(shardId, INACTIVE); - translogBuffers.put(shardId, INACTIVE); - activeShards.remove(shardId); + indexingBuffers.put(shard, INACTIVE); + translogBuffers.put(shard, INACTIVE); + activeShards.remove(shard); return true; } else { return false; @@ -120,118 +112,126 @@ public class IndexingMemoryControllerTests extends ESTestCase { currentTimeSec += sec; } - public void simulateIndexing(ShardId shardId) { - lastIndexTimeNanos.put(shardId, currentTimeInNanos()); - if (indexingBuffers.containsKey(shardId) == false) { + public void simulateIndexing(IndexShard shard) { + lastIndexTimeNanos.put(shard, currentTimeInNanos()); + if (indexingBuffers.containsKey(shard) == false) { // First time we are seeing this shard; start it off with inactive buffers as IndexShard does: - indexingBuffers.put(shardId, IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER); - translogBuffers.put(shardId, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER); + indexingBuffers.put(shard, IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER); + translogBuffers.put(shard, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER); } - activeShards.add(shardId); + activeShards.add(shard); forceCheck(); } } public void testShardAdditionAndRemoval() { + createIndex("test", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 0).build()); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService test = indicesService.indexService("test"); + MockController controller = new MockController(Settings.builder() - .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb") - .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "100kb").build()); - final ShardId shard1 = new ShardId("test", 1); - controller.simulateIndexing(shard1); - controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb") + .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "100kb").build()); + IndexShard shard0 = test.getShard(0); + controller.simulateIndexing(shard0); + controller.assertBuffers(shard0, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K // add another shard - final ShardId shard2 = new ShardId("test", 2); - controller.simulateIndexing(shard2); + IndexShard shard1 = test.getShard(1); + controller.simulateIndexing(shard1); + controller.assertBuffers(shard0, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); - controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); // remove first shard - controller.deleteShard(shard1); + controller.deleteShard(shard0); controller.forceCheck(); - controller.assertBuffers(shard2, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K + controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K // remove second shard - controller.deleteShard(shard2); + controller.deleteShard(shard1); controller.forceCheck(); // add a new one - final ShardId shard3 = new ShardId("test", 3); - controller.simulateIndexing(shard3); - controller.assertBuffers(shard3, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K + IndexShard shard2 = test.getShard(2); + controller.simulateIndexing(shard2); + controller.assertBuffers(shard2, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K } public void testActiveInactive() { - MockController controller = new MockController(Settings.builder() - .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb") - .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "100kb") - .put(IndexShard.INDEX_SHARD_INACTIVE_TIME_SETTING, "5s") - .build()); + createIndex("test", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 2).put(SETTING_NUMBER_OF_REPLICAS, 0).build()); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService test = indicesService.indexService("test"); - final ShardId shard1 = new ShardId("test", 1); + MockController controller = new MockController(Settings.builder() + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb") + .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "100kb") + .put(IndexShard.INDEX_SHARD_INACTIVE_TIME_SETTING, "5s") + .build()); + + IndexShard shard0 = test.getShard(0); + controller.simulateIndexing(shard0); + IndexShard shard1 = test.getShard(1); controller.simulateIndexing(shard1); - final ShardId shard2 = new ShardId("test", 2); - controller.simulateIndexing(shard2); + controller.assertBuffers(shard0, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); - controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); // index into both shards, move the clock and see that they are still active + controller.simulateIndexing(shard0); controller.simulateIndexing(shard1); - controller.simulateIndexing(shard2); controller.incrementTimeSec(10); controller.forceCheck(); // both shards now inactive - controller.assertInActive(shard1); - controller.assertInActive(shard2); + controller.assertInactive(shard0); + controller.assertInactive(shard1); // index into one shard only, see it becomes active - controller.simulateIndexing(shard1); - controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); - controller.assertInActive(shard2); + controller.simulateIndexing(shard0); + controller.assertBuffers(shard0, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); + controller.assertInactive(shard1); controller.incrementTimeSec(3); // increment but not enough to become inactive controller.forceCheck(); - controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); - controller.assertInActive(shard2); + controller.assertBuffers(shard0, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); + controller.assertInactive(shard1); controller.incrementTimeSec(3); // increment some more controller.forceCheck(); - controller.assertInActive(shard1); - controller.assertInActive(shard2); + controller.assertInactive(shard0); + controller.assertInactive(shard1); // index some and shard becomes immediately active - controller.simulateIndexing(shard2); - controller.assertInActive(shard1); - controller.assertBuffers(shard2, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); + controller.simulateIndexing(shard1); + controller.assertInactive(shard0); + controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); } public void testMinShardBufferSizes() { MockController controller = new MockController(Settings.builder() - .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb") - .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "50kb") - .put(IndexingMemoryController.MIN_SHARD_INDEX_BUFFER_SIZE_SETTING, "6mb") - .put(IndexingMemoryController.MIN_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, "40kb").build()); + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb") + .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "50kb") + .put(IndexingMemoryController.MIN_SHARD_INDEX_BUFFER_SIZE_SETTING, "6mb") + .put(IndexingMemoryController.MIN_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, "40kb").build()); assertTwoActiveShards(controller, new ByteSizeValue(6, ByteSizeUnit.MB), new ByteSizeValue(40, ByteSizeUnit.KB)); } public void testMaxShardBufferSizes() { MockController controller = new MockController(Settings.builder() - .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb") - .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "50kb") - .put(IndexingMemoryController.MAX_SHARD_INDEX_BUFFER_SIZE_SETTING, "3mb") - .put(IndexingMemoryController.MAX_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, "10kb").build()); + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb") + .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "50kb") + .put(IndexingMemoryController.MAX_SHARD_INDEX_BUFFER_SIZE_SETTING, "3mb") + .put(IndexingMemoryController.MAX_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, "10kb").build()); assertTwoActiveShards(controller, new ByteSizeValue(3, ByteSizeUnit.MB), new ByteSizeValue(10, ByteSizeUnit.KB)); } public void testRelativeBufferSizes() { MockController controller = new MockController(Settings.builder() - .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "50%") - .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "0.5%") - .build()); + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "50%") + .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "0.5%") + .build()); assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(50, ByteSizeUnit.MB))); assertThat(controller.translogBufferSize(), equalTo(new ByteSizeValue(512, ByteSizeUnit.KB))); @@ -240,10 +240,10 @@ public class IndexingMemoryControllerTests extends ESTestCase { public void testMinBufferSizes() { MockController controller = new MockController(Settings.builder() - .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "0.001%") - .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "0.001%") - .put(IndexingMemoryController.MIN_INDEX_BUFFER_SIZE_SETTING, "6mb") - .put(IndexingMemoryController.MIN_TRANSLOG_BUFFER_SIZE_SETTING, "512kb").build()); + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "0.001%") + .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "0.001%") + .put(IndexingMemoryController.MIN_INDEX_BUFFER_SIZE_SETTING, "6mb") + .put(IndexingMemoryController.MIN_TRANSLOG_BUFFER_SIZE_SETTING, "512kb").build()); assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(6, ByteSizeUnit.MB))); assertThat(controller.translogBufferSize(), equalTo(new ByteSizeValue(512, ByteSizeUnit.KB))); @@ -251,23 +251,24 @@ public class IndexingMemoryControllerTests extends ESTestCase { public void testMaxBufferSizes() { MockController controller = new MockController(Settings.builder() - .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "90%") - .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "90%") - .put(IndexingMemoryController.MAX_INDEX_BUFFER_SIZE_SETTING, "6mb") - .put(IndexingMemoryController.MAX_TRANSLOG_BUFFER_SIZE_SETTING, "512kb").build()); + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "90%") + .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "90%") + .put(IndexingMemoryController.MAX_INDEX_BUFFER_SIZE_SETTING, "6mb") + .put(IndexingMemoryController.MAX_TRANSLOG_BUFFER_SIZE_SETTING, "512kb").build()); assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(6, ByteSizeUnit.MB))); assertThat(controller.translogBufferSize(), equalTo(new ByteSizeValue(512, ByteSizeUnit.KB))); } protected void assertTwoActiveShards(MockController controller, ByteSizeValue indexBufferSize, ByteSizeValue translogBufferSize) { - final ShardId shard1 = new ShardId("test", 1); + createIndex("test", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 2).put(SETTING_NUMBER_OF_REPLICAS, 0).build()); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService test = indicesService.indexService("test"); + IndexShard shard0 = test.getShard(0); + controller.simulateIndexing(shard0); + IndexShard shard1 = test.getShard(1); controller.simulateIndexing(shard1); - final ShardId shard2 = new ShardId("test", 2); - controller.simulateIndexing(shard2); + controller.assertBuffers(shard0, indexBufferSize, translogBufferSize); controller.assertBuffers(shard1, indexBufferSize, translogBufferSize); - controller.assertBuffers(shard2, indexBufferSize, translogBufferSize); - } - }