initial chunk of work on refactoring the gateway, syntatic gateway files names, commit points that include translog information

This commit is contained in:
kimchy 2010-08-23 14:48:49 +03:00
parent 1517fa3d28
commit 5bd37f6f47
15 changed files with 118 additions and 20 deletions

View File

@ -72,11 +72,14 @@ public class GatewaySnapshotStatus {
final long indexSize;
public GatewaySnapshotStatus(Stage stage, long startTime, long time, long indexSize) {
final int expectedNumberOfOperations;
public GatewaySnapshotStatus(Stage stage, long startTime, long time, long indexSize, int expectedNumberOfOperations) {
this.stage = stage;
this.startTime = startTime;
this.time = time;
this.indexSize = indexSize;
this.expectedNumberOfOperations = expectedNumberOfOperations;
}
public Stage stage() {
@ -110,4 +113,12 @@ public class GatewaySnapshotStatus {
public ByteSizeValue getIndexSize() {
return indexSize();
}
public int expectedNumberOfOperations() {
return expectedNumberOfOperations;
}
public int getExpectedNumberOfOperations() {
return expectedNumberOfOperations();
}
}

View File

@ -254,6 +254,7 @@ public class ShardStatus extends BroadcastShardOperationResponse {
out.writeVLong(gatewaySnapshotStatus.startTime);
out.writeVLong(gatewaySnapshotStatus.time);
out.writeVLong(gatewaySnapshotStatus.indexSize);
out.writeVInt(gatewaySnapshotStatus.expectedNumberOfOperations());
}
}
@ -284,7 +285,7 @@ public class ShardStatus extends BroadcastShardOperationResponse {
if (in.readBoolean()) {
gatewaySnapshotStatus = new GatewaySnapshotStatus(GatewaySnapshotStatus.Stage.fromValue(in.readByte()),
in.readVLong(), in.readVLong(), in.readVLong());
in.readVLong(), in.readVLong(), in.readVLong(), in.readVInt());
}
}
}

View File

@ -239,7 +239,7 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
break;
}
shardStatus.gatewaySnapshotStatus = new GatewaySnapshotStatus(stage, snapshotStatus.startTime(), snapshotStatus.time(),
snapshotStatus.index().totalSize());
snapshotStatus.index().totalSize(), snapshotStatus.translog().expectedNumberOfOperations());
}
return shardStatus;

View File

