Translog Flush: When disabling flush and enabling it again, scheduled flush stops executing, closes #1727.

This commit is contained in:
Shay Banon 2012-02-22 15:00:58 +02:00
parent 31b793591e
commit 0ef2afb855

View File

@ -26,11 +26,11 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
@ -138,30 +138,48 @@ public class TranslogService extends AbstractIndexShardComponent {
if (indexShard.state() == IndexShardState.CLOSED) {
return;
}
// flush is disabled, but still reschedule
if (disableFlush) {
reschedule();
return;
}
int currentNumberOfOperations = translog.estimatedNumberOfOperations();
if (currentNumberOfOperations > flushThresholdOperations) {
logger.trace("flushing translog, operations [{}], breached [{}]", currentNumberOfOperations, flushThresholdOperations);
asyncFlushAndReschedule();
if (indexShard.state() == IndexShardState.CREATED) {
reschedule();
return;
}
long sizeInBytes = translog.translogSizeInBytes();
if (sizeInBytes > flushThresholdSize.bytes()) {
logger.trace("flushing translog, size [{}], breached [{}]", new ByteSizeValue(sizeInBytes), flushThresholdSize);
asyncFlushAndReschedule();
return;
if (flushThresholdOperations > 0) {
int currentNumberOfOperations = translog.estimatedNumberOfOperations();
if (currentNumberOfOperations > flushThresholdOperations) {
logger.trace("flushing translog, operations [{}], breached [{}]", currentNumberOfOperations, flushThresholdOperations);
asyncFlushAndReschedule();
return;
}
}
if ((System.currentTimeMillis() - lastFlushTime) > flushThresholdPeriod.millis()) {
logger.trace("flushing translog, last_flush_time [{}], breached [{}]", lastFlushTime, flushThresholdPeriod);
asyncFlushAndReschedule();
return;
if (flushThresholdSize.bytes() > 0) {
long sizeInBytes = translog.translogSizeInBytes();
if (sizeInBytes > flushThresholdSize.bytes()) {
logger.trace("flushing translog, size [{}], breached [{}]", new ByteSizeValue(sizeInBytes), flushThresholdSize);
asyncFlushAndReschedule();
return;
}
}
if (flushThresholdPeriod.millis() > 0) {
if ((threadPool.estimatedTimeInMillis() - lastFlushTime) > flushThresholdPeriod.millis()) {
logger.trace("flushing translog, last_flush_time [{}], breached [{}]", lastFlushTime, flushThresholdPeriod);
asyncFlushAndReschedule();
return;
}
}
reschedule();
}
private void reschedule() {
future = threadPool.schedule(interval, ThreadPool.Names.SAME, this);
}
@ -171,14 +189,14 @@ public class TranslogService extends AbstractIndexShardComponent {
public void run() {
try {
indexShard.flush(new Engine.Flush());
} catch (EngineClosedException e) {
// we are being closed, ignore
} catch (IllegalIndexShardStateException e) {
// we are being closed, or in created state, ignore
} catch (FlushNotAllowedEngineException e) {
// ignore this exception, we are not allowed to perform flush
} catch (Exception e) {
logger.warn("failed to flush shard on translog threshold", e);
}
lastFlushTime = System.currentTimeMillis();
lastFlushTime = threadPool.estimatedTimeInMillis();
if (indexShard.state() != IndexShardState.CLOSED) {
future = threadPool.schedule(interval, ThreadPool.Names.SAME, TranslogBasedFlush.this);