IndexingMemoryController should not track shard index states

This commit modifies IndexingMemoryController to be stateless. Rather
than statefully tracking the indexing status of shards,
IndexingMemoryController can grab all available shards, check their idle
state, and then resize the buffers based on which shards are not idle
and how many of them there are.
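
In other words, each scheduled check now boils down to roughly the
following (a condensed sketch of the patched code below, not the exact
implementation; clamping of the per-shard sizes to the configured
minimums/maximums and logging are omitted):

    // sketch: recompute buffers from scratch on every check, with no remembered state
    private void calcAndSetShardBuffers() {
        List<IndexShard> activeShards = new ArrayList<>();
        for (IndexShard shard : availableShards()) {    // re-enumerate live shards on every run
            if (checkIdle(shard) != Boolean.TRUE) {      // keep shards that are not idle
                activeShards.add(shard);
            }
        }
        if (activeShards.isEmpty()) {
            return;                                      // nothing indexing, nothing to resize
        }
        // split the node-wide budgets evenly across the currently active shards
        ByteSizeValue shardIndexingBufferSize = new ByteSizeValue(indexingBuffer.bytes() / activeShards.size());
        ByteSizeValue shardTranslogBufferSize = new ByteSizeValue(translogBuffer.bytes() / activeShards.size());
        for (IndexShard shard : activeShards) {
            updateShardBuffers(shard, shardIndexingBufferSize, shardTranslogBufferSize);
        }
    }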

The driver for this change is a performance regression that can arise in
some scenarios after #13918. One such scenario is an index being deleted
and then created again. Because IndexingMemoryController previously
tracked the state of shards statefully via a map keyed by ShardId, new
shards with the same ShardIds as previously existing shards would not be
detected, and therefore their version maps would never be resized from
the defaults. This led to an explosion in the number of merges, causing a
degradation in performance.
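
To illustrate the stale-map failure mode (a hypothetical snippet, not
code from this change; ShardId equality here is based only on index name
and shard number):

    Map<ShardId, Boolean> shardWasActive = new HashMap<>();
    shardWasActive.put(new ShardId("test", 0), false); // shard of the original index, tracked as idle

    // the index is deleted and created again; the fresh shard 0 carries an equal ShardId,
    // so the stateful checker never sees it as a newly added shard and never grows its
    // indexing buffer (and with it the version map) beyond the inactive defaults
    boolean looksAlreadyKnown = shardWasActive.containsKey(new ShardId("test", 0)); // true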

Closes #15225
Jason Tedor 2015-12-04 15:01:47 -05:00
parent 6613b3a0d6
commit 5341404f01
2 changed files with 130 additions and 231 deletions

IndexingMemoryController.java

@@ -33,7 +33,6 @@ import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
 import org.elasticsearch.index.shard.IndexEventListener;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardState;
-import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.monitor.jvm.JvmInfo;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -200,23 +199,17 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
         return translogBuffer;
     }
-    protected List<ShardId> availableShards() {
-        ArrayList<ShardId> list = new ArrayList<>();
+    protected List<IndexShard> availableShards() {
+        List<IndexShard> activeShards = new ArrayList<>();
         for (IndexService indexService : indicesService) {
-            for (IndexShard indexShard : indexService) {
-                if (shardAvailable(indexShard)) {
-                    list.add(indexShard.shardId());
+            for (IndexShard shard : indexService) {
+                if (shardAvailable(shard)) {
+                    activeShards.add(shard);
                 }
             }
         }
-        return list;
-    }
-    /** returns true if shard exists and is availabe for updates */
-    protected boolean shardAvailable(ShardId shardId) {
-        return shardAvailable(getShard(shardId));
+        return activeShards;
     }
     /** returns true if shard exists and is availabe for updates */
@@ -225,19 +218,8 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
         return shard != null && shard.canIndex() && CAN_UPDATE_INDEX_BUFFER_STATES.contains(shard.state());
     }
-    /** gets an {@link IndexShard} instance for the given shard. returns null if the shard doesn't exist */
-    protected IndexShard getShard(ShardId shardId) {
-        IndexService indexService = indicesService.indexService(shardId.index().name());
-        if (indexService != null) {
-            IndexShard indexShard = indexService.getShardOrNull(shardId.id());
-            return indexShard;
-        }
-        return null;
-    }
     /** set new indexing and translog buffers on this shard. this may cause the shard to refresh to free up heap. */
-    protected void updateShardBuffers(ShardId shardId, ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) {
-        final IndexShard shard = getShard(shardId);
+    protected void updateShardBuffers(IndexShard shard, ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) {
         if (shard != null) {
             try {
                 shard.updateBufferSize(shardIndexingBufferSize, shardTranslogBufferSize);
@@ -246,113 +228,37 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
             } catch (FlushNotAllowedEngineException e) {
                 // ignore
             } catch (Exception e) {
-                logger.warn("failed to set shard {} index buffer to [{}]", e, shardId, shardIndexingBufferSize);
+                logger.warn("failed to set shard {} index buffer to [{}]", e, shard.shardId(), shardIndexingBufferSize);
             }
         }
     }
-    /** returns {@link IndexShard#getActive} if the shard exists, else null */
-    protected Boolean getShardActive(ShardId shardId) {
-        final IndexShard indexShard = getShard(shardId);
-        if (indexShard == null) {
-            return null;
-        }
-        return indexShard.getActive();
-    }
     /** check if any shards active status changed, now. */
     public void forceCheck() {
        statusChecker.run();
     }
     class ShardsIndicesStatusChecker implements Runnable {
-        // True if the shard was active last time we checked
-        private final Map<ShardId,Boolean> shardWasActive = new HashMap<>();
         @Override
         public synchronized void run() {
-            EnumSet<ShardStatusChangeType> changes = purgeDeletedAndClosedShards();
-            updateShardStatuses(changes);
-            if (changes.isEmpty() == false) {
-                // Something changed: recompute indexing buffers:
-                calcAndSetShardBuffers("[" + changes + "]");
-            }
+            calcAndSetShardBuffers();
         }
-        /**
-         * goes through all existing shards and check whether there are changes in their active status
-         */
-        private void updateShardStatuses(EnumSet<ShardStatusChangeType> changes) {
-            for (ShardId shardId : availableShards()) {
-                // Is the shard active now?
-                Boolean isActive = getShardActive(shardId);
-                if (isActive == null) {
-                    // shard was closed..
-                    continue;
-                }
-                // Was the shard active last time we checked?
-                Boolean wasActive = shardWasActive.get(shardId);
-                if (wasActive == null) {
-                    // First time we are seeing this shard
-                    shardWasActive.put(shardId, isActive);
-                    changes.add(ShardStatusChangeType.ADDED);
-                } else if (isActive) {
-                    // Shard is active now
-                    if (wasActive == false) {
-                        // Shard became active itself, since we last checked (due to new indexing op arriving)
-                        changes.add(ShardStatusChangeType.BECAME_ACTIVE);
-                        logger.debug("marking shard {} as active indexing wise", shardId);
-                        shardWasActive.put(shardId, true);
-                    } else if (checkIdle(shardId) == Boolean.TRUE) {
-                        // Make shard inactive now
-                        changes.add(ShardStatusChangeType.BECAME_INACTIVE);
-                        shardWasActive.put(shardId, false);
-                    }
-                }
-            }
-        }
-        /**
-         * purge any existing statuses that are no longer updated
-         *
-         * @return the changes applied
-         */
-        private EnumSet<ShardStatusChangeType> purgeDeletedAndClosedShards() {
-            EnumSet<ShardStatusChangeType> changes = EnumSet.noneOf(ShardStatusChangeType.class);
-            Iterator<ShardId> statusShardIdIterator = shardWasActive.keySet().iterator();
-            while (statusShardIdIterator.hasNext()) {
-                ShardId shardId = statusShardIdIterator.next();
-                if (shardAvailable(shardId) == false) {
-                    changes.add(ShardStatusChangeType.DELETED);
-                    statusShardIdIterator.remove();
-                }
-            }
-            return changes;
-        }
-        private void calcAndSetShardBuffers(String reason) {
-            // Count how many shards are now active:
-            int activeShardCount = 0;
-            for (Map.Entry<ShardId,Boolean> ent : shardWasActive.entrySet()) {
-                if (ent.getValue()) {
-                    activeShardCount++;
+        private void calcAndSetShardBuffers() {
+            List<IndexShard> availableShards = availableShards();
+            List<IndexShard> activeShards = new ArrayList<>();
+            for (IndexShard shard : availableShards) {
+                if (!checkIdle(shard)) {
+                    activeShards.add(shard);
                 }
             }
+            int activeShardCount = activeShards.size();
             // TODO: we could be smarter here by taking into account how RAM the IndexWriter on each shard
             // is actually using (using IW.ramBytesUsed), so that small indices (e.g. Marvel) would not
             // get the same indexing buffer as large indices. But it quickly gets tricky...
             if (activeShardCount == 0) {
-                logger.debug("no active shards (reason={})", reason);
+                logger.debug("no active shards");
                 return;
             }
@@ -372,13 +278,10 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
                 shardTranslogBufferSize = maxShardTranslogBufferSize;
             }
-            logger.debug("recalculating shard indexing buffer (reason={}), total is [{}] with [{}] active shards, each shard set to indexing=[{}], translog=[{}]", reason, indexingBuffer, activeShardCount, shardIndexingBufferSize, shardTranslogBufferSize);
+            logger.debug("recalculating shard indexing buffer, total is [{}] with [{}] active shards, each shard set to indexing=[{}], translog=[{}]", indexingBuffer, activeShardCount, shardIndexingBufferSize, shardTranslogBufferSize);
-            for (Map.Entry<ShardId,Boolean> ent : shardWasActive.entrySet()) {
-                if (ent.getValue()) {
-                    // This shard is active
-                    updateShardBuffers(ent.getKey(), shardIndexingBufferSize, shardTranslogBufferSize);
-                }
+            for (IndexShard shard : activeShards) {
+                updateShardBuffers(shard, shardIndexingBufferSize, shardTranslogBufferSize);
             }
         }
     }
@@ -389,14 +292,13 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
     /** ask this shard to check now whether it is inactive, and reduces its indexing and translog buffers if so. returns Boolean.TRUE if
      * it did deactive, Boolean.FALSE if it did not, and null if the shard is unknown */
-    protected Boolean checkIdle(ShardId shardId) {
-        String ignoreReason; // eclipse compiler does not know it is really final
-        final IndexShard shard = getShard(shardId);
+    protected Boolean checkIdle(IndexShard shard) {
+        String ignoreReason = null; // eclipse compiler does not know it is really final
         if (shard != null) {
             try {
                 if (shard.checkIdle()) {
                     logger.debug("marking shard {} as inactive (inactive_time[{}]) indexing wise",
-                            shardId,
+                            shard.shardId(),
                             shard.getInactiveTime());
                     return Boolean.TRUE;
                 }
@@ -412,15 +314,11 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
             ignoreReason = "shard not found";
         }
         if (ignoreReason != null) {
-            logger.trace("ignore [{}] while marking shard {} as inactive", ignoreReason, shardId);
+            logger.trace("ignore [{}] while marking shard {} as inactive", ignoreReason, shard.shardId());
         }
         return null;
     }
-    private static enum ShardStatusChangeType {
-        ADDED, DELETED, BECAME_ACTIVE, BECAME_INACTIVE
-    }
     @Override
     public void onShardActive(IndexShard indexShard) {
         // At least one shard used to be inactive ie. a new write operation just showed up.

IndexingMemoryControllerTests.java

@@ -22,31 +22,28 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.shard.IndexShard;
-import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.test.ESSingleNodeTestCase;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.not;
-public class IndexingMemoryControllerTests extends ESTestCase {
+public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
     static class MockController extends IndexingMemoryController {
         final static ByteSizeValue INACTIVE = new ByteSizeValue(-1);
-        final Map<ShardId, ByteSizeValue> indexingBuffers = new HashMap<>();
-        final Map<ShardId, ByteSizeValue> translogBuffers = new HashMap<>();
+        final Map<IndexShard, ByteSizeValue> indexingBuffers = new HashMap<>();
+        final Map<IndexShard, ByteSizeValue> translogBuffers = new HashMap<>();
-        final Map<ShardId, Long> lastIndexTimeNanos = new HashMap<>();
-        final Set<ShardId> activeShards = new HashSet<>();
+        final Map<IndexShard, Long> lastIndexTimeNanos = new HashMap<>();
+        final Set<IndexShard> activeShards = new HashSet<>();
         long currentTimeSec = TimeValue.timeValueNanos(System.nanoTime()).seconds();
@@ -59,17 +56,17 @@ public class IndexingMemoryControllerTests extends ESTestCase {
                     null, null, 100 * 1024 * 1024); // fix jvm mem size to 100mb
         }
-        public void deleteShard(ShardId id) {
+        public void deleteShard(IndexShard id) {
            indexingBuffers.remove(id);
            translogBuffers.remove(id);
         }
-        public void assertBuffers(ShardId id, ByteSizeValue indexing, ByteSizeValue translog) {
+        public void assertBuffers(IndexShard id, ByteSizeValue indexing, ByteSizeValue translog) {
            assertThat(indexingBuffers.get(id), equalTo(indexing));
            assertThat(translogBuffers.get(id), equalTo(translog));
         }
-        public void assertInActive(ShardId id) {
+        public void assertInactive(IndexShard id) {
            assertThat(indexingBuffers.get(id), equalTo(INACTIVE));
            assertThat(translogBuffers.get(id), equalTo(INACTIVE));
         }
@@ -80,36 +77,31 @@ public class IndexingMemoryControllerTests extends ESTestCase {
         }
         @Override
-        protected List<ShardId> availableShards() {
+        protected List<IndexShard> availableShards() {
            return new ArrayList<>(indexingBuffers.keySet());
         }
         @Override
-        protected boolean shardAvailable(ShardId shardId) {
-            return indexingBuffers.containsKey(shardId);
+        protected boolean shardAvailable(IndexShard shard) {
+            return indexingBuffers.containsKey(shard);
         }
         @Override
-        protected Boolean getShardActive(ShardId shardId) {
-            return activeShards.contains(shardId);
+        protected void updateShardBuffers(IndexShard shard, ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) {
+            indexingBuffers.put(shard, shardIndexingBufferSize);
+            translogBuffers.put(shard, shardTranslogBufferSize);
         }
         @Override
-        protected void updateShardBuffers(ShardId shardId, ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) {
-            indexingBuffers.put(shardId, shardIndexingBufferSize);
-            translogBuffers.put(shardId, shardTranslogBufferSize);
-        }
-        @Override
-        protected Boolean checkIdle(ShardId shardId) {
+        protected Boolean checkIdle(IndexShard shard) {
            final TimeValue inactiveTime = settings.getAsTime(IndexShard.INDEX_SHARD_INACTIVE_TIME_SETTING, TimeValue.timeValueMinutes(5));
-            Long ns = lastIndexTimeNanos.get(shardId);
+            Long ns = lastIndexTimeNanos.get(shard);
            if (ns == null) {
                return null;
            } else if (currentTimeInNanos() - ns >= inactiveTime.nanos()) {
-                indexingBuffers.put(shardId, INACTIVE);
-                translogBuffers.put(shardId, INACTIVE);
-                activeShards.remove(shardId);
+                indexingBuffers.put(shard, INACTIVE);
+                translogBuffers.put(shard, INACTIVE);
+                activeShards.remove(shard);
                return true;
            } else {
                return false;
@@ -120,91 +112,99 @@ public class IndexingMemoryControllerTests extends ESTestCase {
            currentTimeSec += sec;
         }
-        public void simulateIndexing(ShardId shardId) {
-            lastIndexTimeNanos.put(shardId, currentTimeInNanos());
-            if (indexingBuffers.containsKey(shardId) == false) {
+        public void simulateIndexing(IndexShard shard) {
+            lastIndexTimeNanos.put(shard, currentTimeInNanos());
+            if (indexingBuffers.containsKey(shard) == false) {
                // First time we are seeing this shard; start it off with inactive buffers as IndexShard does:
-                indexingBuffers.put(shardId, IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER);
-                translogBuffers.put(shardId, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER);
+                indexingBuffers.put(shard, IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER);
+                translogBuffers.put(shard, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER);
            }
-            activeShards.add(shardId);
+            activeShards.add(shard);
            forceCheck();
         }
     }
     public void testShardAdditionAndRemoval() {
+        createIndex("test", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 0).build());
+        IndicesService indicesService = getInstanceFromNode(IndicesService.class);
+        IndexService test = indicesService.indexService("test");
         MockController controller = new MockController(Settings.builder()
                 .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb")
                 .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "100kb").build());
-        final ShardId shard1 = new ShardId("test", 1);
-        controller.simulateIndexing(shard1);
-        controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K
+        IndexShard shard0 = test.getShard(0);
+        controller.simulateIndexing(shard0);
+        controller.assertBuffers(shard0, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K
         // add another shard
-        final ShardId shard2 = new ShardId("test", 2);
-        controller.simulateIndexing(shard2);
+        IndexShard shard1 = test.getShard(1);
+        controller.simulateIndexing(shard1);
+        controller.assertBuffers(shard0, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
         controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
-        controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
         // remove first shard
-        controller.deleteShard(shard1);
+        controller.deleteShard(shard0);
         controller.forceCheck();
-        controller.assertBuffers(shard2, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K
+        controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K
         // remove second shard
-        controller.deleteShard(shard2);
+        controller.deleteShard(shard1);
         controller.forceCheck();
         // add a new one
-        final ShardId shard3 = new ShardId("test", 3);
-        controller.simulateIndexing(shard3);
-        controller.assertBuffers(shard3, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K
+        IndexShard shard2 = test.getShard(2);
+        controller.simulateIndexing(shard2);
+        controller.assertBuffers(shard2, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K
     }
     public void testActiveInactive() {
+        createIndex("test", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 2).put(SETTING_NUMBER_OF_REPLICAS, 0).build());
+        IndicesService indicesService = getInstanceFromNode(IndicesService.class);
+        IndexService test = indicesService.indexService("test");
         MockController controller = new MockController(Settings.builder()
                 .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb")
                 .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "100kb")
                 .put(IndexShard.INDEX_SHARD_INACTIVE_TIME_SETTING, "5s")
                 .build());
-        final ShardId shard1 = new ShardId("test", 1);
+        IndexShard shard0 = test.getShard(0);
+        controller.simulateIndexing(shard0);
+        IndexShard shard1 = test.getShard(1);
         controller.simulateIndexing(shard1);
-        final ShardId shard2 = new ShardId("test", 2);
-        controller.simulateIndexing(shard2);
+        controller.assertBuffers(shard0, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
         controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
-        controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
         // index into both shards, move the clock and see that they are still active
+        controller.simulateIndexing(shard0);
         controller.simulateIndexing(shard1);
-        controller.simulateIndexing(shard2);
         controller.incrementTimeSec(10);
         controller.forceCheck();
         // both shards now inactive
-        controller.assertInActive(shard1);
-        controller.assertInActive(shard2);
+        controller.assertInactive(shard0);
+        controller.assertInactive(shard1);
         // index into one shard only, see it becomes active
-        controller.simulateIndexing(shard1);
-        controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB));
-        controller.assertInActive(shard2);
+        controller.simulateIndexing(shard0);
+        controller.assertBuffers(shard0, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB));
+        controller.assertInactive(shard1);
         controller.incrementTimeSec(3); // increment but not enough to become inactive
         controller.forceCheck();
-        controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB));
-        controller.assertInActive(shard2);
+        controller.assertBuffers(shard0, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB));
+        controller.assertInactive(shard1);
         controller.incrementTimeSec(3); // increment some more
         controller.forceCheck();
-        controller.assertInActive(shard1);
-        controller.assertInActive(shard2);
+        controller.assertInactive(shard0);
+        controller.assertInactive(shard1);
         // index some and shard becomes immediately active
-        controller.simulateIndexing(shard2);
-        controller.assertInActive(shard1);
-        controller.assertBuffers(shard2, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB));
+        controller.simulateIndexing(shard1);
+        controller.assertInactive(shard0);
+        controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB));
     }
     public void testMinShardBufferSizes() {
@@ -261,13 +261,14 @@ public class IndexingMemoryControllerTests extends ESTestCase {
     }
     protected void assertTwoActiveShards(MockController controller, ByteSizeValue indexBufferSize, ByteSizeValue translogBufferSize) {
-        final ShardId shard1 = new ShardId("test", 1);
+        createIndex("test", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 2).put(SETTING_NUMBER_OF_REPLICAS, 0).build());
+        IndicesService indicesService = getInstanceFromNode(IndicesService.class);
+        IndexService test = indicesService.indexService("test");
+        IndexShard shard0 = test.getShard(0);
+        controller.simulateIndexing(shard0);
+        IndexShard shard1 = test.getShard(1);
         controller.simulateIndexing(shard1);
-        final ShardId shard2 = new ShardId("test", 2);
-        controller.simulateIndexing(shard2);
+        controller.assertBuffers(shard0, indexBufferSize, translogBufferSize);
         controller.assertBuffers(shard1, indexBufferSize, translogBufferSize);
-        controller.assertBuffers(shard2, indexBufferSize, translogBufferSize);
     }
 }