diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java index 8e72e6d5768..2ca42ff85ab 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.support.AbstractClient; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; @@ -45,14 +46,19 @@ final class RemoteClusterAwareClient extends AbstractClient { protected void doExecute(Action action, Request request, ActionListener listener) { remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(res -> { - Transport.Connection connection = remoteClusterService.getConnection(clusterAlias); + Transport.Connection connection; + if (request instanceof RemoteClusterAwareRequest) { + DiscoveryNode preferredTargetNode = ((RemoteClusterAwareRequest) request).getPreferredTargetNode(); + connection = remoteClusterService.getConnection(preferredTargetNode, clusterAlias); + } else { + connection = remoteClusterService.getConnection(clusterAlias); + } service.sendRequest(connection, action.name(), request, TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, action.getResponseReader())); }, listener::onFailure)); } - @Override public void close() { // do nothing diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareRequest.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareRequest.java new file mode 100644 index 00000000000..b708240f6da --- /dev/null +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareRequest.java @@ -0,0 +1,35 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.cluster.node.DiscoveryNode; + +public interface RemoteClusterAwareRequest { + + /** + * Returns the preferred discovery node for this request. The remote cluster client will attempt to send + * this request directly to this node. Otherwise, it will send the request as a proxy action that will + * be routed by the remote cluster to this node. + * + * @return preferred discovery node + */ + DiscoveryNode getPreferredTargetNode(); + +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java index 33b8b415d83..81cde2984f5 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java @@ -7,24 +7,19 @@ package org.elasticsearch.xpack.ccr.action.repositories; import org.elasticsearch.action.Action; -import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.nodes.BaseNodeResponse; -import org.elasticsearch.action.support.nodes.BaseNodesResponse; -import org.elasticsearch.action.support.nodes.TransportNodesAction; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportActionProxy; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService; -import java.io.IOException; -import java.util.List; - public class ClearCcrRestoreSessionAction extends Action { public static final ClearCcrRestoreSessionAction INSTANCE = new ClearCcrRestoreSessionAction(); @@ -36,86 +31,47 @@ public class ClearCcrRestoreSessionAction extends Action { + @Override + public Writeable.Reader getResponseReader() { + return ClearCcrRestoreSessionResponse::new; + } + + public static class TransportDeleteCcrRestoreSessionAction + extends HandledTransportAction { private final CcrRestoreSourceService ccrRestoreService; + private final ThreadPool threadPool; @Inject - public TransportDeleteCcrRestoreSessionAction(ThreadPool threadPool, ClusterService clusterService, ActionFilters actionFilters, - TransportService transportService, CcrRestoreSourceService ccrRestoreService) { - super(NAME, threadPool, clusterService, transportService, actionFilters, ClearCcrRestoreSessionRequest::new, - ClearCcrRestoreSessionRequest.Request::new, ThreadPool.Names.GENERIC, Response.class); + public TransportDeleteCcrRestoreSessionAction(ActionFilters actionFilters, TransportService transportService, + CcrRestoreSourceService ccrRestoreService) { + super(NAME, transportService, actionFilters, ClearCcrRestoreSessionRequest::new); + TransportActionProxy.registerProxyAction(transportService, NAME, ClearCcrRestoreSessionResponse::new); this.ccrRestoreService = ccrRestoreService; + this.threadPool = transportService.getThreadPool(); } @Override - protected ClearCcrRestoreSessionResponse newResponse(ClearCcrRestoreSessionRequest request, List responses, - List failures) { - return new ClearCcrRestoreSessionResponse(clusterService.getClusterName(), responses, failures); - } - - @Override - protected ClearCcrRestoreSessionRequest.Request newNodeRequest(String nodeId, ClearCcrRestoreSessionRequest request) { - return request.getRequest(); - } - - @Override - protected Response newNodeResponse() { - return new Response(); - } - - @Override - protected Response nodeOperation(ClearCcrRestoreSessionRequest.Request request) { - ccrRestoreService.closeSession(request.getSessionUUID()); - return new Response(clusterService.localNode()); + protected void doExecute(Task task, ClearCcrRestoreSessionRequest request, + ActionListener listener) { + // TODO: Currently blocking actions might occur in the session closed callbacks. This dispatch + // may be unnecessary when we remove these callbacks. + threadPool.generic().execute(() -> { + ccrRestoreService.closeSession(request.getSessionUUID()); + listener.onResponse(new ClearCcrRestoreSessionResponse()); + }); } } - public static class Response extends BaseNodeResponse { - - private Response() { - } - - private Response(StreamInput in) throws IOException { - readFrom(in); - } - - private Response(DiscoveryNode node) { - super(node); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - } - } - - public static class ClearCcrRestoreSessionResponse extends BaseNodesResponse { + public static class ClearCcrRestoreSessionResponse extends ActionResponse { ClearCcrRestoreSessionResponse() { } - ClearCcrRestoreSessionResponse(ClusterName clusterName, List chunkResponses, List failures) { - super(clusterName, chunkResponses, failures); - } - - @Override - protected List readNodesFrom(StreamInput in) throws IOException { - return in.readList(Response::new); - } - - @Override - protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { - out.writeList(nodes); + ClearCcrRestoreSessionResponse(StreamInput in) { } } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java index 11605970736..b9d277ca1b4 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java @@ -6,68 +6,52 @@ package org.elasticsearch.xpack.ccr.action.repositories; -import org.elasticsearch.action.support.nodes.BaseNodeRequest; -import org.elasticsearch.action.support.nodes.BaseNodesRequest; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.transport.RemoteClusterAwareRequest; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import java.io.IOException; -public class ClearCcrRestoreSessionRequest extends BaseNodesRequest { +public class ClearCcrRestoreSessionRequest extends ActionRequest implements RemoteClusterAwareRequest { - private Request request; + private DiscoveryNode node; + private String sessionUUID; - ClearCcrRestoreSessionRequest() { + ClearCcrRestoreSessionRequest(StreamInput in) throws IOException { + super.readFrom(in); + sessionUUID = in.readString(); } - public ClearCcrRestoreSessionRequest(String nodeId, Request request) { - super(nodeId); - this.request = request; + public ClearCcrRestoreSessionRequest(String sessionUUID, DiscoveryNode node) { + this.sessionUUID = sessionUUID; + this.node = node; } @Override - public void readFrom(StreamInput streamInput) throws IOException { - super.readFrom(streamInput); - request = new Request(); - request.readFrom(streamInput); + public ActionRequestValidationException validate() { + return null; } @Override - public void writeTo(StreamOutput streamOutput) throws IOException { - super.writeTo(streamOutput); - request.writeTo(streamOutput); + public void readFrom(StreamInput in) throws IOException { + throw new UnsupportedOperationException(); } - public Request getRequest() { - return request; + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(sessionUUID); } - public static class Request extends BaseNodeRequest { + String getSessionUUID() { + return sessionUUID; + } - private String sessionUUID; - - Request() { - } - - public Request(String nodeId, String sessionUUID) { - super(nodeId); - this.sessionUUID = sessionUUID; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - sessionUUID = in.readString(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeString(sessionUUID); - } - - public String getSessionUUID() { - return sessionUUID; - } + @Override + public DiscoveryNode getPreferredTargetNode() { + return node; } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java index 7f362aa3b76..2a1b354f5d8 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.single.shard.TransportSingleShardAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; @@ -70,7 +71,7 @@ public class PutCcrRestoreSessionAction extends Action