Convert index and persistent actions/response to writeable (#44582) (#44601)

This commit converts several more classes from streamable to writeable
in server, mostly within the o.e.index and o.e.persistent packages.

relates #34389
This commit is contained in:
Ryan Ernst 2019-07-18 18:32:09 -07:00 committed by GitHub
parent 03f5084ac7
commit 13f46aa801
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 122 additions and 174 deletions

View File

@ -18,20 +18,15 @@
*/ */
package org.elasticsearch.plugin.noop.action.bulk; package org.elasticsearch.plugin.noop.action.bulk;
import org.elasticsearch.action.StreamableResponseActionType; import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.BulkResponse;
public class NoopBulkAction extends StreamableResponseActionType<BulkResponse> { public class NoopBulkAction extends ActionType<BulkResponse> {
public static final String NAME = "mock:data/write/bulk"; public static final String NAME = "mock:data/write/bulk";
public static final NoopBulkAction INSTANCE = new NoopBulkAction(); public static final NoopBulkAction INSTANCE = new NoopBulkAction();
private NoopBulkAction() { private NoopBulkAction() {
super(NAME); super(NAME, BulkResponse::new);
}
@Override
public BulkResponse newResponse() {
return new BulkResponse(null, 0);
} }
} }

View File

@ -38,7 +38,7 @@ public class TransportNoopBulkAction extends HandledTransportAction<BulkRequest,
@Inject @Inject
public TransportNoopBulkAction(TransportService transportService, ActionFilters actionFilters) { public TransportNoopBulkAction(TransportService transportService, ActionFilters actionFilters) {
super(NoopBulkAction.NAME, transportService, BulkRequest::new, actionFilters); super(NoopBulkAction.NAME, transportService, actionFilters, BulkRequest::new);
} }
@Override @Override

View File

@ -170,7 +170,7 @@ public class BulkRequestWithGlobalParametersIT extends ESRestHighLevelClientTest
public void testGlobalRouting() throws IOException { public void testGlobalRouting() throws IOException {
createIndexWithMultipleShards("index"); createIndexWithMultipleShards("index");
BulkRequest request = new BulkRequest(null); BulkRequest request = new BulkRequest((String) null);
request.add(new IndexRequest("index").id("1") request.add(new IndexRequest("index").id("1")
.source(XContentType.JSON, "field", "bulk1")); .source(XContentType.JSON, "field", "bulk1"));
request.add(new IndexRequest("index").id("2") request.add(new IndexRequest("index").id("2")
@ -186,7 +186,7 @@ public class BulkRequestWithGlobalParametersIT extends ESRestHighLevelClientTest
} }
public void testMixLocalAndGlobalRouting() throws IOException { public void testMixLocalAndGlobalRouting() throws IOException {
BulkRequest request = new BulkRequest(null); BulkRequest request = new BulkRequest((String) null);
request.routing("globalRouting"); request.routing("globalRouting");
request.add(new IndexRequest("index").id("1") request.add(new IndexRequest("index").id("1")
.source(XContentType.JSON, "field", "bulk1")); .source(XContentType.JSON, "field", "bulk1"));

View File

@ -19,22 +19,17 @@
package org.elasticsearch.action.bulk; package org.elasticsearch.action.bulk;
import org.elasticsearch.action.StreamableResponseActionType; import org.elasticsearch.action.ActionType;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportRequestOptions;
public class BulkAction extends StreamableResponseActionType<BulkResponse> { public class BulkAction extends ActionType<BulkResponse> {
public static final BulkAction INSTANCE = new BulkAction(); public static final BulkAction INSTANCE = new BulkAction();
public static final String NAME = "indices:data/write/bulk"; public static final String NAME = "indices:data/write/bulk";
private BulkAction() { private BulkAction() {
super(NAME); super(NAME, BulkResponse::new);
}
@Override
public BulkResponse newResponse() {
return new BulkResponse();
} }
@Override @Override

View File

@ -78,7 +78,17 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
private long sizeInBytes = 0; private long sizeInBytes = 0;
public BulkRequest() { public BulkRequest() {}
public BulkRequest(StreamInput in) throws IOException {
super(in);
waitForActiveShards = ActiveShardCount.readFrom(in);
int size = in.readVInt();
for (int i = 0; i < size; i++) {
requests.add(DocWriteRequest.readDocumentRequest(in));
}
refreshPolicy = RefreshPolicy.readFrom(in);
timeout = in.readTimeValue();
} }
public BulkRequest(@Nullable String globalIndex) { public BulkRequest(@Nullable String globalIndex) {
@ -391,14 +401,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
waitForActiveShards = ActiveShardCount.readFrom(in);
int size = in.readVInt();
for (int i = 0; i < size; i++) {
requests.add(DocWriteRequest.readDocumentRequest(in));
}
refreshPolicy = RefreshPolicy.readFrom(in);
timeout = in.readTimeValue();
} }
@Override @Override

View File

@ -56,7 +56,16 @@ public class BulkResponse extends ActionResponse implements Iterable<BulkItemRes
private long tookInMillis; private long tookInMillis;
private long ingestTookInMillis; private long ingestTookInMillis;
BulkResponse() { BulkResponse() {}
public BulkResponse(StreamInput in) throws IOException {
super(in);
responses = new BulkItemResponse[in.readVInt()];
for (int i = 0; i < responses.length; i++) {
responses[i] = BulkItemResponse.readBulkItem(in);
}
tookInMillis = in.readVLong();
ingestTookInMillis = in.readZLong();
} }
public BulkResponse(BulkItemResponse[] responses, long tookInMillis) { public BulkResponse(BulkItemResponse[] responses, long tookInMillis) {
@ -131,13 +140,7 @@ public class BulkResponse extends ActionResponse implements Iterable<BulkItemRes
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
responses = new BulkItemResponse[in.readVInt()];
for (int i = 0; i < responses.length; i++) {
responses[i] = BulkItemResponse.readBulkItem(in);
}
tookInMillis = in.readVLong();
ingestTookInMillis = in.readZLong();
} }
@Override @Override

View File

@ -116,7 +116,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
TransportShardBulkAction shardBulkAction, NodeClient client, TransportShardBulkAction shardBulkAction, NodeClient client,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex, LongSupplier relativeTimeProvider) { AutoCreateIndex autoCreateIndex, LongSupplier relativeTimeProvider) {
super(BulkAction.NAME, transportService, BulkRequest::new, actionFilters, ThreadPool.Names.WRITE); super(BulkAction.NAME, transportService, actionFilters, BulkRequest::new, ThreadPool.Names.WRITE);
Objects.requireNonNull(relativeTimeProvider); Objects.requireNonNull(relativeTimeProvider);
this.threadPool = threadPool; this.threadPool = threadPool;
this.clusterService = clusterService; this.clusterService = clusterService;

View File

@ -64,17 +64,6 @@ public abstract class HandledTransportAction<Request extends ActionRequest, Resp
new TransportHandler()); new TransportHandler());
} }
/**
* @deprecated Use {@link #HandledTransportAction(String, boolean, TransportService, ActionFilters, Writeable.Reader, String)} instead.
*/
@Deprecated
protected HandledTransportAction(String actionName, TransportService transportService, Supplier<Request> request,
ActionFilters actionFilters, String executor) {
super(actionName, actionFilters, transportService.getTaskManager());
transportService.registerRequestHandler(actionName, request, executor, false, true,
new TransportHandler());
}
protected HandledTransportAction(String actionName, boolean canTripCircuitBreaker, protected HandledTransportAction(String actionName, boolean canTripCircuitBreaker,
TransportService transportService, ActionFilters actionFilters, TransportService transportService, ActionFilters actionFilters,
Writeable.Reader<Request> requestReader) { Writeable.Reader<Request> requestReader) {

View File

@ -115,10 +115,18 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
*/ */
private int slices = DEFAULT_SLICES; private int slices = DEFAULT_SLICES;
/** public AbstractBulkByScrollRequest(StreamInput in) throws IOException {
* Constructor for deserialization. super(in);
*/ searchRequest = new SearchRequest(in);
public AbstractBulkByScrollRequest() { abortOnVersionConflict = in.readBoolean();
maxDocs = in.readVInt();
refresh = in.readBoolean();
timeout = in.readTimeValue();
activeShardCount = ActiveShardCount.readFrom(in);
retryBackoffInitialTime = in.readTimeValue();
maxRetries = in.readVInt();
requestsPerSecond = in.readFloat();
slices = in.readVInt();
} }
/** /**
@ -449,17 +457,7 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
searchRequest = new SearchRequest(in);
abortOnVersionConflict = in.readBoolean();
maxDocs = in.readVInt();
refresh = in.readBoolean();
timeout = in.readTimeValue();
activeShardCount = ActiveShardCount.readFrom(in);
retryBackoffInitialTime = in.readTimeValue();
maxRetries = in.readVInt();
requestsPerSecond = in.readFloat();
slices = in.readVInt();
} }
@Override @Override

View File

@ -35,10 +35,11 @@ public abstract class AbstractBulkIndexByScrollRequest<Self extends AbstractBulk
*/ */
private Script script; private Script script;
/** public AbstractBulkIndexByScrollRequest(StreamInput in) throws IOException {
* Constructor for deserialization. super(in);
*/ if (in.readBoolean()) {
public AbstractBulkIndexByScrollRequest() { script = new Script(in);
}
} }
/** /**
@ -74,10 +75,7 @@ public abstract class AbstractBulkIndexByScrollRequest<Self extends AbstractBulk
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
if (in.readBoolean()) {
script = new Script(in);
}
} }
@Override @Override

View File

@ -66,7 +66,7 @@ public class DeleteByQueryRequest extends AbstractBulkByScrollRequest<DeleteByQu
} }
public DeleteByQueryRequest(StreamInput in) throws IOException { public DeleteByQueryRequest(StreamInput in) throws IOException {
super.readFrom(in); super(in);
} }
private DeleteByQueryRequest(SearchRequest search, boolean setDefaults) { private DeleteByQueryRequest(SearchRequest search, boolean setDefaults) {

View File

@ -86,7 +86,7 @@ public class ReindexRequest extends AbstractBulkIndexByScrollRequest<ReindexRequ
} }
public ReindexRequest(StreamInput in) throws IOException { public ReindexRequest(StreamInput in) throws IOException {
super.readFrom(in); super(in);
destination = new IndexRequest(in); destination = new IndexRequest(in);
remoteInfo = in.readOptionalWriteable(RemoteInfo::new); remoteInfo = in.readOptionalWriteable(RemoteInfo::new);
} }

View File

@ -57,7 +57,7 @@ public class UpdateByQueryRequest extends AbstractBulkIndexByScrollRequest<Updat
} }
public UpdateByQueryRequest(StreamInput in) throws IOException { public UpdateByQueryRequest(StreamInput in) throws IOException {
super.readFrom(in); super(in);
pipeline = in.readOptionalString(); pipeline = in.readOptionalString();
} }

View File

@ -22,7 +22,7 @@ package org.elasticsearch.index.seqno;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.StreamableResponseActionType; import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.single.shard.SingleShardRequest; import org.elasticsearch.action.support.single.shard.SingleShardRequest;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction; import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
@ -123,13 +123,13 @@ public class RetentionLeaseActions {
} }
public static class Add extends StreamableResponseActionType<Response> { public static class Add extends ActionType<Response> {
public static final Add INSTANCE = new Add(); public static final Add INSTANCE = new Add();
public static final String ACTION_NAME = "indices:admin/seq_no/add_retention_lease"; public static final String ACTION_NAME = "indices:admin/seq_no/add_retention_lease";
private Add() { private Add() {
super(ACTION_NAME); super(ACTION_NAME, Response::new);
} }
public static class TransportAction extends TransportRetentionLeaseAction<AddRequest> { public static class TransportAction extends TransportRetentionLeaseAction<AddRequest> {
@ -168,21 +168,15 @@ public class RetentionLeaseActions {
} }
} }
@Override
public Response newResponse() {
return new Response();
}
} }
public static class Renew extends StreamableResponseActionType<Response> { public static class Renew extends ActionType<Response> {
public static final Renew INSTANCE = new Renew(); public static final Renew INSTANCE = new Renew();
public static final String ACTION_NAME = "indices:admin/seq_no/renew_retention_lease"; public static final String ACTION_NAME = "indices:admin/seq_no/renew_retention_lease";
private Renew() { private Renew() {
super(ACTION_NAME); super(ACTION_NAME, Response::new);
} }
public static class TransportAction extends TransportRetentionLeaseAction<RenewRequest> { public static class TransportAction extends TransportRetentionLeaseAction<RenewRequest> {
@ -214,21 +208,15 @@ public class RetentionLeaseActions {
} }
} }
@Override
public Response newResponse() {
return new Response();
}
} }
public static class Remove extends StreamableResponseActionType<Response> { public static class Remove extends ActionType<Response> {
public static final Remove INSTANCE = new Remove(); public static final Remove INSTANCE = new Remove();
public static final String ACTION_NAME = "indices:admin/seq_no/remove_retention_lease"; public static final String ACTION_NAME = "indices:admin/seq_no/remove_retention_lease";
private Remove() { private Remove() {
super(ACTION_NAME); super(ACTION_NAME, Response::new);
} }
public static class TransportAction extends TransportRetentionLeaseAction<RemoveRequest> { public static class TransportAction extends TransportRetentionLeaseAction<RemoveRequest> {
@ -261,12 +249,6 @@ public class RetentionLeaseActions {
} }
} }
@Override
public Response newResponse() {
return new Response();
}
} }
private abstract static class Request<T extends SingleShardRequest<T>> extends SingleShardRequest<T> { private abstract static class Request<T extends SingleShardRequest<T>> extends SingleShardRequest<T> {
@ -385,8 +367,7 @@ public class RetentionLeaseActions {
public static class Response extends ActionResponse { public static class Response extends ActionResponse {
public Response() { public Response() {}
}
Response(final StreamInput in) throws IOException { Response(final StreamInput in) throws IOException {
super(in); super(in);

View File

@ -20,11 +20,11 @@ package org.elasticsearch.persistent;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.StreamableResponseActionType; import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.StreamableTransportMasterNodeAction; import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockException;
@ -46,18 +46,13 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
* ActionType that is used by executor node to indicate that the persistent action finished or failed on the node and needs to be * ActionType that is used by executor node to indicate that the persistent action finished or failed on the node and needs to be
* removed from the cluster state in case of successful completion or restarted on some other node in case of failure. * removed from the cluster state in case of successful completion or restarted on some other node in case of failure.
*/ */
public class CompletionPersistentTaskAction extends StreamableResponseActionType<PersistentTaskResponse> { public class CompletionPersistentTaskAction extends ActionType<PersistentTaskResponse> {
public static final CompletionPersistentTaskAction INSTANCE = new CompletionPersistentTaskAction(); public static final CompletionPersistentTaskAction INSTANCE = new CompletionPersistentTaskAction();
public static final String NAME = "cluster:admin/persistent/completion"; public static final String NAME = "cluster:admin/persistent/completion";
private CompletionPersistentTaskAction() { private CompletionPersistentTaskAction() {
super(NAME); super(NAME, PersistentTaskResponse::new);
}
@Override
public PersistentTaskResponse newResponse() {
return new PersistentTaskResponse();
} }
public static class Request extends MasterNodeRequest<Request> { public static class Request extends MasterNodeRequest<Request> {
@ -127,7 +122,7 @@ public class CompletionPersistentTaskAction extends StreamableResponseActionType
} }
} }
public static class TransportAction extends StreamableTransportMasterNodeAction<Request, PersistentTaskResponse> { public static class TransportAction extends TransportMasterNodeAction<Request, PersistentTaskResponse> {
private final PersistentTasksClusterService persistentTasksClusterService; private final PersistentTasksClusterService persistentTasksClusterService;
@ -147,8 +142,8 @@ public class CompletionPersistentTaskAction extends StreamableResponseActionType
} }
@Override @Override
protected PersistentTaskResponse newResponse() { protected PersistentTaskResponse read(StreamInput in) throws IOException {
return new PersistentTaskResponse(); return new PersistentTaskResponse(in);
} }
@Override @Override

View File

@ -32,8 +32,9 @@ import java.util.Objects;
public class PersistentTaskResponse extends ActionResponse { public class PersistentTaskResponse extends ActionResponse {
private PersistentTask<?> task; private PersistentTask<?> task;
public PersistentTaskResponse() { public PersistentTaskResponse(StreamInput in) throws IOException {
super(); super(in);
task = in.readOptionalWriteable(PersistentTask::new);
} }
public PersistentTaskResponse(PersistentTask<?> task) { public PersistentTaskResponse(PersistentTask<?> task) {
@ -42,8 +43,7 @@ public class PersistentTaskResponse extends ActionResponse {
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
task = in.readOptionalWriteable(PersistentTask::new);
} }
@Override @Override

View File

@ -20,11 +20,11 @@ package org.elasticsearch.persistent;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.StreamableResponseActionType; import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.StreamableTransportMasterNodeAction; import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockException;
@ -40,18 +40,13 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException; import java.io.IOException;
import java.util.Objects; import java.util.Objects;
public class RemovePersistentTaskAction extends StreamableResponseActionType<PersistentTaskResponse> { public class RemovePersistentTaskAction extends ActionType<PersistentTaskResponse> {
public static final RemovePersistentTaskAction INSTANCE = new RemovePersistentTaskAction(); public static final RemovePersistentTaskAction INSTANCE = new RemovePersistentTaskAction();
public static final String NAME = "cluster:admin/persistent/remove"; public static final String NAME = "cluster:admin/persistent/remove";
private RemovePersistentTaskAction() { private RemovePersistentTaskAction() {
super(NAME); super(NAME, PersistentTaskResponse::new);
}
@Override
public PersistentTaskResponse newResponse() {
return new PersistentTaskResponse();
} }
public static class Request extends MasterNodeRequest<Request> { public static class Request extends MasterNodeRequest<Request> {
@ -112,7 +107,7 @@ public class RemovePersistentTaskAction extends StreamableResponseActionType<Per
} }
public static class TransportAction extends StreamableTransportMasterNodeAction<Request, PersistentTaskResponse> { public static class TransportAction extends TransportMasterNodeAction<Request, PersistentTaskResponse> {
private final PersistentTasksClusterService persistentTasksClusterService; private final PersistentTasksClusterService persistentTasksClusterService;
@ -132,8 +127,8 @@ public class RemovePersistentTaskAction extends StreamableResponseActionType<Per
} }
@Override @Override
protected PersistentTaskResponse newResponse() { protected PersistentTaskResponse read(StreamInput in) throws IOException {
return new PersistentTaskResponse(); return new PersistentTaskResponse(in);
} }
@Override @Override

View File

@ -21,11 +21,11 @@ package org.elasticsearch.persistent;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.StreamableResponseActionType; import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.StreamableTransportMasterNodeAction; import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockException;
@ -47,18 +47,13 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
/** /**
* This action can be used to add the record for the persistent action to the cluster state. * This action can be used to add the record for the persistent action to the cluster state.
*/ */
public class StartPersistentTaskAction extends StreamableResponseActionType<PersistentTaskResponse> { public class StartPersistentTaskAction extends ActionType<PersistentTaskResponse> {
public static final StartPersistentTaskAction INSTANCE = new StartPersistentTaskAction(); public static final StartPersistentTaskAction INSTANCE = new StartPersistentTaskAction();
public static final String NAME = "cluster:admin/persistent/start"; public static final String NAME = "cluster:admin/persistent/start";
private StartPersistentTaskAction() { private StartPersistentTaskAction() {
super(NAME); super(NAME, PersistentTaskResponse::new);
}
@Override
public PersistentTaskResponse newResponse() {
return new PersistentTaskResponse();
} }
public static class Request extends MasterNodeRequest<Request> { public static class Request extends MasterNodeRequest<Request> {
@ -183,7 +178,7 @@ public class StartPersistentTaskAction extends StreamableResponseActionType<Pers
} }
public static class TransportAction extends StreamableTransportMasterNodeAction<Request, PersistentTaskResponse> { public static class TransportAction extends TransportMasterNodeAction<Request, PersistentTaskResponse> {
private final PersistentTasksClusterService persistentTasksClusterService; private final PersistentTasksClusterService persistentTasksClusterService;
@ -208,8 +203,8 @@ public class StartPersistentTaskAction extends StreamableResponseActionType<Pers
} }
@Override @Override
protected PersistentTaskResponse newResponse() { protected PersistentTaskResponse read(StreamInput in) throws IOException {
return new PersistentTaskResponse(); return new PersistentTaskResponse(in);
} }
@Override @Override

View File

@ -20,11 +20,11 @@ package org.elasticsearch.persistent;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.StreamableResponseActionType; import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.StreamableTransportMasterNodeAction; import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockException;
@ -42,18 +42,13 @@ import java.util.Objects;
import static org.elasticsearch.action.ValidateActions.addValidationError; import static org.elasticsearch.action.ValidateActions.addValidationError;
public class UpdatePersistentTaskStatusAction extends StreamableResponseActionType<PersistentTaskResponse> { public class UpdatePersistentTaskStatusAction extends ActionType<PersistentTaskResponse> {
public static final UpdatePersistentTaskStatusAction INSTANCE = new UpdatePersistentTaskStatusAction(); public static final UpdatePersistentTaskStatusAction INSTANCE = new UpdatePersistentTaskStatusAction();
public static final String NAME = "cluster:admin/persistent/update_status"; public static final String NAME = "cluster:admin/persistent/update_status";
private UpdatePersistentTaskStatusAction() { private UpdatePersistentTaskStatusAction() {
super(NAME); super(NAME, PersistentTaskResponse::new);
}
@Override
public PersistentTaskResponse newResponse() {
return new PersistentTaskResponse();
} }
public static class Request extends MasterNodeRequest<Request> { public static class Request extends MasterNodeRequest<Request> {
@ -143,7 +138,7 @@ public class UpdatePersistentTaskStatusAction extends StreamableResponseActionTy
} }
} }
public static class TransportAction extends StreamableTransportMasterNodeAction<Request, PersistentTaskResponse> { public static class TransportAction extends TransportMasterNodeAction<Request, PersistentTaskResponse> {
private final PersistentTasksClusterService persistentTasksClusterService; private final PersistentTasksClusterService persistentTasksClusterService;
@ -163,8 +158,8 @@ public class UpdatePersistentTaskStatusAction extends StreamableResponseActionTy
} }
@Override @Override
protected PersistentTaskResponse newResponse() { protected PersistentTaskResponse read(StreamInput in) throws IOException {
return new PersistentTaskResponse(); return new PersistentTaskResponse(in);
} }
@Override @Override

View File

@ -30,7 +30,7 @@ import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.StreamableTransportMasterNodeAction; import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateListener;
@ -506,9 +506,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
new TransportResponseHandler<UpdateIndexShardSnapshotStatusResponse>() { new TransportResponseHandler<UpdateIndexShardSnapshotStatusResponse>() {
@Override @Override
public UpdateIndexShardSnapshotStatusResponse read(StreamInput in) throws IOException { public UpdateIndexShardSnapshotStatusResponse read(StreamInput in) throws IOException {
final UpdateIndexShardSnapshotStatusResponse response = new UpdateIndexShardSnapshotStatusResponse(); return new UpdateIndexShardSnapshotStatusResponse(in);
response.readFrom(in);
return response;
} }
@Override @Override
@ -606,12 +604,19 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
} }
static class UpdateIndexShardSnapshotStatusResponse extends ActionResponse { static class UpdateIndexShardSnapshotStatusResponse extends ActionResponse {
UpdateIndexShardSnapshotStatusResponse() {}
UpdateIndexShardSnapshotStatusResponse(StreamInput in) throws IOException {
super(in);
}
@Override @Override
public void writeTo(StreamOutput out) throws IOException {} public void writeTo(StreamOutput out) throws IOException {}
} }
private class UpdateSnapshotStatusAction private class UpdateSnapshotStatusAction
extends StreamableTransportMasterNodeAction<UpdateIndexShardSnapshotStatusRequest, UpdateIndexShardSnapshotStatusResponse> { extends TransportMasterNodeAction<UpdateIndexShardSnapshotStatusRequest, UpdateIndexShardSnapshotStatusResponse> {
UpdateSnapshotStatusAction(TransportService transportService, ClusterService clusterService, UpdateSnapshotStatusAction(TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super( super(
@ -626,8 +631,8 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
} }
@Override @Override
protected UpdateIndexShardSnapshotStatusResponse newResponse() { protected UpdateIndexShardSnapshotStatusResponse read(StreamInput in) throws IOException {
return new UpdateIndexShardSnapshotStatusResponse(); return new UpdateIndexShardSnapshotStatusResponse(in);
} }
@Override @Override

View File

@ -20,14 +20,15 @@ package org.elasticsearch.persistent;
import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.test.AbstractStreamableTestCase; import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import java.util.Collections; import java.util.Collections;
public class PersistentTasksExecutorResponseTests extends AbstractStreamableTestCase<PersistentTaskResponse> { public class PersistentTasksExecutorResponseTests extends AbstractWireSerializingTestCase<PersistentTaskResponse> {
@Override @Override
protected PersistentTaskResponse createTestInstance() { protected PersistentTaskResponse createTestInstance() {
@ -37,13 +38,13 @@ public class PersistentTasksExecutorResponseTests extends AbstractStreamableTest
new TestPersistentTasksPlugin.TestParams("test"), new TestPersistentTasksPlugin.TestParams("test"),
randomLong(), PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT)); randomLong(), PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT));
} else { } else {
return new PersistentTaskResponse(null); return new PersistentTaskResponse((PersistentTask<?>) null);
} }
} }
@Override @Override
protected PersistentTaskResponse createBlankInstance() { protected Writeable.Reader<PersistentTaskResponse> instanceReader() {
return new PersistentTaskResponse(); return PersistentTaskResponse::new;
} }
@Override @Override

View File

@ -42,7 +42,14 @@ public final class PutWatchRequest extends ActionRequest {
public PutWatchRequest() {} public PutWatchRequest() {}
public PutWatchRequest(StreamInput in) throws IOException { public PutWatchRequest(StreamInput in) throws IOException {
readFrom(in); super(in);
id = in.readString();
source = in.readBytesReference();
active = in.readBoolean();
xContentType = in.readEnum(XContentType.class);
version = in.readZLong();
ifSeqNo = in.readZLong();
ifPrimaryTerm = in.readVLong();
} }
public PutWatchRequest(String id, BytesReference source, XContentType xContentType) { public PutWatchRequest(String id, BytesReference source, XContentType xContentType) {
@ -53,14 +60,7 @@ public final class PutWatchRequest extends ActionRequest {
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
id = in.readString();
source = in.readBytesReference();
active = in.readBoolean();
xContentType = in.readEnum(XContentType.class);
version = in.readZLong();
ifSeqNo = in.readZLong();
ifPrimaryTerm = in.readVLong();
} }
@Override @Override