diff --git a/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java index 6f6382d7174..99a0f736051 100644 --- a/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java +++ b/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.action.resync; +import org.elasticsearch.Version; import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -25,35 +26,60 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import java.io.IOException; -import java.util.List; +import java.util.Arrays; +/** + * Represents a batch of operations sent from the primary to its replicas during the primary-replica resync. + */ public final class ResyncReplicationRequest extends ReplicatedWriteRequest { - private List operations; + private Translog.Operation[] operations; ResyncReplicationRequest() { super(); } - public ResyncReplicationRequest(ShardId shardId, List operations) { + public ResyncReplicationRequest(final ShardId shardId, final Translog.Operation[] operations) { super(shardId); this.operations = operations; } - public List getOperations() { + public Translog.Operation[] getOperations() { return operations; } @Override - public void readFrom(StreamInput in) throws IOException { + public void readFrom(final StreamInput in) throws IOException { + if (in.getVersion().equals(Version.V_6_0_0)) { + /* + * Resync replication request serialization was broken in 6.0.0 due to the elements of the stream not being prefixed with a + * byte indicating the type of the operation. + */ + // TODO: remove this check in 8.0.0 which provides no BWC guarantees with 6.x. + assert Version.CURRENT.major <= 7; + throw new IllegalStateException("resync replication request serialization is broken in 6.0.0"); + } super.readFrom(in); - operations = in.readList(Translog.Operation::readType); + operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new); } @Override - public void writeTo(StreamOutput out) throws IOException { + public void writeTo(final StreamOutput out) throws IOException { super.writeTo(out); - out.writeList(operations); + out.writeArray(Translog.Operation::writeOperation, operations); + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final ResyncReplicationRequest that = (ResyncReplicationRequest) o; + return Arrays.equals(operations, that.operations); + } + + @Override + public int hashCode() { + return Arrays.hashCode(operations); } @Override @@ -62,7 +88,8 @@ public final class ResyncReplicationRequest extends ReplicatedWriteRequest T[] readArray(Writeable.Reader reader, IntFunction arraySupplier) throws IOException { - int length = readArraySize(); - T[] values = arraySupplier.apply(length); + /** + * Reads an array from the stream using the specified {@link org.elasticsearch.common.io.stream.Writeable.Reader} to read array elements + * from the stream. This method can be seen as the reader version of {@link StreamOutput#writeArray(Writeable.Writer, Object[])}. It is + * assumed that the stream first contains a variable-length integer representing the size of the array, and then contains that many + * elements that can be read from the stream. + * + * @param reader the reader used to read individual elements + * @param arraySupplier a supplier used to construct a new array + * @param the type of the elements of the array + * @return an array read from the stream + * @throws IOException if an I/O exception occurs while reading the array + */ + public T[] readArray(final Writeable.Reader reader, final IntFunction arraySupplier) throws IOException { + final int length = readArraySize(); + final T[] values = arraySupplier.apply(length); for (int i = 0; i < length; i++) { values[i] = reader.read(this); } diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java b/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java index eb2bbdc3578..0242d71bbdf 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java @@ -58,6 +58,7 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.function.IntFunction; /** * A stream from another node to this node. Technically, it can also be streamed from a byte array but that is mostly for testing. @@ -706,6 +707,23 @@ public abstract class StreamOutput extends OutputStream { } } + /** + * Writes the specified array to the stream using the specified {@link Writer} for each element in the array. This method can be seen as + * writer version of {@link StreamInput#readArray(Writeable.Reader, IntFunction)}. The length of array encoded as a variable-length + * integer is first written to the stream, and then the elements of the array are written to the stream. + * + * @param writer the writer used to write individual elements + * @param array the array + * @param the type of the elements of the array + * @throws IOException if an I/O exception occurs while writing the array + */ + public void writeArray(final Writer writer, final T[] array) throws IOException { + writeVInt(array.length); + for (T value : array) { + writer.write(this, value); + } + } + public void writeArray(T[] array) throws IOException { writeVInt(array.length); for (T value: array) { diff --git a/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java index 08d64cb82bc..b1bd1c5b313 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java +++ b/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java @@ -218,6 +218,8 @@ public class PrimaryReplicaSyncer extends AbstractComponent { } } + private static Translog.Operation[] EMPTY_ARRAY = new Translog.Operation[0]; + @Override protected void doRun() throws Exception { long size = 0; @@ -247,7 +249,7 @@ public class PrimaryReplicaSyncer extends AbstractComponent { if (!operations.isEmpty()) { task.setPhase("sending_ops"); - ResyncReplicationRequest request = new ResyncReplicationRequest(shardId, operations); + ResyncReplicationRequest request = new ResyncReplicationRequest(shardId, operations.toArray(EMPTY_ARRAY)); logger.trace("{} sending batch of [{}][{}] (total sent: [{}], skipped: [{}])", shardId, operations.size(), new ByteSizeValue(size), totalSentOps.get(), totalSkippedOps.get()); syncAction.sync(request, task, primaryAllocationId, primaryTerm, this); diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 20c428960f7..4373c8d0539 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -33,7 +33,6 @@ import org.elasticsearch.common.bytes.ReleasablePagedBytesReference; import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lucene.uid.Versions; @@ -847,7 +846,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC * A generic interface representing an operation performed on the transaction log. * Each is associated with a type. */ - public interface Operation extends Writeable { + public interface Operation { enum Type { @Deprecated CREATE((byte) 1), @@ -876,7 +875,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC case 4: return NO_OP; default: - throw new IllegalArgumentException("No type mapped for [" + id + "]"); + throw new IllegalArgumentException("no type mapped for [" + id + "]"); } } } @@ -893,31 +892,44 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC /** * Reads the type and the operation from the given stream. The operation must be written with - * {@link Operation#writeType(Operation, StreamOutput)} + * {@link Operation#writeOperation(StreamOutput, Operation)} */ - static Operation readType(StreamInput input) throws IOException { - Translog.Operation.Type type = Translog.Operation.Type.fromId(input.readByte()); + static Operation readOperation(final StreamInput input) throws IOException { + final Translog.Operation.Type type = Translog.Operation.Type.fromId(input.readByte()); switch (type) { case CREATE: - // the deserialization logic in Index was identical to that of Create when create was deprecated + // the de-serialization logic in Index was identical to that of Create when create was deprecated + case INDEX: return new Index(input); case DELETE: return new Delete(input); - case INDEX: - return new Index(input); case NO_OP: return new NoOp(input); default: - throw new IOException("No type for [" + type + "]"); + throw new AssertionError("no case for [" + type + "]"); } } /** * Writes the type and translog operation to the given stream */ - static void writeType(Translog.Operation operation, StreamOutput output) throws IOException { + static void writeOperation(final StreamOutput output, final Operation operation) throws IOException { output.writeByte(operation.opType().id()); - operation.writeTo(output); + switch(operation.opType()) { + case CREATE: + // the serialization logic in Index was identical to that of Create when create was deprecated + case INDEX: + ((Index) operation).write(output); + break; + case DELETE: + ((Delete) operation).write(output); + break; + case NO_OP: + ((NoOp) operation).write(output); + break; + default: + throw new AssertionError("no case for [" + operation.opType() + "]"); + } } } @@ -954,7 +966,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC private final String routing; private final String parent; - public Index(StreamInput in) throws IOException { + private Index(final StreamInput in) throws IOException { final int format = in.readVInt(); // SERIALIZATION_FORMAT assert format >= FORMAT_2_X : "format was: " + format; id = in.readString(); @@ -1067,8 +1079,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC return new Source(source, routing, parent); } - @Override - public void writeTo(StreamOutput out) throws IOException { + private void write(final StreamOutput out) throws IOException { out.writeVInt(SERIALIZATION_FORMAT); out.writeString(id); out.writeString(type); @@ -1156,7 +1167,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC private final long version; private final VersionType versionType; - public Delete(StreamInput in) throws IOException { + private Delete(final StreamInput in) throws IOException { final int format = in.readVInt();// SERIALIZATION_FORMAT assert format >= FORMAT_5_0 : "format was: " + format; if (format >= FORMAT_SINGLE_TYPE) { @@ -1251,8 +1262,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC throw new IllegalStateException("trying to read doc source from delete operation"); } - @Override - public void writeTo(StreamOutput out) throws IOException { + private void write(final StreamOutput out) throws IOException { out.writeVInt(SERIALIZATION_FORMAT); out.writeString(type); out.writeString(id); @@ -1322,7 +1332,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC return reason; } - NoOp(final StreamInput in) throws IOException { + private NoOp(final StreamInput in) throws IOException { seqNo = in.readLong(); primaryTerm = in.readLong(); reason = in.readString(); @@ -1337,8 +1347,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC this.reason = reason; } - @Override - public void writeTo(StreamOutput out) throws IOException { + private void write(final StreamOutput out) throws IOException { out.writeLong(seqNo); out.writeLong(primaryTerm); out.writeString(reason); @@ -1440,7 +1449,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC verifyChecksum(in); in.reset(); } - operation = Translog.Operation.readType(in); + operation = Translog.Operation.readOperation(in); verifyChecksum(in); } catch (TranslogCorruptedException e) { throw e; @@ -1483,7 +1492,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC // because closing it closes the underlying stream, which we don't // want to do here. out.resetDigest(); - Translog.Operation.writeType(op, out); + Translog.Operation.writeOperation(out, op); long checksum = out.getChecksum(); out.writeInt((int) checksum); } diff --git a/core/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java b/core/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java new file mode 100644 index 00000000000..f1f9fec34de --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java @@ -0,0 +1,54 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.resync; + +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.lucene.uid.Versions; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.nio.charset.Charset; + +import static org.hamcrest.Matchers.equalTo; + +public class ResyncReplicationRequestTests extends ESTestCase { + + public void testSerialization() throws IOException { + final byte[] bytes = "{}".getBytes(Charset.forName("UTF-8")); + final Translog.Index index = new Translog.Index("type", "id", 0, Versions.MATCH_ANY, VersionType.INTERNAL, bytes, null, null, -1); + final ShardId shardId = new ShardId(new Index("index", "uuid"), 0); + final ResyncReplicationRequest before = new ResyncReplicationRequest(shardId, new Translog.Operation[]{index}); + + final BytesStreamOutput out = new BytesStreamOutput(); + before.writeTo(out); + + final StreamInput in = out.bytes().streamInput(); + final ResyncReplicationRequest after = new ResyncReplicationRequest(); + after.readFrom(in); + + assertThat(after, equalTo(before)); + } + +} diff --git a/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java index 725d39279d2..9c01cab57dd 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java @@ -48,7 +48,7 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase { AtomicBoolean syncActionCalled = new AtomicBoolean(); PrimaryReplicaSyncer.SyncAction syncAction = (request, parentTask, allocationId, primaryTerm, listener) -> { - logger.info("Sending off {} operations", request.getOperations().size()); + logger.info("Sending off {} operations", request.getOperations().length); syncActionCalled.set(true); assertThat(parentTask, instanceOf(PrimaryReplicaSyncer.ResyncTask.class)); listener.onResponse(new ResyncReplicationResponse()); @@ -98,7 +98,7 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase { CountDownLatch syncCalledLatch = new CountDownLatch(1); PrimaryReplicaSyncer.SyncAction syncAction = (request, parentTask, allocationId, primaryTerm, listener) -> { - logger.info("Sending off {} operations", request.getOperations().size()); + logger.info("Sending off {} operations", request.getOperations().length); syncActionCalled.set(true); syncCalledLatch.countDown(); threadPool.generic().execute(() -> listener.onResponse(new ResyncReplicationResponse())); diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 78ed6697b22..1a17e0dc6a0 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -2343,9 +2343,9 @@ public class TranslogTests extends ESTestCase { Translog.Index index = new Translog.Index(eIndex, eIndexResult); BytesStreamOutput out = new BytesStreamOutput(); - index.writeTo(out); + Translog.Operation.writeOperation(out, index); StreamInput in = out.bytes().streamInput(); - Translog.Index serializedIndex = new Translog.Index(in); + Translog.Index serializedIndex = (Translog.Index) Translog.Operation.readOperation(in); assertEquals(index, serializedIndex); Engine.Delete eDelete = new Engine.Delete(doc.type(), doc.id(), newUid(doc), randomSeqNum, randomPrimaryTerm, @@ -2354,13 +2354,14 @@ public class TranslogTests extends ESTestCase { Translog.Delete delete = new Translog.Delete(eDelete, eDeleteResult); out = new BytesStreamOutput(); - delete.writeTo(out); + Translog.Operation.writeOperation(out, delete); in = out.bytes().streamInput(); - Translog.Delete serializedDelete = new Translog.Delete(in); + Translog.Delete serializedDelete = (Translog.Delete) Translog.Operation.readOperation(in); assertEquals(delete, serializedDelete); // simulate legacy delete serialization out = new BytesStreamOutput(); + out.writeByte(Translog.Operation.Type.DELETE.id()); out.writeVInt(Translog.Delete.FORMAT_5_0); out.writeString(UidFieldMapper.NAME); out.writeString("my_type#my_id"); @@ -2369,7 +2370,7 @@ public class TranslogTests extends ESTestCase { out.writeLong(2); // seq no out.writeLong(0); // primary term in = out.bytes().streamInput(); - serializedDelete = new Translog.Delete(in); + serializedDelete = (Translog.Delete) Translog.Operation.readOperation(in); assertEquals("my_type", serializedDelete.type()); assertEquals("my_id", serializedDelete.id()); }