From 7cdbae2da8311d2d2eac1eba84a3964f434b8714 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 4 Jan 2018 10:27:08 +0100 Subject: [PATCH] Add Writeable.Reader support to TransportResponseHandler (#28010) Allows TransportResponse objects not to implement Streamable anymore. As an example, I've adapted the response handler for ShardActiveResponse, allowing the fields in that class to become final. --- .../indices/store/IndicesStore.java | 15 +++------ .../transport/PlainTransportFuture.java | 6 ++-- .../elasticsearch/transport/TcpTransport.java | 8 ++--- .../transport/TransportResponseHandler.java | 31 +++++++++++++++---- .../transport/TransportService.java | 4 +-- .../TransportClientNodesServiceTests.java | 6 ++-- .../discovery/zen/UnicastZenPingTests.java | 5 +-- .../AssertingTransportInterceptor.java | 6 ++-- 8 files changed, 50 insertions(+), 31 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java b/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java index 2ae8d12a9fe..294484c6598 100644 --- a/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java +++ b/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java @@ -238,8 +238,8 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe } @Override - public ShardActiveResponse newInstance() { - return new ShardActiveResponse(); + public ShardActiveResponse read(StreamInput in) throws IOException { + return new ShardActiveResponse(in); } @Override @@ -417,20 +417,15 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe private static class ShardActiveResponse extends TransportResponse { - private boolean shardActive; - private DiscoveryNode node; - - ShardActiveResponse() { - } + private final boolean shardActive; + private final DiscoveryNode node; ShardActiveResponse(boolean shardActive, DiscoveryNode node) { this.shardActive = shardActive; this.node = node; } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + ShardActiveResponse(StreamInput in) throws IOException { shardActive = in.readBoolean(); node = new DiscoveryNode(in); } diff --git a/core/src/main/java/org/elasticsearch/transport/PlainTransportFuture.java b/core/src/main/java/org/elasticsearch/transport/PlainTransportFuture.java index fe01a1fdbcc..4dc530bd40c 100644 --- a/core/src/main/java/org/elasticsearch/transport/PlainTransportFuture.java +++ b/core/src/main/java/org/elasticsearch/transport/PlainTransportFuture.java @@ -21,8 +21,10 @@ package org.elasticsearch.transport; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.util.concurrent.BaseFuture; +import java.io.IOException; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -70,8 +72,8 @@ public class PlainTransportFuture extends BaseFutur } @Override - public V newInstance() { - return handler.newInstance(); + public V read(StreamInput in) throws IOException { + return handler.read(in); } @Override diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index 5fbc45a804c..dd2346443a6 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -1432,13 +1432,13 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements } private void handleResponse(InetSocketAddress remoteAddress, final StreamInput stream, final TransportResponseHandler handler) { - final TransportResponse response = handler.newInstance(); - response.remoteAddress(new TransportAddress(remoteAddress)); + final TransportResponse response; try { - response.readFrom(stream); + response = handler.read(stream); + response.remoteAddress(new TransportAddress(remoteAddress)); } catch (Exception e) { handleException(handler, new TransportSerializationException( - "Failed to deserialize response of type [" + response.getClass().getName() + "]", e)); + "Failed to deserialize response from handler [" + handler.getClass().getName() + "]", e)); return; } threadPool.executor(handler.executor()).execute(new AbstractRunnable() { diff --git a/core/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java b/core/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java index 9d2c1801240..447bbd92dd2 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java @@ -19,15 +19,34 @@ package org.elasticsearch.transport; -public interface TransportResponseHandler { +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.Writeable; + +import java.io.IOException; + +public interface TransportResponseHandler extends Writeable.Reader { /** - * creates a new instance of the return type from the remote call. - * called by the infra before de-serializing the response. - * - * @return a new response copy. + * @deprecated Implement {@link #read(StreamInput)} instead. */ - T newInstance(); + @Deprecated + default T newInstance() { + throw new UnsupportedOperationException(); + } + + /** + * deserializes a new instance of the return type from the stream. + * called by the infra when de-serializing the response. + * + * @return the deserialized response. + */ + @SuppressWarnings("deprecation") + @Override + default T read(StreamInput in) throws IOException { + T instance = newInstance(); + instance.readFrom(in); + return instance; + } void handleResponse(T response); diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index 2f87deb3bd7..a59ffcaa872 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -1079,8 +1079,8 @@ public class TransportService extends AbstractLifecycleComponent { } @Override - public T newInstance() { - return delegate.newInstance(); + public T read(StreamInput in) throws IOException { + return delegate.read(in); } @Override diff --git a/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java b/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java index b120c7a3e7d..ad894906cfb 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java +++ b/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java @@ -31,7 +31,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.UUIDs; -import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; @@ -176,8 +176,8 @@ public class TransportClientNodesServiceTests extends ESTestCase { ClusterName clusterName) { return new TransportResponseHandler() { @Override - public T newInstance() { - return handler.newInstance(); + public T read(StreamInput in) throws IOException { + return handler.read(in); } @Override diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java index d40d558d20b..e0593a694d0 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode.Role; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; @@ -899,8 +900,8 @@ public class UnicastZenPingTests extends ESTestCase { TransportResponseHandler original = super.getPingResponseHandler(pingingRound, node); return new TransportResponseHandler() { @Override - public UnicastPingResponse newInstance() { - return original.newInstance(); + public UnicastPingResponse read(StreamInput in) throws IOException { + return original.read(in); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AssertingTransportInterceptor.java b/test/framework/src/main/java/org/elasticsearch/transport/AssertingTransportInterceptor.java index cbe2006bf74..bbb6c956736 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AssertingTransportInterceptor.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AssertingTransportInterceptor.java @@ -20,6 +20,7 @@ package org.elasticsearch.transport; import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -30,6 +31,7 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.hamcrest.ElasticsearchAssertions; +import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Random; @@ -100,8 +102,8 @@ public final class AssertingTransportInterceptor implements TransportInterceptor assertVersionSerializable(request); sender.sendRequest(connection, action, request, options, new TransportResponseHandler() { @Override - public T newInstance() { - return handler.newInstance(); + public T read(StreamInput in) throws IOException { + return handler.read(in); } @Override