when dynamically updating the shard indexing buffer size, ignore cases where it can't be updated because flushing is disabled

This commit is contained in:
Shay Banon 2011-08-09 19:22:47 +03:00
parent 56b11c5ff3
commit ee015f5829
1 changed files with 23 additions and 3 deletions

View File

@ -28,6 +28,8 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.shard.service.IndexShard;
@ -124,9 +126,9 @@ public class IndexingMemoryBufferController extends AbstractLifecycleComponent<I
@Override public void run() { @Override public void run() {
synchronized (mutex) { synchronized (mutex) {
boolean activeInactiveStatusChanges = false; boolean activeInactiveStatusChanges = false;
long time = System.currentTimeMillis();
for (IndexService indexService : indicesService) { for (IndexService indexService : indicesService) {
for (IndexShard indexShard : indexService) { for (IndexShard indexShard : indexService) {
long time = threadPool.estimatedTimeInMillis();
Translog translog = ((InternalIndexShard) indexShard).translog(); Translog translog = ((InternalIndexShard) indexShard).translog();
ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId()); ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId());
if (status == null) { // not added yet if (status == null) { // not added yet
@ -140,11 +142,19 @@ public class IndexingMemoryBufferController extends AbstractLifecycleComponent<I
// inactive? // inactive?
if (!status.inactive) { if (!status.inactive) {
if ((time - status.time) > inactiveTime.millis()) { if ((time - status.time) > inactiveTime.millis()) {
try {
((InternalIndexShard) indexShard).engine().updateIndexingBufferSize(Engine.INACTIVE_SHARD_INDEXING_BUFFER);
} catch (EngineClosedException e) {
// ignore
continue;
} catch (FlushNotAllowedEngineException e) {
// ignore
continue;
}
// inactive for this amount of time, mark it // inactive for this amount of time, mark it
status.inactive = true; status.inactive = true;
activeInactiveStatusChanges = true; activeInactiveStatusChanges = true;
logger.debug("marking shard [{}][{}] as inactive (inactive_time[{}]), setting size to [{}]", indexShard.shardId().index().name(), indexShard.shardId().id(), inactiveTime, Engine.INACTIVE_SHARD_INDEXING_BUFFER); logger.debug("marking shard [{}][{}] as inactive (inactive_time[{}]), setting size to [{}]", indexShard.shardId().index().name(), indexShard.shardId().id(), inactiveTime, Engine.INACTIVE_SHARD_INDEXING_BUFFER);
((InternalIndexShard) indexShard).engine().updateIndexingBufferSize(Engine.INACTIVE_SHARD_INDEXING_BUFFER);
} }
} }
} else { } else {
@ -204,7 +214,17 @@ public class IndexingMemoryBufferController extends AbstractLifecycleComponent<I
for (IndexShard indexShard : indexService) { for (IndexShard indexShard : indexService) {
ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId()); ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId());
if (status == null || !status.inactive) { if (status == null || !status.inactive) {
try {
((InternalIndexShard) indexShard).engine().updateIndexingBufferSize(shardIndexingBufferSize); ((InternalIndexShard) indexShard).engine().updateIndexingBufferSize(shardIndexingBufferSize);
} catch (EngineClosedException e) {
// ignore
continue;
} catch (FlushNotAllowedEngineException e) {
// ignore
continue;
} catch (Exception e) {
logger.warn("failed to set shard [{}][{}] index buffer to [{}]", indexShard.shardId().index().name(), indexShard.shardId().id(), shardIndexingBufferSize);
}
} }
} }
} }