Fix resync request serialization

This commit addresses a subtle bug in the serialization routine for
resync requests. The problem here is that Translog.Operation#readType is
not compatible with the implementations of
Translog.Operation#writeTo. Unfortunately, this issue prevents
primary-replica from succeeding, issues which we will address in
follow-ups.

Relates #27418
This commit is contained in:
Jason Tedor 2017-11-20 20:56:48 -05:00 committed by GitHub
parent 4e04f95ab4
commit 28660be40a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 166 additions and 43 deletions

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.action.resync;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -25,35 +26,60 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import java.io.IOException;
import java.util.List;
import java.util.Arrays;
/**
* Represents a batch of operations sent from the primary to its replicas during the primary-replica resync.
*/
public final class ResyncReplicationRequest extends ReplicatedWriteRequest<ResyncReplicationRequest> {
private List<Translog.Operation> operations;
private Translog.Operation[] operations;
ResyncReplicationRequest() {
super();
}
public ResyncReplicationRequest(ShardId shardId, List<Translog.Operation> operations) {
public ResyncReplicationRequest(final ShardId shardId, final Translog.Operation[] operations) {
super(shardId);
this.operations = operations;
}
public List<Translog.Operation> getOperations() {
public Translog.Operation[] getOperations() {
return operations;
}
@Override
public void readFrom(StreamInput in) throws IOException {
public void readFrom(final StreamInput in) throws IOException {
if (in.getVersion().equals(Version.V_6_0_0)) {
/*
* Resync replication request serialization was broken in 6.0.0 due to the elements of the stream not being prefixed with a
* byte indicating the type of the operation.
*/
// TODO: remove this check in 8.0.0 which provides no BWC guarantees with 6.x.
assert Version.CURRENT.major <= 7;
throw new IllegalStateException("resync replication request serialization is broken in 6.0.0");
}
super.readFrom(in);
operations = in.readList(Translog.Operation::readType);
operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
out.writeList(operations);
out.writeArray(Translog.Operation::writeOperation, operations);
}
@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final ResyncReplicationRequest that = (ResyncReplicationRequest) o;
return Arrays.equals(operations, that.operations);
}
@Override
public int hashCode() {
return Arrays.hashCode(operations);
}
@Override
@ -62,7 +88,8 @@ public final class ResyncReplicationRequest extends ReplicatedWriteRequest<Resyn
"shardId=" + shardId +
", timeout=" + timeout +
", index='" + index + '\'' +
", ops=" + operations.size() +
", ops=" + operations.length +
"}";
}
}

View File

@ -688,9 +688,21 @@ public abstract class StreamInput extends InputStream {
return bytes;
}
public <T> T[] readArray(Writeable.Reader<T> reader, IntFunction<T[]> arraySupplier) throws IOException {
int length = readArraySize();
T[] values = arraySupplier.apply(length);
/**
* Reads an array from the stream using the specified {@link org.elasticsearch.common.io.stream.Writeable.Reader} to read array elements
* from the stream. This method can be seen as the reader version of {@link StreamOutput#writeArray(Writeable.Writer, Object[])}. It is
* assumed that the stream first contains a variable-length integer representing the size of the array, and then contains that many
* elements that can be read from the stream.
*
* @param reader the reader used to read individual elements
* @param arraySupplier a supplier used to construct a new array
* @param <T> the type of the elements of the array
* @return an array read from the stream
* @throws IOException if an I/O exception occurs while reading the array
*/
public <T> T[] readArray(final Writeable.Reader<T> reader, final IntFunction<T[]> arraySupplier) throws IOException {
final int length = readArraySize();
final T[] values = arraySupplier.apply(length);
for (int i = 0; i < length; i++) {
values[i] = reader.read(this);
}

View File

@ -58,6 +58,7 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntFunction;
/**
* A stream from another node to this node. Technically, it can also be streamed from a byte array but that is mostly for testing.
@ -706,6 +707,23 @@ public abstract class StreamOutput extends OutputStream {
}
}
/**
* Writes the specified array to the stream using the specified {@link Writer} for each element in the array. This method can be seen as
* writer version of {@link StreamInput#readArray(Writeable.Reader, IntFunction)}. The length of array encoded as a variable-length
* integer is first written to the stream, and then the elements of the array are written to the stream.
*
* @param writer the writer used to write individual elements
* @param array the array
* @param <T> the type of the elements of the array
* @throws IOException if an I/O exception occurs while writing the array
*/
public <T> void writeArray(final Writer<T> writer, final T[] array) throws IOException {
writeVInt(array.length);
for (T value : array) {
writer.write(this, value);
}
}
public <T extends Writeable> void writeArray(T[] array) throws IOException {
writeVInt(array.length);
for (T value: array) {

View File

@ -218,6 +218,8 @@ public class PrimaryReplicaSyncer extends AbstractComponent {
}
}
private static Translog.Operation[] EMPTY_ARRAY = new Translog.Operation[0];
@Override
protected void doRun() throws Exception {
long size = 0;
@ -247,7 +249,7 @@ public class PrimaryReplicaSyncer extends AbstractComponent {
if (!operations.isEmpty()) {
task.setPhase("sending_ops");
ResyncReplicationRequest request = new ResyncReplicationRequest(shardId, operations);
ResyncReplicationRequest request = new ResyncReplicationRequest(shardId, operations.toArray(EMPTY_ARRAY));
logger.trace("{} sending batch of [{}][{}] (total sent: [{}], skipped: [{}])", shardId, operations.size(),
new ByteSizeValue(size), totalSentOps.get(), totalSkippedOps.get());
syncAction.sync(request, task, primaryAllocationId, primaryTerm, this);

View File

@ -33,7 +33,6 @@ import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.uid.Versions;
@ -847,7 +846,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
* A generic interface representing an operation performed on the transaction log.
* Each is associated with a type.
*/
public interface Operation extends Writeable {
public interface Operation {
enum Type {
@Deprecated
CREATE((byte) 1),
@ -876,7 +875,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
case 4:
return NO_OP;
default:
throw new IllegalArgumentException("No type mapped for [" + id + "]");
throw new IllegalArgumentException("no type mapped for [" + id + "]");
}
}
}
@ -893,31 +892,44 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
/**
* Reads the type and the operation from the given stream. The operation must be written with
* {@link Operation#writeType(Operation, StreamOutput)}
* {@link Operation#writeOperation(StreamOutput, Operation)}
*/
static Operation readType(StreamInput input) throws IOException {
Translog.Operation.Type type = Translog.Operation.Type.fromId(input.readByte());
static Operation readOperation(final StreamInput input) throws IOException {
final Translog.Operation.Type type = Translog.Operation.Type.fromId(input.readByte());
switch (type) {
case CREATE:
// the deserialization logic in Index was identical to that of Create when create was deprecated
// the de-serialization logic in Index was identical to that of Create when create was deprecated
case INDEX:
return new Index(input);
case DELETE:
return new Delete(input);
case INDEX:
return new Index(input);
case NO_OP:
return new NoOp(input);
default:
throw new IOException("No type for [" + type + "]");
throw new AssertionError("no case for [" + type + "]");
}
}
/**
* Writes the type and translog operation to the given stream
*/
static void writeType(Translog.Operation operation, StreamOutput output) throws IOException {
static void writeOperation(final StreamOutput output, final Operation operation) throws IOException {
output.writeByte(operation.opType().id());
operation.writeTo(output);
switch(operation.opType()) {
case CREATE:
// the serialization logic in Index was identical to that of Create when create was deprecated
case INDEX:
((Index) operation).write(output);
break;
case DELETE:
((Delete) operation).write(output);
break;
case NO_OP:
((NoOp) operation).write(output);
break;
default:
throw new AssertionError("no case for [" + operation.opType() + "]");
}
}
}
@ -954,7 +966,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
private final String routing;
private final String parent;
public Index(StreamInput in) throws IOException {
private Index(final StreamInput in) throws IOException {
final int format = in.readVInt(); // SERIALIZATION_FORMAT
assert format >= FORMAT_2_X : "format was: " + format;
id = in.readString();
@ -1067,8 +1079,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
return new Source(source, routing, parent);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
private void write(final StreamOutput out) throws IOException {
out.writeVInt(SERIALIZATION_FORMAT);
out.writeString(id);
out.writeString(type);
@ -1156,7 +1167,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
private final long version;
private final VersionType versionType;
public Delete(StreamInput in) throws IOException {
private Delete(final StreamInput in) throws IOException {
final int format = in.readVInt();// SERIALIZATION_FORMAT
assert format >= FORMAT_5_0 : "format was: " + format;
if (format >= FORMAT_SINGLE_TYPE) {
@ -1251,8 +1262,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
throw new IllegalStateException("trying to read doc source from delete operation");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
private void write(final StreamOutput out) throws IOException {
out.writeVInt(SERIALIZATION_FORMAT);
out.writeString(type);
out.writeString(id);
@ -1322,7 +1332,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
return reason;
}
NoOp(final StreamInput in) throws IOException {
private NoOp(final StreamInput in) throws IOException {
seqNo = in.readLong();
primaryTerm = in.readLong();
reason = in.readString();
@ -1337,8 +1347,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
this.reason = reason;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
private void write(final StreamOutput out) throws IOException {
out.writeLong(seqNo);
out.writeLong(primaryTerm);
out.writeString(reason);
@ -1440,7 +1449,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
verifyChecksum(in);
in.reset();
}
operation = Translog.Operation.readType(in);
operation = Translog.Operation.readOperation(in);
verifyChecksum(in);
} catch (TranslogCorruptedException e) {
throw e;
@ -1483,7 +1492,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
// because closing it closes the underlying stream, which we don't
// want to do here.
out.resetDigest();
Translog.Operation.writeType(op, out);
Translog.Operation.writeOperation(out, op);
long checksum = out.getChecksum();
out.writeInt((int) checksum);
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.resync;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.nio.charset.Charset;
import static org.hamcrest.Matchers.equalTo;
public class ResyncReplicationRequestTests extends ESTestCase {
public void testSerialization() throws IOException {
final byte[] bytes = "{}".getBytes(Charset.forName("UTF-8"));
final Translog.Index index = new Translog.Index("type", "id", 0, Versions.MATCH_ANY, VersionType.INTERNAL, bytes, null, null, -1);
final ShardId shardId = new ShardId(new Index("index", "uuid"), 0);
final ResyncReplicationRequest before = new ResyncReplicationRequest(shardId, new Translog.Operation[]{index});
final BytesStreamOutput out = new BytesStreamOutput();
before.writeTo(out);
final StreamInput in = out.bytes().streamInput();
final ResyncReplicationRequest after = new ResyncReplicationRequest();
after.readFrom(in);
assertThat(after, equalTo(before));
}
}

View File

@ -48,7 +48,7 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
AtomicBoolean syncActionCalled = new AtomicBoolean();
PrimaryReplicaSyncer.SyncAction syncAction =
(request, parentTask, allocationId, primaryTerm, listener) -> {
logger.info("Sending off {} operations", request.getOperations().size());
logger.info("Sending off {} operations", request.getOperations().length);
syncActionCalled.set(true);
assertThat(parentTask, instanceOf(PrimaryReplicaSyncer.ResyncTask.class));
listener.onResponse(new ResyncReplicationResponse());
@ -98,7 +98,7 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
CountDownLatch syncCalledLatch = new CountDownLatch(1);
PrimaryReplicaSyncer.SyncAction syncAction =
(request, parentTask, allocationId, primaryTerm, listener) -> {
logger.info("Sending off {} operations", request.getOperations().size());
logger.info("Sending off {} operations", request.getOperations().length);
syncActionCalled.set(true);
syncCalledLatch.countDown();
threadPool.generic().execute(() -> listener.onResponse(new ResyncReplicationResponse()));

View File

@ -2343,9 +2343,9 @@ public class TranslogTests extends ESTestCase {
Translog.Index index = new Translog.Index(eIndex, eIndexResult);
BytesStreamOutput out = new BytesStreamOutput();
index.writeTo(out);
Translog.Operation.writeOperation(out, index);
StreamInput in = out.bytes().streamInput();
Translog.Index serializedIndex = new Translog.Index(in);
Translog.Index serializedIndex = (Translog.Index) Translog.Operation.readOperation(in);
assertEquals(index, serializedIndex);
Engine.Delete eDelete = new Engine.Delete(doc.type(), doc.id(), newUid(doc), randomSeqNum, randomPrimaryTerm,
@ -2354,13 +2354,14 @@ public class TranslogTests extends ESTestCase {
Translog.Delete delete = new Translog.Delete(eDelete, eDeleteResult);
out = new BytesStreamOutput();
delete.writeTo(out);
Translog.Operation.writeOperation(out, delete);
in = out.bytes().streamInput();
Translog.Delete serializedDelete = new Translog.Delete(in);
Translog.Delete serializedDelete = (Translog.Delete) Translog.Operation.readOperation(in);
assertEquals(delete, serializedDelete);
// simulate legacy delete serialization
out = new BytesStreamOutput();
out.writeByte(Translog.Operation.Type.DELETE.id());
out.writeVInt(Translog.Delete.FORMAT_5_0);
out.writeString(UidFieldMapper.NAME);
out.writeString("my_type#my_id");
@ -2369,7 +2370,7 @@ public class TranslogTests extends ESTestCase {
out.writeLong(2); // seq no
out.writeLong(0); // primary term
in = out.bytes().streamInput();
serializedDelete = new Translog.Delete(in);
serializedDelete = (Translog.Delete) Translog.Operation.readOperation(in);
assertEquals("my_type", serializedDelete.type());
assertEquals("my_id", serializedDelete.id());
}