`type` and `id` are lost upon serialization of `Translog.Delete`. (#24586)
This was introduced in #24460: the constructor of `Translog.Delete` that takes a `StreamInput` does not set the type and id. To make it a bit more robust, I made fields final so that forgetting to set them would make the compiler complain.
This commit is contained in:
parent
5e8b569255
commit
87d19b21c7
|
@ -1102,8 +1102,8 @@ public abstract class Engine implements Closeable {
|
||||||
public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, VersionType versionType,
|
public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, VersionType versionType,
|
||||||
Origin origin, long startTime) {
|
Origin origin, long startTime) {
|
||||||
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
|
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
|
||||||
this.type = type;
|
this.type = Objects.requireNonNull(type);
|
||||||
this.id = id;
|
this.id = Objects.requireNonNull(id);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Delete(String type, String id, Term uid) {
|
public Delete(String type, String id, Term uid) {
|
||||||
|
|
|
@ -42,6 +42,7 @@ import org.elasticsearch.common.util.concurrent.ReleasableLock;
|
||||||
import org.elasticsearch.index.IndexSettings;
|
import org.elasticsearch.index.IndexSettings;
|
||||||
import org.elasticsearch.index.VersionType;
|
import org.elasticsearch.index.VersionType;
|
||||||
import org.elasticsearch.index.engine.Engine;
|
import org.elasticsearch.index.engine.Engine;
|
||||||
|
import org.elasticsearch.index.mapper.Uid;
|
||||||
import org.elasticsearch.index.seqno.SequenceNumbersService;
|
import org.elasticsearch.index.seqno.SequenceNumbersService;
|
||||||
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
|
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
|
||||||
import org.elasticsearch.index.shard.IndexShardComponent;
|
import org.elasticsearch.index.shard.IndexShardComponent;
|
||||||
|
@ -58,6 +59,7 @@ import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.locks.ReadWriteLock;
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
|
@ -919,8 +921,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
||||||
private final String id;
|
private final String id;
|
||||||
private final long autoGeneratedIdTimestamp;
|
private final long autoGeneratedIdTimestamp;
|
||||||
private final String type;
|
private final String type;
|
||||||
private long seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
|
private final long seqNo;
|
||||||
private long primaryTerm = 0;
|
private final long primaryTerm;
|
||||||
private final long version;
|
private final long version;
|
||||||
private final VersionType versionType;
|
private final VersionType versionType;
|
||||||
private final BytesReference source;
|
private final BytesReference source;
|
||||||
|
@ -950,6 +952,9 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
||||||
if (format >= FORMAT_SEQ_NO) {
|
if (format >= FORMAT_SEQ_NO) {
|
||||||
seqNo = in.readLong();
|
seqNo = in.readLong();
|
||||||
primaryTerm = in.readLong();
|
primaryTerm = in.readLong();
|
||||||
|
} else {
|
||||||
|
seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
|
||||||
|
primaryTerm = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -976,6 +981,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
||||||
this.id = id;
|
this.id = id;
|
||||||
this.source = new BytesArray(source);
|
this.source = new BytesArray(source);
|
||||||
this.seqNo = seqNo;
|
this.seqNo = seqNo;
|
||||||
|
this.primaryTerm = 0;
|
||||||
this.version = version;
|
this.version = version;
|
||||||
this.versionType = versionType;
|
this.versionType = versionType;
|
||||||
this.routing = routing;
|
this.routing = routing;
|
||||||
|
@ -1113,27 +1119,42 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
||||||
|
|
||||||
public static class Delete implements Operation {
|
public static class Delete implements Operation {
|
||||||
|
|
||||||
private static final int FORMAT_5_X = 2;
|
public static final int FORMAT_5_0 = 2; // 5.0 - 5.5
|
||||||
private static final int FORMAT_SEQ_NO = FORMAT_5_X + 1;
|
private static final int FORMAT_SINGLE_TYPE = FORMAT_5_0 + 1; // 5.5 - 6.0
|
||||||
|
private static final int FORMAT_SEQ_NO = FORMAT_SINGLE_TYPE + 1; // 6.0 - *
|
||||||
public static final int SERIALIZATION_FORMAT = FORMAT_SEQ_NO;
|
public static final int SERIALIZATION_FORMAT = FORMAT_SEQ_NO;
|
||||||
|
|
||||||
private String type, id;
|
private final String type, id;
|
||||||
private Term uid;
|
private final Term uid;
|
||||||
private long seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
|
private final long seqNo;
|
||||||
private long primaryTerm = 0;
|
private final long primaryTerm;
|
||||||
private long version = Versions.MATCH_ANY;
|
private final long version;
|
||||||
private VersionType versionType = VersionType.INTERNAL;
|
private final VersionType versionType;
|
||||||
|
|
||||||
public Delete(StreamInput in) throws IOException {
|
public Delete(StreamInput in) throws IOException {
|
||||||
final int format = in.readVInt();// SERIALIZATION_FORMAT
|
final int format = in.readVInt();// SERIALIZATION_FORMAT
|
||||||
assert format >= FORMAT_5_X : "format was: " + format;
|
assert format >= FORMAT_5_0 : "format was: " + format;
|
||||||
|
if (format >= FORMAT_SINGLE_TYPE) {
|
||||||
|
type = in.readString();
|
||||||
|
id = in.readString();
|
||||||
uid = new Term(in.readString(), in.readString());
|
uid = new Term(in.readString(), in.readString());
|
||||||
|
} else {
|
||||||
|
uid = new Term(in.readString(), in.readString());
|
||||||
|
// the uid was constructed from the type and id so we can
|
||||||
|
// extract them back
|
||||||
|
Uid uidObject = Uid.createUid(uid.text());
|
||||||
|
type = uidObject.type();
|
||||||
|
id = uidObject.id();
|
||||||
|
}
|
||||||
this.version = in.readLong();
|
this.version = in.readLong();
|
||||||
this.versionType = VersionType.fromValue(in.readByte());
|
this.versionType = VersionType.fromValue(in.readByte());
|
||||||
assert versionType.validateVersionForWrites(this.version);
|
assert versionType.validateVersionForWrites(this.version);
|
||||||
if (format >= FORMAT_SEQ_NO) {
|
if (format >= FORMAT_SEQ_NO) {
|
||||||
seqNo = in.readLong();
|
seqNo = in.readLong();
|
||||||
primaryTerm = in.readLong();
|
primaryTerm = in.readLong();
|
||||||
|
} else {
|
||||||
|
seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
|
||||||
|
primaryTerm = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1147,8 +1168,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
||||||
}
|
}
|
||||||
|
|
||||||
public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, VersionType versionType) {
|
public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, VersionType versionType) {
|
||||||
this.type = type;
|
this.type = Objects.requireNonNull(type);
|
||||||
this.id = id;
|
this.id = Objects.requireNonNull(id);
|
||||||
this.uid = uid;
|
this.uid = uid;
|
||||||
this.seqNo = seqNo;
|
this.seqNo = seqNo;
|
||||||
this.primaryTerm = primaryTerm;
|
this.primaryTerm = primaryTerm;
|
||||||
|
@ -1204,6 +1225,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
||||||
@Override
|
@Override
|
||||||
public void writeTo(StreamOutput out) throws IOException {
|
public void writeTo(StreamOutput out) throws IOException {
|
||||||
out.writeVInt(SERIALIZATION_FORMAT);
|
out.writeVInt(SERIALIZATION_FORMAT);
|
||||||
|
out.writeString(type);
|
||||||
|
out.writeString(id);
|
||||||
out.writeString(uid.field());
|
out.writeString(uid.field());
|
||||||
out.writeString(uid.text());
|
out.writeString(uid.text());
|
||||||
out.writeLong(version);
|
out.writeLong(version);
|
||||||
|
|
|
@ -1939,7 +1939,7 @@ public class InternalEngineTests extends ESTestCase {
|
||||||
indexResult = engine.index(index);
|
indexResult = engine.index(index);
|
||||||
assertFalse(indexResult.isCreated());
|
assertFalse(indexResult.isCreated());
|
||||||
|
|
||||||
engine.delete(new Engine.Delete(null, "1", newUid(doc)));
|
engine.delete(new Engine.Delete("doc", "1", newUid(doc)));
|
||||||
|
|
||||||
index = indexForDoc(doc);
|
index = indexForDoc(doc);
|
||||||
indexResult = engine.index(index);
|
indexResult = engine.index(index);
|
||||||
|
|
|
@ -354,24 +354,24 @@ public class TranslogTests extends ESTestCase {
|
||||||
{
|
{
|
||||||
final TranslogStats stats = stats();
|
final TranslogStats stats = stats();
|
||||||
assertThat(stats.estimatedNumberOfOperations(), equalTo(2L));
|
assertThat(stats.estimatedNumberOfOperations(), equalTo(2L));
|
||||||
assertThat(stats.getTranslogSizeInBytes(), equalTo(139L));
|
assertThat(stats.getTranslogSizeInBytes(), equalTo(146L));
|
||||||
}
|
}
|
||||||
|
|
||||||
translog.add(new Translog.Delete("test", "3", 2, newUid("3")));
|
translog.add(new Translog.Delete("test", "3", 2, newUid("3")));
|
||||||
{
|
{
|
||||||
final TranslogStats stats = stats();
|
final TranslogStats stats = stats();
|
||||||
assertThat(stats.estimatedNumberOfOperations(), equalTo(3L));
|
assertThat(stats.estimatedNumberOfOperations(), equalTo(3L));
|
||||||
assertThat(stats.getTranslogSizeInBytes(), equalTo(181L));
|
assertThat(stats.getTranslogSizeInBytes(), equalTo(195L));
|
||||||
}
|
}
|
||||||
|
|
||||||
translog.add(new Translog.NoOp(3, 1, randomAlphaOfLength(16)));
|
translog.add(new Translog.NoOp(3, 1, randomAlphaOfLength(16)));
|
||||||
{
|
{
|
||||||
final TranslogStats stats = stats();
|
final TranslogStats stats = stats();
|
||||||
assertThat(stats.estimatedNumberOfOperations(), equalTo(4L));
|
assertThat(stats.estimatedNumberOfOperations(), equalTo(4L));
|
||||||
assertThat(stats.getTranslogSizeInBytes(), equalTo(223L));
|
assertThat(stats.getTranslogSizeInBytes(), equalTo(237L));
|
||||||
}
|
}
|
||||||
|
|
||||||
final long expectedSizeInBytes = 266L;
|
final long expectedSizeInBytes = 280L;
|
||||||
translog.rollGeneration();
|
translog.rollGeneration();
|
||||||
{
|
{
|
||||||
final TranslogStats stats = stats();
|
final TranslogStats stats = stats();
|
||||||
|
@ -2263,6 +2263,20 @@ public class TranslogTests extends ESTestCase {
|
||||||
in = out.bytes().streamInput();
|
in = out.bytes().streamInput();
|
||||||
Translog.Delete serializedDelete = new Translog.Delete(in);
|
Translog.Delete serializedDelete = new Translog.Delete(in);
|
||||||
assertEquals(delete, serializedDelete);
|
assertEquals(delete, serializedDelete);
|
||||||
|
|
||||||
|
// simulate legacy delete serialization
|
||||||
|
out = new BytesStreamOutput();
|
||||||
|
out.writeVInt(Translog.Delete.FORMAT_5_0);
|
||||||
|
out.writeString(UidFieldMapper.NAME);
|
||||||
|
out.writeString("my_type#my_id");
|
||||||
|
out.writeLong(3); // version
|
||||||
|
out.writeByte(VersionType.INTERNAL.getValue());
|
||||||
|
out.writeLong(2); // seq no
|
||||||
|
out.writeLong(0); // primary term
|
||||||
|
in = out.bytes().streamInput();
|
||||||
|
serializedDelete = new Translog.Delete(in);
|
||||||
|
assertEquals("my_type", serializedDelete.type());
|
||||||
|
assertEquals("my_id", serializedDelete.id());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testRollGeneration() throws IOException {
|
public void testRollGeneration() throws IOException {
|
||||||
|
|
Loading…
Reference in New Issue