diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java index 73451f25fe3..4760ae197b2 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java @@ -63,6 +63,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen TimeValue sync = componentSettings.getAsTime("sync", TimeValue.timeValueSeconds(1)); if (sync.millis() > 0) { this.indexShard.translog().syncOnEachOperation(false); + // we don't need to execute the sync on a different thread, just do it on the scheduler thread flushScheduler = threadPool.scheduleWithFixedDelay(new Sync(), sync); } else if (sync.millis() == 0) { flushScheduler = null; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/TranslogService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/TranslogService.java index f66e0785efd..e5711332ccb 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/TranslogService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/TranslogService.java @@ -68,9 +68,11 @@ public class TranslogService extends AbstractIndexShardComponent { this.flushThresholdOperations = componentSettings.getAsInt("flush_threshold_ops", componentSettings.getAsInt("flush_threshold", 20000)); this.flushThresholdSize = componentSettings.getAsBytesSize("flush_threshold_size", new ByteSizeValue(500, ByteSizeUnit.MB)); this.flushThresholdPeriod = componentSettings.getAsTime("flush_threshold_period", TimeValue.timeValueMinutes(60)); - this.interval = componentSettings.getAsTime("interval", timeValueMillis(1000)); + this.interval = componentSettings.getAsTime("interval", timeValueMillis(5000)); - this.future = threadPool.schedule(new TranslogBasedFlush(), interval, ThreadPool.ExecutionType.THREADED); + logger.debug("interval [{}], flush_threshold_ops [{}], flush_threshold_size [{}], flush_threshold_period [{}]", interval, flushThresholdOperations, flushThresholdSize, flushThresholdPeriod); + + this.future = threadPool.schedule(new TranslogBasedFlush(), interval, ThreadPool.ExecutionType.DEFAULT); } @@ -83,44 +85,52 @@ public class TranslogService extends AbstractIndexShardComponent { private volatile long lastFlushTime = System.currentTimeMillis(); @Override public void run() { - if (indexShard.state() != IndexShardState.STARTED) { + if (indexShard.state() == IndexShardState.CLOSED) { return; } int currentNumberOfOperations = translog.numberOfOperations(); if (currentNumberOfOperations > flushThresholdOperations) { logger.trace("flushing translog, operations [{}], breached [{}]", currentNumberOfOperations, flushThresholdOperations); - flush(); + asyncFlushAndReschedule(); return; } long sizeInBytes = translog.translogSizeInBytes(); if (sizeInBytes > flushThresholdSize.bytes()) { logger.trace("flushing translog, size [{}], breached [{}]", new ByteSizeValue(sizeInBytes), flushThresholdSize); - flush(); + asyncFlushAndReschedule(); return; } if ((System.currentTimeMillis() - lastFlushTime) > flushThresholdPeriod.millis()) { logger.trace("flushing translog, last_flush_time [{}], breached [{}]", lastFlushTime, flushThresholdPeriod); - flush(); + asyncFlushAndReschedule(); return; } - future = threadPool.schedule(this, interval, ThreadPool.ExecutionType.THREADED); + future = threadPool.schedule(this, interval, ThreadPool.ExecutionType.DEFAULT); } - private void flush() { - try { - indexShard.flush(new Engine.Flush()); - } catch (EngineClosedException e) { - // we are being closed, ignore - } catch (FlushNotAllowedEngineException e) { - // ignore this exception, we are not allowed to perform flush - } catch (Exception e) { - logger.warn("failed to flush shard on translog threshold", e); - } - lastFlushTime = System.currentTimeMillis(); + private void asyncFlushAndReschedule() { + threadPool.cached().execute(new Runnable() { + @Override public void run() { + try { + indexShard.flush(new Engine.Flush()); + } catch (EngineClosedException e) { + // we are being closed, ignore + } catch (FlushNotAllowedEngineException e) { + // ignore this exception, we are not allowed to perform flush + } catch (Exception e) { + logger.warn("failed to flush shard on translog threshold", e); + } + lastFlushTime = System.currentTimeMillis(); + + if (indexShard.state() != IndexShardState.CLOSED) { + future = threadPool.schedule(this, interval, ThreadPool.ExecutionType.DEFAULT); + } + } + }); } } }