@ -72,14 +72,16 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo
private final long lastTranslogId;
private final long lastTranslogPosition;
private final long lastTranslogLength;
private final int lastTotalTranslogOperations;
public Snapshot(SnapshotIndexCommit indexCommit, Translog.Snapshot translogSnapshot, long lastIndexVersion, long lastTranslogId, long lastTranslogPosition, long lastTranslogLength) {
public Snapshot(SnapshotIndexCommit indexCommit, Translog.Snapshot translogSnapshot, long lastIndexVersion, long lastTranslogId, long lastTranslogPosition, long lastTranslogLength, int lastTotalTranslogOperations) {
this.indexCommit = indexCommit;
this.translogSnapshot = translogSnapshot;
this.lastIndexVersion = lastIndexVersion;
this.lastTranslogId = lastTranslogId;
this.lastTranslogPosition = lastTranslogPosition;
this.lastTranslogLength = lastTranslogLength;
this.lastTotalTranslogOperations = lastTotalTranslogOperations;
}
/**
@ -132,5 +134,9 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo
public long lastTranslogLength() {
return lastTranslogLength;
}
public int lastTotalTranslogOperations() {
return this.lastTotalTranslogOperations;
}
}
}

View File

@ -62,6 +62,8 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
private volatile long lastTranslogPosition;
private volatile int lastTotalTranslogOperations;
private volatile long lastTranslogLength;
private final TimeValue snapshotInterval;
@ -150,6 +152,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
lastTranslogId = -1;
lastTranslogPosition = 0;
lastTranslogLength = 0;
lastTotalTranslogOperations = recoveryStatus.translog().currentTranslogOperations();
// start the shard if the gateway has not started it already
if (indexShard.state() != IndexShardState.STARTED) {
@ -221,12 +224,13 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
logger.debug("snapshot ({}) to {} ...", reason, shardGateway);
SnapshotStatus snapshotStatus =
shardGateway.snapshot(new IndexShardGateway.Snapshot(snapshotIndexCommit, translogSnapshot, lastIndexVersion, lastTranslogId, lastTranslogPosition, lastTranslogLength));
shardGateway.snapshot(new IndexShardGateway.Snapshot(snapshotIndexCommit, translogSnapshot, lastIndexVersion, lastTranslogId, lastTranslogPosition, lastTranslogLength, lastTotalTranslogOperations));
lastIndexVersion = snapshotIndexCommit.getVersion();
lastTranslogId = translogSnapshot.translogId();
lastTranslogPosition = translogSnapshot.position();
lastTranslogLength = translogSnapshot.length();
lastTotalTranslogOperations = translogSnapshot.totalOperations();
return snapshotStatus;
}
return null;
@ -237,7 +241,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
StringBuilder sb = new StringBuilder();
sb.append("snapshot (").append(reason).append(") completed to ").append(shardGateway).append(", took [").append(TimeValue.timeValueMillis(snapshotStatus.time())).append("]\n");
sb.append(" index : version [").append(lastIndexVersion).append("], number_of_files [").append(snapshotStatus.index().numberOfFiles()).append("] with total_size [").append(new ByteSizeValue(snapshotStatus.index().totalSize())).append("], took [").append(TimeValue.timeValueMillis(snapshotStatus.index().time())).append("]\n");
sb.append(" translog : id [").append(lastTranslogId).append("], took [").append(TimeValue.timeValueMillis(snapshotStatus.translog().time())).append("]");
sb.append(" translog : id [").append(lastTranslogId).append("], number_of_operations [" + snapshotStatus.translog().expectedNumberOfOperations() + "], took [").append(TimeValue.timeValueMillis(snapshotStatus.translog().time())).append("]");
logger.debug(sb.toString());
}
}

View File

@ -79,7 +79,7 @@ public class RecoveryStatus {
public static class Translog {
private long startTime = 0;
private long time;
private volatile long currentTranslogOperations = 0;
private volatile int currentTranslogOperations = 0;
public long startTime() {
return this.startTime;
@ -97,11 +97,11 @@ public class RecoveryStatus {
this.time = time;
}
public void addTranslogOperations(long count) {
public void addTranslogOperations(int count) {
this.currentTranslogOperations += count;
}
public long currentTranslogOperations() {
public int currentTranslogOperations() {
return this.currentTranslogOperations;
}
}

View File

@ -122,6 +122,7 @@ public class SnapshotStatus {
public static class Translog {
private long startTime;
private long time;
private int expectedNumberOfOperations;
public long startTime() {
return this.startTime;
@ -138,5 +139,13 @@ public class SnapshotStatus {
public void time(long time) {
this.time = time;
}
public int expectedNumberOfOperations() {
return expectedNumberOfOperations;
}
public void expectedNumberOfOperations(int expectedNumberOfOperations) {
this.expectedNumberOfOperations = expectedNumberOfOperations;
}
}
}

View File

@ -235,8 +235,12 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
// Note, we assume the snapshot is always started from "base 0". We need to seek forward if we want to lastTranslogPosition if we want the delta
List<CommitPoint.FileInfo> translogCommitPointFiles = Lists.newArrayList();
int expectedNumberOfOperations = 0;
boolean snapshotRequired = snapshot.newTranslogCreated();
if (!snapshot.newTranslogCreated()) {
if (snapshot.newTranslogCreated()) {
snapshotRequired = true;
expectedNumberOfOperations = translogSnapshot.totalOperations();
} else {
// if we have a commit point, check that we have all the files listed in it
if (!commitPoints.commits().isEmpty()) {
CommitPoint commitPoint = commitPoints.commits().get(0);
@ -253,12 +257,16 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
if (snapshot.sameTranslogNewOperations()) {
translogSnapshot.seekForward(snapshot.lastTranslogPosition());
snapshotRequired = true;
expectedNumberOfOperations = translogSnapshot.totalOperations() - snapshot.lastTotalTranslogOperations();
}
} else {
// a full translog snapshot is required
expectedNumberOfOperations = translogSnapshot.totalOperations();
snapshotRequired = true;
}
}
}
currentSnapshotStatus.translog().expectedNumberOfOperations(expectedNumberOfOperations);
if (snapshotRequired) {
CommitPoint.FileInfo addedTranslogFileInfo = new CommitPoint.FileInfo(fileNameFromGeneration(++generation), "translog-" + translogSnapshot.translogId(), translogSnapshot.lengthInBytes());

View File

@ -104,6 +104,16 @@ public interface Translog extends IndexShardComponent {
*/
long length();
/**
* The total number of operations in the translog.
*/
int totalOperations();
/**
* The number of operations in this snapshot.
*/
int snapshotOperations();
boolean hasNext();
Operation next();
@ -116,7 +126,7 @@ public interface Translog extends IndexShardComponent {
InputStream stream() throws IOException;
/**
* The length in bytes of this channel.
* The length in bytes of this stream.
*/
long lengthInBytes();
}

View File

@ -41,6 +41,10 @@ public class FsChannelSnapshot implements Translog.Snapshot {
private final long id;
private final int totalOperations;
private final int snapshotOperations;
private final RafReference raf;
private final FileChannel channel;
@ -53,12 +57,14 @@ public class FsChannelSnapshot implements Translog.Snapshot {
private ByteBuffer cacheBuffer;
public FsChannelSnapshot(ShardId shardId, long id, RafReference raf, long length) throws FileNotFoundException {
public FsChannelSnapshot(ShardId shardId, long id, RafReference raf, long length, int totalOperations, int snapshotOperations) throws FileNotFoundException {
this.shardId = shardId;
this.id = id;
this.raf = raf;
this.channel = raf.raf().getChannel();
this.length = length;
this.totalOperations = totalOperations;
this.snapshotOperations = snapshotOperations;
}
@Override public long translogId() {
@ -73,6 +79,14 @@ public class FsChannelSnapshot implements Translog.Snapshot {
return this.length;
}
@Override public int totalOperations() {
return this.totalOperations;
}
@Override public int snapshotOperations() {
return this.snapshotOperations;
}
@Override public InputStream stream() throws IOException {
return new FileChannelInputStream(channel, position, lengthInBytes());
}

View File

@ -37,6 +37,10 @@ public class FsStreamSnapshot implements Translog.Snapshot {
private final long id;
private final int totalOperations;
private final int snapshotOperations;
private final RafReference raf;
private final long length;
@ -49,11 +53,13 @@ public class FsStreamSnapshot implements Translog.Snapshot {
private byte[] cachedData;
public FsStreamSnapshot(ShardId shardId, long id, RafReference raf, long length) throws FileNotFoundException {
public FsStreamSnapshot(ShardId shardId, long id, RafReference raf, long length, int totalOperations, int snapshotOperations) throws FileNotFoundException {
this.shardId = shardId;
this.id = id;
this.raf = raf;
this.length = length;
this.totalOperations = totalOperations;
this.snapshotOperations = snapshotOperations;
this.dis = new DataInputStream(new FileInputStream(raf.file()));
}
@ -69,6 +75,14 @@ public class FsStreamSnapshot implements Translog.Snapshot {
return this.length;
}
@Override public int totalOperations() {
return this.totalOperations;
}
@Override public int snapshotOperations() {
return this.snapshotOperations;
}
@Override public InputStream stream() throws IOException {
return dis;
}

View File

@ -130,9 +130,9 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
try {
raf.increaseRefCount();
if (useStream) {
return new FsStreamSnapshot(shardId, this.id, raf, lastPosition);
return new FsStreamSnapshot(shardId, this.id, raf, lastPosition, operationCounter.get(), operationCounter.get());
} else {
return new FsChannelSnapshot(shardId, this.id, raf, lastPosition);
return new FsChannelSnapshot(shardId, this.id, raf, lastPosition, operationCounter.get(), operationCounter.get());
}
} catch (IOException e) {
throw new TranslogException(shardId, "Failed to snapshot", e);
@ -148,11 +148,11 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
try {
raf.increaseRefCount();
if (useStream) {
FsStreamSnapshot newSnapshot = new FsStreamSnapshot(shardId, id, raf, lastPosition);
FsStreamSnapshot newSnapshot = new FsStreamSnapshot(shardId, id, raf, lastPosition, operationCounter.get(), operationCounter.get() - snapshot.totalOperations());
newSnapshot.seekForward(snapshot.position());
return newSnapshot;
} else {
FsChannelSnapshot newSnapshot = new FsChannelSnapshot(shardId, id, raf, lastPosition);
FsChannelSnapshot newSnapshot = new FsChannelSnapshot(shardId, id, raf, lastPosition, operationCounter.get(), operationCounter.get() - snapshot.totalOperations());
newSnapshot.seekForward(snapshot.position());
return newSnapshot;
}

View File

@ -205,6 +205,10 @@ public class RestIndicesStatusAction extends BaseRestHandler {
builder.field("size_in_bytes", gatewaySnapshotStatus.indexSize().bytes());
builder.endObject();
builder.startObject("index");
builder.field("expected_operations", gatewaySnapshotStatus.expectedNumberOfOperations());
builder.endObject();
builder.endObject();
}

View File

@ -59,21 +59,29 @@ public abstract class AbstractSimpleTranslogTests {
translog.add(new Translog.Create("test", "1", new byte[]{1}));
snapshot = translog.snapshot();
assertThat(snapshot, translogSize(1));
assertThat(snapshot.totalOperations(), equalTo(1));
assertThat(snapshot.snapshotOperations(), equalTo(1));
snapshot.release();
translog.add(new Translog.Index("test", "2", new byte[]{2}));
snapshot = translog.snapshot();
assertThat(snapshot, translogSize(2));
assertThat(snapshot.totalOperations(), equalTo(2));
assertThat(snapshot.snapshotOperations(), equalTo(2));
snapshot.release();
translog.add(new Translog.Delete(newUid("3")));
snapshot = translog.snapshot();
assertThat(snapshot, translogSize(3));
assertThat(snapshot.totalOperations(), equalTo(3));
assertThat(snapshot.snapshotOperations(), equalTo(3));
snapshot.release();
translog.add(new Translog.DeleteByQuery(new byte[]{4}, null));
snapshot = translog.snapshot();
assertThat(snapshot, translogSize(4));
assertThat(snapshot.totalOperations(), equalTo(4));
assertThat(snapshot.snapshotOperations(), equalTo(4));
snapshot.release();
snapshot = translog.snapshot();
@ -104,6 +112,8 @@ public abstract class AbstractSimpleTranslogTests {
snapshot = translog.snapshot();
assertThat(snapshot, translogSize(0));
assertThat(snapshot.totalOperations(), equalTo(0));
assertThat(snapshot.snapshotOperations(), equalTo(0));
snapshot.release();
}
@ -115,6 +125,8 @@ public abstract class AbstractSimpleTranslogTests {
translog.add(new Translog.Create("test", "1", new byte[]{1}));
snapshot = translog.snapshot();
assertThat(snapshot, translogSize(1));
assertThat(snapshot.totalOperations(), equalTo(1));
assertThat(snapshot.snapshotOperations(), equalTo(1));
snapshot.release();
snapshot = translog.snapshot();
@ -127,10 +139,14 @@ public abstract class AbstractSimpleTranslogTests {
// we use the translogSize to also navigate to the last position on this snapshot
// so snapshot(Snapshot) will work properly
assertThat(snapshot1, translogSize(1));
assertThat(snapshot1.totalOperations(), equalTo(1));
assertThat(snapshot1.snapshotOperations(), equalTo(1));
translog.add(new Translog.Index("test", "2", new byte[]{2}));
snapshot = translog.snapshot(snapshot1);
assertThat(snapshot, translogSize(1));
assertThat(snapshot.totalOperations(), equalTo(2));
assertThat(snapshot.snapshotOperations(), equalTo(1));
snapshot.release();
snapshot = translog.snapshot(snapshot1);
@ -138,6 +154,8 @@ public abstract class AbstractSimpleTranslogTests {
Translog.Index index = (Translog.Index) snapshot.next();
assertThat(index.source(), equalTo(new byte[]{2}));
assertThat(snapshot.hasNext(), equalTo(false));
assertThat(snapshot.totalOperations(), equalTo(2));
assertThat(snapshot.snapshotOperations(), equalTo(1));
snapshot.release();
snapshot1.release();
}

View File

@ -66,11 +66,10 @@ public abstract class AbstractSimpleIndexGatewayTests extends AbstractNodesTests
// Translog tests
logger.info("Creating index [{}]", "test");
client("server1").admin().indices().create(createIndexRequest("test")).actionGet();
client("server1").admin().indices().prepareCreate("test").execute().actionGet();
// create a mapping
PutMappingResponse putMappingResponse = client("server1").admin().indices().putMapping(putMappingRequest("test").type("type1")
.source(mappingSource())).actionGet();
PutMappingResponse putMappingResponse = client("server1").admin().indices().preparePutMapping("test").setType("type1").setSource(mappingSource()).execute().actionGet();
assertThat(putMappingResponse.acknowledged(), equalTo(true));
// verify that mapping is there