Add remote cluster client (#29495)

This change adds a client that is connected to a remote cluster.
This allows plugins and internal structures to invoke actions on
remote clusters just like a if it's a local cluster. The remote
cluster must be configured via the cross cluster search infrastructure.
This commit is contained in:
Simon Willnauer 2018-04-13 15:23:44 +02:00 committed by GitHub
parent eab530ce11
commit 694e2a9970
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 204 additions and 3 deletions

View File

@ -477,4 +477,14 @@ public interface Client extends ElasticsearchClient, Releasable {
* issued from it.
*/
Client filterWithHeader(Map<String, String> headers);
/**
* Returns a client to a remote cluster with the given cluster alias.
*
* @throws IllegalArgumentException if the given clusterAlias doesn't exist
* @throws UnsupportedOperationException if this functionality is not available on this client.
*/
default Client getRemoteClusterClient(String clusterAlias) {
throw new UnsupportedOperationException("this client doesn't support remote cluster connections");
}
}

View File

@ -73,4 +73,9 @@ public abstract class FilterClient extends AbstractClient {
protected Client in() {
return in;
}
@Override
public Client getRemoteClusterClient(String clusterAlias) {
return in.getRemoteClusterClient(clusterAlias);
}
}

View File

@ -33,6 +33,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskListener;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterService;
import java.util.Map;
import java.util.function.Supplier;
@ -48,14 +49,17 @@ public class NodeClient extends AbstractClient {
* {@link #executeLocally(GenericAction, ActionRequest, TaskListener)}.
*/
private Supplier<String> localNodeId;
private RemoteClusterService remoteClusterService;
public NodeClient(Settings settings, ThreadPool threadPool) {
super(settings, threadPool);
}
public void initialize(Map<GenericAction, TransportAction> actions, Supplier<String> localNodeId) {
public void initialize(Map<GenericAction, TransportAction> actions, Supplier<String> localNodeId,
RemoteClusterService remoteClusterService) {
this.actions = actions;
this.localNodeId = localNodeId;
this.remoteClusterService = remoteClusterService;
}
@Override
@ -117,4 +121,9 @@ public class NodeClient extends AbstractClient {
}
return transportAction;
}
@Override
public Client getRemoteClusterClient(String clusterAlias) {
return remoteClusterService.getRemoteClusterClient(threadPool(), clusterAlias);
}
}

View File

@ -538,7 +538,7 @@ public class Node implements Closeable {
resourcesToClose.addAll(pluginLifecycleComponents);
this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);
client.initialize(injector.getInstance(new Key<Map<GenericAction, TransportAction>>() {}),
() -> clusterService.localNode().getId());
() -> clusterService.localNode().getId(), transportService.getRemoteClusterService());
if (NetworkModule.HTTP_ENABLED.get(settings)) {
logger.debug("initializing HTTP handlers ...");

View File

@ -0,0 +1,67 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
final class RemoteClusterAwareClient extends AbstractClient {
private final TransportService service;
private final String clusterAlias;
private final RemoteClusterService remoteClusterService;
RemoteClusterAwareClient(Settings settings, ThreadPool threadPool, TransportService service, String clusterAlias) {
super(settings, threadPool);
this.service = service;
this.clusterAlias = clusterAlias;
this.remoteClusterService = service.getRemoteClusterService();
}
@Override
protected <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends
ActionRequestBuilder<Request, Response, RequestBuilder>>
void doExecute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(res -> {
Transport.Connection connection = remoteClusterService.getConnection(clusterAlias);
service.sendRequest(connection, action.name(), request, TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(listener, action::newResponse));
},
listener::onFailure));
}
@Override
public void close() {
// do nothing
}
@Override
public Client getRemoteClusterClient(String clusterAlias) {
return remoteClusterService.getRemoteClusterClient(threadPool(), clusterAlias);
}
}

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.transport;
import org.elasticsearch.client.Client;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
@ -36,6 +37,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.io.IOException;
@ -398,4 +400,18 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
});
}
}
/**
* Returns a client to the remote cluster if the given cluster alias exists.
* @param threadPool the {@link ThreadPool} for the client
* @param clusterAlias the cluster alias the remote cluster is registered under
*
* @throws IllegalArgumentException if the given clusterAlias doesn't exist
*/
public Client getRemoteClusterClient(ThreadPool threadPool, String clusterAlias) {
if (transportService.getRemoteClusterService().getRemoteClusterNames().contains(clusterAlias) == false) {
throw new IllegalArgumentException("unknown cluster alias [" + clusterAlias + "]");
}
return new RemoteClusterAwareClient(settings, threadPool, transportService, clusterAlias);
}
}

