diff --git a/core/src/main/java/org/elasticsearch/action/ActionModule.java b/core/src/main/java/org/elasticsearch/action/ActionModule.java index c1d0541d4ce..07f805eb537 100644 --- a/core/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/core/src/main/java/org/elasticsearch/action/ActionModule.java @@ -173,6 +173,7 @@ import org.elasticsearch.action.main.MainAction; import org.elasticsearch.action.main.TransportMainAction; import org.elasticsearch.action.search.ClearScrollAction; import org.elasticsearch.action.search.MultiSearchAction; +import org.elasticsearch.action.search.RemoteClusterService; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchScrollAction; import org.elasticsearch.action.search.TransportClearScrollAction; @@ -235,6 +236,7 @@ import org.elasticsearch.rest.action.admin.cluster.RestNodesStatsAction; import org.elasticsearch.rest.action.admin.cluster.RestPendingClusterTasksAction; import org.elasticsearch.rest.action.admin.cluster.RestPutRepositoryAction; import org.elasticsearch.rest.action.admin.cluster.RestPutStoredScriptAction; +import org.elasticsearch.rest.action.admin.cluster.RestRemoteClusterInfoAction; import org.elasticsearch.rest.action.admin.cluster.RestRestoreSnapshotAction; import org.elasticsearch.rest.action.admin.cluster.RestSnapshotsStatusAction; import org.elasticsearch.rest.action.admin.cluster.RestVerifyRepositoryAction; @@ -500,7 +502,7 @@ public class ActionModule extends AbstractModule { return unmodifiableList(actionPlugins.stream().flatMap(p -> p.getActionFilters().stream()).collect(Collectors.toList())); } - public void initRestHandlers(Supplier nodesInCluster) { + public void initRestHandlers(Supplier nodesInCluster, RemoteClusterService remoteClusterService) { List catActions = new ArrayList<>(); Consumer registerHandler = a -> { if (a instanceof AbstractCatAction) { @@ -509,6 +511,7 @@ public class ActionModule extends AbstractModule { }; registerHandler.accept(new RestMainAction(settings, restController)); registerHandler.accept(new RestNodesInfoAction(settings, restController, settingsFilter)); + registerHandler.accept(new RestRemoteClusterInfoAction(settings, restController, remoteClusterService)); registerHandler.accept(new RestNodesStatsAction(settings, restController)); registerHandler.accept(new RestNodesHotThreadsAction(settings, restController)); registerHandler.accept(new RestClusterAllocationExplainAction(settings, restController)); diff --git a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java b/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java index aea1aab7d3e..8e7b1d86997 100644 --- a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java +++ b/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java @@ -23,6 +23,10 @@ import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.IOUtils; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; @@ -33,6 +37,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -54,8 +59,10 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -65,6 +72,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; +import java.util.stream.Collectors; /** * Represents a connection to a single remote cluster. In contrast to a local cluster a remote cluster is not joined such that the @@ -521,4 +529,70 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo return connectedNodes.contains(node); } + + /** + * Fetches connection info for this connection + */ + public void getConnectionInfo(ActionListener listener) { + final Optional anyNode = connectedNodes.stream().findAny(); + if (anyNode.isPresent() == false) { + // not connected we return immediately + RemoteConnectionInfo remoteConnectionStats = new RemoteConnectionInfo(clusterAlias, + Collections.emptyList(), Collections.emptyList(), maxNumRemoteConnections, 0, + RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings)); + listener.onResponse(remoteConnectionStats); + } else { + NodesInfoRequest request = new NodesInfoRequest(); + request.clear(); + request.http(true); + + transportService.sendRequest(anyNode.get(), NodesInfoAction.NAME, request, new TransportResponseHandler() { + @Override + public NodesInfoResponse newInstance() { + return new NodesInfoResponse(); + } + + @Override + public void handleResponse(NodesInfoResponse response) { + Collection httpAddresses = new HashSet<>(); + for (NodeInfo info : response.getNodes()) { + if (connectedNodes.contains(info.getNode()) && info.getHttp() != null) { + httpAddresses.add(info.getHttp().getAddress().publishAddress()); + } + } + + if (httpAddresses.size() < maxNumRemoteConnections) { + // just in case non of the connected nodes have http enabled we get other http enabled nodes instead. + for (NodeInfo info : response.getNodes()) { + if (nodePredicate.test(info.getNode()) && info.getHttp() != null) { + httpAddresses.add(info.getHttp().getAddress().publishAddress()); + } + if (httpAddresses.size() == maxNumRemoteConnections) { + break; // once we have enough return... + } + } + } + RemoteConnectionInfo remoteConnectionInfo = new RemoteConnectionInfo(clusterAlias, + seedNodes.stream().map(n -> n.getAddress()).collect(Collectors.toSet()), httpAddresses, maxNumRemoteConnections, + connectedNodes.size(), RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings)); + listener.onResponse(remoteConnectionInfo); + } + + @Override + public void handleException(TransportException exp) { + listener.onFailure(exp); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }); + } + + } + + int getNumNodesConnected() { + return connectedNodes.size(); + } } diff --git a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterService.java b/core/src/main/java/org/elasticsearch/action/search/RemoteClusterService.java index 089ce57a114..bf60b9519fe 100644 --- a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterService.java +++ b/core/src/main/java/org/elasticsearch/action/search/RemoteClusterService.java @@ -24,9 +24,9 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; +import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.PlainShardIterator; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.common.Booleans; @@ -51,6 +51,7 @@ import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -413,4 +414,17 @@ public final class RemoteClusterService extends AbstractComponent implements Clo public void close() throws IOException { IOUtils.close(remoteClusters.values()); } + + public void getRemoteConnectionInfos(ActionListener> listener) { + final Map remoteClusters = this.remoteClusters; + if (remoteClusters.isEmpty()) { + listener.onResponse(Collections.emptyList()); + } else { + final GroupedActionListener actionListener = new GroupedActionListener<>(listener, + remoteClusters.size(), Collections.emptyList()); + for (RemoteClusterConnection connection : remoteClusters.values()) { + connection.getConnectionInfo(actionListener); + } + } + } } diff --git a/core/src/main/java/org/elasticsearch/action/search/RemoteConnectionInfo.java b/core/src/main/java/org/elasticsearch/action/search/RemoteConnectionInfo.java new file mode 100644 index 00000000000..da724f376b3 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/search/RemoteConnectionInfo.java @@ -0,0 +1,75 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.search; + +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Collection; + +/** + * This class encapsulates all remote cluster information to be rendered on + * _remote/info requests. + */ +public final class RemoteConnectionInfo implements ToXContent { + final Collection seedNodes; + final Collection httpAddresses; + final int connectionsPerCluster; + final TimeValue initialConnectionTimeout; + final int numNodesConnected; + final String clusterAlias; + + RemoteConnectionInfo(String clusterAlias, Collection seedNodes, + Collection httpAddresses, + int connectionsPerCluster, int numNodesConnected, + TimeValue initialConnectionTimeout) { + this.clusterAlias = clusterAlias; + this.seedNodes = seedNodes; + this.httpAddresses = httpAddresses; + this.connectionsPerCluster = connectionsPerCluster; + this.numNodesConnected = numNodesConnected; + this.initialConnectionTimeout = initialConnectionTimeout; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(clusterAlias); + { + builder.startArray("seeds"); + for (TransportAddress addr : seedNodes) { + builder.value(addr.toString()); + } + builder.endArray(); + builder.startArray("http_addresses"); + for (TransportAddress addr : httpAddresses) { + builder.value(addr.toString()); + } + builder.endArray(); + builder.field("connected", numNodesConnected > 0); + builder.field("num_nodes_connected", numNodesConnected); + builder.field("max_connections_per_cluster", connectionsPerCluster); + builder.field("initial_connect_timeout", initialConnectionTimeout); + } + builder.endObject(); + return builder; + } +} diff --git a/core/src/main/java/org/elasticsearch/action/support/GroupedActionListener.java b/core/src/main/java/org/elasticsearch/action/support/GroupedActionListener.java new file mode 100644 index 00000000000..85b418e046c --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/support/GroupedActionListener.java @@ -0,0 +1,81 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.support; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.common.util.concurrent.CountDown; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +/** + * An action listener that delegates it's results to another listener once + * it has received one or more failures or N results. This allows synchronous + * tasks to be forked off in a loop with the same listener and respond to a + * higher level listener once all tasks responded. + */ +public final class GroupedActionListener implements ActionListener { + private final CountDown countDown; + private final AtomicInteger pos = new AtomicInteger(); + private final AtomicArray roles; + private final ActionListener> delegate; + private final Collection defaults; + private final AtomicReference failure = new AtomicReference<>(); + + /** + * Creates a new listener + * @param delegate the delegate listener + * @param groupSize the group size + */ + public GroupedActionListener(ActionListener> delegate, int groupSize, + Collection defaults) { + roles = new AtomicArray<>(groupSize); + countDown = new CountDown(groupSize); + this.delegate = delegate; + this.defaults = defaults; + } + + @Override + public void onResponse(T element) { + roles.set(pos.incrementAndGet() - 1, element); + if (countDown.countDown()) { + if (failure.get() != null) { + delegate.onFailure(failure.get()); + } else { + List collect = this.roles.asList(); + collect.addAll(defaults); + delegate.onResponse(Collections.unmodifiableList(collect)); + } + } + } + + @Override + public void onFailure(Exception e) { + if (failure.compareAndSet(null, e) == false) { + failure.get().addSuppressed(e); + } + if (countDown.countDown()) { + delegate.onFailure(failure.get()); + } + } +} diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index 00e00b745a0..dfcfe6b0e59 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -406,6 +406,8 @@ public class Node implements Closeable { final Transport transport = networkModule.getTransportSupplier().get(); final TransportService transportService = newTransportService(settings, transport, threadPool, networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings()); + final SearchTransportService searchTransportService = new SearchTransportService(settings, + settingsModule.getClusterSettings(), transportService); final Consumer httpBind; final HttpServerTransport httpServerTransport; if (networkModule.isHttpEnabled()) { @@ -447,8 +449,7 @@ public class Node implements Closeable { b.bind(IndicesService.class).toInstance(indicesService); b.bind(SearchService.class).toInstance(newSearchService(clusterService, indicesService, threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase())); - b.bind(SearchTransportService.class).toInstance(new SearchTransportService(settings, - settingsModule.getClusterSettings(), transportService)); + b.bind(SearchTransportService.class).toInstance(searchTransportService); b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(settings, bigArrays, scriptModule.getScriptService())); b.bind(Transport.class).toInstance(transport); @@ -485,7 +486,7 @@ public class Node implements Closeable { if (NetworkModule.HTTP_ENABLED.get(settings)) { logger.debug("initializing HTTP handlers ..."); - actionModule.initRestHandlers(() -> clusterService.state().nodes()); + actionModule.initRestHandlers(() -> clusterService.state().nodes(), searchTransportService.getRemoteClusterService()); } logger.info("initialized"); diff --git a/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestRemoteClusterInfoAction.java b/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestRemoteClusterInfoAction.java new file mode 100644 index 00000000000..c116b44a531 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestRemoteClusterInfoAction.java @@ -0,0 +1,74 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.rest.action.admin.cluster; + +import org.elasticsearch.action.search.RemoteClusterService; +import org.elasticsearch.action.search.RemoteConnectionInfo; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.BytesRestResponse; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.RestResponseListener; + +import java.io.IOException; +import java.util.Collection; + +import static org.elasticsearch.rest.RestRequest.Method.GET; + +public final class RestRemoteClusterInfoAction extends BaseRestHandler { + + private final RemoteClusterService remoteClusterService; + + public RestRemoteClusterInfoAction(Settings settings, RestController controller, + RemoteClusterService remoteClusterService) { + super(settings); + controller.registerHandler(GET, "_remote/info", this); + this.remoteClusterService = remoteClusterService; + } + + @Override + public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) + throws IOException { + return channel -> remoteClusterService.getRemoteConnectionInfos( + new RestResponseListener>(channel) { + @Override + public RestResponse buildResponse( + Collection remoteConnectionInfos) throws Exception { + try (XContentBuilder xContentBuilder = channel.newBuilder()) { + xContentBuilder.startObject(); + for (RemoteConnectionInfo info : remoteConnectionInfos) { + info.toXContent(xContentBuilder, request); + } + xContentBuilder.endObject(); + return new BytesRestResponse(RestStatus.OK, xContentBuilder); + } + } + }); + } + @Override + public boolean canTripCircuitBreaker() { + return false; + } +} diff --git a/core/src/test/java/org/elasticsearch/action/ActionModuleTests.java b/core/src/test/java/org/elasticsearch/action/ActionModuleTests.java index 7c9d5443464..ddd31c342e3 100644 --- a/core/src/test/java/org/elasticsearch/action/ActionModuleTests.java +++ b/core/src/test/java/org/elasticsearch/action/ActionModuleTests.java @@ -113,7 +113,7 @@ public class ActionModuleTests extends ESTestCase { ActionModule actionModule = new ActionModule(false, settings.getSettings(), new IndexNameExpressionResolver(Settings.EMPTY), settings.getIndexScopedSettings(), settings.getClusterSettings(), settings.getSettingsFilter(), null, emptyList(), null, null); - actionModule.initRestHandlers(null); + actionModule.initRestHandlers(null, null); // At this point the easiest way to confirm that a handler is loaded is to try to register another one on top of it and to fail Exception e = expectThrows(IllegalArgumentException.class, () -> actionModule.getRestController().registerHandler(Method.GET, "/", null)); @@ -135,7 +135,7 @@ public class ActionModuleTests extends ESTestCase { ActionModule actionModule = new ActionModule(false, settings.getSettings(), new IndexNameExpressionResolver(Settings.EMPTY), settings.getIndexScopedSettings(), settings.getClusterSettings(), settings.getSettingsFilter(), threadPool, singletonList(dupsMainAction), null, null); - Exception e = expectThrows(IllegalArgumentException.class, () -> actionModule.initRestHandlers(null)); + Exception e = expectThrows(IllegalArgumentException.class, () -> actionModule.initRestHandlers(null, null)); assertThat(e.getMessage(), startsWith("Path [/] already has a value [" + RestMainAction.class.getName())); } finally { threadPool.shutdown(); @@ -166,7 +166,7 @@ public class ActionModuleTests extends ESTestCase { ActionModule actionModule = new ActionModule(false, settings.getSettings(), new IndexNameExpressionResolver(Settings.EMPTY), settings.getIndexScopedSettings(), settings.getClusterSettings(), settings.getSettingsFilter(), threadPool, singletonList(registersFakeHandler), null, null); - actionModule.initRestHandlers(null); + actionModule.initRestHandlers(null, null); // At this point the easiest way to confirm that a handler is loaded is to try to register another one on top of it and to fail Exception e = expectThrows(IllegalArgumentException.class, () -> actionModule.getRestController().registerHandler(Method.GET, "/_dummy", null)); diff --git a/core/src/test/java/org/elasticsearch/action/search/RemoteClusterConnectionTests.java b/core/src/test/java/org/elasticsearch/action/search/RemoteClusterConnectionTests.java index 15c735cafa6..8755190a534 100644 --- a/core/src/test/java/org/elasticsearch/action/search/RemoteClusterConnectionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/RemoteClusterConnectionTests.java @@ -19,8 +19,13 @@ package org.elasticsearch.action.search; import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.Build; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest; @@ -34,24 +39,28 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.CancellableThreads; -import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.http.HttpInfo; import org.elasticsearch.mocksocket.MockServerSocket; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.TransportConnectionListener; +import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.io.UncheckedIOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; -import java.net.UnknownHostException; -import java.nio.channels.AlreadyConnectedException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -519,4 +528,121 @@ public class RemoteClusterConnectionTests extends ESTestCase { } } } + + private static void installNodeStatsHandler(TransportService service, DiscoveryNode...nodes) { + service.registerRequestHandler(NodesInfoAction.NAME, NodesInfoRequest::new, ThreadPool.Names.SAME, false, false, + (request, channel) -> { + List nodeInfos = new ArrayList<>(); + int port = 80; + for (DiscoveryNode node : nodes) { + HttpInfo http = new HttpInfo(new BoundTransportAddress(new TransportAddress[]{node.getAddress()}, + new TransportAddress(node.getAddress().address().getAddress(), port++)), 100); + nodeInfos.add(new NodeInfo(node.getVersion(), Build.CURRENT, node, null, null, null, null, null, null, http, null, + null, null)); + } + channel.sendResponse(new NodesInfoResponse(ClusterName.DEFAULT, nodeInfos, Collections.emptyList())); + }); + + } + + public void testGetConnectionInfo() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService transport1 = startTransport("seed_node", knownNodes, Version.CURRENT); + MockTransportService transport2 = startTransport("seed_node_1", knownNodes, Version.CURRENT); + MockTransportService transport3 = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { + DiscoveryNode node1 = transport1.getLocalDiscoNode(); + DiscoveryNode node2 = transport3.getLocalDiscoNode(); + DiscoveryNode node3 = transport2.getLocalDiscoNode(); + knownNodes.add(transport1.getLocalDiscoNode()); + knownNodes.add(transport3.getLocalDiscoNode()); + knownNodes.add(transport2.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + List seedNodes = Arrays.asList(node3, node1, node2); + Collections.shuffle(seedNodes, random()); + + try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + int maxNumConnections = randomIntBetween(1, 5); + try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", + seedNodes, service, maxNumConnections, n -> true)) { + // test no nodes connected + RemoteConnectionInfo remoteConnectionInfo = getRemoteConnectionInfo(connection); + assertNotNull(remoteConnectionInfo); + assertEquals(0, remoteConnectionInfo.numNodesConnected); + assertEquals(0, remoteConnectionInfo.seedNodes.size()); + assertEquals(0, remoteConnectionInfo.httpAddresses.size()); + assertEquals(maxNumConnections, remoteConnectionInfo.connectionsPerCluster); + assertEquals("test-cluster", remoteConnectionInfo.clusterAlias); + updateSeedNodes(connection, seedNodes); + expectThrows(RemoteTransportException.class, () -> getRemoteConnectionInfo(connection)); + + for (MockTransportService s : Arrays.asList(transport1, transport2, transport3)) { + installNodeStatsHandler(s, node1, node2, node3); + } + + remoteConnectionInfo = getRemoteConnectionInfo(connection); + assertNotNull(remoteConnectionInfo); + assertEquals(connection.getNumNodesConnected(), remoteConnectionInfo.numNodesConnected); + assertEquals(Math.min(3, maxNumConnections), connection.getNumNodesConnected()); + assertEquals(3, remoteConnectionInfo.seedNodes.size()); + assertEquals(remoteConnectionInfo.httpAddresses.size(), Math.min(3, maxNumConnections)); + assertEquals(maxNumConnections, remoteConnectionInfo.connectionsPerCluster); + assertEquals("test-cluster", remoteConnectionInfo.clusterAlias); + for (TransportAddress address : remoteConnectionInfo.httpAddresses) { + assertTrue("port range mismatch: " + address.getPort(), address.getPort() >= 80 && address.getPort() <= 90); + } + } + } + } + } + + public void testRenderConnectionInfoXContent() throws IOException { + RemoteConnectionInfo stats = new RemoteConnectionInfo("test_cluster", + Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS,1)), + Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS,80)), + 4, 3, TimeValue.timeValueMinutes(30)); + XContentBuilder builder = XContentFactory.jsonBuilder(); + builder.startObject(); + stats.toXContent(builder, null); + builder.endObject(); + assertEquals("{\"test_cluster\":{\"seeds\":[\"0.0.0.0:1\"],\"http_addresses\":[\"0.0.0.0:80\"],\"connected\":true," + + "\"num_nodes_connected\":3,\"max_connections_per_cluster\":4,\"initial_connect_timeout\":\"30m\"}}", builder.string()); + + stats = new RemoteConnectionInfo("some_other_cluster", + Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS,1), new TransportAddress(TransportAddress.META_ADDRESS,2)), + Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS,80), new TransportAddress(TransportAddress.META_ADDRESS,81)), + 2, 0, TimeValue.timeValueSeconds(30)); + builder = XContentFactory.jsonBuilder(); + builder.startObject(); + stats.toXContent(builder, null); + builder.endObject(); + assertEquals("{\"some_other_cluster\":{\"seeds\":[\"0.0.0.0:1\",\"0.0.0.0:2\"],\"http_addresses\":[\"0.0.0.0:80\",\"0.0.0.0:81\"]," + + "\"connected\":false,\"num_nodes_connected\":0,\"max_connections_per_cluster\":2,\"initial_connect_timeout\":\"30s\"}}", + builder.string()); + } + + private RemoteConnectionInfo getRemoteConnectionInfo(RemoteClusterConnection connection) throws Exception { + AtomicReference statsRef = new AtomicReference<>(); + AtomicReference exceptionRef = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + connection.getConnectionInfo(new ActionListener() { + @Override + public void onResponse(RemoteConnectionInfo remoteConnectionInfo) { + statsRef.set(remoteConnectionInfo); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + exceptionRef.set(e); + latch.countDown(); + } + }); + latch.await(); + if (exceptionRef.get() != null) { + throw exceptionRef.get(); + } + return statsRef.get(); + } } diff --git a/core/src/test/java/org/elasticsearch/action/support/GroupedActionListenerTests.java b/core/src/test/java/org/elasticsearch/action/support/GroupedActionListenerTests.java new file mode 100644 index 00000000000..2af2da7ba09 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/support/GroupedActionListenerTests.java @@ -0,0 +1,124 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.support; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +public class GroupedActionListenerTests extends ESTestCase { + + public void testNotifications() throws InterruptedException { + AtomicReference> resRef = new AtomicReference<>(); + ActionListener> result = new ActionListener>() { + @Override + public void onResponse(Collection integers) { + resRef.set(integers); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError(e); + } + }; + final int groupSize = randomIntBetween(10, 1000); + AtomicInteger count = new AtomicInteger(); + Collection defaults = randomBoolean() ? Collections.singletonList(-1) : + Collections.emptyList(); + GroupedActionListener listener = new GroupedActionListener<>(result, groupSize, + defaults); + int numThreads = randomIntBetween(2, 5); + Thread[] threads = new Thread[numThreads]; + CyclicBarrier barrier = new CyclicBarrier(numThreads); + for (int i = 0; i < numThreads; i++) { + threads[i] = new Thread() { + @Override + public void run() { + try { + barrier.await(10, TimeUnit.SECONDS); + } catch (Exception e) { + throw new AssertionError(e); + } + int c = 0; + while((c = count.incrementAndGet()) <= groupSize) { + listener.onResponse(c-1); + } + } + }; + threads[i].start(); + } + for (Thread t : threads) { + t.join(); + } + assertNotNull(resRef.get()); + ArrayList list = new ArrayList<>(resRef.get()); + Collections.sort(list); + int expectedSize = groupSize + defaults.size(); + assertEquals(expectedSize, resRef.get().size()); + int expectedValue = defaults.isEmpty() ? 0 : -1; + for (int i = 0; i < expectedSize; i++) { + assertEquals(Integer.valueOf(expectedValue++), list.get(i)); + } + } + + public void testFailed() { + AtomicReference> resRef = new AtomicReference<>(); + AtomicReference excRef = new AtomicReference<>(); + + ActionListener> result = new ActionListener>() { + @Override + public void onResponse(Collection integers) { + resRef.set(integers); + } + + @Override + public void onFailure(Exception e) { + excRef.set(e); + } + }; + Collection defaults = randomBoolean() ? Collections.singletonList(-1) : + Collections.emptyList(); + int size = randomIntBetween(3, 4); + GroupedActionListener listener = new GroupedActionListener<>(result, size, + defaults); + listener.onResponse(0); + IOException ioException = new IOException(); + RuntimeException rtException = new RuntimeException(); + listener.onFailure(rtException); + listener.onFailure(ioException); + if (size == 4) { + listener.onResponse(2); + } + assertNotNull(excRef.get()); + assertEquals(rtException, excRef.get()); + assertEquals(1, excRef.get().getSuppressed().length); + assertEquals(ioException, excRef.get().getSuppressed()[0]); + assertNull(resRef.get()); + listener.onResponse(1); + assertNull(resRef.get()); + } +} diff --git a/docs/reference/cluster/remote-info.asciidoc b/docs/reference/cluster/remote-info.asciidoc new file mode 100644 index 00000000000..304e7c8de0b --- /dev/null +++ b/docs/reference/cluster/remote-info.asciidoc @@ -0,0 +1,35 @@ +[[cluster-remote-info]] +== Remote Cluster Info + +The cluster remote info API allows to retrieve all of the configured +remote cluster information. + +[source,js] +---------------------------------- +GET /_remote/info +---------------------------------- +// CONSOLE + +This command returns returns connection and endpoint information keyed by +the configured remote cluster alias. + +[float] +[[connection-info]] + +`seeds`:: + The configured initial seed transport addresses of the remote cluster. + +`http_addresses`:: + The published http addresses of all connected remote nodes. + +`connected`:: + True if there is at least one connection to the remote cluster. + +`num_nodes_connected`:: + The number of connected nodes in the remote cluster. + +`max_connection_per_cluster`:: + The maximum number of connections maintained for the remote cluster. + +`initial_connect_timeout`:: + The initial connect timeout for remote cluster connections. diff --git a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/20_info.yaml b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/20_info.yaml new file mode 100644 index 00000000000..3dc74f22a34 --- /dev/null +++ b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/20_info.yaml @@ -0,0 +1,48 @@ +--- +"Fetch remote cluster info for existing cluster": + + - do: + remote.info: {} + - match: { my_remote_cluster.connected: true } + - match: { my_remote_cluster.num_nodes_connected: 1} + - match: { my_remote_cluster.max_connections_per_cluster: 1} + - match: { my_remote_cluster.initial_connect_timeout: "30s" } + - is_true: my_remote_cluster.http_addresses.0 + +--- +"Add transient remote cluster based on the preset cluster and check remote info": + - do: + cluster.get_settings: + include_defaults: true + + - set: { defaults.search.remote.my_remote_cluster.seeds.0: remote_ip } + + - do: + cluster.put_settings: + flat_settings: true + body: + transient: + search.remote.test_remote_cluster.seeds: $remote_ip + + - match: {transient: {search.remote.test_remote_cluster.seeds: $remote_ip}} + + - do: + remote.info: {} + - set: { my_remote_cluster.http_addresses.0: remote_http } + - match: { test_remote_cluster.http_addresses.0: $remote_http } + + - match: { test_remote_cluster.connected: true } + - match: { my_remote_cluster.connected: true } + + - match: { test_remote_cluster.seeds.0: $remote_ip } + - match: { my_remote_cluster.seeds.0: $remote_ip } + + - match: { my_remote_cluster.num_nodes_connected: 1} + - match: { test_remote_cluster.num_nodes_connected: 1} + + - match: { my_remote_cluster.max_connections_per_cluster: 1} + - match: { test_remote_cluster.max_connections_per_cluster: 1} + + - match: { my_remote_cluster.initial_connect_timeout: "30s" } + - match: { test_remote_cluster.initial_connect_timeout: "30s" } + diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/remote.info.json b/rest-api-spec/src/main/resources/rest-api-spec/api/remote.info.json new file mode 100644 index 00000000000..a90d4ff6984 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/remote.info.json @@ -0,0 +1,12 @@ +{ + "remote.info": { + "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/master/remote-info.html", + "methods": ["GET"], + "url": { + "path": "/_remote/info", + "paths": ["/_remote/info"], + "params": {} + }, + "body": null + } +} \ No newline at end of file diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/remote.info/10_info.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/remote.info/10_info.yaml new file mode 100644 index 00000000000..a3cfeb9e598 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/remote.info/10_info.yaml @@ -0,0 +1,9 @@ +--- +"Get an empty emote info": + - skip: + version: " - 5.99.99" + reason: this API doesn't exist in 5.x yet + - do: + remote.info: {} + - is_true: '' +