Convert ReplicationResponse to Writeable (#43953)

This commit convers ReplicationResponse and all its subclasses to
support Writeable.Reader as a constructor.

relates #34389
This commit is contained in:
Ryan Ernst 2019-07-10 10:54:31 -07:00 committed by Ryan Ernst
parent fb77d8f461
commit c6efb9be2a
31 changed files with 133 additions and 133 deletions

View File

@ -112,14 +112,14 @@ public abstract class DocWriteResponse extends ReplicationResponse implements Wr
}
}
private ShardId shardId;
private String id;
private String type;
private long version;
private long seqNo;
private long primaryTerm;
private final ShardId shardId;
private final String id;
private final String type;
private final long version;
private final long seqNo;
private final long primaryTerm;
private boolean forcedRefresh;
protected Result result;
protected final Result result;
public DocWriteResponse(ShardId shardId, String type, String id, long seqNo, long primaryTerm, long version, Result result) {
this.shardId = shardId;
@ -132,7 +132,21 @@ public abstract class DocWriteResponse extends ReplicationResponse implements Wr
}
// needed for deserialization
protected DocWriteResponse() {
protected DocWriteResponse(StreamInput in) throws IOException {
super(in);
shardId = new ShardId(in);
type = in.readString();
id = in.readString();
version = in.readZLong();
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
seqNo = in.readZLong();
primaryTerm = in.readVLong();
} else {
seqNo = UNASSIGNED_SEQ_NO;
primaryTerm = UNASSIGNED_PRIMARY_TERM;
}
forcedRefresh = in.readBoolean();
result = Result.readFrom(in);
}
/**
@ -257,24 +271,6 @@ public abstract class DocWriteResponse extends ReplicationResponse implements Wr
return location.toString();
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = new ShardId(in);
type = in.readString();
id = in.readString();
version = in.readZLong();
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
seqNo = in.readZLong();
primaryTerm = in.readVLong();
} else {
seqNo = UNASSIGNED_SEQ_NO;
primaryTerm = UNASSIGNED_PRIMARY_TERM;
}
forcedRefresh = in.readBoolean();
result = Result.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -64,8 +64,8 @@ public class TransportVerifyShardBeforeCloseAction extends TransportReplicationA
}
@Override
protected ReplicationResponse newResponseInstance() {
return new ReplicationResponse();
protected ReplicationResponse newResponseInstance(StreamInput in) throws IOException {
return new ReplicationResponse(in);
}
@Override

View File

@ -27,12 +27,15 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
public class TransportShardFlushAction
extends TransportReplicationAction<ShardFlushRequest, ShardFlushRequest, ReplicationResponse> {
@ -47,8 +50,8 @@ public class TransportShardFlushAction
}
@Override
protected ReplicationResponse newResponseInstance() {
return new ReplicationResponse();
protected ReplicationResponse newResponseInstance(StreamInput in) throws IOException {
return new ReplicationResponse(in);
}
@Override

View File

@ -28,12 +28,15 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
public class TransportShardRefreshAction
extends TransportReplicationAction<BasicReplicationRequest, BasicReplicationRequest, ReplicationResponse> {
@ -49,8 +52,8 @@ public class TransportShardRefreshAction
}
@Override
protected ReplicationResponse newResponseInstance() {
return new ReplicationResponse();
protected ReplicationResponse newResponseInstance(StreamInput in) throws IOException {
return new ReplicationResponse(in);
}
@Override

View File

@ -476,15 +476,11 @@ public class BulkItemResponse implements Streamable, StatusToXContentObject {
byte type = in.readByte();
if (type == 0) {
response = new IndexResponse();
response.readFrom(in);
response = new IndexResponse(in);
} else if (type == 1) {
response = new DeleteResponse();
response.readFrom(in);
response = new DeleteResponse(in);
} else if (type == 3) { // make 3 instead of 2, because 2 is already in use for 'no responses'
response = new UpdateResponse();
response.readFrom(in);
response = new UpdateResponse(in);
}
if (in.readBoolean()) {

View File

@ -30,10 +30,16 @@ import java.io.IOException;
public class BulkShardResponse extends ReplicationResponse implements WriteResponse {
private ShardId shardId;
private BulkItemResponse[] responses;
private final ShardId shardId;
private final BulkItemResponse[] responses;
BulkShardResponse() {
BulkShardResponse(StreamInput in) throws IOException {
super(in);
shardId = new ShardId(in);
responses = new BulkItemResponse[in.readVInt()];
for (int i = 0; i < responses.length; i++) {
responses[i] = BulkItemResponse.readBulkItem(in);
}
}
// NOTE: public for testing only
@ -64,16 +70,6 @@ public class BulkShardResponse extends ReplicationResponse implements WriteRespo
}
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = new ShardId(in);
responses = new BulkItemResponse[in.readVInt()];
for (int i = 0; i < responses.length; i++) {
responses[i] = BulkItemResponse.readBulkItem(in);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -26,6 +26,7 @@ import org.apache.logging.log4j.util.MessageSupplier;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteRequest;
@ -49,6 +50,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentHelper;
@ -68,6 +70,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
@ -100,8 +103,8 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
}
@Override
protected BulkShardResponse newResponseInstance() {
return new BulkShardResponse();
protected BulkShardResponse newResponseInstance(StreamInput in) throws IOException {
return new BulkShardResponse(in);
}
@Override

View File

@ -19,19 +19,14 @@
package org.elasticsearch.action.delete;
import org.elasticsearch.action.StreamableResponseActionType;
import org.elasticsearch.action.ActionType;
public class DeleteAction extends StreamableResponseActionType<DeleteResponse> {
public class DeleteAction extends ActionType<DeleteResponse> {
public static final DeleteAction INSTANCE = new DeleteAction();
public static final String NAME = "indices:data/write/delete";
private DeleteAction() {
super(NAME);
}
@Override
public DeleteResponse newResponse() {
return new DeleteResponse();
super(NAME, DeleteResponse::new);
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.delete;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
@ -36,7 +37,8 @@ import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpect
*/
public class DeleteResponse extends DocWriteResponse {
public DeleteResponse() {
public DeleteResponse(StreamInput in) throws IOException {
super(in);
}
public DeleteResponse(ShardId shardId, String type, String id, long seqNo, long primaryTerm, long version, boolean found) {

View File

@ -19,19 +19,14 @@
package org.elasticsearch.action.index;
import org.elasticsearch.action.StreamableResponseActionType;
import org.elasticsearch.action.ActionType;
public class IndexAction extends StreamableResponseActionType<IndexResponse> {
public class IndexAction extends ActionType<IndexResponse> {
public static final IndexAction INSTANCE = new IndexAction();
public static final String NAME = "indices:data/write/index";
private IndexAction() {
super(NAME);
}
@Override
public IndexResponse newResponse() {
return new IndexResponse();
super(NAME, IndexResponse::new);
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.action.index;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
@ -37,7 +38,8 @@ import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpect
*/
public class IndexResponse extends DocWriteResponse {
public IndexResponse() {
public IndexResponse(StreamInput in) throws IOException {
super(in);
}
public IndexResponse(ShardId shardId, String type, String id, long seqNo, long primaryTerm, long version, boolean created) {

View File

@ -20,9 +20,18 @@ package org.elasticsearch.action.resync;
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import java.io.IOException;
public final class ResyncReplicationResponse extends ReplicationResponse implements WriteResponse {
public ResyncReplicationResponse() {}
public ResyncReplicationResponse(StreamInput in) throws IOException {
super(in);
}
@Override
public void setForcedRefresh(boolean forcedRefresh) {
// ignore

View File

@ -63,8 +63,8 @@ public class TransportResyncReplicationAction extends TransportWriteAction<Resyn
}
@Override
protected ResyncReplicationResponse newResponseInstance() {
return new ResyncReplicationResponse();
protected ResyncReplicationResponse newResponseInstance(StreamInput in) throws IOException {
return new ResyncReplicationResponse(in);
}
@Override
@ -137,9 +137,7 @@ public class TransportResyncReplicationAction extends TransportWriteAction<Resyn
new TransportResponseHandler<ResyncReplicationResponse>() {
@Override
public ResyncReplicationResponse read(StreamInput in) throws IOException {
ResyncReplicationResponse response = newResponseInstance();
response.readFrom(in);
return response;
return newResponseInstance(in);
}
@Override

View File

@ -50,10 +50,16 @@ public class ReplicationResponse extends ActionResponse {
private ShardInfo shardInfo;
public ReplicationResponse() {}
public ReplicationResponse(StreamInput in) throws IOException {
super(in);
shardInfo = ReplicationResponse.ShardInfo.readShardInfo(in);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardInfo = ReplicationResponse.ShardInfo.readShardInfo(in);
throw new UnsupportedOperationException("Streamable no longer used");
}
@Override

View File

@ -169,7 +169,7 @@ public abstract class TransportReplicationAction<
return new ReplicasProxy();
}
protected abstract Response newResponseInstance();
protected abstract Response newResponseInstance(StreamInput in) throws IOException;
/**
* Resolves derived values in the request. For example, the target shard id of the incoming request, if not set at request construction.
@ -342,11 +342,7 @@ public abstract class TransportReplicationAction<
// phase is executed on local shard and all subsequent operations are executed on relocation target as primary phase.
final ShardRouting primary = primaryShardReference.routingEntry();
assert primary.relocating() : "indexShard is marked as relocated but routing isn't" + primary;
final Writeable.Reader<Response> reader = in -> {
Response response = TransportReplicationAction.this.newResponseInstance();
response.readFrom(in);
return response;
};
final Writeable.Reader<Response> reader = TransportReplicationAction.this::newResponseInstance;
DiscoveryNode relocatingNode = clusterState.nodes().get(primary.relocatingNodeId());
transportService.sendRequest(relocatingNode, transportPrimaryAction,
new ConcreteShardRequest<>(primaryRequest.getRequest(), primary.allocationId().getRelocationId(),
@ -750,9 +746,7 @@ public abstract class TransportReplicationAction<
@Override
public Response read(StreamInput in) throws IOException {
Response response = newResponseInstance();
response.readFrom(in);
return response;
return newResponseInstance(in);
}
@Override

View File

@ -86,7 +86,7 @@ public abstract class TransportInstanceSingleOperationAction<
protected abstract void shardOperation(Request request, ActionListener<Response> listener);
protected abstract Response newResponse();
protected abstract Response newResponse(StreamInput in) throws IOException;
protected ClusterBlockException checkGlobalBlock(ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
@ -183,9 +183,7 @@ public abstract class TransportInstanceSingleOperationAction<
@Override
public Response read(StreamInput in) throws IOException {
Response response = newResponse();
response.readFrom(in);
return response;
return newResponse(in);
}
@Override

View File

@ -45,6 +45,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexService;
@ -56,6 +57,7 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
@ -89,8 +91,8 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
}
@Override
protected UpdateResponse newResponse() {
return new UpdateResponse();
protected UpdateResponse newResponse(StreamInput in) throws IOException {
return new UpdateResponse(in);
}
@Override

View File

@ -19,19 +19,14 @@
package org.elasticsearch.action.update;
import org.elasticsearch.action.StreamableResponseActionType;
import org.elasticsearch.action.ActionType;
public class UpdateAction extends StreamableResponseActionType<UpdateResponse> {
public class UpdateAction extends ActionType<UpdateResponse> {
public static final UpdateAction INSTANCE = new UpdateAction();
public static final String NAME = "indices:data/write/update";
private UpdateAction() {
super(NAME);
}
@Override
public UpdateResponse newResponse() {
return new UpdateResponse();
super(NAME, UpdateResponse::new);
}
}

View File

@ -39,7 +39,11 @@ public class UpdateResponse extends DocWriteResponse {
private GetResult getResult;
public UpdateResponse() {
public UpdateResponse(StreamInput in) throws IOException {
super(in);
if (in.readBoolean()) {
getResult = GetResult.readGetResult(in);
}
}
/**
@ -69,14 +73,6 @@ public class UpdateResponse extends DocWriteResponse {
return this.result == Result.CREATED ? RestStatus.CREATED : super.status();
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
if (in.readBoolean()) {
getResult = GetResult.readGetResult(in);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -97,8 +97,8 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction<
}
@Override
protected ReplicationResponse newResponseInstance() {
return new ReplicationResponse();
protected ReplicationResponse newResponseInstance(StreamInput in) throws IOException {
return new ReplicationResponse(in);
}
@Override

View File

@ -192,8 +192,8 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi
}
@Override
protected ReplicationResponse newResponseInstance() {
return new ReplicationResponse();
protected ReplicationResponse newResponseInstance(StreamInput in) throws IOException {
return new ReplicationResponse(in);
}
}

View File

@ -195,6 +195,12 @@ public class RetentionLeaseSyncAction extends
public static final class Response extends ReplicationResponse implements WriteResponse {
public Response() {}
Response(StreamInput in) throws IOException {
super(in);
}
@Override
public void setForcedRefresh(final boolean forcedRefresh) {
// ignore
@ -203,8 +209,8 @@ public class RetentionLeaseSyncAction extends
}
@Override
protected Response newResponseInstance() {
return new Response();
protected Response newResponseInstance(StreamInput in) throws IOException {
return new Response(in);
}
}

View File

@ -226,7 +226,7 @@ public class RetryTests extends ESTestCase {
}
private BulkItemResponse successfulResponse() {
return new BulkItemResponse(1, OpType.DELETE, new DeleteResponse());
return new BulkItemResponse(1, OpType.DELETE, new DeleteResponse(null, null, null, 0, 0, 0, false));
}
private BulkItemResponse failedResponse() {

View File

@ -1228,6 +1228,10 @@ public class TransportReplicationActionTests extends ESTestCase {
}
static class TestResponse extends ReplicationResponse {
TestResponse(StreamInput in) throws IOException {
super(in);
}
TestResponse() {
setShardInfo(new ShardInfo());
}
@ -1251,8 +1255,8 @@ public class TransportReplicationActionTests extends ESTestCase {
}
@Override
protected TestResponse newResponseInstance() {
return new TestResponse();
protected TestResponse newResponseInstance(StreamInput in) throws IOException {
return new TestResponse(in);
}
@Override

View File

@ -431,7 +431,7 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe
}
@Override
protected Response newResponseInstance() {
protected Response newResponseInstance(StreamInput in) {
return new Response();
}

View File

@ -417,7 +417,7 @@ public class TransportWriteActionTests extends ESTestCase {
@Override
protected TestResponse newResponseInstance() {
protected TestResponse newResponseInstance(StreamInput in) throws IOException {
return new TestResponse();
}

View File

@ -36,6 +36,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.ShardId;
@ -114,7 +115,7 @@ public class TransportInstanceSingleOperationActionTests extends ESTestCase {
}
@Override
protected Response newResponse() {
protected Response newResponse(StreamInput in) throws IOException {
return new Response();
}

View File

@ -37,15 +37,14 @@ public final class BulkShardOperationsResponse extends ReplicationResponse imple
public BulkShardOperationsResponse() {
}
@Override
public void setForcedRefresh(final boolean forcedRefresh) {
public BulkShardOperationsResponse(StreamInput in) throws IOException {
super(in);
globalCheckpoint = in.readZLong();
maxSeqNo = in.readZLong();
}
@Override
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
globalCheckpoint = in.readZLong();
maxSeqNo = in.readZLong();
public void setForcedRefresh(final boolean forcedRefresh) {
}
@Override

View File

@ -16,6 +16,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.seqno.SeqNoStats;
@ -190,8 +191,8 @@ public class TransportBulkShardOperationsAction
}
@Override
protected BulkShardOperationsResponse newResponseInstance() {
return new BulkShardOperationsResponse();
protected BulkShardOperationsResponse newResponseInstance(StreamInput in) throws IOException {
return new BulkShardOperationsResponse(in);
}
/**

View File

@ -238,7 +238,7 @@ public class JobResultsPersisterTests extends ESTestCase {
// Take the listener passed to client::index as 2nd argument
ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1];
// Handle the response on the listener
listener.onResponse(new IndexResponse());
listener.onResponse(new IndexResponse(null, null, null, 0, 0, 0, false));
return null;
})
.when(client).index(any(), any(ActionListener.class));

View File

@ -1098,7 +1098,7 @@ public class ExecutionServiceTests extends ESTestCase {
}
PlainActionFuture<UpdateResponse> future = PlainActionFuture.newFuture();
future.onResponse(new UpdateResponse());
future.onResponse(new UpdateResponse(null, null, null, null, 0, 0, 0, null));
return future;
}).when(client).update(any());