View File

@ -43,7 +43,7 @@ public class NodeClientHeadersTests extends AbstractClientHeadersTestCase {
Settings settings = HEADER_SETTINGS;
Actions actions = new Actions(settings, threadPool, testedActions);
NodeClient client = new NodeClient(settings, threadPool);
client.initialize(actions, () -> "test");
client.initialize(actions, () -> "test", null);
return client;
}

View File

@ -0,0 +1,94 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.transport.RemoteClusterConnectionTests.startTransport;
public class RemoteClusterClientTests extends ESTestCase {
private final ThreadPool threadPool = new TestThreadPool(getClass().getName());
@Override
public void tearDown() throws Exception {
super.tearDown();
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
public void testConnectAndExecuteRequest() throws Exception {
Settings remoteSettings = Settings.builder().put(ClusterName.CLUSTER_NAME_SETTING.getKey(), "foo_bar_cluster").build();
try (MockTransportService remoteTransport = startTransport("remote_node", Collections.emptyList(), Version.CURRENT, threadPool,
remoteSettings)) {
DiscoveryNode remoteNode = remoteTransport.getLocalDiscoNode();
Settings localSettings = Settings.builder()
.put(RemoteClusterService.ENABLE_REMOTE_CLUSTERS.getKey(), true)
.put("search.remote.test.seeds", remoteNode.getAddress().getAddress() + ":" + remoteNode.getAddress().getPort()).build();
try (MockTransportService service = MockTransportService.createNewService(localSettings, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
RemoteClusterService remoteClusterService = service.getRemoteClusterService();
assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode));
Client client = remoteClusterService.getRemoteClusterClient(threadPool, "test");
ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().execute().get();
assertNotNull(clusterStateResponse);
assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value());
// also test a failure, there is no handler for search registered
ActionNotFoundTransportException ex = expectThrows(ActionNotFoundTransportException.class,
() -> client.prepareSearch().get());
assertEquals("No handler for action [indices:data/read/search]", ex.getMessage());
}
}
}
public void testEnsureWeReconnect() throws Exception {
Settings remoteSettings = Settings.builder().put(ClusterName.CLUSTER_NAME_SETTING.getKey(), "foo_bar_cluster").build();
try (MockTransportService remoteTransport = startTransport("remote_node", Collections.emptyList(), Version.CURRENT, threadPool,
remoteSettings)) {
DiscoveryNode remoteNode = remoteTransport.getLocalDiscoNode();
Settings localSettings = Settings.builder()
.put(RemoteClusterService.ENABLE_REMOTE_CLUSTERS.getKey(), true)
.put("search.remote.test.seeds", remoteNode.getAddress().getAddress() + ":" + remoteNode.getAddress().getPort()).build();
try (MockTransportService service = MockTransportService.createNewService(localSettings, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
service.disconnectFromNode(remoteNode);
RemoteClusterService remoteClusterService = service.getRemoteClusterService();
assertBusy(() -> assertFalse(remoteClusterService.isRemoteNodeConnected("test", remoteNode)));
Client client = remoteClusterService.getRemoteClusterClient(threadPool, "test");
ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().execute().get();
assertNotNull(clusterStateResponse);
assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value());
}
}
}
}