[CCR] Change resume follow api to be a master node action (#35249)

In order to start shard follow tasks, the resume follow api already
needs execute N requests to the elected master node.

The pause follow API is also a master node action, which would make
how both APIs execute more consistent.
This commit is contained in:
Martijn van Groningen 2018-11-07 07:38:44 +01:00 committed by GitHub
parent 1614107d59
commit 2395e16d84
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 28 additions and 12 deletions

View File

@ -8,11 +8,14 @@ package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
@ -33,7 +36,6 @@ import org.elasticsearch.indices.IndicesRequestCache;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.Ccr;
@ -50,7 +52,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
public class TransportResumeFollowAction extends HandledTransportAction<ResumeFollowAction.Request, AcknowledgedResponse> {
public class TransportResumeFollowAction extends TransportMasterNodeAction<ResumeFollowAction.Request, AcknowledgedResponse> {
static final ByteSizeValue DEFAULT_MAX_READ_REQUEST_SIZE = new ByteSizeValue(32, ByteSizeUnit.MB);
static final ByteSizeValue DEFAULT_MAX_WRITE_REQUEST_SIZE = new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES);
@ -65,7 +67,6 @@ public class TransportResumeFollowAction extends HandledTransportAction<ResumeFo
private final Client client;
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final PersistentTasksService persistentTasksService;
private final IndicesService indicesService;
private final CcrLicenseChecker ccrLicenseChecker;
@ -77,28 +78,43 @@ public class TransportResumeFollowAction extends HandledTransportAction<ResumeFo
final ActionFilters actionFilters,
final Client client,
final ClusterService clusterService,
final IndexNameExpressionResolver indexNameExpressionResolver,
final PersistentTasksService persistentTasksService,
final IndicesService indicesService,
final CcrLicenseChecker ccrLicenseChecker) {
super(ResumeFollowAction.NAME, transportService, actionFilters, in -> new ResumeFollowAction.Request(in));
super(ResumeFollowAction.NAME, true, transportService, clusterService, threadPool, actionFilters,
ResumeFollowAction.Request::new, indexNameExpressionResolver);
this.client = client;
this.threadPool = threadPool;
this.clusterService = clusterService;
this.persistentTasksService = persistentTasksService;
this.indicesService = indicesService;
this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker);
}
@Override
protected void doExecute(final Task task,
final ResumeFollowAction.Request request,
final ActionListener<AcknowledgedResponse> listener) {
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected AcknowledgedResponse newResponse() {
return new AcknowledgedResponse();
}
@Override
protected ClusterBlockException checkBlock(ResumeFollowAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
@Override
protected void masterOperation(final ResumeFollowAction.Request request,
ClusterState state,
final ActionListener<AcknowledgedResponse> listener) throws Exception {
if (ccrLicenseChecker.isCcrAllowed() == false) {
listener.onFailure(LicenseUtils.newComplianceException("ccr"));
return;
}
final ClusterState state = clusterService.state();
final IndexMetaData followerIndexMetadata = state.getMetaData().index(request.getFollowerIndex());
if (followerIndexMetadata == null) {
listener.onFailure(new IndexNotFoundException(request.getFollowerIndex()));

View File

@ -7,9 +7,9 @@
package org.elasticsearch.xpack.core.ccr.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -41,7 +41,7 @@ public final class ResumeFollowAction extends Action<AcknowledgedResponse> {
return new AcknowledgedResponse();
}
public static class Request extends ActionRequest implements ToXContentObject {
public static class Request extends MasterNodeRequest<Request> implements ToXContentObject {
static final ParseField FOLLOWER_INDEX_FIELD = new ParseField("follower_index");
static final ParseField MAX_READ_REQUEST_OPERATION_COUNT = new ParseField("max_read_request_operation_count");