Adjusts transport actions in CCR

This commit adjusts the ccr’s actions accordingly to the recent changes
in the upstream.
This commit is contained in:
Nhat Nguyen 2018-06-23 17:53:18 -04:00
parent 08ee9b67c5
commit 2c56df631d
3 changed files with 10 additions and 11 deletions

View File

@ -34,6 +34,7 @@ import org.elasticsearch.indices.IndicesRequestCache;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
@ -172,6 +173,7 @@ public class FollowIndexAction extends Action<FollowIndexAction.Response> {
public static class TransportAction extends HandledTransportAction<Request, Response> {
private final Client client;
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final RemoteClusterService remoteClusterService;
private final PersistentTasksService persistentTasksService;
@ -181,8 +183,9 @@ public class FollowIndexAction extends Action<FollowIndexAction.Response> {
public TransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters,
Client client, ClusterService clusterService, PersistentTasksService persistentTasksService,
IndicesService indicesService) {
super(settings, NAME, threadPool, transportService, actionFilters, Request::new);
super(settings, NAME, transportService, actionFilters, Request::new);
this.client = client;
this.threadPool = threadPool;
this.clusterService = clusterService;
this.remoteClusterService = transportService.getRemoteClusterService();
this.persistentTasksService = persistentTasksService;
@ -190,7 +193,7 @@ public class FollowIndexAction extends Action<FollowIndexAction.Response> {
}
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
ClusterState localClusterState = clusterService.state();
IndexMetaData followIndexMetadata = localClusterState.getMetaData().index(request.followIndex);

View File

@ -13,7 +13,6 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
@ -430,10 +429,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return new FilterClient(client) {
@Override
protected <
Request extends ActionRequest,
Response extends ActionResponse,
RequestBuilder extends ActionRequestBuilder<Request, Response>>
protected <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
try (ThreadContext.StoredContext ignore = stashWithHeaders(threadContext, filteredHeaders)) {

View File

@ -21,7 +21,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
@ -88,15 +88,15 @@ public class UnfollowIndexAction extends Action<UnfollowIndexAction.Response> {
private final PersistentTasksService persistentTasksService;
@Inject
public TransportAction(Settings settings, ThreadPool threadPool, TransportService transportService,
public TransportAction(Settings settings, TransportService transportService,
ActionFilters actionFilters, Client client, PersistentTasksService persistentTasksService) {
super(settings, NAME, threadPool, transportService, actionFilters, Request::new);
super(settings, NAME, transportService, actionFilters, Request::new);
this.client = client;
this.persistentTasksService = persistentTasksService;
}
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
client.admin().cluster().state(new ClusterStateRequest(), ActionListener.wrap(r -> {
IndexMetaData followIndexMetadata = r.getState().getMetaData().index(request.followIndex);
if (followIndexMetadata == null) {