Internal: an inactive shard is temporarily activated by triggered synced flush

When a shard becomes in active we trigger a sync flush in order to speed up future recoveries. The sync flush causes a new translog generation to be made, which in turn confuses the IndexingMemoryController making it think that the shard is active. If no documents comes along in the next 5m, the shard is made inactive again , triggering a sync flush and so forth.

To avoid this, the IndexingMemoryController is changed to ignore empty translogs when checking if a shard became active. This comes with the price of potentially missing indexing operations which are followed by a flush. This is acceptable as if no more index operation come in, it's OK to leave the shard in active.

A new unit test is introduced and comparable integration tests are removed.

Closes #13802
This commit is contained in:
Boaz Leskes 2015-09-25 08:27:47 +02:00
parent ebe02ec54a
commit 148265bd16
3 changed files with 512 additions and 214 deletions

View File

@ -19,6 +19,7 @@
package org.elasticsearch.indices.memory;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -101,8 +102,15 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
private static final EnumSet<IndexShardState> CAN_UPDATE_INDEX_BUFFER_STATES = EnumSet.of(
IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.RELOCATED);
private final ShardsIndicesStatusChecker statusChecker;
@Inject
public IndexingMemoryController(Settings settings, ThreadPool threadPool, IndicesService indicesService) {
this(settings, threadPool, indicesService, JvmInfo.jvmInfo().getMem().getHeapMax().bytes());
}
// for testing
protected IndexingMemoryController(Settings settings, ThreadPool threadPool, IndicesService indicesService, long jvmMemoryInBytes) {
super(settings);
this.threadPool = threadPool;
this.indicesService = indicesService;
@ -111,7 +119,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
String indexingBufferSetting = this.settings.get(INDEX_BUFFER_SIZE_SETTING, "10%");
if (indexingBufferSetting.endsWith("%")) {
double percent = Double.parseDouble(indexingBufferSetting.substring(0, indexingBufferSetting.length() - 1));
indexingBuffer = new ByteSizeValue((long) (((double) JvmInfo.jvmInfo().getMem().getHeapMax().bytes()) * (percent / 100)));
indexingBuffer = new ByteSizeValue((long) (((double) jvmMemoryInBytes) * (percent / 100)));
ByteSizeValue minIndexingBuffer = this.settings.getAsBytesSize(MIN_INDEX_BUFFER_SIZE_SETTING, new ByteSizeValue(48, ByteSizeUnit.MB));
ByteSizeValue maxIndexingBuffer = this.settings.getAsBytesSize(MAX_INDEX_BUFFER_SIZE_SETTING, null);
@ -133,7 +141,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
String translogBufferSetting = this.settings.get(TRANSLOG_BUFFER_SIZE_SETTING, "1%");
if (translogBufferSetting.endsWith("%")) {
double percent = Double.parseDouble(translogBufferSetting.substring(0, translogBufferSetting.length() - 1));
translogBuffer = new ByteSizeValue((long) (((double) JvmInfo.jvmInfo().getMem().getHeapMax().bytes()) * (percent / 100)));
translogBuffer = new ByteSizeValue((long) (((double) jvmMemoryInBytes) * (percent / 100)));
ByteSizeValue minTranslogBuffer = this.settings.getAsBytesSize(MIN_TRANSLOG_BUFFER_SIZE_SETTING, new ByteSizeValue(256, ByteSizeUnit.KB));
ByteSizeValue maxTranslogBuffer = this.settings.getAsBytesSize(MAX_TRANSLOG_BUFFER_SIZE_SETTING, null);
@ -144,7 +152,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
translogBuffer = maxTranslogBuffer;
}
} else {
translogBuffer = ByteSizeValue.parseBytesSizeValue(translogBufferSetting, null);
translogBuffer = ByteSizeValue.parseBytesSizeValue(translogBufferSetting, TRANSLOG_BUFFER_SIZE_SETTING);
}
this.translogBuffer = translogBuffer;
this.minShardTranslogBufferSize = this.settings.getAsBytesSize(MIN_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, new ByteSizeValue(2, ByteSizeUnit.KB));
@ -154,6 +162,9 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
// we need to have this relatively small to move a shard from inactive to active fast (enough)
this.interval = this.settings.getAsTime(SHARD_INACTIVE_INTERVAL_TIME_SETTING, TimeValue.timeValueSeconds(30));
this.statusChecker = new ShardsIndicesStatusChecker();
logger.debug("using indexing buffer size [{}], with {} [{}], {} [{}], {} [{}], {} [{}]",
this.indexingBuffer,
MIN_SHARD_INDEX_BUFFER_SIZE_SETTING, this.minShardIndexBufferSize,
@ -165,7 +176,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
@Override
protected void doStart() {
// its fine to run it on the scheduler thread, no busy work
this.scheduler = threadPool.scheduleWithFixedDelay(new ShardsIndicesStatusChecker(), interval);
this.scheduler = threadPool.scheduleWithFixedDelay(statusChecker, interval);
}
@Override
@ -186,6 +197,90 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
return indexingBuffer;
}
/**
* returns the current budget for the total amount of translog buffers of
* active shards on this node
*/
public ByteSizeValue translogBufferSize() {
return translogBuffer;
}
protected List<ShardId> availableShards() {
ArrayList<ShardId> list = new ArrayList<>();
for (IndexService indexService : indicesService) {
for (IndexShard indexShard : indexService) {
if (shardAvailable(indexShard)) {
list.add(indexShard.shardId());
}
}
}
return list;
}
/** returns true if shard exists and is availabe for updates */
protected boolean shardAvailable(ShardId shardId) {
return shardAvailable(getShard(shardId));
}
/** returns true if shard exists and is availabe for updates */
protected boolean shardAvailable(@Nullable IndexShard shard) {
// shadow replica doesn't have an indexing buffer
return shard != null && shard.canIndex() && CAN_UPDATE_INDEX_BUFFER_STATES.contains(shard.state());
}
/** gets an {@link IndexShard} instance for the given shard. returns null if the shard doesn't exist */
protected IndexShard getShard(ShardId shardId) {
IndexService indexService = indicesService.indexService(shardId.index().name());
if (indexService != null) {
IndexShard indexShard = indexService.shard(shardId.id());
return indexShard;
}
return null;
}
protected void updateShardBuffers(ShardId shardId, ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) {
final IndexShard shard = getShard(shardId);
if (shard != null) {
try {
shard.updateBufferSize(shardIndexingBufferSize, shardTranslogBufferSize);
} catch (EngineClosedException e) {
// ignore
} catch (FlushNotAllowedEngineException e) {
// ignore
} catch (Exception e) {
logger.warn("failed to set shard {} index buffer to [{}]", shardId, shardIndexingBufferSize);
}
}
}
/** returns the current translog status (generation id + ops) for the given shard id. Returns null if unavailable. */
protected ShardIndexingStatus getTranslogStatus(ShardId shardId) {
final IndexShard indexShard = getShard(shardId);
if (indexShard == null) {
return null;
}
final Translog translog;
try {
translog = indexShard.engine().getTranslog();
} catch (EngineClosedException e) {
// not ready yet to be checked for activity
return null;
}
ShardIndexingStatus status = new ShardIndexingStatus();
status.translogId = translog.currentFileGeneration();
status.translogNumberOfOperations = translog.totalOperations();
return status;
}
// used for tests
void forceCheck() {
statusChecker.run();
}
class ShardsIndicesStatusChecker implements Runnable {
private final Map<ShardId, ShardIndexingStatus> shardsIndicesStatus = new HashMap<>();
@ -194,19 +289,10 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
public void run() {
EnumSet<ShardStatusChangeType> changes = purgeDeletedAndClosedShards();
final List<IndexShard> activeToInactiveIndexingShards = new ArrayList<>();
final List<ShardId> activeToInactiveIndexingShards = new ArrayList<>();
final int activeShards = updateShardStatuses(changes, activeToInactiveIndexingShards);
for (IndexShard indexShard : activeToInactiveIndexingShards) {
// update inactive indexing buffer size
try {
indexShard.markAsInactive();
} catch (EngineClosedException e) {
// ignore
logger.trace("ignore EngineClosedException while marking shard [{}][{}] as inactive", indexShard.shardId().index().name(), indexShard.shardId().id());
} catch (FlushNotAllowedEngineException e) {
// ignore
logger.trace("ignore FlushNotAllowedException while marking shard [{}][{}] as inactive", indexShard.shardId().index().name(), indexShard.shardId().id());
}
for (ShardId indexShard : activeToInactiveIndexingShards) {
markShardAsInactive(indexShard);
}
if (changes.isEmpty() == false) {
@ -220,70 +306,42 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
*
* @return the current count of active shards
*/
private int updateShardStatuses(EnumSet<ShardStatusChangeType> changes, List<IndexShard> activeToInactiveIndexingShards) {
private int updateShardStatuses(EnumSet<ShardStatusChangeType> changes, List<ShardId> activeToInactiveIndexingShards) {
int activeShards = 0;
for (IndexService indexService : indicesService) {
for (IndexShard indexShard : indexService) {
for (ShardId shardId : availableShards()) {
if (!CAN_UPDATE_INDEX_BUFFER_STATES.contains(indexShard.state())) {
// not ready to be updated yet
final ShardIndexingStatus currentStatus = getTranslogStatus(shardId);
if (currentStatus == null) {
// shard was closed..
continue;
}
if (indexShard.canIndex() == false) {
// shadow replica doesn't have an indexing buffer
continue;
}
final Translog translog;
try {
translog = indexShard.engine().getTranslog();
} catch (EngineClosedException e) {
// not ready yet to be checked for activity
continue;
}
final long timeMS = threadPool.estimatedTimeInMillis();
ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId());
ShardIndexingStatus status = shardsIndicesStatus.get(shardId);
if (status == null) {
status = new ShardIndexingStatus();
shardsIndicesStatus.put(indexShard.shardId(), status);
status = currentStatus;
shardsIndicesStatus.put(shardId, status);
changes.add(ShardStatusChangeType.ADDED);
}
// consider shard inactive if it has same translogFileGeneration and no operations for a long time
if (status.translogId == translog.currentFileGeneration() && translog.totalOperations() == status.translogNumberOfOperations) {
if (status.timeMS == -1) {
// first time we noticed the shard become idle
status.timeMS = timeMS;
}
// mark it as inactive only if enough time has passed
if (status.activeIndexing && (timeMS - status.timeMS) > inactiveTime.millis()) {
// inactive for this amount of time, mark it
activeToInactiveIndexingShards.add(indexShard);
status.activeIndexing = false;
changes.add(ShardStatusChangeType.BECAME_INACTIVE);
logger.debug("marking shard [{}][{}] as inactive (inactive_time[{}]) indexing wise, setting size to [{}]",
indexShard.shardId().index().name(), indexShard.shardId().id(),
inactiveTime, EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER);
}
} else {
if (!status.activeIndexing) {
status.activeIndexing = true;
final boolean lastActiveIndexing = status.activeIndexing;
status.updateWith(currentTimeInNanos(), currentStatus, inactiveTime.nanos());
if (lastActiveIndexing && (status.activeIndexing == false)) {
activeToInactiveIndexingShards.add(shardId);
changes.add(ShardStatusChangeType.BECAME_INACTIVE);
logger.debug("marking shard {} as inactive (inactive_time[{}]) indexing wise, setting size to [{}]",
shardId,
inactiveTime, EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER);
} else if ((lastActiveIndexing == false) && status.activeIndexing) {
changes.add(ShardStatusChangeType.BECAME_ACTIVE);
logger.debug("marking shard [{}][{}] as active indexing wise", indexShard.shardId().index().name(), indexShard.shardId().id());
logger.debug("marking shard {} as active indexing wise", shardId);
}
status.timeMS = -1;
}
status.translogId = translog.currentFileGeneration();
status.translogNumberOfOperations = translog.totalOperations();
if (status.activeIndexing) {
activeShards++;
}
}
}
return activeShards;
}
@ -297,20 +355,8 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
Iterator<ShardId> statusShardIdIterator = shardsIndicesStatus.keySet().iterator();
while (statusShardIdIterator.hasNext()) {
ShardId statusShardId = statusShardIdIterator.next();
IndexService indexService = indicesService.indexService(statusShardId.getIndex());
boolean remove;
if (indexService == null) {
remove = true;
} else {
IndexShard indexShard = indexService.shard(statusShardId.id());
if (indexShard == null) {
remove = true;
} else {
remove = !CAN_UPDATE_INDEX_BUFFER_STATES.contains(indexShard.state());
}
}
if (remove) {
ShardId shardId = statusShardIdIterator.next();
if (shardAvailable(shardId) == false) {
changes.add(ShardStatusChangeType.DELETED);
statusShardIdIterator.remove();
}
@ -340,29 +386,38 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
}
logger.debug("recalculating shard indexing buffer (reason={}), total is [{}] with [{}] active shards, each shard set to indexing=[{}], translog=[{}]", reason, indexingBuffer, activeShards, shardIndexingBufferSize, shardTranslogBufferSize);
for (IndexService indexService : indicesService) {
for (IndexShard indexShard : indexService) {
IndexShardState state = indexShard.state();
if (!CAN_UPDATE_INDEX_BUFFER_STATES.contains(state)) {
logger.trace("shard [{}] is not yet ready for index buffer update. index shard state: [{}]", indexShard.shardId(), state);
continue;
}
ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId());
for (ShardId shardId : availableShards()) {
ShardIndexingStatus status = shardsIndicesStatus.get(shardId);
if (status == null || status.activeIndexing) {
updateShardBuffers(shardId, shardIndexingBufferSize, shardTranslogBufferSize);
}
}
}
}
protected long currentTimeInNanos() {
return System.nanoTime();
}
// update inactive indexing buffer size
protected void markShardAsInactive(ShardId shardId) {
String ignoreReason = null;
final IndexShard shard = getShard(shardId);
if (shard != null) {
try {
indexShard.updateBufferSize(shardIndexingBufferSize, shardTranslogBufferSize);
shard.markAsInactive();
} catch (EngineClosedException e) {
// ignore
continue;
ignoreReason = "EngineClosedException";
} catch (FlushNotAllowedEngineException e) {
// ignore
continue;
} catch (Exception e) {
logger.warn("failed to set shard {} index buffer to [{}]", indexShard.shardId(), shardIndexingBufferSize);
}
}
ignoreReason = "FlushNotAllowedEngineException";
}
} else {
ignoreReason = "shard not found";
}
if (ignoreReason != null) {
logger.trace("ignore [{}] while marking shard {} as inactive", ignoreReason, shardId);
}
}
@ -370,10 +425,41 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
ADDED, DELETED, BECAME_ACTIVE, BECAME_INACTIVE
}
private static class ShardIndexingStatus {
static class ShardIndexingStatus {
long translogId = -1;
long translogNumberOfOperations = -1;
boolean activeIndexing = true;
long timeMS = -1; // contains the first time we saw this shard with no operations done on it
long idleSinceNanoTime = -1; // contains the first time we saw this shard with no operations done on it
/** update status based on a new sample. updates all internal variables */
public void updateWith(long currentNanoTime, ShardIndexingStatus current, long inactiveNanoInterval) {
final boolean idle = (translogId == current.translogId && translogNumberOfOperations == current.translogNumberOfOperations);
if (activeIndexing && idle) {
// no indexing activity detected.
if (idleSinceNanoTime < 0) {
// first time we see this, start the clock.
idleSinceNanoTime = currentNanoTime;
} else if ((currentNanoTime - idleSinceNanoTime) > inactiveNanoInterval) {
// shard is inactive. mark it as such.
activeIndexing = false;
}
} else if (activeIndexing == false // we weren't indexing before
&& idle == false // but we do now
&& current.translogNumberOfOperations > 0 // but only if we're really sure - see note bellow
) {
// since we sync flush once a shard becomes inactive, the translog id can change, however that
// doesn't mean the an indexing operation has happened. Note that if we're really unlucky and a flush happens
// immediately after an indexing operation we may not become active immediately. The following
// indexing operation will mark the shard as active, so it's OK. If that one doesn't come, we might as well stay
// inactive
activeIndexing = true;
idleSinceNanoTime = -1;
}
translogId = current.translogId;
translogNumberOfOperations = current.translogNumberOfOperations;
}
}
}

View File

@ -22,115 +22,24 @@ package org.elasticsearch.indices.memory;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.Test;
import java.util.concurrent.ExecutionException;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class IndexingMemoryControllerIT extends ESIntegTestCase {
@Test
public void testIndexBufferSizeUpdateAfterCreationRemoval() throws InterruptedException {
createNode(Settings.EMPTY);
prepareCreate("test1").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).get();
ensureGreen();
final IndexShard shard1 = internalCluster().getInstance(IndicesService.class).indexService("test1").shard(0);
prepareCreate("test2").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).get();
ensureGreen();
final IndexShard shard2 = internalCluster().getInstance(IndicesService.class).indexService("test2").shard(0);
final long expected1ShardSize = internalCluster().getInstance(IndexingMemoryController.class).indexingBufferSize().bytes();
final long expected2ShardsSize = expected1ShardSize / 2;
boolean success = awaitBusy(() -> shard1.engine().config().getIndexingBufferSize().bytes() <= expected2ShardsSize &&
shard2.engine().config().getIndexingBufferSize().bytes() <= expected2ShardsSize
);
if (!success) {
fail("failed to update shard indexing buffer size. expected [" + expected2ShardsSize + "] shard1 [" +
shard1.engine().config().getIndexingBufferSize().bytes() + "] shard2 [" +
shard2.engine().config().getIndexingBufferSize().bytes() + "]"
);
}
client().admin().indices().prepareDelete("test2").get();
success = awaitBusy(() -> shard1.engine().config().getIndexingBufferSize().bytes() >= expected1ShardSize);
if (!success) {
fail("failed to update shard indexing buffer size after deleting shards. expected [" + expected1ShardSize + "] got [" +
shard1.engine().config().getIndexingBufferSize().bytes() + "]"
);
}
}
@Test
public void testIndexBufferSizeUpdateInactiveShard() throws InterruptedException, ExecutionException {
createNode(Settings.builder().put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, "100ms").build());
prepareCreate("test1").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).get();
ensureGreen();
final IndexShard shard1 = internalCluster().getInstance(IndicesService.class).indexService("test1").shard(0);
if (randomBoolean()) {
logger.info("--> indexing some pending operations");
indexRandom(false, client().prepareIndex("test1", "type", "0").setSource("f", "0"));
}
boolean success = awaitBusy(() -> shard1.engine().config().getIndexingBufferSize().bytes() == EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes());
if (!success) {
fail("failed to update shard indexing buffer size due to inactive state. expected [" + EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" +
shard1.engine().config().getIndexingBufferSize().bytes() + "]"
);
}
index("test1", "type", "1", "f", 1);
success = awaitBusy(() -> shard1.engine().config().getIndexingBufferSize().bytes() > EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes());
if (!success) {
fail("failed to update shard indexing buffer size due to active state. expected something larger then [" + EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" +
shard1.engine().config().getIndexingBufferSize().bytes() + "]"
);
}
if (randomBoolean()) {
logger.info("--> flushing translogs");
flush(); // clean translogs
}
success = awaitBusy(() -> shard1.engine().config().getIndexingBufferSize().bytes() == EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes());
if (!success) {
fail("failed to update shard indexing buffer size due to inactive state. expected [" + EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" +
shard1.engine().config().getIndexingBufferSize().bytes() + "]"
);
}
// Make sure we also pushed the tiny indexing buffer down to the underlying IndexWriter:
assertEquals(EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes(), getIWBufferSize("test1"));
}
private long getIWBufferSize(String indexName) {
return client().admin().indices().prepareStats(indexName).get().getTotal().getSegments().getIndexWriterMaxMemoryInBytes();
}
@Test
public void testIndexBufferSizeTwoShards() throws InterruptedException {
public void testIndexBufferPushedToEngine() throws InterruptedException {
createNode(Settings.builder().put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, "100000h",
IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "32mb",
IndexShard.INDEX_REFRESH_INTERVAL, "-1").build());
@ -151,14 +60,32 @@ public class IndexingMemoryControllerIT extends ESIntegTestCase {
if (awaitBusy(() -> getIWBufferSize("test4") == 16*1024*1024) == false) {
fail("failed to update shard indexing buffer size for test4 index to 16 MB; got: " + getIWBufferSize("test4"));
}
client().admin().indices().prepareDelete("test4").get();
if (awaitBusy(() -> getIWBufferSize("test3") == 32 * 1024 * 1024) == false) {
fail("failed to update shard indexing buffer size for test3 index to 32 MB; got: " + getIWBufferSize("test4"));
}
}
@Test
public void testIndexBufferNotPercent() throws InterruptedException {
// #13487: Make sure you can specify non-percent sized index buffer and not hit NPE
createNode(Settings.builder().put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "32mb").build());
// ... and that it took:
assertEquals(32*1024*1024, internalCluster().getInstance(IndexingMemoryController.class).indexingBufferSize().bytes());
public void testInactivePushedToShard() throws InterruptedException {
createNode(Settings.builder().put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, "100ms",
IndexingMemoryController.SHARD_INACTIVE_INTERVAL_TIME_SETTING, "100ms",
IndexShard.INDEX_REFRESH_INTERVAL, "-1").build());
// Create two active indices, sharing 32 MB indexing buffer:
prepareCreate("test1").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).get();
ensureGreen();
index("test1", "type", "1", "f", 1);
// make shard the shard buffer was set to inactive size
final ByteSizeValue inactiveBuffer = EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER;
if (awaitBusy(() -> getIWBufferSize("test1") == inactiveBuffer.bytes()) == false) {
fail("failed to update shard indexing buffer size for test1 index to [" + inactiveBuffer + "]; got: " + getIWBufferSize("test1"));
}
}
private void createNode(Settings settings) {

View File

@ -0,0 +1,285 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.memory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
public class IndexingMemoryControllerTests extends ESTestCase {
static class MockController extends IndexingMemoryController {
final static ByteSizeValue INACTIVE = new ByteSizeValue(-1);
final Map<ShardId, Long> translogIds = new HashMap<>();
final Map<ShardId, Long> translogOps = new HashMap<>();
final Map<ShardId, ByteSizeValue> indexingBuffers = new HashMap<>();
final Map<ShardId, ByteSizeValue> translogBuffers = new HashMap<>();
long currentTimeSec = TimeValue.timeValueNanos(System.nanoTime()).seconds();
public MockController(Settings settings) {
super(Settings.builder()
.put(SHARD_INACTIVE_INTERVAL_TIME_SETTING, "200h") // disable it
.put(SHARD_INACTIVE_TIME_SETTING, "0s") // immediate
.put(settings)
.build(),
null, null, 100 * 1024 * 1024); // fix jvm mem size to 100mb
}
public void incTranslog(ShardId shard1, int id, int ops) {
setTranslog(shard1, translogIds.get(shard1) + id, translogOps.get(shard1) + ops);
}
public void setTranslog(ShardId id, long translogId, long ops) {
translogIds.put(id, translogId);
translogOps.put(id, ops);
}
public void deleteShard(ShardId id) {
translogIds.remove(id);
translogOps.remove(id);
indexingBuffers.remove(id);
translogBuffers.remove(id);
}
public void assertActive(ShardId id) {
assertThat(indexingBuffers.get(id), not(equalTo(INACTIVE)));
assertThat(translogBuffers.get(id), not(equalTo(INACTIVE)));
}
public void assertBuffers(ShardId id, ByteSizeValue indexing, ByteSizeValue translog) {
assertThat(indexingBuffers.get(id), equalTo(indexing));
assertThat(translogBuffers.get(id), equalTo(translog));
}
public void assertInActive(ShardId id) {
assertThat(indexingBuffers.get(id), equalTo(INACTIVE));
assertThat(translogBuffers.get(id), equalTo(INACTIVE));
}
@Override
protected long currentTimeInNanos() {
return TimeValue.timeValueSeconds(currentTimeSec).nanos();
}
@Override
protected List<ShardId> availableShards() {
return new ArrayList<>(translogIds.keySet());
}
@Override
protected boolean shardAvailable(ShardId shardId) {
return translogIds.containsKey(shardId);
}
@Override
protected void markShardAsInactive(ShardId shardId) {
indexingBuffers.put(shardId, INACTIVE);
translogBuffers.put(shardId, INACTIVE);
}
@Override
protected ShardIndexingStatus getTranslogStatus(ShardId shardId) {
if (!shardAvailable(shardId)) {
return null;
}
ShardIndexingStatus status = new ShardIndexingStatus();
status.translogId = translogIds.get(shardId);
status.translogNumberOfOperations = translogOps.get(shardId);
return status;
}
@Override
protected void updateShardBuffers(ShardId shardId, ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) {
indexingBuffers.put(shardId, shardIndexingBufferSize);
translogBuffers.put(shardId, shardTranslogBufferSize);
}
public void incrementTimeSec(int sec) {
currentTimeSec += sec;
}
public void simulateFlush(ShardId shard) {
setTranslog(shard, translogIds.get(shard) + 1, 0);
}
}
public void testShardAdditionAndRemoval() {
MockController controller = new MockController(Settings.builder()
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb")
.put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "100kb").build());
final ShardId shard1 = new ShardId("test", 1);
controller.setTranslog(shard1, randomInt(10), randomInt(10));
controller.forceCheck();
controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K
// add another shard
final ShardId shard2 = new ShardId("test", 2);
controller.setTranslog(shard2, randomInt(10), randomInt(10));
controller.forceCheck();
controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
// remove first shard
controller.deleteShard(shard1);
controller.forceCheck();
controller.assertBuffers(shard2, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K
// remove second shard
controller.deleteShard(shard2);
controller.forceCheck();
// add a new one
final ShardId shard3 = new ShardId("test", 3);
controller.setTranslog(shard3, randomInt(10), randomInt(10));
controller.forceCheck();
controller.assertBuffers(shard3, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K
}
public void testActiveInactive() {
MockController controller = new MockController(Settings.builder()
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb")
.put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "100kb")
.put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, "5s")
.build());
final ShardId shard1 = new ShardId("test", 1);
controller.setTranslog(shard1, 0, 0);
final ShardId shard2 = new ShardId("test", 2);
controller.setTranslog(shard2, 0, 0);
controller.forceCheck();
controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
// index into both shards, move the clock and see that they are still active
controller.setTranslog(shard1, randomInt(2), randomInt(2) + 1);
controller.setTranslog(shard2, randomInt(2) + 1, randomInt(2));
// the controller doesn't know when the ops happened, so even if this is more
// than the inactive time the shard is still marked as active
controller.incrementTimeSec(10);
controller.forceCheck();
controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
// index into one shard only, see other shard is made inactive correctly
controller.incTranslog(shard1, randomInt(2), randomInt(2) + 1);
controller.forceCheck(); // register what happened with the controller (shard is still active)
controller.incrementTimeSec(3); // increment but not enough
controller.forceCheck();
controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
controller.incrementTimeSec(3); // increment some more
controller.forceCheck();
controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB));
controller.assertInActive(shard2);
if (randomBoolean()) {
// once a shard gets inactive it will be synced flushed and a new translog generation will be made
controller.simulateFlush(shard2);
controller.forceCheck();
controller.assertInActive(shard2);
}
// index some and shard becomes immediately active
controller.incTranslog(shard2, randomInt(2), 1 + randomInt(2)); // we must make sure translog ops is never 0
controller.forceCheck();
controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
}
public void testMinShardBufferSizes() {
MockController controller = new MockController(Settings.builder()
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb")
.put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "50kb")
.put(IndexingMemoryController.MIN_SHARD_INDEX_BUFFER_SIZE_SETTING, "6mb")
.put(IndexingMemoryController.MIN_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, "40kb").build());
assertTwoActiveShards(controller, new ByteSizeValue(6, ByteSizeUnit.MB), new ByteSizeValue(40, ByteSizeUnit.KB));
}
public void testMaxShardBufferSizes() {
MockController controller = new MockController(Settings.builder()
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb")
.put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "50kb")
.put(IndexingMemoryController.MAX_SHARD_INDEX_BUFFER_SIZE_SETTING, "3mb")
.put(IndexingMemoryController.MAX_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, "10kb").build());
assertTwoActiveShards(controller, new ByteSizeValue(3, ByteSizeUnit.MB), new ByteSizeValue(10, ByteSizeUnit.KB));
}
public void testRelativeBufferSizes() {
MockController controller = new MockController(Settings.builder()
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "50%")
.put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "0.5%")
.build());
assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(50, ByteSizeUnit.MB)));
assertThat(controller.translogBufferSize(), equalTo(new ByteSizeValue(512, ByteSizeUnit.KB)));
}
public void testMinBufferSizes() {
MockController controller = new MockController(Settings.builder()
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "0.001%")
.put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "0.001%")
.put(IndexingMemoryController.MIN_INDEX_BUFFER_SIZE_SETTING, "6mb")
.put(IndexingMemoryController.MIN_TRANSLOG_BUFFER_SIZE_SETTING, "512kb").build());
assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(6, ByteSizeUnit.MB)));
assertThat(controller.translogBufferSize(), equalTo(new ByteSizeValue(512, ByteSizeUnit.KB)));
}
public void testMaxBufferSizes() {
MockController controller = new MockController(Settings.builder()
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "90%")
.put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "90%")
.put(IndexingMemoryController.MAX_INDEX_BUFFER_SIZE_SETTING, "6mb")
.put(IndexingMemoryController.MAX_TRANSLOG_BUFFER_SIZE_SETTING, "512kb").build());
assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(6, ByteSizeUnit.MB)));
assertThat(controller.translogBufferSize(), equalTo(new ByteSizeValue(512, ByteSizeUnit.KB)));
}
protected void assertTwoActiveShards(MockController controller, ByteSizeValue indexBufferSize, ByteSizeValue translogBufferSize) {
final ShardId shard1 = new ShardId("test", 1);
controller.setTranslog(shard1, 0, 0);
final ShardId shard2 = new ShardId("test", 2);
controller.setTranslog(shard2, 0, 0);
controller.forceCheck();
controller.assertBuffers(shard1, indexBufferSize, translogBufferSize);
controller.assertBuffers(shard2, indexBufferSize, translogBufferSize);
}
}