From 4734c645f1af9d0c3f7e9bfbe98894f8d75bb121 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Mon, 23 Mar 2020 19:00:47 -0400 Subject: [PATCH] Fix serialization bug for aggs (#54029) I created this bug today in #53793. When a `DelayableWriteable` that references an existing object serializes itself it wasn't taking the version of the node on the other side of the wire into account. This fixes that. --- .../common/io/stream/DelayableWriteable.java | 7 +-- .../io/stream/DelayableWriteableTests.java | 45 ++++++++++++++----- 2 files changed, 38 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java b/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java index e75b81fff4e..b27010017e5 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java @@ -19,12 +19,12 @@ package org.elasticsearch.common.io.stream; -import org.elasticsearch.Version; -import org.elasticsearch.common.bytes.BytesReference; - import java.io.IOException; import java.util.function.Supplier; +import org.elasticsearch.Version; +import org.elasticsearch.common.bytes.BytesReference; + /** * A holder for {@link Writeable}s that can delays reading the underlying * {@linkplain Writeable} when it is read from a remote node. @@ -60,6 +60,7 @@ public abstract class DelayableWriteable implements Supplie @Override public void writeTo(StreamOutput out) throws IOException { try (BytesStreamOutput buffer = new BytesStreamOutput()) { + buffer.setVersion(out.getVersion()); reference.writeTo(buffer); out.writeBytesReference(buffer.bytes()); } diff --git a/server/src/test/java/org/elasticsearch/common/io/stream/DelayableWriteableTests.java b/server/src/test/java/org/elasticsearch/common/io/stream/DelayableWriteableTests.java index fbd8702608e..d12a15c1a72 100644 --- a/server/src/test/java/org/elasticsearch/common/io/stream/DelayableWriteableTests.java +++ b/server/src/test/java/org/elasticsearch/common/io/stream/DelayableWriteableTests.java @@ -19,25 +19,25 @@ package org.elasticsearch.common.io.stream; +import static java.util.Collections.singletonList; +import static org.hamcrest.Matchers.equalTo; + +import java.io.IOException; + import org.elasticsearch.Version; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; -import java.io.IOException; - -import static java.util.Collections.singletonList; -import static org.hamcrest.Matchers.equalTo; - public class DelayableWriteableTests extends ESTestCase { // NOTE: we don't use AbstractWireSerializingTestCase because we don't implement equals and hashCode. - public static class Example implements NamedWriteable { + private static class Example implements NamedWriteable { private final String s; - public Example(String s) { + Example(String s) { this.s = s; } - public Example(StreamInput in) throws IOException { + Example(StreamInput in) throws IOException { s = in.readString(); } @@ -66,14 +66,14 @@ public class DelayableWriteableTests extends ESTestCase { } } - public static class NamedHolder implements Writeable { + private static class NamedHolder implements Writeable { private final Example e; - public NamedHolder(Example e) { + NamedHolder(Example e) { this.e = e; } - public NamedHolder(StreamInput in) throws IOException { + NamedHolder(StreamInput in) throws IOException { e = in.readNamedWriteable(Example.class); } @@ -97,6 +97,23 @@ public class DelayableWriteableTests extends ESTestCase { } } + private static class SneakOtherSideVersionOnWire implements Writeable { + private final Version version; + + SneakOtherSideVersionOnWire() { + version = Version.CURRENT; + } + + SneakOtherSideVersionOnWire(StreamInput in) throws IOException { + version = Version.readVersion(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + Version.writeVersion(out.getVersion(), out); + } + } + public void testRoundTripFromReferencing() throws IOException { Example e = new Example(randomAlphaOfLength(5)); DelayableWriteable original = DelayableWriteable.referencing(e); @@ -139,6 +156,12 @@ public class DelayableWriteableTests extends ESTestCase { roundTripTestCase(original, NamedHolder::new); } + public void testSerializesWithRemoteVersion() throws IOException { + Version remoteVersion = VersionUtils.randomCompatibleVersion(random(), Version.CURRENT); + DelayableWriteable original = DelayableWriteable.referencing(new SneakOtherSideVersionOnWire()); + assertThat(roundTrip(original, SneakOtherSideVersionOnWire::new, remoteVersion).get().version, equalTo(remoteVersion)); + } + private void roundTripTestCase(DelayableWriteable original, Writeable.Reader reader) throws IOException { DelayableWriteable roundTripped = roundTrip(original, reader, Version.CURRENT); assertTrue(roundTripped.isDelayed());