put back active/inactive logic, for sync'd flush

This commit is contained in:
Michael McCandless 2015-10-14 06:18:25 -04:00 committed by mikemccand
parent b3357f09fe
commit 6ae8ca9a5e
5 changed files with 77 additions and 9 deletions

View File

@ -189,6 +189,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
private final IndexSearcherWrapper searcherWrapper; private final IndexSearcherWrapper searcherWrapper;
/** True if this shard is still indexing (recently) and false if we've been idle for long enough (as periodically checked by {@link
* IndexingMemoryController}). */
private final AtomicBoolean active = new AtomicBoolean();
private volatile long lastWriteNS;
private final IndexingMemoryController indexingMemoryController; private final IndexingMemoryController indexingMemoryController;
@Inject @Inject
@ -450,6 +455,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
*/ */
public boolean index(Engine.Index index) { public boolean index(Engine.Index index) {
ensureWriteAllowed(index); ensureWriteAllowed(index);
markLastWrite(index);
index = indexingService.preIndex(index); index = indexingService.preIndex(index);
final boolean created; final boolean created;
try { try {
@ -474,6 +480,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
public void delete(Engine.Delete delete) { public void delete(Engine.Delete delete) {
ensureWriteAllowed(delete); ensureWriteAllowed(delete);
markLastWrite(delete);
delete = indexingService.preDelete(delete); delete = indexingService.preDelete(delete);
try { try {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
@ -883,6 +890,22 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
} }
} }
/** Returns timestamp of last indexing operation */
public long getLastWriteNS() {
return lastWriteNS;
}
/** Records timestamp of the last write operation, possibly switching {@code active} to true if we were inactive. */
private void markLastWrite(Engine.Operation op) {
lastWriteNS = op.startTime();
if (active.getAndSet(true) == false) {
// We are currently inactive, but a new write operation just showed up, so we now notify IMC
// to wake up and fix our indexing buffer. We could do this async instead, but cost should
// be low, and it's rare this happens.
indexingMemoryController.forceCheck();
}
}
private void ensureWriteAllowed(Engine.Operation op) throws IllegalIndexShardStateException { private void ensureWriteAllowed(Engine.Operation op) throws IllegalIndexShardStateException {
Engine.Operation.Origin origin = op.origin(); Engine.Operation.Origin origin = op.origin();
IndexShardState state = this.state; // one time volatile read IndexShardState state = this.state; // one time volatile read
@ -954,6 +977,24 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
return engine.indexBufferRAMBytesUsed(); return engine.indexBufferRAMBytesUsed();
} }
/** Called by {@link IndexingMemoryController} to check whether more than {@code inactiveTimeNS} has passed since the last
* indexing operation, and become inactive (reducing indexing and translog buffers to tiny values) if so. */
public void checkIdle(long inactiveTimeNS) {
if (System.nanoTime() - lastWriteNS >= inactiveTimeNS) {
boolean wasActive = active.getAndSet(false);
if (wasActive) {
logger.debug("shard is now inactive");
indicesLifecycle.onShardInactive(this);
}
}
}
/** Returns {@code true} if this shard is active (has seen indexing ops in the last {@link
* IndexingMemoryController#SHARD_INACTIVE_TIME_SETTING} (default 5 minutes), else {@code false}. */
public boolean getActive() {
return active.get();
}
public final boolean isFlushOnClose() { public final boolean isFlushOnClose() {
return flushOnClose; return flushOnClose;
} }

View File

@ -51,6 +51,9 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
/** Only applies when <code>indices.memory.index_buffer_size</code> is a %, to set a ceiling on the actual size in bytes (default: not set). */ /** Only applies when <code>indices.memory.index_buffer_size</code> is a %, to set a ceiling on the actual size in bytes (default: not set). */
public static final String MAX_INDEX_BUFFER_SIZE_SETTING = "indices.memory.max_index_buffer_size"; public static final String MAX_INDEX_BUFFER_SIZE_SETTING = "indices.memory.max_index_buffer_size";
/** If we see no indexing operations after this much time for a given shard, we consider that shard inactive (default: 5 minutes). */
public static final String SHARD_INACTIVE_TIME_SETTING = "indices.memory.shard_inactive_time";
/** How frequently we check indexing memory usage (default: 5 seconds). */ /** How frequently we check indexing memory usage (default: 5 seconds). */
public static final String SHARD_MEMORY_INTERVAL_TIME_SETTING = "indices.memory.interval"; public static final String SHARD_MEMORY_INTERVAL_TIME_SETTING = "indices.memory.interval";
@ -62,6 +65,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
private final ByteSizeValue indexingBuffer; private final ByteSizeValue indexingBuffer;
private final TimeValue inactiveTime;
private final TimeValue interval; private final TimeValue interval;
private volatile ScheduledFuture scheduler; private volatile ScheduledFuture scheduler;
@ -101,12 +105,16 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
} }
this.indexingBuffer = indexingBuffer; this.indexingBuffer = indexingBuffer;
this.inactiveTime = this.settings.getAsTime(SHARD_INACTIVE_TIME_SETTING, TimeValue.timeValueMinutes(5));
// we need to have this relatively small to free up heap quickly enough // we need to have this relatively small to free up heap quickly enough
this.interval = this.settings.getAsTime(SHARD_MEMORY_INTERVAL_TIME_SETTING, TimeValue.timeValueSeconds(5)); this.interval = this.settings.getAsTime(SHARD_MEMORY_INTERVAL_TIME_SETTING, TimeValue.timeValueSeconds(5));
this.statusChecker = new ShardsIndicesStatusChecker(); this.statusChecker = new ShardsIndicesStatusChecker();
logger.debug("using indexing buffer size [{}] with {} [{}]", this.indexingBuffer, SHARD_MEMORY_INTERVAL_TIME_SETTING, this.interval); logger.debug("using indexing buffer size [{}] with {} [{}], {} [{}]",
this.indexingBuffer,
SHARD_INACTIVE_TIME_SETTING, this.inactiveTime,
SHARD_MEMORY_INTERVAL_TIME_SETTING, this.interval);
} }
@Override @Override
@ -175,6 +183,15 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
return shard != null && shard.canIndex() && CAN_UPDATE_INDEX_BUFFER_STATES.contains(shard.state()); return shard != null && shard.canIndex() && CAN_UPDATE_INDEX_BUFFER_STATES.contains(shard.state());
} }
/** ask this shard to check now whether it is inactive, and reduces its indexing and translog buffers if so. returns Boolean.TRUE if
* it did deactive, Boolean.FALSE if it did not, and null if the shard is unknown */
protected void checkIdle(ShardId shardId, long inactiveTimeNS) {
final IndexShard shard = getShard(shardId);
if (shard != null) {
shard.checkIdle(inactiveTimeNS);
}
}
/** gets an {@link IndexShard} instance for the given shard. returns null if the shard doesn't exist */ /** gets an {@link IndexShard} instance for the given shard. returns null if the shard doesn't exist */
protected IndexShard getShard(ShardId shardId) { protected IndexShard getShard(ShardId shardId) {
IndexService indexService = indicesService.indexService(shardId.index().name()); IndexService indexService = indicesService.indexService(shardId.index().name());
@ -214,10 +231,11 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
// Fast check to sum up how much heap all shards' indexing buffers are using now: // Fast check to sum up how much heap all shards' indexing buffers are using now:
long totalBytesUsed = 0; long totalBytesUsed = 0;
for (ShardId shardId : availableShards()) { for (ShardId shardId : availableShards()) {
long shardBytesUsed = getIndexBufferRAMBytesUsed(shardId);
if (shardBytesUsed > 0) { // Give shard a chance to transition to inactive so sync'd flush can happen:
totalBytesUsed += shardBytesUsed; checkIdle(shardId, inactiveTime.nanos());
}
totalBytesUsed += getIndexBufferRAMBytesUsed(shardId);
} }
if (totalBytesUsed > indexingBuffer.bytes()) { if (totalBytesUsed > indexingBuffer.bytes()) {

View File

@ -91,6 +91,7 @@ import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogTests; import org.elasticsearch.index.translog.TranslogTests;
import org.elasticsearch.indices.memory.IndexingMemoryController;
import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -1564,7 +1565,8 @@ public class InternalEngineTests extends ESTestCase {
public void testDeletesAloneCanTriggerRefresh() throws Exception { public void testDeletesAloneCanTriggerRefresh() throws Exception {
Settings settings = Settings.builder() Settings settings = Settings.builder()
.put(defaultSettings) .put(defaultSettings)
.put(EngineConfig.INDEX_BUFFER_SIZE_SETTING, "1kb").build(); .put(EngineConfig.INDEX_BUFFER_SIZE_SETTING, "1kb")
.put(IndexingMemoryController.SHARD_MEMORY_INTERVAL_TIME_SETTING, "100ms").build();
try (Store store = createStore(); try (Store store = createStore();
Engine engine = new InternalEngine(config(settings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), newMergePolicy()), false)) { Engine engine = new InternalEngine(config(settings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), newMergePolicy()), false)) {
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {

View File

@ -339,6 +339,8 @@ public class IndexShardTests extends ESSingleNodeTestCase {
client().prepareIndex("test", "test").setSource("{}").get(); client().prepareIndex("test", "test").setSource("{}").get();
ensureGreen("test"); ensureGreen("test");
IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndicesService indicesService = getInstanceFromNode(IndicesService.class);
// force the shard to become idle now:
indicesService.indexService("test").getShardOrNull(0).checkIdle(0);
assertBusy(new Runnable() { // should be very very quick assertBusy(new Runnable() { // should be very very quick
@Override @Override
public void run() { public void run() {

View File

@ -75,6 +75,10 @@ public class IndexingMemoryControllerTests extends ESTestCase {
} }
} }
@Override
protected void checkIdle(ShardId shardId, long inactiveTimeNS) {
}
@Override @Override
public void refreshShardAsync(ShardId shardId) { public void refreshShardAsync(ShardId shardId) {
indexBufferRAMBytesUsed.put(shardId, 0L); indexBufferRAMBytesUsed.put(shardId, 0L);
@ -104,7 +108,6 @@ public class IndexingMemoryControllerTests extends ESTestCase {
controller.simulateIndexing(shard1); controller.simulateIndexing(shard1);
controller.assertBuffer(shard1, new ByteSizeValue(1, ByteSizeUnit.MB)); controller.assertBuffer(shard1, new ByteSizeValue(1, ByteSizeUnit.MB));
// add another shard // add another shard
final ShardId shard2 = new ShardId("test", 2); final ShardId shard2 = new ShardId("test", 2);
controller.simulateIndexing(shard2); controller.simulateIndexing(shard2);
@ -144,7 +147,8 @@ public class IndexingMemoryControllerTests extends ESTestCase {
controller.assertBuffer(shard1, new ByteSizeValue(2, ByteSizeUnit.MB)); controller.assertBuffer(shard1, new ByteSizeValue(2, ByteSizeUnit.MB));
controller.assertBuffer(shard2, new ByteSizeValue(2, ByteSizeUnit.MB)); controller.assertBuffer(shard2, new ByteSizeValue(2, ByteSizeUnit.MB));
// index into one shard only, hits the 5mb limit, so shard1 is refreshed // index into one shard only, crosses the 5mb limit, so shard1 is refreshed
controller.simulateIndexing(shard1);
controller.simulateIndexing(shard1); controller.simulateIndexing(shard1);
controller.assertBuffer(shard1, new ByteSizeValue(0, ByteSizeUnit.MB)); controller.assertBuffer(shard1, new ByteSizeValue(0, ByteSizeUnit.MB));
controller.assertBuffer(shard2, new ByteSizeValue(2, ByteSizeUnit.MB)); controller.assertBuffer(shard2, new ByteSizeValue(2, ByteSizeUnit.MB));
@ -153,7 +157,8 @@ public class IndexingMemoryControllerTests extends ESTestCase {
controller.simulateIndexing(shard2); controller.simulateIndexing(shard2);
controller.assertBuffer(shard2, new ByteSizeValue(4, ByteSizeUnit.MB)); controller.assertBuffer(shard2, new ByteSizeValue(4, ByteSizeUnit.MB));
controller.simulateIndexing(shard2); controller.simulateIndexing(shard2);
// shard2 used up the full 5 mb and is now cleared: controller.simulateIndexing(shard2);
// shard2 crossed 5 mb and is now cleared:
controller.assertBuffer(shard2, new ByteSizeValue(0, ByteSizeUnit.MB)); controller.assertBuffer(shard2, new ByteSizeValue(0, ByteSizeUnit.MB));
} }