diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java index d3f184b0f47..3164c2522fd 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java @@ -8,11 +8,14 @@ package org.elasticsearch.xpack.ccr.action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider; import org.elasticsearch.cluster.service.ClusterService; @@ -33,7 +36,6 @@ import org.elasticsearch.indices.IndicesRequestCache; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.persistent.PersistentTasksService; -import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ccr.Ccr; @@ -50,7 +52,7 @@ import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; -public class TransportResumeFollowAction extends HandledTransportAction { +public class TransportResumeFollowAction extends TransportMasterNodeAction { static final ByteSizeValue DEFAULT_MAX_READ_REQUEST_SIZE = new ByteSizeValue(32, ByteSizeUnit.MB); static final ByteSizeValue DEFAULT_MAX_WRITE_REQUEST_SIZE = new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES); @@ -65,7 +67,6 @@ public class TransportResumeFollowAction extends HandledTransportAction new ResumeFollowAction.Request(in)); + super(ResumeFollowAction.NAME, true, transportService, clusterService, threadPool, actionFilters, + ResumeFollowAction.Request::new, indexNameExpressionResolver); this.client = client; this.threadPool = threadPool; - this.clusterService = clusterService; this.persistentTasksService = persistentTasksService; this.indicesService = indicesService; this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker); } @Override - protected void doExecute(final Task task, - final ResumeFollowAction.Request request, - final ActionListener listener) { + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected AcknowledgedResponse newResponse() { + return new AcknowledgedResponse(); + } + + @Override + protected ClusterBlockException checkBlock(ResumeFollowAction.Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + + @Override + protected void masterOperation(final ResumeFollowAction.Request request, + ClusterState state, + final ActionListener listener) throws Exception { if (ccrLicenseChecker.isCcrAllowed() == false) { listener.onFailure(LicenseUtils.newComplianceException("ccr")); return; } - final ClusterState state = clusterService.state(); final IndexMetaData followerIndexMetadata = state.getMetaData().index(request.getFollowerIndex()); if (followerIndexMetadata == null) { listener.onFailure(new IndexNotFoundException(request.getFollowerIndex())); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ResumeFollowAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ResumeFollowAction.java index 913b5e6b0a5..41728928e09 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ResumeFollowAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ResumeFollowAction.java @@ -7,9 +7,9 @@ package org.elasticsearch.xpack.core.ccr.action; import org.elasticsearch.action.Action; -import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -41,7 +41,7 @@ public final class ResumeFollowAction extends Action { return new AcknowledgedResponse(); } - public static class Request extends ActionRequest implements ToXContentObject { + public static class Request extends MasterNodeRequest implements ToXContentObject { static final ParseField FOLLOWER_INDEX_FIELD = new ParseField("follower_index"); static final ParseField MAX_READ_REQUEST_OPERATION_COUNT = new ParseField("max_read_request_operation_count");