Fix Translog.Delete serialization for sequence numbers (#22543)
* Fix Translog.Delete serialization for sequence numbers

  Translog.Delete used `.writeVLong` instead of `.writeLong` for the sequence number and primary term (and their respective "read" variants). This could lead to issues where a 5.x node sent a translog operation with a negative sequence number (-2 for unassigned seq no) that tripped an assertion serializing a negative number and caused ES to exit.

  Adds a unit test for serialization and a mixed-cluster REST test, since that was how this was originally caught.

* Use more realistic values for random seqNum and primary term
* Add comment with TODO for removal in 7.0
* Change comment into an assert
This commit is contained in:
parent
389ffc93d8
commit
fed2a1a822
|
@@ -855,7 +855,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
||||||
in.readLong(); // ttl
|
in.readLong(); // ttl
|
||||||
}
|
}
|
||||||
this.versionType = VersionType.fromValue(in.readByte());
|
this.versionType = VersionType.fromValue(in.readByte());
|
||||||
assert versionType.validateVersionForWrites(this.version);
|
assert versionType.validateVersionForWrites(this.version) : "invalid version for writes: " + this.version;
|
||||||
if (format >= FORMAT_AUTO_GENERATED_IDS) {
|
if (format >= FORMAT_AUTO_GENERATED_IDS) {
|
||||||
this.autoGeneratedIdTimestamp = in.readLong();
|
this.autoGeneratedIdTimestamp = in.readLong();
|
||||||
} else {
|
} else {
|
||||||
|
@@ -1036,8 +1036,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
||||||
this.versionType = VersionType.fromValue(in.readByte());
|
this.versionType = VersionType.fromValue(in.readByte());
|
||||||
assert versionType.validateVersionForWrites(this.version);
|
assert versionType.validateVersionForWrites(this.version);
|
||||||
if (format >= FORMAT_SEQ_NO) {
|
if (format >= FORMAT_SEQ_NO) {
|
||||||
seqNo = in.readVLong();
|
seqNo = in.readLong();
|
||||||
primaryTerm = in.readVLong();
|
primaryTerm = in.readLong();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -1100,8 +1100,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
||||||
out.writeString(uid.text());
|
out.writeString(uid.text());
|
||||||
out.writeLong(version);
|
out.writeLong(version);
|
||||||
out.writeByte(versionType.getValue());
|
out.writeByte(versionType.getValue());
|
||||||
out.writeVLong(seqNo);
|
out.writeLong(seqNo);
|
||||||
out.writeVLong(primaryTerm);
|
out.writeLong(primaryTerm);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@@ -23,6 +23,9 @@ import com.carrotsearch.randomizedtesting.generators.RandomPicks;
|
||||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||||
import org.apache.logging.log4j.util.Supplier;
|
import org.apache.logging.log4j.util.Supplier;
|
||||||
import org.apache.lucene.codecs.CodecUtil;
|
import org.apache.lucene.codecs.CodecUtil;
|
||||||
|
import org.apache.lucene.document.Field;
|
||||||
|
import org.apache.lucene.document.NumericDocValuesField;
|
||||||
|
import org.apache.lucene.document.TextField;
|
||||||
import org.apache.lucene.index.Term;
|
import org.apache.lucene.index.Term;
|
||||||
import org.apache.lucene.mockfile.FilterFileChannel;
|
import org.apache.lucene.mockfile.FilterFileChannel;
|
||||||
import org.apache.lucene.store.AlreadyClosedException;
|
import org.apache.lucene.store.AlreadyClosedException;
|
||||||
|
@@ -31,6 +34,7 @@ import org.apache.lucene.store.MockDirectoryWrapper;
|
||||||
import org.apache.lucene.util.IOUtils;
|
import org.apache.lucene.util.IOUtils;
|
||||||
import org.apache.lucene.util.LineFileDocs;
|
import org.apache.lucene.util.LineFileDocs;
|
||||||
import org.apache.lucene.util.LuceneTestCase;
|
import org.apache.lucene.util.LuceneTestCase;
|
||||||
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
|
@@ -47,6 +51,12 @@ import org.elasticsearch.common.xcontent.ToXContent;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||||
import org.elasticsearch.index.VersionType;
|
import org.elasticsearch.index.VersionType;
|
||||||
|
import org.elasticsearch.index.engine.Engine;
|
||||||
|
import org.elasticsearch.index.engine.Engine.Operation.Origin;
|
||||||
|
import org.elasticsearch.index.mapper.ParseContext.Document;
|
||||||
|
import org.elasticsearch.index.mapper.ParsedDocument;
|
||||||
|
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
|
||||||
|
import org.elasticsearch.index.mapper.UidFieldMapper;
|
||||||
import org.elasticsearch.index.seqno.SequenceNumbersService;
|
import org.elasticsearch.index.seqno.SequenceNumbersService;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.index.translog.Translog.Location;
|
import org.elasticsearch.index.translog.Translog.Location;
|
||||||
|
@@ -67,6 +77,7 @@ import java.nio.file.InvalidPathException;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.nio.file.StandardOpenOption;
|
import java.nio.file.StandardOpenOption;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
@@ -297,14 +308,14 @@ public class TranslogTests extends ESTestCase {
|
||||||
{
|
{
|
||||||
final TranslogStats stats = stats();
|
final TranslogStats stats = stats();
|
||||||
assertThat(stats.estimatedNumberOfOperations(), equalTo(2L));
|
assertThat(stats.estimatedNumberOfOperations(), equalTo(2L));
|
||||||
assertThat(stats.getTranslogSizeInBytes(), equalTo(125L));
|
assertThat(stats.getTranslogSizeInBytes(), equalTo(139L));
|
||||||
}
|
}
|
||||||
|
|
||||||
translog.add(new Translog.Delete(newUid("3")));
|
translog.add(new Translog.Delete(newUid("3")));
|
||||||
{
|
{
|
||||||
final TranslogStats stats = stats();
|
final TranslogStats stats = stats();
|
||||||
assertThat(stats.estimatedNumberOfOperations(), equalTo(3L));
|
assertThat(stats.estimatedNumberOfOperations(), equalTo(3L));
|
||||||
assertThat(stats.getTranslogSizeInBytes(), equalTo(153L));
|
assertThat(stats.getTranslogSizeInBytes(), equalTo(181L));
|
||||||
}
|
}
|
||||||
|
|
||||||
final long seqNo = 1;
|
final long seqNo = 1;
|
||||||
|
@@ -313,10 +324,10 @@ public class TranslogTests extends ESTestCase {
|
||||||
{
|
{
|
||||||
final TranslogStats stats = stats();
|
final TranslogStats stats = stats();
|
||||||
assertThat(stats.estimatedNumberOfOperations(), equalTo(4L));
|
assertThat(stats.estimatedNumberOfOperations(), equalTo(4L));
|
||||||
assertThat(stats.getTranslogSizeInBytes(), equalTo(195L));
|
assertThat(stats.getTranslogSizeInBytes(), equalTo(223L));
|
||||||
}
|
}
|
||||||
|
|
||||||
final long expectedSizeInBytes = 238L;
|
final long expectedSizeInBytes = 266L;
|
||||||
translog.prepareCommit();
|
translog.prepareCommit();
|
||||||
{
|
{
|
||||||
final TranslogStats stats = stats();
|
final TranslogStats stats = stats();
|
||||||
|
@@ -1993,4 +2004,47 @@ public class TranslogTests extends ESTestCase {
|
||||||
public static Translog.Location randomTranslogLocation() {
|
public static Translog.Location randomTranslogLocation() {
|
||||||
return new Translog.Location(randomLong(), randomLong(), randomInt());
|
return new Translog.Location(randomLong(), randomLong(), randomInt());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testTranslogOpSerialization() throws Exception {
|
||||||
|
BytesReference B_1 = new BytesArray(new byte[]{1});
|
||||||
|
SeqNoFieldMapper.SequenceID seqID = SeqNoFieldMapper.SequenceID.emptySeqID();
|
||||||
|
assert Version.CURRENT.major <= 6 : "Using UNASSIGNED_SEQ_NO can be removed in 7.0, because 6.0+ nodes have actual sequence numbers";
|
||||||
|
long randomSeqNum = randomBoolean() ? SequenceNumbersService.UNASSIGNED_SEQ_NO : randomNonNegativeLong();
|
||||||
|
long randomPrimaryTerm = randomBoolean() ? 0 : randomNonNegativeLong();
|
||||||
|
seqID.seqNo.setLongValue(randomSeqNum);
|
||||||
|
seqID.seqNoDocValue.setLongValue(randomSeqNum);
|
||||||
|
seqID.primaryTerm.setLongValue(randomPrimaryTerm);
|
||||||
|
Field uidField = new Field("_uid", "1", UidFieldMapper.Defaults.FIELD_TYPE);
|
||||||
|
Field versionField = new NumericDocValuesField("_version", 1);
|
||||||
|
Document document = new Document();
|
||||||
|
document.add(new TextField("value", "test", Field.Store.YES));
|
||||||
|
document.add(uidField);
|
||||||
|
document.add(versionField);
|
||||||
|
document.add(seqID.seqNo);
|
||||||
|
document.add(seqID.seqNoDocValue);
|
||||||
|
document.add(seqID.primaryTerm);
|
||||||
|
ParsedDocument doc = new ParsedDocument(versionField, seqID, "1", "type", null, Arrays.asList(document), B_1, null);
|
||||||
|
|
||||||
|
Engine.Index eIndex = new Engine.Index(newUid("1"), doc, randomSeqNum, randomPrimaryTerm,
|
||||||
|
1, VersionType.INTERNAL, Origin.PRIMARY, 0, 0, false);
|
||||||
|
Engine.IndexResult eIndexResult = new Engine.IndexResult(1, randomSeqNum, true);
|
||||||
|
Translog.Index index = new Translog.Index(eIndex, eIndexResult);
|
||||||
|
|
||||||
|
BytesStreamOutput out = new BytesStreamOutput();
|
||||||
|
index.writeTo(out);
|
||||||
|
StreamInput in = out.bytes().streamInput();
|
||||||
|
Translog.Index serializedIndex = new Translog.Index(in);
|
||||||
|
assertEquals(index, serializedIndex);
|
||||||
|
|
||||||
|
Engine.Delete eDelete = new Engine.Delete("type", "1", newUid("1"), randomSeqNum, randomPrimaryTerm,
|
||||||
|
2, VersionType.INTERNAL, Origin.PRIMARY, 0);
|
||||||
|
Engine.DeleteResult eDeleteResult = new Engine.DeleteResult(2, randomSeqNum, true);
|
||||||
|
Translog.Delete delete = new Translog.Delete(eDelete, eDeleteResult);
|
||||||
|
|
||||||
|
out = new BytesStreamOutput();
|
||||||
|
delete.writeTo(out);
|
||||||
|
in = out.bytes().streamInput();
|
||||||
|
Translog.Delete serializedDelete = new Translog.Delete(in);
|
||||||
|
assertEquals(delete, serializedDelete);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@@ -26,6 +26,13 @@
|
||||||
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
|
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
|
||||||
- '{"f1": "v5_mixed", "f2": 9}'
|
- '{"f1": "v5_mixed", "f2": 9}'
|
||||||
|
|
||||||
|
- do:
|
||||||
|
index:
|
||||||
|
index: test_index
|
||||||
|
type: test_type
|
||||||
|
id: d10
|
||||||
|
body: {"f1": "v6_mixed", "f2": 10}
|
||||||
|
|
||||||
- do:
|
- do:
|
||||||
indices.flush:
|
indices.flush:
|
||||||
index: test_index
|
index: test_index
|
||||||
|
@@ -34,7 +41,23 @@
|
||||||
search:
|
search:
|
||||||
index: test_index
|
index: test_index
|
||||||
|
|
||||||
- match: { hits.total: 10 } # 5 docs from old cluster, 5 docs from mixed cluster
|
- match: { hits.total: 11 } # 5 docs from old cluster, 6 docs from mixed cluster
|
||||||
|
|
||||||
|
- do:
|
||||||
|
delete:
|
||||||
|
index: test_index
|
||||||
|
type: test_type
|
||||||
|
id: d10
|
||||||
|
|
||||||
|
- do:
|
||||||
|
indices.flush:
|
||||||
|
index: test_index
|
||||||
|
|
||||||
|
- do:
|
||||||
|
search:
|
||||||
|
index: test_index
|
||||||
|
|
||||||
|
- match: { hits.total: 10 }
|
||||||
|
|
||||||
---
|
---
|
||||||
"Verify custom cluster metadata still exists during upgrade":
|
"Verify custom cluster metadata still exists during upgrade":
|
||||||
|
|
Loading…
Reference in New Issue