[CCR] Cleanup pause follow action (#34183)

* Change the `TransportPauseFollowAction` to extend from `TransportMasterNodeAction`
  instead of `HandledAction`, this removes a sync cluster state api call.
* Introduced `ResponseHandler` that removes duplicated code in `TransportPauseFollowAction` and
  `TransportResumeFollowAction`.
* Changed `PauseFollowAction.Request` to not use `readFrom()`.
This commit is contained in:
Martijn van Groningen 2018-10-24 08:12:39 +02:00 committed by GitHub
parent 52266d8b11
commit abf8cb6706
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 136 additions and 145 deletions

View File

@ -0,0 +1,64 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
final class ResponseHandler {
private final AtomicInteger counter;
private final AtomicReferenceArray<Object> responses;
private final ActionListener<AcknowledgedResponse> listener;
ResponseHandler(int numRequests, ActionListener<AcknowledgedResponse> listener) {
this.counter = new AtomicInteger(numRequests);
this.responses = new AtomicReferenceArray<>(numRequests);
this.listener = listener;
}
<T> ActionListener<T> getActionListener(final int requestId) {
return new ActionListener<T>() {
@Override
public void onResponse(T response) {
responses.set(requestId, response);
finalizeResponse();
}
@Override
public void onFailure(Exception e) {
responses.set(requestId, e);
finalizeResponse();
}
};
}
private void finalizeResponse() {
Exception error = null;
if (counter.decrementAndGet() == 0) {
for (int j = 0; j < responses.length(); j++) {
Object response = responses.get(j);
if (response instanceof Exception) {
if (error == null) {
error = (Exception) response;
} else {
error.addSuppressed((Exception) response);
}
}
}
if (error == null) {
listener.onResponse(new AcknowledgedResponse(true));
} else {
listener.onFailure(error);
}
}
}
}

View File

@ -7,27 +7,27 @@
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.stream.Collectors;
public class TransportPauseFollowAction extends HandledTransportAction<PauseFollowAction.Request, AcknowledgedResponse> {
public class TransportPauseFollowAction extends TransportMasterNodeAction<PauseFollowAction.Request, AcknowledgedResponse> {
private final Client client;
private final PersistentTasksService persistentTasksService;
@Inject
@ -35,86 +35,60 @@ public class TransportPauseFollowAction extends HandledTransportAction<PauseFoll
final Settings settings,
final TransportService transportService,
final ActionFilters actionFilters,
final Client client,
final ClusterService clusterService,
final ThreadPool threadPool,
final IndexNameExpressionResolver indexNameExpressionResolver,
final PersistentTasksService persistentTasksService) {
super(settings, PauseFollowAction.NAME, transportService, actionFilters, PauseFollowAction.Request::new);
this.client = client;
super(settings, PauseFollowAction.NAME, transportService, clusterService, threadPool, actionFilters,
PauseFollowAction.Request::new, indexNameExpressionResolver);
this.persistentTasksService = persistentTasksService;
}
@Override
protected void doExecute(
final Task task,
final PauseFollowAction.Request request,
final ActionListener<AcknowledgedResponse> listener) {
protected String executor() {
return ThreadPool.Names.SAME;
}
client.admin().cluster().state(new ClusterStateRequest(), ActionListener.wrap(r -> {
PersistentTasksCustomMetaData persistentTasksMetaData = r.getState().metaData().custom(PersistentTasksCustomMetaData.TYPE);
if (persistentTasksMetaData == null) {
listener.onFailure(new IllegalArgumentException("no shard follow tasks for [" + request.getFollowIndex() + "]"));
return;
}
@Override
protected AcknowledgedResponse newResponse() {
return new AcknowledgedResponse();
}
List<String> shardFollowTaskIds = persistentTasksMetaData.tasks().stream()
.filter(persistentTask -> ShardFollowTask.NAME.equals(persistentTask.getTaskName()))
.filter(persistentTask -> {
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams();
return shardFollowTask.getFollowShardId().getIndexName().equals(request.getFollowIndex());
})
.map(PersistentTasksCustomMetaData.PersistentTask::getId)
.collect(Collectors.toList());
@Override
protected void masterOperation(PauseFollowAction.Request request,
ClusterState state,
ActionListener<AcknowledgedResponse> listener) throws Exception {
PersistentTasksCustomMetaData persistentTasksMetaData = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
if (persistentTasksMetaData == null) {
listener.onFailure(new IllegalArgumentException("no shard follow tasks for [" + request.getFollowIndex() + "]"));
return;
}
if (shardFollowTaskIds.isEmpty()) {
listener.onFailure(new IllegalArgumentException("no shard follow tasks for [" + request.getFollowIndex() + "]"));
return;
}
List<String> shardFollowTaskIds = persistentTasksMetaData.tasks().stream()
.filter(persistentTask -> ShardFollowTask.NAME.equals(persistentTask.getTaskName()))
.filter(persistentTask -> {
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams();
return shardFollowTask.getFollowShardId().getIndexName().equals(request.getFollowIndex());
})
.map(PersistentTasksCustomMetaData.PersistentTask::getId)
.collect(Collectors.toList());
final AtomicInteger counter = new AtomicInteger(shardFollowTaskIds.size());
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(shardFollowTaskIds.size());
int i = 0;
if (shardFollowTaskIds.isEmpty()) {
listener.onFailure(new IllegalArgumentException("no shard follow tasks for [" + request.getFollowIndex() + "]"));
return;
}
for (String taskId : shardFollowTaskIds) {
final int shardId = i++;
persistentTasksService.sendRemoveRequest(taskId,
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
@Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> task) {
responses.set(shardId, task);
finalizeResponse();
}
int i = 0;
final ResponseHandler responseHandler = new ResponseHandler(shardFollowTaskIds.size(), listener);
for (String taskId : shardFollowTaskIds) {
final int taskSlot = i++;
persistentTasksService.sendRemoveRequest(taskId, responseHandler.getActionListener(taskSlot));
}
}
@Override
public void onFailure(Exception e) {
responses.set(shardId, e);
finalizeResponse();
}
void finalizeResponse() {
Exception error = null;
if (counter.decrementAndGet() == 0) {
for (int j = 0; j < responses.length(); j++) {
Object response = responses.get(j);
if (response instanceof Exception) {
if (error == null) {
error = (Exception) response;
} else {
error.addSuppressed((Throwable) response);
}
}
}
if (error == null) {
// include task ids?
listener.onResponse(new AcknowledgedResponse(true));
} else {
// TODO: cancel all started tasks
listener.onFailure(error);
}
}
}
});
}
}, listener::onFailure));
@Override
protected ClusterBlockException checkBlock(PauseFollowAction.Request request, ClusterState state) {
return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getFollowIndex());
}
}

