Move inactiveTime into IndexShard out of IndexingMemoryController

This commit is contained in:
Simon Willnauer 2015-11-04 21:47:21 +01:00
parent 7d77f182e2
commit a4a0390ff6
7 changed files with 42 additions and 44 deletions

View File

@ -146,6 +146,7 @@ public class MetaDataIndexUpgradeService extends AbstractComponent {
"index.translog.flush_threshold_period",
"index.translog.interval",
"index.translog.sync_interval",
"index.shard.inactive_time",
UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING));
/**

View File

@ -231,7 +231,7 @@ public class TimeValue implements Streamable {
public static TimeValue parseTimeValue(String sValue, TimeValue defaultValue, String settingName) {
settingName = Objects.requireNonNull(settingName);
assert settingName.startsWith("index.") == false || MetaDataIndexUpgradeService.INDEX_TIME_SETTINGS.contains(settingName);
assert settingName.startsWith("index.") == false || MetaDataIndexUpgradeService.INDEX_TIME_SETTINGS.contains(settingName) : settingName;
if (sValue == null) {
return defaultValue;
}

View File

@ -184,7 +184,9 @@ public class IndexShard extends AbstractIndexShardComponent {
public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS = "index.translog.flush_threshold_ops";
public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE = "index.translog.flush_threshold_size";
public static final String INDEX_TRANSLOG_DISABLE_FLUSH = "index.translog.disable_flush";
/** If we see no indexing operations after this much time for a given shard, we consider that shard inactive (default: 5 minutes). */
public static final String INDEX_SHARD_INACTIVE_TIME_SETTING = "index.shard.inactive_time";
private static final String INDICES_INACTIVE_TIME_SETTING = "indices.memory.shard_inactive_time";
private final ShardPath path;
@ -193,6 +195,7 @@ public class IndexShard extends AbstractIndexShardComponent {
private final EnumSet<IndexShardState> readAllowedStates = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY);
private final IndexSearcherWrapper searcherWrapper;
private final TimeValue inactiveTime;
/** True if this shard is still indexing (recently) and false if we've been idle for long enough (as periodically checked by {@link
* IndexingMemoryController}). */
@ -203,6 +206,7 @@ public class IndexShard extends AbstractIndexShardComponent {
@Nullable EngineFactory engineFactory,
IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, NodeServicesProvider provider) {
super(shardId, indexSettings);
this.inactiveTime = this.indexSettings.getAsTime(INDEX_SHARD_INACTIVE_TIME_SETTING, this.indexSettings.getAsTime(INDICES_INACTIVE_TIME_SETTING, TimeValue.timeValueMinutes(5)));
this.idxSettings = indexSettings;
this.codecService = new CodecService(mapperService, logger);
this.warmer = provider.getWarmer();
@ -915,9 +919,6 @@ public class IndexShard extends AbstractIndexShardComponent {
/** Records timestamp of the last write operation, possibly switching {@code active} to true if we were inactive. */
private void markLastWrite() {
if (active.getAndSet(true) == false) {
// We are currently inactive, but a new write operation just showed up, so we now notify IMC
// to wake up and fix our indexing buffer. We could do this async instead, but cost should
// be low, and it's rare this happens.
indexEventListener.onShardActive(this);
}
}
@ -1032,9 +1033,9 @@ public class IndexShard extends AbstractIndexShardComponent {
/** Called by {@link IndexingMemoryController} to check whether more than {@code inactiveTimeNS} has passed since the last
* indexing operation, and become inactive (reducing indexing and translog buffers to tiny values) if so. This returns true
* if the shard is inactive. */
public boolean checkIdle(long inactiveTimeNS) {
public boolean checkIdle() {
Engine engineOrNull = getEngineOrNull();
if (engineOrNull != null && System.nanoTime() - engineOrNull.getLastWriteNanos() >= inactiveTimeNS) {
if (engineOrNull != null && System.nanoTime() - engineOrNull.getLastWriteNanos() >= inactiveTime.nanos()) {
boolean wasActive = active.getAndSet(false);
if (wasActive) {
updateBufferSize(IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER);
@ -1047,7 +1048,7 @@ public class IndexShard extends AbstractIndexShardComponent {
}
/** Returns {@code true} if this shard is active (has seen indexing ops in the last {@link
* IndexingMemoryController#SHARD_INACTIVE_TIME_SETTING} (default 5 minutes), else {@code false}. */
* IndexShard#INDEX_SHARD_INACTIVE_TIME_SETTING} (default 5 minutes), else {@code false}. */
public boolean getActive() {
return active.get();
}
@ -1236,6 +1237,10 @@ public class IndexShard extends AbstractIndexShardComponent {
return indexEventListener;
}
public TimeValue getInactiveTime() {
return inactiveTime;
}
class EngineRefresher implements Runnable {
@Override
public void run() {
@ -1465,7 +1470,7 @@ public class IndexShard extends AbstractIndexShardComponent {
final Engine.Warmer engineWarmer = (searcher, toLevel) -> warmer.warm(searcher, this, idxSettings, toLevel);
return new EngineConfig(shardId,
threadPool, indexingService, indexSettings, engineWarmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig,
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig, indexSettings.getAsTime(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, TimeValue.timeValueMinutes(5)));
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig, inactiveTime);
}
private static class IndexShardOperationCounter extends AbstractRefCounted {

View File

@ -73,9 +73,6 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
/** Sets a ceiling on the per-shard translog buffer size (default: 64 KB). */
public static final String MAX_SHARD_TRANSLOG_BUFFER_SIZE_SETTING = "indices.memory.max_shard_translog_buffer_size";
/** If we see no indexing operations after this much time for a given shard, we consider that shard inactive (default: 5 minutes). */
public static final String SHARD_INACTIVE_TIME_SETTING = "indices.memory.shard_inactive_time";
/** How frequently we check shards to find inactive ones (default: 30 seconds). */
public static final String SHARD_INACTIVE_INTERVAL_TIME_SETTING = "indices.memory.interval";
@ -96,7 +93,6 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
private final ByteSizeValue minShardTranslogBufferSize;
private final ByteSizeValue maxShardTranslogBufferSize;
private final TimeValue inactiveTime;
private final TimeValue interval;
private volatile ScheduledFuture scheduler;
@ -160,17 +156,15 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
this.minShardTranslogBufferSize = this.settings.getAsBytesSize(MIN_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, new ByteSizeValue(2, ByteSizeUnit.KB));
this.maxShardTranslogBufferSize = this.settings.getAsBytesSize(MAX_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, new ByteSizeValue(64, ByteSizeUnit.KB));
this.inactiveTime = this.settings.getAsTime(SHARD_INACTIVE_TIME_SETTING, TimeValue.timeValueMinutes(5));
// we need to have this relatively small to move a shard from inactive to active fast (enough)
this.interval = this.settings.getAsTime(SHARD_INACTIVE_INTERVAL_TIME_SETTING, TimeValue.timeValueSeconds(30));
this.statusChecker = new ShardsIndicesStatusChecker();
logger.debug("using indexing buffer size [{}], with {} [{}], {} [{}], {} [{}], {} [{}]",
logger.debug("using indexing buffer size [{}], with {} [{}], {} [{}], {} [{}]",
this.indexingBuffer,
MIN_SHARD_INDEX_BUFFER_SIZE_SETTING, this.minShardIndexBufferSize,
MAX_SHARD_INDEX_BUFFER_SIZE_SETTING, this.maxShardIndexBufferSize,
SHARD_INACTIVE_TIME_SETTING, this.inactiveTime,
SHARD_INACTIVE_INTERVAL_TIME_SETTING, this.interval);
}
@ -304,7 +298,6 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
// Was the shard active last time we checked?
Boolean wasActive = shardWasActive.get(shardId);
if (wasActive == null) {
// First time we are seeing this shard
shardWasActive.put(shardId, isActive);
@ -316,12 +309,10 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
changes.add(ShardStatusChangeType.BECAME_ACTIVE);
logger.debug("marking shard {} as active indexing wise", shardId);
shardWasActive.put(shardId, true);
} else if (checkIdle(shardId, inactiveTime.nanos()) == Boolean.TRUE) {
} else if (checkIdle(shardId) == Boolean.TRUE) {
// Make shard inactive now
changes.add(ShardStatusChangeType.BECAME_INACTIVE);
logger.debug("marking shard {} as inactive (inactive_time[{}]) indexing wise",
shardId,
inactiveTime);
shardWasActive.put(shardId, false);
}
}
@ -398,12 +389,17 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
/** ask this shard to check now whether it is inactive, and reduces its indexing and translog buffers if so. returns Boolean.TRUE if
* it did deactive, Boolean.FALSE if it did not, and null if the shard is unknown */
protected Boolean checkIdle(ShardId shardId, long inactiveTimeNS) {
String ignoreReason = null;
protected Boolean checkIdle(ShardId shardId) {
final String ignoreReason;
final IndexShard shard = getShard(shardId);
if (shard != null) {
try {
return shard.checkIdle(inactiveTimeNS);
if (shard.checkIdle()) {
logger.debug("marking shard {} as inactive (inactive_time[{}]) indexing wise",
shardId,
shard.getInactiveTime());
}
return Boolean.TRUE;
} catch (EngineClosedException e) {
// ignore
ignoreReason = "EngineClosedException";
@ -424,12 +420,11 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
ADDED, DELETED, BECAME_ACTIVE, BECAME_INACTIVE
}
public TimeValue getInactiveTime() {
return inactiveTime;
}
@Override
public void onShardActive(IndexShard indexShard) {
// At least on shard used to be inactive ie. a new write operation just showed up.
// We try to fix the shards indexing buffer immediately. We could do this async instead, but cost should
// be low, and it's rare this happens.
forceCheck();
}
}

View File

@ -331,18 +331,15 @@ public class IndexShardTests extends ESSingleNodeTestCase {
public void testMarkAsInactiveTriggersSyncedFlush() throws Exception {
assertAcked(client().admin().indices().prepareCreate("test")
.setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0));
.setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0, IndexShard.INDEX_SHARD_INACTIVE_TIME_SETTING, "0s"));
client().prepareIndex("test", "test").setSource("{}").get();
ensureGreen("test");
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
Boolean result = indicesService.indexService("test").getShardOrNull(0).checkIdle(0);
Boolean result = indicesService.indexService("test").getShardOrNull(0).checkIdle();
assertEquals(Boolean.TRUE, result);
assertBusy(new Runnable() { // should be very very quick
@Override
public void run() {
IndexStats indexStats = client().admin().indices().prepareStats("test").clear().get().getIndex("test");
assertNotNull(indexStats.getShards()[0].getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
}
assertBusy(() -> {
IndexStats indexStats = client().admin().indices().prepareStats("test").clear().get().getIndex("test");
assertNotNull(indexStats.getShards()[0].getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
});
IndexStats indexStats = client().admin().indices().prepareStats("test").get().getIndex("test");
assertNotNull(indexStats.getShards()[0].getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));

View File

@ -36,7 +36,7 @@ public class IndexingMemoryControllerIT extends ESIntegTestCase {
}
public void testIndexBufferPushedToEngine() throws InterruptedException {
createNode(Settings.builder().put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, "100000h",
createNode(Settings.builder().put(IndexShard.INDEX_SHARD_INACTIVE_TIME_SETTING, "100000h",
IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "32mb",
IndexShard.INDEX_REFRESH_INTERVAL, "-1").build());
@ -65,7 +65,7 @@ public class IndexingMemoryControllerIT extends ESIntegTestCase {
}
public void testInactivePushedToShard() throws InterruptedException {
createNode(Settings.builder().put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, "100ms",
createNode(Settings.builder().put(IndexShard.INDEX_SHARD_INACTIVE_TIME_SETTING, "100ms",
IndexingMemoryController.SHARD_INACTIVE_INTERVAL_TIME_SETTING, "100ms",
IndexShard.INDEX_REFRESH_INTERVAL, "-1").build());

View File

@ -22,9 +22,8 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;
@ -54,7 +53,7 @@ public class IndexingMemoryControllerTests extends ESTestCase {
public MockController(Settings settings) {
super(Settings.builder()
.put(SHARD_INACTIVE_INTERVAL_TIME_SETTING, "200h") // disable it
.put(SHARD_INACTIVE_TIME_SETTING, "1ms") // nearly immediate
.put(IndexShard.INDEX_SHARD_INACTIVE_TIME_SETTING, "1ms") // nearly immediate
.put(settings)
.build(),
null, null, 100 * 1024 * 1024); // fix jvm mem size to 100mb
@ -102,11 +101,12 @@ public class IndexingMemoryControllerTests extends ESTestCase {
}
@Override
protected Boolean checkIdle(ShardId shardId, long inactiveTimeNS) {
protected Boolean checkIdle(ShardId shardId) {
final TimeValue inactiveTime = settings.getAsTime(IndexShard.INDEX_SHARD_INACTIVE_TIME_SETTING, TimeValue.timeValueMinutes(5));
Long ns = lastIndexTimeNanos.get(shardId);
if (ns == null) {
return null;
} else if (currentTimeInNanos() - ns >= inactiveTimeNS) {
} else if (currentTimeInNanos() - ns >= inactiveTime.nanos()) {
indexingBuffers.put(shardId, INACTIVE);
translogBuffers.put(shardId, INACTIVE);
activeShards.remove(shardId);
@ -165,7 +165,7 @@ public class IndexingMemoryControllerTests extends ESTestCase {
MockController controller = new MockController(Settings.builder()
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb")
.put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "100kb")
.put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, "5s")
.put(IndexShard.INDEX_SHARD_INACTIVE_TIME_SETTING, "5s")
.build());
final ShardId shard1 = new ShardId("test", 1);