Add Writeable.Reader support to TransportResponseHandler (#28010)

Allows TransportResponse objects not to implement Streamable anymore. As an example, I've adapted the response handler for ShardActiveResponse, allowing the fields in that class to become final.
This commit is contained in:
Yannick Welsch 2018-01-04 10:27:08 +01:00 committed by GitHub
parent d36ec18029
commit 7cdbae2da8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 50 additions and 31 deletions

View File

@ -238,8 +238,8 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
}
@Override
public ShardActiveResponse newInstance() {
return new ShardActiveResponse();
public ShardActiveResponse read(StreamInput in) throws IOException {
return new ShardActiveResponse(in);
}
@Override
@ -417,20 +417,15 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
private static class ShardActiveResponse extends TransportResponse {
private boolean shardActive;
private DiscoveryNode node;
ShardActiveResponse() {
}
private final boolean shardActive;
private final DiscoveryNode node;
ShardActiveResponse(boolean shardActive, DiscoveryNode node) {
this.shardActive = shardActive;
this.node = node;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
ShardActiveResponse(StreamInput in) throws IOException {
shardActive = in.readBoolean();
node = new DiscoveryNode(in);
}

View File

@ -21,8 +21,10 @@ package org.elasticsearch.transport;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.concurrent.BaseFuture;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -70,8 +72,8 @@ public class PlainTransportFuture<V extends TransportResponse> extends BaseFutur
}
@Override
public V newInstance() {
return handler.newInstance();
public V read(StreamInput in) throws IOException {
return handler.read(in);
}
@Override

View File

@ -1432,13 +1432,13 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
}
private void handleResponse(InetSocketAddress remoteAddress, final StreamInput stream, final TransportResponseHandler handler) {
final TransportResponse response = handler.newInstance();
response.remoteAddress(new TransportAddress(remoteAddress));
final TransportResponse response;
try {
response.readFrom(stream);
response = handler.read(stream);
response.remoteAddress(new TransportAddress(remoteAddress));
} catch (Exception e) {
handleException(handler, new TransportSerializationException(
"Failed to deserialize response of type [" + response.getClass().getName() + "]", e));
"Failed to deserialize response from handler [" + handler.getClass().getName() + "]", e));
return;
}
threadPool.executor(handler.executor()).execute(new AbstractRunnable() {

View File

@ -19,15 +19,34 @@
package org.elasticsearch.transport;
public interface TransportResponseHandler<T extends TransportResponse> {
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import java.io.IOException;
public interface TransportResponseHandler<T extends TransportResponse> extends Writeable.Reader<T> {
/**
* creates a new instance of the return type from the remote call.
* called by the infra before de-serializing the response.
*
* @return a new response copy.
* @deprecated Implement {@link #read(StreamInput)} instead.
*/
T newInstance();
@Deprecated
default T newInstance() {
throw new UnsupportedOperationException();
}
/**
* deserializes a new instance of the return type from the stream.
* called by the infra when de-serializing the response.
*
* @return the deserialized response.
*/
@SuppressWarnings("deprecation")
@Override
default T read(StreamInput in) throws IOException {
T instance = newInstance();
instance.readFrom(in);
return instance;
}
void handleResponse(T response);

View File

@ -1079,8 +1079,8 @@ public class TransportService extends AbstractLifecycleComponent {
}
@Override
public T newInstance() {
return delegate.newInstance();
public T read(StreamInput in) throws IOException {
return delegate.read(in);
}
@Override

View File

@ -31,7 +31,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
@ -176,8 +176,8 @@ public class TransportClientNodesServiceTests extends ESTestCase {
ClusterName clusterName) {
return new TransportResponseHandler<T>() {
@Override
public T newInstance() {
return handler.newInstance();
public T read(StreamInput in) throws IOException {
return handler.read(in);
}
@Override

View File

@ -30,6 +30,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode.Role;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
@ -899,8 +900,8 @@ public class UnicastZenPingTests extends ESTestCase {
TransportResponseHandler<UnicastPingResponse> original = super.getPingResponseHandler(pingingRound, node);
return new TransportResponseHandler<UnicastPingResponse>() {
@Override
public UnicastPingResponse newInstance() {
return original.newInstance();
public UnicastPingResponse read(StreamInput in) throws IOException {
return original.read(in);
}
@Override

View File

@ -20,6 +20,7 @@ package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
@ -30,6 +31,7 @@ import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Random;
@ -100,8 +102,8 @@ public final class AssertingTransportInterceptor implements TransportInterceptor
assertVersionSerializable(request);
sender.sendRequest(connection, action, request, options, new TransportResponseHandler<T>() {
@Override
public T newInstance() {
return handler.newInstance();
public T read(StreamInput in) throws IOException {
return handler.read(in);
}
@Override