[CCR] Removed custom follow and unfollow api's reponse classes with AcknowledgedResponse (#33260)

These response classes did not add any value and in that case just AcknowledgedResponse should be used.

I also changed the formatting of methods to take one line per parameter in
FollowIndexAction.java and UnfollowIndexAction.java files to make
reviewing diffs in the future easier.
This commit is contained in:
Martijn van Groningen 2018-08-31 21:16:06 +07:00 committed by GitHub
parent 5330067033
commit 66b164c2a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 60 additions and 46 deletions

View File

@ -62,7 +62,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.stream.Collectors;
public class FollowIndexAction extends Action<FollowIndexAction.Response> {
public class FollowIndexAction extends Action<AcknowledgedResponse> {
public static final FollowIndexAction INSTANCE = new FollowIndexAction();
public static final String NAME = "cluster:admin/xpack/ccr/follow_index";
@ -72,8 +72,8 @@ public class FollowIndexAction extends Action<FollowIndexAction.Response> {
}
@Override
public Response newResponse() {
return new Response();
public AcknowledgedResponse newResponse() {
return new AcknowledgedResponse();
}
public static class Request extends ActionRequest implements ToXContentObject {
@ -129,9 +129,17 @@ public class FollowIndexAction extends Action<FollowIndexAction.Response> {
private TimeValue retryTimeout;
private TimeValue idleShardRetryDelay;
public Request(String leaderIndex, String followerIndex, Integer maxBatchOperationCount, Integer maxConcurrentReadBatches,
Long maxOperationSizeInBytes, Integer maxConcurrentWriteBatches, Integer maxWriteBufferSize,
TimeValue retryTimeout, TimeValue idleShardRetryDelay) {
public Request(
String leaderIndex,
String followerIndex,
Integer maxBatchOperationCount,
Integer maxConcurrentReadBatches,
Long maxOperationSizeInBytes,
Integer maxConcurrentWriteBatches,
Integer maxWriteBufferSize,
TimeValue retryTimeout,
TimeValue idleShardRetryDelay) {
if (leaderIndex == null) {
throw new IllegalArgumentException("leader_index is missing");
}
@ -271,22 +279,21 @@ public class FollowIndexAction extends Action<FollowIndexAction.Response> {
@Override
public int hashCode() {
return Objects.hash(leaderIndex, followerIndex, maxBatchOperationCount, maxConcurrentReadBatches, maxOperationSizeInBytes,
maxConcurrentWriteBatches, maxWriteBufferSize, retryTimeout, idleShardRetryDelay);
return Objects.hash(
leaderIndex,
followerIndex,
maxBatchOperationCount,
maxConcurrentReadBatches,
maxOperationSizeInBytes,
maxConcurrentWriteBatches,
maxWriteBufferSize,
retryTimeout,
idleShardRetryDelay
);
}
}
public static class Response extends AcknowledgedResponse {
Response() {
}
Response(boolean acknowledged) {
super(acknowledged);
}
}
public static class TransportAction extends HandledTransportAction<Request, Response> {
public static class TransportAction extends HandledTransportAction<Request, AcknowledgedResponse> {
private final Client client;
private final ThreadPool threadPool;
@ -318,7 +325,9 @@ public class FollowIndexAction extends Action<FollowIndexAction.Response> {
}
@Override
protected void doExecute(final Task task, final Request request, final ActionListener<Response> listener) {
protected void doExecute(final Task task,
final Request request,
final ActionListener<AcknowledgedResponse> listener) {
if (ccrLicenseChecker.isCcrAllowed()) {
final String[] indices = new String[]{request.leaderIndex};
final Map<String, List<String>> remoteClusterIndices = remoteClusterService.groupClusterIndices(indices, s -> false);
@ -337,7 +346,8 @@ public class FollowIndexAction extends Action<FollowIndexAction.Response> {
}
}
private void followLocalIndex(final Request request, final ActionListener<Response> listener) {
private void followLocalIndex(final Request request,
final ActionListener<AcknowledgedResponse> listener) {
final ClusterState state = clusterService.state();
final IndexMetaData followerIndexMetadata = state.getMetaData().index(request.getFollowerIndex());
// following an index in local cluster, so use local cluster state to fetch leader index metadata
@ -353,7 +363,7 @@ public class FollowIndexAction extends Action<FollowIndexAction.Response> {
final Request request,
final String clusterAlias,
final String leaderIndex,
final ActionListener<Response> listener) {
final ActionListener<AcknowledgedResponse> listener) {
final ClusterState state = clusterService.state();
final IndexMetaData followerIndexMetadata = state.getMetaData().index(request.getFollowerIndex());
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadata(
@ -380,8 +390,13 @@ public class FollowIndexAction extends Action<FollowIndexAction.Response> {
* <li>The leader index and follow index need to have the same number of primary shards</li>
* </ul>
*/
void start(Request request, String clusterNameAlias, IndexMetaData leaderIndexMetadata, IndexMetaData followIndexMetadata,
ActionListener<Response> handler) throws IOException {
void start(
Request request,
String clusterNameAlias,
IndexMetaData leaderIndexMetadata,
IndexMetaData followIndexMetadata,
ActionListener<AcknowledgedResponse> handler) throws IOException {
MapperService mapperService = followIndexMetadata != null ? indicesService.createIndexMapperService(followIndexMetadata) : null;
validate(request, leaderIndexMetadata, followIndexMetadata, mapperService);
final int numShards = followIndexMetadata.getNumberOfShards();
@ -429,7 +444,7 @@ public class FollowIndexAction extends Action<FollowIndexAction.Response> {
if (error == null) {
// include task ids?
handler.onResponse(new Response(true));
handler.onResponse(new AcknowledgedResponse(true));
} else {
// TODO: cancel all started tasks
handler.onFailure(error);
@ -493,7 +508,9 @@ public class FollowIndexAction extends Action<FollowIndexAction.Response> {
WHITELISTED_SETTINGS = Collections.unmodifiableSet(whiteListedSettings);
}
static void validate(Request request, IndexMetaData leaderIndex, IndexMetaData followIndex, MapperService followerMapperService) {
static void validate(Request request,
IndexMetaData leaderIndex,
IndexMetaData followIndex, MapperService followerMapperService) {
if (leaderIndex == null) {
throw new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not exist");
}

View File

@ -28,7 +28,7 @@ import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
public class UnfollowIndexAction extends Action<UnfollowIndexAction.Response> {
public class UnfollowIndexAction extends Action<AcknowledgedResponse> {
public static final UnfollowIndexAction INSTANCE = new UnfollowIndexAction();
public static final String NAME = "cluster:admin/xpack/ccr/unfollow_index";
@ -38,8 +38,8 @@ public class UnfollowIndexAction extends Action<UnfollowIndexAction.Response> {
}
@Override
public Response newResponse() {
return new Response();
public AcknowledgedResponse newResponse() {
return new AcknowledgedResponse();
}
public static class Request extends ActionRequest {
@ -72,31 +72,27 @@ public class UnfollowIndexAction extends Action<UnfollowIndexAction.Response> {
}
}
public static class Response extends AcknowledgedResponse {
Response(boolean acknowledged) {
super(acknowledged);
}
Response() {
}
}
public static class TransportAction extends HandledTransportAction<Request, Response> {
public static class TransportAction extends HandledTransportAction<Request, AcknowledgedResponse> {
private final Client client;
private final PersistentTasksService persistentTasksService;
@Inject
public TransportAction(Settings settings, TransportService transportService,
ActionFilters actionFilters, Client client, PersistentTasksService persistentTasksService) {
public TransportAction(Settings settings,
TransportService transportService,
ActionFilters actionFilters,
Client client,
PersistentTasksService persistentTasksService) {
super(settings, NAME, transportService, actionFilters, Request::new);
this.client = client;
this.persistentTasksService = persistentTasksService;
}
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
protected void doExecute(Task task,
Request request,
ActionListener<AcknowledgedResponse> listener) {
client.admin().cluster().state(new ClusterStateRequest(), ActionListener.wrap(r -> {
IndexMetaData followIndexMetadata = r.getState().getMetaData().index(request.followIndex);
if (followIndexMetadata == null) {
@ -140,7 +136,7 @@ public class UnfollowIndexAction extends Action<UnfollowIndexAction.Response> {
if (error == null) {
// include task ids?
listener.onResponse(new Response(true));
listener.onResponse(new AcknowledgedResponse(true));
} else {
// TODO: cancel all started tasks
listener.onFailure(error);

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ccr;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
@ -36,9 +37,9 @@ public class CcrLicenseIT extends ESSingleNodeTestCase {
client().execute(
FollowIndexAction.INSTANCE,
followRequest,
new ActionListener<FollowIndexAction.Response>() {
new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(final FollowIndexAction.Response response) {
public void onResponse(final AcknowledgedResponse response) {
fail();
}