From 613c70c28915601bb0387e4cc0217eef036b1ed2 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Thu, 27 Sep 2012 18:05:16 +0200 Subject: [PATCH] introduce TransportResponse a class that needs to be used when sending a response over the transport layer, with an option to have headers --- .../elasticsearch/action/ActionResponse.java | 18 +++- .../cluster/health/ClusterHealthResponse.java | 21 ++-- .../node/shutdown/NodesShutdownResponse.java | 5 +- .../TransportNodesShutdownAction.java | 18 ++-- .../reroute/ClusterRerouteResponse.java | 4 +- .../ClusterUpdateSettingsResponse.java | 5 +- .../cluster/state/ClusterStateResponse.java | 5 +- .../indices/alias/IndicesAliasesResponse.java | 7 +- .../indices/analyze/AnalyzeResponse.java | 19 ++-- .../indices/close/CloseIndexResponse.java | 7 +- .../indices/create/CreateIndexResponse.java | 7 +- .../indices/delete/DeleteIndexResponse.java | 7 +- .../exists/indices/IndicesExistsResponse.java | 5 +- .../exists/types/TypesExistsResponse.java | 5 +- .../mapping/delete/DeleteMappingResponse.java | 7 +- .../mapping/put/PutMappingResponse.java | 7 +- .../admin/indices/open/OpenIndexResponse.java | 7 +- .../settings/UpdateSettingsResponse.java | 7 +- .../delete/DeleteIndexTemplateResponse.java | 7 +- .../put/PutIndexTemplateResponse.java | 7 +- .../warmer/delete/DeleteWarmerResponse.java | 5 +- .../indices/warmer/put/PutWarmerResponse.java | 5 +- .../action/bulk/BulkResponse.java | 7 +- .../action/bulk/BulkShardResponse.java | 5 +- .../action/delete/DeleteResponse.java | 22 ++-- .../delete/index/IndexDeleteResponse.java | 14 +-- .../delete/index/ShardDeleteResponse.java | 8 +- .../deletebyquery/DeleteByQueryResponse.java | 4 +- .../IndexDeleteByQueryResponse.java | 13 +-- .../ShardDeleteByQueryResponse.java | 7 +- .../action/explain/ExplainResponse.java | 4 +- .../elasticsearch/action/get/GetResponse.java | 5 +- .../action/get/MultiGetResponse.java | 27 ++--- .../action/get/MultiGetShardResponse.java | 4 +- .../action/index/IndexResponse.java | 34 +++--- .../action/percolate/PercolateResponse.java | 8 +- .../action/search/MultiSearchResponse.java | 8 +- .../action/search/SearchResponse.java | 15 +-- .../broadcast/BroadcastOperationResponse.java | 9 +- .../BroadcastShardOperationResponse.java | 11 +- .../support/nodes/NodeOperationResponse.java | 8 +- .../support/nodes/NodesOperationResponse.java | 6 +- ...nsportShardReplicationOperationAction.java | 7 +- .../action/update/UpdateResponse.java | 35 +++--- .../action/index/MappingUpdatedAction.java | 4 +- .../index/NodeAliasesUpdatedAction.java | 5 +- .../action/index/NodeIndexCreatedAction.java | 5 +- .../action/index/NodeIndexDeletedAction.java | 5 +- .../index/NodeMappingCreatedAction.java | 5 +- .../index/NodeMappingRefreshAction.java | 5 +- .../action/shard/ShardStateAction.java | 12 +-- .../common/io/stream/LongStreamable.java | 55 ---------- .../common/io/stream/StringStreamable.java | 55 ---------- .../common/io/stream/VoidStreamable.java | 38 ------- .../discovery/zen/ZenDiscovery.java | 5 +- .../zen/fd/MasterFaultDetection.java | 5 +- .../discovery/zen/fd/NodesFaultDetection.java | 5 +- .../zen/membership/MembershipAction.java | 20 ++-- .../zen/ping/multicast/MulticastZenPing.java | 6 +- .../zen/ping/unicast/UnicastZenPing.java | 5 +- .../publish/PublishClusterStateAction.java | 9 +- .../meta/LocalAllocateDangledIndices.java | 5 +- .../indices/recovery/RecoveryResponse.java | 6 +- .../indices/recovery/RecoverySource.java | 14 +-- .../indices/recovery/RecoveryTarget.java | 13 ++- .../jmx/action/GetJmxServiceUrlAction.java | 43 ++++++-- .../PublishRiverClusterStateAction.java | 5 +- .../action/SearchServiceTransportAction.java | 5 +- .../search/dfs/DfsSearchResult.java | 10 +- .../search/fetch/FetchSearchResult.java | 9 +- .../search/fetch/QueryFetchSearchResult.java | 7 +- .../fetch/ScrollQueryFetchSearchResult.java | 7 +- .../search/query/QuerySearchResult.java | 12 +-- .../search/query/ScrollQuerySearchResult.java | 7 +- .../BaseTransportResponseHandler.java | 6 +- ...ava => EmptyTransportResponseHandler.java} | 13 ++- .../FutureTransportResponseHandler.java | 5 +- .../transport/PlainTransportFuture.java | 3 +- .../transport/TransportChannel.java | 8 +- .../transport/TransportResponse.java | 99 +++++++++++++++++ .../transport/TransportResponseHandler.java | 4 +- .../transport/TransportService.java | 19 ++-- .../transport/local/LocalTransport.java | 8 +- .../local/LocalTransportChannel.java | 14 +-- .../netty/MessageChannelHandler.java | 21 ++-- .../netty/NettyTransportChannel.java | 16 ++- .../{netty => }/BenchmarkMessageRequest.java | 2 +- ...age.java => BenchmarkMessageResponse.java} | 24 ++++- .../BenchmarkNettyLargeMessages.java | 19 ++-- .../transport/TransportBenchmark.java | 23 ++-- .../AbstractSimpleTransportTests.java | 100 +++++++++--------- 91 files changed, 577 insertions(+), 640 deletions(-) delete mode 100644 src/main/java/org/elasticsearch/common/io/stream/LongStreamable.java delete mode 100644 src/main/java/org/elasticsearch/common/io/stream/StringStreamable.java delete mode 100644 src/main/java/org/elasticsearch/common/io/stream/VoidStreamable.java rename src/main/java/org/elasticsearch/transport/{VoidTransportResponseHandler.java => EmptyTransportResponseHandler.java} (71%) create mode 100644 src/main/java/org/elasticsearch/transport/TransportResponse.java rename src/test/java/org/elasticsearch/benchmark/transport/{netty => }/BenchmarkMessageRequest.java (96%) rename src/test/java/org/elasticsearch/benchmark/transport/{BenchmarkMessage.java => BenchmarkMessageResponse.java} (73%) diff --git a/src/main/java/org/elasticsearch/action/ActionResponse.java b/src/main/java/org/elasticsearch/action/ActionResponse.java index 88cf95d66b2..4991355a75c 100644 --- a/src/main/java/org/elasticsearch/action/ActionResponse.java +++ b/src/main/java/org/elasticsearch/action/ActionResponse.java @@ -19,10 +19,24 @@ package org.elasticsearch.action; -import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.transport.TransportResponse; + +import java.io.IOException; /** * */ -public interface ActionResponse extends Streamable { +public abstract class ActionResponse extends TransportResponse { + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + } } diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthResponse.java b/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthResponse.java index 42426179795..a64031c70da 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthResponse.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthResponse.java @@ -36,30 +36,19 @@ import static org.elasticsearch.action.admin.cluster.health.ClusterIndexHealth.r /** * */ -public class ClusterHealthResponse implements ActionResponse, Iterable { +public class ClusterHealthResponse extends ActionResponse implements Iterable { private String clusterName; - int numberOfNodes = 0; - int numberOfDataNodes = 0; - int activeShards = 0; - int relocatingShards = 0; - int activePrimaryShards = 0; - int initializingShards = 0; - int unassignedShards = 0; - boolean timedOut = false; - ClusterHealthStatus status = ClusterHealthStatus.RED; - private List validationFailures; - Map indices = Maps.newHashMap(); ClusterHealthResponse() { @@ -201,7 +190,8 @@ public class ClusterHealthResponse implements ActionResponse, Iterable { private final Node node; - private final ClusterName clusterName; - private final boolean disabled; - private final TimeValue delay; @Inject @@ -128,9 +124,9 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc latch.countDown(); } else { logger.trace("[cluster_shutdown]: sending shutdown request to [{}]", node); - transportService.sendRequest(node, NodeShutdownRequestHandler.ACTION, new NodeShutdownRequest(request), new VoidTransportResponseHandler(ThreadPool.Names.SAME) { + transportService.sendRequest(node, NodeShutdownRequestHandler.ACTION, new NodeShutdownRequest(request), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { @Override - public void handleResponse(VoidStreamable response) { + public void handleResponse(TransportResponse.Empty response) { logger.trace("[cluster_shutdown]: received shutdown response from [{}]", node); latch.countDown(); } @@ -152,9 +148,9 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc // now, kill the master logger.trace("[cluster_shutdown]: shutting down the master [{}]", state.nodes().masterNode()); - transportService.sendRequest(state.nodes().masterNode(), NodeShutdownRequestHandler.ACTION, new NodeShutdownRequest(request), new VoidTransportResponseHandler(ThreadPool.Names.SAME) { + transportService.sendRequest(state.nodes().masterNode(), NodeShutdownRequestHandler.ACTION, new NodeShutdownRequest(request), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { @Override - public void handleResponse(VoidStreamable response) { + public void handleResponse(TransportResponse.Empty response) { logger.trace("[cluster_shutdown]: received shutdown response from master"); } @@ -196,9 +192,9 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc } logger.trace("[partial_cluster_shutdown]: sending shutdown request to [{}]", node); - transportService.sendRequest(node, NodeShutdownRequestHandler.ACTION, new NodeShutdownRequest(request), new VoidTransportResponseHandler(ThreadPool.Names.SAME) { + transportService.sendRequest(node, NodeShutdownRequestHandler.ACTION, new NodeShutdownRequest(request), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { @Override - public void handleResponse(VoidStreamable response) { + public void handleResponse(TransportResponse.Empty response) { logger.trace("[partial_cluster_shutdown]: received shutdown response from [{}]", node); latch.countDown(); } @@ -288,7 +284,7 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc }); t.start(); - channel.sendResponse(VoidStreamable.INSTANCE); + channel.sendResponse(TransportResponse.Empty.INSTANCE); } } diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponse.java b/src/main/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponse.java index 5119c61ba8c..77b253bcfb6 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponse.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponse.java @@ -28,7 +28,7 @@ import java.io.IOException; /** */ -public class ClusterRerouteResponse implements ActionResponse { +public class ClusterRerouteResponse extends ActionResponse { private ClusterState state; @@ -50,11 +50,13 @@ public class ClusterRerouteResponse implements ActionResponse { @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); state = ClusterState.Builder.readFrom(in, null); } @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); ClusterState.Builder.writeTo(state, out); } } diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsResponse.java b/src/main/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsResponse.java index 709693a9249..8eb475a0b77 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsResponse.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsResponse.java @@ -22,23 +22,24 @@ package org.elasticsearch.action.admin.cluster.settings; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; import java.io.IOException; /** * A response for a cluster update settings action. */ -public class ClusterUpdateSettingsResponse implements ActionResponse, Streamable { +public class ClusterUpdateSettingsResponse extends ActionResponse { ClusterUpdateSettingsResponse() { } @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); } @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); } } diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateResponse.java b/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateResponse.java index 5591786a0d6..cbec21f2e83 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateResponse.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateResponse.java @@ -30,10 +30,9 @@ import java.io.IOException; /** * */ -public class ClusterStateResponse implements ActionResponse { +public class ClusterStateResponse extends ActionResponse { private ClusterName clusterName; - private ClusterState clusterState; public ClusterStateResponse() { @@ -62,12 +61,14 @@ public class ClusterStateResponse implements ActionResponse { @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); clusterName = ClusterName.readClusterName(in); clusterState = ClusterState.Builder.readFrom(in, null); } @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); clusterName.writeTo(out); ClusterState.Builder.writeTo(clusterState, out); } diff --git a/src/main/java/org/elasticsearch/action/admin/indices/alias/IndicesAliasesResponse.java b/src/main/java/org/elasticsearch/action/admin/indices/alias/IndicesAliasesResponse.java index f564386dd30..6af5108e339 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/alias/IndicesAliasesResponse.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/alias/IndicesAliasesResponse.java @@ -22,16 +22,13 @@ package org.elasticsearch.action.admin.indices.alias; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; import java.io.IOException; /** * A response for a add/remove alias action. - * - * */ -public class IndicesAliasesResponse implements ActionResponse, Streamable { +public class IndicesAliasesResponse extends ActionResponse { private boolean acknowledged; @@ -54,11 +51,13 @@ public class IndicesAliasesResponse implements ActionResponse, Streamable { @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); acknowledged = in.readBoolean(); } @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); out.writeBoolean(acknowledged); } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/action/admin/indices/analyze/AnalyzeResponse.java b/src/main/java/org/elasticsearch/action/admin/indices/analyze/AnalyzeResponse.java index aa2ed9c4648..6185cd4684b 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/analyze/AnalyzeResponse.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/analyze/AnalyzeResponse.java @@ -34,7 +34,7 @@ import java.util.List; /** * */ -public class AnalyzeResponse implements ActionResponse, Iterable, ToXContent { +public class AnalyzeResponse extends ActionResponse implements Iterable, ToXContent { public static class AnalyzeToken implements Streamable { private String term; @@ -102,27 +102,20 @@ public class AnalyzeResponse implements ActionResponse, Iterable(size); for (int i = 0; i < size; i++) { @@ -195,6 +189,7 @@ public class AnalyzeResponse implements ActionResponse, Iterable { +public class BulkResponse extends ActionResponse implements Iterable { private BulkItemResponse[] responses; - private long tookInMillis; BulkResponse() { @@ -117,6 +114,7 @@ public class BulkResponse implements ActionResponse, Iterable @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); responses = new BulkItemResponse[in.readVInt()]; for (int i = 0; i < responses.length; i++) { responses[i] = BulkItemResponse.readBulkItem(in); @@ -126,6 +124,7 @@ public class BulkResponse implements ActionResponse, Iterable @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); out.writeVInt(responses.length); for (BulkItemResponse response : responses) { response.writeTo(out); diff --git a/src/main/java/org/elasticsearch/action/bulk/BulkShardResponse.java b/src/main/java/org/elasticsearch/action/bulk/BulkShardResponse.java index f3d5d665d21..8c5a8642f0a 100644 --- a/src/main/java/org/elasticsearch/action/bulk/BulkShardResponse.java +++ b/src/main/java/org/elasticsearch/action/bulk/BulkShardResponse.java @@ -29,10 +29,9 @@ import java.io.IOException; /** * */ -public class BulkShardResponse implements ActionResponse { +public class BulkShardResponse extends ActionResponse { private ShardId shardId; - private BulkItemResponse[] responses; BulkShardResponse() { @@ -53,6 +52,7 @@ public class BulkShardResponse implements ActionResponse { @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); shardId = ShardId.readShardId(in); responses = new BulkItemResponse[in.readVInt()]; for (int i = 0; i < responses.length; i++) { @@ -62,6 +62,7 @@ public class BulkShardResponse implements ActionResponse { @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); shardId.writeTo(out); out.writeVInt(responses.length); for (BulkItemResponse response : responses) { diff --git a/src/main/java/org/elasticsearch/action/delete/DeleteResponse.java b/src/main/java/org/elasticsearch/action/delete/DeleteResponse.java index cef5fa62306..ac1d720832d 100644 --- a/src/main/java/org/elasticsearch/action/delete/DeleteResponse.java +++ b/src/main/java/org/elasticsearch/action/delete/DeleteResponse.java @@ -22,27 +22,21 @@ package org.elasticsearch.action.delete; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; import java.io.IOException; /** * The response of the delete action. * - * * @see org.elasticsearch.action.delete.DeleteRequest * @see org.elasticsearch.client.Client#delete(DeleteRequest) */ -public class DeleteResponse implements ActionResponse, Streamable { +public class DeleteResponse extends ActionResponse { private String index; - private String id; - private String type; - private long version; - private boolean notFound; public DeleteResponse() { @@ -129,18 +123,20 @@ public class DeleteResponse implements ActionResponse, Streamable { @Override public void readFrom(StreamInput in) throws IOException { - index = in.readUTF(); - id = in.readUTF(); - type = in.readUTF(); + super.readFrom(in); + index = in.readString(); + id = in.readString(); + type = in.readString(); version = in.readLong(); notFound = in.readBoolean(); } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeUTF(index); - out.writeUTF(id); - out.writeUTF(type); + super.writeTo(out); + out.writeString(index); + out.writeString(id); + out.writeString(type); out.writeLong(version); out.writeBoolean(notFound); } diff --git a/src/main/java/org/elasticsearch/action/delete/index/IndexDeleteResponse.java b/src/main/java/org/elasticsearch/action/delete/index/IndexDeleteResponse.java index 264e49957c7..e772ba6bd07 100644 --- a/src/main/java/org/elasticsearch/action/delete/index/IndexDeleteResponse.java +++ b/src/main/java/org/elasticsearch/action/delete/index/IndexDeleteResponse.java @@ -22,23 +22,17 @@ package org.elasticsearch.action.delete.index; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; import java.io.IOException; /** * Delete by query response executed on a specific index. - * - * */ -public class IndexDeleteResponse implements ActionResponse, Streamable { +public class IndexDeleteResponse extends ActionResponse { private String index; - private int successfulShards; - private int failedShards; - private ShardDeleteResponse[] deleteResponses; IndexDeleteResponse(String index, int successfulShards, int failedShards, ShardDeleteResponse[] deleteResponses) { @@ -114,7 +108,8 @@ public class IndexDeleteResponse implements ActionResponse, Streamable { @Override public void readFrom(StreamInput in) throws IOException { - index = in.readUTF(); + super.readFrom(in); + index = in.readString(); successfulShards = in.readVInt(); failedShards = in.readVInt(); deleteResponses = new ShardDeleteResponse[in.readVInt()]; @@ -126,7 +121,8 @@ public class IndexDeleteResponse implements ActionResponse, Streamable { @Override public void writeTo(StreamOutput out) throws IOException { - out.writeUTF(index); + super.writeTo(out); + out.writeString(index); out.writeVInt(successfulShards); out.writeVInt(failedShards); out.writeVInt(deleteResponses.length); diff --git a/src/main/java/org/elasticsearch/action/delete/index/ShardDeleteResponse.java b/src/main/java/org/elasticsearch/action/delete/index/ShardDeleteResponse.java index 8536e2caba4..fa33fa90b86 100644 --- a/src/main/java/org/elasticsearch/action/delete/index/ShardDeleteResponse.java +++ b/src/main/java/org/elasticsearch/action/delete/index/ShardDeleteResponse.java @@ -22,19 +22,15 @@ package org.elasticsearch.action.delete.index; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; import java.io.IOException; /** * Delete response executed on a specific shard. - * - * */ -public class ShardDeleteResponse implements ActionResponse, Streamable { +public class ShardDeleteResponse extends ActionResponse { private long version; - private boolean notFound; public ShardDeleteResponse() { @@ -55,12 +51,14 @@ public class ShardDeleteResponse implements ActionResponse, Streamable { @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); version = in.readLong(); notFound = in.readBoolean(); } @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); out.writeLong(version); out.writeBoolean(notFound); } diff --git a/src/main/java/org/elasticsearch/action/deletebyquery/DeleteByQueryResponse.java b/src/main/java/org/elasticsearch/action/deletebyquery/DeleteByQueryResponse.java index dca42990289..fc2f50d2ba6 100644 --- a/src/main/java/org/elasticsearch/action/deletebyquery/DeleteByQueryResponse.java +++ b/src/main/java/org/elasticsearch/action/deletebyquery/DeleteByQueryResponse.java @@ -33,7 +33,7 @@ import static com.google.common.collect.Maps.newHashMap; * The response of delete by query action. Holds the {@link IndexDeleteByQueryResponse}s from all the * different indices. */ -public class DeleteByQueryResponse implements ActionResponse, Iterable { +public class DeleteByQueryResponse extends ActionResponse implements Iterable { private Map indices = newHashMap(); @@ -69,6 +69,7 @@ public class DeleteByQueryResponse implements ActionResponse, Iterable, ToXContent { +public class GetResponse extends ActionResponse implements Iterable, ToXContent { private GetResult getResult; @@ -203,11 +202,13 @@ public class GetResponse implements ActionResponse, Streamable, Iterable, ToXContent { +public class MultiGetResponse extends ActionResponse implements Iterable, ToXContent { /** * Represents a failure. @@ -117,25 +117,18 @@ public class MultiGetResponse implements ActionResponse, Iterable responses; @@ -54,6 +54,7 @@ public class MultiGetShardResponse implements ActionResponse { @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); int size = in.readVInt(); locations = new TIntArrayList(size); responses = new ArrayList(size); @@ -77,6 +78,7 @@ public class MultiGetShardResponse implements ActionResponse { @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); out.writeVInt(locations.size()); for (int i = 0; i < locations.size(); i++) { out.writeVInt(locations.get(i)); diff --git a/src/main/java/org/elasticsearch/action/index/IndexResponse.java b/src/main/java/org/elasticsearch/action/index/IndexResponse.java index 5410d02bbc0..e7952117782 100644 --- a/src/main/java/org/elasticsearch/action/index/IndexResponse.java +++ b/src/main/java/org/elasticsearch/action/index/IndexResponse.java @@ -34,16 +34,12 @@ import java.util.List; * @see org.elasticsearch.action.index.IndexRequest * @see org.elasticsearch.client.Client#index(IndexRequest) */ -public class IndexResponse implements ActionResponse { +public class IndexResponse extends ActionResponse { private String index; - private String id; - private String type; - private long version; - private List matches; public IndexResponse() { @@ -136,28 +132,29 @@ public class IndexResponse implements ActionResponse { @Override public void readFrom(StreamInput in) throws IOException { - index = in.readUTF(); - id = in.readUTF(); - type = in.readUTF(); + super.readFrom(in); + index = in.readString(); + id = in.readString(); + type = in.readString(); version = in.readLong(); if (in.readBoolean()) { int size = in.readVInt(); if (size == 0) { matches = ImmutableList.of(); } else if (size == 1) { - matches = ImmutableList.of(in.readUTF()); + matches = ImmutableList.of(in.readString()); } else if (size == 2) { - matches = ImmutableList.of(in.readUTF(), in.readUTF()); + matches = ImmutableList.of(in.readString(), in.readString()); } else if (size == 3) { - matches = ImmutableList.of(in.readUTF(), in.readUTF(), in.readUTF()); + matches = ImmutableList.of(in.readString(), in.readString(), in.readString()); } else if (size == 4) { - matches = ImmutableList.of(in.readUTF(), in.readUTF(), in.readUTF(), in.readUTF()); + matches = ImmutableList.of(in.readString(), in.readString(), in.readString(), in.readString()); } else if (size == 5) { - matches = ImmutableList.of(in.readUTF(), in.readUTF(), in.readUTF(), in.readUTF(), in.readUTF()); + matches = ImmutableList.of(in.readString(), in.readString(), in.readString(), in.readString(), in.readString()); } else { matches = new ArrayList(); for (int i = 0; i < size; i++) { - matches.add(in.readUTF()); + matches.add(in.readString()); } } } @@ -165,9 +162,10 @@ public class IndexResponse implements ActionResponse { @Override public void writeTo(StreamOutput out) throws IOException { - out.writeUTF(index); - out.writeUTF(id); - out.writeUTF(type); + super.writeTo(out); + out.writeString(index); + out.writeString(id); + out.writeString(type); out.writeLong(version); if (matches == null) { out.writeBoolean(false); @@ -175,7 +173,7 @@ public class IndexResponse implements ActionResponse { out.writeBoolean(true); out.writeVInt(matches.size()); for (String match : matches) { - out.writeUTF(match); + out.writeString(match); } } } diff --git a/src/main/java/org/elasticsearch/action/percolate/PercolateResponse.java b/src/main/java/org/elasticsearch/action/percolate/PercolateResponse.java index f56a9d1bb97..3beaed7065b 100644 --- a/src/main/java/org/elasticsearch/action/percolate/PercolateResponse.java +++ b/src/main/java/org/elasticsearch/action/percolate/PercolateResponse.java @@ -31,7 +31,7 @@ import java.util.List; /** * */ -public class PercolateResponse implements ActionResponse, Iterable { +public class PercolateResponse extends ActionResponse implements Iterable { private List matches; @@ -54,18 +54,20 @@ public class PercolateResponse implements ActionResponse, Iterable { @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); int size = in.readVInt(); matches = new ArrayList(size); for (int i = 0; i < size; i++) { - matches.add(in.readUTF()); + matches.add(in.readString()); } } @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); out.writeVInt(matches.size()); for (String match : matches) { - out.writeUTF(match); + out.writeString(match); } } } diff --git a/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java b/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java index bb1abea37c0..bdc3e5aeb1a 100644 --- a/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java +++ b/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java @@ -16,7 +16,7 @@ import java.util.Iterator; /** * A multi search response. */ -public class MultiSearchResponse implements ActionResponse, Iterable, ToXContent { +public class MultiSearchResponse extends ActionResponse implements Iterable, ToXContent { /** * A search response item, holding the actual search response, or an error message if it failed. @@ -85,7 +85,7 @@ public class MultiSearchResponse implements ActionResponse, Iterable shardFailures = ImmutableList.of(); protected BroadcastOperationResponse() { @@ -120,6 +115,7 @@ public abstract class BroadcastOperationResponse implements ActionResponse { @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); totalShards = in.readVInt(); successfulShards = in.readVInt(); failedShards = in.readVInt(); @@ -134,6 +130,7 @@ public abstract class BroadcastOperationResponse implements ActionResponse { @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); out.writeVInt(totalShards); out.writeVInt(successfulShards); out.writeVInt(failedShards); diff --git a/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastShardOperationResponse.java b/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastShardOperationResponse.java index 2e19741619d..61038d5c751 100644 --- a/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastShardOperationResponse.java +++ b/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastShardOperationResponse.java @@ -21,17 +21,16 @@ package org.elasticsearch.action.support.broadcast; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.transport.TransportResponse; import java.io.IOException; /** * */ -public abstract class BroadcastShardOperationResponse implements Streamable { +public abstract class BroadcastShardOperationResponse extends TransportResponse { String index; - int shardId; protected BroadcastShardOperationResponse() { @@ -61,13 +60,15 @@ public abstract class BroadcastShardOperationResponse implements Streamable { @Override public void readFrom(StreamInput in) throws IOException { - index = in.readUTF(); + super.readFrom(in); + index = in.readString(); shardId = in.readVInt(); } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeUTF(index); + super.writeTo(out); + out.writeString(index); out.writeVInt(shardId); } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/action/support/nodes/NodeOperationResponse.java b/src/main/java/org/elasticsearch/action/support/nodes/NodeOperationResponse.java index ca648a935f6..59c839a3695 100644 --- a/src/main/java/org/elasticsearch/action/support/nodes/NodeOperationResponse.java +++ b/src/main/java/org/elasticsearch/action/support/nodes/NodeOperationResponse.java @@ -22,16 +22,14 @@ package org.elasticsearch.action.support.nodes; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.transport.TransportResponse; import java.io.IOException; /** * A base class for node level operations. - * - * */ -public abstract class NodeOperationResponse implements Streamable { +public abstract class NodeOperationResponse extends TransportResponse { private DiscoveryNode node; @@ -58,11 +56,13 @@ public abstract class NodeOperationResponse implements Streamable { @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); node = DiscoveryNode.readNode(in); } @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); node.writeTo(out); } } diff --git a/src/main/java/org/elasticsearch/action/support/nodes/NodesOperationResponse.java b/src/main/java/org/elasticsearch/action/support/nodes/NodesOperationResponse.java index a3d5bcd0af8..a324c351386 100644 --- a/src/main/java/org/elasticsearch/action/support/nodes/NodesOperationResponse.java +++ b/src/main/java/org/elasticsearch/action/support/nodes/NodesOperationResponse.java @@ -32,12 +32,10 @@ import java.util.Map; /** * */ -public abstract class NodesOperationResponse implements ActionResponse, Iterable { +public abstract class NodesOperationResponse extends ActionResponse implements Iterable { private ClusterName clusterName; - protected NodeResponse[] nodes; - private Map nodesMap; protected NodesOperationResponse() { @@ -89,11 +87,13 @@ public abstract class NodesOperationResponse matches; - private GetResult getResult; public UpdateResponse() { @@ -147,28 +142,29 @@ public class UpdateResponse implements ActionResponse { @Override public void readFrom(StreamInput in) throws IOException { - index = in.readUTF(); - id = in.readUTF(); - type = in.readUTF(); + super.readFrom(in); + index = in.readString(); + id = in.readString(); + type = in.readString(); version = in.readLong(); if (in.readBoolean()) { int size = in.readVInt(); if (size == 0) { matches = ImmutableList.of(); } else if (size == 1) { - matches = ImmutableList.of(in.readUTF()); + matches = ImmutableList.of(in.readString()); } else if (size == 2) { - matches = ImmutableList.of(in.readUTF(), in.readUTF()); + matches = ImmutableList.of(in.readString(), in.readString()); } else if (size == 3) { - matches = ImmutableList.of(in.readUTF(), in.readUTF(), in.readUTF()); + matches = ImmutableList.of(in.readString(), in.readString(), in.readString()); } else if (size == 4) { - matches = ImmutableList.of(in.readUTF(), in.readUTF(), in.readUTF(), in.readUTF()); + matches = ImmutableList.of(in.readString(), in.readString(), in.readString(), in.readString()); } else if (size == 5) { - matches = ImmutableList.of(in.readUTF(), in.readUTF(), in.readUTF(), in.readUTF(), in.readUTF()); + matches = ImmutableList.of(in.readString(), in.readString(), in.readString(), in.readString(), in.readString()); } else { matches = new ArrayList(); for (int i = 0; i < size; i++) { - matches.add(in.readUTF()); + matches.add(in.readString()); } } } @@ -179,9 +175,10 @@ public class UpdateResponse implements ActionResponse { @Override public void writeTo(StreamOutput out) throws IOException { - out.writeUTF(index); - out.writeUTF(id); - out.writeUTF(type); + super.writeTo(out); + out.writeString(index); + out.writeString(id); + out.writeString(type); out.writeLong(version); if (matches == null) { out.writeBoolean(false); @@ -189,7 +186,7 @@ public class UpdateResponse implements ActionResponse { out.writeBoolean(true); out.writeVInt(matches.size()); for (String match : matches) { - out.writeUTF(match); + out.writeString(match); } } if (getResult == null) { diff --git a/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java b/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java index 7b8f1a7281a..5b83cecd769 100644 --- a/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java +++ b/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java @@ -104,13 +104,15 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction startedShardsQueue = ConcurrentCollections.newBlockingQueue(); @@ -83,7 +79,7 @@ public class ShardStateAction extends AbstractComponent { innerShardFailed(shardRouting, reason); } else { transportService.sendRequest(clusterService.state().nodes().masterNode(), - ShardFailedTransportHandler.ACTION, new ShardRoutingEntry(shardRouting, reason), new VoidTransportResponseHandler(ThreadPool.Names.SAME) { + ShardFailedTransportHandler.ACTION, new ShardRoutingEntry(shardRouting, reason), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { @Override public void handleException(TransportException exp) { logger.warn("failed to send failed shard to [{}]", exp, clusterService.state().nodes().masterNode()); @@ -101,7 +97,7 @@ public class ShardStateAction extends AbstractComponent { innerShardStarted(shardRouting, reason); } else { transportService.sendRequest(clusterService.state().nodes().masterNode(), - ShardStartedTransportHandler.ACTION, new ShardRoutingEntry(shardRouting, reason), new VoidTransportResponseHandler(ThreadPool.Names.SAME) { + ShardStartedTransportHandler.ACTION, new ShardRoutingEntry(shardRouting, reason), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { @Override public void handleException(TransportException exp) { logger.warn("failed to send shard started to [{}]", exp, clusterService.state().nodes().masterNode()); @@ -206,7 +202,7 @@ public class ShardStateAction extends AbstractComponent { @Override public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception { innerShardFailed(request.shardRouting, request.reason); - channel.sendResponse(VoidStreamable.INSTANCE); + channel.sendResponse(TransportResponse.Empty.INSTANCE); } @Override @@ -227,7 +223,7 @@ public class ShardStateAction extends AbstractComponent { @Override public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception { innerShardStarted(request.shardRouting, request.reason); - channel.sendResponse(VoidStreamable.INSTANCE); + channel.sendResponse(TransportResponse.Empty.INSTANCE); } @Override diff --git a/src/main/java/org/elasticsearch/common/io/stream/LongStreamable.java b/src/main/java/org/elasticsearch/common/io/stream/LongStreamable.java deleted file mode 100644 index 27c4d7dc0c2..00000000000 --- a/src/main/java/org/elasticsearch/common/io/stream/LongStreamable.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to ElasticSearch and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. ElasticSearch licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.io.stream; - -import java.io.IOException; - -/** - * - */ -public class LongStreamable implements Streamable { - - private long value; - - public LongStreamable() { - } - - public LongStreamable(long value) { - this.value = value; - } - - public void set(long newValue) { - value = newValue; - } - - public long get() { - return this.value; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - value = in.readLong(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeLong(value); - } -} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/common/io/stream/StringStreamable.java b/src/main/java/org/elasticsearch/common/io/stream/StringStreamable.java deleted file mode 100644 index 08bdb8e051a..00000000000 --- a/src/main/java/org/elasticsearch/common/io/stream/StringStreamable.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to ElasticSearch and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. ElasticSearch licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.io.stream; - -import java.io.IOException; - -/** - * - */ -public class StringStreamable implements Streamable { - - private String value; - - public StringStreamable() { - } - - public StringStreamable(String value) { - this.value = value; - } - - public void set(String newValue) { - value = newValue; - } - - public String get() { - return this.value; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - value = in.readUTF(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeUTF(value); - } -} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/common/io/stream/VoidStreamable.java b/src/main/java/org/elasticsearch/common/io/stream/VoidStreamable.java deleted file mode 100644 index f6841d9444c..00000000000 --- a/src/main/java/org/elasticsearch/common/io/stream/VoidStreamable.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to ElasticSearch and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. ElasticSearch licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.io.stream; - -import java.io.IOException; - -/** - * - */ -public class VoidStreamable implements Streamable { - - public static final VoidStreamable INSTANCE = new VoidStreamable(); - - @Override - public void readFrom(StreamInput in) throws IOException { - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - } -} diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 370439bf335..f03946da3bf 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -38,7 +38,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.internal.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.VoidStreamable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.Discovery; @@ -509,7 +508,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen return rejoin(currentState, "zen-disco-master_receive_cluster_state_from_another_master [" + newState.nodes().masterNode() + "]"); } else { logger.warn("received cluster state from [{}] which is also master but with an older cluster_state, telling [{}] to rejoin the cluster", newState.nodes().masterNode(), newState.nodes().masterNode()); - transportService.sendRequest(newState.nodes().masterNode(), RejoinClusterRequestHandler.ACTION, new RejoinClusterRequest(currentState.nodes().localNodeId()), new VoidTransportResponseHandler(ThreadPool.Names.SAME) { + transportService.sendRequest(newState.nodes().masterNode(), RejoinClusterRequestHandler.ACTION, new RejoinClusterRequest(currentState.nodes().localNodeId()), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { @Override public void handleException(TransportException exp) { logger.warn("failed to send rejoin request to [{}]", exp, newState.nodes().masterNode()); @@ -808,7 +807,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen @Override public ClusterState execute(ClusterState currentState) { try { - channel.sendResponse(VoidStreamable.INSTANCE); + channel.sendResponse(TransportResponse.Empty.INSTANCE); } catch (Exception e) { logger.warn("failed to send response on rejoin cluster request handling", e); } diff --git a/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java b/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java index e517d089f9d..827dd710e5b 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java +++ b/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java @@ -25,7 +25,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; @@ -424,7 +423,7 @@ public class MasterFaultDetection extends AbstractComponent { } } - private static class MasterPingResponseResponse implements Streamable { + private static class MasterPingResponseResponse extends TransportResponse { private boolean connectedToMaster; @@ -437,11 +436,13 @@ public class MasterFaultDetection extends AbstractComponent { @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); connectedToMaster = in.readBoolean(); } @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); out.writeBoolean(connectedToMaster); } } diff --git a/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java b/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java index fe6c3100ae7..9bcf7f95937 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java +++ b/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java @@ -25,7 +25,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.threadpool.ThreadPool; @@ -330,17 +329,19 @@ public class NodesFaultDetection extends AbstractComponent { } } - private static class PingResponse implements Streamable { + private static class PingResponse extends TransportResponse { private PingResponse() { } @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); } @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); } } } diff --git a/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java b/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java index fcd6ee7144c..b5369a78d56 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java +++ b/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java @@ -25,8 +25,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; -import org.elasticsearch.common.io.stream.VoidStreamable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; @@ -71,15 +69,15 @@ public class MembershipAction extends AbstractComponent { } public void sendLeaveRequest(DiscoveryNode masterNode, DiscoveryNode node) { - transportService.sendRequest(node, LeaveRequestRequestHandler.ACTION, new LeaveRequest(masterNode), VoidTransportResponseHandler.INSTANCE_SAME); + transportService.sendRequest(node, LeaveRequestRequestHandler.ACTION, new LeaveRequest(masterNode), EmptyTransportResponseHandler.INSTANCE_SAME); } public void sendLeaveRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) throws ElasticSearchException { - transportService.submitRequest(masterNode, LeaveRequestRequestHandler.ACTION, new LeaveRequest(node), VoidTransportResponseHandler.INSTANCE_SAME).txGet(timeout.millis(), TimeUnit.MILLISECONDS); + transportService.submitRequest(masterNode, LeaveRequestRequestHandler.ACTION, new LeaveRequest(node), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(timeout.millis(), TimeUnit.MILLISECONDS); } public void sendJoinRequest(DiscoveryNode masterNode, DiscoveryNode node) { - transportService.sendRequest(masterNode, JoinRequestRequestHandler.ACTION, new JoinRequest(node, false), VoidTransportResponseHandler.INSTANCE_SAME); + transportService.sendRequest(masterNode, JoinRequestRequestHandler.ACTION, new JoinRequest(node, false), EmptyTransportResponseHandler.INSTANCE_SAME); } public ClusterState sendJoinRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) throws ElasticSearchException { @@ -95,7 +93,7 @@ public class MembershipAction extends AbstractComponent { * Validates the join request, throwing a failure if it failed. */ public void sendValidateJoinRequestBlocking(DiscoveryNode node, ClusterState clusterState, TimeValue timeout) throws ElasticSearchException { - transportService.submitRequest(node, ValidateJoinRequestRequestHandler.ACTION, new ValidateJoinRequest(clusterState), VoidTransportResponseHandler.INSTANCE_SAME) + transportService.submitRequest(node, ValidateJoinRequestRequestHandler.ACTION, new ValidateJoinRequest(clusterState), EmptyTransportResponseHandler.INSTANCE_SAME) .txGet(timeout.millis(), TimeUnit.MILLISECONDS); } @@ -128,7 +126,7 @@ public class MembershipAction extends AbstractComponent { } } - class JoinResponse implements Streamable { + class JoinResponse extends TransportResponse { ClusterState clusterState; @@ -141,11 +139,13 @@ public class MembershipAction extends AbstractComponent { @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode()); } @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); ClusterState.Builder.writeTo(clusterState, out); } } @@ -165,7 +165,7 @@ public class MembershipAction extends AbstractComponent { if (request.withClusterState) { channel.sendResponse(new JoinResponse(clusterState)); } else { - channel.sendResponse(VoidStreamable.INSTANCE); + channel.sendResponse(TransportResponse.Empty.INSTANCE); } } @@ -211,7 +211,7 @@ public class MembershipAction extends AbstractComponent { @Override public void messageReceived(ValidateJoinRequest request, TransportChannel channel) throws Exception { // for now, the mere fact that we can serialize the cluster state acts as validation.... - channel.sendResponse(VoidStreamable.INSTANCE); + channel.sendResponse(TransportResponse.Empty.INSTANCE); } @Override @@ -256,7 +256,7 @@ public class MembershipAction extends AbstractComponent { @Override public void messageReceived(LeaveRequest request, TransportChannel channel) throws Exception { listener.onLeave(request.node); - channel.sendResponse(VoidStreamable.INSTANCE); + channel.sendResponse(TransportResponse.Empty.INSTANCE); } @Override diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java b/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java index 68ed75d5cde..2c77cd5ccdd 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java @@ -316,7 +316,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem } else { responses.put(request.pingResponse.target(), request.pingResponse); } - channel.sendResponse(VoidStreamable.INSTANCE); + channel.sendResponse(TransportResponse.Empty.INSTANCE); } @Override @@ -534,7 +534,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem // connect to the node if possible try { transportService.connectToNode(requestingNode); - transportService.sendRequest(requestingNode, MulticastPingResponseRequestHandler.ACTION, multicastPingResponse, new VoidTransportResponseHandler(ThreadPool.Names.SAME) { + transportService.sendRequest(requestingNode, MulticastPingResponseRequestHandler.ACTION, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { @Override public void handleException(TransportException exp) { logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode); @@ -546,7 +546,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem } }); } else { - transportService.sendRequest(requestingNode, MulticastPingResponseRequestHandler.ACTION, multicastPingResponse, new VoidTransportResponseHandler(ThreadPool.Names.SAME) { + transportService.sendRequest(requestingNode, MulticastPingResponseRequestHandler.ACTION, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { @Override public void handleException(TransportException exp) { logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode); diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index a6f090b6a93..4e063a3c88e 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -30,7 +30,6 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; @@ -431,7 +430,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen } } - static class UnicastPingResponse implements Streamable { + static class UnicastPingResponse extends TransportResponse { int id; @@ -442,6 +441,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); id = in.readInt(); pingResponses = new PingResponse[in.readVInt()]; for (int i = 0; i < pingResponses.length; i++) { @@ -451,6 +451,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); out.writeInt(id); out.writeVInt(pingResponses.length); for (PingResponse pingResponse : pingResponses) { diff --git a/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index cffa4b24b94..956b9dc7686 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -27,7 +27,10 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.compress.Compressor; import org.elasticsearch.common.compress.CompressorFactory; -import org.elasticsearch.common.io.stream.*; +import org.elasticsearch.common.io.stream.CachedStreamInput; +import org.elasticsearch.common.io.stream.CachedStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; import org.elasticsearch.threadpool.ThreadPool; @@ -95,7 +98,7 @@ public class PublishClusterStateAction extends AbstractComponent { new PublishClusterStateRequest(entry.bytes().bytes()), TransportRequestOptions.options().withHighType().withCompress(false), // no need to compress, we already compressed the bytes - new VoidTransportResponseHandler(ThreadPool.Names.SAME) { + new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { @Override public void handleException(TransportException exp) { logger.debug("failed to send cluster state to [{}], should be detected as failed soon...", exp, node); @@ -156,7 +159,7 @@ public class PublishClusterStateAction extends AbstractComponent { in.setVersion(request.version); ClusterState clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode()); listener.onNewClusterState(clusterState); - channel.sendResponse(VoidStreamable.INSTANCE); + channel.sendResponse(TransportResponse.Empty.INSTANCE); } @Override diff --git a/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalAllocateDangledIndices.java b/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalAllocateDangledIndices.java index e678183cb69..cecdcb0e84f 100644 --- a/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalAllocateDangledIndices.java +++ b/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalAllocateDangledIndices.java @@ -33,7 +33,6 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.threadpool.ThreadPool; @@ -202,7 +201,7 @@ public class LocalAllocateDangledIndices extends AbstractComponent { } } - public static class AllocateDangledResponse implements Streamable { + public static class AllocateDangledResponse extends TransportResponse { private boolean ack; @@ -219,11 +218,13 @@ public class LocalAllocateDangledIndices extends AbstractComponent { @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); ack = in.readBoolean(); } @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); out.writeBoolean(ack); } } diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryResponse.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryResponse.java index 51cd49cb299..44832cd420c 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryResponse.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryResponse.java @@ -22,7 +22,7 @@ package org.elasticsearch.indices.recovery; import com.google.common.collect.Lists; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.transport.TransportResponse; import java.io.IOException; import java.util.List; @@ -30,7 +30,7 @@ import java.util.List; /** * */ -class RecoveryResponse implements Streamable { +class RecoveryResponse extends TransportResponse { List phase1FileNames = Lists.newArrayList(); List phase1FileSizes = Lists.newArrayList(); @@ -54,6 +54,7 @@ class RecoveryResponse implements Streamable { @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); int size = in.readVInt(); phase1FileNames = Lists.newArrayListWithCapacity(size); for (int i = 0; i < size; i++) { @@ -89,6 +90,7 @@ class RecoveryResponse implements Streamable { @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); out.writeVInt(phase1FileNames.size()); for (String name : phase1FileNames) { out.writeUTF(name); diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java b/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java index afdd3c27dc8..0069c758c65 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java @@ -122,7 +122,7 @@ public class RecoverySource extends AbstractComponent { RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(request.recoveryId(), request.shardId(), response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames, response.phase1ExistingFileSizes, response.phase1TotalSize, response.phase1ExistingTotalSize); - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest, VoidTransportResponseHandler.INSTANCE_SAME).txGet(); + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest, EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); final CountDownLatch latch = new CountDownLatch(response.phase1FileNames.size()); final AtomicReference lastException = new AtomicReference(); @@ -157,7 +157,7 @@ public class RecoverySource extends AbstractComponent { indexInput.readBytes(buf, 0, toRead, false); BytesArray content = new BytesArray(buf, 0, toRead); transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.recoveryId(), request.shardId(), name, position, len, md.checksum(), content), - TransportRequestOptions.options().withCompress(shouldCompressRequest).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); + TransportRequestOptions.options().withCompress(shouldCompressRequest).withLowType(), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); readCount += toRead; } indexInput.close(); @@ -185,7 +185,7 @@ public class RecoverySource extends AbstractComponent { // now, set the clean files request Set snapshotFiles = Sets.newHashSet(snapshot.getFiles()); - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES, new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), snapshotFiles), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES, new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), snapshotFiles), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); stopWatch.stop(); logger.trace("[{}][{}] recovery [phase1] to {}: took [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime()); @@ -202,7 +202,7 @@ public class RecoverySource extends AbstractComponent { } logger.trace("[{}][{}] recovery [phase2] to {}: start", request.shardId().index().name(), request.shardId().id(), request.targetNode()); StopWatch stopWatch = new StopWatch().start(); - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG, new RecoveryPrepareForTranslogOperationsRequest(request.recoveryId(), request.shardId()), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG, new RecoveryPrepareForTranslogOperationsRequest(request.recoveryId(), request.shardId()), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); stopWatch.stop(); response.startTime = stopWatch.totalTime().millis(); logger.trace("[{}][{}] recovery [phase2] to {}: start took [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime()); @@ -224,7 +224,7 @@ public class RecoverySource extends AbstractComponent { logger.trace("[{}][{}] recovery [phase3] to {}: sending transaction log operations", request.shardId().index().name(), request.shardId().id(), request.targetNode()); StopWatch stopWatch = new StopWatch().start(); int totalOperations = sendSnapshot(snapshot); - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FINALIZE, new RecoveryFinalizeRecoveryRequest(request.recoveryId(), request.shardId()), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FINALIZE, new RecoveryFinalizeRecoveryRequest(request.recoveryId(), request.shardId()), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); if (request.markAsRelocated()) { // TODO what happens if the recovery process fails afterwards, we need to mark this back to started try { @@ -262,7 +262,7 @@ public class RecoverySource extends AbstractComponent { } RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.recoveryId(), request.shardId(), operations); - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(recoverySettings.compress()).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(recoverySettings.compress()).withLowType(), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); ops = 0; size = 0; operations.clear(); @@ -271,7 +271,7 @@ public class RecoverySource extends AbstractComponent { // send the leftover if (!operations.isEmpty()) { RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.recoveryId(), request.shardId(), operations); - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(recoverySettings.compress()).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(recoverySettings.compress()).withLowType(), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); } return totalOperations; } diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 0caf6ec54da..e152fba07d9 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -28,7 +28,6 @@ import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.VoidStreamable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; @@ -373,7 +372,7 @@ public class RecoveryTarget extends AbstractComponent { onGoingRecovery.stage = RecoveryStatus.Stage.TRANSLOG; onGoingRecovery.indexShard.performRecoveryPrepareForTranslog(); - channel.sendResponse(VoidStreamable.INSTANCE); + channel.sendResponse(TransportResponse.Empty.INSTANCE); } } @@ -405,7 +404,7 @@ public class RecoveryTarget extends AbstractComponent { onGoingRecovery.indexShard.performRecoveryFinalization(false, onGoingRecovery); onGoingRecovery.time = System.currentTimeMillis() - onGoingRecovery.startTime; onGoingRecovery.stage = RecoveryStatus.Stage.DONE; - channel.sendResponse(VoidStreamable.INSTANCE); + channel.sendResponse(TransportResponse.Empty.INSTANCE); } } @@ -443,7 +442,7 @@ public class RecoveryTarget extends AbstractComponent { shard.performRecoveryOperation(operation); onGoingRecovery.currentTranslogOperations++; } - channel.sendResponse(VoidStreamable.INSTANCE); + channel.sendResponse(TransportResponse.Empty.INSTANCE); } } @@ -478,7 +477,7 @@ public class RecoveryTarget extends AbstractComponent { onGoingRecovery.phase1TotalSize = request.phase1TotalSize; onGoingRecovery.phase1ExistingTotalSize = request.phase1ExistingTotalSize; onGoingRecovery.stage = RecoveryStatus.Stage.INDEX; - channel.sendResponse(VoidStreamable.INSTANCE); + channel.sendResponse(TransportResponse.Empty.INSTANCE); } } @@ -549,7 +548,7 @@ public class RecoveryTarget extends AbstractComponent { } } } - channel.sendResponse(VoidStreamable.INSTANCE); + channel.sendResponse(TransportResponse.Empty.INSTANCE); } } @@ -646,7 +645,7 @@ public class RecoveryTarget extends AbstractComponent { throw e; } } - channel.sendResponse(VoidStreamable.INSTANCE); + channel.sendResponse(TransportResponse.Empty.INSTANCE); } } } diff --git a/src/main/java/org/elasticsearch/jmx/action/GetJmxServiceUrlAction.java b/src/main/java/org/elasticsearch/jmx/action/GetJmxServiceUrlAction.java index c2ff257457e..b1e5c8e0ff0 100644 --- a/src/main/java/org/elasticsearch/jmx/action/GetJmxServiceUrlAction.java +++ b/src/main/java/org/elasticsearch/jmx/action/GetJmxServiceUrlAction.java @@ -24,12 +24,15 @@ import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.StringStreamable; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.jmx.JmxService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; +import java.io.IOException; + /** * */ @@ -56,12 +59,12 @@ public class GetJmxServiceUrlAction extends AbstractComponent { if (clusterService.state().nodes().localNodeId().equals(node.id())) { return jmxService.publishUrl(); } else { - return transportService.submitRequest(node, GetJmxServiceUrlTransportHandler.ACTION, TransportRequest.Empty.INSTANCE, new FutureTransportResponseHandler() { + return transportService.submitRequest(node, GetJmxServiceUrlTransportHandler.ACTION, TransportRequest.Empty.INSTANCE, new FutureTransportResponseHandler() { @Override - public StringStreamable newInstance() { - return new StringStreamable(); + public GetJmxServiceUrlResponse newInstance() { + return new GetJmxServiceUrlResponse(); } - }).txGet().get(); + }).txGet().url(); } } @@ -81,7 +84,35 @@ public class GetJmxServiceUrlAction extends AbstractComponent { @Override public void messageReceived(TransportRequest.Empty request, TransportChannel channel) throws Exception { - channel.sendResponse(new StringStreamable(jmxService.publishUrl())); + channel.sendResponse(new GetJmxServiceUrlResponse(jmxService.publishUrl())); + } + } + + static class GetJmxServiceUrlResponse extends TransportResponse { + + private String url; + + GetJmxServiceUrlResponse() { + } + + GetJmxServiceUrlResponse(String url) { + this.url = url; + } + + public String url() { + return this.url; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + url = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(url); } } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/river/cluster/PublishRiverClusterStateAction.java b/src/main/java/org/elasticsearch/river/cluster/PublishRiverClusterStateAction.java index bc550aa87d8..c39412886a6 100644 --- a/src/main/java/org/elasticsearch/river/cluster/PublishRiverClusterStateAction.java +++ b/src/main/java/org/elasticsearch/river/cluster/PublishRiverClusterStateAction.java @@ -25,7 +25,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.VoidStreamable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; @@ -76,7 +75,7 @@ public class PublishRiverClusterStateAction extends AbstractComponent { continue; } - transportService.sendRequest(node, PublishClusterStateRequestHandler.ACTION, new PublishClusterStateRequest(clusterState), new VoidTransportResponseHandler(ThreadPool.Names.SAME) { + transportService.sendRequest(node, PublishClusterStateRequestHandler.ACTION, new PublishClusterStateRequest(clusterState), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { @Override public void handleException(TransportException exp) { logger.debug("failed to send cluster state to [{}], should be detected as failed soon...", exp, node); @@ -126,7 +125,7 @@ public class PublishRiverClusterStateAction extends AbstractComponent { @Override public void messageReceived(PublishClusterStateRequest request, TransportChannel channel) throws Exception { listener.onNewClusterState(request.clusterState); - channel.sendResponse(VoidStreamable.INSTANCE); + channel.sendResponse(TransportResponse.Empty.INSTANCE); } } } diff --git a/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java b/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java index a37bd76eec8..4a9edd6a5d2 100644 --- a/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java +++ b/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java @@ -26,7 +26,6 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.VoidStreamable; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.search.SearchService; @@ -51,7 +50,7 @@ import java.io.IOException; */ public class SearchServiceTransportAction extends AbstractComponent { - static final class FreeContextResponseHandler extends VoidTransportResponseHandler { + static final class FreeContextResponseHandler extends EmptyTransportResponseHandler { private final ESLogger logger; @@ -483,7 +482,7 @@ public class SearchServiceTransportAction extends AbstractComponent { @Override public void messageReceived(SearchFreeContextRequest request, TransportChannel channel) throws Exception { searchService.freeContext(request.id()); - channel.sendResponse(VoidStreamable.INSTANCE); + channel.sendResponse(TransportResponse.Empty.INSTANCE); } @Override diff --git a/src/main/java/org/elasticsearch/search/dfs/DfsSearchResult.java b/src/main/java/org/elasticsearch/search/dfs/DfsSearchResult.java index 17ccdb3a8a6..777cb8da3ef 100644 --- a/src/main/java/org/elasticsearch/search/dfs/DfsSearchResult.java +++ b/src/main/java/org/elasticsearch/search/dfs/DfsSearchResult.java @@ -24,26 +24,22 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.transport.TransportResponse; import java.io.IOException; /** * */ -public class DfsSearchResult implements SearchPhaseResult { +public class DfsSearchResult extends TransportResponse implements SearchPhaseResult { private static Term[] EMPTY_TERMS = new Term[0]; - private static int[] EMPTY_FREQS = new int[0]; private SearchShardTarget shardTarget; - private long id; - private Term[] terms; - private int[] freqs; - private int maxDoc; public DfsSearchResult() { @@ -99,6 +95,7 @@ public class DfsSearchResult implements SearchPhaseResult { @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); id = in.readLong(); // shardTarget = readSearchShardTarget(in); int termsSize = in.readVInt(); @@ -124,6 +121,7 @@ public class DfsSearchResult implements SearchPhaseResult { @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); out.writeLong(id); // shardTarget.writeTo(out); out.writeVInt(terms.length); diff --git a/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java b/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java index f3bed30383d..dd8bca86e0c 100644 --- a/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java +++ b/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java @@ -21,9 +21,9 @@ package org.elasticsearch.search.fetch; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.InternalSearchHits; +import org.elasticsearch.transport.TransportResponse; import java.io.IOException; @@ -32,14 +32,11 @@ import static org.elasticsearch.search.internal.InternalSearchHits.StreamContext /** * */ -public class FetchSearchResult implements Streamable, FetchSearchResultProvider { +public class FetchSearchResult extends TransportResponse implements FetchSearchResultProvider { private long id; - private SearchShardTarget shardTarget; - private InternalSearchHits hits; - // client side counter private transient int counter; @@ -95,12 +92,14 @@ public class FetchSearchResult implements Streamable, FetchSearchResultProvider @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); id = in.readLong(); hits = InternalSearchHits.readSearchHits(in, InternalSearchHits.streamContext().streamShardTarget(StreamContext.ShardTargetType.NO_STREAM)); } @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); out.writeLong(id); hits.writeTo(out, InternalSearchHits.streamContext().streamShardTarget(StreamContext.ShardTargetType.NO_STREAM)); } diff --git a/src/main/java/org/elasticsearch/search/fetch/QueryFetchSearchResult.java b/src/main/java/org/elasticsearch/search/fetch/QueryFetchSearchResult.java index dec5a4dd2be..fa37c45f40f 100644 --- a/src/main/java/org/elasticsearch/search/fetch/QueryFetchSearchResult.java +++ b/src/main/java/org/elasticsearch/search/fetch/QueryFetchSearchResult.java @@ -21,10 +21,10 @@ package org.elasticsearch.search.fetch; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.query.QuerySearchResultProvider; +import org.elasticsearch.transport.TransportResponse; import java.io.IOException; @@ -34,10 +34,9 @@ import static org.elasticsearch.search.query.QuerySearchResult.readQuerySearchRe /** * */ -public class QueryFetchSearchResult implements Streamable, QuerySearchResultProvider, FetchSearchResultProvider { +public class QueryFetchSearchResult extends TransportResponse implements QuerySearchResultProvider, FetchSearchResultProvider { private QuerySearchResult queryResult; - private FetchSearchResult fetchResult; public QueryFetchSearchResult() { @@ -84,12 +83,14 @@ public class QueryFetchSearchResult implements Streamable, QuerySearchResultProv @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); queryResult = readQuerySearchResult(in); fetchResult = readFetchSearchResult(in); } @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); queryResult.writeTo(out); fetchResult.writeTo(out); } diff --git a/src/main/java/org/elasticsearch/search/fetch/ScrollQueryFetchSearchResult.java b/src/main/java/org/elasticsearch/search/fetch/ScrollQueryFetchSearchResult.java index 7efad49a640..b83d61b605f 100644 --- a/src/main/java/org/elasticsearch/search/fetch/ScrollQueryFetchSearchResult.java +++ b/src/main/java/org/elasticsearch/search/fetch/ScrollQueryFetchSearchResult.java @@ -21,8 +21,8 @@ package org.elasticsearch.search.fetch; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.transport.TransportResponse; import java.io.IOException; @@ -32,10 +32,9 @@ import static org.elasticsearch.search.fetch.QueryFetchSearchResult.readQueryFet /** * */ -public class ScrollQueryFetchSearchResult implements Streamable { +public class ScrollQueryFetchSearchResult extends TransportResponse { private QueryFetchSearchResult result; - private SearchShardTarget shardTarget; public ScrollQueryFetchSearchResult() { @@ -56,6 +55,7 @@ public class ScrollQueryFetchSearchResult implements Streamable { @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); shardTarget = readSearchShardTarget(in); result = readQueryFetchSearchResult(in); result.shardTarget(shardTarget); @@ -63,6 +63,7 @@ public class ScrollQueryFetchSearchResult implements Streamable { @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); shardTarget.writeTo(out); result.writeTo(out); } diff --git a/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index 8a574ef3aaf..6ed9ae81fa5 100644 --- a/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -22,10 +22,10 @@ package org.elasticsearch.search.query; import org.apache.lucene.search.TopDocs; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.facet.Facets; import org.elasticsearch.search.facet.InternalFacets; +import org.elasticsearch.transport.TransportResponse; import java.io.IOException; @@ -35,20 +35,14 @@ import static org.elasticsearch.common.lucene.Lucene.writeTopDocs; /** * */ -public class QuerySearchResult implements Streamable, QuerySearchResultProvider { +public class QuerySearchResult extends TransportResponse implements QuerySearchResultProvider { private long id; - private SearchShardTarget shardTarget; - private int from; - private int size; - private TopDocs topDocs; - private InternalFacets facets; - private boolean searchTimedOut; public QuerySearchResult() { @@ -133,6 +127,7 @@ public class QuerySearchResult implements Streamable, QuerySearchResultProvider @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); id = in.readLong(); // shardTarget = readSearchShardTarget(in); from = in.readVInt(); @@ -146,6 +141,7 @@ public class QuerySearchResult implements Streamable, QuerySearchResultProvider @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); out.writeLong(id); // shardTarget.writeTo(out); out.writeVInt(from); diff --git a/src/main/java/org/elasticsearch/search/query/ScrollQuerySearchResult.java b/src/main/java/org/elasticsearch/search/query/ScrollQuerySearchResult.java index c73880612d3..e481e5a797c 100644 --- a/src/main/java/org/elasticsearch/search/query/ScrollQuerySearchResult.java +++ b/src/main/java/org/elasticsearch/search/query/ScrollQuerySearchResult.java @@ -21,8 +21,8 @@ package org.elasticsearch.search.query; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.transport.TransportResponse; import java.io.IOException; @@ -32,10 +32,9 @@ import static org.elasticsearch.search.query.QuerySearchResult.readQuerySearchRe /** * */ -public class ScrollQuerySearchResult implements Streamable { +public class ScrollQuerySearchResult extends TransportResponse { private QuerySearchResult queryResult; - private SearchShardTarget shardTarget; public ScrollQuerySearchResult() { @@ -56,6 +55,7 @@ public class ScrollQuerySearchResult implements Streamable { @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); shardTarget = readSearchShardTarget(in); queryResult = readQuerySearchResult(in); queryResult.shardTarget(shardTarget); @@ -63,6 +63,7 @@ public class ScrollQuerySearchResult implements Streamable { @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); shardTarget.writeTo(out); queryResult.writeTo(out); } diff --git a/src/main/java/org/elasticsearch/transport/BaseTransportResponseHandler.java b/src/main/java/org/elasticsearch/transport/BaseTransportResponseHandler.java index 41c676d7e83..376e9c324b1 100644 --- a/src/main/java/org/elasticsearch/transport/BaseTransportResponseHandler.java +++ b/src/main/java/org/elasticsearch/transport/BaseTransportResponseHandler.java @@ -19,13 +19,9 @@ package org.elasticsearch.transport; -import org.elasticsearch.common.io.stream.Streamable; - /** * A simple based class that always spawns. - * - * */ -public abstract class BaseTransportResponseHandler implements TransportResponseHandler { +public abstract class BaseTransportResponseHandler implements TransportResponseHandler { } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/transport/VoidTransportResponseHandler.java b/src/main/java/org/elasticsearch/transport/EmptyTransportResponseHandler.java similarity index 71% rename from src/main/java/org/elasticsearch/transport/VoidTransportResponseHandler.java rename to src/main/java/org/elasticsearch/transport/EmptyTransportResponseHandler.java index ced0a6a610e..ba6c5f8c767 100644 --- a/src/main/java/org/elasticsearch/transport/VoidTransportResponseHandler.java +++ b/src/main/java/org/elasticsearch/transport/EmptyTransportResponseHandler.java @@ -19,29 +19,28 @@ package org.elasticsearch.transport; -import org.elasticsearch.common.io.stream.VoidStreamable; import org.elasticsearch.threadpool.ThreadPool; /** * */ -public class VoidTransportResponseHandler implements TransportResponseHandler { +public class EmptyTransportResponseHandler implements TransportResponseHandler { - public static final VoidTransportResponseHandler INSTANCE_SAME = new VoidTransportResponseHandler(ThreadPool.Names.SAME); + public static final EmptyTransportResponseHandler INSTANCE_SAME = new EmptyTransportResponseHandler(ThreadPool.Names.SAME); private final String executor; - public VoidTransportResponseHandler(String executor) { + public EmptyTransportResponseHandler(String executor) { this.executor = executor; } @Override - public VoidStreamable newInstance() { - return VoidStreamable.INSTANCE; + public TransportResponse.Empty newInstance() { + return TransportResponse.Empty.INSTANCE; } @Override - public void handleResponse(VoidStreamable response) { + public void handleResponse(TransportResponse.Empty response) { } @Override diff --git a/src/main/java/org/elasticsearch/transport/FutureTransportResponseHandler.java b/src/main/java/org/elasticsearch/transport/FutureTransportResponseHandler.java index ffd8d7d20a7..66da0cb1b2e 100644 --- a/src/main/java/org/elasticsearch/transport/FutureTransportResponseHandler.java +++ b/src/main/java/org/elasticsearch/transport/FutureTransportResponseHandler.java @@ -19,15 +19,12 @@ package org.elasticsearch.transport; -import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.threadpool.ThreadPool; /** * A response handler to be used when all interaction will be done through the {@link TransportFuture}. - * - * */ -public abstract class FutureTransportResponseHandler extends BaseTransportResponseHandler { +public abstract class FutureTransportResponseHandler extends BaseTransportResponseHandler { @Override public void handleResponse(T response) { diff --git a/src/main/java/org/elasticsearch/transport/PlainTransportFuture.java b/src/main/java/org/elasticsearch/transport/PlainTransportFuture.java index dc4f9686a80..43caa45c5dd 100644 --- a/src/main/java/org/elasticsearch/transport/PlainTransportFuture.java +++ b/src/main/java/org/elasticsearch/transport/PlainTransportFuture.java @@ -22,7 +22,6 @@ package org.elasticsearch.transport; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchInterruptedException; import org.elasticsearch.ElasticSearchTimeoutException; -import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.util.concurrent.BaseFuture; import java.util.concurrent.ExecutionException; @@ -32,7 +31,7 @@ import java.util.concurrent.TimeoutException; /** * */ -public class PlainTransportFuture extends BaseFuture implements TransportFuture, TransportResponseHandler { +public class PlainTransportFuture extends BaseFuture implements TransportFuture, TransportResponseHandler { private final TransportResponseHandler handler; diff --git a/src/main/java/org/elasticsearch/transport/TransportChannel.java b/src/main/java/org/elasticsearch/transport/TransportChannel.java index 355eb98804d..0910aacb21c 100644 --- a/src/main/java/org/elasticsearch/transport/TransportChannel.java +++ b/src/main/java/org/elasticsearch/transport/TransportChannel.java @@ -19,22 +19,18 @@ package org.elasticsearch.transport; -import org.elasticsearch.common.io.stream.Streamable; - import java.io.IOException; /** * A transport channel allows to send a response to a request on the channel. - * - * */ public interface TransportChannel { String action(); - void sendResponse(Streamable message) throws IOException; + void sendResponse(TransportResponse response) throws IOException; - void sendResponse(Streamable message, TransportResponseOptions options) throws IOException; + void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException; void sendResponse(Throwable error) throws IOException; } diff --git a/src/main/java/org/elasticsearch/transport/TransportResponse.java b/src/main/java/org/elasticsearch/transport/TransportResponse.java new file mode 100644 index 00000000000..4fc2d0d8f38 --- /dev/null +++ b/src/main/java/org/elasticsearch/transport/TransportResponse.java @@ -0,0 +1,99 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import com.google.common.collect.Maps; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + */ +public abstract class TransportResponse implements Streamable { + + public static class Empty extends TransportResponse { + + public static final Empty INSTANCE = new Empty(); + + public Empty() { + super(); + } + + public Empty(TransportResponse request) { + super(request); + } + } + + private Map headers; + + protected TransportResponse() { + + } + + protected TransportResponse(TransportResponse request) { + // create a new copy of the headers, since we are creating a new request which might have + // its headers changed in the context of that specific request + if (request.getHeaders() != null) { + this.headers = new HashMap(request.getHeaders()); + } + } + + @SuppressWarnings("unchecked") + public final TransportResponse putHeader(String key, Object value) { + if (headers == null) { + headers = Maps.newHashMap(); + } + headers.put(key, value); + return this; + } + + @SuppressWarnings("unchecked") + public final V getHeader(String key) { + if (headers == null) { + return null; + } + return (V) headers.get(key); + } + + public Map getHeaders() { + return this.headers; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + if (in.readBoolean()) { + headers = in.readMap(); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + if (headers == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeMap(headers); + } + } +} diff --git a/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java b/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java index d40bf0e2751..d278e61a2c9 100644 --- a/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java +++ b/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java @@ -19,12 +19,10 @@ package org.elasticsearch.transport; -import org.elasticsearch.common.io.stream.Streamable; - /** * */ -public interface TransportResponseHandler { +public interface TransportResponseHandler { /** * creates a new instance of the return type from the remote call. diff --git a/src/main/java/org/elasticsearch/transport/TransportService.java b/src/main/java/org/elasticsearch/transport/TransportService.java index 25eab8a26d2..718fd0faa8f 100644 --- a/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/src/main/java/org/elasticsearch/transport/TransportService.java @@ -25,7 +25,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; @@ -155,25 +154,25 @@ public class TransportService extends AbstractLifecycleComponent TransportFuture submitRequest(DiscoveryNode node, String action, TransportRequest request, - TransportResponseHandler handler) throws TransportException { + public TransportFuture submitRequest(DiscoveryNode node, String action, TransportRequest request, + TransportResponseHandler handler) throws TransportException { return submitRequest(node, action, request, TransportRequestOptions.EMPTY, handler); } - public TransportFuture submitRequest(DiscoveryNode node, String action, TransportRequest request, - TransportRequestOptions options, TransportResponseHandler handler) throws TransportException { + public TransportFuture submitRequest(DiscoveryNode node, String action, TransportRequest request, + TransportRequestOptions options, TransportResponseHandler handler) throws TransportException { PlainTransportFuture futureHandler = new PlainTransportFuture(handler); sendRequest(node, action, request, options, futureHandler); return futureHandler; } - public void sendRequest(final DiscoveryNode node, final String action, final TransportRequest request, - final TransportResponseHandler handler) throws TransportException { + public void sendRequest(final DiscoveryNode node, final String action, final TransportRequest request, + final TransportResponseHandler handler) throws TransportException { sendRequest(node, action, request, TransportRequestOptions.EMPTY, handler); } - public void sendRequest(final DiscoveryNode node, final String action, final TransportRequest request, - final TransportRequestOptions options, final TransportResponseHandler handler) throws TransportException { + public void sendRequest(final DiscoveryNode node, final String action, final TransportRequest request, + final TransportRequestOptions options, final TransportResponseHandler handler) throws TransportException { final long requestId = newRequestId(); TimeoutHandler timeoutHandler = null; try { @@ -380,7 +379,7 @@ public class TransportService extends AbstractLifecycleComponent { + static class RequestHolder { private final TransportResponseHandler handler; diff --git a/src/main/java/org/elasticsearch/transport/local/LocalTransport.java b/src/main/java/org/elasticsearch/transport/local/LocalTransport.java index 81c03420e47..7e94c26e1ef 100644 --- a/src/main/java/org/elasticsearch/transport/local/LocalTransport.java +++ b/src/main/java/org/elasticsearch/transport/local/LocalTransport.java @@ -253,11 +253,11 @@ public class LocalTransport extends AbstractLifecycleComponent implem private void handleResponse(StreamInput buffer, final TransportResponseHandler handler) { - final Streamable streamable = handler.newInstance(); + final TransportResponse response = handler.newInstance(); try { - streamable.readFrom(buffer); + response.readFrom(buffer); } catch (Exception e) { - handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + streamable.getClass().getName() + "]", e)); + handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + response.getClass().getName() + "]", e)); return; } threadPool.executor(handler.executor()).execute(new Runnable() { @@ -265,7 +265,7 @@ public class LocalTransport extends AbstractLifecycleComponent implem @Override public void run() { try { - handler.handleResponse(streamable); + handler.handleResponse(response); } catch (Exception e) { handleException(handler, new ResponseHandlerFailureTransportException(e)); } diff --git a/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java b/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java index 567435b87b4..8f98758253e 100644 --- a/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java +++ b/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java @@ -23,11 +23,7 @@ import org.elasticsearch.common.io.ThrowableObjectOutputStream; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.CachedStreamOutput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; -import org.elasticsearch.transport.NotSerializableTransportException; -import org.elasticsearch.transport.RemoteTransportException; -import org.elasticsearch.transport.TransportChannel; -import org.elasticsearch.transport.TransportResponseOptions; +import org.elasticsearch.transport.*; import org.elasticsearch.transport.support.TransportStatus; import java.io.IOException; @@ -60,12 +56,12 @@ public class LocalTransportChannel implements TransportChannel { } @Override - public void sendResponse(Streamable message) throws IOException { - sendResponse(message, TransportResponseOptions.EMPTY); + public void sendResponse(TransportResponse response) throws IOException { + sendResponse(response, TransportResponseOptions.EMPTY); } @Override - public void sendResponse(Streamable message, TransportResponseOptions options) throws IOException { + public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException { CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); try { StreamOutput stream = cachedEntry.handles(); @@ -73,7 +69,7 @@ public class LocalTransportChannel implements TransportChannel { byte status = 0; status = TransportStatus.setResponse(status); stream.writeByte(status); // 0 for request, 1 for response. - message.writeTo(stream); + response.writeTo(stream); stream.close(); final byte[] data = cachedEntry.bytes().bytes().copyBytesArray().toBytes(); targetTransport.threadPool().generic().execute(new Runnable() { diff --git a/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java b/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java index 8bf8a459f71..77a4cd2dcbd 100644 --- a/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java +++ b/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java @@ -27,7 +27,6 @@ import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.io.ThrowableObjectInputStream; import org.elasticsearch.common.io.stream.CachedStreamInput; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; @@ -144,19 +143,19 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { } private void handleResponse(StreamInput buffer, final TransportResponseHandler handler) { - final Streamable streamable = handler.newInstance(); + final TransportResponse response = handler.newInstance(); try { - streamable.readFrom(buffer); + response.readFrom(buffer); } catch (Exception e) { - handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + streamable.getClass().getName() + "]", e)); + handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + response.getClass().getName() + "]", e)); return; } try { if (handler.executor() == ThreadPool.Names.SAME) { //noinspection unchecked - handler.handleResponse(streamable); + handler.handleResponse(response); } else { - threadPool.executor(handler.executor()).execute(new ResponseHandler(handler, streamable)); + threadPool.executor(handler.executor()).execute(new ResponseHandler(handler, response)); } } catch (Exception e) { handleException(handler, new ResponseHandlerFailureTransportException(e)); @@ -196,7 +195,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { } private String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException { - final String action = buffer.readUTF(); + final String action = buffer.readString(); final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, channel, requestId, version); try { @@ -231,18 +230,18 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { class ResponseHandler implements Runnable { private final TransportResponseHandler handler; - private final Streamable streamable; + private final TransportResponse response; - public ResponseHandler(TransportResponseHandler handler, Streamable streamable) { + public ResponseHandler(TransportResponseHandler handler, TransportResponse response) { this.handler = handler; - this.streamable = streamable; + this.response = response; } @SuppressWarnings({"unchecked"}) @Override public void run() { try { - handler.handleResponse(streamable); + handler.handleResponse(response); } catch (Exception e) { handleException(handler, new ResponseHandlerFailureTransportException(e)); } diff --git a/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java b/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java index 04850d98e7d..54afbd4ad91 100644 --- a/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java +++ b/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java @@ -25,11 +25,7 @@ import org.elasticsearch.common.io.ThrowableObjectOutputStream; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.CachedStreamOutput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; -import org.elasticsearch.transport.NotSerializableTransportException; -import org.elasticsearch.transport.RemoteTransportException; -import org.elasticsearch.transport.TransportChannel; -import org.elasticsearch.transport.TransportResponseOptions; +import org.elasticsearch.transport.*; import org.elasticsearch.transport.support.TransportStatus; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; @@ -67,12 +63,12 @@ public class NettyTransportChannel implements TransportChannel { } @Override - public void sendResponse(Streamable message) throws IOException { - sendResponse(message, TransportResponseOptions.EMPTY); + public void sendResponse(TransportResponse response) throws IOException { + sendResponse(response, TransportResponseOptions.EMPTY); } @Override - public void sendResponse(Streamable message, TransportResponseOptions options) throws IOException { + public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException { if (transport.compress) { options.withCompress(true); } @@ -86,13 +82,13 @@ public class NettyTransportChannel implements TransportChannel { cachedEntry.bytes().skip(NettyHeader.HEADER_SIZE); StreamOutput stream = cachedEntry.handles(CompressorFactory.defaultCompressor()); stream.setVersion(version); - message.writeTo(stream); + response.writeTo(stream); stream.close(); } else { StreamOutput stream = cachedEntry.handles(); stream.setVersion(version); cachedEntry.bytes().skip(NettyHeader.HEADER_SIZE); - message.writeTo(stream); + response.writeTo(stream); stream.close(); } ChannelBuffer buffer = cachedEntry.bytes().bytes().toChannelBuffer(); diff --git a/src/test/java/org/elasticsearch/benchmark/transport/netty/BenchmarkMessageRequest.java b/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkMessageRequest.java similarity index 96% rename from src/test/java/org/elasticsearch/benchmark/transport/netty/BenchmarkMessageRequest.java rename to src/test/java/org/elasticsearch/benchmark/transport/BenchmarkMessageRequest.java index b7015a8eb38..73edc9948d1 100644 --- a/src/test/java/org/elasticsearch/benchmark/transport/netty/BenchmarkMessageRequest.java +++ b/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkMessageRequest.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.benchmark.transport.netty; +package org.elasticsearch.benchmark.transport; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; diff --git a/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkMessage.java b/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkMessageResponse.java similarity index 73% rename from src/test/java/org/elasticsearch/benchmark/transport/BenchmarkMessage.java rename to src/test/java/org/elasticsearch/benchmark/transport/BenchmarkMessageResponse.java index dca7e8b3ccf..7895083c2da 100644 --- a/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkMessage.java +++ b/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkMessageResponse.java @@ -21,29 +21,42 @@ package org.elasticsearch.benchmark.transport; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.transport.TransportResponse; import java.io.IOException; /** * */ -public class BenchmarkMessage implements Streamable { +public class BenchmarkMessageResponse extends TransportResponse { long id; - byte[] payload; - public BenchmarkMessage(long id, byte[] payload) { + public BenchmarkMessageResponse(BenchmarkMessageRequest request) { + this.id = request.id; + this.payload = request.payload; + } + + public BenchmarkMessageResponse(long id, byte[] payload) { this.id = id; this.payload = payload; } - public BenchmarkMessage() { + public BenchmarkMessageResponse() { + } + + public long id() { + return id; + } + + public byte[] payload() { + return payload; } @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); id = in.readLong(); payload = new byte[in.readVInt()]; in.readFully(payload); @@ -51,6 +64,7 @@ public class BenchmarkMessage implements Streamable { @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); out.writeLong(id); out.writeVInt(payload.length); out.writeBytes(payload); diff --git a/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java b/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java index 7b273d34fd6..92ddf7c0d7e 100644 --- a/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java +++ b/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java @@ -19,7 +19,6 @@ package org.elasticsearch.benchmark.transport; -import org.elasticsearch.benchmark.transport.netty.BenchmarkMessageRequest; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Bytes; import org.elasticsearch.common.settings.ImmutableSettings; @@ -73,7 +72,7 @@ public class BenchmarkNettyLargeMessages { @Override public void messageReceived(BenchmarkMessageRequest request, TransportChannel channel) throws Exception { - channel.sendResponse(request); + channel.sendResponse(new BenchmarkMessageResponse(request)); } }); @@ -84,10 +83,10 @@ public class BenchmarkNettyLargeMessages { public void run() { for (int i = 0; i < NUMBER_OF_ITERATIONS; i++) { BenchmarkMessageRequest message = new BenchmarkMessageRequest(1, payload); - transportServiceClient.submitRequest(bigNode, "benchmark", message, options().withLowType(), new BaseTransportResponseHandler() { + transportServiceClient.submitRequest(bigNode, "benchmark", message, options().withLowType(), new BaseTransportResponseHandler() { @Override - public BenchmarkMessage newInstance() { - return new BenchmarkMessage(); + public BenchmarkMessageResponse newInstance() { + return new BenchmarkMessageResponse(); } @Override @@ -96,7 +95,7 @@ public class BenchmarkNettyLargeMessages { } @Override - public void handleResponse(BenchmarkMessage response) { + public void handleResponse(BenchmarkMessageResponse response) { } @Override @@ -116,10 +115,10 @@ public class BenchmarkNettyLargeMessages { for (int i = 0; i < 1; i++) { BenchmarkMessageRequest message = new BenchmarkMessageRequest(2, Bytes.EMPTY_ARRAY); long start = System.currentTimeMillis(); - transportServiceClient.submitRequest(smallNode, "benchmark", message, options().withHighType(), new BaseTransportResponseHandler() { + transportServiceClient.submitRequest(smallNode, "benchmark", message, options().withHighType(), new BaseTransportResponseHandler() { @Override - public BenchmarkMessage newInstance() { - return new BenchmarkMessage(); + public BenchmarkMessageResponse newInstance() { + return new BenchmarkMessageResponse(); } @Override @@ -128,7 +127,7 @@ public class BenchmarkNettyLargeMessages { } @Override - public void handleResponse(BenchmarkMessage response) { + public void handleResponse(BenchmarkMessageResponse response) { } @Override diff --git a/src/test/java/org/elasticsearch/benchmark/transport/TransportBenchmark.java b/src/test/java/org/elasticsearch/benchmark/transport/TransportBenchmark.java index 314762b004f..071fd43e38d 100644 --- a/src/test/java/org/elasticsearch/benchmark/transport/TransportBenchmark.java +++ b/src/test/java/org/elasticsearch/benchmark/transport/TransportBenchmark.java @@ -19,7 +19,6 @@ package org.elasticsearch.benchmark.transport; -import org.elasticsearch.benchmark.transport.netty.BenchmarkMessageRequest; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.settings.ImmutableSettings; @@ -91,7 +90,7 @@ public class TransportBenchmark { @Override public void messageReceived(BenchmarkMessageRequest request, TransportChannel channel) throws Exception { - channel.sendResponse(request); + channel.sendResponse(new BenchmarkMessageResponse(request)); } }); @@ -99,10 +98,10 @@ public class TransportBenchmark { for (int i = 0; i < 10000; i++) { BenchmarkMessageRequest message = new BenchmarkMessageRequest(1, payload); - clientTransportService.submitRequest(node, "benchmark", message, new BaseTransportResponseHandler() { + clientTransportService.submitRequest(node, "benchmark", message, new BaseTransportResponseHandler() { @Override - public BenchmarkMessage newInstance() { - return new BenchmarkMessage(); + public BenchmarkMessageResponse newInstance() { + return new BenchmarkMessageResponse(); } @Override @@ -111,7 +110,7 @@ public class TransportBenchmark { } @Override - public void handleResponse(BenchmarkMessage response) { + public void handleResponse(BenchmarkMessageResponse response) { } @Override @@ -131,10 +130,10 @@ public class TransportBenchmark { for (int j = 0; j < NUMBER_OF_ITERATIONS; j++) { final long id = idGenerator.incrementAndGet(); BenchmarkMessageRequest request = new BenchmarkMessageRequest(id, payload); - BaseTransportResponseHandler handler = new BaseTransportResponseHandler() { + BaseTransportResponseHandler handler = new BaseTransportResponseHandler() { @Override - public BenchmarkMessage newInstance() { - return new BenchmarkMessage(); + public BenchmarkMessageResponse newInstance() { + return new BenchmarkMessageResponse(); } @Override @@ -143,9 +142,9 @@ public class TransportBenchmark { } @Override - public void handleResponse(BenchmarkMessage response) { - if (response.id != id) { - System.out.println("NO ID MATCH [" + response.id + "] and [" + id + "]"); + public void handleResponse(BenchmarkMessageResponse response) { + if (response.id() != id) { + System.out.println("NO ID MATCH [" + response.id() + "] and [" + id + "]"); } latch.countDown(); } diff --git a/src/test/java/org/elasticsearch/test/unit/transport/AbstractSimpleTransportTests.java b/src/test/java/org/elasticsearch/test/unit/transport/AbstractSimpleTransportTests.java index 4376bf006dc..5d1219dad06 100644 --- a/src/test/java/org/elasticsearch/test/unit/transport/AbstractSimpleTransportTests.java +++ b/src/test/java/org/elasticsearch/test/unit/transport/AbstractSimpleTransportTests.java @@ -22,8 +22,6 @@ package org.elasticsearch.test.unit.transport; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; -import org.elasticsearch.common.io.stream.VoidStreamable; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; @@ -86,7 +84,7 @@ public abstract class AbstractSimpleTransportTests { public void messageReceived(StringMessageRequest request, TransportChannel channel) { assertThat("moshe", equalTo(request.message)); try { - channel.sendResponse(new StringMessage("hello " + request.message)); + channel.sendResponse(new StringMessageResponse("hello " + request.message)); } catch (IOException e) { e.printStackTrace(); assertThat(e.getMessage(), false, equalTo(true)); @@ -94,11 +92,11 @@ public abstract class AbstractSimpleTransportTests { } }); - TransportFuture res = serviceB.submitRequest(serviceANode, "sayHello", - new StringMessageRequest("moshe"), new BaseTransportResponseHandler() { + TransportFuture res = serviceB.submitRequest(serviceANode, "sayHello", + new StringMessageRequest("moshe"), new BaseTransportResponseHandler() { @Override - public StringMessage newInstance() { - return new StringMessage(); + public StringMessageResponse newInstance() { + return new StringMessageResponse(); } @Override @@ -107,7 +105,7 @@ public abstract class AbstractSimpleTransportTests { } @Override - public void handleResponse(StringMessage response) { + public void handleResponse(StringMessageResponse response) { assertThat("hello moshe", equalTo(response.message)); } @@ -119,7 +117,7 @@ public abstract class AbstractSimpleTransportTests { }); try { - StringMessage message = res.get(); + StringMessageResponse message = res.get(); assertThat("hello moshe", equalTo(message.message)); } catch (Exception e) { assertThat(e.getMessage(), false, equalTo(true)); @@ -144,7 +142,7 @@ public abstract class AbstractSimpleTransportTests { @Override public void messageReceived(TransportRequest.Empty request, TransportChannel channel) { try { - channel.sendResponse(VoidStreamable.INSTANCE, TransportResponseOptions.options().withCompress(true)); + channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.options().withCompress(true)); } catch (IOException e) { e.printStackTrace(); assertThat(e.getMessage(), false, equalTo(true)); @@ -152,11 +150,11 @@ public abstract class AbstractSimpleTransportTests { } }); - TransportFuture res = serviceB.submitRequest(serviceANode, "sayHello", - TransportRequest.Empty.INSTANCE, TransportRequestOptions.options().withCompress(true), new BaseTransportResponseHandler() { + TransportFuture res = serviceB.submitRequest(serviceANode, "sayHello", + TransportRequest.Empty.INSTANCE, TransportRequestOptions.options().withCompress(true), new BaseTransportResponseHandler() { @Override - public VoidStreamable newInstance() { - return VoidStreamable.INSTANCE; + public TransportResponse.Empty newInstance() { + return TransportResponse.Empty.INSTANCE; } @Override @@ -165,7 +163,7 @@ public abstract class AbstractSimpleTransportTests { } @Override - public void handleResponse(VoidStreamable response) { + public void handleResponse(TransportResponse.Empty response) { } @Override @@ -176,7 +174,7 @@ public abstract class AbstractSimpleTransportTests { }); try { - VoidStreamable message = res.get(); + TransportResponse.Empty message = res.get(); assertThat(message, notNullValue()); } catch (Exception e) { assertThat(e.getMessage(), false, equalTo(true)); @@ -202,7 +200,7 @@ public abstract class AbstractSimpleTransportTests { public void messageReceived(StringMessageRequest request, TransportChannel channel) { assertThat("moshe", equalTo(request.message)); try { - channel.sendResponse(new StringMessage("hello " + request.message), TransportResponseOptions.options().withCompress(true)); + channel.sendResponse(new StringMessageResponse("hello " + request.message), TransportResponseOptions.options().withCompress(true)); } catch (IOException e) { e.printStackTrace(); assertThat(e.getMessage(), false, equalTo(true)); @@ -210,11 +208,11 @@ public abstract class AbstractSimpleTransportTests { } }); - TransportFuture res = serviceB.submitRequest(serviceANode, "sayHello", - new StringMessageRequest("moshe"), TransportRequestOptions.options().withCompress(true), new BaseTransportResponseHandler() { + TransportFuture res = serviceB.submitRequest(serviceANode, "sayHello", + new StringMessageRequest("moshe"), TransportRequestOptions.options().withCompress(true), new BaseTransportResponseHandler() { @Override - public StringMessage newInstance() { - return new StringMessage(); + public StringMessageResponse newInstance() { + return new StringMessageResponse(); } @Override @@ -223,7 +221,7 @@ public abstract class AbstractSimpleTransportTests { } @Override - public void handleResponse(StringMessage response) { + public void handleResponse(StringMessageResponse response) { assertThat("hello moshe", equalTo(response.message)); } @@ -235,7 +233,7 @@ public abstract class AbstractSimpleTransportTests { }); try { - StringMessage message = res.get(); + StringMessageResponse message = res.get(); assertThat("hello moshe", equalTo(message.message)); } catch (Exception e) { assertThat(e.getMessage(), false, equalTo(true)); @@ -264,11 +262,11 @@ public abstract class AbstractSimpleTransportTests { } }); - TransportFuture res = serviceB.submitRequest(serviceANode, "sayHelloException", - new StringMessageRequest("moshe"), new BaseTransportResponseHandler() { + TransportFuture res = serviceB.submitRequest(serviceANode, "sayHelloException", + new StringMessageRequest("moshe"), new BaseTransportResponseHandler() { @Override - public StringMessage newInstance() { - return new StringMessage(); + public StringMessageResponse newInstance() { + return new StringMessageResponse(); } @Override @@ -277,7 +275,7 @@ public abstract class AbstractSimpleTransportTests { } @Override - public void handleResponse(StringMessage response) { + public void handleResponse(StringMessageResponse response) { assertThat("got response instead of exception", false, equalTo(true)); } @@ -342,11 +340,11 @@ public abstract class AbstractSimpleTransportTests { } }); - TransportFuture res = serviceB.submitRequest(serviceANode, "sayHelloTimeoutNoResponse", - new StringMessageRequest("moshe"), options().withTimeout(100), new BaseTransportResponseHandler() { + TransportFuture res = serviceB.submitRequest(serviceANode, "sayHelloTimeoutNoResponse", + new StringMessageRequest("moshe"), options().withTimeout(100), new BaseTransportResponseHandler() { @Override - public StringMessage newInstance() { - return new StringMessage(); + public StringMessageResponse newInstance() { + return new StringMessageResponse(); } @Override @@ -355,7 +353,7 @@ public abstract class AbstractSimpleTransportTests { } @Override - public void handleResponse(StringMessage response) { + public void handleResponse(StringMessageResponse response) { assertThat("got response instead of exception", false, equalTo(true)); } @@ -366,7 +364,7 @@ public abstract class AbstractSimpleTransportTests { }); try { - StringMessage message = res.txGet(); + StringMessageResponse message = res.txGet(); assertThat("exception should be thrown", false, equalTo(true)); } catch (Exception e) { assertThat(e, instanceOf(ReceiveTimeoutTransportException.class)); @@ -397,7 +395,7 @@ public abstract class AbstractSimpleTransportTests { // ignore } try { - channel.sendResponse(new StringMessage("hello " + request.message)); + channel.sendResponse(new StringMessageResponse("hello " + request.message)); } catch (IOException e) { e.printStackTrace(); assertThat(e.getMessage(), false, equalTo(true)); @@ -405,11 +403,11 @@ public abstract class AbstractSimpleTransportTests { } }); - TransportFuture res = serviceB.submitRequest(serviceANode, "sayHelloTimeoutDelayedResponse", - new StringMessageRequest("300ms"), options().withTimeout(100), new BaseTransportResponseHandler() { + TransportFuture res = serviceB.submitRequest(serviceANode, "sayHelloTimeoutDelayedResponse", + new StringMessageRequest("300ms"), options().withTimeout(100), new BaseTransportResponseHandler() { @Override - public StringMessage newInstance() { - return new StringMessage(); + public StringMessageResponse newInstance() { + return new StringMessageResponse(); } @Override @@ -418,7 +416,7 @@ public abstract class AbstractSimpleTransportTests { } @Override - public void handleResponse(StringMessage response) { + public void handleResponse(StringMessageResponse response) { assertThat("got response instead of exception", false, equalTo(true)); } @@ -429,7 +427,7 @@ public abstract class AbstractSimpleTransportTests { }); try { - StringMessage message = res.txGet(); + StringMessageResponse message = res.txGet(); assertThat("exception should be thrown", false, equalTo(true)); } catch (Exception e) { assertThat(e, instanceOf(ReceiveTimeoutTransportException.class)); @@ -442,10 +440,10 @@ public abstract class AbstractSimpleTransportTests { final int counter = i; // now, try and send another request, this times, with a short timeout res = serviceB.submitRequest(serviceANode, "sayHelloTimeoutDelayedResponse", - new StringMessageRequest(counter + "ms"), options().withTimeout(100), new BaseTransportResponseHandler() { + new StringMessageRequest(counter + "ms"), options().withTimeout(100), new BaseTransportResponseHandler() { @Override - public StringMessage newInstance() { - return new StringMessage(); + public StringMessageResponse newInstance() { + return new StringMessageResponse(); } @Override @@ -454,7 +452,7 @@ public abstract class AbstractSimpleTransportTests { } @Override - public void handleResponse(StringMessage response) { + public void handleResponse(StringMessageResponse response) { assertThat("hello " + counter + "ms", equalTo(response.message)); } @@ -465,14 +463,14 @@ public abstract class AbstractSimpleTransportTests { } }); - StringMessage message = res.txGet(); + StringMessageResponse message = res.txGet(); assertThat(message.message, equalTo("hello " + counter + "ms")); } serviceA.removeHandler("sayHelloTimeoutDelayedResponse"); } - class StringMessageRequest extends TransportRequest { + static class StringMessageRequest extends TransportRequest { private String message; @@ -496,24 +494,26 @@ public abstract class AbstractSimpleTransportTests { } } - class StringMessage implements Streamable { + static class StringMessageResponse extends TransportResponse { private String message; - StringMessage(String message) { + StringMessageResponse(String message) { this.message = message; } - StringMessage() { + StringMessageResponse() { } @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); message = in.readString(); } @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); out.writeString(message); } }