diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java index 179c4f1c438..17b7bbe674b 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java @@ -62,7 +62,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.stream.Collectors; -public class FollowIndexAction extends Action { +public class FollowIndexAction extends Action { public static final FollowIndexAction INSTANCE = new FollowIndexAction(); public static final String NAME = "cluster:admin/xpack/ccr/follow_index"; @@ -72,8 +72,8 @@ public class FollowIndexAction extends Action { } @Override - public Response newResponse() { - return new Response(); + public AcknowledgedResponse newResponse() { + return new AcknowledgedResponse(); } public static class Request extends ActionRequest implements ToXContentObject { @@ -129,9 +129,17 @@ public class FollowIndexAction extends Action { private TimeValue retryTimeout; private TimeValue idleShardRetryDelay; - public Request(String leaderIndex, String followerIndex, Integer maxBatchOperationCount, Integer maxConcurrentReadBatches, - Long maxOperationSizeInBytes, Integer maxConcurrentWriteBatches, Integer maxWriteBufferSize, - TimeValue retryTimeout, TimeValue idleShardRetryDelay) { + public Request( + String leaderIndex, + String followerIndex, + Integer maxBatchOperationCount, + Integer maxConcurrentReadBatches, + Long maxOperationSizeInBytes, + Integer maxConcurrentWriteBatches, + Integer maxWriteBufferSize, + TimeValue retryTimeout, + TimeValue idleShardRetryDelay) { + if (leaderIndex == null) { throw new IllegalArgumentException("leader_index is missing"); } @@ -271,22 +279,21 @@ public class FollowIndexAction extends Action { @Override public int hashCode() { - return Objects.hash(leaderIndex, followerIndex, maxBatchOperationCount, maxConcurrentReadBatches, maxOperationSizeInBytes, - maxConcurrentWriteBatches, maxWriteBufferSize, retryTimeout, idleShardRetryDelay); + return Objects.hash( + leaderIndex, + followerIndex, + maxBatchOperationCount, + maxConcurrentReadBatches, + maxOperationSizeInBytes, + maxConcurrentWriteBatches, + maxWriteBufferSize, + retryTimeout, + idleShardRetryDelay + ); } } - public static class Response extends AcknowledgedResponse { - - Response() { - } - - Response(boolean acknowledged) { - super(acknowledged); - } - } - - public static class TransportAction extends HandledTransportAction { + public static class TransportAction extends HandledTransportAction { private final Client client; private final ThreadPool threadPool; @@ -318,7 +325,9 @@ public class FollowIndexAction extends Action { } @Override - protected void doExecute(final Task task, final Request request, final ActionListener listener) { + protected void doExecute(final Task task, + final Request request, + final ActionListener listener) { if (ccrLicenseChecker.isCcrAllowed()) { final String[] indices = new String[]{request.leaderIndex}; final Map> remoteClusterIndices = remoteClusterService.groupClusterIndices(indices, s -> false); @@ -337,7 +346,8 @@ public class FollowIndexAction extends Action { } } - private void followLocalIndex(final Request request, final ActionListener listener) { + private void followLocalIndex(final Request request, + final ActionListener listener) { final ClusterState state = clusterService.state(); final IndexMetaData followerIndexMetadata = state.getMetaData().index(request.getFollowerIndex()); // following an index in local cluster, so use local cluster state to fetch leader index metadata @@ -353,7 +363,7 @@ public class FollowIndexAction extends Action { final Request request, final String clusterAlias, final String leaderIndex, - final ActionListener listener) { + final ActionListener listener) { final ClusterState state = clusterService.state(); final IndexMetaData followerIndexMetadata = state.getMetaData().index(request.getFollowerIndex()); ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadata( @@ -380,8 +390,13 @@ public class FollowIndexAction extends Action { *
  • The leader index and follow index need to have the same number of primary shards
  • * */ - void start(Request request, String clusterNameAlias, IndexMetaData leaderIndexMetadata, IndexMetaData followIndexMetadata, - ActionListener handler) throws IOException { + void start( + Request request, + String clusterNameAlias, + IndexMetaData leaderIndexMetadata, + IndexMetaData followIndexMetadata, + ActionListener handler) throws IOException { + MapperService mapperService = followIndexMetadata != null ? indicesService.createIndexMapperService(followIndexMetadata) : null; validate(request, leaderIndexMetadata, followIndexMetadata, mapperService); final int numShards = followIndexMetadata.getNumberOfShards(); @@ -429,7 +444,7 @@ public class FollowIndexAction extends Action { if (error == null) { // include task ids? - handler.onResponse(new Response(true)); + handler.onResponse(new AcknowledgedResponse(true)); } else { // TODO: cancel all started tasks handler.onFailure(error); @@ -493,7 +508,9 @@ public class FollowIndexAction extends Action { WHITELISTED_SETTINGS = Collections.unmodifiableSet(whiteListedSettings); } - static void validate(Request request, IndexMetaData leaderIndex, IndexMetaData followIndex, MapperService followerMapperService) { + static void validate(Request request, + IndexMetaData leaderIndex, + IndexMetaData followIndex, MapperService followerMapperService) { if (leaderIndex == null) { throw new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not exist"); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/UnfollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/UnfollowIndexAction.java index f671d59cfe4..93b2bcc3e40 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/UnfollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/UnfollowIndexAction.java @@ -28,7 +28,7 @@ import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReferenceArray; -public class UnfollowIndexAction extends Action { +public class UnfollowIndexAction extends Action { public static final UnfollowIndexAction INSTANCE = new UnfollowIndexAction(); public static final String NAME = "cluster:admin/xpack/ccr/unfollow_index"; @@ -38,8 +38,8 @@ public class UnfollowIndexAction extends Action { } @Override - public Response newResponse() { - return new Response(); + public AcknowledgedResponse newResponse() { + return new AcknowledgedResponse(); } public static class Request extends ActionRequest { @@ -72,31 +72,27 @@ public class UnfollowIndexAction extends Action { } } - public static class Response extends AcknowledgedResponse { - - Response(boolean acknowledged) { - super(acknowledged); - } - - Response() { - } - } - - public static class TransportAction extends HandledTransportAction { + public static class TransportAction extends HandledTransportAction { private final Client client; private final PersistentTasksService persistentTasksService; @Inject - public TransportAction(Settings settings, TransportService transportService, - ActionFilters actionFilters, Client client, PersistentTasksService persistentTasksService) { + public TransportAction(Settings settings, + TransportService transportService, + ActionFilters actionFilters, + Client client, + PersistentTasksService persistentTasksService) { super(settings, NAME, transportService, actionFilters, Request::new); this.client = client; this.persistentTasksService = persistentTasksService; } @Override - protected void doExecute(Task task, Request request, ActionListener listener) { + protected void doExecute(Task task, + Request request, + ActionListener listener) { + client.admin().cluster().state(new ClusterStateRequest(), ActionListener.wrap(r -> { IndexMetaData followIndexMetadata = r.getState().getMetaData().index(request.followIndex); if (followIndexMetadata == null) { @@ -140,7 +136,7 @@ public class UnfollowIndexAction extends Action { if (error == null) { // include task ids? - listener.onResponse(new Response(true)); + listener.onResponse(new AcknowledgedResponse(true)); } else { // TODO: cancel all started tasks listener.onFailure(error); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java index 87772d0c150..675758903bf 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ccr; import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESSingleNodeTestCase; @@ -36,9 +37,9 @@ public class CcrLicenseIT extends ESSingleNodeTestCase { client().execute( FollowIndexAction.INSTANCE, followRequest, - new ActionListener() { + new ActionListener() { @Override - public void onResponse(final FollowIndexAction.Response response) { + public void onResponse(final AcknowledgedResponse response) { fail(); }