introduce TransportResponse

a class that needs to be used when sending a response over the transport layer, with an option to have headers
This commit is contained in:
Shay Banon 2012-09-27 18:05:16 +02:00
parent 15453272f7
commit 613c70c289
91 changed files with 577 additions and 640 deletions

View File

@ -19,10 +19,24 @@
package org.elasticsearch.action;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportResponse;
import java.io.IOException;
/**
*
*/
public interface ActionResponse extends Streamable {
public abstract class ActionResponse extends TransportResponse {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
}

View File

@ -36,30 +36,19 @@ import static org.elasticsearch.action.admin.cluster.health.ClusterIndexHealth.r
/**
*
*/
public class ClusterHealthResponse implements ActionResponse, Iterable<ClusterIndexHealth> {
public class ClusterHealthResponse extends ActionResponse implements Iterable<ClusterIndexHealth> {
private String clusterName;
int numberOfNodes = 0;
int numberOfDataNodes = 0;
int activeShards = 0;
int relocatingShards = 0;
int activePrimaryShards = 0;
int initializingShards = 0;
int unassignedShards = 0;
boolean timedOut = false;
ClusterHealthStatus status = ClusterHealthStatus.RED;
private List<String> validationFailures;
Map<String, ClusterIndexHealth> indices = Maps.newHashMap();
ClusterHealthResponse() {
@ -201,7 +190,8 @@ public class ClusterHealthResponse implements ActionResponse, Iterable<ClusterIn
@Override
public void readFrom(StreamInput in) throws IOException {
clusterName = in.readUTF();
super.readFrom(in);
clusterName = in.readString();
activePrimaryShards = in.readVInt();
activeShards = in.readVInt();
relocatingShards = in.readVInt();
@ -221,14 +211,15 @@ public class ClusterHealthResponse implements ActionResponse, Iterable<ClusterIn
validationFailures = ImmutableList.of();
} else {
for (int i = 0; i < size; i++) {
validationFailures.add(in.readUTF());
validationFailures.add(in.readString());
}
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(clusterName);
super.writeTo(out);
out.writeString(clusterName);
out.writeVInt(activePrimaryShards);
out.writeVInt(activeShards);
out.writeVInt(relocatingShards);

View File

@ -30,10 +30,9 @@ import java.io.IOException;
/**
*
*/
public class NodesShutdownResponse implements ActionResponse {
public class NodesShutdownResponse extends ActionResponse {
private ClusterName clusterName;
private DiscoveryNode[] nodes;
NodesShutdownResponse() {
@ -62,6 +61,7 @@ public class NodesShutdownResponse implements ActionResponse {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
clusterName = ClusterName.readClusterName(in);
nodes = new DiscoveryNode[in.readVInt()];
for (int i = 0; i < nodes.length; i++) {
@ -71,6 +71,7 @@ public class NodesShutdownResponse implements ActionResponse {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
clusterName.writeTo(out);
out.writeVInt(nodes.length);
for (DiscoveryNode node : nodes) {

View File

@ -30,7 +30,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.VoidStreamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.Node;
@ -47,11 +46,8 @@ import java.util.concurrent.CountDownLatch;
public class TransportNodesShutdownAction extends TransportMasterNodeOperationAction<NodesShutdownRequest, NodesShutdownResponse> {
private final Node node;
private final ClusterName clusterName;
private final boolean disabled;
private final TimeValue delay;
@Inject
@ -128,9 +124,9 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
latch.countDown();
} else {
logger.trace("[cluster_shutdown]: sending shutdown request to [{}]", node);
transportService.sendRequest(node, NodeShutdownRequestHandler.ACTION, new NodeShutdownRequest(request), new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
transportService.sendRequest(node, NodeShutdownRequestHandler.ACTION, new NodeShutdownRequest(request), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(VoidStreamable response) {
public void handleResponse(TransportResponse.Empty response) {
logger.trace("[cluster_shutdown]: received shutdown response from [{}]", node);
latch.countDown();
}
@ -152,9 +148,9 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
// now, kill the master
logger.trace("[cluster_shutdown]: shutting down the master [{}]", state.nodes().masterNode());
transportService.sendRequest(state.nodes().masterNode(), NodeShutdownRequestHandler.ACTION, new NodeShutdownRequest(request), new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
transportService.sendRequest(state.nodes().masterNode(), NodeShutdownRequestHandler.ACTION, new NodeShutdownRequest(request), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(VoidStreamable response) {
public void handleResponse(TransportResponse.Empty response) {
logger.trace("[cluster_shutdown]: received shutdown response from master");
}
@ -196,9 +192,9 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
}
logger.trace("[partial_cluster_shutdown]: sending shutdown request to [{}]", node);
transportService.sendRequest(node, NodeShutdownRequestHandler.ACTION, new NodeShutdownRequest(request), new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
transportService.sendRequest(node, NodeShutdownRequestHandler.ACTION, new NodeShutdownRequest(request), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(VoidStreamable response) {
public void handleResponse(TransportResponse.Empty response) {
logger.trace("[partial_cluster_shutdown]: received shutdown response from [{}]", node);
latch.countDown();
}
@ -288,7 +284,7 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
});
t.start();
channel.sendResponse(VoidStreamable.INSTANCE);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}

View File

@ -28,7 +28,7 @@ import java.io.IOException;
/**
*/
public class ClusterRerouteResponse implements ActionResponse {
public class ClusterRerouteResponse extends ActionResponse {
private ClusterState state;
@ -50,11 +50,13 @@ public class ClusterRerouteResponse implements ActionResponse {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
state = ClusterState.Builder.readFrom(in, null);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
ClusterState.Builder.writeTo(state, out);
}
}

View File

@ -22,23 +22,24 @@ package org.elasticsearch.action.admin.cluster.settings;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import java.io.IOException;
/**
* A response for a cluster update settings action.
*/
public class ClusterUpdateSettingsResponse implements ActionResponse, Streamable {
public class ClusterUpdateSettingsResponse extends ActionResponse {
ClusterUpdateSettingsResponse() {
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
}

View File

@ -30,10 +30,9 @@ import java.io.IOException;
/**
*
*/
public class ClusterStateResponse implements ActionResponse {
public class ClusterStateResponse extends ActionResponse {
private ClusterName clusterName;
private ClusterState clusterState;
public ClusterStateResponse() {
@ -62,12 +61,14 @@ public class ClusterStateResponse implements ActionResponse {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
clusterName = ClusterName.readClusterName(in);
clusterState = ClusterState.Builder.readFrom(in, null);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
clusterName.writeTo(out);
ClusterState.Builder.writeTo(clusterState, out);
}

View File

@ -22,16 +22,13 @@ package org.elasticsearch.action.admin.indices.alias;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import java.io.IOException;
/**
* A response for a add/remove alias action.
*
*
*/
public class IndicesAliasesResponse implements ActionResponse, Streamable {
public class IndicesAliasesResponse extends ActionResponse {
private boolean acknowledged;
@ -54,11 +51,13 @@ public class IndicesAliasesResponse implements ActionResponse, Streamable {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
acknowledged = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(acknowledged);
}
}

View File

@ -34,7 +34,7 @@ import java.util.List;
/**
*
*/
public class AnalyzeResponse implements ActionResponse, Iterable<AnalyzeResponse.AnalyzeToken>, ToXContent {
public class AnalyzeResponse extends ActionResponse implements Iterable<AnalyzeResponse.AnalyzeToken>, ToXContent {
public static class AnalyzeToken implements Streamable {
private String term;
@ -102,27 +102,20 @@ public class AnalyzeResponse implements ActionResponse, Iterable<AnalyzeResponse
@Override
public void readFrom(StreamInput in) throws IOException {
term = in.readUTF();
term = in.readString();
startOffset = in.readInt();
endOffset = in.readInt();
position = in.readVInt();
if (in.readBoolean()) {
type = in.readUTF();
}
type = in.readOptionalString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(term);
out.writeString(term);
out.writeInt(startOffset);
out.writeInt(endOffset);
out.writeVInt(position);
if (type == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(type);
}
out.writeOptionalString(type);
}
}
@ -186,6 +179,7 @@ public class AnalyzeResponse implements ActionResponse, Iterable<AnalyzeResponse
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
int size = in.readVInt();
tokens = new ArrayList<AnalyzeToken>(size);
for (int i = 0; i < size; i++) {
@ -195,6 +189,7 @@ public class AnalyzeResponse implements ActionResponse, Iterable<AnalyzeResponse
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(tokens.size());
for (AnalyzeToken token : tokens) {
token.writeTo(out);

View File

@ -22,16 +22,13 @@ package org.elasticsearch.action.admin.indices.close;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import java.io.IOException;
/**
* A response for a close index action.
*
*
*/
public class CloseIndexResponse implements ActionResponse, Streamable {
public class CloseIndexResponse extends ActionResponse {
private boolean acknowledged;
@ -52,11 +49,13 @@ public class CloseIndexResponse implements ActionResponse, Streamable {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
acknowledged = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(acknowledged);
}
}

View File

@ -22,16 +22,13 @@ package org.elasticsearch.action.admin.indices.create;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import java.io.IOException;
/**
* A response for a create index action.
*
*
*/
public class CreateIndexResponse implements ActionResponse, Streamable {
public class CreateIndexResponse extends ActionResponse {
private boolean acknowledged;
@ -60,11 +57,13 @@ public class CreateIndexResponse implements ActionResponse, Streamable {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
acknowledged = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(acknowledged);
}
}

View File

@ -22,16 +22,13 @@ package org.elasticsearch.action.admin.indices.delete;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import java.io.IOException;
/**
* A response for a delete index action.
*
*
*/
public class DeleteIndexResponse implements ActionResponse, Streamable {
public class DeleteIndexResponse extends ActionResponse {
private boolean acknowledged;
@ -60,11 +57,13 @@ public class DeleteIndexResponse implements ActionResponse, Streamable {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
acknowledged = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(acknowledged);
}
}

View File

@ -22,11 +22,10 @@ package org.elasticsearch.action.admin.indices.exists.indices;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import java.io.IOException;
public class IndicesExistsResponse implements ActionResponse, Streamable {
public class IndicesExistsResponse extends ActionResponse {
private boolean exists;
@ -47,11 +46,13 @@ public class IndicesExistsResponse implements ActionResponse, Streamable {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
exists = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(exists);
}
}

View File

@ -22,14 +22,13 @@ package org.elasticsearch.action.admin.indices.exists.types;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import java.io.IOException;
/**
* Whether all of the existed types exist.
*/
public class TypesExistsResponse implements ActionResponse, Streamable {
public class TypesExistsResponse extends ActionResponse {
private boolean exists;
@ -49,10 +48,12 @@ public class TypesExistsResponse implements ActionResponse, Streamable {
}
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
exists = in.readBoolean();
}
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(exists);
}
}

View File

@ -22,16 +22,13 @@ package org.elasticsearch.action.admin.indices.mapping.delete;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import java.io.IOException;
/**
* The response of remove mapping operation.
*
*
*/
public class DeleteMappingResponse implements ActionResponse, Streamable {
public class DeleteMappingResponse extends ActionResponse {
DeleteMappingResponse() {
@ -39,9 +36,11 @@ public class DeleteMappingResponse implements ActionResponse, Streamable {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
}

View File

@ -22,16 +22,13 @@ package org.elasticsearch.action.admin.indices.mapping.put;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import java.io.IOException;
/**
* The response of put mapping operation.
*
*
*/
public class PutMappingResponse implements ActionResponse, Streamable {
public class PutMappingResponse extends ActionResponse {
private boolean acknowledged;
@ -61,11 +58,13 @@ public class PutMappingResponse implements ActionResponse, Streamable {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
acknowledged = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(acknowledged);
}
}

View File

@ -22,16 +22,13 @@ package org.elasticsearch.action.admin.indices.open;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import java.io.IOException;
/**
* A response for a open index action.
*
*
*/
public class OpenIndexResponse implements ActionResponse, Streamable {
public class OpenIndexResponse extends ActionResponse {
private boolean acknowledged;
@ -52,11 +49,13 @@ public class OpenIndexResponse implements ActionResponse, Streamable {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
acknowledged = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(acknowledged);
}
}

View File

@ -22,25 +22,24 @@ package org.elasticsearch.action.admin.indices.settings;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import java.io.IOException;
/**
* A response for a update settings action.
*
*
*/
public class UpdateSettingsResponse implements ActionResponse, Streamable {
public class UpdateSettingsResponse extends ActionResponse {
UpdateSettingsResponse() {
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
}

View File

@ -22,16 +22,13 @@ package org.elasticsearch.action.admin.indices.template.delete;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import java.io.IOException;
/**
* A response for a delete index template.
*
*
*/
public class DeleteIndexTemplateResponse implements ActionResponse, Streamable {
public class DeleteIndexTemplateResponse extends ActionResponse {
private boolean acknowledged;
@ -52,11 +49,13 @@ public class DeleteIndexTemplateResponse implements ActionResponse, Streamable {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
acknowledged = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(acknowledged);
}
}

View File

@ -22,16 +22,13 @@ package org.elasticsearch.action.admin.indices.template.put;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import java.io.IOException;
/**
* A response for a put index template action.
*
*
*/
public class PutIndexTemplateResponse implements ActionResponse, Streamable {
public class PutIndexTemplateResponse extends ActionResponse {
private boolean acknowledged;
@ -52,11 +49,13 @@ public class PutIndexTemplateResponse implements ActionResponse, Streamable {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
acknowledged = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(acknowledged);
}
}

View File

@ -22,14 +22,13 @@ package org.elasticsearch.action.admin.indices.warmer.delete;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import java.io.IOException;
/**
* A response for a delete warmer.
*/
public class DeleteWarmerResponse implements ActionResponse, Streamable {
public class DeleteWarmerResponse extends ActionResponse {
private boolean acknowledged;
@ -50,11 +49,13 @@ public class DeleteWarmerResponse implements ActionResponse, Streamable {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
acknowledged = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(acknowledged);
}
}

View File

@ -22,14 +22,13 @@ package org.elasticsearch.action.admin.indices.warmer.put;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import java.io.IOException;
/**
* The response of put warmer operation.
*/
public class PutWarmerResponse implements ActionResponse, Streamable {
public class PutWarmerResponse extends ActionResponse {
private boolean acknowledged;
@ -57,11 +56,13 @@ public class PutWarmerResponse implements ActionResponse, Streamable {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
acknowledged = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(acknowledged);
}
}

View File

@ -32,13 +32,10 @@ import java.util.Iterator;
* A response of a bulk execution. Holding a response for each item responding (in order) of the
* bulk requests. Each item holds the index/type/id is operated on, and if it failed or not (with the
* failure message).
*
*
*/
public class BulkResponse implements ActionResponse, Iterable<BulkItemResponse> {
public class BulkResponse extends ActionResponse implements Iterable<BulkItemResponse> {
private BulkItemResponse[] responses;
private long tookInMillis;
BulkResponse() {
@ -117,6 +114,7 @@ public class BulkResponse implements ActionResponse, Iterable<BulkItemResponse>
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
responses = new BulkItemResponse[in.readVInt()];
for (int i = 0; i < responses.length; i++) {
responses[i] = BulkItemResponse.readBulkItem(in);
@ -126,6 +124,7 @@ public class BulkResponse implements ActionResponse, Iterable<BulkItemResponse>
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(responses.length);
for (BulkItemResponse response : responses) {
response.writeTo(out);

View File

@ -29,10 +29,9 @@ import java.io.IOException;
/**
*
*/
public class BulkShardResponse implements ActionResponse {
public class BulkShardResponse extends ActionResponse {
private ShardId shardId;
private BulkItemResponse[] responses;
BulkShardResponse() {
@ -53,6 +52,7 @@ public class BulkShardResponse implements ActionResponse {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = ShardId.readShardId(in);
responses = new BulkItemResponse[in.readVInt()];
for (int i = 0; i < responses.length; i++) {
@ -62,6 +62,7 @@ public class BulkShardResponse implements ActionResponse {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
shardId.writeTo(out);
out.writeVInt(responses.length);
for (BulkItemResponse response : responses) {

View File

@ -22,27 +22,21 @@ package org.elasticsearch.action.delete;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import java.io.IOException;
/**
* The response of the delete action.
*
*
* @see org.elasticsearch.action.delete.DeleteRequest
* @see org.elasticsearch.client.Client#delete(DeleteRequest)
*/
public class DeleteResponse implements ActionResponse, Streamable {
public class DeleteResponse extends ActionResponse {
private String index;
private String id;
private String type;
private long version;
private boolean notFound;
public DeleteResponse() {
@ -129,18 +123,20 @@ public class DeleteResponse implements ActionResponse, Streamable {
@Override
public void readFrom(StreamInput in) throws IOException {
index = in.readUTF();
id = in.readUTF();
type = in.readUTF();
super.readFrom(in);
index = in.readString();
id = in.readString();
type = in.readString();
version = in.readLong();
notFound = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(index);
out.writeUTF(id);
out.writeUTF(type);
super.writeTo(out);
out.writeString(index);
out.writeString(id);
out.writeString(type);
out.writeLong(version);
out.writeBoolean(notFound);
}

View File

@ -22,23 +22,17 @@ package org.elasticsearch.action.delete.index;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import java.io.IOException;
/**
* Delete by query response executed on a specific index.
*
*
*/
public class IndexDeleteResponse implements ActionResponse, Streamable {
public class IndexDeleteResponse extends ActionResponse {
private String index;
private int successfulShards;
private int failedShards;
private ShardDeleteResponse[] deleteResponses;
IndexDeleteResponse(String index, int successfulShards, int failedShards, ShardDeleteResponse[] deleteResponses) {
@ -114,7 +108,8 @@ public class IndexDeleteResponse implements ActionResponse, Streamable {
@Override
public void readFrom(StreamInput in) throws IOException {
index = in.readUTF();
super.readFrom(in);
index = in.readString();
successfulShards = in.readVInt();
failedShards = in.readVInt();
deleteResponses = new ShardDeleteResponse[in.readVInt()];
@ -126,7 +121,8 @@ public class IndexDeleteResponse implements ActionResponse, Streamable {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(index);
super.writeTo(out);
out.writeString(index);
out.writeVInt(successfulShards);
out.writeVInt(failedShards);
out.writeVInt(deleteResponses.length);

View File

@ -22,19 +22,15 @@ package org.elasticsearch.action.delete.index;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import java.io.IOException;
/**
* Delete response executed on a specific shard.
*
*
*/
public class ShardDeleteResponse implements ActionResponse, Streamable {
public class ShardDeleteResponse extends ActionResponse {
private long version;
private boolean notFound;
public ShardDeleteResponse() {
@ -55,12 +51,14 @@ public class ShardDeleteResponse implements ActionResponse, Streamable {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
version = in.readLong();
notFound = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(version);
out.writeBoolean(notFound);
}

View File

@ -33,7 +33,7 @@ import static com.google.common.collect.Maps.newHashMap;
* The response of delete by query action. Holds the {@link IndexDeleteByQueryResponse}s from all the
* different indices.
*/
public class DeleteByQueryResponse implements ActionResponse, Iterable<IndexDeleteByQueryResponse> {
public class DeleteByQueryResponse extends ActionResponse implements Iterable<IndexDeleteByQueryResponse> {
private Map<String, IndexDeleteByQueryResponse> indices = newHashMap();
@ -69,6 +69,7 @@ public class DeleteByQueryResponse implements ActionResponse, Iterable<IndexDele
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
int size = in.readVInt();
for (int i = 0; i < size; i++) {
IndexDeleteByQueryResponse response = new IndexDeleteByQueryResponse();
@ -79,6 +80,7 @@ public class DeleteByQueryResponse implements ActionResponse, Iterable<IndexDele
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(indices.size());
for (IndexDeleteByQueryResponse indexResponse : indices.values()) {
indexResponse.writeTo(out);

View File

@ -22,21 +22,16 @@ package org.elasticsearch.action.deletebyquery;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import java.io.IOException;
/**
* Delete by query response executed on a specific index.
*
*
*/
public class IndexDeleteByQueryResponse implements ActionResponse, Streamable {
public class IndexDeleteByQueryResponse extends ActionResponse {
private String index;
private int successfulShards;
private int failedShards;
IndexDeleteByQueryResponse(String index, int successfulShards, int failedShards) {
@ -107,14 +102,16 @@ public class IndexDeleteByQueryResponse implements ActionResponse, Streamable {
@Override
public void readFrom(StreamInput in) throws IOException {
index = in.readUTF();
super.readFrom(in);
index = in.readString();
successfulShards = in.readVInt();
failedShards = in.readVInt();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(index);
super.writeTo(out);
out.writeString(index);
out.writeVInt(successfulShards);
out.writeVInt(failedShards);
}

View File

@ -22,22 +22,21 @@ package org.elasticsearch.action.deletebyquery;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import java.io.IOException;
/**
* Delete by query response executed on a specific shard.
*
*
*/
public class ShardDeleteByQueryResponse implements ActionResponse, Streamable {
public class ShardDeleteByQueryResponse extends ActionResponse {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
}

View File

@ -33,7 +33,7 @@ import static org.elasticsearch.common.lucene.Lucene.writeExplanation;
/**
* Response containing the score explanation.
*/
public class ExplainResponse implements ActionResponse {
public class ExplainResponse extends ActionResponse {
private boolean exists;
private Explanation explanation;
@ -94,6 +94,7 @@ public class ExplainResponse implements ActionResponse {
}
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
exists = in.readBoolean();
if (in.readBoolean()) {
explanation = readExplanation(in);
@ -104,6 +105,7 @@ public class ExplainResponse implements ActionResponse {
}
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(exists);
if (explanation == null) {
out.writeBoolean(false);

View File

@ -24,7 +24,6 @@ import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.get.GetField;
@ -40,7 +39,7 @@ import java.util.Map;
* @see GetRequest
* @see org.elasticsearch.client.Client#get(GetRequest)
*/
public class GetResponse implements ActionResponse, Streamable, Iterable<GetField>, ToXContent {
public class GetResponse extends ActionResponse implements Iterable<GetField>, ToXContent {
private GetResult getResult;
@ -203,11 +202,13 @@ public class GetResponse implements ActionResponse, Streamable, Iterable<GetFiel
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
getResult = GetResult.readGetResult(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
getResult.writeTo(out);
}
}

View File

@ -31,7 +31,7 @@ import org.elasticsearch.common.xcontent.XContentBuilderString;
import java.io.IOException;
import java.util.Iterator;
public class MultiGetResponse implements ActionResponse, Iterable<MultiGetItemResponse>, ToXContent {
public class MultiGetResponse extends ActionResponse implements Iterable<MultiGetItemResponse>, ToXContent {
/**
* Represents a failure.
@ -117,25 +117,18 @@ public class MultiGetResponse implements ActionResponse, Iterable<MultiGetItemRe
@Override
public void readFrom(StreamInput in) throws IOException {
index = in.readUTF();
if (in.readBoolean()) {
type = in.readUTF();
}
id = in.readUTF();
message = in.readUTF();
index = in.readString();
type = in.readOptionalString();
id = in.readString();
message = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(index);
if (type == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(type);
}
out.writeUTF(id);
out.writeUTF(message);
out.writeString(index);
out.writeOptionalString(type);
out.writeString(id);
out.writeString(message);
}
}
@ -190,6 +183,7 @@ public class MultiGetResponse implements ActionResponse, Iterable<MultiGetItemRe
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
responses = new MultiGetItemResponse[in.readVInt()];
for (int i = 0; i < responses.length; i++) {
responses[i] = MultiGetItemResponse.readItemResponse(in);
@ -198,6 +192,7 @@ public class MultiGetResponse implements ActionResponse, Iterable<MultiGetItemRe
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(responses.length);
for (MultiGetItemResponse response : responses) {
response.writeTo(out);

View File

@ -28,7 +28,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class MultiGetShardResponse implements ActionResponse {
public class MultiGetShardResponse extends ActionResponse {
TIntArrayList locations;
List<GetResponse> responses;
@ -54,6 +54,7 @@ public class MultiGetShardResponse implements ActionResponse {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
int size = in.readVInt();
locations = new TIntArrayList(size);
responses = new ArrayList<GetResponse>(size);
@ -77,6 +78,7 @@ public class MultiGetShardResponse implements ActionResponse {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(locations.size());
for (int i = 0; i < locations.size(); i++) {
out.writeVInt(locations.get(i));

View File

@ -34,16 +34,12 @@ import java.util.List;
* @see org.elasticsearch.action.index.IndexRequest
* @see org.elasticsearch.client.Client#index(IndexRequest)
*/
public class IndexResponse implements ActionResponse {
public class IndexResponse extends ActionResponse {
private String index;
private String id;
private String type;
private long version;
private List<String> matches;
public IndexResponse() {
@ -136,28 +132,29 @@ public class IndexResponse implements ActionResponse {
@Override
public void readFrom(StreamInput in) throws IOException {
index = in.readUTF();
id = in.readUTF();
type = in.readUTF();
super.readFrom(in);
index = in.readString();
id = in.readString();
type = in.readString();
version = in.readLong();
if (in.readBoolean()) {
int size = in.readVInt();
if (size == 0) {
matches = ImmutableList.of();
} else if (size == 1) {
matches = ImmutableList.of(in.readUTF());
matches = ImmutableList.of(in.readString());
} else if (size == 2) {
matches = ImmutableList.of(in.readUTF(), in.readUTF());
matches = ImmutableList.of(in.readString(), in.readString());
} else if (size == 3) {
matches = ImmutableList.of(in.readUTF(), in.readUTF(), in.readUTF());
matches = ImmutableList.of(in.readString(), in.readString(), in.readString());
} else if (size == 4) {
matches = ImmutableList.of(in.readUTF(), in.readUTF(), in.readUTF(), in.readUTF());
matches = ImmutableList.of(in.readString(), in.readString(), in.readString(), in.readString());
} else if (size == 5) {
matches = ImmutableList.of(in.readUTF(), in.readUTF(), in.readUTF(), in.readUTF(), in.readUTF());
matches = ImmutableList.of(in.readString(), in.readString(), in.readString(), in.readString(), in.readString());
} else {
matches = new ArrayList<String>();
for (int i = 0; i < size; i++) {
matches.add(in.readUTF());
matches.add(in.readString());
}
}
}
@ -165,9 +162,10 @@ public class IndexResponse implements ActionResponse {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(index);
out.writeUTF(id);
out.writeUTF(type);
super.writeTo(out);
out.writeString(index);
out.writeString(id);
out.writeString(type);
out.writeLong(version);
if (matches == null) {
out.writeBoolean(false);
@ -175,7 +173,7 @@ public class IndexResponse implements ActionResponse {
out.writeBoolean(true);
out.writeVInt(matches.size());
for (String match : matches) {
out.writeUTF(match);
out.writeString(match);
}
}
}

View File

@ -31,7 +31,7 @@ import java.util.List;
/**
*
*/
public class PercolateResponse implements ActionResponse, Iterable<String> {
public class PercolateResponse extends ActionResponse implements Iterable<String> {
private List<String> matches;
@ -54,18 +54,20 @@ public class PercolateResponse implements ActionResponse, Iterable<String> {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
int size = in.readVInt();
matches = new ArrayList<String>(size);
for (int i = 0; i < size; i++) {
matches.add(in.readUTF());
matches.add(in.readString());
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(matches.size());
for (String match : matches) {
out.writeUTF(match);
out.writeString(match);
}
}
}

View File

@ -16,7 +16,7 @@ import java.util.Iterator;
/**
* A multi search response.
*/
public class MultiSearchResponse implements ActionResponse, Iterable<MultiSearchResponse.Item>, ToXContent {
public class MultiSearchResponse extends ActionResponse implements Iterable<MultiSearchResponse.Item>, ToXContent {
/**
* A search response item, holding the actual search response, or an error message if it failed.
@ -85,7 +85,7 @@ public class MultiSearchResponse implements ActionResponse, Iterable<MultiSearch
this.response = new SearchResponse();
response.readFrom(in);
} else {
failureMessage = in.readUTF();
failureMessage = in.readString();
}
}
@ -95,7 +95,7 @@ public class MultiSearchResponse implements ActionResponse, Iterable<MultiSearch
out.writeBoolean(true);
response.writeTo(out);
} else {
out.writeUTF(failureMessage);
out.writeString(failureMessage);
}
}
}
@ -130,6 +130,7 @@ public class MultiSearchResponse implements ActionResponse, Iterable<MultiSearch
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
items = new Item[in.readVInt()];
for (int i = 0; i < items.length; i++) {
items[i] = Item.readItem(in);
@ -138,6 +139,7 @@ public class MultiSearchResponse implements ActionResponse, Iterable<MultiSearch
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(items.length);
for (Item item : items) {
item.writeTo(out);

View File

@ -40,7 +40,7 @@ import static org.elasticsearch.search.internal.InternalSearchResponse.readInter
/**
* A response of a search request.
*/
public class SearchResponse implements ActionResponse, ToXContent {
public class SearchResponse extends ActionResponse implements ToXContent {
private InternalSearchResponse internalResponse;
@ -281,6 +281,7 @@ public class SearchResponse implements ActionResponse, ToXContent {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
internalResponse = readInternalSearchResponse(in);
totalShards = in.readVInt();
successfulShards = in.readVInt();
@ -293,14 +294,13 @@ public class SearchResponse implements ActionResponse, ToXContent {
shardFailures[i] = readShardSearchFailure(in);
}
}
if (in.readBoolean()) {
scrollId = in.readUTF();
}
scrollId = in.readOptionalString();
tookInMillis = in.readVLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
internalResponse.writeTo(out);
out.writeVInt(totalShards);
out.writeVInt(successfulShards);
@ -310,12 +310,7 @@ public class SearchResponse implements ActionResponse, ToXContent {
shardSearchFailure.writeTo(out);
}
if (scrollId == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(scrollId);
}
out.writeOptionalString(scrollId);
out.writeVLong(tookInMillis);
}

View File

@ -33,17 +33,12 @@ import static org.elasticsearch.action.support.DefaultShardOperationFailedExcept
/**
* Base class for all broadcast operation based responses.
*
*
*/
public abstract class BroadcastOperationResponse implements ActionResponse {
public abstract class BroadcastOperationResponse extends ActionResponse {
private int totalShards;
private int successfulShards;
private int failedShards;
private List<ShardOperationFailedException> shardFailures = ImmutableList.of();
protected BroadcastOperationResponse() {
@ -120,6 +115,7 @@ public abstract class BroadcastOperationResponse implements ActionResponse {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
totalShards = in.readVInt();
successfulShards = in.readVInt();
failedShards = in.readVInt();
@ -134,6 +130,7 @@ public abstract class BroadcastOperationResponse implements ActionResponse {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(totalShards);
out.writeVInt(successfulShards);
out.writeVInt(failedShards);

View File

@ -21,17 +21,16 @@ package org.elasticsearch.action.support.broadcast;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.transport.TransportResponse;
import java.io.IOException;
/**
*
*/
public abstract class BroadcastShardOperationResponse implements Streamable {
public abstract class BroadcastShardOperationResponse extends TransportResponse {
String index;
int shardId;
protected BroadcastShardOperationResponse() {
@ -61,13 +60,15 @@ public abstract class BroadcastShardOperationResponse implements Streamable {
@Override
public void readFrom(StreamInput in) throws IOException {
index = in.readUTF();
super.readFrom(in);
index = in.readString();
shardId = in.readVInt();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(index);
super.writeTo(out);
out.writeString(index);
out.writeVInt(shardId);
}
}

View File

@ -22,16 +22,14 @@ package org.elasticsearch.action.support.nodes;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.transport.TransportResponse;
import java.io.IOException;
/**
* A base class for node level operations.
*
*
*/
public abstract class NodeOperationResponse implements Streamable {
public abstract class NodeOperationResponse extends TransportResponse {
private DiscoveryNode node;
@ -58,11 +56,13 @@ public abstract class NodeOperationResponse implements Streamable {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
node = DiscoveryNode.readNode(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
node.writeTo(out);
}
}

View File

@ -32,12 +32,10 @@ import java.util.Map;
/**
*
*/
public abstract class NodesOperationResponse<NodeResponse extends NodeOperationResponse> implements ActionResponse, Iterable<NodeResponse> {
public abstract class NodesOperationResponse<NodeResponse extends NodeOperationResponse> extends ActionResponse implements Iterable<NodeResponse> {
private ClusterName clusterName;
protected NodeResponse[] nodes;
private Map<String, NodeResponse> nodesMap;
protected NodesOperationResponse() {
@ -89,11 +87,13 @@ public abstract class NodesOperationResponse<NodeResponse extends NodeOperationR
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
clusterName = ClusterName.readClusterName(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
clusterName.writeTo(out);
}
}

View File

@ -37,7 +37,6 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.VoidStreamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexShardMissingException;
@ -253,7 +252,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
@Override
public void messageReceived(final ReplicaOperationRequest request, final TransportChannel channel) throws Exception {
shardOperationOnReplica(request);
channel.sendResponse(VoidStreamable.INSTANCE);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
@ -631,9 +630,9 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
final ReplicaOperationRequest shardRequest = new ReplicaOperationRequest(shardIt.shardId().id(), response.replicaRequest());
if (!nodeId.equals(nodes.localNodeId())) {
DiscoveryNode node = nodes.get(nodeId);
transportService.sendRequest(node, transportReplicaAction, shardRequest, transportOptions, new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
transportService.sendRequest(node, transportReplicaAction, shardRequest, transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(VoidStreamable vResponse) {
public void handleResponse(TransportResponse.Empty vResponse) {
finishIfPossible();
}

View File

@ -31,18 +31,13 @@ import java.util.List;
/**
*/
public class UpdateResponse implements ActionResponse {
public class UpdateResponse extends ActionResponse {
private String index;
private String id;
private String type;
private long version;
private List<String> matches;
private GetResult getResult;
public UpdateResponse() {
@ -147,28 +142,29 @@ public class UpdateResponse implements ActionResponse {
@Override
public void readFrom(StreamInput in) throws IOException {
index = in.readUTF();
id = in.readUTF();
type = in.readUTF();
super.readFrom(in);
index = in.readString();
id = in.readString();
type = in.readString();
version = in.readLong();
if (in.readBoolean()) {
int size = in.readVInt();
if (size == 0) {
matches = ImmutableList.of();
} else if (size == 1) {
matches = ImmutableList.of(in.readUTF());
matches = ImmutableList.of(in.readString());
} else if (size == 2) {
matches = ImmutableList.of(in.readUTF(), in.readUTF());
matches = ImmutableList.of(in.readString(), in.readString());
} else if (size == 3) {
matches = ImmutableList.of(in.readUTF(), in.readUTF(), in.readUTF());
matches = ImmutableList.of(in.readString(), in.readString(), in.readString());
} else if (size == 4) {
matches = ImmutableList.of(in.readUTF(), in.readUTF(), in.readUTF(), in.readUTF());
matches = ImmutableList.of(in.readString(), in.readString(), in.readString(), in.readString());
} else if (size == 5) {
matches = ImmutableList.of(in.readUTF(), in.readUTF(), in.readUTF(), in.readUTF(), in.readUTF());
matches = ImmutableList.of(in.readString(), in.readString(), in.readString(), in.readString(), in.readString());
} else {
matches = new ArrayList<String>();
for (int i = 0; i < size; i++) {
matches.add(in.readUTF());
matches.add(in.readString());
}
}
}
@ -179,9 +175,10 @@ public class UpdateResponse implements ActionResponse {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(index);
out.writeUTF(id);
out.writeUTF(type);
super.writeTo(out);
out.writeString(index);
out.writeString(id);
out.writeString(type);
out.writeLong(version);
if (matches == null) {
out.writeBoolean(false);
@ -189,7 +186,7 @@ public class UpdateResponse implements ActionResponse {
out.writeBoolean(true);
out.writeVInt(matches.size());
for (String match : matches) {
out.writeUTF(match);
out.writeString(match);
}
}
if (getResult == null) {

View File

@ -104,13 +104,15 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction<Map
return new MappingUpdatedResponse();
}
public static class MappingUpdatedResponse implements ActionResponse {
public static class MappingUpdatedResponse extends ActionResponse {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
}

View File

@ -26,7 +26,6 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.VoidStreamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
@ -86,7 +85,7 @@ public class NodeAliasesUpdatedAction extends AbstractComponent {
});
} else {
transportService.sendRequest(clusterService.state().nodes().masterNode(),
NodeAliasesUpdatedTransportHandler.ACTION, response, VoidTransportResponseHandler.INSTANCE_SAME);
NodeAliasesUpdatedTransportHandler.ACTION, response, EmptyTransportResponseHandler.INSTANCE_SAME);
}
}
@ -115,7 +114,7 @@ public class NodeAliasesUpdatedAction extends AbstractComponent {
@Override
public void messageReceived(NodeAliasesUpdatedResponse response, TransportChannel channel) throws Exception {
innerNodeAliasesUpdated(response);
channel.sendResponse(VoidStreamable.INSTANCE);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
@Override

View File

@ -26,7 +26,6 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.VoidStreamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
@ -76,7 +75,7 @@ public class NodeIndexCreatedAction extends AbstractComponent {
});
} else {
transportService.sendRequest(clusterService.state().nodes().masterNode(),
NodeIndexCreatedTransportHandler.ACTION, new NodeIndexCreatedMessage(index, nodeId), VoidTransportResponseHandler.INSTANCE_SAME);
NodeIndexCreatedTransportHandler.ACTION, new NodeIndexCreatedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME);
}
}
@ -102,7 +101,7 @@ public class NodeIndexCreatedAction extends AbstractComponent {
@Override
public void messageReceived(NodeIndexCreatedMessage message, TransportChannel channel) throws Exception {
innerNodeIndexCreated(message.index, message.nodeId);
channel.sendResponse(VoidStreamable.INSTANCE);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
@Override

View File

@ -26,7 +26,6 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.VoidStreamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
@ -76,7 +75,7 @@ public class NodeIndexDeletedAction extends AbstractComponent {
});
} else {
transportService.sendRequest(clusterService.state().nodes().masterNode(),
NodeIndexDeletedTransportHandler.ACTION, new NodeIndexDeletedMessage(index, nodeId), VoidTransportResponseHandler.INSTANCE_SAME);
NodeIndexDeletedTransportHandler.ACTION, new NodeIndexDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME);
}
}
@ -102,7 +101,7 @@ public class NodeIndexDeletedAction extends AbstractComponent {
@Override
public void messageReceived(NodeIndexDeletedMessage message, TransportChannel channel) throws Exception {
innerNodeIndexDeleted(message.index, message.nodeId);
channel.sendResponse(VoidStreamable.INSTANCE);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
@Override

View File

@ -26,7 +26,6 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.VoidStreamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
@ -86,7 +85,7 @@ public class NodeMappingCreatedAction extends AbstractComponent {
});
} else {
transportService.sendRequest(clusterService.state().nodes().masterNode(),
NodeMappingCreatedTransportHandler.ACTION, response, VoidTransportResponseHandler.INSTANCE_SAME);
NodeMappingCreatedTransportHandler.ACTION, response, EmptyTransportResponseHandler.INSTANCE_SAME);
}
}
@ -115,7 +114,7 @@ public class NodeMappingCreatedAction extends AbstractComponent {
@Override
public void messageReceived(NodeMappingCreatedResponse response, TransportChannel channel) throws Exception {
innerNodeIndexCreated(response);
channel.sendResponse(VoidStreamable.INSTANCE);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
@Override

View File

@ -27,7 +27,6 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.VoidStreamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
@ -69,7 +68,7 @@ public class NodeMappingRefreshAction extends AbstractComponent {
});
} else {
transportService.sendRequest(clusterService.state().nodes().masterNode(),
NodeMappingRefreshTransportHandler.ACTION, request, VoidTransportResponseHandler.INSTANCE_SAME);
NodeMappingRefreshTransportHandler.ACTION, request, EmptyTransportResponseHandler.INSTANCE_SAME);
}
}
@ -89,7 +88,7 @@ public class NodeMappingRefreshAction extends AbstractComponent {
@Override
public void messageReceived(NodeMappingRefreshRequest request, TransportChannel channel) throws Exception {
innerMappingRefresh(request);
channel.sendResponse(VoidStreamable.INSTANCE);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
@Override

View File

@ -34,7 +34,6 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.VoidStreamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.threadpool.ThreadPool;
@ -54,11 +53,8 @@ import static org.elasticsearch.cluster.routing.ImmutableShardRouting.readShardR
public class ShardStateAction extends AbstractComponent {
private final TransportService transportService;
private final ClusterService clusterService;
private final AllocationService allocationService;
private final ThreadPool threadPool;
private final BlockingQueue<ShardRouting> startedShardsQueue = ConcurrentCollections.newBlockingQueue();
@ -83,7 +79,7 @@ public class ShardStateAction extends AbstractComponent {
innerShardFailed(shardRouting, reason);
} else {
transportService.sendRequest(clusterService.state().nodes().masterNode(),
ShardFailedTransportHandler.ACTION, new ShardRoutingEntry(shardRouting, reason), new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
ShardFailedTransportHandler.ACTION, new ShardRoutingEntry(shardRouting, reason), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to send failed shard to [{}]", exp, clusterService.state().nodes().masterNode());
@ -101,7 +97,7 @@ public class ShardStateAction extends AbstractComponent {
innerShardStarted(shardRouting, reason);
} else {
transportService.sendRequest(clusterService.state().nodes().masterNode(),
ShardStartedTransportHandler.ACTION, new ShardRoutingEntry(shardRouting, reason), new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
ShardStartedTransportHandler.ACTION, new ShardRoutingEntry(shardRouting, reason), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to send shard started to [{}]", exp, clusterService.state().nodes().masterNode());
@ -206,7 +202,7 @@ public class ShardStateAction extends AbstractComponent {
@Override
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
innerShardFailed(request.shardRouting, request.reason);
channel.sendResponse(VoidStreamable.INSTANCE);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
@Override
@ -227,7 +223,7 @@ public class ShardStateAction extends AbstractComponent {
@Override
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
innerShardStarted(request.shardRouting, request.reason);
channel.sendResponse(VoidStreamable.INSTANCE);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
@Override

View File

@ -1,55 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io.stream;
import java.io.IOException;
/**
*
*/
public class LongStreamable implements Streamable {
private long value;
public LongStreamable() {
}
public LongStreamable(long value) {
this.value = value;
}
public void set(long newValue) {
value = newValue;
}
public long get() {
return this.value;
}
@Override
public void readFrom(StreamInput in) throws IOException {
value = in.readLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(value);
}
}

View File

@ -1,55 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io.stream;
import java.io.IOException;
/**
*
*/
public class StringStreamable implements Streamable {
private String value;
public StringStreamable() {
}
public StringStreamable(String value) {
this.value = value;
}
public void set(String newValue) {
value = newValue;
}
public String get() {
return this.value;
}
@Override
public void readFrom(StreamInput in) throws IOException {
value = in.readUTF();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(value);
}
}

View File

@ -1,38 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io.stream;
import java.io.IOException;
/**
*
*/
public class VoidStreamable implements Streamable {
public static final VoidStreamable INSTANCE = new VoidStreamable();
@Override
public void readFrom(StreamInput in) throws IOException {
}
@Override
public void writeTo(StreamOutput out) throws IOException {
}
}

View File

@ -38,7 +38,6 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.internal.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.VoidStreamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.Discovery;
@ -509,7 +508,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
return rejoin(currentState, "zen-disco-master_receive_cluster_state_from_another_master [" + newState.nodes().masterNode() + "]");
} else {
logger.warn("received cluster state from [{}] which is also master but with an older cluster_state, telling [{}] to rejoin the cluster", newState.nodes().masterNode(), newState.nodes().masterNode());
transportService.sendRequest(newState.nodes().masterNode(), RejoinClusterRequestHandler.ACTION, new RejoinClusterRequest(currentState.nodes().localNodeId()), new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
transportService.sendRequest(newState.nodes().masterNode(), RejoinClusterRequestHandler.ACTION, new RejoinClusterRequest(currentState.nodes().localNodeId()), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to send rejoin request to [{}]", exp, newState.nodes().masterNode());
@ -808,7 +807,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
@Override
public ClusterState execute(ClusterState currentState) {
try {
channel.sendResponse(VoidStreamable.INSTANCE);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (Exception e) {
logger.warn("failed to send response on rejoin cluster request handling", e);
}

View File

@ -25,7 +25,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
@ -424,7 +423,7 @@ public class MasterFaultDetection extends AbstractComponent {
}
}
private static class MasterPingResponseResponse implements Streamable {
private static class MasterPingResponseResponse extends TransportResponse {
private boolean connectedToMaster;
@ -437,11 +436,13 @@ public class MasterFaultDetection extends AbstractComponent {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
connectedToMaster = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(connectedToMaster);
}
}

View File

@ -25,7 +25,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
@ -330,17 +329,19 @@ public class NodesFaultDetection extends AbstractComponent {
}
}
private static class PingResponse implements Streamable {
private static class PingResponse extends TransportResponse {
private PingResponse() {
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
}
}

View File

@ -25,8 +25,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.VoidStreamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
@ -71,15 +69,15 @@ public class MembershipAction extends AbstractComponent {
}
public void sendLeaveRequest(DiscoveryNode masterNode, DiscoveryNode node) {
transportService.sendRequest(node, LeaveRequestRequestHandler.ACTION, new LeaveRequest(masterNode), VoidTransportResponseHandler.INSTANCE_SAME);
transportService.sendRequest(node, LeaveRequestRequestHandler.ACTION, new LeaveRequest(masterNode), EmptyTransportResponseHandler.INSTANCE_SAME);
}
public void sendLeaveRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) throws ElasticSearchException {
transportService.submitRequest(masterNode, LeaveRequestRequestHandler.ACTION, new LeaveRequest(node), VoidTransportResponseHandler.INSTANCE_SAME).txGet(timeout.millis(), TimeUnit.MILLISECONDS);
transportService.submitRequest(masterNode, LeaveRequestRequestHandler.ACTION, new LeaveRequest(node), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(timeout.millis(), TimeUnit.MILLISECONDS);
}
public void sendJoinRequest(DiscoveryNode masterNode, DiscoveryNode node) {
transportService.sendRequest(masterNode, JoinRequestRequestHandler.ACTION, new JoinRequest(node, false), VoidTransportResponseHandler.INSTANCE_SAME);
transportService.sendRequest(masterNode, JoinRequestRequestHandler.ACTION, new JoinRequest(node, false), EmptyTransportResponseHandler.INSTANCE_SAME);
}
public ClusterState sendJoinRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) throws ElasticSearchException {
@ -95,7 +93,7 @@ public class MembershipAction extends AbstractComponent {
* Validates the join request, throwing a failure if it failed.
*/
public void sendValidateJoinRequestBlocking(DiscoveryNode node, ClusterState clusterState, TimeValue timeout) throws ElasticSearchException {
transportService.submitRequest(node, ValidateJoinRequestRequestHandler.ACTION, new ValidateJoinRequest(clusterState), VoidTransportResponseHandler.INSTANCE_SAME)
transportService.submitRequest(node, ValidateJoinRequestRequestHandler.ACTION, new ValidateJoinRequest(clusterState), EmptyTransportResponseHandler.INSTANCE_SAME)
.txGet(timeout.millis(), TimeUnit.MILLISECONDS);
}
@ -128,7 +126,7 @@ public class MembershipAction extends AbstractComponent {
}
}
class JoinResponse implements Streamable {
class JoinResponse extends TransportResponse {
ClusterState clusterState;
@ -141,11 +139,13 @@ public class MembershipAction extends AbstractComponent {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode());
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
ClusterState.Builder.writeTo(clusterState, out);
}
}
@ -165,7 +165,7 @@ public class MembershipAction extends AbstractComponent {
if (request.withClusterState) {
channel.sendResponse(new JoinResponse(clusterState));
} else {
channel.sendResponse(VoidStreamable.INSTANCE);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
@ -211,7 +211,7 @@ public class MembershipAction extends AbstractComponent {
@Override
public void messageReceived(ValidateJoinRequest request, TransportChannel channel) throws Exception {
// for now, the mere fact that we can serialize the cluster state acts as validation....
channel.sendResponse(VoidStreamable.INSTANCE);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
@Override
@ -256,7 +256,7 @@ public class MembershipAction extends AbstractComponent {
@Override
public void messageReceived(LeaveRequest request, TransportChannel channel) throws Exception {
listener.onLeave(request.node);
channel.sendResponse(VoidStreamable.INSTANCE);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
@Override

View File

@ -316,7 +316,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
} else {
responses.put(request.pingResponse.target(), request.pingResponse);
}
channel.sendResponse(VoidStreamable.INSTANCE);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
@Override
@ -534,7 +534,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
// connect to the node if possible
try {
transportService.connectToNode(requestingNode);
transportService.sendRequest(requestingNode, MulticastPingResponseRequestHandler.ACTION, multicastPingResponse, new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
transportService.sendRequest(requestingNode, MulticastPingResponseRequestHandler.ACTION, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
@ -546,7 +546,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
}
});
} else {
transportService.sendRequest(requestingNode, MulticastPingResponseRequestHandler.ACTION, multicastPingResponse, new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
transportService.sendRequest(requestingNode, MulticastPingResponseRequestHandler.ACTION, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);

View File

@ -30,7 +30,6 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
@ -431,7 +430,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
}
}
static class UnicastPingResponse implements Streamable {
static class UnicastPingResponse extends TransportResponse {
int id;
@ -442,6 +441,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
id = in.readInt();
pingResponses = new PingResponse[in.readVInt()];
for (int i = 0; i < pingResponses.length; i++) {
@ -451,6 +451,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeInt(id);
out.writeVInt(pingResponses.length);
for (PingResponse pingResponse : pingResponses) {

View File

@ -27,7 +27,10 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.*;
import org.elasticsearch.common.io.stream.CachedStreamInput;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.threadpool.ThreadPool;
@ -95,7 +98,7 @@ public class PublishClusterStateAction extends AbstractComponent {
new PublishClusterStateRequest(entry.bytes().bytes()),
TransportRequestOptions.options().withHighType().withCompress(false), // no need to compress, we already compressed the bytes
new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.debug("failed to send cluster state to [{}], should be detected as failed soon...", exp, node);
@ -156,7 +159,7 @@ public class PublishClusterStateAction extends AbstractComponent {
in.setVersion(request.version);
ClusterState clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode());
listener.onNewClusterState(clusterState);
channel.sendResponse(VoidStreamable.INSTANCE);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
@Override

View File

@ -33,7 +33,6 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.threadpool.ThreadPool;
@ -202,7 +201,7 @@ public class LocalAllocateDangledIndices extends AbstractComponent {
}
}
public static class AllocateDangledResponse implements Streamable {
public static class AllocateDangledResponse extends TransportResponse {
private boolean ack;
@ -219,11 +218,13 @@ public class LocalAllocateDangledIndices extends AbstractComponent {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
ack = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(ack);
}
}

View File

@ -22,7 +22,7 @@ package org.elasticsearch.indices.recovery;
import com.google.common.collect.Lists;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.transport.TransportResponse;
import java.io.IOException;
import java.util.List;
@ -30,7 +30,7 @@ import java.util.List;
/**
*
*/
class RecoveryResponse implements Streamable {
class RecoveryResponse extends TransportResponse {
List<String> phase1FileNames = Lists.newArrayList();
List<Long> phase1FileSizes = Lists.newArrayList();
@ -54,6 +54,7 @@ class RecoveryResponse implements Streamable {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
int size = in.readVInt();
phase1FileNames = Lists.newArrayListWithCapacity(size);
for (int i = 0; i < size; i++) {
@ -89,6 +90,7 @@ class RecoveryResponse implements Streamable {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(phase1FileNames.size());
for (String name : phase1FileNames) {
out.writeUTF(name);

View File

@ -122,7 +122,7 @@ public class RecoverySource extends AbstractComponent {
RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(request.recoveryId(), request.shardId(), response.phase1FileNames, response.phase1FileSizes,
response.phase1ExistingFileNames, response.phase1ExistingFileSizes, response.phase1TotalSize, response.phase1ExistingTotalSize);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest, VoidTransportResponseHandler.INSTANCE_SAME).txGet();
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
final CountDownLatch latch = new CountDownLatch(response.phase1FileNames.size());
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
@ -157,7 +157,7 @@ public class RecoverySource extends AbstractComponent {
indexInput.readBytes(buf, 0, toRead, false);
BytesArray content = new BytesArray(buf, 0, toRead);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.recoveryId(), request.shardId(), name, position, len, md.checksum(), content),
TransportRequestOptions.options().withCompress(shouldCompressRequest).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet();
TransportRequestOptions.options().withCompress(shouldCompressRequest).withLowType(), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
readCount += toRead;
}
indexInput.close();
@ -185,7 +185,7 @@ public class RecoverySource extends AbstractComponent {
// now, set the clean files request
Set<String> snapshotFiles = Sets.newHashSet(snapshot.getFiles());
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES, new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), snapshotFiles), VoidTransportResponseHandler.INSTANCE_SAME).txGet();
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES, new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), snapshotFiles), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
stopWatch.stop();
logger.trace("[{}][{}] recovery [phase1] to {}: took [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime());
@ -202,7 +202,7 @@ public class RecoverySource extends AbstractComponent {
}
logger.trace("[{}][{}] recovery [phase2] to {}: start", request.shardId().index().name(), request.shardId().id(), request.targetNode());
StopWatch stopWatch = new StopWatch().start();
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG, new RecoveryPrepareForTranslogOperationsRequest(request.recoveryId(), request.shardId()), VoidTransportResponseHandler.INSTANCE_SAME).txGet();
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG, new RecoveryPrepareForTranslogOperationsRequest(request.recoveryId(), request.shardId()), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
stopWatch.stop();
response.startTime = stopWatch.totalTime().millis();
logger.trace("[{}][{}] recovery [phase2] to {}: start took [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime());
@ -224,7 +224,7 @@ public class RecoverySource extends AbstractComponent {
logger.trace("[{}][{}] recovery [phase3] to {}: sending transaction log operations", request.shardId().index().name(), request.shardId().id(), request.targetNode());
StopWatch stopWatch = new StopWatch().start();
int totalOperations = sendSnapshot(snapshot);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FINALIZE, new RecoveryFinalizeRecoveryRequest(request.recoveryId(), request.shardId()), VoidTransportResponseHandler.INSTANCE_SAME).txGet();
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FINALIZE, new RecoveryFinalizeRecoveryRequest(request.recoveryId(), request.shardId()), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
if (request.markAsRelocated()) {
// TODO what happens if the recovery process fails afterwards, we need to mark this back to started
try {
@ -262,7 +262,7 @@ public class RecoverySource extends AbstractComponent {
}
RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.recoveryId(), request.shardId(), operations);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(recoverySettings.compress()).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet();
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(recoverySettings.compress()).withLowType(), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
ops = 0;
size = 0;
operations.clear();
@ -271,7 +271,7 @@ public class RecoverySource extends AbstractComponent {
// send the leftover
if (!operations.isEmpty()) {
RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.recoveryId(), request.shardId(), operations);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(recoverySettings.compress()).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet();
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(recoverySettings.compress()).withLowType(), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
return totalOperations;
}

View File

@ -28,7 +28,6 @@ import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.VoidStreamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
@ -373,7 +372,7 @@ public class RecoveryTarget extends AbstractComponent {
onGoingRecovery.stage = RecoveryStatus.Stage.TRANSLOG;
onGoingRecovery.indexShard.performRecoveryPrepareForTranslog();
channel.sendResponse(VoidStreamable.INSTANCE);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
@ -405,7 +404,7 @@ public class RecoveryTarget extends AbstractComponent {
onGoingRecovery.indexShard.performRecoveryFinalization(false, onGoingRecovery);
onGoingRecovery.time = System.currentTimeMillis() - onGoingRecovery.startTime;
onGoingRecovery.stage = RecoveryStatus.Stage.DONE;
channel.sendResponse(VoidStreamable.INSTANCE);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
@ -443,7 +442,7 @@ public class RecoveryTarget extends AbstractComponent {
shard.performRecoveryOperation(operation);
onGoingRecovery.currentTranslogOperations++;
}
channel.sendResponse(VoidStreamable.INSTANCE);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
@ -478,7 +477,7 @@ public class RecoveryTarget extends AbstractComponent {
onGoingRecovery.phase1TotalSize = request.phase1TotalSize;
onGoingRecovery.phase1ExistingTotalSize = request.phase1ExistingTotalSize;
onGoingRecovery.stage = RecoveryStatus.Stage.INDEX;
channel.sendResponse(VoidStreamable.INSTANCE);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
@ -549,7 +548,7 @@ public class RecoveryTarget extends AbstractComponent {
}
}
}
channel.sendResponse(VoidStreamable.INSTANCE);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
@ -646,7 +645,7 @@ public class RecoveryTarget extends AbstractComponent {
throw e;
}
}
channel.sendResponse(VoidStreamable.INSTANCE);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
}

View File

@ -24,12 +24,15 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StringStreamable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.jmx.JmxService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.IOException;
/**
*
*/
@ -56,12 +59,12 @@ public class GetJmxServiceUrlAction extends AbstractComponent {
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
return jmxService.publishUrl();
} else {
return transportService.submitRequest(node, GetJmxServiceUrlTransportHandler.ACTION, TransportRequest.Empty.INSTANCE, new FutureTransportResponseHandler<StringStreamable>() {
return transportService.submitRequest(node, GetJmxServiceUrlTransportHandler.ACTION, TransportRequest.Empty.INSTANCE, new FutureTransportResponseHandler<GetJmxServiceUrlResponse>() {
@Override
public StringStreamable newInstance() {
return new StringStreamable();
public GetJmxServiceUrlResponse newInstance() {
return new GetJmxServiceUrlResponse();
}
}).txGet().get();
}).txGet().url();
}
}
@ -81,7 +84,35 @@ public class GetJmxServiceUrlAction extends AbstractComponent {
@Override
public void messageReceived(TransportRequest.Empty request, TransportChannel channel) throws Exception {
channel.sendResponse(new StringStreamable(jmxService.publishUrl()));
channel.sendResponse(new GetJmxServiceUrlResponse(jmxService.publishUrl()));
}
}
static class GetJmxServiceUrlResponse extends TransportResponse {
private String url;
GetJmxServiceUrlResponse() {
}
GetJmxServiceUrlResponse(String url) {
this.url = url;
}
public String url() {
return this.url;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
url = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(url);
}
}
}

View File

@ -25,7 +25,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.VoidStreamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
@ -76,7 +75,7 @@ public class PublishRiverClusterStateAction extends AbstractComponent {
continue;
}
transportService.sendRequest(node, PublishClusterStateRequestHandler.ACTION, new PublishClusterStateRequest(clusterState), new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
transportService.sendRequest(node, PublishClusterStateRequestHandler.ACTION, new PublishClusterStateRequest(clusterState), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.debug("failed to send cluster state to [{}], should be detected as failed soon...", exp, node);
@ -126,7 +125,7 @@ public class PublishRiverClusterStateAction extends AbstractComponent {
@Override
public void messageReceived(PublishClusterStateRequest request, TransportChannel channel) throws Exception {
listener.onNewClusterState(request.clusterState);
channel.sendResponse(VoidStreamable.INSTANCE);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
}

View File

@ -26,7 +26,6 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.VoidStreamable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.SearchService;
@ -51,7 +50,7 @@ import java.io.IOException;
*/
public class SearchServiceTransportAction extends AbstractComponent {
static final class FreeContextResponseHandler extends VoidTransportResponseHandler {
static final class FreeContextResponseHandler extends EmptyTransportResponseHandler {
private final ESLogger logger;
@ -483,7 +482,7 @@ public class SearchServiceTransportAction extends AbstractComponent {
@Override
public void messageReceived(SearchFreeContextRequest request, TransportChannel channel) throws Exception {
searchService.freeContext(request.id());
channel.sendResponse(VoidStreamable.INSTANCE);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
@Override

View File

@ -24,26 +24,22 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.transport.TransportResponse;
import java.io.IOException;
/**
*
*/
public class DfsSearchResult implements SearchPhaseResult {
public class DfsSearchResult extends TransportResponse implements SearchPhaseResult {
private static Term[] EMPTY_TERMS = new Term[0];
private static int[] EMPTY_FREQS = new int[0];
private SearchShardTarget shardTarget;
private long id;
private Term[] terms;
private int[] freqs;
private int maxDoc;
public DfsSearchResult() {
@ -99,6 +95,7 @@ public class DfsSearchResult implements SearchPhaseResult {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
id = in.readLong();
// shardTarget = readSearchShardTarget(in);
int termsSize = in.readVInt();
@ -124,6 +121,7 @@ public class DfsSearchResult implements SearchPhaseResult {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(id);
// shardTarget.writeTo(out);
out.writeVInt(terms.length);

View File

@ -21,9 +21,9 @@ package org.elasticsearch.search.fetch;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.transport.TransportResponse;
import java.io.IOException;
@ -32,14 +32,11 @@ import static org.elasticsearch.search.internal.InternalSearchHits.StreamContext
/**
*
*/
public class FetchSearchResult implements Streamable, FetchSearchResultProvider {
public class FetchSearchResult extends TransportResponse implements FetchSearchResultProvider {
private long id;
private SearchShardTarget shardTarget;
private InternalSearchHits hits;
// client side counter
private transient int counter;
@ -95,12 +92,14 @@ public class FetchSearchResult implements Streamable, FetchSearchResultProvider
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
id = in.readLong();
hits = InternalSearchHits.readSearchHits(in, InternalSearchHits.streamContext().streamShardTarget(StreamContext.ShardTargetType.NO_STREAM));
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(id);
hits.writeTo(out, InternalSearchHits.streamContext().streamShardTarget(StreamContext.ShardTargetType.NO_STREAM));
}

View File

@ -21,10 +21,10 @@ package org.elasticsearch.search.fetch;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.transport.TransportResponse;
import java.io.IOException;
@ -34,10 +34,9 @@ import static org.elasticsearch.search.query.QuerySearchResult.readQuerySearchRe
/**
*
*/
public class QueryFetchSearchResult implements Streamable, QuerySearchResultProvider, FetchSearchResultProvider {
public class QueryFetchSearchResult extends TransportResponse implements QuerySearchResultProvider, FetchSearchResultProvider {
private QuerySearchResult queryResult;
private FetchSearchResult fetchResult;
public QueryFetchSearchResult() {
@ -84,12 +83,14 @@ public class QueryFetchSearchResult implements Streamable, QuerySearchResultProv
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
queryResult = readQuerySearchResult(in);
fetchResult = readFetchSearchResult(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
queryResult.writeTo(out);
fetchResult.writeTo(out);
}

View File

@ -21,8 +21,8 @@ package org.elasticsearch.search.fetch;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.transport.TransportResponse;
import java.io.IOException;
@ -32,10 +32,9 @@ import static org.elasticsearch.search.fetch.QueryFetchSearchResult.readQueryFet
/**
*
*/
public class ScrollQueryFetchSearchResult implements Streamable {
public class ScrollQueryFetchSearchResult extends TransportResponse {
private QueryFetchSearchResult result;
private SearchShardTarget shardTarget;
public ScrollQueryFetchSearchResult() {
@ -56,6 +55,7 @@ public class ScrollQueryFetchSearchResult implements Streamable {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardTarget = readSearchShardTarget(in);
result = readQueryFetchSearchResult(in);
result.shardTarget(shardTarget);
@ -63,6 +63,7 @@ public class ScrollQueryFetchSearchResult implements Streamable {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
shardTarget.writeTo(out);
result.writeTo(out);
}

View File

@ -22,10 +22,10 @@ package org.elasticsearch.search.query;
import org.apache.lucene.search.TopDocs;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.facet.Facets;
import org.elasticsearch.search.facet.InternalFacets;
import org.elasticsearch.transport.TransportResponse;
import java.io.IOException;
@ -35,20 +35,14 @@ import static org.elasticsearch.common.lucene.Lucene.writeTopDocs;
/**
*
*/
public class QuerySearchResult implements Streamable, QuerySearchResultProvider {
public class QuerySearchResult extends TransportResponse implements QuerySearchResultProvider {
private long id;
private SearchShardTarget shardTarget;
private int from;
private int size;
private TopDocs topDocs;
private InternalFacets facets;
private boolean searchTimedOut;
public QuerySearchResult() {
@ -133,6 +127,7 @@ public class QuerySearchResult implements Streamable, QuerySearchResultProvider
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
id = in.readLong();
// shardTarget = readSearchShardTarget(in);
from = in.readVInt();
@ -146,6 +141,7 @@ public class QuerySearchResult implements Streamable, QuerySearchResultProvider
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(id);
// shardTarget.writeTo(out);
out.writeVInt(from);

View File

@ -21,8 +21,8 @@ package org.elasticsearch.search.query;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.transport.TransportResponse;
import java.io.IOException;
@ -32,10 +32,9 @@ import static org.elasticsearch.search.query.QuerySearchResult.readQuerySearchRe
/**
*
*/
public class ScrollQuerySearchResult implements Streamable {
public class ScrollQuerySearchResult extends TransportResponse {
private QuerySearchResult queryResult;
private SearchShardTarget shardTarget;
public ScrollQuerySearchResult() {
@ -56,6 +55,7 @@ public class ScrollQuerySearchResult implements Streamable {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardTarget = readSearchShardTarget(in);
queryResult = readQuerySearchResult(in);
queryResult.shardTarget(shardTarget);
@ -63,6 +63,7 @@ public class ScrollQuerySearchResult implements Streamable {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
shardTarget.writeTo(out);
queryResult.writeTo(out);
}

View File

@ -19,13 +19,9 @@
package org.elasticsearch.transport;
import org.elasticsearch.common.io.stream.Streamable;
/**
* A simple based class that always spawns.
*
*
*/
public abstract class BaseTransportResponseHandler<T extends Streamable> implements TransportResponseHandler<T> {
public abstract class BaseTransportResponseHandler<T extends TransportResponse> implements TransportResponseHandler<T> {
}

View File

@ -19,29 +19,28 @@
package org.elasticsearch.transport;
import org.elasticsearch.common.io.stream.VoidStreamable;
import org.elasticsearch.threadpool.ThreadPool;
/**
*
*/
public class VoidTransportResponseHandler implements TransportResponseHandler<VoidStreamable> {
public class EmptyTransportResponseHandler implements TransportResponseHandler<TransportResponse.Empty> {
public static final VoidTransportResponseHandler INSTANCE_SAME = new VoidTransportResponseHandler(ThreadPool.Names.SAME);
public static final EmptyTransportResponseHandler INSTANCE_SAME = new EmptyTransportResponseHandler(ThreadPool.Names.SAME);
private final String executor;
public VoidTransportResponseHandler(String executor) {
public EmptyTransportResponseHandler(String executor) {
this.executor = executor;
}
@Override
public VoidStreamable newInstance() {
return VoidStreamable.INSTANCE;
public TransportResponse.Empty newInstance() {
return TransportResponse.Empty.INSTANCE;
}
@Override
public void handleResponse(VoidStreamable response) {
public void handleResponse(TransportResponse.Empty response) {
}
@Override

View File

@ -19,15 +19,12 @@
package org.elasticsearch.transport;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.threadpool.ThreadPool;
/**
* A response handler to be used when all interaction will be done through the {@link TransportFuture}.
*
*
*/
public abstract class FutureTransportResponseHandler<T extends Streamable> extends BaseTransportResponseHandler<T> {
public abstract class FutureTransportResponseHandler<T extends TransportResponse> extends BaseTransportResponseHandler<T> {
@Override
public void handleResponse(T response) {

View File

@ -22,7 +22,6 @@ package org.elasticsearch.transport;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchInterruptedException;
import org.elasticsearch.ElasticSearchTimeoutException;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.util.concurrent.BaseFuture;
import java.util.concurrent.ExecutionException;
@ -32,7 +31,7 @@ import java.util.concurrent.TimeoutException;
/**
*
*/
public class PlainTransportFuture<V extends Streamable> extends BaseFuture<V> implements TransportFuture<V>, TransportResponseHandler<V> {
public class PlainTransportFuture<V extends TransportResponse> extends BaseFuture<V> implements TransportFuture<V>, TransportResponseHandler<V> {
private final TransportResponseHandler<V> handler;

View File

@ -19,22 +19,18 @@
package org.elasticsearch.transport;
import org.elasticsearch.common.io.stream.Streamable;
import java.io.IOException;
/**
* A transport channel allows to send a response to a request on the channel.
*
*
*/
public interface TransportChannel {
String action();
void sendResponse(Streamable message) throws IOException;
void sendResponse(TransportResponse response) throws IOException;
void sendResponse(Streamable message, TransportResponseOptions options) throws IOException;
void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException;
void sendResponse(Throwable error) throws IOException;
}

View File

@ -0,0 +1,99 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import com.google.common.collect.Maps;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
*/
public abstract class TransportResponse implements Streamable {
public static class Empty extends TransportResponse {
public static final Empty INSTANCE = new Empty();
public Empty() {
super();
}
public Empty(TransportResponse request) {
super(request);
}
}
private Map<String, Object> headers;
protected TransportResponse() {
}
protected TransportResponse(TransportResponse request) {
// create a new copy of the headers, since we are creating a new request which might have
// its headers changed in the context of that specific request
if (request.getHeaders() != null) {
this.headers = new HashMap<String, Object>(request.getHeaders());
}
}
@SuppressWarnings("unchecked")
public final TransportResponse putHeader(String key, Object value) {
if (headers == null) {
headers = Maps.newHashMap();
}
headers.put(key, value);
return this;
}
@SuppressWarnings("unchecked")
public final <V> V getHeader(String key) {
if (headers == null) {
return null;
}
return (V) headers.get(key);
}
public Map<String, Object> getHeaders() {
return this.headers;
}
@Override
public void readFrom(StreamInput in) throws IOException {
if (in.readBoolean()) {
headers = in.readMap();
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
if (headers == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeMap(headers);
}
}
}

View File

@ -19,12 +19,10 @@
package org.elasticsearch.transport;
import org.elasticsearch.common.io.stream.Streamable;
/**
*
*/
public interface TransportResponseHandler<T extends Streamable> {
public interface TransportResponseHandler<T extends TransportResponse> {
/**
* creates a new instance of the return type from the remote call.

View File

@ -25,7 +25,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
@ -155,25 +154,25 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
this.throwConnectException = throwConnectException;
}
public <T extends Streamable> TransportFuture<T> submitRequest(DiscoveryNode node, String action, TransportRequest request,
TransportResponseHandler<T> handler) throws TransportException {
public <T extends TransportResponse> TransportFuture<T> submitRequest(DiscoveryNode node, String action, TransportRequest request,
TransportResponseHandler<T> handler) throws TransportException {
return submitRequest(node, action, request, TransportRequestOptions.EMPTY, handler);
}
public <T extends Streamable> TransportFuture<T> submitRequest(DiscoveryNode node, String action, TransportRequest request,
TransportRequestOptions options, TransportResponseHandler<T> handler) throws TransportException {
public <T extends TransportResponse> TransportFuture<T> submitRequest(DiscoveryNode node, String action, TransportRequest request,
TransportRequestOptions options, TransportResponseHandler<T> handler) throws TransportException {
PlainTransportFuture<T> futureHandler = new PlainTransportFuture<T>(handler);
sendRequest(node, action, request, options, futureHandler);
return futureHandler;
}
public <T extends Streamable> void sendRequest(final DiscoveryNode node, final String action, final TransportRequest request,
final TransportResponseHandler<T> handler) throws TransportException {
public <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action, final TransportRequest request,
final TransportResponseHandler<T> handler) throws TransportException {
sendRequest(node, action, request, TransportRequestOptions.EMPTY, handler);
}
public <T extends Streamable> void sendRequest(final DiscoveryNode node, final String action, final TransportRequest request,
final TransportRequestOptions options, final TransportResponseHandler<T> handler) throws TransportException {
public <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action, final TransportRequest request,
final TransportRequestOptions options, final TransportResponseHandler<T> handler) throws TransportException {
final long requestId = newRequestId();
TimeoutHandler timeoutHandler = null;
try {
@ -380,7 +379,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
}
}
static class RequestHolder<T extends Streamable> {
static class RequestHolder<T extends TransportResponse> {
private final TransportResponseHandler<T> handler;

View File

@ -253,11 +253,11 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
private void handleResponse(StreamInput buffer, final TransportResponseHandler handler) {
final Streamable streamable = handler.newInstance();
final TransportResponse response = handler.newInstance();
try {
streamable.readFrom(buffer);
response.readFrom(buffer);
} catch (Exception e) {
handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + streamable.getClass().getName() + "]", e));
handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + response.getClass().getName() + "]", e));
return;
}
threadPool.executor(handler.executor()).execute(new Runnable() {
@ -265,7 +265,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
@Override
public void run() {
try {
handler.handleResponse(streamable);
handler.handleResponse(response);
} catch (Exception e) {
handleException(handler, new ResponseHandlerFailureTransportException(e));
}

View File

@ -23,11 +23,7 @@ import org.elasticsearch.common.io.ThrowableObjectOutputStream;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.transport.NotSerializableTransportException;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportResponseOptions;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.support.TransportStatus;
import java.io.IOException;
@ -60,12 +56,12 @@ public class LocalTransportChannel implements TransportChannel {
}
@Override
public void sendResponse(Streamable message) throws IOException {
sendResponse(message, TransportResponseOptions.EMPTY);
public void sendResponse(TransportResponse response) throws IOException {
sendResponse(response, TransportResponseOptions.EMPTY);
}
@Override
public void sendResponse(Streamable message, TransportResponseOptions options) throws IOException {
public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
try {
StreamOutput stream = cachedEntry.handles();
@ -73,7 +69,7 @@ public class LocalTransportChannel implements TransportChannel {
byte status = 0;
status = TransportStatus.setResponse(status);
stream.writeByte(status); // 0 for request, 1 for response.
message.writeTo(stream);
response.writeTo(stream);
stream.close();
final byte[] data = cachedEntry.bytes().bytes().copyBytesArray().toBytes();
targetTransport.threadPool().generic().execute(new Runnable() {

View File

@ -27,7 +27,6 @@ import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.ThrowableObjectInputStream;
import org.elasticsearch.common.io.stream.CachedStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
@ -144,19 +143,19 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
}
private void handleResponse(StreamInput buffer, final TransportResponseHandler handler) {
final Streamable streamable = handler.newInstance();
final TransportResponse response = handler.newInstance();
try {
streamable.readFrom(buffer);
response.readFrom(buffer);
} catch (Exception e) {
handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + streamable.getClass().getName() + "]", e));
handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + response.getClass().getName() + "]", e));
return;
}
try {
if (handler.executor() == ThreadPool.Names.SAME) {
//noinspection unchecked
handler.handleResponse(streamable);
handler.handleResponse(response);
} else {
threadPool.executor(handler.executor()).execute(new ResponseHandler(handler, streamable));
threadPool.executor(handler.executor()).execute(new ResponseHandler(handler, response));
}
} catch (Exception e) {
handleException(handler, new ResponseHandlerFailureTransportException(e));
@ -196,7 +195,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
}
private String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException {
final String action = buffer.readUTF();
final String action = buffer.readString();
final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, channel, requestId, version);
try {
@ -231,18 +230,18 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
class ResponseHandler implements Runnable {
private final TransportResponseHandler handler;
private final Streamable streamable;
private final TransportResponse response;
public ResponseHandler(TransportResponseHandler handler, Streamable streamable) {
public ResponseHandler(TransportResponseHandler handler, TransportResponse response) {
this.handler = handler;
this.streamable = streamable;
this.response = response;
}
@SuppressWarnings({"unchecked"})
@Override
public void run() {
try {
handler.handleResponse(streamable);
handler.handleResponse(response);
} catch (Exception e) {
handleException(handler, new ResponseHandlerFailureTransportException(e));
}

View File

@ -25,11 +25,7 @@ import org.elasticsearch.common.io.ThrowableObjectOutputStream;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.transport.NotSerializableTransportException;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportResponseOptions;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.support.TransportStatus;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
@ -67,12 +63,12 @@ public class NettyTransportChannel implements TransportChannel {
}
@Override
public void sendResponse(Streamable message) throws IOException {
sendResponse(message, TransportResponseOptions.EMPTY);
public void sendResponse(TransportResponse response) throws IOException {
sendResponse(response, TransportResponseOptions.EMPTY);
}
@Override
public void sendResponse(Streamable message, TransportResponseOptions options) throws IOException {
public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
if (transport.compress) {
options.withCompress(true);
}
@ -86,13 +82,13 @@ public class NettyTransportChannel implements TransportChannel {
cachedEntry.bytes().skip(NettyHeader.HEADER_SIZE);
StreamOutput stream = cachedEntry.handles(CompressorFactory.defaultCompressor());
stream.setVersion(version);
message.writeTo(stream);
response.writeTo(stream);
stream.close();
} else {
StreamOutput stream = cachedEntry.handles();
stream.setVersion(version);
cachedEntry.bytes().skip(NettyHeader.HEADER_SIZE);
message.writeTo(stream);
response.writeTo(stream);
stream.close();
}
ChannelBuffer buffer = cachedEntry.bytes().bytes().toChannelBuffer();

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.benchmark.transport.netty;
package org.elasticsearch.benchmark.transport;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

View File

@ -21,29 +21,42 @@ package org.elasticsearch.benchmark.transport;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.transport.TransportResponse;
import java.io.IOException;
/**
*
*/
public class BenchmarkMessage implements Streamable {
public class BenchmarkMessageResponse extends TransportResponse {
long id;
byte[] payload;
public BenchmarkMessage(long id, byte[] payload) {
public BenchmarkMessageResponse(BenchmarkMessageRequest request) {
this.id = request.id;
this.payload = request.payload;
}
public BenchmarkMessageResponse(long id, byte[] payload) {
this.id = id;
this.payload = payload;
}
public BenchmarkMessage() {
public BenchmarkMessageResponse() {
}
public long id() {
return id;
}
public byte[] payload() {
return payload;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
id = in.readLong();
payload = new byte[in.readVInt()];
in.readFully(payload);
@ -51,6 +64,7 @@ public class BenchmarkMessage implements Streamable {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(id);
out.writeVInt(payload.length);
out.writeBytes(payload);

View File

@ -19,7 +19,6 @@
package org.elasticsearch.benchmark.transport;
import org.elasticsearch.benchmark.transport.netty.BenchmarkMessageRequest;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Bytes;
import org.elasticsearch.common.settings.ImmutableSettings;
@ -73,7 +72,7 @@ public class BenchmarkNettyLargeMessages {
@Override
public void messageReceived(BenchmarkMessageRequest request, TransportChannel channel) throws Exception {
channel.sendResponse(request);
channel.sendResponse(new BenchmarkMessageResponse(request));
}
});
@ -84,10 +83,10 @@ public class BenchmarkNettyLargeMessages {
public void run() {
for (int i = 0; i < NUMBER_OF_ITERATIONS; i++) {
BenchmarkMessageRequest message = new BenchmarkMessageRequest(1, payload);
transportServiceClient.submitRequest(bigNode, "benchmark", message, options().withLowType(), new BaseTransportResponseHandler<BenchmarkMessage>() {
transportServiceClient.submitRequest(bigNode, "benchmark", message, options().withLowType(), new BaseTransportResponseHandler<BenchmarkMessageResponse>() {
@Override
public BenchmarkMessage newInstance() {
return new BenchmarkMessage();
public BenchmarkMessageResponse newInstance() {
return new BenchmarkMessageResponse();
}
@Override
@ -96,7 +95,7 @@ public class BenchmarkNettyLargeMessages {
}
@Override
public void handleResponse(BenchmarkMessage response) {
public void handleResponse(BenchmarkMessageResponse response) {
}
@Override
@ -116,10 +115,10 @@ public class BenchmarkNettyLargeMessages {
for (int i = 0; i < 1; i++) {
BenchmarkMessageRequest message = new BenchmarkMessageRequest(2, Bytes.EMPTY_ARRAY);
long start = System.currentTimeMillis();
transportServiceClient.submitRequest(smallNode, "benchmark", message, options().withHighType(), new BaseTransportResponseHandler<BenchmarkMessage>() {
transportServiceClient.submitRequest(smallNode, "benchmark", message, options().withHighType(), new BaseTransportResponseHandler<BenchmarkMessageResponse>() {
@Override
public BenchmarkMessage newInstance() {
return new BenchmarkMessage();
public BenchmarkMessageResponse newInstance() {
return new BenchmarkMessageResponse();
}
@Override
@ -128,7 +127,7 @@ public class BenchmarkNettyLargeMessages {
}
@Override
public void handleResponse(BenchmarkMessage response) {
public void handleResponse(BenchmarkMessageResponse response) {
}
@Override

View File

@ -19,7 +19,6 @@
package org.elasticsearch.benchmark.transport;
import org.elasticsearch.benchmark.transport.netty.BenchmarkMessageRequest;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.settings.ImmutableSettings;
@ -91,7 +90,7 @@ public class TransportBenchmark {
@Override
public void messageReceived(BenchmarkMessageRequest request, TransportChannel channel) throws Exception {
channel.sendResponse(request);
channel.sendResponse(new BenchmarkMessageResponse(request));
}
});
@ -99,10 +98,10 @@ public class TransportBenchmark {
for (int i = 0; i < 10000; i++) {
BenchmarkMessageRequest message = new BenchmarkMessageRequest(1, payload);
clientTransportService.submitRequest(node, "benchmark", message, new BaseTransportResponseHandler<BenchmarkMessage>() {
clientTransportService.submitRequest(node, "benchmark", message, new BaseTransportResponseHandler<BenchmarkMessageResponse>() {
@Override
public BenchmarkMessage newInstance() {
return new BenchmarkMessage();
public BenchmarkMessageResponse newInstance() {
return new BenchmarkMessageResponse();
}
@Override
@ -111,7 +110,7 @@ public class TransportBenchmark {
}
@Override
public void handleResponse(BenchmarkMessage response) {
public void handleResponse(BenchmarkMessageResponse response) {
}
@Override
@ -131,10 +130,10 @@ public class TransportBenchmark {
for (int j = 0; j < NUMBER_OF_ITERATIONS; j++) {
final long id = idGenerator.incrementAndGet();
BenchmarkMessageRequest request = new BenchmarkMessageRequest(id, payload);
BaseTransportResponseHandler<BenchmarkMessage> handler = new BaseTransportResponseHandler<BenchmarkMessage>() {
BaseTransportResponseHandler<BenchmarkMessageResponse> handler = new BaseTransportResponseHandler<BenchmarkMessageResponse>() {
@Override
public BenchmarkMessage newInstance() {
return new BenchmarkMessage();
public BenchmarkMessageResponse newInstance() {
return new BenchmarkMessageResponse();
}
@Override
@ -143,9 +142,9 @@ public class TransportBenchmark {
}
@Override
public void handleResponse(BenchmarkMessage response) {
if (response.id != id) {
System.out.println("NO ID MATCH [" + response.id + "] and [" + id + "]");
public void handleResponse(BenchmarkMessageResponse response) {
if (response.id() != id) {
System.out.println("NO ID MATCH [" + response.id() + "] and [" + id + "]");
}
latch.countDown();
}

View File

@ -22,8 +22,6 @@ package org.elasticsearch.test.unit.transport;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.VoidStreamable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
@ -86,7 +84,7 @@ public abstract class AbstractSimpleTransportTests {
public void messageReceived(StringMessageRequest request, TransportChannel channel) {
assertThat("moshe", equalTo(request.message));
try {
channel.sendResponse(new StringMessage("hello " + request.message));
channel.sendResponse(new StringMessageResponse("hello " + request.message));
} catch (IOException e) {
e.printStackTrace();
assertThat(e.getMessage(), false, equalTo(true));
@ -94,11 +92,11 @@ public abstract class AbstractSimpleTransportTests {
}
});
TransportFuture<StringMessage> res = serviceB.submitRequest(serviceANode, "sayHello",
new StringMessageRequest("moshe"), new BaseTransportResponseHandler<StringMessage>() {
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(serviceANode, "sayHello",
new StringMessageRequest("moshe"), new BaseTransportResponseHandler<StringMessageResponse>() {
@Override
public StringMessage newInstance() {
return new StringMessage();
public StringMessageResponse newInstance() {
return new StringMessageResponse();
}
@Override
@ -107,7 +105,7 @@ public abstract class AbstractSimpleTransportTests {
}
@Override
public void handleResponse(StringMessage response) {
public void handleResponse(StringMessageResponse response) {
assertThat("hello moshe", equalTo(response.message));
}
@ -119,7 +117,7 @@ public abstract class AbstractSimpleTransportTests {
});
try {
StringMessage message = res.get();
StringMessageResponse message = res.get();
assertThat("hello moshe", equalTo(message.message));
} catch (Exception e) {
assertThat(e.getMessage(), false, equalTo(true));
@ -144,7 +142,7 @@ public abstract class AbstractSimpleTransportTests {
@Override
public void messageReceived(TransportRequest.Empty request, TransportChannel channel) {
try {
channel.sendResponse(VoidStreamable.INSTANCE, TransportResponseOptions.options().withCompress(true));
channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.options().withCompress(true));
} catch (IOException e) {
e.printStackTrace();
assertThat(e.getMessage(), false, equalTo(true));
@ -152,11 +150,11 @@ public abstract class AbstractSimpleTransportTests {
}
});
TransportFuture<VoidStreamable> res = serviceB.submitRequest(serviceANode, "sayHello",
TransportRequest.Empty.INSTANCE, TransportRequestOptions.options().withCompress(true), new BaseTransportResponseHandler<VoidStreamable>() {
TransportFuture<TransportResponse.Empty> res = serviceB.submitRequest(serviceANode, "sayHello",
TransportRequest.Empty.INSTANCE, TransportRequestOptions.options().withCompress(true), new BaseTransportResponseHandler<TransportResponse.Empty>() {
@Override
public VoidStreamable newInstance() {
return VoidStreamable.INSTANCE;
public TransportResponse.Empty newInstance() {
return TransportResponse.Empty.INSTANCE;
}
@Override
@ -165,7 +163,7 @@ public abstract class AbstractSimpleTransportTests {
}
@Override
public void handleResponse(VoidStreamable response) {
public void handleResponse(TransportResponse.Empty response) {
}
@Override
@ -176,7 +174,7 @@ public abstract class AbstractSimpleTransportTests {
});
try {
VoidStreamable message = res.get();
TransportResponse.Empty message = res.get();
assertThat(message, notNullValue());
} catch (Exception e) {
assertThat(e.getMessage(), false, equalTo(true));
@ -202,7 +200,7 @@ public abstract class AbstractSimpleTransportTests {
public void messageReceived(StringMessageRequest request, TransportChannel channel) {
assertThat("moshe", equalTo(request.message));
try {
channel.sendResponse(new StringMessage("hello " + request.message), TransportResponseOptions.options().withCompress(true));
channel.sendResponse(new StringMessageResponse("hello " + request.message), TransportResponseOptions.options().withCompress(true));
} catch (IOException e) {
e.printStackTrace();
assertThat(e.getMessage(), false, equalTo(true));
@ -210,11 +208,11 @@ public abstract class AbstractSimpleTransportTests {
}
});
TransportFuture<StringMessage> res = serviceB.submitRequest(serviceANode, "sayHello",
new StringMessageRequest("moshe"), TransportRequestOptions.options().withCompress(true), new BaseTransportResponseHandler<StringMessage>() {
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(serviceANode, "sayHello",
new StringMessageRequest("moshe"), TransportRequestOptions.options().withCompress(true), new BaseTransportResponseHandler<StringMessageResponse>() {
@Override
public StringMessage newInstance() {
return new StringMessage();
public StringMessageResponse newInstance() {
return new StringMessageResponse();
}
@Override
@ -223,7 +221,7 @@ public abstract class AbstractSimpleTransportTests {
}
@Override
public void handleResponse(StringMessage response) {
public void handleResponse(StringMessageResponse response) {
assertThat("hello moshe", equalTo(response.message));
}
@ -235,7 +233,7 @@ public abstract class AbstractSimpleTransportTests {
});
try {
StringMessage message = res.get();
StringMessageResponse message = res.get();
assertThat("hello moshe", equalTo(message.message));
} catch (Exception e) {
assertThat(e.getMessage(), false, equalTo(true));
@ -264,11 +262,11 @@ public abstract class AbstractSimpleTransportTests {
}
});
TransportFuture<StringMessage> res = serviceB.submitRequest(serviceANode, "sayHelloException",
new StringMessageRequest("moshe"), new BaseTransportResponseHandler<StringMessage>() {
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(serviceANode, "sayHelloException",
new StringMessageRequest("moshe"), new BaseTransportResponseHandler<StringMessageResponse>() {
@Override
public StringMessage newInstance() {
return new StringMessage();
public StringMessageResponse newInstance() {
return new StringMessageResponse();
}
@Override
@ -277,7 +275,7 @@ public abstract class AbstractSimpleTransportTests {
}
@Override
public void handleResponse(StringMessage response) {
public void handleResponse(StringMessageResponse response) {
assertThat("got response instead of exception", false, equalTo(true));
}
@ -342,11 +340,11 @@ public abstract class AbstractSimpleTransportTests {
}
});
TransportFuture<StringMessage> res = serviceB.submitRequest(serviceANode, "sayHelloTimeoutNoResponse",
new StringMessageRequest("moshe"), options().withTimeout(100), new BaseTransportResponseHandler<StringMessage>() {
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(serviceANode, "sayHelloTimeoutNoResponse",
new StringMessageRequest("moshe"), options().withTimeout(100), new BaseTransportResponseHandler<StringMessageResponse>() {
@Override
public StringMessage newInstance() {
return new StringMessage();
public StringMessageResponse newInstance() {
return new StringMessageResponse();
}
@Override
@ -355,7 +353,7 @@ public abstract class AbstractSimpleTransportTests {
}
@Override
public void handleResponse(StringMessage response) {
public void handleResponse(StringMessageResponse response) {
assertThat("got response instead of exception", false, equalTo(true));
}
@ -366,7 +364,7 @@ public abstract class AbstractSimpleTransportTests {
});
try {
StringMessage message = res.txGet();
StringMessageResponse message = res.txGet();
assertThat("exception should be thrown", false, equalTo(true));
} catch (Exception e) {
assertThat(e, instanceOf(ReceiveTimeoutTransportException.class));
@ -397,7 +395,7 @@ public abstract class AbstractSimpleTransportTests {
// ignore
}
try {
channel.sendResponse(new StringMessage("hello " + request.message));
channel.sendResponse(new StringMessageResponse("hello " + request.message));
} catch (IOException e) {
e.printStackTrace();
assertThat(e.getMessage(), false, equalTo(true));
@ -405,11 +403,11 @@ public abstract class AbstractSimpleTransportTests {
}
});
TransportFuture<StringMessage> res = serviceB.submitRequest(serviceANode, "sayHelloTimeoutDelayedResponse",
new StringMessageRequest("300ms"), options().withTimeout(100), new BaseTransportResponseHandler<StringMessage>() {
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(serviceANode, "sayHelloTimeoutDelayedResponse",
new StringMessageRequest("300ms"), options().withTimeout(100), new BaseTransportResponseHandler<StringMessageResponse>() {
@Override
public StringMessage newInstance() {
return new StringMessage();
public StringMessageResponse newInstance() {
return new StringMessageResponse();
}
@Override
@ -418,7 +416,7 @@ public abstract class AbstractSimpleTransportTests {
}
@Override
public void handleResponse(StringMessage response) {
public void handleResponse(StringMessageResponse response) {
assertThat("got response instead of exception", false, equalTo(true));
}
@ -429,7 +427,7 @@ public abstract class AbstractSimpleTransportTests {
});
try {
StringMessage message = res.txGet();
StringMessageResponse message = res.txGet();
assertThat("exception should be thrown", false, equalTo(true));
} catch (Exception e) {
assertThat(e, instanceOf(ReceiveTimeoutTransportException.class));
@ -442,10 +440,10 @@ public abstract class AbstractSimpleTransportTests {
final int counter = i;
// now, try and send another request, this times, with a short timeout
res = serviceB.submitRequest(serviceANode, "sayHelloTimeoutDelayedResponse",
new StringMessageRequest(counter + "ms"), options().withTimeout(100), new BaseTransportResponseHandler<StringMessage>() {
new StringMessageRequest(counter + "ms"), options().withTimeout(100), new BaseTransportResponseHandler<StringMessageResponse>() {
@Override
public StringMessage newInstance() {
return new StringMessage();
public StringMessageResponse newInstance() {
return new StringMessageResponse();
}
@Override
@ -454,7 +452,7 @@ public abstract class AbstractSimpleTransportTests {
}
@Override
public void handleResponse(StringMessage response) {
public void handleResponse(StringMessageResponse response) {
assertThat("hello " + counter + "ms", equalTo(response.message));
}
@ -465,14 +463,14 @@ public abstract class AbstractSimpleTransportTests {
}
});
StringMessage message = res.txGet();
StringMessageResponse message = res.txGet();
assertThat(message.message, equalTo("hello " + counter + "ms"));
}
serviceA.removeHandler("sayHelloTimeoutDelayedResponse");
}
class StringMessageRequest extends TransportRequest {
static class StringMessageRequest extends TransportRequest {
private String message;
@ -496,24 +494,26 @@ public abstract class AbstractSimpleTransportTests {
}
}
class StringMessage implements Streamable {
static class StringMessageResponse extends TransportResponse {
private String message;
StringMessage(String message) {
StringMessageResponse(String message) {
this.message = message;
}
StringMessage() {
StringMessageResponse() {
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
message = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(message);
}
}