Translog Flushing: Improve logic, flush not just by the number of operations in the translog but also by the translog size and the time since the last flush, closes #656.

This commit is contained in:
kimchy 2011-01-30 17:41:22 +02:00
parent 8884b575c1
commit 5b4846b0b6
7 changed files with 67 additions and 34 deletions

View File

@ -90,16 +90,16 @@ public class SingleThreadIndexingStress {
long time = System.currentTimeMillis();
return jsonBuilder().startObject()
.field("id", id)
.field("numeric1", time)
.field("numeric2", time)
.field("numeric3", time)
.field("numeric4", time)
.field("numeric5", time)
.field("numeric6", time)
.field("numeric7", time)
.field("numeric8", time)
.field("numeric9", time)
.field("numeric10", time)
// .field("numeric1", time)
// .field("numeric2", time)
// .field("numeric3", time)
// .field("numeric4", time)
// .field("numeric5", time)
// .field("numeric6", time)
// .field("numeric7", time)
// .field("numeric8", time)
// .field("numeric9", time)
// .field("numeric10", time)
.field("name", nameValue)
.endObject();
}

View File

@ -152,7 +152,7 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
if (indexShard.state() == IndexShardState.STARTED) {
// shardStatus.estimatedFlushableMemorySize = indexShard.estimateFlushableMemorySize();
shardStatus.translogId = indexShard.translog().currentId();
shardStatus.translogOperations = indexShard.translog().size();
shardStatus.translogOperations = indexShard.translog().numberOfOperations();
Engine.Searcher searcher = indexShard.searcher();
try {
shardStatus.docs = new DocsStatus();

View File

@ -93,7 +93,7 @@ public class IndexShardManagement extends AbstractIndexShardComponent implements
/**
 * Reports the number of operations currently held by the translog, exposed as a managed attribute.
 *
 * NOTE(review): two return statements appear back to back below (translog.size() and
 * translog.numberOfOperations()); this looks like the before/after pair of a rename shown
 * together -- confirm which one is current.
 */
@ManagedAttribute(description = "Number of transaction log operations")
public long getTranslogNumberOfOperations() {
return translog.size();
return translog.numberOfOperations();
}
@ManagedAttribute(description = "Estimated size in memory the transaction log takes")

View File

@ -48,13 +48,16 @@ public interface Translog extends IndexShardComponent {
/**
* Returns the number of operations in the transaction log.
*/
int size();
int numberOfOperations();
/**
* The estimated memory size this translog is taking.
*/
long memorySizeInBytes();
/**
* Returns the size in bytes of the translog.
*/
long translogSizeInBytes();
/**

View File

@ -21,6 +21,8 @@ package org.elasticsearch.index.translog;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineClosedException;
@ -47,7 +49,11 @@ public class TranslogService extends AbstractIndexShardComponent {
private final Translog translog;
private final int flushThreshold;
private final int flushThresholdOperations;
private final ByteSizeValue flushThresholdSize;
private final TimeValue flushThresholdPeriod;
private final TimeValue interval;
@ -59,7 +65,9 @@ public class TranslogService extends AbstractIndexShardComponent {
this.indexShard = indexShard;
this.translog = translog;
this.flushThreshold = componentSettings.getAsInt("flush_threshold", 5000);
this.flushThresholdOperations = componentSettings.getAsInt("flush_threshold_ops", componentSettings.getAsInt("flush_threshold", 20000));
this.flushThresholdSize = componentSettings.getAsBytesSize("flush_threshold_size", new ByteSizeValue(500, ByteSizeUnit.MB));
this.flushThresholdPeriod = componentSettings.getAsTime("flush_threshold_period", TimeValue.timeValueMinutes(60));
this.interval = componentSettings.getAsTime("interval", timeValueMillis(1000));
this.future = threadPool.scheduleWithFixedDelay(new TranslogBasedFlush(), interval);
@ -71,24 +79,46 @@ public class TranslogService extends AbstractIndexShardComponent {
}
/**
 * Runnable that inspects the translog and asks the shard to flush when a threshold is breached.
 * The checks run in this order: number of operations, translog size in bytes, then time elapsed
 * since the last flush attempt. At most one flush is triggered per run.
 */
private class TranslogBasedFlush implements Runnable {
// time (millis) of the last flush attempt; starts at the moment this task is created
private volatile long lastFlushTime = System.currentTimeMillis();
@Override public void run() {
// nothing to do unless the shard is in the STARTED state
if (indexShard.state() != IndexShardState.STARTED) {
return;
}
// NOTE(review): the currentSize/flushThreshold check below and the
// currentNumberOfOperations/flushThresholdOperations check after it look like the old and new
// versions of the same operations-count check shown together -- confirm which one is current.
int currentSize = translog.size();
if (currentSize > flushThreshold) {
logger.trace("flushing translog, operations [{}], breached [{}]", currentSize, flushThreshold);
try {
indexShard.flush(new Engine.Flush());
} catch (EngineClosedException e) {
// we are being closed, ignore
} catch (FlushNotAllowedEngineException e) {
// ignore this exception, we are not allowed to perform flush
} catch (Exception e) {
logger.warn("failed to flush shard on translog threshold", e);
}
// 1) too many operations accumulated in the translog
int currentNumberOfOperations = translog.numberOfOperations();
if (currentNumberOfOperations > flushThresholdOperations) {
logger.trace("flushing translog, operations [{}], breached [{}]", currentNumberOfOperations, flushThresholdOperations);
flush();
return;
}
// 2) translog has grown past the configured byte size
long sizeInBytes = translog.translogSizeInBytes();
if (sizeInBytes > flushThresholdSize.bytes()) {
logger.trace("flushing translog, size [{}], breached [{}]", new ByteSizeValue(sizeInBytes), flushThresholdSize);
flush();
return;
}
// 3) too much time has passed since the last flush attempt; note this check does not look at
// how many operations the translog currently holds
if ((System.currentTimeMillis() - lastFlushTime) > flushThresholdPeriod.millis()) {
logger.trace("flushing translog, last_flush_time [{}], breached [{}]", lastFlushTime, flushThresholdPeriod);
flush();
return;
}
}
/**
 * Flushes the shard. EngineClosedException and FlushNotAllowedEngineException are ignored,
 * any other exception is logged as a warning. lastFlushTime is updated afterwards whether or
 * not the flush succeeded.
 */
private void flush() {
try {
indexShard.flush(new Engine.Flush());
} catch (EngineClosedException e) {
// we are being closed, ignore
} catch (FlushNotAllowedEngineException e) {
// ignore this exception, we are not allowed to perform flush
} catch (Exception e) {
logger.warn("failed to flush shard on translog threshold", e);
}
// recorded even when the flush above failed or was not allowed
lastFlushTime = System.currentTimeMillis();
}
}
}

View File

@ -85,7 +85,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
return this.id;
}
@Override public int size() {
@Override public int numberOfOperations() {
return operationCounter.get();
}

View File

@ -56,14 +56,14 @@ public class FullRollingRestartTests extends AbstractNodesTests {
startNode("node3");
// make sure the cluster state is green, and all has been recovered
assertThat(client("node1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForNodes("3").execute().actionGet().timedOut(), equalTo(false));
assertThat(client("node1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForRelocatingShards(0).setWaitForNodes("3").execute().actionGet().timedOut(), equalTo(false));
// now start adding nodes
startNode("node4");
startNode("node5");
// make sure the cluster state is green, and all has been recovered
assertThat(client("node1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForNodes("5").execute().actionGet().timedOut(), equalTo(false));
assertThat(client("node1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForRelocatingShards(0).setWaitForNodes("5").execute().actionGet().timedOut(), equalTo(false));
client("node1").admin().indices().prepareRefresh().execute().actionGet();
for (int i = 0; i < 10; i++) {
@ -73,10 +73,10 @@ public class FullRollingRestartTests extends AbstractNodesTests {
// now start shutting nodes down
closeNode("node1");
// make sure the cluster state is green, and all has been recovered
assertThat(client("node5").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForNodes("4").execute().actionGet().timedOut(), equalTo(false));
assertThat(client("node5").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForRelocatingShards(0).setWaitForNodes("4").execute().actionGet().timedOut(), equalTo(false));
closeNode("node2");
// make sure the cluster state is green, and all has been recovered
assertThat(client("node5").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForNodes("3").execute().actionGet().timedOut(), equalTo(false));
assertThat(client("node5").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForRelocatingShards(0).setWaitForNodes("3").execute().actionGet().timedOut(), equalTo(false));
client("node5").admin().indices().prepareRefresh().execute().actionGet();
@ -86,11 +86,11 @@ public class FullRollingRestartTests extends AbstractNodesTests {
closeNode("node3");
// make sure the cluster state is green, and all has been recovered
assertThat(client("node5").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet().timedOut(), equalTo(false));
assertThat(client("node5").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForRelocatingShards(0).setWaitForNodes("2").execute().actionGet().timedOut(), equalTo(false));
closeNode("node4");
// make sure the cluster state is at least yellow (only one node is left), and all has been recovered
assertThat(client("node5").admin().cluster().prepareHealth().setTimeout("1m").setWaitForYellowStatus().setWaitForNodes("1").execute().actionGet().timedOut(), equalTo(false));
assertThat(client("node5").admin().cluster().prepareHealth().setTimeout("1m").setWaitForYellowStatus().setWaitForRelocatingShards(0).setWaitForNodes("1").execute().actionGet().timedOut(), equalTo(false));
client("node5").admin().indices().prepareRefresh().execute().actionGet();
for (int i = 0; i < 10; i++) {