diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java index f7637592613..13e3381b115 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java @@ -19,6 +19,11 @@ package org.elasticsearch.index.seqno; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; + +import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -32,7 +37,7 @@ import java.util.stream.Collectors; * otherwise merge away operations that have been soft deleted). Each retention lease contains a unique identifier, the retaining sequence * number, the timestamp of when the lease was created or renewed, and the source of the retention lease (e.g., "ccr"). */ -public final class RetentionLease { +public final class RetentionLease implements Writeable { private final String id; @@ -116,6 +121,33 @@ public final class RetentionLease { this.source = source; } + /** + * Constructs a new retention lease from a stream. The retention lease should have been written via {@link #writeTo(StreamOutput)}. + * + * @param in the stream to construct the retention lease from + * @throws IOException if an I/O exception occurs reading from the stream + */ + public RetentionLease(final StreamInput in) throws IOException { + id = in.readString(); + retainingSequenceNumber = in.readZLong(); + timestamp = in.readVLong(); + source = in.readString(); + } + + /** + * Writes a retention lease to a stream in a manner suitable for later reconstruction via {@link #RetentionLease(StreamInput)}. + * + * @param out the stream to write the retention lease to + * @throws IOException if an I/O exception occurs writing to the stream + */ + @Override + public void writeTo(final StreamOutput out) throws IOException { + out.writeString(id); + out.writeZLong(retainingSequenceNumber); + out.writeVLong(timestamp); + out.writeString(source); + } + /** * Encodes a retention lease as a string. This encoding can be decoded by {@link #decodeRetentionLease(String)}. The retention lease is * encoded in the format id:{id};retaining_seq_no:{retainingSequenecNumber};timestamp:{timestamp};source:{source}. diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java index c4340a381ce..500393f2cfa 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java @@ -19,8 +19,11 @@ package org.elasticsearch.index.seqno; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.test.ESTestCase; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -82,6 +85,20 @@ public class RetentionLeaseTests extends ESTestCase { assertThat(e, hasToString(containsString("retention lease source can not be empty"))); } + public void testRetentionLeaseSerialization() throws IOException { + final String id = randomAlphaOfLength(8); + final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); + final long timestamp = randomNonNegativeLong(); + final String source = randomAlphaOfLength(8); + final RetentionLease retentionLease = new RetentionLease(id, retainingSequenceNumber, timestamp, source); + try (BytesStreamOutput out = new BytesStreamOutput()) { + retentionLease.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + assertThat(retentionLease, equalTo(new RetentionLease(in))); + } + } + } + public void testRetentionLeaseEncoding() { final String id = randomAlphaOfLength(8); final long retainingSequenceNumber = randomNonNegativeLong();