View File

@ -32,7 +32,6 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesRequestCache;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
@ -49,8 +48,6 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.stream.Collectors;
public class TransportResumeFollowAction extends HandledTransportAction<ResumeFollowAction.Request, AcknowledgedResponse> {
@ -144,62 +141,22 @@ public class TransportResumeFollowAction extends HandledTransportAction<ResumeFo
IndexMetaData leaderIndexMetadata,
IndexMetaData followIndexMetadata,
String[] leaderIndexHistoryUUIDs,
ActionListener<AcknowledgedResponse> handler) throws IOException {
ActionListener<AcknowledgedResponse> listener) throws IOException {
MapperService mapperService = followIndexMetadata != null ? indicesService.createIndexMapperService(followIndexMetadata) : null;
validate(request, leaderIndexMetadata, followIndexMetadata, leaderIndexHistoryUUIDs, mapperService);
final int numShards = followIndexMetadata.getNumberOfShards();
final AtomicInteger counter = new AtomicInteger(numShards);
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards());
final ResponseHandler handler = new ResponseHandler(numShards, listener);
Map<String, String> filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream()
.filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
for (int i = 0; i < numShards; i++) {
final int shardId = i;
for (int shardId = 0; shardId < numShards; shardId++) {
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
final ShardFollowTask shardFollowTask = createShardFollowTask(shardId, clusterNameAlias, request,
leaderIndexMetadata, followIndexMetadata, filteredHeaders);
persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask,
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() {
@Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask> task) {
responses.set(shardId, task);
finalizeResponse();
}
@Override
public void onFailure(Exception e) {
responses.set(shardId, e);
finalizeResponse();
}
void finalizeResponse() {
Exception error = null;
if (counter.decrementAndGet() == 0) {
for (int j = 0; j < responses.length(); j++) {
Object response = responses.get(j);
if (response instanceof Exception) {
if (error == null) {
error = (Exception) response;
} else {
error.addSuppressed((Throwable) response);
}
}
}
if (error == null) {
// include task ids?
handler.onResponse(new AcknowledgedResponse(true));
} else {
// TODO: cancel all started tasks
handler.onFailure(error);
}
}
}
}
);
persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask, handler.getActionListener(shardId));
}
}

