Fix serialization bug for aggs (#54029)

I created this bug today in #53793. When a `DelayableWriteable` that
references an existing object serializes itself it wasn't taking the
version of the node on the other side of the wire into account. This
fixes that.
This commit is contained in:
Nik Everett 2020-03-23 19:00:47 -04:00 committed by GitHub
parent 19af869243
commit 4734c645f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 38 additions and 14 deletions

View File

@ -19,12 +19,12 @@
package org.elasticsearch.common.io.stream; package org.elasticsearch.common.io.stream;
import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesReference;
import java.io.IOException; import java.io.IOException;
import java.util.function.Supplier; import java.util.function.Supplier;
import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesReference;
/** /**
* A holder for {@link Writeable}s that can delays reading the underlying * A holder for {@link Writeable}s that can delays reading the underlying
* {@linkplain Writeable} when it is read from a remote node. * {@linkplain Writeable} when it is read from a remote node.
@ -60,6 +60,7 @@ public abstract class DelayableWriteable<T extends Writeable> implements Supplie
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
try (BytesStreamOutput buffer = new BytesStreamOutput()) { try (BytesStreamOutput buffer = new BytesStreamOutput()) {
buffer.setVersion(out.getVersion());
reference.writeTo(buffer); reference.writeTo(buffer);
out.writeBytesReference(buffer.bytes()); out.writeBytesReference(buffer.bytes());
} }

View File

@ -19,25 +19,25 @@
package org.elasticsearch.common.io.stream; package org.elasticsearch.common.io.stream;
import static java.util.Collections.singletonList;
import static org.hamcrest.Matchers.equalTo;
import java.io.IOException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.VersionUtils;
import java.io.IOException;
import static java.util.Collections.singletonList;
import static org.hamcrest.Matchers.equalTo;
public class DelayableWriteableTests extends ESTestCase { public class DelayableWriteableTests extends ESTestCase {
// NOTE: we don't use AbstractWireSerializingTestCase because we don't implement equals and hashCode. // NOTE: we don't use AbstractWireSerializingTestCase because we don't implement equals and hashCode.
public static class Example implements NamedWriteable { private static class Example implements NamedWriteable {
private final String s; private final String s;
public Example(String s) { Example(String s) {
this.s = s; this.s = s;
} }
public Example(StreamInput in) throws IOException { Example(StreamInput in) throws IOException {
s = in.readString(); s = in.readString();
} }
@ -66,14 +66,14 @@ public class DelayableWriteableTests extends ESTestCase {
} }
} }
public static class NamedHolder implements Writeable { private static class NamedHolder implements Writeable {
private final Example e; private final Example e;
public NamedHolder(Example e) { NamedHolder(Example e) {
this.e = e; this.e = e;
} }
public NamedHolder(StreamInput in) throws IOException { NamedHolder(StreamInput in) throws IOException {
e = in.readNamedWriteable(Example.class); e = in.readNamedWriteable(Example.class);
} }
@ -97,6 +97,23 @@ public class DelayableWriteableTests extends ESTestCase {
} }
} }
private static class SneakOtherSideVersionOnWire implements Writeable {
private final Version version;
SneakOtherSideVersionOnWire() {
version = Version.CURRENT;
}
SneakOtherSideVersionOnWire(StreamInput in) throws IOException {
version = Version.readVersion(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
Version.writeVersion(out.getVersion(), out);
}
}
public void testRoundTripFromReferencing() throws IOException { public void testRoundTripFromReferencing() throws IOException {
Example e = new Example(randomAlphaOfLength(5)); Example e = new Example(randomAlphaOfLength(5));
DelayableWriteable<Example> original = DelayableWriteable.referencing(e); DelayableWriteable<Example> original = DelayableWriteable.referencing(e);
@ -139,6 +156,12 @@ public class DelayableWriteableTests extends ESTestCase {
roundTripTestCase(original, NamedHolder::new); roundTripTestCase(original, NamedHolder::new);
} }
public void testSerializesWithRemoteVersion() throws IOException {
Version remoteVersion = VersionUtils.randomCompatibleVersion(random(), Version.CURRENT);
DelayableWriteable<SneakOtherSideVersionOnWire> original = DelayableWriteable.referencing(new SneakOtherSideVersionOnWire());
assertThat(roundTrip(original, SneakOtherSideVersionOnWire::new, remoteVersion).get().version, equalTo(remoteVersion));
}
private <T extends Writeable> void roundTripTestCase(DelayableWriteable<T> original, Writeable.Reader<T> reader) throws IOException { private <T extends Writeable> void roundTripTestCase(DelayableWriteable<T> original, Writeable.Reader<T> reader) throws IOException {
DelayableWriteable<T> roundTripped = roundTrip(original, reader, Version.CURRENT); DelayableWriteable<T> roundTripped = roundTrip(original, reader, Version.CURRENT);
assertTrue(roundTripped.isDelayed()); assertTrue(roundTripped.isDelayed());