Internal: pending operations in the translog prevent shard from being marked as inactive

The IndexingMemoryController checks periodically if there is any indexing activity on the shard. If no activity is seen for 5m (default) the shard is marked as inactive, allowing its indexing buffer quota to be given to other active shards.

Sadly the current check is bad as it checks for zero translog operations. This makes marking the shard inactive wait for a flush to happen - which used to take 30m and since #13707 doesn't happen at all (as we rely on the synced flush triggered by inactivity). This commit fixes the check so it will work with any translog size.

Closes #13759
This commit is contained in:
Boaz Leskes 2015-09-24 09:26:12 +02:00
parent 6cbf2de592
commit d121550a4f
2 changed files with 18 additions and 11 deletions

View File

@ -38,12 +38,7 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList; import java.util.*;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
/** /**
@ -258,7 +253,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
} }
// consider shard inactive if it has same translogFileGeneration and no operations for a long time // consider shard inactive if it has same translogFileGeneration and no operations for a long time
if (status.translogId == translog.currentFileGeneration() && translog.totalOperations() == 0) { if (status.translogId == translog.currentFileGeneration() && translog.totalOperations() == status.translogNumberOfOperations) {
if (status.timeMS == -1) { if (status.timeMS == -1) {
// first time we noticed the shard become idle // first time we noticed the shard become idle
status.timeMS = timeMS; status.timeMS = timeMS;
@ -282,6 +277,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
status.timeMS = -1; status.timeMS = -1;
} }
status.translogId = translog.currentFileGeneration(); status.translogId = translog.currentFileGeneration();
status.translogNumberOfOperations = translog.totalOperations();
if (status.activeIndexing) { if (status.activeIndexing) {
activeShards++; activeShards++;
@ -376,6 +372,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
private static class ShardIndexingStatus { private static class ShardIndexingStatus {
long translogId = -1; long translogId = -1;
long translogNumberOfOperations = -1;
boolean activeIndexing = true; boolean activeIndexing = true;
long timeMS = -1; // contains the first time we saw this shard with no operations done on it long timeMS = -1; // contains the first time we saw this shard with no operations done on it
} }

View File

@ -24,13 +24,14 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.internal.InternalSettingsPreparer; import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.junit.Test; import org.junit.Test;
import java.util.concurrent.ExecutionException;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class IndexingMemoryControllerIT extends ESIntegTestCase { public class IndexingMemoryControllerIT extends ESIntegTestCase {
@ -77,7 +78,7 @@ public class IndexingMemoryControllerIT extends ESIntegTestCase {
} }
@Test @Test
public void testIndexBufferSizeUpdateInactiveShard() throws InterruptedException { public void testIndexBufferSizeUpdateInactiveShard() throws InterruptedException, ExecutionException {
createNode(Settings.builder().put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, "100ms").build()); createNode(Settings.builder().put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, "100ms").build());
@ -86,6 +87,12 @@ public class IndexingMemoryControllerIT extends ESIntegTestCase {
ensureGreen(); ensureGreen();
final IndexShard shard1 = internalCluster().getInstance(IndicesService.class).indexService("test1").shard(0); final IndexShard shard1 = internalCluster().getInstance(IndicesService.class).indexService("test1").shard(0);
if (randomBoolean()) {
logger.info("--> indexing some pending operations");
indexRandom(false, client().prepareIndex("test1", "type", "0").setSource("f", "0"));
}
boolean success = awaitBusy(() -> shard1.engine().config().getIndexingBufferSize().bytes() == EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes()); boolean success = awaitBusy(() -> shard1.engine().config().getIndexingBufferSize().bytes() == EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes());
if (!success) { if (!success) {
fail("failed to update shard indexing buffer size due to inactive state. expected [" + EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" + fail("failed to update shard indexing buffer size due to inactive state. expected [" + EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" +
@ -97,12 +104,15 @@ public class IndexingMemoryControllerIT extends ESIntegTestCase {
success = awaitBusy(() -> shard1.engine().config().getIndexingBufferSize().bytes() > EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes()); success = awaitBusy(() -> shard1.engine().config().getIndexingBufferSize().bytes() > EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes());
if (!success) { if (!success) {
fail("failed to update shard indexing buffer size due to inactive state. expected something larger then [" + EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" + fail("failed to update shard indexing buffer size due to active state. expected something larger then [" + EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" +
shard1.engine().config().getIndexingBufferSize().bytes() + "]" shard1.engine().config().getIndexingBufferSize().bytes() + "]"
); );
} }
if (randomBoolean()) {
logger.info("--> flushing translogs");
flush(); // clean translogs flush(); // clean translogs
}
success = awaitBusy(() -> shard1.engine().config().getIndexingBufferSize().bytes() == EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes()); success = awaitBusy(() -> shard1.engine().config().getIndexingBufferSize().bytes() == EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes());
if (!success) { if (!success) {