View File

@ -31,8 +31,7 @@ public class RestPauseFollowAction extends BaseRestHandler {
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
Request request = new Request();
request.setFollowIndex(restRequest.param("index"));
Request request = new Request(restRequest.param("index"));
return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel));
}
}

View File

@ -357,8 +357,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
}
public void testUnfollowNonExistingIndex() {
PauseFollowAction.Request unfollowRequest = new PauseFollowAction.Request();
unfollowRequest.setFollowIndex("non-existing-index");
PauseFollowAction.Request unfollowRequest = new PauseFollowAction.Request("non-existing-index");
expectThrows(IllegalArgumentException.class,
() -> followerClient().execute(PauseFollowAction.INSTANCE, unfollowRequest).actionGet());
}
@ -750,8 +749,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
private void pauseFollow(String... indices) throws Exception {
for (String index : indices) {
final PauseFollowAction.Request unfollowRequest = new PauseFollowAction.Request();
unfollowRequest.setFollowIndex(index);
final PauseFollowAction.Request unfollowRequest = new PauseFollowAction.Request(index);
followerClient().execute(PauseFollowAction.INSTANCE, unfollowRequest).get();
}
ensureNoCcrTasks();

View File

@ -52,8 +52,7 @@ public class LocalIndexFollowingIT extends CcrSingleNodeTestCase {
assertThat(client().prepareSearch("follower").get().getHits().totalHits, equalTo(firstBatchNumDocs + secondBatchNumDocs));
});
PauseFollowAction.Request pauseRequest = new PauseFollowAction.Request();
pauseRequest.setFollowIndex("follower");
PauseFollowAction.Request pauseRequest = new PauseFollowAction.Request("follower");
client().execute(PauseFollowAction.INSTANCE, pauseRequest);
final long thirdBatchNumDocs = randomIntBetween(2, 64);

View File

@ -7,13 +7,14 @@
package org.elasticsearch.xpack.core.ccr.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.Objects;
public class PauseFollowAction extends Action<AcknowledgedResponse> {
@ -29,29 +30,28 @@ public class PauseFollowAction extends Action<AcknowledgedResponse> {
return new AcknowledgedResponse();
}
public static class Request extends ActionRequest {
public static class Request extends MasterNodeRequest<Request> {
private String followIndex;
private final String followIndex;
public Request(String followIndex) {
this.followIndex = Objects.requireNonNull(followIndex, "followIndex");
}
public Request(StreamInput in) throws IOException {
super(in);
this.followIndex = in.readString();
}
public String getFollowIndex() {
return followIndex;
}
public void setFollowIndex(final String followIndex) {
this.followIndex = followIndex;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
followIndex = in.readString();
}
@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);