diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 93f9db67b35..2946bc9c447 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -33,7 +33,7 @@ import org.elasticsearch.common.bytes.ReleasablePagedBytesReference; import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.util.BigArrays; @@ -712,7 +712,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC * A generic interface representing an operation performed on the transaction log. * Each is associated with a type. */ - public interface Operation extends Streamable { + public interface Operation extends Writeable { enum Type { @Deprecated CREATE((byte) 1), @@ -721,7 +721,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC private final byte id; - private Type(byte id) { + Type(byte id) { this.id = id; } @@ -749,6 +749,33 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC Source getSource(); + /** + * Reads the type and the operation from the given stream. The operatino must be written with + * {@link #writeType(Operation, StreamOutput)} + */ + static Operation readType(StreamInput input) throws IOException { + Translog.Operation.Type type = Translog.Operation.Type.fromId(input.readByte()); + switch (type) { + case CREATE: + // the deserialization logic in Index was identical to that of Create when create was deprecated + return new Index(input); + case DELETE: + return new Translog.Delete(input); + case INDEX: + return new Index(input); + default: + throw new IOException("No type for [" + type + "]"); + } + } + + /** + * Writes the type and translog operation to the given stream + */ + static void writeType(Translog.Operation operation, StreamOutput output) throws IOException { + output.writeByte(operation.opType().id()); + operation.writeTo(output); + } + } public static class Source { @@ -768,19 +795,30 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC } public static class Index implements Operation { - public static final int SERIALIZATION_FORMAT = 6; + public static final int SERIALIZATION_FORMAT = 6; // since 2.0-beta1 and 1.1 + private final String id; + private final String type; + private final long version; + private final VersionType versionType; + private final BytesReference source; + private final String routing; + private final String parent; + private final long timestamp; + private final long ttl; - private String id; - private String type; - private long version = Versions.MATCH_ANY; - private VersionType versionType = VersionType.INTERNAL; - private BytesReference source; - private String routing; - private String parent; - private long timestamp; - private long ttl; - - public Index() { + public Index(StreamInput in) throws IOException { + final int format = in.readVInt(); // SERIALIZATION_FORMAT + assert format == SERIALIZATION_FORMAT : "format was: " + format; + id = in.readString(); + type = in.readString(); + source = in.readBytesReference(); + routing = in.readOptionalString(); + parent = in.readOptionalString(); + this.version = in.readLong(); + this.timestamp = in.readLong(); + this.ttl = in.readLong(); + this.versionType = VersionType.fromValue(in.readByte()); + assert versionType.validateVersionForWrites(this.version); } public Index(Engine.Index index) { @@ -799,6 +837,12 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC this.type = type; this.id = id; this.source = new BytesArray(source); + version = Versions.MATCH_ANY; + versionType = VersionType.INTERNAL; + routing = null; + parent = null; + timestamp = 0; + ttl = 0; } @Override @@ -852,60 +896,14 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC return new Source(source, routing, parent, timestamp, ttl); } - @Override - public void readFrom(StreamInput in) throws IOException { - int version = in.readVInt(); // version - id = in.readString(); - type = in.readString(); - source = in.readBytesReference(); - try { - if (version >= 1) { - if (in.readBoolean()) { - routing = in.readString(); - } - } - if (version >= 2) { - if (in.readBoolean()) { - parent = in.readString(); - } - } - if (version >= 3) { - this.version = in.readLong(); - } - if (version >= 4) { - this.timestamp = in.readLong(); - } - if (version >= 5) { - this.ttl = in.readLong(); - } - if (version >= 6) { - this.versionType = VersionType.fromValue(in.readByte()); - } - } catch (Exception e) { - throw new ElasticsearchException("failed to read [" + type + "][" + id + "]", e); - } - - assert versionType.validateVersionForWrites(version); - } - @Override public void writeTo(StreamOutput out) throws IOException { out.writeVInt(SERIALIZATION_FORMAT); out.writeString(id); out.writeString(type); out.writeBytesReference(source); - if (routing == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - out.writeString(routing); - } - if (parent == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - out.writeString(parent); - } + out.writeOptionalString(routing); + out.writeOptionalString(parent); out.writeLong(version); out.writeLong(timestamp); out.writeLong(ttl); @@ -963,23 +961,29 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC } public static class Delete implements Operation { - public static final int SERIALIZATION_FORMAT = 2; + public static final int SERIALIZATION_FORMAT = 2; // since 2.0-beta1 and 1.1 - private Term uid; - private long version = Versions.MATCH_ANY; - private VersionType versionType = VersionType.INTERNAL; + private final Term uid; + private final long version; + private final VersionType versionType; - public Delete() { + public Delete(StreamInput in) throws IOException { + final int format = in.readVInt();// SERIALIZATION_FORMAT + assert format == SERIALIZATION_FORMAT : "format was: " + format; + uid = new Term(in.readString(), in.readString()); + this.version = in.readLong(); + this.versionType = VersionType.fromValue(in.readByte()); + assert versionType.validateVersionForWrites(this.version); } public Delete(Engine.Delete delete) { - this(delete.uid()); + this.uid = delete.uid(); this.version = delete.version(); this.versionType = delete.versionType(); } public Delete(Term uid) { - this.uid = uid; + this(uid, Versions.MATCH_ANY, VersionType.INTERNAL); } public Delete(Term uid, long version, VersionType versionType) { @@ -1015,20 +1019,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC throw new IllegalStateException("trying to read doc source from delete operation"); } - @Override - public void readFrom(StreamInput in) throws IOException { - int version = in.readVInt(); // version - uid = new Term(in.readString(), in.readString()); - if (version >= 1) { - this.version = in.readLong(); - } - if (version >= 2) { - this.versionType = VersionType.fromValue(in.readByte()); - } - assert versionType.validateVersionForWrites(version); - - } - @Override public void writeTo(StreamOutput out) throws IOException { out.writeVInt(SERIALIZATION_FORMAT); @@ -1107,7 +1097,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC } static Translog.Operation readOperation(BufferedChecksumStreamInput in) throws IOException { - Translog.Operation operation; + final Translog.Operation operation; try { final int opSize = in.readInt(); if (opSize < 4) { // 4byte for the checksum @@ -1124,9 +1114,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC verifyChecksum(in); in.reset(); } - Translog.Operation.Type type = Translog.Operation.Type.fromId(in.readByte()); - operation = newOperationFromType(type); - operation.readFrom(in); + operation = Translog.Operation.readType(in); verifyChecksum(in); } catch (EOFException e) { throw new TruncatedTranslogException("reached premature end of file, translog is truncated", e); @@ -1169,30 +1157,11 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC // because closing it closes the underlying stream, which we don't // want to do here. out.resetDigest(); - out.writeByte(op.opType().id()); - op.writeTo(out); + Translog.Operation.writeType(op, out); long checksum = out.getChecksum(); out.writeInt((int) checksum); } - /** - * Returns a new empty translog operation for the given {@link Translog.Operation.Type} - */ - static Translog.Operation newOperationFromType(Translog.Operation.Type type) throws IOException { - switch (type) { - case CREATE: - // the deserialization logic in Index was identical to that of Create when create was deprecated - return new Index(); - case DELETE: - return new Translog.Delete(); - case INDEX: - return new Index(); - default: - throw new IOException("No type for [" + type + "]"); - } - } - - @Override public void prepareCommit() throws IOException { try (ReleasableLock lock = writeLock.acquire()) {