diff --git a/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/stress/SingleThreadIndexingStress.java b/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/stress/SingleThreadIndexingStress.java index 25c0a54a09b..68e2aac69be 100644 --- a/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/stress/SingleThreadIndexingStress.java +++ b/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/stress/SingleThreadIndexingStress.java @@ -90,16 +90,16 @@ public class SingleThreadIndexingStress { long time = System.currentTimeMillis(); return jsonBuilder().startObject() .field("id", id) - .field("numeric1", time) - .field("numeric2", time) - .field("numeric3", time) - .field("numeric4", time) - .field("numeric5", time) - .field("numeric6", time) - .field("numeric7", time) - .field("numeric8", time) - .field("numeric9", time) - .field("numeric10", time) +// .field("numeric1", time) +// .field("numeric2", time) +// .field("numeric3", time) +// .field("numeric4", time) +// .field("numeric5", time) +// .field("numeric6", time) +// .field("numeric7", time) +// .field("numeric8", time) +// .field("numeric9", time) +// .field("numeric10", time) .field("name", nameValue) .endObject(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java index 1fafa25ce97..09eba85f73e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java @@ -152,7 +152,7 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct if (indexShard.state() == IndexShardState.STARTED) { // shardStatus.estimatedFlushableMemorySize = indexShard.estimateFlushableMemorySize(); shardStatus.translogId = indexShard.translog().currentId(); - shardStatus.translogOperations = indexShard.translog().size(); + shardStatus.translogOperations = indexShard.translog().numberOfOperations(); Engine.Searcher searcher = indexShard.searcher(); try { shardStatus.docs = new DocsStatus(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/IndexShardManagement.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/IndexShardManagement.java index eb528e28f4b..ea3bd9128f6 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/IndexShardManagement.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/IndexShardManagement.java @@ -93,7 +93,7 @@ public class IndexShardManagement extends AbstractIndexShardComponent implements @ManagedAttribute(description = "Number of transaction log operations") public long getTranslogNumberOfOperations() { - return translog.size(); + return translog.numberOfOperations(); } @ManagedAttribute(description = "Estimated size in memory the transaction log takes") diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java index 98d0a218eb1..813c9561e1d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -48,13 +48,16 @@ public interface Translog extends IndexShardComponent { /** * Returns the number of operations in the transaction log. */ - int size(); + int numberOfOperations(); /** * The estimated memory size this translog is taking. */ long memorySizeInBytes(); + /** + * Returns the size in bytes of the translog. + */ long translogSizeInBytes(); /** diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/TranslogService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/TranslogService.java index 8767c39fb87..85ffeed5425 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/TranslogService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/TranslogService.java @@ -21,6 +21,8 @@ package org.elasticsearch.index.translog; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineClosedException; @@ -47,7 +49,11 @@ public class TranslogService extends AbstractIndexShardComponent { private final Translog translog; - private final int flushThreshold; + private final int flushThresholdOperations; + + private final ByteSizeValue flushThresholdSize; + + private final TimeValue flushThresholdPeriod; private final TimeValue interval; @@ -59,7 +65,9 @@ public class TranslogService extends AbstractIndexShardComponent { this.indexShard = indexShard; this.translog = translog; - this.flushThreshold = componentSettings.getAsInt("flush_threshold", 5000); + this.flushThresholdOperations = componentSettings.getAsInt("flush_threshold_ops", componentSettings.getAsInt("flush_threshold", 20000)); + this.flushThresholdSize = componentSettings.getAsBytesSize("flush_threshold_size", new ByteSizeValue(500, ByteSizeUnit.MB)); + this.flushThresholdPeriod = componentSettings.getAsTime("flush_threshold_period", TimeValue.timeValueMinutes(60)); this.interval = componentSettings.getAsTime("interval", timeValueMillis(1000)); this.future = threadPool.scheduleWithFixedDelay(new TranslogBasedFlush(), interval); @@ -71,24 +79,46 @@ public class TranslogService extends AbstractIndexShardComponent { } private class TranslogBasedFlush implements Runnable { + + private volatile long lastFlushTime = System.currentTimeMillis(); + @Override public void run() { if (indexShard.state() != IndexShardState.STARTED) { return; } - int currentSize = translog.size(); - if (currentSize > flushThreshold) { - logger.trace("flushing translog, operations [{}], breached [{}]", currentSize, flushThreshold); - try { - indexShard.flush(new Engine.Flush()); - } catch (EngineClosedException e) { - // we are being closed, ignore - } catch (FlushNotAllowedEngineException e) { - // ignore this exception, we are not allowed to perform flush - } catch (Exception e) { - logger.warn("failed to flush shard on translog threshold", e); - } + int currentNumberOfOperations = translog.numberOfOperations(); + if (currentNumberOfOperations > flushThresholdOperations) { + logger.trace("flushing translog, operations [{}], breached [{}]", currentNumberOfOperations, flushThresholdOperations); + flush(); + return; } + + long sizeInBytes = translog.translogSizeInBytes(); + if (sizeInBytes > flushThresholdSize.bytes()) { + logger.trace("flushing translog, size [{}], breached [{}]", new ByteSizeValue(sizeInBytes), flushThresholdSize); + flush(); + return; + } + + if ((System.currentTimeMillis() - lastFlushTime) > flushThresholdPeriod.millis()) { + logger.trace("flushing translog, last_flush_time [{}], breached [{}]", lastFlushTime, flushThresholdPeriod); + flush(); + return; + } + } + + private void flush() { + try { + indexShard.flush(new Engine.Flush()); + } catch (EngineClosedException e) { + // we are being closed, ignore + } catch (FlushNotAllowedEngineException e) { + // ignore this exception, we are not allowed to perform flush + } catch (Exception e) { + logger.warn("failed to flush shard on translog threshold", e); + } + lastFlushTime = System.currentTimeMillis(); } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java index 7634e091c06..ce0a70ade30 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java @@ -85,7 +85,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog return this.id; } - @Override public int size() { + @Override public int numberOfOperations() { return operationCounter.get(); } diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/recovery/FullRollingRestartTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/recovery/FullRollingRestartTests.java index f9e73d0152b..9a857e83143 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/recovery/FullRollingRestartTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/recovery/FullRollingRestartTests.java @@ -56,14 +56,14 @@ public class FullRollingRestartTests extends AbstractNodesTests { startNode("node3"); // make sure the cluster state is green, and all has been recovered - assertThat(client("node1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForNodes("3").execute().actionGet().timedOut(), equalTo(false)); + assertThat(client("node1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForRelocatingShards(0).setWaitForNodes("3").execute().actionGet().timedOut(), equalTo(false)); // now start adding nodes startNode("node4"); startNode("node5"); // make sure the cluster state is green, and all has been recovered - assertThat(client("node1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForNodes("5").execute().actionGet().timedOut(), equalTo(false)); + assertThat(client("node1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForRelocatingShards(0).setWaitForNodes("5").execute().actionGet().timedOut(), equalTo(false)); client("node1").admin().indices().prepareRefresh().execute().actionGet(); for (int i = 0; i < 10; i++) { @@ -73,10 +73,10 @@ public class FullRollingRestartTests extends AbstractNodesTests { // now start shutting nodes down closeNode("node1"); // make sure the cluster state is green, and all has been recovered - assertThat(client("node5").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForNodes("4").execute().actionGet().timedOut(), equalTo(false)); + assertThat(client("node5").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForRelocatingShards(0).setWaitForNodes("4").execute().actionGet().timedOut(), equalTo(false)); closeNode("node2"); // make sure the cluster state is green, and all has been recovered - assertThat(client("node5").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForNodes("3").execute().actionGet().timedOut(), equalTo(false)); + assertThat(client("node5").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForRelocatingShards(0).setWaitForNodes("3").execute().actionGet().timedOut(), equalTo(false)); client("node5").admin().indices().prepareRefresh().execute().actionGet(); @@ -86,11 +86,11 @@ public class FullRollingRestartTests extends AbstractNodesTests { closeNode("node3"); // make sure the cluster state is green, and all has been recovered - assertThat(client("node5").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet().timedOut(), equalTo(false)); + assertThat(client("node5").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForRelocatingShards(0).setWaitForNodes("2").execute().actionGet().timedOut(), equalTo(false)); closeNode("node4"); // make sure the cluster state is green, and all has been recovered - assertThat(client("node5").admin().cluster().prepareHealth().setTimeout("1m").setWaitForYellowStatus().setWaitForNodes("1").execute().actionGet().timedOut(), equalTo(false)); + assertThat(client("node5").admin().cluster().prepareHealth().setTimeout("1m").setWaitForYellowStatus().setWaitForRelocatingShards(0).setWaitForNodes("1").execute().actionGet().timedOut(), equalTo(false)); client("node5").admin().indices().prepareRefresh().execute().actionGet(); for (int i = 0; i < 10; i++) {