From 694e2a997003c3323dfdd79c1465c3525c79df8c Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 13 Apr 2018 15:23:44 +0200 Subject: [PATCH] Add remote cluster client (#29495) This change adds a client that is connected to a remote cluster. This allows plugins and internal structures to invoke actions on remote clusters just like a if it's a local cluster. The remote cluster must be configured via the cross cluster search infrastructure. --- .../java/org/elasticsearch/client/Client.java | 10 ++ .../elasticsearch/client/FilterClient.java | 5 + .../elasticsearch/client/node/NodeClient.java | 11 ++- .../java/org/elasticsearch/node/Node.java | 2 +- .../transport/RemoteClusterAwareClient.java | 67 +++++++++++++ .../transport/RemoteClusterService.java | 16 ++++ .../client/node/NodeClientHeadersTests.java | 2 +- .../transport/RemoteClusterClientTests.java | 94 +++++++++++++++++++ 8 files changed, 204 insertions(+), 3 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java create mode 100644 server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java diff --git a/server/src/main/java/org/elasticsearch/client/Client.java b/server/src/main/java/org/elasticsearch/client/Client.java index 2c61653f61c..adb2f509b99 100644 --- a/server/src/main/java/org/elasticsearch/client/Client.java +++ b/server/src/main/java/org/elasticsearch/client/Client.java @@ -477,4 +477,14 @@ public interface Client extends ElasticsearchClient, Releasable { * issued from it. */ Client filterWithHeader(Map headers); + + /** + * Returns a client to a remote cluster with the given cluster alias. + * + * @throws IllegalArgumentException if the given clusterAlias doesn't exist + * @throws UnsupportedOperationException if this functionality is not available on this client. + */ + default Client getRemoteClusterClient(String clusterAlias) { + throw new UnsupportedOperationException("this client doesn't support remote cluster connections"); + } } diff --git a/server/src/main/java/org/elasticsearch/client/FilterClient.java b/server/src/main/java/org/elasticsearch/client/FilterClient.java index 23d3c2c3d0c..92f6817b74b 100644 --- a/server/src/main/java/org/elasticsearch/client/FilterClient.java +++ b/server/src/main/java/org/elasticsearch/client/FilterClient.java @@ -73,4 +73,9 @@ public abstract class FilterClient extends AbstractClient { protected Client in() { return in; } + + @Override + public Client getRemoteClusterClient(String clusterAlias) { + return in.getRemoteClusterClient(clusterAlias); + } } diff --git a/server/src/main/java/org/elasticsearch/client/node/NodeClient.java b/server/src/main/java/org/elasticsearch/client/node/NodeClient.java index e4f26b15702..69bf5d21f7a 100644 --- a/server/src/main/java/org/elasticsearch/client/node/NodeClient.java +++ b/server/src/main/java/org/elasticsearch/client/node/NodeClient.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskListener; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteClusterService; import java.util.Map; import java.util.function.Supplier; @@ -48,14 +49,17 @@ public class NodeClient extends AbstractClient { * {@link #executeLocally(GenericAction, ActionRequest, TaskListener)}. */ private Supplier localNodeId; + private RemoteClusterService remoteClusterService; public NodeClient(Settings settings, ThreadPool threadPool) { super(settings, threadPool); } - public void initialize(Map actions, Supplier localNodeId) { + public void initialize(Map actions, Supplier localNodeId, + RemoteClusterService remoteClusterService) { this.actions = actions; this.localNodeId = localNodeId; + this.remoteClusterService = remoteClusterService; } @Override @@ -117,4 +121,9 @@ public class NodeClient extends AbstractClient { } return transportAction; } + + @Override + public Client getRemoteClusterClient(String clusterAlias) { + return remoteClusterService.getRemoteClusterClient(threadPool(), clusterAlias); + } } diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 9fa886af8bc..b02e1614bbd 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -538,7 +538,7 @@ public class Node implements Closeable { resourcesToClose.addAll(pluginLifecycleComponents); this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents); client.initialize(injector.getInstance(new Key>() {}), - () -> clusterService.localNode().getId()); + () -> clusterService.localNode().getId(), transportService.getRemoteClusterService()); if (NetworkModule.HTTP_ENABLED.get(settings)) { logger.debug("initializing HTTP handlers ..."); diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java new file mode 100644 index 00000000000..aa476bf4dd2 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java @@ -0,0 +1,67 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.transport; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.support.AbstractClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; + +final class RemoteClusterAwareClient extends AbstractClient { + + private final TransportService service; + private final String clusterAlias; + private final RemoteClusterService remoteClusterService; + + RemoteClusterAwareClient(Settings settings, ThreadPool threadPool, TransportService service, String clusterAlias) { + super(settings, threadPool); + this.service = service; + this.clusterAlias = clusterAlias; + this.remoteClusterService = service.getRemoteClusterService(); + } + + @Override + protected > + void doExecute(Action action, Request request, ActionListener listener) { + remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(res -> { + Transport.Connection connection = remoteClusterService.getConnection(clusterAlias); + service.sendRequest(connection, action.name(), request, TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>(listener, action::newResponse)); + }, + listener::onFailure)); + } + + + @Override + public void close() { + // do nothing + } + + @Override + public Client getRemoteClusterClient(String clusterAlias) { + return remoteClusterService.getRemoteClusterClient(threadPool(), clusterAlias); + } +} diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index b253d9d23df..f4545713017 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.transport; +import org.elasticsearch.client.Client; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -36,6 +37,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.CountDown; +import org.elasticsearch.threadpool.ThreadPool; import java.io.Closeable; import java.io.IOException; @@ -398,4 +400,18 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl }); } } + + /** + * Returns a client to the remote cluster if the given cluster alias exists. + * @param threadPool the {@link ThreadPool} for the client + * @param clusterAlias the cluster alias the remote cluster is registered under + * + * @throws IllegalArgumentException if the given clusterAlias doesn't exist + */ + public Client getRemoteClusterClient(ThreadPool threadPool, String clusterAlias) { + if (transportService.getRemoteClusterService().getRemoteClusterNames().contains(clusterAlias) == false) { + throw new IllegalArgumentException("unknown cluster alias [" + clusterAlias + "]"); + } + return new RemoteClusterAwareClient(settings, threadPool, transportService, clusterAlias); + } } diff --git a/server/src/test/java/org/elasticsearch/client/node/NodeClientHeadersTests.java b/server/src/test/java/org/elasticsearch/client/node/NodeClientHeadersTests.java index bca04738d8b..5e739cc3250 100644 --- a/server/src/test/java/org/elasticsearch/client/node/NodeClientHeadersTests.java +++ b/server/src/test/java/org/elasticsearch/client/node/NodeClientHeadersTests.java @@ -43,7 +43,7 @@ public class NodeClientHeadersTests extends AbstractClientHeadersTestCase { Settings settings = HEADER_SETTINGS; Actions actions = new Actions(settings, threadPool, testedActions); NodeClient client = new NodeClient(settings, threadPool); - client.initialize(actions, () -> "test"); + client.initialize(actions, () -> "test", null); return client; } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java new file mode 100644 index 00000000000..6008b7900a0 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java @@ -0,0 +1,94 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.transport; + +import org.elasticsearch.Version; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.transport.RemoteClusterConnectionTests.startTransport; + +public class RemoteClusterClientTests extends ESTestCase { + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + + @Override + public void tearDown() throws Exception { + super.tearDown(); + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + + public void testConnectAndExecuteRequest() throws Exception { + Settings remoteSettings = Settings.builder().put(ClusterName.CLUSTER_NAME_SETTING.getKey(), "foo_bar_cluster").build(); + try (MockTransportService remoteTransport = startTransport("remote_node", Collections.emptyList(), Version.CURRENT, threadPool, + remoteSettings)) { + DiscoveryNode remoteNode = remoteTransport.getLocalDiscoNode(); + + Settings localSettings = Settings.builder() + .put(RemoteClusterService.ENABLE_REMOTE_CLUSTERS.getKey(), true) + .put("search.remote.test.seeds", remoteNode.getAddress().getAddress() + ":" + remoteNode.getAddress().getPort()).build(); + try (MockTransportService service = MockTransportService.createNewService(localSettings, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + RemoteClusterService remoteClusterService = service.getRemoteClusterService(); + assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode)); + Client client = remoteClusterService.getRemoteClusterClient(threadPool, "test"); + ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().execute().get(); + assertNotNull(clusterStateResponse); + assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value()); + // also test a failure, there is no handler for search registered + ActionNotFoundTransportException ex = expectThrows(ActionNotFoundTransportException.class, + () -> client.prepareSearch().get()); + assertEquals("No handler for action [indices:data/read/search]", ex.getMessage()); + } + } + } + + public void testEnsureWeReconnect() throws Exception { + Settings remoteSettings = Settings.builder().put(ClusterName.CLUSTER_NAME_SETTING.getKey(), "foo_bar_cluster").build(); + try (MockTransportService remoteTransport = startTransport("remote_node", Collections.emptyList(), Version.CURRENT, threadPool, + remoteSettings)) { + DiscoveryNode remoteNode = remoteTransport.getLocalDiscoNode(); + Settings localSettings = Settings.builder() + .put(RemoteClusterService.ENABLE_REMOTE_CLUSTERS.getKey(), true) + .put("search.remote.test.seeds", remoteNode.getAddress().getAddress() + ":" + remoteNode.getAddress().getPort()).build(); + try (MockTransportService service = MockTransportService.createNewService(localSettings, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + service.disconnectFromNode(remoteNode); + RemoteClusterService remoteClusterService = service.getRemoteClusterService(); + assertBusy(() -> assertFalse(remoteClusterService.isRemoteNodeConnected("test", remoteNode))); + Client client = remoteClusterService.getRemoteClusterClient(threadPool, "test"); + ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().execute().get(); + assertNotNull(clusterStateResponse); + assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value()); + } + } + } + +}