From 5341404f014fbfd0c0b67c61546df38625d9b4ad Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 4 Dec 2015 15:01:47 -0500 Subject: [PATCH] IndexingMemoryController should not track shard index states This commit modifies IndexingMemoryController to be stateless. Rather than statefully tracking the indexing status of shards, IndexingMemoryController can grab all available shards, check their idle state, and then resize the buffers based on how many shards, and which ones, are not idle. The driver for this change is a performance regression that can arise in some scenarios after #13918. One scenario under which this performance regression can arise is when an index is deleted and then created again. Because IndexingMemoryController was previously statefully tracking the state of shards via a map of ShardIds, the new shards with the same ShardIds as previously existing shards would not be detected and therefore their version maps would never be resized from the defaults. This led to an explosion in the number of merges, causing a degradation in performance. 
Closes #15225 --- .../memory/IndexingMemoryController.java | 150 ++----------- .../memory/IndexingMemoryControllerTests.java | 211 +++++++++--------- 2 files changed, 130 insertions(+), 231 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java b/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java index bb6f85bc0ba..dfbe32df809 100644 --- a/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java +++ b/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java @@ -33,7 +33,6 @@ import org.elasticsearch.index.engine.FlushNotAllowedEngineException; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.threadpool.ThreadPool; @@ -200,23 +199,17 @@ public class IndexingMemoryController extends AbstractLifecycleComponent availableShards() { - ArrayList list = new ArrayList<>(); + protected List availableShards() { + List activeShards = new ArrayList<>(); for (IndexService indexService : indicesService) { - for (IndexShard indexShard : indexService) { - if (shardAvailable(indexShard)) { - list.add(indexShard.shardId()); + for (IndexShard shard : indexService) { + if (shardAvailable(shard)) { + activeShards.add(shard); } } } - return list; - } - - /** returns true if shard exists and is availabe for updates */ - protected boolean shardAvailable(ShardId shardId) { - return shardAvailable(getShard(shardId)); + return activeShards; } /** returns true if shard exists and is availabe for updates */ @@ -225,19 +218,8 @@ public class IndexingMemoryController extends AbstractLifecycleComponent shardWasActive = new HashMap<>(); - @Override public synchronized void run() { - EnumSet 
changes = purgeDeletedAndClosedShards(); - - updateShardStatuses(changes); - - if (changes.isEmpty() == false) { - // Something changed: recompute indexing buffers: - calcAndSetShardBuffers("[" + changes + "]"); - } + calcAndSetShardBuffers(); } - /** - * goes through all existing shards and check whether there are changes in their active status - */ - private void updateShardStatuses(EnumSet changes) { - for (ShardId shardId : availableShards()) { - - // Is the shard active now? - Boolean isActive = getShardActive(shardId); - - if (isActive == null) { - // shard was closed.. - continue; - } - - // Was the shard active last time we checked? - Boolean wasActive = shardWasActive.get(shardId); - if (wasActive == null) { - // First time we are seeing this shard - shardWasActive.put(shardId, isActive); - changes.add(ShardStatusChangeType.ADDED); - } else if (isActive) { - // Shard is active now - if (wasActive == false) { - // Shard became active itself, since we last checked (due to new indexing op arriving) - changes.add(ShardStatusChangeType.BECAME_ACTIVE); - logger.debug("marking shard {} as active indexing wise", shardId); - shardWasActive.put(shardId, true); - } else if (checkIdle(shardId) == Boolean.TRUE) { - // Make shard inactive now - changes.add(ShardStatusChangeType.BECAME_INACTIVE); - - shardWasActive.put(shardId, false); - } - } - } - } - - /** - * purge any existing statuses that are no longer updated - * - * @return the changes applied - */ - private EnumSet purgeDeletedAndClosedShards() { - EnumSet changes = EnumSet.noneOf(ShardStatusChangeType.class); - - Iterator statusShardIdIterator = shardWasActive.keySet().iterator(); - while (statusShardIdIterator.hasNext()) { - ShardId shardId = statusShardIdIterator.next(); - if (shardAvailable(shardId) == false) { - changes.add(ShardStatusChangeType.DELETED); - statusShardIdIterator.remove(); - } - } - return changes; - } - - private void calcAndSetShardBuffers(String reason) { - - // Count how many shards are 
now active: - int activeShardCount = 0; - for (Map.Entry ent : shardWasActive.entrySet()) { - if (ent.getValue()) { - activeShardCount++; + private void calcAndSetShardBuffers() { + List availableShards = availableShards(); + List activeShards = new ArrayList<>(); + for (IndexShard shard : availableShards) { + if (!checkIdle(shard)) { + activeShards.add(shard); } } + int activeShardCount = activeShards.size(); // TODO: we could be smarter here by taking into account how RAM the IndexWriter on each shard // is actually using (using IW.ramBytesUsed), so that small indices (e.g. Marvel) would not // get the same indexing buffer as large indices. But it quickly gets tricky... if (activeShardCount == 0) { - logger.debug("no active shards (reason={})", reason); + logger.debug("no active shards"); return; } @@ -372,13 +278,10 @@ public class IndexingMemoryController extends AbstractLifecycleComponent ent : shardWasActive.entrySet()) { - if (ent.getValue()) { - // This shard is active - updateShardBuffers(ent.getKey(), shardIndexingBufferSize, shardTranslogBufferSize); - } + for (IndexShard shard : activeShards) { + updateShardBuffers(shard, shardIndexingBufferSize, shardTranslogBufferSize); } } } @@ -389,14 +292,13 @@ public class IndexingMemoryController extends AbstractLifecycleComponent indexingBuffers = new HashMap<>(); - final Map translogBuffers = new HashMap<>(); + final Map indexingBuffers = new HashMap<>(); + final Map translogBuffers = new HashMap<>(); - final Map lastIndexTimeNanos = new HashMap<>(); - final Set activeShards = new HashSet<>(); + final Map lastIndexTimeNanos = new HashMap<>(); + final Set activeShards = new HashSet<>(); long currentTimeSec = TimeValue.timeValueNanos(System.nanoTime()).seconds(); public MockController(Settings settings) { super(Settings.builder() - .put(SHARD_INACTIVE_INTERVAL_TIME_SETTING, "200h") // disable it - .put(IndexShard.INDEX_SHARD_INACTIVE_TIME_SETTING, "1ms") // nearly immediate - .put(settings) - .build(), - null, 
null, 100 * 1024 * 1024); // fix jvm mem size to 100mb + .put(SHARD_INACTIVE_INTERVAL_TIME_SETTING, "200h") // disable it + .put(IndexShard.INDEX_SHARD_INACTIVE_TIME_SETTING, "1ms") // nearly immediate + .put(settings) + .build(), + null, null, 100 * 1024 * 1024); // fix jvm mem size to 100mb } - public void deleteShard(ShardId id) { + public void deleteShard(IndexShard id) { indexingBuffers.remove(id); translogBuffers.remove(id); } - public void assertBuffers(ShardId id, ByteSizeValue indexing, ByteSizeValue translog) { + public void assertBuffers(IndexShard id, ByteSizeValue indexing, ByteSizeValue translog) { assertThat(indexingBuffers.get(id), equalTo(indexing)); assertThat(translogBuffers.get(id), equalTo(translog)); } - public void assertInActive(ShardId id) { + public void assertInactive(IndexShard id) { assertThat(indexingBuffers.get(id), equalTo(INACTIVE)); assertThat(translogBuffers.get(id), equalTo(INACTIVE)); } @@ -80,36 +77,31 @@ public class IndexingMemoryControllerTests extends ESTestCase { } @Override - protected List availableShards() { + protected List availableShards() { return new ArrayList<>(indexingBuffers.keySet()); } @Override - protected boolean shardAvailable(ShardId shardId) { - return indexingBuffers.containsKey(shardId); + protected boolean shardAvailable(IndexShard shard) { + return indexingBuffers.containsKey(shard); } @Override - protected Boolean getShardActive(ShardId shardId) { - return activeShards.contains(shardId); + protected void updateShardBuffers(IndexShard shard, ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) { + indexingBuffers.put(shard, shardIndexingBufferSize); + translogBuffers.put(shard, shardTranslogBufferSize); } @Override - protected void updateShardBuffers(ShardId shardId, ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) { - indexingBuffers.put(shardId, shardIndexingBufferSize); - translogBuffers.put(shardId, shardTranslogBufferSize); - } - - @Override - 
protected Boolean checkIdle(ShardId shardId) { + protected Boolean checkIdle(IndexShard shard) { final TimeValue inactiveTime = settings.getAsTime(IndexShard.INDEX_SHARD_INACTIVE_TIME_SETTING, TimeValue.timeValueMinutes(5)); - Long ns = lastIndexTimeNanos.get(shardId); + Long ns = lastIndexTimeNanos.get(shard); if (ns == null) { return null; } else if (currentTimeInNanos() - ns >= inactiveTime.nanos()) { - indexingBuffers.put(shardId, INACTIVE); - translogBuffers.put(shardId, INACTIVE); - activeShards.remove(shardId); + indexingBuffers.put(shard, INACTIVE); + translogBuffers.put(shard, INACTIVE); + activeShards.remove(shard); return true; } else { return false; @@ -120,118 +112,126 @@ public class IndexingMemoryControllerTests extends ESTestCase { currentTimeSec += sec; } - public void simulateIndexing(ShardId shardId) { - lastIndexTimeNanos.put(shardId, currentTimeInNanos()); - if (indexingBuffers.containsKey(shardId) == false) { + public void simulateIndexing(IndexShard shard) { + lastIndexTimeNanos.put(shard, currentTimeInNanos()); + if (indexingBuffers.containsKey(shard) == false) { // First time we are seeing this shard; start it off with inactive buffers as IndexShard does: - indexingBuffers.put(shardId, IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER); - translogBuffers.put(shardId, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER); + indexingBuffers.put(shard, IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER); + translogBuffers.put(shard, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER); } - activeShards.add(shardId); + activeShards.add(shard); forceCheck(); } } public void testShardAdditionAndRemoval() { + createIndex("test", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 0).build()); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService test = indicesService.indexService("test"); + MockController controller = new MockController(Settings.builder() - 
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb") - .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "100kb").build()); - final ShardId shard1 = new ShardId("test", 1); - controller.simulateIndexing(shard1); - controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb") + .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "100kb").build()); + IndexShard shard0 = test.getShard(0); + controller.simulateIndexing(shard0); + controller.assertBuffers(shard0, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K // add another shard - final ShardId shard2 = new ShardId("test", 2); - controller.simulateIndexing(shard2); + IndexShard shard1 = test.getShard(1); + controller.simulateIndexing(shard1); + controller.assertBuffers(shard0, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); - controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); // remove first shard - controller.deleteShard(shard1); + controller.deleteShard(shard0); controller.forceCheck(); - controller.assertBuffers(shard2, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K + controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K // remove second shard - controller.deleteShard(shard2); + controller.deleteShard(shard1); controller.forceCheck(); // add a new one - final ShardId shard3 = new ShardId("test", 3); - controller.simulateIndexing(shard3); - controller.assertBuffers(shard3, new ByteSizeValue(10, ByteSizeUnit.MB), new 
ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K + IndexShard shard2 = test.getShard(2); + controller.simulateIndexing(shard2); + controller.assertBuffers(shard2, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K } public void testActiveInactive() { - MockController controller = new MockController(Settings.builder() - .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb") - .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "100kb") - .put(IndexShard.INDEX_SHARD_INACTIVE_TIME_SETTING, "5s") - .build()); + createIndex("test", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 2).put(SETTING_NUMBER_OF_REPLICAS, 0).build()); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService test = indicesService.indexService("test"); - final ShardId shard1 = new ShardId("test", 1); + MockController controller = new MockController(Settings.builder() + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb") + .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "100kb") + .put(IndexShard.INDEX_SHARD_INACTIVE_TIME_SETTING, "5s") + .build()); + + IndexShard shard0 = test.getShard(0); + controller.simulateIndexing(shard0); + IndexShard shard1 = test.getShard(1); controller.simulateIndexing(shard1); - final ShardId shard2 = new ShardId("test", 2); - controller.simulateIndexing(shard2); + controller.assertBuffers(shard0, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); - controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); // index into both shards, move the clock and see that they are still active + controller.simulateIndexing(shard0); controller.simulateIndexing(shard1); - controller.simulateIndexing(shard2); controller.incrementTimeSec(10); 
controller.forceCheck(); // both shards now inactive - controller.assertInActive(shard1); - controller.assertInActive(shard2); + controller.assertInactive(shard0); + controller.assertInactive(shard1); // index into one shard only, see it becomes active - controller.simulateIndexing(shard1); - controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); - controller.assertInActive(shard2); + controller.simulateIndexing(shard0); + controller.assertBuffers(shard0, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); + controller.assertInactive(shard1); controller.incrementTimeSec(3); // increment but not enough to become inactive controller.forceCheck(); - controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); - controller.assertInActive(shard2); + controller.assertBuffers(shard0, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); + controller.assertInactive(shard1); controller.incrementTimeSec(3); // increment some more controller.forceCheck(); - controller.assertInActive(shard1); - controller.assertInActive(shard2); + controller.assertInactive(shard0); + controller.assertInactive(shard1); // index some and shard becomes immediately active - controller.simulateIndexing(shard2); - controller.assertInActive(shard1); - controller.assertBuffers(shard2, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); + controller.simulateIndexing(shard1); + controller.assertInactive(shard0); + controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); } public void testMinShardBufferSizes() { MockController controller = new MockController(Settings.builder() - .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb") - .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "50kb") - 
.put(IndexingMemoryController.MIN_SHARD_INDEX_BUFFER_SIZE_SETTING, "6mb") - .put(IndexingMemoryController.MIN_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, "40kb").build()); + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb") + .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "50kb") + .put(IndexingMemoryController.MIN_SHARD_INDEX_BUFFER_SIZE_SETTING, "6mb") + .put(IndexingMemoryController.MIN_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, "40kb").build()); assertTwoActiveShards(controller, new ByteSizeValue(6, ByteSizeUnit.MB), new ByteSizeValue(40, ByteSizeUnit.KB)); } public void testMaxShardBufferSizes() { MockController controller = new MockController(Settings.builder() - .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb") - .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "50kb") - .put(IndexingMemoryController.MAX_SHARD_INDEX_BUFFER_SIZE_SETTING, "3mb") - .put(IndexingMemoryController.MAX_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, "10kb").build()); + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb") + .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "50kb") + .put(IndexingMemoryController.MAX_SHARD_INDEX_BUFFER_SIZE_SETTING, "3mb") + .put(IndexingMemoryController.MAX_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, "10kb").build()); assertTwoActiveShards(controller, new ByteSizeValue(3, ByteSizeUnit.MB), new ByteSizeValue(10, ByteSizeUnit.KB)); } public void testRelativeBufferSizes() { MockController controller = new MockController(Settings.builder() - .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "50%") - .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "0.5%") - .build()); + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "50%") + .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "0.5%") + .build()); assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(50, ByteSizeUnit.MB))); assertThat(controller.translogBufferSize(), equalTo(new ByteSizeValue(512, 
ByteSizeUnit.KB))); @@ -240,10 +240,10 @@ public class IndexingMemoryControllerTests extends ESTestCase { public void testMinBufferSizes() { MockController controller = new MockController(Settings.builder() - .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "0.001%") - .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "0.001%") - .put(IndexingMemoryController.MIN_INDEX_BUFFER_SIZE_SETTING, "6mb") - .put(IndexingMemoryController.MIN_TRANSLOG_BUFFER_SIZE_SETTING, "512kb").build()); + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "0.001%") + .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "0.001%") + .put(IndexingMemoryController.MIN_INDEX_BUFFER_SIZE_SETTING, "6mb") + .put(IndexingMemoryController.MIN_TRANSLOG_BUFFER_SIZE_SETTING, "512kb").build()); assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(6, ByteSizeUnit.MB))); assertThat(controller.translogBufferSize(), equalTo(new ByteSizeValue(512, ByteSizeUnit.KB))); @@ -251,23 +251,24 @@ public class IndexingMemoryControllerTests extends ESTestCase { public void testMaxBufferSizes() { MockController controller = new MockController(Settings.builder() - .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "90%") - .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "90%") - .put(IndexingMemoryController.MAX_INDEX_BUFFER_SIZE_SETTING, "6mb") - .put(IndexingMemoryController.MAX_TRANSLOG_BUFFER_SIZE_SETTING, "512kb").build()); + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "90%") + .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "90%") + .put(IndexingMemoryController.MAX_INDEX_BUFFER_SIZE_SETTING, "6mb") + .put(IndexingMemoryController.MAX_TRANSLOG_BUFFER_SIZE_SETTING, "512kb").build()); assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(6, ByteSizeUnit.MB))); assertThat(controller.translogBufferSize(), equalTo(new ByteSizeValue(512, ByteSizeUnit.KB))); } protected void assertTwoActiveShards(MockController 
controller, ByteSizeValue indexBufferSize, ByteSizeValue translogBufferSize) { - final ShardId shard1 = new ShardId("test", 1); + createIndex("test", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 2).put(SETTING_NUMBER_OF_REPLICAS, 0).build()); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService test = indicesService.indexService("test"); + IndexShard shard0 = test.getShard(0); + controller.simulateIndexing(shard0); + IndexShard shard1 = test.getShard(1); controller.simulateIndexing(shard1); - final ShardId shard2 = new ShardId("test", 2); - controller.simulateIndexing(shard2); + controller.assertBuffers(shard0, indexBufferSize, translogBufferSize); controller.assertBuffers(shard1, indexBufferSize, translogBufferSize); - controller.assertBuffers(shard2, indexBufferSize, translogBufferSize); - } - }