diff --git a/src/main/java/org/elasticsearch/index/translog/TranslogService.java b/src/main/java/org/elasticsearch/index/translog/TranslogService.java index bc1cad9528b..ac0b817b666 100644 --- a/src/main/java/org/elasticsearch/index/translog/TranslogService.java +++ b/src/main/java/org/elasticsearch/index/translog/TranslogService.java @@ -26,11 +26,11 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.engine.EngineClosedException; import org.elasticsearch.index.engine.FlushNotAllowedEngineException; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.shard.AbstractIndexShardComponent; +import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.service.IndexShard; @@ -138,30 +138,48 @@ public class TranslogService extends AbstractIndexShardComponent { if (indexShard.state() == IndexShardState.CLOSED) { return; } + + // flush is disabled, but still reschedule if (disableFlush) { + reschedule(); return; } - int currentNumberOfOperations = translog.estimatedNumberOfOperations(); - if (currentNumberOfOperations > flushThresholdOperations) { - logger.trace("flushing translog, operations [{}], breached [{}]", currentNumberOfOperations, flushThresholdOperations); - asyncFlushAndReschedule(); + if (indexShard.state() == IndexShardState.CREATED) { + reschedule(); return; } - long sizeInBytes = translog.translogSizeInBytes(); - if (sizeInBytes > flushThresholdSize.bytes()) { - logger.trace("flushing translog, size [{}], breached [{}]", new ByteSizeValue(sizeInBytes), flushThresholdSize); - asyncFlushAndReschedule(); - return; + if (flushThresholdOperations > 0) { + int currentNumberOfOperations = translog.estimatedNumberOfOperations(); + if (currentNumberOfOperations > flushThresholdOperations) { + logger.trace("flushing translog, operations [{}], breached [{}]", currentNumberOfOperations, flushThresholdOperations); + asyncFlushAndReschedule(); + return; + } } - if ((System.currentTimeMillis() - lastFlushTime) > flushThresholdPeriod.millis()) { - logger.trace("flushing translog, last_flush_time [{}], breached [{}]", lastFlushTime, flushThresholdPeriod); - asyncFlushAndReschedule(); - return; + if (flushThresholdSize.bytes() > 0) { + long sizeInBytes = translog.translogSizeInBytes(); + if (sizeInBytes > flushThresholdSize.bytes()) { + logger.trace("flushing translog, size [{}], breached [{}]", new ByteSizeValue(sizeInBytes), flushThresholdSize); + asyncFlushAndReschedule(); + return; + } } + if (flushThresholdPeriod.millis() > 0) { + if ((threadPool.estimatedTimeInMillis() - lastFlushTime) > flushThresholdPeriod.millis()) { + logger.trace("flushing translog, last_flush_time [{}], breached [{}]", lastFlushTime, flushThresholdPeriod); + asyncFlushAndReschedule(); + return; + } + } + + reschedule(); + } + + private void reschedule() { future = threadPool.schedule(interval, ThreadPool.Names.SAME, this); } @@ -171,14 +189,14 @@ public class TranslogService extends AbstractIndexShardComponent { public void run() { try { indexShard.flush(new Engine.Flush()); - } catch (EngineClosedException e) { - // we are being closed, ignore + } catch (IllegalIndexShardStateException e) { + // we are being closed, or in created state, ignore } catch (FlushNotAllowedEngineException e) { // ignore this exception, we are not allowed to perform flush } catch (Exception e) { logger.warn("failed to flush shard on translog threshold", e); } - lastFlushTime = System.currentTimeMillis(); + lastFlushTime = threadPool.estimatedTimeInMillis(); if (indexShard.state() != IndexShardState.CLOSED) { future = threadPool.schedule(interval, ThreadPool.Names.SAME, TranslogBasedFlush.this);