only actually fork to another thread if a flush is really needed

This commit is contained in:
kimchy 2011-02-11 02:35:02 +02:00
parent fc88cccfb4
commit e63e5f232c
2 changed files with 29 additions and 18 deletions

View File

@ -63,6 +63,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
TimeValue sync = componentSettings.getAsTime("sync", TimeValue.timeValueSeconds(1)); TimeValue sync = componentSettings.getAsTime("sync", TimeValue.timeValueSeconds(1));
if (sync.millis() > 0) { if (sync.millis() > 0) {
this.indexShard.translog().syncOnEachOperation(false); this.indexShard.translog().syncOnEachOperation(false);
// we don't need to execute the sync on a different thread, just do it on the scheduler thread
flushScheduler = threadPool.scheduleWithFixedDelay(new Sync(), sync); flushScheduler = threadPool.scheduleWithFixedDelay(new Sync(), sync);
} else if (sync.millis() == 0) { } else if (sync.millis() == 0) {
flushScheduler = null; flushScheduler = null;

View File

@ -68,9 +68,11 @@ public class TranslogService extends AbstractIndexShardComponent {
this.flushThresholdOperations = componentSettings.getAsInt("flush_threshold_ops", componentSettings.getAsInt("flush_threshold", 20000)); this.flushThresholdOperations = componentSettings.getAsInt("flush_threshold_ops", componentSettings.getAsInt("flush_threshold", 20000));
this.flushThresholdSize = componentSettings.getAsBytesSize("flush_threshold_size", new ByteSizeValue(500, ByteSizeUnit.MB)); this.flushThresholdSize = componentSettings.getAsBytesSize("flush_threshold_size", new ByteSizeValue(500, ByteSizeUnit.MB));
this.flushThresholdPeriod = componentSettings.getAsTime("flush_threshold_period", TimeValue.timeValueMinutes(60)); this.flushThresholdPeriod = componentSettings.getAsTime("flush_threshold_period", TimeValue.timeValueMinutes(60));
this.interval = componentSettings.getAsTime("interval", timeValueMillis(1000)); this.interval = componentSettings.getAsTime("interval", timeValueMillis(5000));
this.future = threadPool.schedule(new TranslogBasedFlush(), interval, ThreadPool.ExecutionType.THREADED); logger.debug("interval [{}], flush_threshold_ops [{}], flush_threshold_size [{}], flush_threshold_period [{}]", interval, flushThresholdOperations, flushThresholdSize, flushThresholdPeriod);
this.future = threadPool.schedule(new TranslogBasedFlush(), interval, ThreadPool.ExecutionType.DEFAULT);
} }
@ -83,34 +85,36 @@ public class TranslogService extends AbstractIndexShardComponent {
private volatile long lastFlushTime = System.currentTimeMillis(); private volatile long lastFlushTime = System.currentTimeMillis();
@Override public void run() { @Override public void run() {
if (indexShard.state() != IndexShardState.STARTED) { if (indexShard.state() == IndexShardState.CLOSED) {
return; return;
} }
int currentNumberOfOperations = translog.numberOfOperations(); int currentNumberOfOperations = translog.numberOfOperations();
if (currentNumberOfOperations > flushThresholdOperations) { if (currentNumberOfOperations > flushThresholdOperations) {
logger.trace("flushing translog, operations [{}], breached [{}]", currentNumberOfOperations, flushThresholdOperations); logger.trace("flushing translog, operations [{}], breached [{}]", currentNumberOfOperations, flushThresholdOperations);
flush(); asyncFlushAndReschedule();
return; return;
} }
long sizeInBytes = translog.translogSizeInBytes(); long sizeInBytes = translog.translogSizeInBytes();
if (sizeInBytes > flushThresholdSize.bytes()) { if (sizeInBytes > flushThresholdSize.bytes()) {
logger.trace("flushing translog, size [{}], breached [{}]", new ByteSizeValue(sizeInBytes), flushThresholdSize); logger.trace("flushing translog, size [{}], breached [{}]", new ByteSizeValue(sizeInBytes), flushThresholdSize);
flush(); asyncFlushAndReschedule();
return; return;
} }
if ((System.currentTimeMillis() - lastFlushTime) > flushThresholdPeriod.millis()) { if ((System.currentTimeMillis() - lastFlushTime) > flushThresholdPeriod.millis()) {
logger.trace("flushing translog, last_flush_time [{}], breached [{}]", lastFlushTime, flushThresholdPeriod); logger.trace("flushing translog, last_flush_time [{}], breached [{}]", lastFlushTime, flushThresholdPeriod);
flush(); asyncFlushAndReschedule();
return; return;
} }
future = threadPool.schedule(this, interval, ThreadPool.ExecutionType.THREADED); future = threadPool.schedule(this, interval, ThreadPool.ExecutionType.DEFAULT);
} }
private void flush() { private void asyncFlushAndReschedule() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
try { try {
indexShard.flush(new Engine.Flush()); indexShard.flush(new Engine.Flush());
} catch (EngineClosedException e) { } catch (EngineClosedException e) {
@ -121,6 +125,12 @@ public class TranslogService extends AbstractIndexShardComponent {
logger.warn("failed to flush shard on translog threshold", e); logger.warn("failed to flush shard on translog threshold", e);
} }
lastFlushTime = System.currentTimeMillis(); lastFlushTime = System.currentTimeMillis();
if (indexShard.state() != IndexShardState.CLOSED) {
future = threadPool.schedule(this, interval, ThreadPool.ExecutionType.DEFAULT);
}
}
});
} }
} }
} }