Shared Storage Gateway (fs/s3): Wrong snapshotting of transaction log, closes #375.
This commit is contained in:
parent
a2011e0151
commit
2372f481aa
|
@ -70,16 +70,14 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo
|
|||
|
||||
private final long lastIndexVersion;
|
||||
private final long lastTranslogId;
|
||||
private final long lastTranslogPosition;
|
||||
private final long lastTranslogLength;
|
||||
private final int lastTotalTranslogOperations;
|
||||
|
||||
public Snapshot(SnapshotIndexCommit indexCommit, Translog.Snapshot translogSnapshot, long lastIndexVersion, long lastTranslogId, long lastTranslogPosition, long lastTranslogLength, int lastTotalTranslogOperations) {
|
||||
public Snapshot(SnapshotIndexCommit indexCommit, Translog.Snapshot translogSnapshot, long lastIndexVersion, long lastTranslogId, long lastTranslogLength, int lastTotalTranslogOperations) {
|
||||
this.indexCommit = indexCommit;
|
||||
this.translogSnapshot = translogSnapshot;
|
||||
this.lastIndexVersion = lastIndexVersion;
|
||||
this.lastTranslogId = lastTranslogId;
|
||||
this.lastTranslogPosition = lastTranslogPosition;
|
||||
this.lastTranslogLength = lastTranslogLength;
|
||||
this.lastTotalTranslogOperations = lastTotalTranslogOperations;
|
||||
}
|
||||
|
@ -127,10 +125,6 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo
|
|||
return lastTranslogId;
|
||||
}
|
||||
|
||||
public long lastTranslogPosition() {
|
||||
return lastTranslogPosition;
|
||||
}
|
||||
|
||||
public long lastTranslogLength() {
|
||||
return lastTranslogLength;
|
||||
}
|
||||
|
|
|
@ -60,8 +60,6 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
|
|||
|
||||
private volatile long lastTranslogId = -1;
|
||||
|
||||
private volatile long lastTranslogPosition;
|
||||
|
||||
private volatile int lastTotalTranslogOperations;
|
||||
|
||||
private volatile long lastTranslogLength;
|
||||
|
@ -150,7 +148,6 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
|
|||
|
||||
lastIndexVersion = recoveryStatus.index().version();
|
||||
lastTranslogId = -1;
|
||||
lastTranslogPosition = 0;
|
||||
lastTranslogLength = 0;
|
||||
lastTotalTranslogOperations = recoveryStatus.translog().currentTranslogOperations();
|
||||
|
||||
|
@ -230,11 +227,10 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
|
|||
|
||||
logger.debug("snapshot ({}) to {} ...", reason, shardGateway);
|
||||
SnapshotStatus snapshotStatus =
|
||||
shardGateway.snapshot(new IndexShardGateway.Snapshot(snapshotIndexCommit, translogSnapshot, lastIndexVersion, lastTranslogId, lastTranslogPosition, lastTranslogLength, lastTotalTranslogOperations));
|
||||
shardGateway.snapshot(new IndexShardGateway.Snapshot(snapshotIndexCommit, translogSnapshot, lastIndexVersion, lastTranslogId, lastTranslogLength, lastTotalTranslogOperations));
|
||||
|
||||
lastIndexVersion = snapshotIndexCommit.getVersion();
|
||||
lastTranslogId = translogSnapshot.translogId();
|
||||
lastTranslogPosition = translogSnapshot.position();
|
||||
lastTranslogLength = translogSnapshot.length();
|
||||
lastTotalTranslogOperations = translogSnapshot.totalOperations();
|
||||
return snapshotStatus;
|
||||
|
@ -247,7 +243,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
|
|||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("snapshot (").append(reason).append(") completed to ").append(shardGateway).append(", took [").append(TimeValue.timeValueMillis(snapshotStatus.time())).append("]\n");
|
||||
sb.append(" index : version [").append(lastIndexVersion).append("], number_of_files [").append(snapshotStatus.index().numberOfFiles()).append("] with total_size [").append(new ByteSizeValue(snapshotStatus.index().totalSize())).append("], took [").append(TimeValue.timeValueMillis(snapshotStatus.index().time())).append("]\n");
|
||||
sb.append(" translog : id [").append(lastTranslogId).append("], number_of_operations [" + snapshotStatus.translog().expectedNumberOfOperations() + "], took [").append(TimeValue.timeValueMillis(snapshotStatus.translog().time())).append("]");
|
||||
sb.append(" translog : id [").append(lastTranslogId).append("], number_of_operations [").append(snapshotStatus.translog().expectedNumberOfOperations()).append("], took [").append(TimeValue.timeValueMillis(snapshotStatus.translog().time())).append("]");
|
||||
logger.debug(sb.toString());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -257,7 +257,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
|
|||
if (allTranslogFilesExists) {
|
||||
translogCommitPointFiles.addAll(commitPoint.translogFiles());
|
||||
if (snapshot.sameTranslogNewOperations()) {
|
||||
translogSnapshot.seekForward(snapshot.lastTranslogPosition());
|
||||
translogSnapshot.seekForward(snapshot.lastTranslogLength());
|
||||
if (translogSnapshot.lengthInBytes() > 0) {
|
||||
snapshotRequired = true;
|
||||
expectedNumberOfOperations = translogSnapshot.totalOperations() - snapshot.lastTotalTranslogOperations();
|
||||
|
|
|
@ -249,6 +249,11 @@ public abstract class AbstractSimpleIndexGatewayTests extends AbstractNodesTests
|
|||
client("server1").prepareIndex("test", "type1", Long.toString(i))
|
||||
.setCreate(true) // make sure we use create, so if we recover wrongly, we will get increments...
|
||||
.setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + i).map()).execute().actionGet();
|
||||
|
||||
// snapshot every 100 so we get some actions going on in the gateway
|
||||
if ((i % 100) == 0) {
|
||||
client("server1").admin().indices().prepareGatewaySnapshot().execute().actionGet();
|
||||
}
|
||||
}
|
||||
|
||||
logger.info("--> refreshing and checking count");
|
||||
|
|
Loading…
Reference in New Issue