From c6efb9be2a235a7a73475e0a87ff0a273fc3394a Mon Sep 17 00:00:00 2001 From: Ryan Ernst Date: Wed, 10 Jul 2019 10:54:31 -0700 Subject: [PATCH] Convert ReplicationResponse to Writeable (#43953) This commit convers ReplicationResponse and all its subclasses to support Writeable.Reader as a constructor. relates #34389 --- .../action/DocWriteResponse.java | 48 +++++++++---------- ...TransportVerifyShardBeforeCloseAction.java | 4 +- .../flush/TransportShardFlushAction.java | 7 ++- .../refresh/TransportShardRefreshAction.java | 7 ++- .../action/bulk/BulkItemResponse.java | 10 ++-- .../action/bulk/BulkShardResponse.java | 22 ++++----- .../action/bulk/TransportShardBulkAction.java | 7 ++- .../action/delete/DeleteAction.java | 11 ++--- .../action/delete/DeleteResponse.java | 4 +- .../action/index/IndexAction.java | 11 ++--- .../action/index/IndexResponse.java | 4 +- .../resync/ResyncReplicationResponse.java | 9 ++++ .../TransportResyncReplicationAction.java | 8 ++-- .../replication/ReplicationResponse.java | 10 +++- .../TransportReplicationAction.java | 12 ++--- ...ransportInstanceSingleOperationAction.java | 6 +-- .../action/update/TransportUpdateAction.java | 6 ++- .../action/update/UpdateAction.java | 11 ++--- .../action/update/UpdateResponse.java | 14 ++---- .../seqno/GlobalCheckpointSyncAction.java | 4 +- .../RetentionLeaseBackgroundSyncAction.java | 4 +- .../index/seqno/RetentionLeaseSyncAction.java | 10 +++- .../elasticsearch/action/bulk/RetryTests.java | 2 +- .../TransportReplicationActionTests.java | 8 +++- ...ReplicationAllPermitsAcquisitionTests.java | 2 +- .../TransportWriteActionTests.java | 2 +- ...ortInstanceSingleOperationActionTests.java | 3 +- .../bulk/BulkShardOperationsResponse.java | 11 ++--- .../TransportBulkShardOperationsAction.java | 5 +- .../persistence/JobResultsPersisterTests.java | 2 +- .../execution/ExecutionServiceTests.java | 2 +- 31 files changed, 133 insertions(+), 133 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/DocWriteResponse.java b/server/src/main/java/org/elasticsearch/action/DocWriteResponse.java index 8894482d79b..3a7a68e083b 100644 --- a/server/src/main/java/org/elasticsearch/action/DocWriteResponse.java +++ b/server/src/main/java/org/elasticsearch/action/DocWriteResponse.java @@ -112,14 +112,14 @@ public abstract class DocWriteResponse extends ReplicationResponse implements Wr } } - private ShardId shardId; - private String id; - private String type; - private long version; - private long seqNo; - private long primaryTerm; + private final ShardId shardId; + private final String id; + private final String type; + private final long version; + private final long seqNo; + private final long primaryTerm; private boolean forcedRefresh; - protected Result result; + protected final Result result; public DocWriteResponse(ShardId shardId, String type, String id, long seqNo, long primaryTerm, long version, Result result) { this.shardId = shardId; @@ -132,7 +132,21 @@ public abstract class DocWriteResponse extends ReplicationResponse implements Wr } // needed for deserialization - protected DocWriteResponse() { + protected DocWriteResponse(StreamInput in) throws IOException { + super(in); + shardId = new ShardId(in); + type = in.readString(); + id = in.readString(); + version = in.readZLong(); + if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) { + seqNo = in.readZLong(); + primaryTerm = in.readVLong(); + } else { + seqNo = UNASSIGNED_SEQ_NO; + primaryTerm = UNASSIGNED_PRIMARY_TERM; + } + forcedRefresh = in.readBoolean(); + result = Result.readFrom(in); } /** @@ -257,24 +271,6 @@ public abstract class DocWriteResponse extends ReplicationResponse implements Wr return location.toString(); } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - shardId = new ShardId(in); - type = in.readString(); - id = in.readString(); - version = in.readZLong(); - if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) { - seqNo = in.readZLong(); - primaryTerm = in.readVLong(); - } else { - seqNo = UNASSIGNED_SEQ_NO; - primaryTerm = UNASSIGNED_PRIMARY_TERM; - } - forcedRefresh = in.readBoolean(); - result = Result.readFrom(in); - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java index 79cbab47819..611d7ce8955 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java @@ -64,8 +64,8 @@ public class TransportVerifyShardBeforeCloseAction extends TransportReplicationA } @Override - protected ReplicationResponse newResponseInstance() { - return new ReplicationResponse(); + protected ReplicationResponse newResponseInstance(StreamInput in) throws IOException { + return new ReplicationResponse(in); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java index a07dee9613a..4b85a3d9f3f 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java @@ -27,12 +27,15 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.io.IOException; + public class TransportShardFlushAction extends TransportReplicationAction { @@ -47,8 +50,8 @@ public class TransportShardFlushAction } @Override - protected ReplicationResponse newResponseInstance() { - return new ReplicationResponse(); + protected ReplicationResponse newResponseInstance(StreamInput in) throws IOException { + return new ReplicationResponse(in); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java index c0a52ac8c0d..a397eb881da 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java @@ -28,12 +28,15 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.io.IOException; + public class TransportShardRefreshAction extends TransportReplicationAction { @@ -49,8 +52,8 @@ public class TransportShardRefreshAction } @Override - protected ReplicationResponse newResponseInstance() { - return new ReplicationResponse(); + protected ReplicationResponse newResponseInstance(StreamInput in) throws IOException { + return new ReplicationResponse(in); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java index 838293b8b1f..ded2a540b6e 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java @@ -476,15 +476,11 @@ public class BulkItemResponse implements Streamable, StatusToXContentObject { byte type = in.readByte(); if (type == 0) { - response = new IndexResponse(); - response.readFrom(in); + response = new IndexResponse(in); } else if (type == 1) { - response = new DeleteResponse(); - response.readFrom(in); - + response = new DeleteResponse(in); } else if (type == 3) { // make 3 instead of 2, because 2 is already in use for 'no responses' - response = new UpdateResponse(); - response.readFrom(in); + response = new UpdateResponse(in); } if (in.readBoolean()) { diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkShardResponse.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkShardResponse.java index fc58e620738..8bb1bc9e62b 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkShardResponse.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkShardResponse.java @@ -30,10 +30,16 @@ import java.io.IOException; public class BulkShardResponse extends ReplicationResponse implements WriteResponse { - private ShardId shardId; - private BulkItemResponse[] responses; + private final ShardId shardId; + private final BulkItemResponse[] responses; - BulkShardResponse() { + BulkShardResponse(StreamInput in) throws IOException { + super(in); + shardId = new ShardId(in); + responses = new BulkItemResponse[in.readVInt()]; + for (int i = 0; i < responses.length; i++) { + responses[i] = BulkItemResponse.readBulkItem(in); + } } // NOTE: public for testing only @@ -64,16 +70,6 @@ public class BulkShardResponse extends ReplicationResponse implements WriteRespo } } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - shardId = new ShardId(in); - responses = new BulkItemResponse[in.readVInt()]; - for (int i = 0; i < responses.length; i++) { - responses[i] = BulkItemResponse.readBulkItem(in); - } - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index ad803589f9d..2bc886d5255 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -26,6 +26,7 @@ import org.apache.logging.log4j.util.MessageSupplier; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.ActionType; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.delete.DeleteRequest; @@ -49,6 +50,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentHelper; @@ -68,6 +70,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.Map; import java.util.concurrent.Executor; import java.util.function.Consumer; @@ -100,8 +103,8 @@ public class TransportShardBulkAction extends TransportWriteAction { +public class DeleteAction extends ActionType { public static final DeleteAction INSTANCE = new DeleteAction(); public static final String NAME = "indices:data/write/delete"; private DeleteAction() { - super(NAME); - } - - @Override - public DeleteResponse newResponse() { - return new DeleteResponse(); + super(NAME, DeleteResponse::new); } } diff --git a/server/src/main/java/org/elasticsearch/action/delete/DeleteResponse.java b/server/src/main/java/org/elasticsearch/action/delete/DeleteResponse.java index 14b7f65239b..5961797e3a0 100644 --- a/server/src/main/java/org/elasticsearch/action/delete/DeleteResponse.java +++ b/server/src/main/java/org/elasticsearch/action/delete/DeleteResponse.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.delete; import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.rest.RestStatus; @@ -36,7 +37,8 @@ import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpect */ public class DeleteResponse extends DocWriteResponse { - public DeleteResponse() { + public DeleteResponse(StreamInput in) throws IOException { + super(in); } public DeleteResponse(ShardId shardId, String type, String id, long seqNo, long primaryTerm, long version, boolean found) { diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexAction.java b/server/src/main/java/org/elasticsearch/action/index/IndexAction.java index b6afe88b770..20c31e9d358 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexAction.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexAction.java @@ -19,19 +19,14 @@ package org.elasticsearch.action.index; -import org.elasticsearch.action.StreamableResponseActionType; +import org.elasticsearch.action.ActionType; -public class IndexAction extends StreamableResponseActionType { +public class IndexAction extends ActionType { public static final IndexAction INSTANCE = new IndexAction(); public static final String NAME = "indices:data/write/index"; private IndexAction() { - super(NAME); - } - - @Override - public IndexResponse newResponse() { - return new IndexResponse(); + super(NAME, IndexResponse::new); } } diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexResponse.java b/server/src/main/java/org/elasticsearch/action/index/IndexResponse.java index 3174e4d8ab1..75ad2e106a0 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexResponse.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexResponse.java @@ -21,6 +21,7 @@ package org.elasticsearch.action.index; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.rest.RestStatus; @@ -37,7 +38,8 @@ import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpect */ public class IndexResponse extends DocWriteResponse { - public IndexResponse() { + public IndexResponse(StreamInput in) throws IOException { + super(in); } public IndexResponse(ShardId shardId, String type, String id, long seqNo, long primaryTerm, long version, boolean created) { diff --git a/server/src/main/java/org/elasticsearch/action/resync/ResyncReplicationResponse.java b/server/src/main/java/org/elasticsearch/action/resync/ResyncReplicationResponse.java index f3dbea04763..2180ef7d6ef 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/ResyncReplicationResponse.java +++ b/server/src/main/java/org/elasticsearch/action/resync/ResyncReplicationResponse.java @@ -20,9 +20,18 @@ package org.elasticsearch.action.resync; import org.elasticsearch.action.support.WriteResponse; import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.common.io.stream.StreamInput; + +import java.io.IOException; public final class ResyncReplicationResponse extends ReplicationResponse implements WriteResponse { + public ResyncReplicationResponse() {} + + public ResyncReplicationResponse(StreamInput in) throws IOException { + super(in); + } + @Override public void setForcedRefresh(boolean forcedRefresh) { // ignore diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 464cd3168bf..095f5e27a62 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -63,8 +63,8 @@ public class TransportResyncReplicationAction extends TransportWriteAction() { @Override public ResyncReplicationResponse read(StreamInput in) throws IOException { - ResyncReplicationResponse response = newResponseInstance(); - response.readFrom(in); - return response; + return newResponseInstance(in); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationResponse.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationResponse.java index 008b0095fb8..f6f6a8238f3 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationResponse.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationResponse.java @@ -50,10 +50,16 @@ public class ReplicationResponse extends ActionResponse { private ShardInfo shardInfo; + public ReplicationResponse() {} + + public ReplicationResponse(StreamInput in) throws IOException { + super(in); + shardInfo = ReplicationResponse.ShardInfo.readShardInfo(in); + } + @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - shardInfo = ReplicationResponse.ShardInfo.readShardInfo(in); + throw new UnsupportedOperationException("Streamable no longer used"); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 1b918975760..5feb9977087 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -169,7 +169,7 @@ public abstract class TransportReplicationAction< return new ReplicasProxy(); } - protected abstract Response newResponseInstance(); + protected abstract Response newResponseInstance(StreamInput in) throws IOException; /** * Resolves derived values in the request. For example, the target shard id of the incoming request, if not set at request construction. @@ -342,11 +342,7 @@ public abstract class TransportReplicationAction< // phase is executed on local shard and all subsequent operations are executed on relocation target as primary phase. final ShardRouting primary = primaryShardReference.routingEntry(); assert primary.relocating() : "indexShard is marked as relocated but routing isn't" + primary; - final Writeable.Reader reader = in -> { - Response response = TransportReplicationAction.this.newResponseInstance(); - response.readFrom(in); - return response; - }; + final Writeable.Reader reader = TransportReplicationAction.this::newResponseInstance; DiscoveryNode relocatingNode = clusterState.nodes().get(primary.relocatingNodeId()); transportService.sendRequest(relocatingNode, transportPrimaryAction, new ConcreteShardRequest<>(primaryRequest.getRequest(), primary.allocationId().getRelocationId(), @@ -750,9 +746,7 @@ public abstract class TransportReplicationAction< @Override public Response read(StreamInput in) throws IOException { - Response response = newResponseInstance(); - response.readFrom(in); - return response; + return newResponseInstance(in); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java b/server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java index d1d7b6ffac5..f7b193e63e4 100644 --- a/server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java @@ -86,7 +86,7 @@ public abstract class TransportInstanceSingleOperationAction< protected abstract void shardOperation(Request request, ActionListener listener); - protected abstract Response newResponse(); + protected abstract Response newResponse(StreamInput in) throws IOException; protected ClusterBlockException checkGlobalBlock(ClusterState state) { return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE); @@ -183,9 +183,7 @@ public abstract class TransportInstanceSingleOperationAction< @Override public Response read(StreamInput in) throws IOException { - Response response = newResponse(); - response.readFrom(in); - return response; + return newResponse(in); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java b/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java index f2a2034d14c..1a2dd0a53c7 100644 --- a/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java +++ b/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java @@ -45,6 +45,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexService; @@ -56,6 +57,7 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.Collections; import java.util.Map; @@ -89,8 +91,8 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio } @Override - protected UpdateResponse newResponse() { - return new UpdateResponse(); + protected UpdateResponse newResponse(StreamInput in) throws IOException { + return new UpdateResponse(in); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateAction.java b/server/src/main/java/org/elasticsearch/action/update/UpdateAction.java index f24f5436f5e..c97aa74aee6 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateAction.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateAction.java @@ -19,19 +19,14 @@ package org.elasticsearch.action.update; -import org.elasticsearch.action.StreamableResponseActionType; +import org.elasticsearch.action.ActionType; -public class UpdateAction extends StreamableResponseActionType { +public class UpdateAction extends ActionType { public static final UpdateAction INSTANCE = new UpdateAction(); public static final String NAME = "indices:data/write/update"; private UpdateAction() { - super(NAME); - } - - @Override - public UpdateResponse newResponse() { - return new UpdateResponse(); + super(NAME, UpdateResponse::new); } } diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateResponse.java b/server/src/main/java/org/elasticsearch/action/update/UpdateResponse.java index f3afec4f25b..b2f64010c15 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateResponse.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateResponse.java @@ -39,7 +39,11 @@ public class UpdateResponse extends DocWriteResponse { private GetResult getResult; - public UpdateResponse() { + public UpdateResponse(StreamInput in) throws IOException { + super(in); + if (in.readBoolean()) { + getResult = GetResult.readGetResult(in); + } } /** @@ -69,14 +73,6 @@ public class UpdateResponse extends DocWriteResponse { return this.result == Result.CREATED ? RestStatus.CREATED : super.status(); } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - if (in.readBoolean()) { - getResult = GetResult.readGetResult(in); - } - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java index 70e34623a41..aab004c6ced 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java @@ -97,8 +97,8 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction< } @Override - protected ReplicationResponse newResponseInstance() { - return new ReplicationResponse(); + protected ReplicationResponse newResponseInstance(StreamInput in) throws IOException { + return new ReplicationResponse(in); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java index 25c7fec1b48..d55f9d08e5a 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java @@ -192,8 +192,8 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi } @Override - protected ReplicationResponse newResponseInstance() { - return new ReplicationResponse(); + protected ReplicationResponse newResponseInstance(StreamInput in) throws IOException { + return new ReplicationResponse(in); } } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java index 04204fb4a16..f824363b92d 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java @@ -195,6 +195,12 @@ public class RetentionLeaseSyncAction extends public static final class Response extends ReplicationResponse implements WriteResponse { + public Response() {} + + Response(StreamInput in) throws IOException { + super(in); + } + @Override public void setForcedRefresh(final boolean forcedRefresh) { // ignore @@ -203,8 +209,8 @@ public class RetentionLeaseSyncAction extends } @Override - protected Response newResponseInstance() { - return new Response(); + protected Response newResponseInstance(StreamInput in) throws IOException { + return new Response(in); } } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/RetryTests.java b/server/src/test/java/org/elasticsearch/action/bulk/RetryTests.java index decee8ceab7..f5d881e2b04 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/RetryTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/RetryTests.java @@ -226,7 +226,7 @@ public class RetryTests extends ESTestCase { } private BulkItemResponse successfulResponse() { - return new BulkItemResponse(1, OpType.DELETE, new DeleteResponse()); + return new BulkItemResponse(1, OpType.DELETE, new DeleteResponse(null, null, null, 0, 0, 0, false)); } private BulkItemResponse failedResponse() { diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 4459aa55569..ef88466cda0 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -1228,6 +1228,10 @@ public class TransportReplicationActionTests extends ESTestCase { } static class TestResponse extends ReplicationResponse { + TestResponse(StreamInput in) throws IOException { + super(in); + } + TestResponse() { setShardInfo(new ShardInfo()); } @@ -1251,8 +1255,8 @@ public class TransportReplicationActionTests extends ESTestCase { } @Override - protected TestResponse newResponseInstance() { - return new TestResponse(); + protected TestResponse newResponseInstance(StreamInput in) throws IOException { + return new TestResponse(in); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java index d5eb6a81bf2..103b98bc586 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java @@ -431,7 +431,7 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe } @Override - protected Response newResponseInstance() { + protected Response newResponseInstance(StreamInput in) { return new Response(); } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 57b30d3484b..e12151595d5 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -417,7 +417,7 @@ public class TransportWriteActionTests extends ESTestCase { @Override - protected TestResponse newResponseInstance() { + protected TestResponse newResponseInstance(StreamInput in) throws IOException { return new TestResponse(); } diff --git a/server/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java index affa827812b..9160d52098e 100644 --- a/server/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java @@ -36,6 +36,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.shard.ShardId; @@ -114,7 +115,7 @@ public class TransportInstanceSingleOperationActionTests extends ESTestCase { } @Override - protected Response newResponse() { + protected Response newResponse(StreamInput in) throws IOException { return new Response(); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsResponse.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsResponse.java index 0c72f02fde1..f244369caeb 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsResponse.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsResponse.java @@ -37,15 +37,14 @@ public final class BulkShardOperationsResponse extends ReplicationResponse imple public BulkShardOperationsResponse() { } - @Override - public void setForcedRefresh(final boolean forcedRefresh) { + public BulkShardOperationsResponse(StreamInput in) throws IOException { + super(in); + globalCheckpoint = in.readZLong(); + maxSeqNo = in.readZLong(); } @Override - public void readFrom(final StreamInput in) throws IOException { - super.readFrom(in); - globalCheckpoint = in.readZLong(); - maxSeqNo = in.readZLong(); + public void setForcedRefresh(final boolean forcedRefresh) { } @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index 5f8f1d5368a..2a8123c17e9 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.seqno.SeqNoStats; @@ -190,8 +191,8 @@ public class TransportBulkShardOperationsAction } @Override - protected BulkShardOperationsResponse newResponseInstance() { - return new BulkShardOperationsResponse(); + protected BulkShardOperationsResponse newResponseInstance(StreamInput in) throws IOException { + return new BulkShardOperationsResponse(in); } /** diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java index 0f3436daf80..363b737424c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java @@ -238,7 +238,7 @@ public class JobResultsPersisterTests extends ESTestCase { // Take the listener passed to client::index as 2nd argument ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; // Handle the response on the listener - listener.onResponse(new IndexResponse()); + listener.onResponse(new IndexResponse(null, null, null, 0, 0, 0, false)); return null; }) .when(client).index(any(), any(ActionListener.class)); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java index 40478f45dfa..53f060b22c3 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java @@ -1098,7 +1098,7 @@ public class ExecutionServiceTests extends ESTestCase { } PlainActionFuture future = PlainActionFuture.newFuture(); - future.onResponse(new UpdateResponse()); + future.onResponse(new UpdateResponse(null, null, null, null, 0, 0, 0, null)); return future; }).when(client).update(any());