Drop 1.x BWC and cut over to Writeable for Translog.Operation (#18565)

We still maintain BWC for the translog operations back to 1.1 which is not
supported in the current version anyway. This commit drops the bwc and moves
the operations to the Writeable interface enforcing immutability.
This commit is contained in:
Simon Willnauer 2016-05-25 11:51:28 +02:00
parent 6862c48791
commit eab3113204
1 changed files with 77 additions and 108 deletions

View File

@ -33,7 +33,7 @@ import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.util.BigArrays;
@ -712,7 +712,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
* A generic interface representing an operation performed on the transaction log.
* Each is associated with a type.
*/
public interface Operation extends Streamable {
public interface Operation extends Writeable {
enum Type {
@Deprecated
CREATE((byte) 1),
@ -721,7 +721,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
private final byte id;
private Type(byte id) {
Type(byte id) {
this.id = id;
}
@ -749,6 +749,33 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
Source getSource();
/**
* Reads the type and the operation from the given stream. The operatino must be written with
* {@link #writeType(Operation, StreamOutput)}
*/
static Operation readType(StreamInput input) throws IOException {
Translog.Operation.Type type = Translog.Operation.Type.fromId(input.readByte());
switch (type) {
case CREATE:
// the deserialization logic in Index was identical to that of Create when create was deprecated
return new Index(input);
case DELETE:
return new Translog.Delete(input);
case INDEX:
return new Index(input);
default:
throw new IOException("No type for [" + type + "]");
}
}
/**
* Writes the type and translog operation to the given stream
*/
static void writeType(Translog.Operation operation, StreamOutput output) throws IOException {
output.writeByte(operation.opType().id());
operation.writeTo(output);
}
}
public static class Source {
@ -768,19 +795,30 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
}
public static class Index implements Operation {
public static final int SERIALIZATION_FORMAT = 6;
public static final int SERIALIZATION_FORMAT = 6; // since 2.0-beta1 and 1.1
private final String id;
private final String type;
private final long version;
private final VersionType versionType;
private final BytesReference source;
private final String routing;
private final String parent;
private final long timestamp;
private final long ttl;
private String id;
private String type;
private long version = Versions.MATCH_ANY;
private VersionType versionType = VersionType.INTERNAL;
private BytesReference source;
private String routing;
private String parent;
private long timestamp;
private long ttl;
public Index() {
public Index(StreamInput in) throws IOException {
final int format = in.readVInt(); // SERIALIZATION_FORMAT
assert format == SERIALIZATION_FORMAT : "format was: " + format;
id = in.readString();
type = in.readString();
source = in.readBytesReference();
routing = in.readOptionalString();
parent = in.readOptionalString();
this.version = in.readLong();
this.timestamp = in.readLong();
this.ttl = in.readLong();
this.versionType = VersionType.fromValue(in.readByte());
assert versionType.validateVersionForWrites(this.version);
}
public Index(Engine.Index index) {
@ -799,6 +837,12 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
this.type = type;
this.id = id;
this.source = new BytesArray(source);
version = Versions.MATCH_ANY;
versionType = VersionType.INTERNAL;
routing = null;
parent = null;
timestamp = 0;
ttl = 0;
}
@Override
@ -852,60 +896,14 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
return new Source(source, routing, parent, timestamp, ttl);
}
@Override
public void readFrom(StreamInput in) throws IOException {
int version = in.readVInt(); // version
id = in.readString();
type = in.readString();
source = in.readBytesReference();
try {
if (version >= 1) {
if (in.readBoolean()) {
routing = in.readString();
}
}
if (version >= 2) {
if (in.readBoolean()) {
parent = in.readString();
}
}
if (version >= 3) {
this.version = in.readLong();
}
if (version >= 4) {
this.timestamp = in.readLong();
}
if (version >= 5) {
this.ttl = in.readLong();
}
if (version >= 6) {
this.versionType = VersionType.fromValue(in.readByte());
}
} catch (Exception e) {
throw new ElasticsearchException("failed to read [" + type + "][" + id + "]", e);
}
assert versionType.validateVersionForWrites(version);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(SERIALIZATION_FORMAT);
out.writeString(id);
out.writeString(type);
out.writeBytesReference(source);
if (routing == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeString(routing);
}
if (parent == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeString(parent);
}
out.writeOptionalString(routing);
out.writeOptionalString(parent);
out.writeLong(version);
out.writeLong(timestamp);
out.writeLong(ttl);
@ -963,23 +961,29 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
}
public static class Delete implements Operation {
public static final int SERIALIZATION_FORMAT = 2;
public static final int SERIALIZATION_FORMAT = 2; // since 2.0-beta1 and 1.1
private Term uid;
private long version = Versions.MATCH_ANY;
private VersionType versionType = VersionType.INTERNAL;
private final Term uid;
private final long version;
private final VersionType versionType;
public Delete() {
public Delete(StreamInput in) throws IOException {
final int format = in.readVInt();// SERIALIZATION_FORMAT
assert format == SERIALIZATION_FORMAT : "format was: " + format;
uid = new Term(in.readString(), in.readString());
this.version = in.readLong();
this.versionType = VersionType.fromValue(in.readByte());
assert versionType.validateVersionForWrites(this.version);
}
public Delete(Engine.Delete delete) {
this(delete.uid());
this.uid = delete.uid();
this.version = delete.version();
this.versionType = delete.versionType();
}
public Delete(Term uid) {
this.uid = uid;
this(uid, Versions.MATCH_ANY, VersionType.INTERNAL);
}
public Delete(Term uid, long version, VersionType versionType) {
@ -1015,20 +1019,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
throw new IllegalStateException("trying to read doc source from delete operation");
}
@Override
public void readFrom(StreamInput in) throws IOException {
int version = in.readVInt(); // version
uid = new Term(in.readString(), in.readString());
if (version >= 1) {
this.version = in.readLong();
}
if (version >= 2) {
this.versionType = VersionType.fromValue(in.readByte());
}
assert versionType.validateVersionForWrites(version);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(SERIALIZATION_FORMAT);
@ -1107,7 +1097,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
}
static Translog.Operation readOperation(BufferedChecksumStreamInput in) throws IOException {
Translog.Operation operation;
final Translog.Operation operation;
try {
final int opSize = in.readInt();
if (opSize < 4) { // 4byte for the checksum
@ -1124,9 +1114,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
verifyChecksum(in);
in.reset();
}
Translog.Operation.Type type = Translog.Operation.Type.fromId(in.readByte());
operation = newOperationFromType(type);
operation.readFrom(in);
operation = Translog.Operation.readType(in);
verifyChecksum(in);
} catch (EOFException e) {
throw new TruncatedTranslogException("reached premature end of file, translog is truncated", e);
@ -1169,30 +1157,11 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
// because closing it closes the underlying stream, which we don't
// want to do here.
out.resetDigest();
out.writeByte(op.opType().id());
op.writeTo(out);
Translog.Operation.writeType(op, out);
long checksum = out.getChecksum();
out.writeInt((int) checksum);
}
/**
* Returns a new empty translog operation for the given {@link Translog.Operation.Type}
*/
static Translog.Operation newOperationFromType(Translog.Operation.Type type) throws IOException {
switch (type) {
case CREATE:
// the deserialization logic in Index was identical to that of Create when create was deprecated
return new Index();
case DELETE:
return new Translog.Delete();
case INDEX:
return new Index();
default:
throw new IOException("No type for [" + type + "]");
}
}
@Override
public void prepareCommit() throws IOException {
try (ReleasableLock lock = writeLock.acquire()) {