diff --git a/core/src/main/java/org/elasticsearch/action/ActionListener.java b/core/src/main/java/org/elasticsearch/action/ActionListener.java index ef26867600e..f9fafa9f95a 100644 --- a/core/src/main/java/org/elasticsearch/action/ActionListener.java +++ b/core/src/main/java/org/elasticsearch/action/ActionListener.java @@ -19,8 +19,11 @@ package org.elasticsearch.action; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.CheckedConsumer; +import java.util.ArrayList; +import java.util.List; import java.util.function.Consumer; /** @@ -65,4 +68,41 @@ public interface ActionListener { } }; } + + /** + * Notifies every given listener with the response passed to {@link #onResponse(Object)}. If a listener itself throws an exception + * the exception is forwarded to {@link #onFailure(Exception)}. If in turn {@link #onFailure(Exception)} fails all remaining + * listeners will be processed and the caught exception will be re-thrown. + */ + static void onResponse(Iterable> listeners, Response response) { + List exceptionList = new ArrayList<>(); + for (ActionListener listener : listeners) { + try { + listener.onResponse(response); + } catch (Exception ex) { + try { + listener.onFailure(ex); + } catch (Exception ex1) { + exceptionList.add(ex1); + } + } + } + ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptionList); + } + + /** + * Notifies every given listener with the failure passed to {@link #onFailure(Exception)}. If a listener itself throws an exception + * all remaining listeners will be processed and the caught exception will be re-thrown. + */ + static void onFailure(Iterable> listeners, Exception failure) { + List exceptionList = new ArrayList<>(); + for (ActionListener listener : listeners) { + try { + listener.onFailure(failure); + } catch (Exception ex) { + exceptionList.add(ex); + } + } + ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptionList); + } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsGroup.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsGroup.java index 473d31754eb..79a014ebda7 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsGroup.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsGroup.java @@ -38,7 +38,7 @@ public class ClusterSearchShardsGroup implements Streamable, ToXContent { } - ClusterSearchShardsGroup(ShardId shardId, ShardRouting[] shards) { + public ClusterSearchShardsGroup(ShardId shardId, ShardRouting[] shards) { this.shardId = shardId; this.shards = shards; } diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java index b1b55adaa63..3ee5f56ebdc 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java @@ -42,7 +42,8 @@ public class ClusterSearchShardsResponse extends ActionResponse implements ToXCo } - ClusterSearchShardsResponse(ClusterSearchShardsGroup[] groups, DiscoveryNode[] nodes, Map indicesAndFilters) { + public ClusterSearchShardsResponse(ClusterSearchShardsGroup[] groups, DiscoveryNode[] nodes, + Map indicesAndFilters) { this.groups = groups; this.nodes = nodes; this.indicesAndFilters = indicesAndFilters; diff --git a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 6e9d762da8f..527f400a682 100644 --- a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -41,6 +41,8 @@ import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.ShardSearchTransportRequest; import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.query.QuerySearchResultProvider; +import org.elasticsearch.transport.ConnectTransportException; +import org.elasticsearch.transport.Transport; import java.util.List; import java.util.Map; @@ -59,7 +61,7 @@ abstract class AbstractSearchAsyncAction private final GroupShardsIterator shardsIts; protected final SearchRequest request; /** Used by subclasses to resolve node ids to DiscoveryNodes. **/ - protected final Function nodeIdToDiscoveryNode; + protected final Function nodeIdToConnection; protected final SearchTask task; protected final int expectedSuccessfulOps; private final int expectedTotalOps; @@ -74,7 +76,7 @@ abstract class AbstractSearchAsyncAction protected volatile ScoreDoc[] sortedShardDocs; protected AbstractSearchAsyncAction(Logger logger, SearchTransportService searchTransportService, - Function nodeIdToDiscoveryNode, + Function nodeIdToConnection, Map aliasFilter, Map concreteIndexBoosts, Executor executor, SearchRequest request, ActionListener listener, GroupShardsIterator shardsIts, long startTime, long clusterStateVersion, SearchTask task) { @@ -85,7 +87,7 @@ abstract class AbstractSearchAsyncAction this.request = request; this.task = task; this.listener = listener; - this.nodeIdToDiscoveryNode = nodeIdToDiscoveryNode; + this.nodeIdToConnection = nodeIdToConnection; this.clusterStateVersion = clusterStateVersion; this.shardsIts = shardsIts; expectedSuccessfulOps = shardsIts.size(); @@ -119,30 +121,34 @@ abstract class AbstractSearchAsyncAction void performFirstPhase(final int shardIndex, final ShardIterator shardIt, final ShardRouting shard) { if (shard == null) { + // TODO upgrade this to an assert... // no more active shards... (we should not really get here, but just for safety) onFirstPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())); } else { - final DiscoveryNode node = nodeIdToDiscoveryNode.apply(shard.currentNodeId()); - if (node == null) { - onFirstPhaseResult(shardIndex, shard, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())); - } else { + try { + final Transport.Connection connection = nodeIdToConnection.apply(shard.currentNodeId()); AliasFilter filter = this.aliasFilter.get(shard.index().getUUID()); assert filter != null; float indexBoost = concreteIndexBoosts.getOrDefault(shard.index().getUUID(), DEFAULT_INDEX_BOOST); ShardSearchTransportRequest transportRequest = new ShardSearchTransportRequest(request, shardIt.shardId(), shardsIts.size(), filter, indexBoost, startTime()); - sendExecuteFirstPhase(node, transportRequest , new ActionListener() { - @Override - public void onResponse(FirstResult result) { - onFirstPhaseResult(shardIndex, shard.currentNodeId(), result, shardIt); - } + sendExecuteFirstPhase(connection, transportRequest, new ActionListener() { + @Override + public void onResponse(FirstResult result) { + onFirstPhaseResult(shardIndex, shard.currentNodeId(), result, shardIt); + } - @Override - public void onFailure(Exception t) { - onFirstPhaseResult(shardIndex, shard, node.getId(), shardIt, t); - } - }); + @Override + public void onFailure(Exception t) { + onFirstPhaseResult(shardIndex, shard, connection.getNode().getId(), shardIt, t); + } + }); + } catch (ConnectTransportException | IllegalArgumentException ex) { + // we are getting the connection early here so we might run into nodes that are not connected. in that case we move on to + // the next shard. previously when using discovery nodes here we had a special case for null when a node was not connected + // at all which is not not needed anymore. + onFirstPhaseResult(shardIndex, shard, shard.currentNodeId(), shardIt, ex); } } } @@ -292,8 +298,8 @@ abstract class AbstractSearchAsyncAction private void raiseEarlyFailure(Exception e) { for (AtomicArray.Entry entry : firstResults.asList()) { try { - DiscoveryNode node = nodeIdToDiscoveryNode.apply(entry.value.shardTarget().getNodeId()); - sendReleaseSearchContext(entry.value.id(), node); + Transport.Connection connection = nodeIdToConnection.apply(entry.value.shardTarget().getNodeId()); + sendReleaseSearchContext(entry.value.id(), connection); } catch (Exception inner) { inner.addSuppressed(e); logger.trace("failed to release context", inner); @@ -317,8 +323,8 @@ abstract class AbstractSearchAsyncAction if (queryResult.hasHits() && docIdsToLoad.get(entry.index) == null) { // but none of them made it to the global top docs try { - DiscoveryNode node = nodeIdToDiscoveryNode.apply(entry.value.queryResult().shardTarget().getNodeId()); - sendReleaseSearchContext(entry.value.queryResult().id(), node); + Transport.Connection connection = nodeIdToConnection.apply(entry.value.queryResult().shardTarget().getNodeId()); + sendReleaseSearchContext(entry.value.queryResult().id(), connection); } catch (Exception e) { logger.trace("failed to release context", e); } @@ -327,9 +333,9 @@ abstract class AbstractSearchAsyncAction } } - protected void sendReleaseSearchContext(long contextId, DiscoveryNode node) { - if (node != null) { - searchTransportService.sendFreeContext(node, contextId, request); + protected void sendReleaseSearchContext(long contextId, Transport.Connection connection) { + if (connection != null) { + searchTransportService.sendFreeContext(connection, contextId, request); } } @@ -339,7 +345,7 @@ abstract class AbstractSearchAsyncAction return new ShardFetchSearchRequest(request, queryResult.id(), entry.value, lastEmittedDoc); } - protected abstract void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, + protected abstract void sendExecuteFirstPhase(Transport.Connection connection, ShardSearchTransportRequest request, ActionListener listener); protected final void processFirstPhaseResult(int shardIndex, FirstResult result) { diff --git a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java b/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java new file mode 100644 index 00000000000..cce03cc4ed4 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java @@ -0,0 +1,502 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.search; + +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.logging.log4j.util.Supplier; +import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.util.IOUtils; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction; +import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest; +import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; +import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.CancellableThreads; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.ConnectTransportException; +import org.elasticsearch.transport.ConnectionProfile; +import org.elasticsearch.transport.TcpTransport; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportActionProxy; +import org.elasticsearch.transport.TransportConnectionListener; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportResponseHandler; +import org.elasticsearch.transport.TransportService; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Predicate; + +/** + * Represents a connection to a single remote cluster. In contrast to a local cluster a remote cluster is not joined such that the + * current node is part of the cluster and it won't receive cluster state updates from the remote cluster. Remote clusters are also not + * fully connected with the current node. From a connection perspective a local cluster forms a bi-directional star network while in the + * remote case we only connect to a subset of the nodes in the cluster in an uni-directional fashion. + * + * This class also handles the discovery of nodes from the remote cluster. The initial list of seed nodes is only used to discover all nodes + * in the remote cluster and connects to all eligible nodes, for details see {@link RemoteClusterService#REMOTE_NODE_ATTRIBUTE}. + * + * In the case of a disconnection, this class will issue a re-connect task to establish at most + * {@link RemoteClusterService#REMOTE_CONNECTIONS_PER_CLUSTER} until either all eligible nodes are exhausted or the maximum number of + * connections per cluster has been reached. + */ +final class RemoteClusterConnection extends AbstractComponent implements TransportConnectionListener, Closeable { + + private final TransportService transportService; + private final ConnectionProfile remoteProfile; + private final Set connectedNodes = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final Supplier nodeSupplier; + private final String clusterAlias; + private final int maxNumRemoteConnections; + private final Predicate nodePredicate; + private volatile List seedNodes; + private final ConnectHandler connectHandler; + + /** + * Creates a new {@link RemoteClusterConnection} + * @param settings the nodes settings object + * @param clusterAlias the configured alias of the cluster to connect to + * @param seedNodes a list of seed nodes to discover eligible nodes from + * @param transportService the local nodes transport service + * @param maxNumRemoteConnections the maximum number of connections to the remote cluster + * @param nodePredicate a predicate to filter eligible remote nodes to connect to + */ + RemoteClusterConnection(Settings settings, String clusterAlias, List seedNodes, + TransportService transportService, int maxNumRemoteConnections, Predicate nodePredicate) { + super(settings); + this.transportService = transportService; + this.maxNumRemoteConnections = maxNumRemoteConnections; + this.nodePredicate = nodePredicate; + this.clusterAlias = clusterAlias; + ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); + builder.setConnectTimeout(TcpTransport.TCP_CONNECT_TIMEOUT.get(settings)); + builder.setHandshakeTimeout(TcpTransport.TCP_CONNECT_TIMEOUT.get(settings)); + builder.addConnections(6, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING); // TODO make this configurable? + builder.addConnections(0, // we don't want this to be used for anything else but search + TransportRequestOptions.Type.BULK, + TransportRequestOptions.Type.STATE, + TransportRequestOptions.Type.RECOVERY); + remoteProfile = builder.build(); + nodeSupplier = new Supplier() { + private volatile Iterator current; + @Override + public DiscoveryNode get() { + if (current == null || current.hasNext() == false) { + current = connectedNodes.iterator(); + if (current.hasNext() == false) { + throw new IllegalStateException("No node available for cluster: " + clusterAlias + " nodes: " + connectedNodes); + } + } + return current.next(); + } + }; + this.seedNodes = Collections.unmodifiableList(seedNodes); + this.connectHandler = new ConnectHandler(); + transportService.addConnectionListener(this); + } + + /** + * Updates the list of seed nodes for this cluster connection + */ + synchronized void updateSeedNodes(List seedNodes, ActionListener connectListener) { + this.seedNodes = Collections.unmodifiableList(new ArrayList<>(seedNodes)); + connectHandler.connect(connectListener); + } + + @Override + public void onNodeDisconnected(DiscoveryNode node) { + boolean remove = connectedNodes.remove(node); + if (remove && connectedNodes.size() < maxNumRemoteConnections) { + // try to reconnect and fill up the slot of the disconnected node + connectHandler.forceConnect(); + } + } + + /** + * Fetches all shards for the search request from this remote connection. This is used to later run the search on the remote end. + */ + public void fetchSearchShards(SearchRequest searchRequest, final List indices, + ActionListener listener) { + if (connectedNodes.isEmpty()) { + // just in case if we are not connected for some reason we try to connect and if we fail we have to notify the listener + // this will cause some back pressure on the search end and eventually will cause rejections but that's fine + // we can't proceed with a search on a cluster level. + // in the future we might want to just skip the remote nodes in such a case but that can already be implemented on the caller + // end since they provide the listener. + connectHandler.connect(ActionListener.wrap((x) -> fetchShardsInternal(searchRequest, indices, listener), listener::onFailure)); + } else { + fetchShardsInternal(searchRequest, indices, listener); + } + } + + private void fetchShardsInternal(SearchRequest searchRequest, List indices, + final ActionListener listener) { + final DiscoveryNode node = nodeSupplier.get(); + ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices.toArray(new String[indices.size()])) + .indicesOptions(searchRequest.indicesOptions()).local(true).preference(searchRequest.preference()) + .routing(searchRequest.routing()); + transportService.sendRequest(node, ClusterSearchShardsAction.NAME, searchShardsRequest, + new TransportResponseHandler() { + + @Override + public ClusterSearchShardsResponse newInstance() { + return new ClusterSearchShardsResponse(); + } + + @Override + public void handleResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) { + listener.onResponse(clusterSearchShardsResponse); + } + + @Override + public void handleException(TransportException e) { + listener.onFailure(e); + } + + @Override + public String executor() { + return ThreadPool.Names.SEARCH; + } + }); + } + + /** + * Returns a connection to the remote cluster. This connection might be a proxy connection that redirects internally to the + * given node. + */ + Transport.Connection getConnection(DiscoveryNode remoteClusterNode) { + DiscoveryNode discoveryNode = nodeSupplier.get(); + Transport.Connection connection = transportService.getConnection(discoveryNode); + return new Transport.Connection() { + @Override + public DiscoveryNode getNode() { + return remoteClusterNode; + } + + @Override + public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) + throws IOException, TransportException { + connection.sendRequest(requestId, TransportActionProxy.getProxyAction(action), + TransportActionProxy.wrapRequest(remoteClusterNode, request), options); + } + + @Override + public void close() throws IOException { + assert false: "proxy connections must not be closed"; + } + }; + } + + @Override + public void close() throws IOException { + connectHandler.close(); + } + + public boolean isClosed() { + return connectHandler.isClosed(); + } + + /** + * The connect handler manages node discovery and the actual connect to the remote cluster. + * There is at most one connect job running at any time. If such a connect job is triggered + * while another job is running the provided listeners are queued and batched up until the current running job returns. + * + * The handler has a built-in queue that can hold up to 100 connect attempts and will reject requests once the queue is full. + * In a scenario when a remote cluster becomes unavailable we will queue requests up but if we can't connect quick enough + * we will just reject the connect trigger which will lead to failing searches. + */ + private class ConnectHandler implements Closeable { + private final Semaphore running = new Semaphore(1); + private final AtomicBoolean closed = new AtomicBoolean(false); + private final BlockingQueue> queue = new ArrayBlockingQueue<>(100); + private final CancellableThreads cancellableThreads = new CancellableThreads(); + + /** + * Triggers a connect round iff there are pending requests queued up and if there is no + * connect round currently running. + */ + void maybeConnect() { + connect(null); + } + + /** + * Triggers a connect round unless there is one running already. If there is a connect round running, the listener will either + * be queued or rejected and failed. + */ + void connect(ActionListener connectListener) { + connect(connectListener, false); + } + + /** + * Triggers a connect round unless there is one already running. In contrast to {@link #maybeConnect()} will this method also + * trigger a connect round if there is no listener queued up. + */ + void forceConnect() { + connect(null, true); + } + + private void connect(ActionListener connectListener, boolean forceRun) { + final boolean runConnect; + final Collection> toNotify; + synchronized (queue) { + if (connectListener != null && queue.offer(connectListener) == false) { + connectListener.onFailure(new RejectedExecutionException("connect queue is full")); + return; + } + if (forceRun == false && queue.isEmpty()) { + return; + } + runConnect = running.tryAcquire(); + if (runConnect) { + toNotify = new ArrayList<>(); + queue.drainTo(toNotify); + if (closed.get()) { + running.release(); + ActionListener.onFailure(toNotify, new AlreadyClosedException("connect handler is already closed")); + return; + } + } else { + toNotify = Collections.emptyList(); + } + } + if (runConnect) { + forkConnect(toNotify); + } + } + + private void forkConnect(final Collection> toNotify) { + ThreadPool threadPool = transportService.getThreadPool(); + ExecutorService executor = threadPool.executor(ThreadPool.Names.MANAGEMENT); + executor.submit(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + synchronized (queue) { + running.release(); + } + try { + ActionListener.onFailure(toNotify, e); + } finally { + maybeConnect(); + } + } + + @Override + protected void doRun() throws Exception { + ActionListener listener = ActionListener.wrap((x) -> { + synchronized (queue) { + running.release(); + } + try { + ActionListener.onResponse(toNotify, x); + } finally { + maybeConnect(); + } + + }, (e) -> { + synchronized (queue) { + running.release(); + } + try { + ActionListener.onFailure(toNotify, e); + } finally { + maybeConnect(); + } + }); + collectRemoteNodes(seedNodes.iterator(), transportService, listener); + } + }); + + } + + void collectRemoteNodes(Iterator seedNodes, + final TransportService transportService, ActionListener listener) { + if (Thread.currentThread().isInterrupted()) { + listener.onFailure(new InterruptedException("remote connect thread got interrupted")); + } + try { + if (seedNodes.hasNext()) { + cancellableThreads.executeIO(() -> { + final DiscoveryNode seedNode = seedNodes.next(); + final DiscoveryNode handshakeNode; + Transport.Connection connection = transportService.openConnection(seedNode, + ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null)); + boolean success = false; + try { + handshakeNode = transportService.handshake(connection, remoteProfile.getHandshakeTimeout().millis(), + (c) -> true); + if (nodePredicate.test(handshakeNode) && connectedNodes.size() < maxNumRemoteConnections) { + transportService.connectToNode(handshakeNode, remoteProfile); + connectedNodes.add(handshakeNode); + } + ClusterStateRequest request = new ClusterStateRequest(); + request.clear(); + request.nodes(true); + transportService.sendRequest(connection, + ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY, + new SniffClusterStateResponseHandler(transportService, connection, listener, seedNodes, + cancellableThreads)); + success = true; + } finally { + if (success == false) { + connection.close(); + } + } + }); + } else { + listener.onFailure(new IllegalStateException("no seed node left")); + } + } catch (CancellableThreads.ExecutionCancelledException ex) { + listener.onFailure(ex); // we got canceled - fail the listener and step out + } catch (ConnectTransportException | IOException | IllegalStateException ex) { + // ISE if we fail the handshake with an version incompatible node + if (seedNodes.hasNext()) { + logger.debug((Supplier) () -> new ParameterizedMessage("fetching nodes from external cluster {} failed", + clusterAlias), ex); + collectRemoteNodes(seedNodes, transportService, listener); + } else { + listener.onFailure(ex); + } + } + } + + @Override + public void close() throws IOException { + try { + if (closed.compareAndSet(false, true)) { + cancellableThreads.cancel("connect handler is closed"); + running.acquire(); // acquire the semaphore to ensure all connections are closed and all thread joined + running.release(); + maybeConnect(); // now go and notify pending listeners + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + final boolean isClosed() { + return closed.get(); + } + + /* This class handles the _state response from the remote cluster when sniffing nodes to connect to */ + private class SniffClusterStateResponseHandler implements TransportResponseHandler { + + private final TransportService transportService; + private final Transport.Connection connection; + private final ActionListener listener; + private final Iterator seedNodes; + private final CancellableThreads cancellableThreads; + + SniffClusterStateResponseHandler(TransportService transportService, Transport.Connection connection, + ActionListener listener, Iterator seedNodes, + CancellableThreads cancellableThreads) { + this.transportService = transportService; + this.connection = connection; + this.listener = listener; + this.seedNodes = seedNodes; + this.cancellableThreads = cancellableThreads; + } + + @Override + public ClusterStateResponse newInstance() { + return new ClusterStateResponse(); + } + + @Override + public void handleResponse(ClusterStateResponse response) { + try { + cancellableThreads.executeIO(() -> { + DiscoveryNodes nodes = response.getState().nodes(); + Iterable nodesIter = nodes.getNodes()::valuesIt; + for (DiscoveryNode node : nodesIter) { + if (nodePredicate.test(node) && connectedNodes.size() < maxNumRemoteConnections) { + try { + transportService.connectToNode(node, remoteProfile); // noop if node is connected + connectedNodes.add(node); + } catch (ConnectTransportException | IllegalStateException ex) { + // ISE if we fail the handshake with an version incompatible node + // fair enough we can't connect just move on + logger.debug((Supplier) + () -> new ParameterizedMessage("failed to connect to node {}", node), ex); + } + } + } + }); + connection.close(); + listener.onResponse(null); + } catch (CancellableThreads.ExecutionCancelledException ex) { + listener.onFailure(ex); // we got canceled - fail the listener and step out + } catch (Exception ex) { + logger.warn((Supplier) + () -> new ParameterizedMessage("fetching nodes from external cluster {} failed", + clusterAlias), ex); + collectRemoteNodes(seedNodes, transportService, listener); + } finally { + // just to make sure we don't leak anything we close the connection here again even if we managed to do so before + IOUtils.closeWhileHandlingException(connection); + } + } + + @Override + public void handleException(TransportException exp) { + logger.warn((Supplier) + () -> new ParameterizedMessage("fetching nodes from external cluster {} failed", clusterAlias), + exp); + try { + IOUtils.closeWhileHandlingException(connection); + } finally { + // once the connection is closed lets try the next node + collectRemoteNodes(seedNodes, transportService, listener); + } + } + + @Override + public String executor() { + return ThreadPool.Names.MANAGEMENT; + } + } + } + + boolean assertNoRunningConnections() { // for testing only + assert connectHandler.running.availablePermits() == 1; + return true; + } +} diff --git a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterService.java b/core/src/main/java/org/elasticsearch/action/search/RemoteClusterService.java new file mode 100644 index 00000000000..92b4ca5f0b3 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/search/RemoteClusterService.java @@ -0,0 +1,395 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.search; + +import org.apache.logging.log4j.util.Supplier; +import org.apache.lucene.util.IOUtils; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup; +import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.PlainShardIterator; +import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.CountDown; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.search.internal.AliasFilter; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportService; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Basic service for accessing remote clusters via gateway nodes + */ +public final class RemoteClusterService extends AbstractComponent implements Closeable { + + static final String LOCAL_CLUSTER_GROUP_KEY = ""; + + /** + * A list of initial seed nodes to discover eligible nodes from the remote cluster + */ + public static final Setting.AffixSetting> REMOTE_CLUSTERS_SEEDS = Setting.affixKeySetting("search.remote.", + "seeds", (key) -> Setting.listSetting(key, Collections.emptyList(), RemoteClusterService::parseSeedAddress, + Setting.Property.NodeScope, Setting.Property.Dynamic)); + /** + * The maximum number of connections that will be established to a remote cluster. For instance if there is only a single + * seed node, other nodes will be discovered up to the given number of nodes in this setting. The default is 3. + */ + public static final Setting REMOTE_CONNECTIONS_PER_CLUSTER = Setting.intSetting("search.remote.connections_per_cluster", + 3, 1, Setting.Property.NodeScope); + + /** + * The initial connect timeout for remote cluster connections + */ + public static final Setting REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING = + Setting.positiveTimeSetting("search.remote.initial_connect_timeout", TimeValue.timeValueSeconds(30), Setting.Property.NodeScope); + + /** + * The name of a node attribute to select nodes that should be connected to in the remote cluster. + * For instance a node can be configured with node.attr.gateway: true in order to be eligible as a gateway node between + * clusters. In that case search.remote.node.attr: gateway can be used to filter out other nodes in the remote cluster. + * The value of the setting is expected to be a boolean, true for nodes that can become gateways, false otherwise. + */ + public static final Setting REMOTE_NODE_ATTRIBUTE = Setting.simpleString("search.remote.node.attr", + Setting.Property.NodeScope); + + private static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':'; + + private final TransportService transportService; + private final int numRemoteConnections; + private volatile Map remoteClusters = Collections.emptyMap(); + + RemoteClusterService(Settings settings, TransportService transportService) { + super(settings); + this.transportService = transportService; + numRemoteConnections = REMOTE_CONNECTIONS_PER_CLUSTER.get(settings); + } + + /** + * This method updates the list of remote clusters. It's intended to be used as an update consumer on the settings infrastructure + * @param seeds a cluster alias to discovery node mapping representing the remote clusters seeds nodes + * @param connectionListener a listener invoked once every configured cluster has been connected to + */ + private synchronized void updateRemoteClusters(Map> seeds, ActionListener connectionListener) { + if (seeds.containsKey(LOCAL_CLUSTER_GROUP_KEY)) { + throw new IllegalArgumentException("remote clusters must not have the empty string as its key"); + } + Map remoteClusters = new HashMap<>(); + if (seeds.isEmpty()) { + connectionListener.onResponse(null); + } else { + CountDown countDown = new CountDown(seeds.size()); + Predicate nodePredicate = (node) -> Version.CURRENT.isCompatible(node.getVersion()); + if (REMOTE_NODE_ATTRIBUTE.exists(settings)) { + // nodes can be tagged with node.attr.remote_gateway: true to allow a node to be a gateway node for + // cross cluster search + String attribute = REMOTE_NODE_ATTRIBUTE.get(settings); + nodePredicate = nodePredicate.and((node) -> Boolean.getBoolean(node.getAttributes().getOrDefault(attribute, "false"))); + } + remoteClusters.putAll(this.remoteClusters); + for (Map.Entry> entry : seeds.entrySet()) { + RemoteClusterConnection remote = this.remoteClusters.get(entry.getKey()); + if (entry.getValue().isEmpty()) { // with no seed nodes we just remove the connection + try { + IOUtils.close(remote); + } catch (IOException e) { + logger.warn("failed to close remote cluster connections for cluster: " + entry.getKey(), e); + } + remoteClusters.remove(entry.getKey()); + continue; + } + + if (remote == null) { // this is a new cluster we have to add a new representation + remote = new RemoteClusterConnection(settings, entry.getKey(), entry.getValue(), transportService, numRemoteConnections, + nodePredicate); + remoteClusters.put(entry.getKey(), remote); + } + + // now update the seed nodes no matter if it's new or already existing + RemoteClusterConnection finalRemote = remote; + remote.updateSeedNodes(entry.getValue(), ActionListener.wrap( + response -> { + if (countDown.countDown()) { + connectionListener.onResponse(response); + } + }, + exception -> { + if (countDown.fastForward()) { + connectionListener.onFailure(exception); + } + if (finalRemote.isClosed() == false) { + logger.warn("failed to update seed list for cluster: " + entry.getKey(), exception); + } + })); + } + } + this.remoteClusters = Collections.unmodifiableMap(remoteClusters); + } + + /** + * Returns true if at least one remote cluster is configured + */ + boolean isCrossClusterSearchEnabled() { + return remoteClusters.isEmpty() == false; + } + + /** + * Groups indices per cluster by splitting remote cluster-alias, index-name pairs on {@link #REMOTE_CLUSTER_INDEX_SEPARATOR}. All + * indices per cluster are collected as a list in the returned map keyed by the cluster alias. Local indices are grouped under + * {@link #LOCAL_CLUSTER_GROUP_KEY}. The returned map is mutable. + * + * @param requestIndices the indices in the search request to filter + * @param indexExists a predicate that can test if a certain index or alias exists + * + * @return a map of grouped remote and local indices + */ + Map> groupClusterIndices(String[] requestIndices, Predicate indexExists) { + Map> perClusterIndices = new HashMap<>(); + for (String index : requestIndices) { + int i = index.indexOf(REMOTE_CLUSTER_INDEX_SEPARATOR); + String indexName = index; + String clusterName = LOCAL_CLUSTER_GROUP_KEY; + if (i >= 0) { + String remoteClusterName = index.substring(0, i); + if (isRemoteClusterRegistered(remoteClusterName)) { + if (indexExists.test(index)) { + // we use : as a separator for remote clusters. might conflict if there is an index that is actually named + // remote_cluster_alias:index_name - for this case we fail the request. the user can easily change the cluster alias + // if that happens + throw new IllegalArgumentException("Can not filter indices; index " + index + + " exists but there is also a remote cluster named: " + remoteClusterName); + } + indexName = index.substring(i + 1); + clusterName = remoteClusterName; + } + } + perClusterIndices.computeIfAbsent(clusterName, k -> new ArrayList()).add(indexName); + } + return perClusterIndices; +} + + /** + * Returns true iff the given cluster is configured as a remote cluster. Otherwise false + */ + boolean isRemoteClusterRegistered(String clusterName) { + return remoteClusters.containsKey(clusterName); + } + + void collectSearchShards(SearchRequest searchRequest, Map> remoteIndicesByCluster, + ActionListener> listener) { + final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size()); + final Map searchShardsResponses = new ConcurrentHashMap<>(); + final AtomicReference transportException = new AtomicReference<>(); + for (Map.Entry> entry : remoteIndicesByCluster.entrySet()) { + final String clusterName = entry.getKey(); + RemoteClusterConnection remoteClusterConnection = remoteClusters.get(clusterName); + if (remoteClusterConnection == null) { + throw new IllegalArgumentException("no such remote cluster: " + clusterName); + } + final List indices = entry.getValue(); + remoteClusterConnection.fetchSearchShards(searchRequest, indices, + new ActionListener() { + @Override + public void onResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) { + searchShardsResponses.put(clusterName, clusterSearchShardsResponse); + if (responsesCountDown.countDown()) { + TransportException exception = transportException.get(); + if (exception == null) { + listener.onResponse(searchShardsResponses); + } else { + listener.onFailure(transportException.get()); + } + } + } + + @Override + public void onFailure(Exception e) { + TransportException exception = new TransportException("unable to communicate with remote cluster [" + + clusterName + "]", e); + if (transportException.compareAndSet(null, exception) == false) { + exception = transportException.accumulateAndGet(exception, (previous, current) -> { + current.addSuppressed(previous); + return current; + }); + } + if (responsesCountDown.countDown()) { + listener.onFailure(exception); + } + } + }); + } + } + + + Function processRemoteShards(Map searchShardsResponses, + List remoteShardIterators, + Map aliasFilterMap) { + Map> nodeToCluster = new HashMap<>(); + for (Map.Entry entry : searchShardsResponses.entrySet()) { + String clusterName = entry.getKey(); + ClusterSearchShardsResponse searchShardsResponse = entry.getValue(); + for (DiscoveryNode remoteNode : searchShardsResponse.getNodes()) { + nodeToCluster.put(remoteNode.getId(), () -> getConnection(remoteNode, clusterName)); + } + Map indicesAndFilters = searchShardsResponse.getIndicesAndFilters(); + for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) { + //add the cluster name to the remote index names for indices disambiguation + //this ends up in the hits returned with the search response + ShardId shardId = clusterSearchShardsGroup.getShardId(); + Index remoteIndex = shardId.getIndex(); + Index index = new Index(clusterName + REMOTE_CLUSTER_INDEX_SEPARATOR + remoteIndex.getName(), remoteIndex.getUUID()); + ShardIterator shardIterator = new PlainShardIterator(new ShardId(index, shardId.getId()), + Arrays.asList(clusterSearchShardsGroup.getShards())); + remoteShardIterators.add(shardIterator); + AliasFilter aliasFilter; + if (indicesAndFilters == null) { + aliasFilter = new AliasFilter(null, Strings.EMPTY_ARRAY); + } else { + aliasFilter = indicesAndFilters.get(shardId.getIndexName()); + assert aliasFilter != null; + } + // here we have to map the filters to the UUID since from now on we use the uuid for the lookup + aliasFilterMap.put(remoteIndex.getUUID(), aliasFilter); + } + } + return (nodeId) -> { + Supplier supplier = nodeToCluster.get(nodeId); + if (supplier == null) { + throw new IllegalArgumentException("unknown remote node: " + nodeId); + } + return supplier.get(); + }; + } + + /** + * Returns a connection to the given node on the given remote cluster + * @throws IllegalArgumentException if the remote cluster is unknown + */ + private Transport.Connection getConnection(DiscoveryNode node, String cluster) { + RemoteClusterConnection connection = remoteClusters.get(cluster); + if (connection == null) { + throw new IllegalArgumentException("no such remote cluster: " + cluster); + } + return connection.getConnection(node); + } + + void updateRemoteCluster(String clusterAlias, List addresses) { + updateRemoteClusters(Collections.singletonMap(clusterAlias, addresses.stream().map(address -> { + TransportAddress transportAddress = new TransportAddress(address); + return new DiscoveryNode(clusterAlias + "#" + transportAddress.toString(), + transportAddress, + Version.CURRENT.minimumCompatibilityVersion()); + }).collect(Collectors.toList())), + ActionListener.wrap((x) -> {}, (x) -> {}) ); + } + + static Map> buildRemoteClustersSeeds(Settings settings) { + Stream>> allConcreteSettings = REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings); + return allConcreteSettings.collect( + Collectors.toMap(REMOTE_CLUSTERS_SEEDS::getNamespace, concreteSetting -> { + String clusterName = REMOTE_CLUSTERS_SEEDS.getNamespace(concreteSetting); + List nodes = new ArrayList<>(); + for (InetSocketAddress address : concreteSetting.get(settings)) { + TransportAddress transportAddress = new TransportAddress(address); + DiscoveryNode node = new DiscoveryNode(clusterName + "#" + transportAddress.toString(), + transportAddress, + Version.CURRENT.minimumCompatibilityVersion()); + nodes.add(node); + } + return nodes; + })); + } + + private static InetSocketAddress parseSeedAddress(String remoteHost) { + int portSeparator = remoteHost.lastIndexOf(':'); // in case we have a IPv6 address ie. [::1]:9300 + if (portSeparator == -1 || portSeparator == remoteHost.length()) { + throw new IllegalArgumentException("remote hosts need to be configured as [host:port], found [" + remoteHost + "] instead"); + } + String host = remoteHost.substring(0, portSeparator); + InetAddress hostAddress; + try { + hostAddress = InetAddress.getByName(host); + } catch (UnknownHostException e) { + throw new IllegalArgumentException("unknown host [" + host + "]", e); + } + try { + int port = Integer.valueOf(remoteHost.substring(portSeparator + 1)); + if (port <= 0) { + throw new IllegalArgumentException("port number must be > 0 but was: [" + port + "]"); + } + return new InetSocketAddress(hostAddress, port); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("port must be a number", e); + } + + } + + /** + * Connects to all remote clusters in a blocking fashion. This should be called on node startup to establish an initial connection + * to all configured seed nodes. + */ + void initializeRemoteClusters() { + final TimeValue timeValue = REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings); + final PlainActionFuture future = new PlainActionFuture<>(); + Map> seeds = buildRemoteClustersSeeds(settings); + updateRemoteClusters(seeds, future); + try { + future.get(timeValue.millis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (TimeoutException ex) { + logger.warn("failed to connect to remote clusters within {}", timeValue.toString()); + } catch (Exception e) { + throw new IllegalStateException("failed to connect to remote clusters", e); + } + } + + @Override + public void close() throws IOException { + IOUtils.close(remoteClusters.values()); + } +} diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java index 5d73120efab..3b045df842f 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java @@ -34,6 +34,7 @@ import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.ShardSearchTransportRequest; import org.elasticsearch.search.query.QuerySearchRequest; +import org.elasticsearch.transport.Transport; import java.io.IOException; import java.util.Map; @@ -46,12 +47,12 @@ class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction queryFetchResults; private final SearchPhaseController searchPhaseController; SearchDfsQueryAndFetchAsyncAction(Logger logger, SearchTransportService searchTransportService, - Function nodeIdToDiscoveryNode, + Function nodeIdToConnection, Map aliasFilter, Map concreteIndexBoosts, SearchPhaseController searchPhaseController, Executor executor, SearchRequest request, ActionListener listener, GroupShardsIterator shardsIts, long startTime, long clusterStateVersion, SearchTask task) { - super(logger, searchTransportService, nodeIdToDiscoveryNode, aliasFilter, concreteIndexBoosts, executor, + super(logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener, shardsIts, startTime, clusterStateVersion, task); this.searchPhaseController = searchPhaseController; queryFetchResults = new AtomicArray<>(firstResults.length()); @@ -63,9 +64,9 @@ class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction listener) { - searchTransportService.sendExecuteDfs(node, request, task, listener); + searchTransportService.sendExecuteDfs(connection, request, task, listener); } @Override @@ -75,15 +76,15 @@ class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction entry : firstResults.asList()) { DfsSearchResult dfsResult = entry.value; - DiscoveryNode node = nodeIdToDiscoveryNode.apply(dfsResult.shardTarget().getNodeId()); + Transport.Connection connection = nodeIdToConnection.apply(dfsResult.shardTarget().getNodeId()); QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); - executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest); + executeSecondPhase(entry.index, dfsResult, counter, connection, querySearchRequest); } } void executeSecondPhase(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, - final DiscoveryNode node, final QuerySearchRequest querySearchRequest) { - searchTransportService.sendExecuteFetch(node, querySearchRequest, task, new ActionListener() { + final Transport.Connection connection, final QuerySearchRequest querySearchRequest) { + searchTransportService.sendExecuteFetch(connection, querySearchRequest, task, new ActionListener() { @Override public void onResponse(QueryFetchSearchResult result) { result.shardTarget(dfsResult.shardTarget()); @@ -101,7 +102,7 @@ class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction nodeIdToDiscoveryNode, + Function nodeIdToConnection, Map aliasFilter, Map concreteIndexBoosts, SearchPhaseController searchPhaseController, Executor executor, SearchRequest request, ActionListener listener, GroupShardsIterator shardsIts, long startTime, long clusterStateVersion, SearchTask task) { - super(logger, searchTransportService, nodeIdToDiscoveryNode, aliasFilter, concreteIndexBoosts, executor, + super(logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener, shardsIts, startTime, clusterStateVersion, task); this.searchPhaseController = searchPhaseController; queryResults = new AtomicArray<>(firstResults.length()); @@ -73,9 +74,9 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction listener) { - searchTransportService.sendExecuteDfs(node, request, task, listener); + searchTransportService.sendExecuteDfs(connection, request, task, listener); } @Override @@ -84,15 +85,15 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction entry : firstResults.asList()) { DfsSearchResult dfsResult = entry.value; - DiscoveryNode node = nodeIdToDiscoveryNode.apply(dfsResult.shardTarget().getNodeId()); + Transport.Connection connection = nodeIdToConnection.apply(dfsResult.shardTarget().getNodeId()); QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); - executeQuery(entry.index, dfsResult, counter, querySearchRequest, node); + executeQuery(entry.index, dfsResult, counter, querySearchRequest, connection); } } void executeQuery(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, - final QuerySearchRequest querySearchRequest, final DiscoveryNode node) { - searchTransportService.sendExecuteQuery(node, querySearchRequest, task, new ActionListener() { + final QuerySearchRequest querySearchRequest, final Transport.Connection connection) { + searchTransportService.sendExecuteQuery(connection, querySearchRequest, task, new ActionListener() { @Override public void onResponse(QuerySearchResult result) { result.shardTarget(dfsResult.shardTarget()); @@ -110,7 +111,7 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction entry : docIdsToLoad.asList()) { QuerySearchResult queryResult = queryResults.get(entry.index); - DiscoveryNode node = nodeIdToDiscoveryNode.apply(queryResult.shardTarget().getNodeId()); + Transport.Connection connection = nodeIdToConnection.apply(queryResult.shardTarget().getNodeId()); ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard); - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); + executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, connection); } } void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, - final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) { - searchTransportService.sendExecuteFetch(node, fetchSearchRequest, task, new ActionListener() { + final ShardFetchSearchRequest fetchSearchRequest, Transport.Connection connection) { + searchTransportService.sendExecuteFetch(connection, fetchSearchRequest, task, new ActionListener() { @Override public void onResponse(FetchSearchResult result) { result.shardTarget(shardTarget); diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java index f597ede64bc..5b20299f98c 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java @@ -28,6 +28,7 @@ import org.elasticsearch.search.fetch.QueryFetchSearchResult; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.ShardSearchTransportRequest; +import org.elasticsearch.transport.Transport; import java.io.IOException; import java.util.Map; @@ -39,13 +40,13 @@ class SearchQueryAndFetchAsyncAction extends AbstractSearchAsyncAction nodeIdToDiscoveryNode, + Function nodeIdToConnection, Map aliasFilter, Map concreteIndexBoosts, SearchPhaseController searchPhaseController, Executor executor, SearchRequest request, ActionListener listener, GroupShardsIterator shardsIts, long startTime, long clusterStateVersion, SearchTask task) { - super(logger, searchTransportService, nodeIdToDiscoveryNode, aliasFilter, concreteIndexBoosts, executor, + super(logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener, shardsIts, startTime, clusterStateVersion, task); this.searchPhaseController = searchPhaseController; @@ -57,9 +58,9 @@ class SearchQueryAndFetchAsyncAction extends AbstractSearchAsyncAction listener) { - searchTransportService.sendExecuteFetch(node, request, task, listener); + searchTransportService.sendExecuteFetch(connection, request, task, listener); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index b36728af7b0..5644b03b989 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -36,6 +36,7 @@ import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.ShardSearchTransportRequest; import org.elasticsearch.search.query.QuerySearchResultProvider; +import org.elasticsearch.transport.Transport; import java.io.IOException; import java.util.Map; @@ -50,13 +51,13 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction nodeIdToDiscoveryNode, + Function nodeIdToConnection, Map aliasFilter, Map concreteIndexBoosts, SearchPhaseController searchPhaseController, Executor executor, SearchRequest request, ActionListener listener, GroupShardsIterator shardsIts, long startTime, long clusterStateVersion, SearchTask task) { - super(logger, searchTransportService, nodeIdToDiscoveryNode, aliasFilter, concreteIndexBoosts, executor, request, listener, + super(logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener, shardsIts, startTime, clusterStateVersion, task); this.searchPhaseController = searchPhaseController; fetchResults = new AtomicArray<>(firstResults.length()); @@ -69,9 +70,9 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction listener) { - searchTransportService.sendExecuteQuery(node, request, task, listener); + searchTransportService.sendExecuteQuery(connection, request, task, listener); } @Override @@ -90,15 +91,15 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction entry : docIdsToLoad.asList()) { QuerySearchResultProvider queryResult = firstResults.get(entry.index); - DiscoveryNode node = nodeIdToDiscoveryNode.apply(queryResult.shardTarget().getNodeId()); + Transport.Connection connection = nodeIdToConnection.apply(queryResult.shardTarget().getNodeId()); ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), entry, lastEmittedDocPerShard); - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); + executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, connection); } } void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, - final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) { - searchTransportService.sendExecuteFetch(node, fetchSearchRequest, task, new ActionListener() { + final ShardFetchSearchRequest fetchSearchRequest, Transport.Connection connection) { + searchTransportService.sendExecuteFetch(connection, fetchSearchRequest, task, new ActionListener() { @Override public void onResponse(FetchSearchResult result) { result.shardTarget(shardTarget); diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 5b052132566..2eb6633b1f7 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -25,9 +25,10 @@ import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.dfs.DfsSearchResult; @@ -44,19 +45,23 @@ import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.search.query.ScrollQuerySearchResult; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportActionProxy; import org.elasticsearch.transport.TaskAwareTransportRequestHandler; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.function.Consumer; /** * An encapsulation of {@link org.elasticsearch.search.SearchService} operations exposed through * transport. */ -public class SearchTransportService extends AbstractComponent { +public class SearchTransportService extends AbstractLifecycleComponent { public static final String FREE_CONTEXT_SCROLL_ACTION_NAME = "indices:data/read/search[free_context/scroll]"; public static final String FREE_CONTEXT_ACTION_NAME = "indices:data/read/search[free_context]"; @@ -72,15 +77,19 @@ public class SearchTransportService extends AbstractComponent { public static final String FETCH_ID_ACTION_NAME = "indices:data/read/search[phase/fetch/id]"; private final TransportService transportService; + private final RemoteClusterService remoteClusterService; - public SearchTransportService(Settings settings, TransportService transportService) { + public SearchTransportService(Settings settings, ClusterSettings clusterSettings, TransportService transportService) { super(settings); this.transportService = transportService; + this.remoteClusterService = new RemoteClusterService(settings, transportService); + clusterSettings.addAffixUpdateConsumer(RemoteClusterService.REMOTE_CLUSTERS_SEEDS, remoteClusterService::updateRemoteCluster, + (namespace, value) -> {}); } - public void sendFreeContext(DiscoveryNode node, final long contextId, SearchRequest request) { - transportService.sendRequest(node, FREE_CONTEXT_ACTION_NAME, new SearchFreeContextRequest(request, contextId), - new ActionListenerResponseHandler<>(new ActionListener() { + public void sendFreeContext(Transport.Connection connection, final long contextId, SearchRequest request) { + transportService.sendRequest(connection, FREE_CONTEXT_ACTION_NAME, new SearchFreeContextRequest(request, contextId), + TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(new ActionListener() { @Override public void onResponse(SearchFreeContextResponse response) { // no need to respond if it was freed or not @@ -103,64 +112,68 @@ public class SearchTransportService extends AbstractComponent { new ActionListenerResponseHandler<>(listener, () -> TransportResponse.Empty.INSTANCE)); } - public void sendExecuteDfs(DiscoveryNode node, final ShardSearchTransportRequest request, SearchTask task, + public void sendExecuteDfs(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task, final ActionListener listener) { - transportService.sendChildRequest(node, DFS_ACTION_NAME, request, task, + transportService.sendChildRequest(connection, DFS_ACTION_NAME, request, task, new ActionListenerResponseHandler<>(listener, DfsSearchResult::new)); } - public void sendExecuteQuery(DiscoveryNode node, final ShardSearchTransportRequest request, SearchTask task, + public void sendExecuteQuery(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task, final ActionListener listener) { - transportService.sendChildRequest(node, QUERY_ACTION_NAME, request, task, + transportService.sendChildRequest(connection, QUERY_ACTION_NAME, request, task, new ActionListenerResponseHandler<>(listener, QuerySearchResult::new)); } - public void sendExecuteQuery(DiscoveryNode node, final QuerySearchRequest request, SearchTask task, + public void sendExecuteQuery(Transport.Connection connection, final QuerySearchRequest request, SearchTask task, final ActionListener listener) { - transportService.sendChildRequest(node, QUERY_ID_ACTION_NAME, request, task, + transportService.sendChildRequest(connection, QUERY_ID_ACTION_NAME, request, task, new ActionListenerResponseHandler<>(listener, QuerySearchResult::new)); } public void sendExecuteQuery(DiscoveryNode node, final InternalScrollSearchRequest request, SearchTask task, final ActionListener listener) { - transportService.sendChildRequest(node, QUERY_SCROLL_ACTION_NAME, request, task, + transportService.sendChildRequest(transportService.getConnection(node), QUERY_SCROLL_ACTION_NAME, request, task, new ActionListenerResponseHandler<>(listener, ScrollQuerySearchResult::new)); } - public void sendExecuteFetch(DiscoveryNode node, final ShardSearchTransportRequest request, SearchTask task, + public void sendExecuteFetch(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task, final ActionListener listener) { - transportService.sendChildRequest(node, QUERY_FETCH_ACTION_NAME, request, task, + transportService.sendChildRequest(connection, QUERY_FETCH_ACTION_NAME, request, task, new ActionListenerResponseHandler<>(listener, QueryFetchSearchResult::new)); } - public void sendExecuteFetch(DiscoveryNode node, final QuerySearchRequest request, SearchTask task, + public void sendExecuteFetch(Transport.Connection connection, final QuerySearchRequest request, SearchTask task, final ActionListener listener) { - transportService.sendChildRequest(node, QUERY_QUERY_FETCH_ACTION_NAME, request, task, + transportService.sendChildRequest(connection, QUERY_QUERY_FETCH_ACTION_NAME, request, task, new ActionListenerResponseHandler<>(listener, QueryFetchSearchResult::new)); } public void sendExecuteFetch(DiscoveryNode node, final InternalScrollSearchRequest request, SearchTask task, final ActionListener listener) { - transportService.sendChildRequest(node, QUERY_FETCH_SCROLL_ACTION_NAME, request, task, + transportService.sendChildRequest(transportService.getConnection(node), QUERY_FETCH_SCROLL_ACTION_NAME, request, task, new ActionListenerResponseHandler<>(listener, ScrollQueryFetchSearchResult::new)); } - public void sendExecuteFetch(DiscoveryNode node, final ShardFetchSearchRequest request, SearchTask task, + public void sendExecuteFetch(Transport.Connection connection, final ShardFetchSearchRequest request, SearchTask task, final ActionListener listener) { - sendExecuteFetch(node, FETCH_ID_ACTION_NAME, request, task, listener); + sendExecuteFetch(connection, FETCH_ID_ACTION_NAME, request, task, listener); } public void sendExecuteFetchScroll(DiscoveryNode node, final ShardFetchRequest request, SearchTask task, final ActionListener listener) { - sendExecuteFetch(node, FETCH_ID_SCROLL_ACTION_NAME, request, task, listener); + sendExecuteFetch(transportService.getConnection(node), FETCH_ID_SCROLL_ACTION_NAME, request, task, listener); } - private void sendExecuteFetch(DiscoveryNode node, String action, final ShardFetchRequest request, SearchTask task, + private void sendExecuteFetch(Transport.Connection connection, String action, final ShardFetchRequest request, SearchTask task, final ActionListener listener) { - transportService.sendChildRequest(node, action, request, task, + transportService.sendChildRequest(connection, action, request, task, new ActionListenerResponseHandler<>(listener, FetchSearchResult::new)); } + public RemoteClusterService getRemoteClusterService() { + return remoteClusterService; + } + static class ScrollFreeContextRequest extends TransportRequest { private long id; @@ -265,6 +278,7 @@ public class SearchTransportService extends AbstractComponent { channel.sendResponse(new SearchFreeContextResponse(freed)); } }); + TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_SCROLL_ACTION_NAME, SearchFreeContextResponse::new); transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, SearchFreeContextRequest::new, ThreadPool.Names.SAME, new TaskAwareTransportRequestHandler() { @Override @@ -273,6 +287,7 @@ public class SearchTransportService extends AbstractComponent { channel.sendResponse(new SearchFreeContextResponse(freed)); } }); + TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME, SearchFreeContextResponse::new); transportService.registerRequestHandler(CLEAR_SCROLL_CONTEXTS_ACTION_NAME, () -> TransportRequest.Empty.INSTANCE, ThreadPool.Names.SAME, new TaskAwareTransportRequestHandler() { @@ -282,6 +297,9 @@ public class SearchTransportService extends AbstractComponent { channel.sendResponse(TransportResponse.Empty.INSTANCE); } }); + TransportActionProxy.registerProxyAction(transportService, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, + () -> TransportResponse.Empty.INSTANCE); + transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH, new TaskAwareTransportRequestHandler() { @Override @@ -291,6 +309,8 @@ public class SearchTransportService extends AbstractComponent { } }); + TransportActionProxy.registerProxyAction(transportService, DFS_ACTION_NAME, DfsSearchResult::new); + transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH, new TaskAwareTransportRequestHandler() { @Override @@ -299,6 +319,8 @@ public class SearchTransportService extends AbstractComponent { channel.sendResponse(result); } }); + TransportActionProxy.registerProxyAction(transportService, QUERY_ACTION_NAME, QuerySearchResult::new); + transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH, new TaskAwareTransportRequestHandler() { @Override @@ -307,6 +329,8 @@ public class SearchTransportService extends AbstractComponent { channel.sendResponse(result); } }); + TransportActionProxy.registerProxyAction(transportService, QUERY_ID_ACTION_NAME, QuerySearchResult::new); + transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH, new TaskAwareTransportRequestHandler() { @Override @@ -315,6 +339,8 @@ public class SearchTransportService extends AbstractComponent { channel.sendResponse(result); } }); + TransportActionProxy.registerProxyAction(transportService, QUERY_SCROLL_ACTION_NAME, ScrollQuerySearchResult::new); + transportService.registerRequestHandler(QUERY_FETCH_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH, new TaskAwareTransportRequestHandler() { @Override @@ -323,6 +349,8 @@ public class SearchTransportService extends AbstractComponent { channel.sendResponse(result); } }); + TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_ACTION_NAME, QueryFetchSearchResult::new); + transportService.registerRequestHandler(QUERY_QUERY_FETCH_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH, new TaskAwareTransportRequestHandler() { @Override @@ -331,6 +359,8 @@ public class SearchTransportService extends AbstractComponent { channel.sendResponse(result); } }); + TransportActionProxy.registerProxyAction(transportService, QUERY_QUERY_FETCH_ACTION_NAME, QueryFetchSearchResult::new); + transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH, new TaskAwareTransportRequestHandler() { @Override @@ -339,6 +369,8 @@ public class SearchTransportService extends AbstractComponent { channel.sendResponse(result); } }); + TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_SCROLL_ACTION_NAME, ScrollQueryFetchSearchResult::new); + transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ShardFetchRequest::new, ThreadPool.Names.SEARCH, new TaskAwareTransportRequestHandler() { @Override @@ -347,6 +379,8 @@ public class SearchTransportService extends AbstractComponent { channel.sendResponse(result); } }); + TransportActionProxy.registerProxyAction(transportService, FETCH_ID_SCROLL_ACTION_NAME, FetchSearchResult::new); + transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ShardFetchSearchRequest::new, ThreadPool.Names.SEARCH, new TaskAwareTransportRequestHandler() { @Override @@ -355,6 +389,24 @@ public class SearchTransportService extends AbstractComponent { channel.sendResponse(result); } }); + TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, FetchSearchResult::new); + } + Transport.Connection getConnection(DiscoveryNode node) { + return transportService.getConnection(node); + } + + @Override + protected void doStart() { + // here we start to connect to the remote clusters + remoteClusterService.initializeRemoteClusters(); + } + + @Override + protected void doStop() {} + + @Override + protected void doClose() throws IOException { + remoteClusterService.close(); } } diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 48ee5cc288b..f67fc543425 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -26,8 +26,11 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; @@ -38,10 +41,13 @@ import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; @@ -58,6 +64,7 @@ public class TransportSearchAction extends HandledTransportAction buildPerIndexAliasFilter(SearchRequest request, ClusterState clusterState, Index[] concreteIndices) { + private Map buildPerIndexAliasFilter(SearchRequest request, ClusterState clusterState, + Index[] concreteIndices, Map remoteAliasMap) { final Map aliasFilterMap = new HashMap<>(); for (Index index : concreteIndices) { clusterState.blocks().indexBlockedRaiseException(ClusterBlockLevel.READ, index.getName()); @@ -82,6 +91,7 @@ public class TransportSearchAction extends HandledTransportAction listener) { // pure paranoia if time goes backwards we are at least positive final long startTimeInMillis = Math.max(0, System.currentTimeMillis()); - ClusterState clusterState = clusterService.state(); - clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); + final String[] localIndices; + final Map> remoteClusterIndices; + final ClusterState clusterState = clusterService.state(); + if (remoteClusterService.isCrossClusterSearchEnabled()) { + remoteClusterIndices = remoteClusterService.groupClusterIndices( searchRequest.indices(), // empty string is not allowed + idx -> indexNameExpressionResolver.hasIndexOrAlias(idx, clusterState)); + List remove = remoteClusterIndices.remove(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY); + localIndices = remove == null ? Strings.EMPTY_ARRAY : remove.toArray(new String[remove.size()]); + } else { + remoteClusterIndices = Collections.emptyMap(); + localIndices = searchRequest.indices(); + } + if (remoteClusterIndices.isEmpty()) { + executeSearch((SearchTask)task, startTimeInMillis, searchRequest, localIndices, Collections.emptyList(), + (nodeId) -> null, clusterState, Collections.emptyMap(), listener); + } else { + remoteClusterService.collectSearchShards(searchRequest, remoteClusterIndices, + ActionListener.wrap((searchShardsResponses) -> { + List remoteShardIterators = new ArrayList<>(); + Map remoteAliasFilters = new HashMap<>(); + Function connectionFunction = remoteClusterService.processRemoteShards( + searchShardsResponses, remoteShardIterators, remoteAliasFilters); + executeSearch((SearchTask)task, startTimeInMillis, searchRequest, localIndices, remoteShardIterators, + connectionFunction, clusterState, remoteAliasFilters, listener); + }, listener::onFailure)); + } + } + + private void executeSearch(SearchTask task, long startTimeInMillis, SearchRequest searchRequest, String[] localIndices, + List remoteShardIterators, Function remoteConnections, + ClusterState clusterState, Map remoteAliasMap, + ActionListener listener) { + + clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); // TODO: I think startTime() should become part of ActionRequest and that should be used both for index name // date math expressions and $now in scripts. This way all apis will deal with now in the same way instead // of just for the _search api - Index[] indices = indexNameExpressionResolver.concreteIndices(clusterState, searchRequest.indicesOptions(), - startTimeInMillis, searchRequest.indices()); - Map aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices); + final Index[] indices; + if (localIndices.length == 0 && remoteShardIterators.size() > 0) { + indices = Index.EMPTY_ARRAY; // don't search on _all if only remote indices were specified + } else { + indices = indexNameExpressionResolver.concreteIndices(clusterState, searchRequest.indicesOptions(), + startTimeInMillis, localIndices); + } + Map aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteAliasMap); Map> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(), searchRequest.indices()); String[] concreteIndices = new String[indices.length]; for (int i = 0; i < indices.length; i++) { concreteIndices[i] = indices[i].getName(); } - GroupShardsIterator shardIterators = clusterService.operationRouting().searchShards(clusterState, concreteIndices, routingMap, + GroupShardsIterator localShardsIterator = clusterService.operationRouting().searchShards(clusterState, concreteIndices, routingMap, searchRequest.preference()); + GroupShardsIterator shardIterators = mergeShardsIterators(localShardsIterator, remoteShardIterators); + failIfOverShardCountLimit(clusterService, shardIterators.size()); Map concreteIndexBoosts = resolveIndexBoosts(searchRequest, clusterState); @@ -150,41 +198,69 @@ public class TransportSearchAction extends HandledTransportAction connectionLookup = (nodeId) -> { + final DiscoveryNode discoveryNode = nodes.get(nodeId); + final Transport.Connection connection; + if (discoveryNode != null) { + connection = searchTransportService.getConnection(discoveryNode); + } else { + connection = remoteConnections.apply(nodeId); + } + if (connection == null) { + throw new IllegalStateException("no node found for id: " + nodeId); + } + return connection; + }; + searchAsyncAction(task, searchRequest, shardIterators, startTimeInMillis, connectionLookup, clusterState.version(), Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, listener).start(); } + private static GroupShardsIterator mergeShardsIterators(GroupShardsIterator localShardsIterator, + List remoteShardIterators) { + if (remoteShardIterators.isEmpty()) { + return localShardsIterator; + } + List shards = new ArrayList<>(); + for (ShardIterator shardIterator : remoteShardIterators) { + shards.add(shardIterator); + } + for (ShardIterator shardIterator : localShardsIterator) { + shards.add(shardIterator); + } + return new GroupShardsIterator(shards); + } + @Override protected final void doExecute(SearchRequest searchRequest, ActionListener listener) { throw new UnsupportedOperationException("the task parameter is required"); } private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchRequest searchRequest, GroupShardsIterator shardIterators, - long startTime, ClusterState state, Map aliasFilter, + long startTime, Function connectionLookup, + long clusterStateVersion, Map aliasFilter, Map concreteIndexBoosts, ActionListener listener) { - final Function nodesLookup = state.nodes()::get; - final long clusterStateVersion = state.version(); Executor executor = threadPool.executor(ThreadPool.Names.SEARCH); AbstractSearchAsyncAction searchAsyncAction; switch(searchRequest.searchType()) { case DFS_QUERY_THEN_FETCH: - searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService, nodesLookup, + searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup, aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators, startTime, clusterStateVersion, task); break; case QUERY_THEN_FETCH: - searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, nodesLookup, + searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup, aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators, startTime, clusterStateVersion, task); break; case DFS_QUERY_AND_FETCH: - searchAsyncAction = new SearchDfsQueryAndFetchAsyncAction(logger, searchTransportService, nodesLookup, + searchAsyncAction = new SearchDfsQueryAndFetchAsyncAction(logger, searchTransportService, connectionLookup, aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators, startTime, clusterStateVersion, task); break; case QUERY_AND_FETCH: - searchAsyncAction = new SearchQueryAndFetchAsyncAction(logger, searchTransportService, nodesLookup, + searchAsyncAction = new SearchQueryAndFetchAsyncAction(logger, searchTransportService, connectionLookup, aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators, startTime, clusterStateVersion, task); break; @@ -194,7 +270,7 @@ public class TransportSearchAction extends HandledTransportAction shardCountLimit) { throw new IllegalArgumentException("Trying to query " + shardCount + " shards, which is over the limit of " diff --git a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 34a574077b2..ff010f9f949 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -19,6 +19,7 @@ package org.elasticsearch.common.settings; import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction; +import org.elasticsearch.action.search.RemoteClusterService; import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.action.support.AutoCreateIndex; import org.elasticsearch.action.support.DestructiveOperations; @@ -54,9 +55,9 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.zen.ElectMasterService; -import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.discovery.zen.FaultDetection; import org.elasticsearch.discovery.zen.UnicastZenPing; +import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.gateway.GatewayService; @@ -253,6 +254,10 @@ public final class ClusterSettings extends AbstractScopedSettings { SearchService.DEFAULT_SEARCH_TIMEOUT_SETTING, ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING, TransportSearchAction.SHARD_COUNT_LIMIT_SETTING, + RemoteClusterService.REMOTE_CLUSTERS_SEEDS, + RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER, + RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING, + RemoteClusterService.REMOTE_NODE_ATTRIBUTE, TransportService.TRACE_LOG_EXCLUDE_SETTING, TransportService.TRACE_LOG_INCLUDE_SETTING, TransportCloseIndexAction.CLUSTER_INDICES_CLOSE_ENABLE_SETTING, diff --git a/core/src/main/java/org/elasticsearch/common/settings/Setting.java b/core/src/main/java/org/elasticsearch/common/settings/Setting.java index 5f067f27e90..45ebe1b061c 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/Setting.java +++ b/core/src/main/java/org/elasticsearch/common/settings/Setting.java @@ -42,6 +42,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.EnumSet; +import java.util.HashMap; import java.util.IdentityHashMap; import java.util.List; import java.util.Map; @@ -481,7 +482,7 @@ public class Setting extends ToXContentToBytes { public Map, T> getValue(Settings current, Settings previous) { // we collect all concrete keys and then delegate to the actual setting for validation and settings extraction final Map, T> result = new IdentityHashMap<>(); - Stream.concat(matchStream(current), matchStream(previous)).forEach(aKey -> { + Stream.concat(matchStream(current), matchStream(previous)).distinct().forEach(aKey -> { String namespace = key.getNamespace(aKey); AbstractScopedSettings.SettingUpdater updater = getConcreteSetting(aKey).newUpdater((v) -> consumer.accept(namespace, v), logger, @@ -505,6 +506,18 @@ public class Setting extends ToXContentToBytes { }; } + @Override + public T get(Settings settings) { + throw new UnsupportedOperationException("affix settings can't return values" + + " use #getConcreteSetting to obtain a concrete setting"); + } + + @Override + public String getRaw(Settings settings) { + throw new UnsupportedOperationException("affix settings can't return values" + + " use #getConcreteSetting to obtain a concrete setting"); + } + @Override public Setting getConcreteSetting(String key) { if (match(key)) { @@ -518,6 +531,22 @@ public class Setting extends ToXContentToBytes { public void diff(Settings.Builder builder, Settings source, Settings defaultSettings) { matchStream(defaultSettings).forEach((key) -> getConcreteSetting(key).diff(builder, source, defaultSettings)); } + + /** + * Returns the namespace for a concrete settting. Ie. an affix setting with prefix: search. and suffix: username + * will return remote as a namespace for the setting search.remote.username + */ + public String getNamespace(Setting concreteSetting) { + return key.getNamespace(concreteSetting.getKey()); + } + + /** + * Returns a stream of all concrete setting instances for the given settings. AffixSetting is only a specification, concrete + * settings depend on an actual set of setting keys. + */ + public Stream> getAllConcreteSettings(Settings settings) { + return matchStream(settings).distinct().map(this::getConcreteSetting); + } } diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index 05ea6a9bda8..e3fe68838d4 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -448,7 +448,8 @@ public class Node implements Closeable { b.bind(IndicesService.class).toInstance(indicesService); b.bind(SearchService.class).toInstance(newSearchService(clusterService, indicesService, threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase())); - b.bind(SearchTransportService.class).toInstance(new SearchTransportService(settings, transportService)); + b.bind(SearchTransportService.class).toInstance(new SearchTransportService(settings, + settingsModule.getClusterSettings(), transportService)); b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(settings, bigArrays, scriptModule.getScriptService())); b.bind(Transport.class).toInstance(transport); @@ -643,12 +644,16 @@ public class Node implements Closeable { } } + if (NetworkModule.HTTP_ENABLED.get(settings)) { injector.getInstance(HttpServerTransport.class).start(); } // start nodes now, after the http server, because it may take some time tribeService.startNodes(); + // starts connecting to remote clusters if any cluster is configured + SearchTransportService searchTransportService = injector.getInstance(SearchTransportService.class); + searchTransportService.start(); if (WRITE_PORTS_FIELD_SETTING.get(settings)) { if (NetworkModule.HTTP_ENABLED.get(settings)) { @@ -692,6 +697,7 @@ public class Node implements Closeable { injector.getInstance(GatewayService.class).stop(); injector.getInstance(SearchService.class).stop(); injector.getInstance(TransportService.class).stop(); + injector.getInstance(SearchTransportService.class).stop(); pluginLifecycleComponents.forEach(LifecycleComponent::stop); // we should stop this last since it waits for resources to get released @@ -753,6 +759,8 @@ public class Node implements Closeable { toClose.add(injector.getInstance(SearchService.class)); toClose.add(() -> stopWatch.stop().start("transport")); toClose.add(injector.getInstance(TransportService.class)); + toClose.add(() -> stopWatch.stop().start("search_transport_service")); + toClose.add(injector.getInstance(SearchTransportService.class)); for (LifecycleComponent plugin : pluginLifecycleComponents) { toClose.add(() -> stopWatch.stop().start("plugin(" + plugin.getClass().getName() + ")")); diff --git a/core/src/main/java/org/elasticsearch/search/internal/AliasFilter.java b/core/src/main/java/org/elasticsearch/search/internal/AliasFilter.java index ed82cbd1b69..915e745ddf4 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/AliasFilter.java +++ b/core/src/main/java/org/elasticsearch/search/internal/AliasFilter.java @@ -128,4 +128,13 @@ public final class AliasFilter implements Writeable { public int hashCode() { return Objects.hash(aliases, filter, reparseAliases); } + + @Override + public String toString() { + return "AliasFilter{" + + "aliases=" + Arrays.toString(aliases) + + ", filter=" + filter + + ", reparseAliases=" + reparseAliases + + '}'; + } } diff --git a/core/src/main/java/org/elasticsearch/transport/ConnectionProfile.java b/core/src/main/java/org/elasticsearch/transport/ConnectionProfile.java index 8c14c6b5c9d..2dc605cf3d4 100644 --- a/core/src/main/java/org/elasticsearch/transport/ConnectionProfile.java +++ b/core/src/main/java/org/elasticsearch/transport/ConnectionProfile.java @@ -198,7 +198,7 @@ public final class ConnectionProfile { */ T getChannel(T[] channels) { if (length == 0) { - throw new IllegalStateException("can't select channel size is 0"); + throw new IllegalStateException("can't select channel size is 0 for types: " + types); } assert channels.length >= offset + length : "illegal size: " + channels.length + " expected >= " + (offset + length); return channels[offset + Math.floorMod(counter.incrementAndGet(), length)]; diff --git a/core/src/main/java/org/elasticsearch/transport/TransportActionProxy.java b/core/src/main/java/org/elasticsearch/transport/TransportActionProxy.java new file mode 100644 index 00000000000..30b7299f59d --- /dev/null +++ b/core/src/main/java/org/elasticsearch/transport/TransportActionProxy.java @@ -0,0 +1,150 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.transport; + +import org.apache.logging.log4j.util.Supplier; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.io.UncheckedIOException; + +/** + * TransportActionProxy allows an arbitrary action to be executed on a defined target node while the initial request is sent to a second + * node that acts as a request proxy to the target node. This is useful if a node is not directly connected to a target node but is + * connected to an intermediate node that establishes a transitive connection. + */ +public final class TransportActionProxy { + + private TransportActionProxy() {} // no instance + + private static class ProxyRequestHandler implements TransportRequestHandler { + + private final TransportService service; + private final String action; + private final Supplier responseFactory; + + ProxyRequestHandler(TransportService service, String action, Supplier responseFactory) { + this.service = service; + this.action = action; + this.responseFactory = responseFactory; + } + + @Override + public void messageReceived(T request, TransportChannel channel) throws Exception { + DiscoveryNode targetNode = request.targetNode; + TransportRequest wrappedRequest = request.wrapped; + service.sendRequest(targetNode, action, wrappedRequest, new ProxyResponseHandler<>(channel, responseFactory)); + } + } + + private static class ProxyResponseHandler implements TransportResponseHandler { + + private final Supplier responseFactory; + private final TransportChannel channel; + + ProxyResponseHandler(TransportChannel channel, Supplier responseFactory) { + this.responseFactory = responseFactory; + this.channel = channel; + + } + @Override + public T newInstance() { + return responseFactory.get(); + } + + @Override + public void handleResponse(T response) { + try { + channel.sendResponse(response); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public void handleException(TransportException exp) { + try { + channel.sendResponse(exp); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + } + + static class ProxyRequest extends TransportRequest { + T wrapped; + Supplier supplier; + DiscoveryNode targetNode; + + ProxyRequest(Supplier supplier) { + this.supplier = supplier; + } + + ProxyRequest(T wrapped, DiscoveryNode targetNode) { + this.wrapped = wrapped; + this.targetNode = targetNode; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + targetNode = new DiscoveryNode(in); + wrapped = supplier.get(); + wrapped.readFrom(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + targetNode.writeTo(out); + wrapped.writeTo(out); + } + } + + /** + * Registers a proxy request handler that allows to forward requests for the given action to another node. + */ + public static void registerProxyAction(TransportService service, String action, Supplier responseSupplier) { + RequestHandlerRegistry requestHandler = service.getRequestHandler(action); + service.registerRequestHandler(getProxyAction(action), () -> new ProxyRequest(requestHandler::newRequest), ThreadPool.Names.SAME, + true, false, new ProxyRequestHandler<>(service, action, responseSupplier)); + } + + /** + * Returns the corresponding proxy action for the given action + */ + public static String getProxyAction(String action) { + return "internal:transport/proxy/" + action; + } + + /** + * Wraps the actual request in a proxy request object that encodes the target node. + */ + public static TransportRequest wrapRequest(DiscoveryNode node, TransportRequest request) { + return new ProxyRequest<>(request, node); + } +} diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index 1978fd0f48f..8c816d12be1 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -63,6 +63,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; import java.util.function.Function; +import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Stream; @@ -163,9 +164,6 @@ public class TransportService extends AbstractLifecycleComponent { } } - /** - * Returns the local node representation - */ public DiscoveryNode getLocalNode() { return localNode; } @@ -341,6 +339,25 @@ public class TransportService extends AbstractLifecycleComponent { public DiscoveryNode handshake( final Transport.Connection connection, final long handshakeTimeout) throws ConnectTransportException { + return handshake(connection, handshakeTimeout, clusterName::equals); + } + + /** + * Executes a high-level handshake using the given connection + * and returns the discovery node of the node the connection + * was established with. The handshake will fail if the cluster + * name on the target node doesn't match the local cluster name. + * + * @param connection the connection to a specific node + * @param handshakeTimeout handshake timeout + * @param clusterNamePredicate cluster name validation predicate + * @return the connected node + * @throws ConnectTransportException if the connection failed + * @throws IllegalStateException if the handshake failed + */ + public DiscoveryNode handshake( + final Transport.Connection connection, + final long handshakeTimeout, Predicate clusterNamePredicate) throws ConnectTransportException { final HandshakeResponse response; final DiscoveryNode node = connection.getNode(); try { @@ -358,7 +375,7 @@ public class TransportService extends AbstractLifecycleComponent { throw new IllegalStateException("handshake failed with " + node, e); } - if (!Objects.equals(clusterName, response.clusterName)) { + if (!clusterNamePredicate.test(response.clusterName)) { throw new IllegalStateException("handshake failed, mismatched cluster name [" + response.clusterName + "] - " + node); } else if (response.version.isCompatible(localNode.getVersion()) == false) { throw new IllegalStateException("handshake failed, incompatible version [" + response.version + "] - " + node); @@ -486,19 +503,18 @@ public class TransportService extends AbstractLifecycleComponent { } } - public void sendChildRequest(final DiscoveryNode node, final String action, + public void sendChildRequest(final Transport.Connection connection, final String action, final TransportRequest request, final Task parentTask, final TransportResponseHandler handler) { - sendChildRequest(node, action, request, parentTask, TransportRequestOptions.EMPTY, handler); + sendChildRequest(connection, action, request, parentTask, TransportRequestOptions.EMPTY, handler); } - public void sendChildRequest(final DiscoveryNode node, final String action, + public void sendChildRequest(final Transport.Connection connection, final String action, final TransportRequest request, final Task parentTask, final TransportRequestOptions options, final TransportResponseHandler handler) { request.setParentTask(localNode.getId(), parentTask.getId()); try { - final Transport.Connection connection = getConnection(node); sendRequest(connection, action, request, options, handler); } catch (TaskCancelledException ex) { // The parent task is already cancelled - just fail the request @@ -1134,6 +1150,13 @@ public class TransportService extends AbstractLifecycleComponent { } } + /** + * Returns the internal thread pool + */ + public ThreadPool getThreadPool() { + return threadPool; + } + private boolean isLocalNode(DiscoveryNode discoveryNode) { return Objects.requireNonNull(discoveryNode, "discovery node must not be null").equals(localNode); } diff --git a/core/src/test/java/org/elasticsearch/action/ActionListenerTests.java b/core/src/test/java/org/elasticsearch/action/ActionListenerTests.java new file mode 100644 index 00000000000..6414c81058b --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/ActionListenerTests.java @@ -0,0 +1,151 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action; + +import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.test.ESTestCase; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +public class ActionListenerTests extends ESTestCase { + + public void testWrap() { + AtomicReference reference = new AtomicReference<>(); + AtomicReference exReference = new AtomicReference<>(); + + CheckedConsumer handler = (o) -> { + if (Boolean.FALSE.equals(o)) { + throw new IllegalArgumentException("must not be false"); + } + reference.set(o); + }; + ActionListener wrap = ActionListener.wrap(handler, exReference::set); + wrap.onResponse(Boolean.FALSE); + assertNull(reference.get()); + assertNotNull(exReference.get()); + assertEquals("must not be false", exReference.get().getMessage()); + exReference.set(null); + + wrap.onResponse(Boolean.TRUE); + assertTrue(reference.get()); + assertNull(exReference.get()); + } + + public void testOnResponse() { + final int numListeners = randomIntBetween(1, 20); + List> refList = new ArrayList<>(); + List> excList = new ArrayList<>(); + List> listeners = new ArrayList<>(); + List failOnTrue = new ArrayList<>(); + AtomicInteger exceptionCounter = new AtomicInteger(0); + for (int i = 0; i < numListeners; i++) { + boolean doFailOnTrue = rarely(); + failOnTrue.add(doFailOnTrue); + AtomicReference reference = new AtomicReference<>(); + AtomicReference exReference = new AtomicReference<>(); + refList.add(reference); + excList.add(exReference); + CheckedConsumer handler = (o) -> { + if (Boolean.FALSE.equals(o)) { + throw new IllegalArgumentException("must not be false " + exceptionCounter.getAndIncrement()); + } + if (doFailOnTrue) { + throw new IllegalStateException("must not be true"); + } + reference.set(o); + }; + listeners.add(ActionListener.wrap(handler, exReference::set)); + } + + ActionListener.onResponse(listeners, Boolean.TRUE); + for (int i = 0; i < numListeners; i++) { + if (failOnTrue.get(i) == false) { + assertTrue("listener index " + i, refList.get(i).get()); + refList.get(i).set(null); + } else { + assertNull("listener index " + i, refList.get(i).get()); + } + + } + + for (int i = 0; i < numListeners; i++) { + if (failOnTrue.get(i) == false) { + assertNull("listener index " + i, excList.get(i).get()); + } else { + assertEquals("listener index " + i, "must not be true", excList.get(i).get().getMessage()); + } + } + + ActionListener.onResponse(listeners, Boolean.FALSE); + for (int i = 0; i < numListeners; i++) { + assertNull("listener index " + i, refList.get(i).get()); + } + + assertEquals(numListeners, exceptionCounter.get()); + for (int i = 0; i < numListeners; i++) { + assertNotNull(excList.get(i).get()); + assertEquals("listener index " + i, "must not be false " + i, excList.get(i).get().getMessage()); + } + } + + public void testOnFailure() { + final int numListeners = randomIntBetween(1, 20); + List> refList = new ArrayList<>(); + List> excList = new ArrayList<>(); + List> listeners = new ArrayList<>(); + + final int listenerToFail = randomBoolean() ? -1 : randomIntBetween(0, numListeners-1); + for (int i = 0; i < numListeners; i++) { + AtomicReference reference = new AtomicReference<>(); + AtomicReference exReference = new AtomicReference<>(); + refList.add(reference); + excList.add(exReference); + boolean fail = i == listenerToFail; + CheckedConsumer handler = (o) -> { + reference.set(o); + }; + listeners.add(ActionListener.wrap(handler, (e) -> { + exReference.set(e); + if (fail) { + throw new RuntimeException("double boom"); + } + })); + } + + try { + ActionListener.onFailure(listeners, new Exception("booom")); + assertTrue("unexpected succces listener to fail: " + listenerToFail, listenerToFail == -1); + } catch (RuntimeException ex) { + assertTrue("listener to fail: " + listenerToFail, listenerToFail >= 0); + assertNotNull(ex.getCause()); + assertEquals("double boom", ex.getCause().getMessage()); + } + + for (int i = 0; i < numListeners; i++) { + assertNull("listener index " + i, refList.get(i).get()); + } + + for (int i = 0; i < numListeners; i++) { + assertEquals("listener index " + i, "booom", excList.get(i).get().getMessage()); + } + } +} diff --git a/core/src/test/java/org/elasticsearch/action/search/RemoteClusterConnectionTests.java b/core/src/test/java/org/elasticsearch/action/search/RemoteClusterConnectionTests.java new file mode 100644 index 00000000000..f08f810f33f --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/search/RemoteClusterConnectionTests.java @@ -0,0 +1,511 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.search; + +import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction; +import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup; +import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest; +import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; +import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.util.CancellableThreads; +import org.elasticsearch.mocksocket.MockServerSocket; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportConnectionListener; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.UnknownHostException; +import java.nio.channels.AlreadyConnectedException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; + +public class RemoteClusterConnectionTests extends ESTestCase { + + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + + @Override + public void tearDown() throws Exception { + super.tearDown(); + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + + private MockTransportService startTransport(String id, List knownNodes, Version version) { + return startTransport(id, knownNodes, version, threadPool); + } + + public static MockTransportService startTransport(String id, List knownNodes, Version version, ThreadPool threadPool) { + boolean success = false; + MockTransportService newService = MockTransportService.createNewService(Settings.EMPTY, version, threadPool, null); + try { + newService.registerRequestHandler(ClusterSearchShardsAction.NAME, ClusterSearchShardsRequest::new, ThreadPool.Names.SAME, + (request, channel) -> { + channel.sendResponse(new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0], + knownNodes.toArray(new DiscoveryNode[0]), Collections.emptyMap())); + }); + newService.registerRequestHandler(ClusterStateAction.NAME, ClusterStateRequest::new, ThreadPool.Names.SAME, + (request, channel) -> { + DiscoveryNodes.Builder builder = DiscoveryNodes.builder(); + for (DiscoveryNode node : knownNodes) { + builder.add(node); + } + ClusterState build = ClusterState.builder(ClusterName.DEFAULT).nodes(builder.build()).build(); + channel.sendResponse(new ClusterStateResponse(ClusterName.DEFAULT, build)); + }); + newService.start(); + newService.acceptIncomingRequests(); + success = true; + return newService; + } finally { + if (success == false) { + newService.close(); + } + } + } + + public void testDiscoverSingleNode() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); + MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode(); + knownNodes.add(seedTransport.getLocalDiscoNode()); + knownNodes.add(discoverableTransport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + updateSeedNodes(connection, Arrays.asList(seedNode)); + assertTrue(service.nodeConnected(seedNode)); + assertTrue(service.nodeConnected(discoverableNode)); + assertTrue(connection.assertNoRunningConnections()); + } + } + } + } + + public void testDiscoverSingleNodeWithIncompatibleSeed() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); + MockTransportService incomaptibleTransport = startTransport("incompat_seed_node", knownNodes, Version.fromString("2.0.0")); + MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode(); + DiscoveryNode incompatibleSeedNode = incomaptibleTransport.getLocalDiscoNode(); + knownNodes.add(seedTransport.getLocalDiscoNode()); + knownNodes.add(discoverableTransport.getLocalDiscoNode()); + knownNodes.add(incomaptibleTransport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + List seedNodes = Arrays.asList(incompatibleSeedNode, seedNode); + Collections.shuffle(seedNodes, random()); + + try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", + seedNodes, service, Integer.MAX_VALUE, n -> true)) { + updateSeedNodes(connection, seedNodes); + assertTrue(service.nodeConnected(seedNode)); + assertTrue(service.nodeConnected(discoverableNode)); + assertFalse(service.nodeConnected(incompatibleSeedNode)); + assertTrue(connection.assertNoRunningConnections()); + } + } + } + } + + public void testNodeDisconnected() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); + MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT); + MockTransportService spareTransport = startTransport("spare_node", knownNodes, Version.CURRENT)) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode(); + DiscoveryNode spareNode = spareTransport.getLocalDiscoNode(); + knownNodes.add(seedTransport.getLocalDiscoNode()); + knownNodes.add(discoverableTransport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + updateSeedNodes(connection, Arrays.asList(seedNode)); + assertTrue(service.nodeConnected(seedNode)); + assertTrue(service.nodeConnected(discoverableNode)); + assertFalse(service.nodeConnected(spareNode)); + knownNodes.add(spareNode); + CountDownLatch latchDisconnect = new CountDownLatch(1); + CountDownLatch latchConnected = new CountDownLatch(1); + service.addConnectionListener(new TransportConnectionListener() { + @Override + public void onNodeDisconnected(DiscoveryNode node) { + if (node.equals(discoverableNode)) { + latchDisconnect.countDown(); + } + } + + @Override + public void onNodeConnected(DiscoveryNode node) { + if (node.equals(spareNode)) { + latchConnected.countDown(); + } + } + }); + + discoverableTransport.close(); + // now make sure we try to connect again to other nodes once we got disconnected + assertTrue(latchDisconnect.await(10, TimeUnit.SECONDS)); + assertTrue(latchConnected.await(10, TimeUnit.SECONDS)); + assertTrue(service.nodeConnected(spareNode)); + } + } + } + } + + public void testFilterDiscoveredNodes() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); + MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode(); + knownNodes.add(seedTransport.getLocalDiscoNode()); + knownNodes.add(discoverableTransport.getLocalDiscoNode()); + DiscoveryNode rejectedNode = randomBoolean() ? seedNode : discoverableNode; + Collections.shuffle(knownNodes, random()); + + try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> n.equals(rejectedNode) == false)) { + updateSeedNodes(connection, Arrays.asList(seedNode)); + if (rejectedNode.equals(seedNode)) { + assertFalse(service.nodeConnected(seedNode)); + assertTrue(service.nodeConnected(discoverableNode)); + } else { + assertTrue(service.nodeConnected(seedNode)); + assertFalse(service.nodeConnected(discoverableNode)); + } + assertTrue(connection.assertNoRunningConnections()); + } + } + } + } + + private void updateSeedNodes(RemoteClusterConnection connection, List seedNodes) throws Exception { + CountDownLatch latch = new CountDownLatch(1); + AtomicReference exceptionAtomicReference = new AtomicReference<>(); + ActionListener listener = ActionListener.wrap(x -> latch.countDown(), x -> { + exceptionAtomicReference.set(x); + latch.countDown(); + }); + connection.updateSeedNodes(seedNodes, listener); + latch.await(); + if (exceptionAtomicReference.get() != null) { + throw exceptionAtomicReference.get(); + } + } + + public void testConnectWithIncompatibleTransports() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.fromString("2.0.0"))) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + knownNodes.add(seedTransport.getLocalDiscoNode()); + + try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + expectThrows(Exception.class, () -> updateSeedNodes(connection, Arrays.asList(seedNode))); + assertFalse(service.nodeConnected(seedNode)); + assertTrue(connection.assertNoRunningConnections()); + } + } + } + } + + @SuppressForbidden(reason = "calls getLocalHost here but it's fine in this case") + public void testSlowNodeCanBeCanceled() throws IOException, InterruptedException { + try (ServerSocket socket = new MockServerSocket()) { + socket.bind(new InetSocketAddress(InetAddress.getLocalHost(), 0), 1); + socket.setReuseAddress(true); + DiscoveryNode seedNode = new DiscoveryNode("TEST", new TransportAddress(socket.getInetAddress(), + socket.getLocalPort()), emptyMap(), + emptySet(), Version.CURRENT); + CountDownLatch acceptedLatch = new CountDownLatch(1); + CountDownLatch closeRemote = new CountDownLatch(1); + Thread t = new Thread() { + @Override + public void run() { + try (Socket accept = socket.accept()) { + acceptedLatch.countDown(); + closeRemote.await(); + } catch (IOException e) { + // that's fine we might close + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + }; + t.start(); + + try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + CountDownLatch listenerCalled = new CountDownLatch(1); + AtomicReference exceptionReference = new AtomicReference<>(); + try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + ActionListener listener = ActionListener.wrap(x -> { + listenerCalled.countDown(); + fail("expected exception"); + }, x -> { + exceptionReference.set(x); + listenerCalled.countDown(); + }); + connection.updateSeedNodes(Arrays.asList(seedNode), listener); + acceptedLatch.await(); + connection.close(); // now close it, this should trigger an interrupt on the socket and we can move on + assertTrue(connection.assertNoRunningConnections()); + } + closeRemote.countDown(); + listenerCalled.await(); + assertNotNull(exceptionReference.get()); + expectThrows(CancellableThreads.ExecutionCancelledException.class, () -> {throw exceptionReference.get();}); + + } + } + } + + public void testFetchShards() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); + MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + knownNodes.add(seedTransport.getLocalDiscoNode()); + knownNodes.add(discoverableTransport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + if (randomBoolean()) { + updateSeedNodes(connection, Arrays.asList(seedNode)); + } + + SearchRequest request = new SearchRequest("test-index"); + CountDownLatch responseLatch = new CountDownLatch(1); + AtomicReference reference = new AtomicReference<>(); + AtomicReference failReference = new AtomicReference<>(); + ActionListener shardsListener = ActionListener.wrap( + x -> { + reference.set(x); + responseLatch.countDown(); + }, + x -> { + failReference.set(x); + responseLatch.countDown(); + }); + connection.fetchSearchShards(request, Arrays.asList("test-index"), shardsListener); + responseLatch.await(); + assertNull(failReference.get()); + assertNotNull(reference.get()); + ClusterSearchShardsResponse clusterSearchShardsResponse = reference.get(); + assertEquals(knownNodes, Arrays.asList(clusterSearchShardsResponse.getNodes())); + assertTrue(connection.assertNoRunningConnections()); + } + } + } + } + + public void testTriggerUpdatesConcurrently() throws IOException, InterruptedException { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); + MockTransportService seedTransport1 = startTransport("seed_node_1", knownNodes, Version.CURRENT); + MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode(); + DiscoveryNode seedNode1 = seedTransport1.getLocalDiscoNode(); + knownNodes.add(seedTransport.getLocalDiscoNode()); + knownNodes.add(discoverableTransport.getLocalDiscoNode()); + knownNodes.add(seedTransport1.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + List seedNodes = Arrays.asList(seedNode1, seedNode); + Collections.shuffle(seedNodes, random()); + + try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", + seedNodes, service, Integer.MAX_VALUE, n -> true)) { + int numThreads = randomIntBetween(4, 10); + Thread[] threads = new Thread[numThreads]; + CyclicBarrier barrier = new CyclicBarrier(numThreads); + for (int i = 0; i < threads.length; i++) { + final int numConnectionAttempts = randomIntBetween(10, 200); + threads[i] = new Thread() { + @Override + public void run() { + try { + barrier.await(); + CountDownLatch latch = new CountDownLatch(numConnectionAttempts); + for (int i = 0; i < numConnectionAttempts; i++) { + AtomicBoolean executed = new AtomicBoolean(false); + ActionListener listener = ActionListener.wrap(x -> { + assertTrue(executed.compareAndSet(false, true)); + latch.countDown();}, x -> { + assertTrue(executed.compareAndSet(false, true)); + latch.countDown(); + if (x instanceof RejectedExecutionException) { + // that's fine + } else { + throw new AssertionError(x); + } + }); + connection.updateSeedNodes(seedNodes, listener); + } + latch.await(); + } catch (Exception ex) { + throw new AssertionError(ex); + } + } + }; + threads[i].start(); + } + + for (int i = 0; i < threads.length; i++) { + threads[i].join(); + } + assertTrue(service.nodeConnected(seedNode)); + assertTrue(service.nodeConnected(discoverableNode)); + assertTrue(service.nodeConnected(seedNode1)); + assertTrue(connection.assertNoRunningConnections()); + } + } + } + } + + public void testCloseWhileConcurrentlyConnecting() throws IOException, InterruptedException, BrokenBarrierException { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); + MockTransportService seedTransport1 = startTransport("seed_node_1", knownNodes, Version.CURRENT); + MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + DiscoveryNode seedNode1 = seedTransport1.getLocalDiscoNode(); + knownNodes.add(seedTransport.getLocalDiscoNode()); + knownNodes.add(discoverableTransport.getLocalDiscoNode()); + knownNodes.add(seedTransport1.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + List seedNodes = Arrays.asList(seedNode1, seedNode); + Collections.shuffle(seedNodes, random()); + + try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", + seedNodes, service, Integer.MAX_VALUE, n -> true)) { + int numThreads = randomIntBetween(4, 10); + Thread[] threads = new Thread[numThreads]; + CyclicBarrier barrier = new CyclicBarrier(numThreads + 1); + for (int i = 0; i < threads.length; i++) { + final int numConnectionAttempts = randomIntBetween(10, 100); + threads[i] = new Thread() { + @Override + public void run() { + try { + barrier.await(); + CountDownLatch latch = new CountDownLatch(numConnectionAttempts); + for (int i = 0; i < numConnectionAttempts; i++) { + AtomicReference executed = new AtomicReference<>(); + ActionListener listener = ActionListener.wrap( + x -> { + if (executed.compareAndSet(null, new RuntimeException())) { + latch.countDown(); + } else { + throw new AssertionError("shit's been called twice", executed.get()); + } + }, + x -> { + if (executed.compareAndSet(null, new RuntimeException())) { + latch.countDown(); + } else { + throw new AssertionError("shit's been called twice", executed.get()); + } + if (x instanceof RejectedExecutionException || x instanceof AlreadyClosedException + || x instanceof CancellableThreads.ExecutionCancelledException) { + // that's fine + } else { + throw new AssertionError(x); + } + }); + connection.updateSeedNodes(seedNodes, listener); + } + latch.await(); + } catch (Exception ex) { + throw new AssertionError(ex); + } + } + }; + threads[i].start(); + } + barrier.await(); + connection.close(); + } + } + } + } +} diff --git a/core/src/test/java/org/elasticsearch/action/search/RemoteClusterServiceTests.java b/core/src/test/java/org/elasticsearch/action/search/RemoteClusterServiceTests.java new file mode 100644 index 00000000000..1531d66e5da --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/search/RemoteClusterServiceTests.java @@ -0,0 +1,250 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.search; + +import org.elasticsearch.Version; +import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup; +import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.index.query.TermsQueryBuilder; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.search.internal.AliasFilter; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; + +public class RemoteClusterServiceTests extends ESTestCase { + + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + + @Override + public void tearDown() throws Exception { + super.tearDown(); + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + + private MockTransportService startTransport(String id, List knownNodes, Version version) { + return RemoteClusterConnectionTests.startTransport(id, knownNodes, version, threadPool); + } + + public void testSettingsAreRegistered() { + assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CLUSTERS_SEEDS)); + assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER)); + assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING)); + assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_NODE_ATTRIBUTE)); + } + + public void testRemoteClusterSeedSetting() { + // simple validation + Settings settings = Settings.builder() + .put("search.remote.foo.seeds", "192.168.0.1:8080") + .put("search.remote.bar.seed", "[::1]:9090").build(); + RemoteClusterService.REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings).forEach(setting -> setting.get(settings)); + + Settings brokenSettings = Settings.builder() + .put("search.remote.foo.seeds", "192.168.0.1").build(); + expectThrows(IllegalArgumentException.class, () -> + RemoteClusterService.REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(brokenSettings).forEach(setting -> setting.get(brokenSettings))); + } + + public void testBuiltRemoteClustersSeeds() throws Exception { + Map> map = RemoteClusterService.buildRemoteClustersSeeds( + Settings.builder().put("search.remote.foo.seeds", "192.168.0.1:8080").put("search.remote.bar.seeds", "[::1]:9090").build()); + assertEquals(2, map.size()); + assertTrue(map.containsKey("foo")); + assertTrue(map.containsKey("bar")); + assertEquals(1, map.get("foo").size()); + assertEquals(1, map.get("bar").size()); + + DiscoveryNode foo = map.get("foo").get(0); + + assertEquals(foo.getAddress(), new TransportAddress(new InetSocketAddress(InetAddress.getByName("192.168.0.1"), 8080))); + assertEquals(foo.getId(), "foo#192.168.0.1:8080"); + assertEquals(foo.getVersion(), Version.CURRENT.minimumCompatibilityVersion()); + + DiscoveryNode bar = map.get("bar").get(0); + assertEquals(bar.getAddress(), new TransportAddress(new InetSocketAddress(InetAddress.getByName("[::1]"), 9090))); + assertEquals(bar.getId(), "bar#[::1]:9090"); + assertEquals(bar.getVersion(), Version.CURRENT.minimumCompatibilityVersion()); + } + + + public void testGroupClusterIndices() throws IOException { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService otherSeedTransport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + DiscoveryNode otherSeedNode = otherSeedTransport.getLocalDiscoNode(); + knownNodes.add(seedTransport.getLocalDiscoNode()); + knownNodes.add(otherSeedTransport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, + null)) { + transportService.start(); + transportService.acceptIncomingRequests(); + Settings.Builder builder = Settings.builder(); + builder.putArray("search.remote.cluster_1.seeds", seedNode.getAddress().toString()); + builder.putArray("search.remote.cluster_2.seeds", otherSeedNode.getAddress().toString()); + try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { + assertFalse(service.isCrossClusterSearchEnabled()); + service.initializeRemoteClusters(); + assertTrue(service.isCrossClusterSearchEnabled()); + assertTrue(service.isRemoteClusterRegistered("cluster_1")); + assertTrue(service.isRemoteClusterRegistered("cluster_2")); + assertFalse(service.isRemoteClusterRegistered("foo")); + Map> perClusterIndices = service.groupClusterIndices(new String[]{"foo:bar", "cluster_1:bar", + "cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo"}, i -> false); + String[] localIndices = perClusterIndices.computeIfAbsent(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, + k -> Collections.emptyList()).toArray(new String[0]); + assertNotNull(perClusterIndices.remove(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY)); + assertArrayEquals(new String[]{"foo:bar", "foo"}, localIndices); + assertEquals(2, perClusterIndices.size()); + assertEquals(Arrays.asList("bar", "test"), perClusterIndices.get("cluster_1")); + assertEquals(Arrays.asList("foo:bar", "foo*"), perClusterIndices.get("cluster_2")); + + IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> + service.groupClusterIndices(new String[]{"foo:bar", "cluster_1:bar", + "cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo"}, i -> "cluster_1:bar".equals(i))); + + assertEquals("Can not filter indices; index cluster_1:bar exists but there is also a remote cluster named:" + + " cluster_1", iae.getMessage()); + } + } + } + } + + public void testIncrementallyAddClusters() throws IOException { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService otherSeedTransport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + DiscoveryNode otherSeedNode = otherSeedTransport.getLocalDiscoNode(); + knownNodes.add(seedTransport.getLocalDiscoNode()); + knownNodes.add(otherSeedTransport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, + null)) { + transportService.start(); + transportService.acceptIncomingRequests(); + Settings.Builder builder = Settings.builder(); + builder.putArray("search.remote.cluster_1.seeds", seedNode.getAddress().toString()); + builder.putArray("search.remote.cluster_2.seeds", otherSeedNode.getAddress().toString()); + try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, transportService)) { + assertFalse(service.isCrossClusterSearchEnabled()); + service.initializeRemoteClusters(); + assertFalse(service.isCrossClusterSearchEnabled()); + service.updateRemoteCluster("cluster_1", Collections.singletonList(seedNode.getAddress().address())); + assertTrue(service.isCrossClusterSearchEnabled()); + assertTrue(service.isRemoteClusterRegistered("cluster_1")); + service.updateRemoteCluster("cluster_2", Collections.singletonList(otherSeedNode.getAddress().address())); + assertTrue(service.isCrossClusterSearchEnabled()); + assertTrue(service.isRemoteClusterRegistered("cluster_1")); + assertTrue(service.isRemoteClusterRegistered("cluster_2")); + service.updateRemoteCluster("cluster_2", Collections.emptyList()); + assertFalse(service.isRemoteClusterRegistered("cluster_2")); + IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, + () -> service.updateRemoteCluster(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, Collections.emptyList())); + assertEquals("remote clusters must not have the empty string as its key", iae.getMessage()); + } + } + } + } + + public void testProcessRemoteShards() throws IOException { + try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, null)) { + assertFalse(service.isCrossClusterSearchEnabled()); + List iteratorList = new ArrayList<>(); + Map searchShardsResponseMap = new HashMap<>(); + DiscoveryNode[] nodes = new DiscoveryNode[] { + new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT), + new DiscoveryNode("node2", buildNewFakeTransportAddress(), Version.CURRENT) + }; + Map indicesAndAliases = new HashMap<>(); + indicesAndAliases.put("foo", new AliasFilter(new TermsQueryBuilder("foo", "bar"), Strings.EMPTY_ARRAY)); + indicesAndAliases.put("bar", new AliasFilter(new MatchAllQueryBuilder(), Strings.EMPTY_ARRAY)); + ClusterSearchShardsGroup[] groups = new ClusterSearchShardsGroup[] { + new ClusterSearchShardsGroup(new ShardId("foo", "foo_id", 0), + new ShardRouting[] {TestShardRouting.newShardRouting("foo", 0, "node1", true, ShardRoutingState.STARTED), + TestShardRouting.newShardRouting("foo", 0, "node2", false, ShardRoutingState.STARTED)}), + new ClusterSearchShardsGroup(new ShardId("foo", "foo_id", 1), + new ShardRouting[] {TestShardRouting.newShardRouting("foo", 0, "node1", true, ShardRoutingState.STARTED), + TestShardRouting.newShardRouting("foo", 1, "node2", false, ShardRoutingState.STARTED)}), + new ClusterSearchShardsGroup(new ShardId("bar", "bar_id", 0), + new ShardRouting[] {TestShardRouting.newShardRouting("bar", 0, "node2", true, ShardRoutingState.STARTED), + TestShardRouting.newShardRouting("bar", 0, "node1", false, ShardRoutingState.STARTED)}) + }; + searchShardsResponseMap.put("test_cluster_1", new ClusterSearchShardsResponse(groups, nodes, indicesAndAliases)); + Map remoteAliases = new HashMap<>(); + service.processRemoteShards(searchShardsResponseMap, iteratorList, remoteAliases); + assertEquals(3, iteratorList.size()); + for (ShardIterator iterator : iteratorList) { + if (iterator.shardId().getIndexName().endsWith("foo")) { + assertTrue(iterator.shardId().getId() == 0 || iterator.shardId().getId() == 1); + assertEquals("test_cluster_1:foo", iterator.shardId().getIndexName()); + ShardRouting shardRouting = iterator.nextOrNull(); + assertNotNull(shardRouting); + assertEquals(shardRouting.getIndexName(), "foo"); + shardRouting = iterator.nextOrNull(); + assertNotNull(shardRouting); + assertEquals(shardRouting.getIndexName(), "foo"); + assertNull(iterator.nextOrNull()); + } else { + assertEquals(0, iterator.shardId().getId()); + assertEquals("test_cluster_1:bar", iterator.shardId().getIndexName()); + ShardRouting shardRouting = iterator.nextOrNull(); + assertNotNull(shardRouting); + assertEquals(shardRouting.getIndexName(), "bar"); + shardRouting = iterator.nextOrNull(); + assertNotNull(shardRouting); + assertEquals(shardRouting.getIndexName(), "bar"); + assertNull(iterator.nextOrNull()); + } + } + assertEquals(2, remoteAliases.size()); + assertTrue(remoteAliases.toString(), remoteAliases.containsKey("foo_id")); + assertTrue(remoteAliases.toString(), remoteAliases.containsKey("bar_id")); + assertEquals(new TermsQueryBuilder("foo", "bar"), remoteAliases.get("foo_id").getQueryBuilder()); + assertEquals(new MatchAllQueryBuilder(), remoteAliases.get("bar_id").getQueryBuilder()); + } + } +} diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index 86a8a8fa7c3..11428b51709 100644 --- a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; @@ -38,6 +39,10 @@ import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.ShardSearchTransportRequest; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestOptions; import java.io.IOException; import java.util.ArrayList; @@ -76,26 +81,30 @@ public class SearchAsyncActionTests extends ESTestCase { AtomicInteger contextIdGenerator = new AtomicInteger(0); GroupShardsIterator shardsIter = getShardsIter("idx", randomIntBetween(1, 10), randomBoolean(), primaryNode, replicaNode); AtomicInteger numFreedContext = new AtomicInteger(); - SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, null) { + SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, + Collections.singleton(RemoteClusterService.REMOTE_CLUSTERS_SEEDS)), null) { @Override - public void sendFreeContext(DiscoveryNode node, long contextId, SearchRequest request) { + public void sendFreeContext(Transport.Connection connection, long contextId, SearchRequest request) { numFreedContext.incrementAndGet(); - assertTrue(nodeToContextMap.containsKey(node)); - assertTrue(nodeToContextMap.get(node).remove(contextId)); + assertTrue(nodeToContextMap.containsKey(connection.getNode())); + assertTrue(nodeToContextMap.get(connection.getNode()).remove(contextId)); } }; - Map lookup = new HashMap<>(); - lookup.put(primaryNode.getId(), primaryNode); + Map lookup = new HashMap<>(); + lookup.put(primaryNode.getId(), new MockConnection(primaryNode)); + lookup.put(replicaNode.getId(), new MockConnection(replicaNode)); Map aliasFilters = Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)); AbstractSearchAsyncAction asyncAction = new AbstractSearchAsyncAction(logger, transportService, lookup::get, aliasFilters, Collections.emptyMap(), null, request, responseListener, shardsIter, 0, 0, null) { TestSearchResponse response = new TestSearchResponse(); @Override - protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener listener) { + protected void sendExecuteFirstPhase(Transport.Connection connection, ShardSearchTransportRequest request, + ActionListener listener) { assertTrue("shard: " + request.shardId() + " has been queried twice", response.queried.add(request.shardId())); - TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult(contextIdGenerator.incrementAndGet(), node); - Set ids = nodeToContextMap.computeIfAbsent(node, (n) -> new HashSet<>()); + TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult(contextIdGenerator.incrementAndGet(), + connection.getNode()); + Set ids = nodeToContextMap.computeIfAbsent(connection.getNode(), (n) -> new HashSet<>()); ids.add(testSearchPhaseResult.id); if (randomBoolean()) { listener.onResponse(testSearchPhaseResult); @@ -109,7 +118,7 @@ public class SearchAsyncActionTests extends ESTestCase { for (int i = 0; i < firstResults.length(); i++) { TestSearchPhaseResult result = firstResults.get(i); assertEquals(result.node.getId(), result.shardTarget().getNodeId()); - sendReleaseSearchContext(result.id(), result.node); + sendReleaseSearchContext(result.id(), new MockConnection(result.node)); } responseListener.onResponse(response); latch.countDown(); @@ -130,10 +139,13 @@ public class SearchAsyncActionTests extends ESTestCase { latch.await(); assertNotNull(response.get()); assertFalse(nodeToContextMap.isEmpty()); - assertTrue(nodeToContextMap.containsKey(primaryNode)); + assertTrue(nodeToContextMap.toString(), nodeToContextMap.containsKey(primaryNode) || nodeToContextMap.containsKey(replicaNode)); assertEquals(shardsIter.size(), numFreedContext.get()); - assertTrue(nodeToContextMap.get(primaryNode).toString(), nodeToContextMap.get(primaryNode).isEmpty()); - + if (nodeToContextMap.containsKey(primaryNode)) { + assertTrue(nodeToContextMap.get(primaryNode).toString(), nodeToContextMap.get(primaryNode).isEmpty()); + } else { + assertTrue(nodeToContextMap.get(replicaNode).toString(), nodeToContextMap.get(replicaNode).isEmpty()); + } } private GroupShardsIterator getShardsIter(String index, int numShards, boolean doReplicas, DiscoveryNode primaryNode, @@ -211,4 +223,29 @@ public class SearchAsyncActionTests extends ESTestCase { } } + + public final class MockConnection implements Transport.Connection { + + private final DiscoveryNode node; + + public MockConnection(DiscoveryNode node) { + this.node = node; + } + + @Override + public DiscoveryNode getNode() { + return node; + } + + @Override + public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) + throws IOException, TransportException { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws IOException { + throw new UnsupportedOperationException(); + } + } } diff --git a/core/src/test/java/org/elasticsearch/common/settings/SettingTests.java b/core/src/test/java/org/elasticsearch/common/settings/SettingTests.java index 4ce23ebcaf0..3789ea40459 100644 --- a/core/src/test/java/org/elasticsearch/common/settings/SettingTests.java +++ b/core/src/test/java/org/elasticsearch/common/settings/SettingTests.java @@ -31,6 +31,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -475,6 +477,33 @@ public class SettingTests extends ESTestCase { assertFalse(listAffixSetting.match("foo")); } + public void testGetAllConcreteSettings() { + Setting.AffixSetting> listAffixSetting = Setting.affixKeySetting("foo.", "bar", + (key) -> Setting.listSetting(key, Collections.emptyList(), Function.identity(), Property.NodeScope)); + + Settings settings = Settings.builder() + .putArray("foo.1.bar", "1", "2") + .putArray("foo.2.bar", "3", "4", "5") + .putArray("foo.bar", "6") + .putArray("some.other", "6") + .putArray("foo.3.bar", "6") + .build(); + Stream>> allConcreteSettings = listAffixSetting.getAllConcreteSettings(settings); + Map> collect = allConcreteSettings.collect(Collectors.toMap(Setting::getKey, (s) -> s.get(settings))); + assertEquals(3, collect.size()); + assertEquals(Arrays.asList("1", "2"), collect.get("foo.1.bar")); + assertEquals(Arrays.asList("3", "4", "5"), collect.get("foo.2.bar")); + assertEquals(Arrays.asList("6"), collect.get("foo.3.bar")); + } + + public void testAffixSettingsFailOnGet() { + Setting.AffixSetting> listAffixSetting = Setting.affixKeySetting("foo.", "bar", + (key) -> Setting.listSetting(key, Collections.singletonList("testelement"), Function.identity(), Property.NodeScope)); + expectThrows(UnsupportedOperationException.class, () -> listAffixSetting.get(Settings.EMPTY)); + expectThrows(UnsupportedOperationException.class, () -> listAffixSetting.getRaw(Settings.EMPTY)); + assertEquals(Collections.singletonList("testelement"), listAffixSetting.getDefault(Settings.EMPTY)); + assertEquals("[\"testelement\"]", listAffixSetting.getDefaultRaw(Settings.EMPTY)); + } public void testMinMaxInt() { Setting integerSetting = Setting.intSetting("foo.bar", 1, 0, 10, Property.NodeScope); diff --git a/core/src/test/java/org/elasticsearch/search/simple/SimpleSearchIT.java b/core/src/test/java/org/elasticsearch/search/simple/SimpleSearchIT.java index 95978991046..099cce038c1 100644 --- a/core/src/test/java/org/elasticsearch/search/simple/SimpleSearchIT.java +++ b/core/src/test/java/org/elasticsearch/search/simple/SimpleSearchIT.java @@ -52,6 +52,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFa import static org.hamcrest.Matchers.containsString; public class SimpleSearchIT extends ESIntegTestCase { + public void testSearchNullIndex() { expectThrows(NullPointerException.class, () -> client().prepareSearch((String) null).setQuery(QueryBuilders.termQuery("_id", "XXX1")).get()); @@ -417,7 +418,8 @@ public class SimpleSearchIT extends ESIntegTestCase { client().prepareSearch("idx").setQuery(QueryBuilders.regexpQuery("num", "34")).get(); fail("SearchPhaseExecutionException should have been thrown"); } catch (SearchPhaseExecutionException ex) { - assertThat(ex.getCause().getCause().getMessage(), containsString("Can only use regexp queries on keyword and text fields")); + assertThat(ex.getRootCause().getMessage(), + containsString("Can only use regexp queries on keyword and text fields")); } } diff --git a/core/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java b/core/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java new file mode 100644 index 00000000000..68dffa1ded9 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java @@ -0,0 +1,252 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.transport; + +import org.apache.lucene.util.IOUtils; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.Before; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; + +public class TransportActionProxyTests extends ESTestCase { + protected ThreadPool threadPool; + // we use always a non-alpha or beta version here otherwise minimumCompatibilityVersion will be different for the two used versions + private static final Version CURRENT_VERSION = Version.fromString(String.valueOf(Version.CURRENT.major) + ".0.0"); + protected static final Version version0 = CURRENT_VERSION.minimumCompatibilityVersion(); + + protected DiscoveryNode nodeA; + protected MockTransportService serviceA; + + protected static final Version version1 = Version.fromId(CURRENT_VERSION.id + 1); + protected DiscoveryNode nodeB; + protected MockTransportService serviceB; + + protected DiscoveryNode nodeC; + protected MockTransportService serviceC; + + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool(getClass().getName()); + serviceA = buildService(version0); // this one supports dynamic tracer updates + nodeA = new DiscoveryNode("TS_A", serviceA.boundAddress().publishAddress(), emptyMap(), emptySet(), version0); + serviceB = buildService(version1); // this one doesn't support dynamic tracer updates + nodeB = new DiscoveryNode("TS_B", serviceB.boundAddress().publishAddress(), emptyMap(), emptySet(), version1); + serviceC = buildService(version1); // this one doesn't support dynamic tracer updates + nodeC = new DiscoveryNode("TS_C", serviceC.boundAddress().publishAddress(), emptyMap(), emptySet(), version1); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + IOUtils.close(serviceA, serviceB, serviceC, () -> { + try { + terminate(threadPool); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + } + + private MockTransportService buildService(final Version version) { + MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, version, threadPool, null); + service.start(); + service.acceptIncomingRequests(); + return service; + + } + + + public void testSendMessage() throws InterruptedException { + serviceA.registerRequestHandler("/test", SimpleTestRequest::new, ThreadPool.Names.SAME, + (request, channel) -> { + assertEquals(request.sourceNode, "TS_A"); + SimpleTestResponse response = new SimpleTestResponse(); + response.targetNode = "TS_A"; + channel.sendResponse(response); + }); + TransportActionProxy.registerProxyAction(serviceA, "/test", SimpleTestResponse::new); + serviceA.connectToNode(nodeB); + + serviceB.registerRequestHandler("/test", SimpleTestRequest::new, ThreadPool.Names.SAME, + (request, channel) -> { + assertEquals(request.sourceNode, "TS_A"); + SimpleTestResponse response = new SimpleTestResponse(); + response.targetNode = "TS_B"; + channel.sendResponse(response); + }); + TransportActionProxy.registerProxyAction(serviceB, "/test", SimpleTestResponse::new); + serviceB.connectToNode(nodeC); + serviceC.registerRequestHandler("/test", SimpleTestRequest::new, ThreadPool.Names.SAME, + (request, channel) -> { + assertEquals(request.sourceNode, "TS_A"); + SimpleTestResponse response = new SimpleTestResponse(); + response.targetNode = "TS_C"; + channel.sendResponse(response); + }); + TransportActionProxy.registerProxyAction(serviceC, "/test", SimpleTestResponse::new); + + CountDownLatch latch = new CountDownLatch(1); + serviceA.sendRequest(nodeB, TransportActionProxy.getProxyAction("/test"), TransportActionProxy.wrapRequest(nodeC, + new SimpleTestRequest("TS_A")), new TransportResponseHandler() { + @Override + public SimpleTestResponse newInstance() { + return new SimpleTestResponse(); + } + + @Override + public void handleResponse(SimpleTestResponse response) { + try { + assertEquals("TS_C", response.targetNode); + } finally { + latch.countDown(); + } + } + + @Override + public void handleException(TransportException exp) { + try { + throw new AssertionError(exp); + } finally { + latch.countDown(); + } + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }); + latch.await(); + } + + public void testException() throws InterruptedException { + serviceA.registerRequestHandler("/test", SimpleTestRequest::new, ThreadPool.Names.SAME, + (request, channel) -> { + assertEquals(request.sourceNode, "TS_A"); + SimpleTestResponse response = new SimpleTestResponse(); + response.targetNode = "TS_A"; + channel.sendResponse(response); + }); + TransportActionProxy.registerProxyAction(serviceA, "/test", SimpleTestResponse::new); + serviceA.connectToNode(nodeB); + + serviceB.registerRequestHandler("/test", SimpleTestRequest::new, ThreadPool.Names.SAME, + (request, channel) -> { + assertEquals(request.sourceNode, "TS_A"); + SimpleTestResponse response = new SimpleTestResponse(); + response.targetNode = "TS_B"; + channel.sendResponse(response); + }); + TransportActionProxy.registerProxyAction(serviceB, "/test", SimpleTestResponse::new); + serviceB.connectToNode(nodeC); + serviceC.registerRequestHandler("/test", SimpleTestRequest::new, ThreadPool.Names.SAME, + (request, channel) -> { + throw new ElasticsearchException("greetings from TS_C"); + }); + TransportActionProxy.registerProxyAction(serviceC, "/test", SimpleTestResponse::new); + + CountDownLatch latch = new CountDownLatch(1); + serviceA.sendRequest(nodeB, TransportActionProxy.getProxyAction("/test"), TransportActionProxy.wrapRequest(nodeC, + new SimpleTestRequest("TS_A")), new TransportResponseHandler() { + @Override + public SimpleTestResponse newInstance() { + return new SimpleTestResponse(); + } + + @Override + public void handleResponse(SimpleTestResponse response) { + try { + fail("expected exception"); + } finally { + latch.countDown(); + } + } + + @Override + public void handleException(TransportException exp) { + try { + Throwable cause = ExceptionsHelper.unwrapCause(exp); + assertEquals("greetings from TS_C", cause.getMessage()); + } finally { + latch.countDown(); + } + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }); + latch.await(); + } + + public static class SimpleTestRequest extends TransportRequest { + String sourceNode; + + public SimpleTestRequest(String sourceNode) { + this.sourceNode = sourceNode; + } + public SimpleTestRequest() {} + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + sourceNode = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(sourceNode); + } + } + + public static class SimpleTestResponse extends TransportResponse { + String targetNode; + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + targetNode = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(targetNode); + } + } + +} diff --git a/docs/build.gradle b/docs/build.gradle index 5347e529fa3..798bce6a0a6 100644 --- a/docs/build.gradle +++ b/docs/build.gradle @@ -145,6 +145,7 @@ buildRestTests.expectedUnconvertedCandidates = [ 'reference/modules/scripting/security.asciidoc', 'reference/modules/scripting/using.asciidoc', 'reference/modules/transport.asciidoc', + 'reference/modules/cross-cluster-search.asciidoc', // this is hart to test since we need 2 clusters -- maybe we can trick it into referencing itself... 'reference/query-dsl/exists-query.asciidoc', 'reference/query-dsl/function-score-query.asciidoc', 'reference/query-dsl/geo-shape-query.asciidoc', diff --git a/docs/reference/modules.asciidoc b/docs/reference/modules.asciidoc index 2ab1232d3b5..2699f425c99 100644 --- a/docs/reference/modules.asciidoc +++ b/docs/reference/modules.asciidoc @@ -77,6 +77,11 @@ The modules in this section are: A tribe node joins one or more clusters and acts as a federated client across them. + +<>:: + + Cross cluster search enables executing search requests across more than one cluster without joining them and acts + as a federated client across them. -- diff --git a/docs/reference/modules/cross-cluster-search.asciidoc b/docs/reference/modules/cross-cluster-search.asciidoc new file mode 100644 index 00000000000..7bc3cbf388e --- /dev/null +++ b/docs/reference/modules/cross-cluster-search.asciidoc @@ -0,0 +1,117 @@ +[[modules-cross-cluster-search]] +== Cross cluster search + +The _cross cluster search_ feature allows any node to act as a federated client across +multiple clusters. In contrast to the _tribe_ feature, a _cross cluster search_ node won't +join the remote cluster, instead it connects to a remote cluster in a light fashion in order to execute +federated search requests. + +The _cross cluster search_ feature works by configuring a remote cluster in the cluster state and connects only to a +limited number of nodes in the remote cluster. Each remote cluster is referenced by a name and a list of seed nodes. +Those seed nodes are used to discover other nodes eligible as so-called _gateway nodes_. Each node in a cluster that +has remote clusters configured connects to one or more _gateway nodes_ and uses them to federate search requests to +the remote cluster. + +Remote clusters can either be configured as part of the `elasticsearch.yml` file or be dynamically updated via +the <>. If a remote cluster is configured via `elasticsearch.yml` only +the nodes with the configuration set will be connecting to the remote cluster in which case federated search requests +will have to be sent specifically to those nodes. Remote clusters set via the +<> will be available on every node in the cluster. + +The `elasticsearch.yml` config file for a _cross cluster search_ node just needs to list the +remote clusters that should be connected to, for instance: + +[source,yaml] +-------------------------------- +search: + remote: + cluster_one: <1> + seeds: 127.0.0.1:9300 + cluster_two: <1> + seeds: 127.0.0.1:9301 + +-------------------------------- +<1> `cluster_one` and `cluster_two` are arbitrary cluster aliases representing the connection to each cluster. +These names are subsequently used to distinguish between local and remote indices. + +[float] +=== Using cross cluster search + +To search the `twitter` index on remote cluster `cluster_1` the index name must be prefixed with the cluster alias +separated by a `:` character: + +[source,js] +-------------------------------------------------- +POST /cluster_one:twitter/tweet/_search +{ + "match_all": {} +} +-------------------------------------------------- + +In contrast to the `tribe` feature cross cluster search can also search indices with the same name on different +clusters: + +[source,js] +-------------------------------------------------- +POST /cluster_one:twitter,twitter/tweet/_search +{ + "match_all": {} +} +-------------------------------------------------- + +Search results are disambiguated the same way as the indices are disambiguated in the request. Even if index names are +identical these indices will be treated as different indices when results are merged. All results retrieved from a +remote index +will be prefixed with their remote cluster name: + +[source,js] +-------------------------------------------------- + { + "took" : 89, + "timed_out" : false, + "_shards" : { + "total" : 10, + "successful" : 10, + "failed" : 0 + }, + "hits" : { + "total" : 2, + "max_score" : 1.0, + "hits" : [ + { + "_index" : "cluster_one:twitter", + "_type" : "tweet", + "_id" : "1", + "_score" : 1.0, + "_source" : { + "user" : "kimchy", + "postDate" : "2009-11-15T14:12:12", + "message" : "trying out Elasticsearch" + } + }, + { + "_index" : "twitter", + "_type" : "tweet", + "_id" : "1", + "_score" : 1.0, + "_source" : { + "user" : "kimchy", + "postDate" : "2009-11-15T14:12:12", + "message" : "trying out Elasticsearch" + } + } + ] + } +} +-------------------------------------------------- + +[float] +=== Cross cluster search settings + +* `search.remote.connections_per_cluster` - the number of nodes to connect to per remote cluster. The default is `3` +* `search.remote.initial_connect_timeout` - the time to wait for remote connections to be established when the node +starts. The default is `30s`. +* `search.remote.node.attr` - a node attribute to filter out nodes that are eligible as a gateway node in the +remote cluster. For instance a node can have a node attribute `node.attr.gateway: true` such that only nodes with this + attribute will be connected to if `search.remote.node.attr` is set to `gateway` + diff --git a/qa/multi-cluster-search/build.gradle b/qa/multi-cluster-search/build.gradle new file mode 100644 index 00000000000..b48cd9a3dd1 --- /dev/null +++ b/qa/multi-cluster-search/build.gradle @@ -0,0 +1,52 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.elasticsearch.gradle.test.RestIntegTestTask + +apply plugin: 'elasticsearch.standalone-test' + +task remoteClusterTest(type: RestIntegTestTask) { + mustRunAfter(precommit) + cluster { + distribution = 'zip' + numNodes = 2 + clusterName = 'remote-cluster' + } + systemProperty 'tests.rest.suite', 'remote_cluster' +} + +task mixedClusterTest(type: RestIntegTestTask) { + dependsOn(remoteClusterTest) + cluster { + distribution = 'zip' + setting 'search.remote.my_remote_cluster.seeds', "\"${-> remoteClusterTest.nodes.get(0).transportUri()}\"" + setting 'search.remote.connections_per_cluster', 1 + + } + systemProperty 'tests.rest.suite', 'multi_cluster' + finalizedBy 'remoteClusterTest#node0.stop','remoteClusterTest#node1.stop' +} + +task integTest { + dependsOn = [mixedClusterTest] +} + +test.enabled = false // no unit tests for multi-cluster-search, only the rest integration test + +check.dependsOn(integTest) diff --git a/qa/multi-cluster-search/src/test/java/org/elasticsearch/upgrades/MultiClusterSearchYamlTestSuiteIT.java b/qa/multi-cluster-search/src/test/java/org/elasticsearch/upgrades/MultiClusterSearchYamlTestSuiteIT.java new file mode 100644 index 00000000000..e8c3592e214 --- /dev/null +++ b/qa/multi-cluster-search/src/test/java/org/elasticsearch/upgrades/MultiClusterSearchYamlTestSuiteIT.java @@ -0,0 +1,47 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.upgrades; + +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite; +import org.apache.lucene.util.TimeUnits; +import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate; +import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase; + +import java.io.IOException; + +@TimeoutSuite(millis = 5 * TimeUnits.MINUTE) // to account for slow as hell VMs +public class MultiClusterSearchYamlTestSuiteIT extends ESClientYamlSuiteTestCase { + + @Override + protected boolean preserveIndicesUponCompletion() { + return true; + } + + public MultiClusterSearchYamlTestSuiteIT(ClientYamlTestCandidate testCandidate) { + super(testCandidate); + } + + @ParametersFactory + public static Iterable parameters() throws IOException { + return createParameters(); + } +} + diff --git a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yaml b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yaml new file mode 100644 index 00000000000..31b8fbd251e --- /dev/null +++ b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yaml @@ -0,0 +1,131 @@ +--- +"Index data and search on the mixed cluster": + + - do: + indices.create: + index: test_index + body: + settings: + index: + number_of_shards: 2 + number_of_replicas: 0 + + - do: + bulk: + refresh: true + body: + - '{"index": {"_index": "test_index", "_type": "test_type"}}' + - '{"f1": "local_cluster", "filter_field": 0}' + - '{"index": {"_index": "test_index", "_type": "test_type"}}' + - '{"f1": "local_cluster", "filter_field": 1}' + - '{"index": {"_index": "test_index", "_type": "test_type"}}' + - '{"f1": "local_cluster", "filter_field": 0}' + - '{"index": {"_index": "test_index", "_type": "test_type"}}' + - '{"f1": "local_cluster", "filter_field": 1}' + - '{"index": {"_index": "test_index", "_type": "test_type"}}' + - '{"f1": "local_cluster", "filter_field": 0}' + + - do: + search: + index: test_index,my_remote_cluster:test_index + body: + aggs: + cluster: + terms: + field: f1.keyword + + - match: { _shards.total: 5 } + - match: { hits.total: 11 } + - length: { aggregations.cluster.buckets: 2 } + - match: { aggregations.cluster.buckets.0.key: "remote_cluster" } + - match: { aggregations.cluster.buckets.0.doc_count: 6 } + - match: { aggregations.cluster.buckets.1.key: "local_cluster" } + - match: { aggregations.cluster.buckets.1.doc_count: 5 } + + - do: + search: + index: test_index,my_remote_cluster:test_index + body: + query: + term: + f1: remote_cluster + aggs: + cluster: + terms: + field: f1.keyword + + - match: { _shards.total: 5 } + - match: { hits.total: 6} + - match: { hits.hits.0._index: "my_remote_cluster:test_index"} + - length: { aggregations.cluster.buckets: 1 } + - match: { aggregations.cluster.buckets.0.key: "remote_cluster" } + - match: { aggregations.cluster.buckets.0.doc_count: 6 } + + - do: + search: + index: my_remote_cluster:test_index + body: + aggs: + cluster: + terms: + field: f1.keyword + + - match: { _shards.total: 3 } + - match: { hits.total: 6} + - match: { hits.hits.0._index: "my_remote_cluster:test_index"} + - length: { aggregations.cluster.buckets: 1 } + - match: { aggregations.cluster.buckets.0.key: "remote_cluster" } + - match: { aggregations.cluster.buckets.0.doc_count: 6 } + + - do: + search: + index: test_index + body: + aggs: + cluster: + terms: + field: f1.keyword + + - match: { _shards.total: 2 } + - match: { hits.total: 5} + - match: { hits.hits.0._index: "test_index"} + - length: { aggregations.cluster.buckets: 1 } + - match: { aggregations.cluster.buckets.0.key: "local_cluster" } + - match: { aggregations.cluster.buckets.0.doc_count: 5 } + +--- +"Add transient remote cluster based on the preset cluster": + - do: + cluster.get_settings: + include_defaults: true + + - set: { defaults.search.remote.my_remote_cluster.seeds.0: remote_ip } + + - do: + cluster.put_settings: + flat_settings: true + body: + transient: + search.remote.test_remote_cluster.seeds: $remote_ip + + - match: {transient: {search.remote.test_remote_cluster.seeds: $remote_ip}} + + - do: + search: + index: test_remote_cluster:test_index + + - match: { _shards.total: 3 } + - match: { hits.total: 6 } + - match: { hits.hits.0._index: "test_remote_cluster:test_index" } + +--- +"Search an filtered alias on the remote cluster": + + - do: + search: + index: my_remote_cluster:aliased_test_index + + - match: { _shards.total: 3 } + - match: { hits.total: 2 } + - match: { hits.hits.0._source.filter_field: 1 } + - match: { hits.hits.0._index: "my_remote_cluster:test_index" } diff --git a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/remote_cluster/10_basic.yaml b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/remote_cluster/10_basic.yaml new file mode 100644 index 00000000000..de4ae736f94 --- /dev/null +++ b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/remote_cluster/10_basic.yaml @@ -0,0 +1,60 @@ +--- +"Index data and search on the old cluster": + + - do: + indices.create: + index: test_index + body: + settings: + index: + number_of_shards: 3 + number_of_replicas: 0 + aliases: + aliased_test_index: # we use this alias in the multi cluster test to very filtered aliases work + filter: + term: + filter_field : 1 + + - do: + bulk: + refresh: true + body: + - '{"index": {"_index": "test_index", "_type": "test_type"}}' + - '{"f1": "remote_cluster", "filter_field": 0}' + - '{"index": {"_index": "test_index", "_type": "test_type"}}' + - '{"f1": "remote_cluster", "filter_field": 1}' + - '{"index": {"_index": "test_index", "_type": "test_type"}}' + - '{"f1": "remote_cluster", "filter_field": 0}' + - '{"index": {"_index": "test_index", "_type": "test_type"}}' + - '{"f1": "remote_cluster", "filter_field": 1}' + - '{"index": {"_index": "test_index", "_type": "test_type"}}' + - '{"f1": "remote_cluster", "filter_field": 0}' + - '{"index": {"_index": "test_index", "_type": "test_type"}}' + - '{"f1": "remote_cluster", "filter_field": 0}' + + + - do: + search: + index: test_index + body: + aggs: + cluster: + terms: + field: f1.keyword + + - match: { _shards.total: 3 } + - match: { hits.total: 6 } + - length: { aggregations.cluster.buckets: 1 } + - match: { aggregations.cluster.buckets.0.key: "remote_cluster" } + - match: { aggregations.cluster.buckets.0.doc_count: 6 } + + - do: + search: + index: aliased_test_index + + - match: { _shards.total: 3 } + - match: { hits.total: 2 } + - match: { hits.hits.0._source.filter_field: 1 } + - match: { hits.hits.0._index: "test_index" } + + diff --git a/settings.gradle b/settings.gradle index 1125e84325c..61443fb987b 100644 --- a/settings.gradle +++ b/settings.gradle @@ -56,6 +56,7 @@ List projects = [ 'qa:evil-tests', 'qa:no-bootstrap-tests', 'qa:rolling-upgrade', + 'qa:multi-cluster-search', 'qa:smoke-test-client', 'qa:smoke-test-http', 'qa:smoke-test-ingest-with-all-dependencies', diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 60af73abe9b..5a03035e457 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -763,4 +763,8 @@ public final class MockTransportService extends TransportService { assert openConnections.size() == 0 : "still open connections: " + openConnections; } } + + public DiscoveryNode getLocalDiscoNode() { + return this.getLocalNode(); + } }