Add federated cross cluster search capabilities (#22502)

Today Elasticsearch can act as a tribe node that fully joins other clusters to support operations across all of them. While tribe is a popular feature, it also has its downsides:
 * non-trivial testing
 * special configuration
 * receives a non-trivial amount of cluster-state updates
 * maintains connections to all nodes in all clusters and vice versa
 * has to be restarted to join another cluster
 * needs special strategies to disambiguate indices with identical names

That said, from a functionality standpoint the only feature in Elasticsearch that actually needs cross-cluster communication is the search layer. Everything else can be done on the client side by communicating with each cluster individually. For `_search`, the merging of aggregations etc. is non-trivial and can't be done on the client side.

The feature added in this PR is called `cross cluster search` and allows any node to act as a federated search node without joining any of the other clusters. There are two basic modes of operation:

 * globally configured remote clusters via cluster state settings
 * locally configured remote clusters via elasticsearch.yml

A node with such a configuration will `connect` to the remote cluster and discover a set of nodes that it can communicate with for federated search (3 nodes by default). It won't connect to all nodes, only to eligible nodes in the remote cluster, depending on their version and optionally on a node attribute (`node.attr`).

Remote clusters can be configured and updated at any time via cluster settings without the need for a node restart. Each remote cluster is specified via a `name` -> `seed node IP list` mapping, e.g.:

 * `search.remote.my_cluster.seeds: 127.0.0.1:9300, 127.0.0.1:9301`

Indices of this cluster can then be addressed via the cluster's alias: `GET index_1,my_cluster:index_1,my_cluster:other*/_search`
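As a rough sketch of what happens with these seed settings (a hypothetical same-package test class, since `buildRemoteClustersSeeds` below is package-private):

```java
package org.elasticsearch.action.search;

import java.util.List;
import java.util.Map;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;

public class SeedParsingSketch {
    public static void main(String[] args) {
        // the yml example above, expressed programmatically
        Settings settings = Settings.builder()
            .put("search.remote.my_cluster.seeds", "127.0.0.1:9300,127.0.0.1:9301")
            .build();
        // one placeholder DiscoveryNode per seed address, named "my_cluster#<address>"
        Map<String, List<DiscoveryNode>> seeds = RemoteClusterService.buildRemoteClustersSeeds(settings);
        seeds.get("my_cluster").forEach(node -> System.out.println(node.getId()));
    }
}
```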


Closes #21473
Simon Willnauer committed 6f7caed148 on 2017-01-18 09:28:04 +01:00 (via GitHub)
35 changed files with 3077 additions and 127 deletions

File: ActionListener.java

@ -19,8 +19,11 @@
package org.elasticsearch.action;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.CheckedConsumer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
/**
@ -65,4 +68,41 @@ public interface ActionListener<Response> {
}
};
}
/**
* Notifies every given listener with the response passed to {@link #onResponse(Object)}. If a listener itself throws an exception,
* the exception is forwarded to {@link #onFailure(Exception)}. If in turn {@link #onFailure(Exception)} fails, all remaining
* listeners will be processed and the caught exception will be re-thrown.
*/
static <Response> void onResponse(Iterable<ActionListener<Response>> listeners, Response response) {
List<Exception> exceptionList = new ArrayList<>();
for (ActionListener<Response> listener : listeners) {
try {
listener.onResponse(response);
} catch (Exception ex) {
try {
listener.onFailure(ex);
} catch (Exception ex1) {
exceptionList.add(ex1);
}
}
}
ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptionList);
}
/**
* Notifies every given listener with the failure passed to {@link #onFailure(Exception)}. If a listener itself throws an exception,
* all remaining listeners will be processed and the caught exception will be re-thrown.
*/
static <Response> void onFailure(Iterable<ActionListener<Response>> listeners, Exception failure) {
List<Exception> exceptionList = new ArrayList<>();
for (ActionListener<Response> listener : listeners) {
try {
listener.onFailure(failure);
} catch (Exception ex) {
exceptionList.add(ex);
}
}
ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptionList);
}
}
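A minimal usage sketch of the new batch-notification helpers (hypothetical listeners, not part of the diff):

```java
import java.util.Arrays;
import java.util.List;
import org.elasticsearch.action.ActionListener;

public class ListenerFanOutSketch {
    public static void main(String[] args) {
        List<ActionListener<String>> listeners = Arrays.asList(
            ActionListener.wrap(r -> System.out.println("first got: " + r), e -> {}),
            ActionListener.wrap(r -> System.out.println("second got: " + r), e -> {}));
        // notifies both listeners; a listener that throws is routed to its own onFailure,
        // and failures of onFailure itself are collected and re-thrown at the end
        ActionListener.onResponse(listeners, "done");
    }
}
```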

File: ClusterSearchShardsGroup.java

@ -38,7 +38,7 @@ public class ClusterSearchShardsGroup implements Streamable, ToXContent {
}
ClusterSearchShardsGroup(ShardId shardId, ShardRouting[] shards) {
public ClusterSearchShardsGroup(ShardId shardId, ShardRouting[] shards) {
this.shardId = shardId;
this.shards = shards;
}

File: ClusterSearchShardsResponse.java

@ -42,7 +42,8 @@ public class ClusterSearchShardsResponse extends ActionResponse implements ToXCo
}
ClusterSearchShardsResponse(ClusterSearchShardsGroup[] groups, DiscoveryNode[] nodes, Map<String, AliasFilter> indicesAndFilters) {
public ClusterSearchShardsResponse(ClusterSearchShardsGroup[] groups, DiscoveryNode[] nodes,
Map<String, AliasFilter> indicesAndFilters) {
this.groups = groups;
this.nodes = nodes;
this.indicesAndFilters = indicesAndFilters;

File: AbstractSearchAsyncAction.java

@ -41,6 +41,8 @@ import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.Transport;
import java.util.List;
import java.util.Map;
@ -59,7 +61,7 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
private final GroupShardsIterator shardsIts;
protected final SearchRequest request;
/** Used by subclasses to resolve node ids to DiscoveryNodes. **/
protected final Function<String, DiscoveryNode> nodeIdToDiscoveryNode;
protected final Function<String, Transport.Connection> nodeIdToConnection;
protected final SearchTask task;
protected final int expectedSuccessfulOps;
private final int expectedTotalOps;
@ -74,7 +76,7 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
protected volatile ScoreDoc[] sortedShardDocs;
protected AbstractSearchAsyncAction(Logger logger, SearchTransportService searchTransportService,
Function<String, DiscoveryNode> nodeIdToDiscoveryNode,
Function<String, Transport.Connection> nodeIdToConnection,
Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
Executor executor, SearchRequest request, ActionListener<SearchResponse> listener,
GroupShardsIterator shardsIts, long startTime, long clusterStateVersion, SearchTask task) {
@ -85,7 +87,7 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
this.request = request;
this.task = task;
this.listener = listener;
this.nodeIdToDiscoveryNode = nodeIdToDiscoveryNode;
this.nodeIdToConnection = nodeIdToConnection;
this.clusterStateVersion = clusterStateVersion;
this.shardsIts = shardsIts;
expectedSuccessfulOps = shardsIts.size();
@ -119,30 +121,34 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
void performFirstPhase(final int shardIndex, final ShardIterator shardIt, final ShardRouting shard) {
if (shard == null) {
// TODO upgrade this to an assert...
// no more active shards... (we should not really get here, but just for safety)
onFirstPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
} else {
final DiscoveryNode node = nodeIdToDiscoveryNode.apply(shard.currentNodeId());
if (node == null) {
onFirstPhaseResult(shardIndex, shard, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
} else {
try {
final Transport.Connection connection = nodeIdToConnection.apply(shard.currentNodeId());
AliasFilter filter = this.aliasFilter.get(shard.index().getUUID());
assert filter != null;
float indexBoost = concreteIndexBoosts.getOrDefault(shard.index().getUUID(), DEFAULT_INDEX_BOOST);
ShardSearchTransportRequest transportRequest = new ShardSearchTransportRequest(request, shardIt.shardId(), shardsIts.size(),
filter, indexBoost, startTime());
sendExecuteFirstPhase(node, transportRequest , new ActionListener<FirstResult>() {
@Override
public void onResponse(FirstResult result) {
onFirstPhaseResult(shardIndex, shard.currentNodeId(), result, shardIt);
}
sendExecuteFirstPhase(connection, transportRequest, new ActionListener<FirstResult>() {
@Override
public void onResponse(FirstResult result) {
onFirstPhaseResult(shardIndex, shard.currentNodeId(), result, shardIt);
}
@Override
public void onFailure(Exception t) {
onFirstPhaseResult(shardIndex, shard, node.getId(), shardIt, t);
}
});
@Override
public void onFailure(Exception t) {
onFirstPhaseResult(shardIndex, shard, connection.getNode().getId(), shardIt, t);
}
});
} catch (ConnectTransportException | IllegalArgumentException ex) {
// we are getting the connection early here so we might run into nodes that are not connected. in that case we move on to
// the next shard. previously, when using discovery nodes here, we had a special case for null when a node was not
// connected at all, which is no longer needed.
onFirstPhaseResult(shardIndex, shard, shard.currentNodeId(), shardIt, ex);
}
}
}
@ -292,8 +298,8 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
private void raiseEarlyFailure(Exception e) {
for (AtomicArray.Entry<FirstResult> entry : firstResults.asList()) {
try {
DiscoveryNode node = nodeIdToDiscoveryNode.apply(entry.value.shardTarget().getNodeId());
sendReleaseSearchContext(entry.value.id(), node);
Transport.Connection connection = nodeIdToConnection.apply(entry.value.shardTarget().getNodeId());
sendReleaseSearchContext(entry.value.id(), connection);
} catch (Exception inner) {
inner.addSuppressed(e);
logger.trace("failed to release context", inner);
@ -317,8 +323,8 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
if (queryResult.hasHits()
&& docIdsToLoad.get(entry.index) == null) { // but none of them made it to the global top docs
try {
DiscoveryNode node = nodeIdToDiscoveryNode.apply(entry.value.queryResult().shardTarget().getNodeId());
sendReleaseSearchContext(entry.value.queryResult().id(), node);
Transport.Connection connection = nodeIdToConnection.apply(entry.value.queryResult().shardTarget().getNodeId());
sendReleaseSearchContext(entry.value.queryResult().id(), connection);
} catch (Exception e) {
logger.trace("failed to release context", e);
}
@ -327,9 +333,9 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
}
}
protected void sendReleaseSearchContext(long contextId, DiscoveryNode node) {
if (node != null) {
searchTransportService.sendFreeContext(node, contextId, request);
protected void sendReleaseSearchContext(long contextId, Transport.Connection connection) {
if (connection != null) {
searchTransportService.sendFreeContext(connection, contextId, request);
}
}
@ -339,7 +345,7 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
return new ShardFetchSearchRequest(request, queryResult.id(), entry.value, lastEmittedDoc);
}
protected abstract void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
protected abstract void sendExecuteFirstPhase(Transport.Connection connection, ShardSearchTransportRequest request,
ActionListener<FirstResult> listener);
protected final void processFirstPhaseResult(int shardIndex, FirstResult result) {

File: RemoteClusterConnection.java (new)

@ -0,0 +1,502 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
/**
* Represents a connection to a single remote cluster. In contrast to a local cluster, a remote cluster is not joined: the
* current node never becomes part of the remote cluster and won't receive cluster state updates from it. Remote clusters are also
* not fully connected with the current node. From a connection perspective a local cluster forms a bi-directional star network while
* in the remote case we only connect to a subset of the nodes in the cluster in a uni-directional fashion.
*
* This class also handles the discovery of nodes from the remote cluster. The initial list of seed nodes is only used to discover
* the nodes in the remote cluster; the connection then connects to all eligible nodes, for details see
* {@link RemoteClusterService#REMOTE_NODE_ATTRIBUTE}.
*
* In the case of a disconnection, this class will issue a re-connect task to re-establish connections until either all eligible
* nodes are exhausted or the maximum number of connections per cluster,
* {@link RemoteClusterService#REMOTE_CONNECTIONS_PER_CLUSTER}, has been reached.
*/
final class RemoteClusterConnection extends AbstractComponent implements TransportConnectionListener, Closeable {
private final TransportService transportService;
private final ConnectionProfile remoteProfile;
private final Set<DiscoveryNode> connectedNodes = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Supplier<DiscoveryNode> nodeSupplier;
private final String clusterAlias;
private final int maxNumRemoteConnections;
private final Predicate<DiscoveryNode> nodePredicate;
private volatile List<DiscoveryNode> seedNodes;
private final ConnectHandler connectHandler;
/**
* Creates a new {@link RemoteClusterConnection}
* @param settings the node's settings object
* @param clusterAlias the configured alias of the cluster to connect to
* @param seedNodes a list of seed nodes to discover eligible nodes from
* @param transportService the local node's transport service
* @param maxNumRemoteConnections the maximum number of connections to the remote cluster
* @param nodePredicate a predicate to filter eligible remote nodes to connect to
*/
RemoteClusterConnection(Settings settings, String clusterAlias, List<DiscoveryNode> seedNodes,
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate) {
super(settings);
this.transportService = transportService;
this.maxNumRemoteConnections = maxNumRemoteConnections;
this.nodePredicate = nodePredicate;
this.clusterAlias = clusterAlias;
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
builder.setConnectTimeout(TcpTransport.TCP_CONNECT_TIMEOUT.get(settings));
builder.setHandshakeTimeout(TcpTransport.TCP_CONNECT_TIMEOUT.get(settings));
builder.addConnections(6, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING); // TODO make this configurable?
builder.addConnections(0, // we don't want this to be used for anything else but search
TransportRequestOptions.Type.BULK,
TransportRequestOptions.Type.STATE,
TransportRequestOptions.Type.RECOVERY);
remoteProfile = builder.build();
nodeSupplier = new Supplier<DiscoveryNode>() {
private volatile Iterator<DiscoveryNode> current;
@Override
public DiscoveryNode get() {
if (current == null || current.hasNext() == false) {
current = connectedNodes.iterator();
if (current.hasNext() == false) {
throw new IllegalStateException("No node available for cluster: " + clusterAlias + " nodes: " + connectedNodes);
}
}
return current.next();
}
};
this.seedNodes = Collections.unmodifiableList(seedNodes);
this.connectHandler = new ConnectHandler();
transportService.addConnectionListener(this);
}
/**
* Updates the list of seed nodes for this cluster connection
*/
synchronized void updateSeedNodes(List<DiscoveryNode> seedNodes, ActionListener<Void> connectListener) {
this.seedNodes = Collections.unmodifiableList(new ArrayList<>(seedNodes));
connectHandler.connect(connectListener);
}
@Override
public void onNodeDisconnected(DiscoveryNode node) {
boolean remove = connectedNodes.remove(node);
if (remove && connectedNodes.size() < maxNumRemoteConnections) {
// try to reconnect and fill up the slot of the disconnected node
connectHandler.forceConnect();
}
}
/**
* Fetches all shards for the search request from this remote connection. This is used to later run the search on the remote end.
*/
public void fetchSearchShards(SearchRequest searchRequest, final List<String> indices,
ActionListener<ClusterSearchShardsResponse> listener) {
if (connectedNodes.isEmpty()) {
// just in case: if we are not connected for some reason we try to connect, and if that fails we have to notify the listener.
// this will cause some back pressure on the search end and eventually will cause rejections but that's fine since
// we can't proceed with a search on the cluster level anyway.
// in the future we might want to just skip the remote nodes in such a case but that can already be implemented on the
// caller's end since they provide the listener.
connectHandler.connect(ActionListener.wrap((x) -> fetchShardsInternal(searchRequest, indices, listener), listener::onFailure));
} else {
fetchShardsInternal(searchRequest, indices, listener);
}
}
private void fetchShardsInternal(SearchRequest searchRequest, List<String> indices,
final ActionListener<ClusterSearchShardsResponse> listener) {
final DiscoveryNode node = nodeSupplier.get();
ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices.toArray(new String[indices.size()]))
.indicesOptions(searchRequest.indicesOptions()).local(true).preference(searchRequest.preference())
.routing(searchRequest.routing());
transportService.sendRequest(node, ClusterSearchShardsAction.NAME, searchShardsRequest,
new TransportResponseHandler<ClusterSearchShardsResponse>() {
@Override
public ClusterSearchShardsResponse newInstance() {
return new ClusterSearchShardsResponse();
}
@Override
public void handleResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) {
listener.onResponse(clusterSearchShardsResponse);
}
@Override
public void handleException(TransportException e) {
listener.onFailure(e);
}
@Override
public String executor() {
return ThreadPool.Names.SEARCH;
}
});
}
/**
* Returns a connection to the remote cluster. This connection might be a proxy connection that redirects internally to the
* given node.
*/
Transport.Connection getConnection(DiscoveryNode remoteClusterNode) {
DiscoveryNode discoveryNode = nodeSupplier.get();
Transport.Connection connection = transportService.getConnection(discoveryNode);
return new Transport.Connection() {
@Override
public DiscoveryNode getNode() {
return remoteClusterNode;
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
connection.sendRequest(requestId, TransportActionProxy.getProxyAction(action),
TransportActionProxy.wrapRequest(remoteClusterNode, request), options);
}
@Override
public void close() throws IOException {
assert false: "proxy connections must not be closed";
}
};
}
@Override
public void close() throws IOException {
connectHandler.close();
}
public boolean isClosed() {
return connectHandler.isClosed();
}
/**
* The connect handler manages node discovery and the actual connection to the remote cluster.
* There is at most one connect job running at any time. If a connect job is triggered
* while another job is running, the provided listeners are queued and batched up until the currently running job returns.
*
* The handler has a built-in queue that can hold up to 100 connect attempts and will reject requests once the queue is full.
* If a remote cluster becomes unavailable we will queue requests up, but if we can't connect quickly enough
* we will just reject the connect trigger, which will lead to failing searches.
*/
private class ConnectHandler implements Closeable {
private final Semaphore running = new Semaphore(1);
private final AtomicBoolean closed = new AtomicBoolean(false);
private final BlockingQueue<ActionListener<Void>> queue = new ArrayBlockingQueue<>(100);
private final CancellableThreads cancellableThreads = new CancellableThreads();
/**
* Triggers a connect round iff there are pending requests queued up and if there is no
* connect round currently running.
*/
void maybeConnect() {
connect(null);
}
/**
* Triggers a connect round unless there is one running already. If there is a connect round running, the listener will either
* be queued or rejected and failed.
*/
void connect(ActionListener<Void> connectListener) {
connect(connectListener, false);
}
/**
* Triggers a connect round unless there is one already running. In contrast to {@link #maybeConnect()}, this method will also
* trigger a connect round if there is no listener queued up.
*/
void forceConnect() {
connect(null, true);
}
private void connect(ActionListener<Void> connectListener, boolean forceRun) {
final boolean runConnect;
final Collection<ActionListener<Void>> toNotify;
synchronized (queue) {
if (connectListener != null && queue.offer(connectListener) == false) {
connectListener.onFailure(new RejectedExecutionException("connect queue is full"));
return;
}
if (forceRun == false && queue.isEmpty()) {
return;
}
runConnect = running.tryAcquire();
if (runConnect) {
toNotify = new ArrayList<>();
queue.drainTo(toNotify);
if (closed.get()) {
running.release();
ActionListener.onFailure(toNotify, new AlreadyClosedException("connect handler is already closed"));
return;
}
} else {
toNotify = Collections.emptyList();
}
}
if (runConnect) {
forkConnect(toNotify);
}
}
private void forkConnect(final Collection<ActionListener<Void>> toNotify) {
ThreadPool threadPool = transportService.getThreadPool();
ExecutorService executor = threadPool.executor(ThreadPool.Names.MANAGEMENT);
executor.submit(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
synchronized (queue) {
running.release();
}
try {
ActionListener.onFailure(toNotify, e);
} finally {
maybeConnect();
}
}
@Override
protected void doRun() throws Exception {
ActionListener<Void> listener = ActionListener.wrap((x) -> {
synchronized (queue) {
running.release();
}
try {
ActionListener.onResponse(toNotify, x);
} finally {
maybeConnect();
}
}, (e) -> {
synchronized (queue) {
running.release();
}
try {
ActionListener.onFailure(toNotify, e);
} finally {
maybeConnect();
}
});
collectRemoteNodes(seedNodes.iterator(), transportService, listener);
}
});
}
void collectRemoteNodes(Iterator<DiscoveryNode> seedNodes,
final TransportService transportService, ActionListener<Void> listener) {
if (Thread.currentThread().isInterrupted()) {
listener.onFailure(new InterruptedException("remote connect thread got interrupted"));
}
try {
if (seedNodes.hasNext()) {
cancellableThreads.executeIO(() -> {
final DiscoveryNode seedNode = seedNodes.next();
final DiscoveryNode handshakeNode;
Transport.Connection connection = transportService.openConnection(seedNode,
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null));
boolean success = false;
try {
handshakeNode = transportService.handshake(connection, remoteProfile.getHandshakeTimeout().millis(),
(c) -> true);
if (nodePredicate.test(handshakeNode) && connectedNodes.size() < maxNumRemoteConnections) {
transportService.connectToNode(handshakeNode, remoteProfile);
connectedNodes.add(handshakeNode);
}
ClusterStateRequest request = new ClusterStateRequest();
request.clear();
request.nodes(true);
transportService.sendRequest(connection,
ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY,
new SniffClusterStateResponseHandler(transportService, connection, listener, seedNodes,
cancellableThreads));
success = true;
} finally {
if (success == false) {
connection.close();
}
}
});
} else {
listener.onFailure(new IllegalStateException("no seed node left"));
}
} catch (CancellableThreads.ExecutionCancelledException ex) {
listener.onFailure(ex); // we got canceled - fail the listener and step out
} catch (ConnectTransportException | IOException | IllegalStateException ex) {
// ISE if we fail the handshake with a version-incompatible node
if (seedNodes.hasNext()) {
logger.debug((Supplier<?>) () -> new ParameterizedMessage("fetching nodes from external cluster {} failed",
clusterAlias), ex);
collectRemoteNodes(seedNodes, transportService, listener);
} else {
listener.onFailure(ex);
}
}
}
@Override
public void close() throws IOException {
try {
if (closed.compareAndSet(false, true)) {
cancellableThreads.cancel("connect handler is closed");
running.acquire(); // acquire the semaphore to ensure all connections are closed and all threads joined
running.release();
maybeConnect(); // now go and notify pending listeners
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
final boolean isClosed() {
return closed.get();
}
/* This class handles the _state response from the remote cluster when sniffing nodes to connect to */
private class SniffClusterStateResponseHandler implements TransportResponseHandler<ClusterStateResponse> {
private final TransportService transportService;
private final Transport.Connection connection;
private final ActionListener<Void> listener;
private final Iterator<DiscoveryNode> seedNodes;
private final CancellableThreads cancellableThreads;
SniffClusterStateResponseHandler(TransportService transportService, Transport.Connection connection,
ActionListener<Void> listener, Iterator<DiscoveryNode> seedNodes,
CancellableThreads cancellableThreads) {
this.transportService = transportService;
this.connection = connection;
this.listener = listener;
this.seedNodes = seedNodes;
this.cancellableThreads = cancellableThreads;
}
@Override
public ClusterStateResponse newInstance() {
return new ClusterStateResponse();
}
@Override
public void handleResponse(ClusterStateResponse response) {
try {
cancellableThreads.executeIO(() -> {
DiscoveryNodes nodes = response.getState().nodes();
Iterable<DiscoveryNode> nodesIter = nodes.getNodes()::valuesIt;
for (DiscoveryNode node : nodesIter) {
if (nodePredicate.test(node) && connectedNodes.size() < maxNumRemoteConnections) {
try {
transportService.connectToNode(node, remoteProfile); // noop if node is connected
connectedNodes.add(node);
} catch (ConnectTransportException | IllegalStateException ex) {
// ISE if we fail the handshake with a version-incompatible node
// fair enough - we can't connect, just move on
logger.debug((Supplier<?>)
() -> new ParameterizedMessage("failed to connect to node {}", node), ex);
}
}
}
});
connection.close();
listener.onResponse(null);
} catch (CancellableThreads.ExecutionCancelledException ex) {
listener.onFailure(ex); // we got canceled - fail the listener and step out
} catch (Exception ex) {
logger.warn((Supplier<?>)
() -> new ParameterizedMessage("fetching nodes from external cluster {} failed",
clusterAlias), ex);
collectRemoteNodes(seedNodes, transportService, listener);
} finally {
// just to make sure we don't leak anything we close the connection here again even if we managed to do so before
IOUtils.closeWhileHandlingException(connection);
}
}
@Override
public void handleException(TransportException exp) {
logger.warn((Supplier<?>)
() -> new ParameterizedMessage("fetching nodes from external cluster {} failed", clusterAlias),
exp);
try {
IOUtils.closeWhileHandlingException(connection);
} finally {
// once the connection is closed lets try the next node
collectRemoteNodes(seedNodes, transportService, listener);
}
}
@Override
public String executor() {
return ThreadPool.Names.MANAGEMENT;
}
}
}
boolean assertNoRunningConnections() { // for testing only
assert connectHandler.running.availablePermits() == 1;
return true;
}
}
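A hedged, test-style sketch of the lifecycle this class implements (construct, seed, then fetch shards); `transportService` and `seedNode` are assumed to come from elsewhere, e.g. a mock transport in a unit test:

```java
package org.elasticsearch.action.search;

import java.util.Collections;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.TransportService;

class RemoteClusterConnectionSketch {
    static void example(TransportService transportService, DiscoveryNode seedNode) {
        RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "my_cluster",
            Collections.singletonList(seedNode), transportService, 3,
            node -> Version.CURRENT.isCompatible(node.getVersion()));
        // discover and connect to up to 3 eligible nodes, then fetch the shards to search
        connection.updateSeedNodes(Collections.singletonList(seedNode), ActionListener.wrap(
            ignored -> connection.fetchSearchShards(new SearchRequest(),
                Collections.singletonList("index_1"),
                ActionListener.wrap(
                    response -> { /* response.getGroups() drives the remote search phases */ },
                    e -> { /* shard fetch failed */ })),
            e -> { /* connect round failed */ }));
    }
}
```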

File: RemoteClusterService.java (new)

@ -0,0 +1,395 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.PlainShardIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportService;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Basic service for accessing remote clusters via gateway nodes
*/
public final class RemoteClusterService extends AbstractComponent implements Closeable {
static final String LOCAL_CLUSTER_GROUP_KEY = "";
/**
* A list of initial seed nodes to discover eligible nodes from the remote cluster
*/
public static final Setting.AffixSetting<List<InetSocketAddress>> REMOTE_CLUSTERS_SEEDS = Setting.affixKeySetting("search.remote.",
"seeds", (key) -> Setting.listSetting(key, Collections.emptyList(), RemoteClusterService::parseSeedAddress,
Setting.Property.NodeScope, Setting.Property.Dynamic));
/**
* The maximum number of connections that will be established to a remote cluster. For instance, if there is only a single
* seed node, additional nodes will be discovered and connected to, up to the number configured by this setting. The default is 3.
*/
public static final Setting<Integer> REMOTE_CONNECTIONS_PER_CLUSTER = Setting.intSetting("search.remote.connections_per_cluster",
3, 1, Setting.Property.NodeScope);
/**
* The initial connect timeout for remote cluster connections
*/
public static final Setting<TimeValue> REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING =
Setting.positiveTimeSetting("search.remote.initial_connect_timeout", TimeValue.timeValueSeconds(30), Setting.Property.NodeScope);
/**
* The name of a node attribute to select nodes that should be connected to in the remote cluster.
* For instance a node can be configured with <tt>node.attr.gateway: true</tt> in order to be eligible as a gateway node between
* clusters. In that case <tt>search.remote.node.attr: gateway</tt> can be used to filter out other nodes in the remote cluster.
* The value of the setting is expected to be a boolean, <tt>true</tt> for nodes that can become gateways, <tt>false</tt> otherwise.
*/
public static final Setting<String> REMOTE_NODE_ATTRIBUTE = Setting.simpleString("search.remote.node.attr",
Setting.Property.NodeScope);
private static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':';
private final TransportService transportService;
private final int numRemoteConnections;
private volatile Map<String, RemoteClusterConnection> remoteClusters = Collections.emptyMap();
RemoteClusterService(Settings settings, TransportService transportService) {
super(settings);
this.transportService = transportService;
numRemoteConnections = REMOTE_CONNECTIONS_PER_CLUSTER.get(settings);
}
/**
* This method updates the list of remote clusters. It's intended to be used as an update consumer on the settings infrastructure
* @param seeds a cluster alias to discovery node mapping representing the remote clusters' seed nodes
* @param connectionListener a listener invoked once every configured cluster has been connected to
*/
private synchronized void updateRemoteClusters(Map<String, List<DiscoveryNode>> seeds, ActionListener<Void> connectionListener) {
if (seeds.containsKey(LOCAL_CLUSTER_GROUP_KEY)) {
throw new IllegalArgumentException("remote clusters must not have the empty string as its key");
}
Map<String, RemoteClusterConnection> remoteClusters = new HashMap<>();
if (seeds.isEmpty()) {
connectionListener.onResponse(null);
} else {
CountDown countDown = new CountDown(seeds.size());
Predicate<DiscoveryNode> nodePredicate = (node) -> Version.CURRENT.isCompatible(node.getVersion());
if (REMOTE_NODE_ATTRIBUTE.exists(settings)) {
// nodes can be tagged with node.attr.remote_gateway: true to allow a node to be a gateway node for
// cross cluster search
String attribute = REMOTE_NODE_ATTRIBUTE.get(settings);
nodePredicate = nodePredicate.and((node) -> Boolean.parseBoolean(node.getAttributes().getOrDefault(attribute, "false")));
}
remoteClusters.putAll(this.remoteClusters);
for (Map.Entry<String, List<DiscoveryNode>> entry : seeds.entrySet()) {
RemoteClusterConnection remote = this.remoteClusters.get(entry.getKey());
if (entry.getValue().isEmpty()) { // with no seed nodes we just remove the connection
try {
IOUtils.close(remote);
} catch (IOException e) {
logger.warn("failed to close remote cluster connections for cluster: " + entry.getKey(), e);
}
remoteClusters.remove(entry.getKey());
continue;
}
if (remote == null) { // this is a new cluster we have to add a new representation
remote = new RemoteClusterConnection(settings, entry.getKey(), entry.getValue(), transportService, numRemoteConnections,
nodePredicate);
remoteClusters.put(entry.getKey(), remote);
}
// now update the seed nodes no matter if it's new or already existing
RemoteClusterConnection finalRemote = remote;
remote.updateSeedNodes(entry.getValue(), ActionListener.wrap(
response -> {
if (countDown.countDown()) {
connectionListener.onResponse(response);
}
},
exception -> {
if (countDown.fastForward()) {
connectionListener.onFailure(exception);
}
if (finalRemote.isClosed() == false) {
logger.warn("failed to update seed list for cluster: " + entry.getKey(), exception);
}
}));
}
}
this.remoteClusters = Collections.unmodifiableMap(remoteClusters);
}
/**
* Returns <code>true</code> if at least one remote cluster is configured
*/
boolean isCrossClusterSearchEnabled() {
return remoteClusters.isEmpty() == false;
}
/**
* Groups indices per cluster by splitting remote cluster-alias, index-name pairs on {@link #REMOTE_CLUSTER_INDEX_SEPARATOR}. All
* indices per cluster are collected as a list in the returned map keyed by the cluster alias. Local indices are grouped under
* {@link #LOCAL_CLUSTER_GROUP_KEY}. The returned map is mutable.
*
* @param requestIndices the indices in the search request to filter
* @param indexExists a predicate that can test if a certain index or alias exists
*
* @return a map of grouped remote and local indices
*/
Map<String, List<String>> groupClusterIndices(String[] requestIndices, Predicate<String> indexExists) {
Map<String, List<String>> perClusterIndices = new HashMap<>();
for (String index : requestIndices) {
int i = index.indexOf(REMOTE_CLUSTER_INDEX_SEPARATOR);
String indexName = index;
String clusterName = LOCAL_CLUSTER_GROUP_KEY;
if (i >= 0) {
String remoteClusterName = index.substring(0, i);
if (isRemoteClusterRegistered(remoteClusterName)) {
if (indexExists.test(index)) {
// we use : as a separator for remote clusters. might conflict if there is an index that is actually named
// remote_cluster_alias:index_name - for this case we fail the request. the user can easily change the cluster alias
// if that happens
throw new IllegalArgumentException("Can not filter indices; index " + index +
" exists but there is also a remote cluster named: " + remoteClusterName);
}
indexName = index.substring(i + 1);
clusterName = remoteClusterName;
}
}
perClusterIndices.computeIfAbsent(clusterName, k -> new ArrayList<String>()).add(indexName);
}
return perClusterIndices;
}
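// illustration (hedged sketch, hypothetical values): given the example request from the
// PR description, with "my_cluster" registered and no local index literally containing ':',
//   groupClusterIndices(new String[] {"index_1", "my_cluster:index_1", "my_cluster:other*"}, index -> false)
// returns {"" -> [index_1], "my_cluster" -> [index_1, other*]},
// where "" is LOCAL_CLUSTER_GROUP_KEY, i.e. the group of local indices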
/**
* Returns <code>true</code> iff the given cluster is configured as a remote cluster. Otherwise <code>false</code>
*/
boolean isRemoteClusterRegistered(String clusterName) {
return remoteClusters.containsKey(clusterName);
}
void collectSearchShards(SearchRequest searchRequest, Map<String, List<String>> remoteIndicesByCluster,
ActionListener<Map<String, ClusterSearchShardsResponse>> listener) {
final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size());
final Map<String, ClusterSearchShardsResponse> searchShardsResponses = new ConcurrentHashMap<>();
final AtomicReference<TransportException> transportException = new AtomicReference<>();
for (Map.Entry<String, List<String>> entry : remoteIndicesByCluster.entrySet()) {
final String clusterName = entry.getKey();
RemoteClusterConnection remoteClusterConnection = remoteClusters.get(clusterName);
if (remoteClusterConnection == null) {
throw new IllegalArgumentException("no such remote cluster: " + clusterName);
}
final List<String> indices = entry.getValue();
remoteClusterConnection.fetchSearchShards(searchRequest, indices,
new ActionListener<ClusterSearchShardsResponse>() {
@Override
public void onResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) {
searchShardsResponses.put(clusterName, clusterSearchShardsResponse);
if (responsesCountDown.countDown()) {
TransportException exception = transportException.get();
if (exception == null) {
listener.onResponse(searchShardsResponses);
} else {
listener.onFailure(transportException.get());
}
}
}
@Override
public void onFailure(Exception e) {
TransportException exception = new TransportException("unable to communicate with remote cluster [" +
clusterName + "]", e);
if (transportException.compareAndSet(null, exception) == false) {
exception = transportException.accumulateAndGet(exception, (previous, current) -> {
current.addSuppressed(previous);
return current;
});
}
if (responsesCountDown.countDown()) {
listener.onFailure(exception);
}
}
});
}
}
Function<String, Transport.Connection> processRemoteShards(Map<String, ClusterSearchShardsResponse> searchShardsResponses,
List<ShardIterator> remoteShardIterators,
Map<String, AliasFilter> aliasFilterMap) {
Map<String, Supplier<Transport.Connection>> nodeToCluster = new HashMap<>();
for (Map.Entry<String, ClusterSearchShardsResponse> entry : searchShardsResponses.entrySet()) {
String clusterName = entry.getKey();
ClusterSearchShardsResponse searchShardsResponse = entry.getValue();
for (DiscoveryNode remoteNode : searchShardsResponse.getNodes()) {
nodeToCluster.put(remoteNode.getId(), () -> getConnection(remoteNode, clusterName));
}
Map<String, AliasFilter> indicesAndFilters = searchShardsResponse.getIndicesAndFilters();
for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) {
//add the cluster name to the remote index names for index disambiguation
//this ends up in the hits returned with the search response
ShardId shardId = clusterSearchShardsGroup.getShardId();
Index remoteIndex = shardId.getIndex();
Index index = new Index(clusterName + REMOTE_CLUSTER_INDEX_SEPARATOR + remoteIndex.getName(), remoteIndex.getUUID());
ShardIterator shardIterator = new PlainShardIterator(new ShardId(index, shardId.getId()),
Arrays.asList(clusterSearchShardsGroup.getShards()));
remoteShardIterators.add(shardIterator);
AliasFilter aliasFilter;
if (indicesAndFilters == null) {
aliasFilter = new AliasFilter(null, Strings.EMPTY_ARRAY);
} else {
aliasFilter = indicesAndFilters.get(shardId.getIndexName());
assert aliasFilter != null;
}
// here we have to map the filters to the UUID since from now on we use the uuid for the lookup
aliasFilterMap.put(remoteIndex.getUUID(), aliasFilter);
}
}
return (nodeId) -> {
Supplier<Transport.Connection> supplier = nodeToCluster.get(nodeId);
if (supplier == null) {
throw new IllegalArgumentException("unknown remote node: " + nodeId);
}
return supplier.get();
};
}
/**
* Returns a connection to the given node on the given remote cluster
* @throws IllegalArgumentException if the remote cluster is unknown
*/
private Transport.Connection getConnection(DiscoveryNode node, String cluster) {
RemoteClusterConnection connection = remoteClusters.get(cluster);
if (connection == null) {
throw new IllegalArgumentException("no such remote cluster: " + cluster);
}
return connection.getConnection(node);
}
void updateRemoteCluster(String clusterAlias, List<InetSocketAddress> addresses) {
updateRemoteClusters(Collections.singletonMap(clusterAlias, addresses.stream().map(address -> {
TransportAddress transportAddress = new TransportAddress(address);
return new DiscoveryNode(clusterAlias + "#" + transportAddress.toString(),
transportAddress,
Version.CURRENT.minimumCompatibilityVersion());
}).collect(Collectors.toList())),
ActionListener.wrap((x) -> {}, (x) -> {}) );
}
static Map<String, List<DiscoveryNode>> buildRemoteClustersSeeds(Settings settings) {
Stream<Setting<List<InetSocketAddress>>> allConcreteSettings = REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings);
return allConcreteSettings.collect(
Collectors.toMap(REMOTE_CLUSTERS_SEEDS::getNamespace, concreteSetting -> {
String clusterName = REMOTE_CLUSTERS_SEEDS.getNamespace(concreteSetting);
List<DiscoveryNode> nodes = new ArrayList<>();
for (InetSocketAddress address : concreteSetting.get(settings)) {
TransportAddress transportAddress = new TransportAddress(address);
DiscoveryNode node = new DiscoveryNode(clusterName + "#" + transportAddress.toString(),
transportAddress,
Version.CURRENT.minimumCompatibilityVersion());
nodes.add(node);
}
return nodes;
}));
}
private static InetSocketAddress parseSeedAddress(String remoteHost) {
int portSeparator = remoteHost.lastIndexOf(':'); // in case we have an IPv6 address, e.g. [::1]:9300
if (portSeparator == -1 || portSeparator == remoteHost.length()) {
throw new IllegalArgumentException("remote hosts need to be configured as [host:port], found [" + remoteHost + "] instead");
}
String host = remoteHost.substring(0, portSeparator);
InetAddress hostAddress;
try {
hostAddress = InetAddress.getByName(host);
} catch (UnknownHostException e) {
throw new IllegalArgumentException("unknown host [" + host + "]", e);
}
try {
int port = Integer.valueOf(remoteHost.substring(portSeparator + 1));
if (port <= 0) {
throw new IllegalArgumentException("port number must be > 0 but was: [" + port + "]");
}
return new InetSocketAddress(hostAddress, port);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("port must be a number", e);
}
}
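// illustration (hedged): splitting on the *last* ':' keeps IPv6 literals intact, e.g.
//   parseSeedAddress("127.0.0.1:9300") -> 127.0.0.1, port 9300
//   parseSeedAddress("[::1]:9301")     -> host "[::1]" (resolved via InetAddress.getByName), port 9301
// while a host without a port, e.g. "127.0.0.1", fails with
// "remote hosts need to be configured as [host:port]"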
/**
* Connects to all remote clusters in a blocking fashion. This should be called on node startup to establish an initial connection
* to all configured seed nodes.
*/
void initializeRemoteClusters() {
final TimeValue timeValue = REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings);
final PlainActionFuture<Void> future = new PlainActionFuture<>();
Map<String, List<DiscoveryNode>> seeds = buildRemoteClustersSeeds(settings);
updateRemoteClusters(seeds, future);
try {
future.get(timeValue.millis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (TimeoutException ex) {
logger.warn("failed to connect to remote clusters within {}", timeValue.toString());
} catch (Exception e) {
throw new IllegalStateException("failed to connect to remote clusters", e);
}
}
@Override
public void close() throws IOException {
IOUtils.close(remoteClusters.values());
}
}
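Putting the pieces together: a hedged sketch of how a search-time caller (in this PR presumably `TransportSearchAction`, not shown in this excerpt) could stitch the package-private methods above together:

```java
package org.elasticsearch.action.search;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.transport.Transport;

class CrossClusterFlowSketch {
    static void example(RemoteClusterService service, SearchRequest request) {
        // split the requested indices into local and per-remote-cluster groups
        Map<String, List<String>> grouped = service.groupClusterIndices(
            request.indices(), index -> false /* assumption: no local index contains ':' */);
        List<String> localIndices = grouped.remove(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY);
        // localIndices would be resolved against the local cluster state as before (not sketched here)
        service.collectSearchShards(request, grouped, ActionListener.wrap(responses -> {
            List<ShardIterator> remoteShardIterators = new ArrayList<>();
            Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();
            Function<String, Transport.Connection> nodeIdToConnection =
                service.processRemoteShards(responses, remoteShardIterators, remoteAliasFilters);
            // the remote shard iterators plus this nodeId -> connection function feed the
            // AbstractSearchAsyncAction subclasses shown earlier in this diff
        }, e -> { /* remote shard collection failed */ }));
    }
}
```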

File: SearchDfsQueryAndFetchAsyncAction.java

@ -34,6 +34,7 @@ import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.transport.Transport;
import java.io.IOException;
import java.util.Map;
@ -46,12 +47,12 @@ class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<DfsSea
private final AtomicArray<QueryFetchSearchResult> queryFetchResults;
private final SearchPhaseController searchPhaseController;
SearchDfsQueryAndFetchAsyncAction(Logger logger, SearchTransportService searchTransportService,
Function<String, DiscoveryNode> nodeIdToDiscoveryNode,
Function<String, Transport.Connection> nodeIdToConnection,
Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
SearchPhaseController searchPhaseController, Executor executor, SearchRequest request,
ActionListener<SearchResponse> listener, GroupShardsIterator shardsIts,
long startTime, long clusterStateVersion, SearchTask task) {
super(logger, searchTransportService, nodeIdToDiscoveryNode, aliasFilter, concreteIndexBoosts, executor,
super(logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor,
request, listener, shardsIts, startTime, clusterStateVersion, task);
this.searchPhaseController = searchPhaseController;
queryFetchResults = new AtomicArray<>(firstResults.length());
@ -63,9 +64,9 @@ class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<DfsSea
}
@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
protected void sendExecuteFirstPhase(Transport.Connection connection, ShardSearchTransportRequest request,
ActionListener<DfsSearchResult> listener) {
searchTransportService.sendExecuteDfs(node, request, task, listener);
searchTransportService.sendExecuteDfs(connection, request, task, listener);
}
@Override
@ -75,15 +76,15 @@ class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<DfsSea
for (final AtomicArray.Entry<DfsSearchResult> entry : firstResults.asList()) {
DfsSearchResult dfsResult = entry.value;
DiscoveryNode node = nodeIdToDiscoveryNode.apply(dfsResult.shardTarget().getNodeId());
Transport.Connection connection = nodeIdToConnection.apply(dfsResult.shardTarget().getNodeId());
QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest);
executeSecondPhase(entry.index, dfsResult, counter, connection, querySearchRequest);
}
}
void executeSecondPhase(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter,
final DiscoveryNode node, final QuerySearchRequest querySearchRequest) {
searchTransportService.sendExecuteFetch(node, querySearchRequest, task, new ActionListener<QueryFetchSearchResult>() {
final Transport.Connection connection, final QuerySearchRequest querySearchRequest) {
searchTransportService.sendExecuteFetch(connection, querySearchRequest, task, new ActionListener<QueryFetchSearchResult>() {
@Override
public void onResponse(QueryFetchSearchResult result) {
result.shardTarget(dfsResult.shardTarget());
@ -101,7 +102,7 @@ class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<DfsSea
// the query might not have been executed at all (for example because thread pool rejected execution)
// and the search context that was created in dfs phase might not be released.
// release it again to be in the safe side
sendReleaseSearchContext(querySearchRequest.id(), node);
sendReleaseSearchContext(querySearchRequest.id(), connection);
}
}
});

File: SearchDfsQueryThenFetchAsyncAction.java

@ -39,6 +39,7 @@ import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.transport.Transport;
import java.io.IOException;
import java.util.Map;
@ -54,12 +55,12 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
private final SearchPhaseController searchPhaseController;
SearchDfsQueryThenFetchAsyncAction(Logger logger, SearchTransportService searchTransportService,
Function<String, DiscoveryNode> nodeIdToDiscoveryNode,
Function<String, Transport.Connection> nodeIdToConnection,
Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
SearchPhaseController searchPhaseController, Executor executor, SearchRequest request,
ActionListener<SearchResponse> listener, GroupShardsIterator shardsIts, long startTime,
long clusterStateVersion, SearchTask task) {
super(logger, searchTransportService, nodeIdToDiscoveryNode, aliasFilter, concreteIndexBoosts, executor,
super(logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor,
request, listener, shardsIts, startTime, clusterStateVersion, task);
this.searchPhaseController = searchPhaseController;
queryResults = new AtomicArray<>(firstResults.length());
@ -73,9 +74,9 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
}
@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
protected void sendExecuteFirstPhase(Transport.Connection connection, ShardSearchTransportRequest request,
ActionListener<DfsSearchResult> listener) {
searchTransportService.sendExecuteDfs(node, request, task, listener);
searchTransportService.sendExecuteDfs(connection, request, task, listener);
}
@Override
@ -84,15 +85,15 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
final AtomicInteger counter = new AtomicInteger(firstResults.asList().size());
for (final AtomicArray.Entry<DfsSearchResult> entry : firstResults.asList()) {
DfsSearchResult dfsResult = entry.value;
DiscoveryNode node = nodeIdToDiscoveryNode.apply(dfsResult.shardTarget().getNodeId());
Transport.Connection connection = nodeIdToConnection.apply(dfsResult.shardTarget().getNodeId());
QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
executeQuery(entry.index, dfsResult, counter, querySearchRequest, node);
executeQuery(entry.index, dfsResult, counter, querySearchRequest, connection);
}
}
void executeQuery(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter,
final QuerySearchRequest querySearchRequest, final DiscoveryNode node) {
searchTransportService.sendExecuteQuery(node, querySearchRequest, task, new ActionListener<QuerySearchResult>() {
final QuerySearchRequest querySearchRequest, final Transport.Connection connection) {
searchTransportService.sendExecuteQuery(connection, querySearchRequest, task, new ActionListener<QuerySearchResult>() {
@Override
public void onResponse(QuerySearchResult result) {
result.shardTarget(dfsResult.shardTarget());
@ -110,7 +111,7 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
// the query might not have been executed at all (for example because thread pool rejected
// execution) and the search context that was created in dfs phase might not be released.
// release it again to be in the safe side
sendReleaseSearchContext(querySearchRequest.id(), node);
sendReleaseSearchContext(querySearchRequest.id(), connection);
}
}
});
@ -155,15 +156,15 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
for (final AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
QuerySearchResult queryResult = queryResults.get(entry.index);
DiscoveryNode node = nodeIdToDiscoveryNode.apply(queryResult.shardTarget().getNodeId());
Transport.Connection connection = nodeIdToConnection.apply(queryResult.shardTarget().getNodeId());
ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard);
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, connection);
}
}
void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter,
final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
searchTransportService.sendExecuteFetch(node, fetchSearchRequest, task, new ActionListener<FetchSearchResult>() {
final ShardFetchSearchRequest fetchSearchRequest, Transport.Connection connection) {
searchTransportService.sendExecuteFetch(connection, fetchSearchRequest, task, new ActionListener<FetchSearchResult>() {
@Override
public void onResponse(FetchSearchResult result) {
result.shardTarget(shardTarget);

File: SearchQueryAndFetchAsyncAction.java

@ -28,6 +28,7 @@ import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.transport.Transport;
import java.io.IOException;
import java.util.Map;
@ -39,13 +40,13 @@ class SearchQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<QueryFetc
private final SearchPhaseController searchPhaseController;
SearchQueryAndFetchAsyncAction(Logger logger, SearchTransportService searchTransportService,
Function<String, DiscoveryNode> nodeIdToDiscoveryNode,
Function<String, Transport.Connection> nodeIdToConnection,
Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
SearchPhaseController searchPhaseController, Executor executor,
SearchRequest request, ActionListener<SearchResponse> listener,
GroupShardsIterator shardsIts, long startTime, long clusterStateVersion,
SearchTask task) {
super(logger, searchTransportService, nodeIdToDiscoveryNode, aliasFilter, concreteIndexBoosts, executor,
super(logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor,
request, listener, shardsIts, startTime, clusterStateVersion, task);
this.searchPhaseController = searchPhaseController;
@ -57,9 +58,9 @@ class SearchQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<QueryFetc
}
@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
protected void sendExecuteFirstPhase(Transport.Connection connection, ShardSearchTransportRequest request,
ActionListener<QueryFetchSearchResult> listener) {
searchTransportService.sendExecuteFetch(node, request, task, listener);
searchTransportService.sendExecuteFetch(connection, request, task, listener);
}
@Override

View File

@ -36,6 +36,7 @@ import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.transport.Transport;
import java.io.IOException;
import java.util.Map;
@ -50,13 +51,13 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<QuerySea
private final SearchPhaseController searchPhaseController;
SearchQueryThenFetchAsyncAction(Logger logger, SearchTransportService searchTransportService,
Function<String, DiscoveryNode> nodeIdToDiscoveryNode,
Function<String, Transport.Connection> nodeIdToConnection,
Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
SearchPhaseController searchPhaseController, Executor executor,
SearchRequest request, ActionListener<SearchResponse> listener,
GroupShardsIterator shardsIts, long startTime, long clusterStateVersion,
SearchTask task) {
super(logger, searchTransportService, nodeIdToDiscoveryNode, aliasFilter, concreteIndexBoosts, executor, request, listener,
super(logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener,
shardsIts, startTime, clusterStateVersion, task);
this.searchPhaseController = searchPhaseController;
fetchResults = new AtomicArray<>(firstResults.length());
@ -69,9 +70,9 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<QuerySea
}
@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
protected void sendExecuteFirstPhase(Transport.Connection connection, ShardSearchTransportRequest request,
ActionListener<QuerySearchResultProvider> listener) {
searchTransportService.sendExecuteQuery(node, request, task, listener);
searchTransportService.sendExecuteQuery(connection, request, task, listener);
}
@Override
@ -90,15 +91,15 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<QuerySea
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
for (AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
QuerySearchResultProvider queryResult = firstResults.get(entry.index);
DiscoveryNode node = nodeIdToDiscoveryNode.apply(queryResult.shardTarget().getNodeId());
Transport.Connection connection = nodeIdToConnection.apply(queryResult.shardTarget().getNodeId());
ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), entry, lastEmittedDocPerShard);
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, connection);
}
}
void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter,
final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
searchTransportService.sendExecuteFetch(node, fetchSearchRequest, task, new ActionListener<FetchSearchResult>() {
final ShardFetchSearchRequest fetchSearchRequest, Transport.Connection connection) {
searchTransportService.sendExecuteFetch(connection, fetchSearchRequest, task, new ActionListener<FetchSearchResult>() {
@Override
public void onResponse(FetchSearchResult result) {
result.shardTarget(shardTarget);

View File

@ -25,9 +25,10 @@ import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.dfs.DfsSearchResult;
@ -44,19 +45,23 @@ import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.search.query.ScrollQuerySearchResult;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.transport.TaskAwareTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.function.Consumer;
/**
* An encapsulation of {@link org.elasticsearch.search.SearchService} operations exposed through
* transport.
*/
public class SearchTransportService extends AbstractComponent {
public class SearchTransportService extends AbstractLifecycleComponent {
public static final String FREE_CONTEXT_SCROLL_ACTION_NAME = "indices:data/read/search[free_context/scroll]";
public static final String FREE_CONTEXT_ACTION_NAME = "indices:data/read/search[free_context]";
@ -72,15 +77,19 @@ public class SearchTransportService extends AbstractComponent {
public static final String FETCH_ID_ACTION_NAME = "indices:data/read/search[phase/fetch/id]";
private final TransportService transportService;
private final RemoteClusterService remoteClusterService;
public SearchTransportService(Settings settings, TransportService transportService) {
public SearchTransportService(Settings settings, ClusterSettings clusterSettings, TransportService transportService) {
super(settings);
this.transportService = transportService;
this.remoteClusterService = new RemoteClusterService(settings, transportService);
clusterSettings.addAffixUpdateConsumer(RemoteClusterService.REMOTE_CLUSTERS_SEEDS, remoteClusterService::updateRemoteCluster,
(namespace, value) -> {});
}
public void sendFreeContext(DiscoveryNode node, final long contextId, SearchRequest request) {
transportService.sendRequest(node, FREE_CONTEXT_ACTION_NAME, new SearchFreeContextRequest(request, contextId),
new ActionListenerResponseHandler<>(new ActionListener<SearchFreeContextResponse>() {
public void sendFreeContext(Transport.Connection connection, final long contextId, SearchRequest request) {
transportService.sendRequest(connection, FREE_CONTEXT_ACTION_NAME, new SearchFreeContextRequest(request, contextId),
TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(new ActionListener<SearchFreeContextResponse>() {
@Override
public void onResponse(SearchFreeContextResponse response) {
// no need to respond if it was freed or not
@ -103,64 +112,68 @@ public class SearchTransportService extends AbstractComponent {
new ActionListenerResponseHandler<>(listener, () -> TransportResponse.Empty.INSTANCE));
}
public void sendExecuteDfs(DiscoveryNode node, final ShardSearchTransportRequest request, SearchTask task,
public void sendExecuteDfs(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task,
final ActionListener<DfsSearchResult> listener) {
transportService.sendChildRequest(node, DFS_ACTION_NAME, request, task,
transportService.sendChildRequest(connection, DFS_ACTION_NAME, request, task,
new ActionListenerResponseHandler<>(listener, DfsSearchResult::new));
}
public void sendExecuteQuery(DiscoveryNode node, final ShardSearchTransportRequest request, SearchTask task,
public void sendExecuteQuery(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task,
final ActionListener<QuerySearchResultProvider> listener) {
transportService.sendChildRequest(node, QUERY_ACTION_NAME, request, task,
transportService.sendChildRequest(connection, QUERY_ACTION_NAME, request, task,
new ActionListenerResponseHandler<>(listener, QuerySearchResult::new));
}
public void sendExecuteQuery(DiscoveryNode node, final QuerySearchRequest request, SearchTask task,
public void sendExecuteQuery(Transport.Connection connection, final QuerySearchRequest request, SearchTask task,
final ActionListener<QuerySearchResult> listener) {
transportService.sendChildRequest(node, QUERY_ID_ACTION_NAME, request, task,
transportService.sendChildRequest(connection, QUERY_ID_ACTION_NAME, request, task,
new ActionListenerResponseHandler<>(listener, QuerySearchResult::new));
}
public void sendExecuteQuery(DiscoveryNode node, final InternalScrollSearchRequest request, SearchTask task,
final ActionListener<ScrollQuerySearchResult> listener) {
transportService.sendChildRequest(node, QUERY_SCROLL_ACTION_NAME, request, task,
transportService.sendChildRequest(transportService.getConnection(node), QUERY_SCROLL_ACTION_NAME, request, task,
new ActionListenerResponseHandler<>(listener, ScrollQuerySearchResult::new));
}
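// note: the scroll variants here and below keep accepting a DiscoveryNode and resolve the
// connection through the local TransportService, while the per-shard search methods now
// receive a Transport.Connection from the coordinating logic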
public void sendExecuteFetch(DiscoveryNode node, final ShardSearchTransportRequest request, SearchTask task,
public void sendExecuteFetch(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task,
final ActionListener<QueryFetchSearchResult> listener) {
transportService.sendChildRequest(node, QUERY_FETCH_ACTION_NAME, request, task,
transportService.sendChildRequest(connection, QUERY_FETCH_ACTION_NAME, request, task,
new ActionListenerResponseHandler<>(listener, QueryFetchSearchResult::new));
}
public void sendExecuteFetch(DiscoveryNode node, final QuerySearchRequest request, SearchTask task,
public void sendExecuteFetch(Transport.Connection connection, final QuerySearchRequest request, SearchTask task,
final ActionListener<QueryFetchSearchResult> listener) {
transportService.sendChildRequest(node, QUERY_QUERY_FETCH_ACTION_NAME, request, task,
transportService.sendChildRequest(connection, QUERY_QUERY_FETCH_ACTION_NAME, request, task,
new ActionListenerResponseHandler<>(listener, QueryFetchSearchResult::new));
}
public void sendExecuteFetch(DiscoveryNode node, final InternalScrollSearchRequest request, SearchTask task,
final ActionListener<ScrollQueryFetchSearchResult> listener) {
transportService.sendChildRequest(node, QUERY_FETCH_SCROLL_ACTION_NAME, request, task,
transportService.sendChildRequest(transportService.getConnection(node), QUERY_FETCH_SCROLL_ACTION_NAME, request, task,
new ActionListenerResponseHandler<>(listener, ScrollQueryFetchSearchResult::new));
}
public void sendExecuteFetch(DiscoveryNode node, final ShardFetchSearchRequest request, SearchTask task,
public void sendExecuteFetch(Transport.Connection connection, final ShardFetchSearchRequest request, SearchTask task,
final ActionListener<FetchSearchResult> listener) {
sendExecuteFetch(node, FETCH_ID_ACTION_NAME, request, task, listener);
sendExecuteFetch(connection, FETCH_ID_ACTION_NAME, request, task, listener);
}
public void sendExecuteFetchScroll(DiscoveryNode node, final ShardFetchRequest request, SearchTask task,
final ActionListener<FetchSearchResult> listener) {
sendExecuteFetch(node, FETCH_ID_SCROLL_ACTION_NAME, request, task, listener);
sendExecuteFetch(transportService.getConnection(node), FETCH_ID_SCROLL_ACTION_NAME, request, task, listener);
}
private void sendExecuteFetch(DiscoveryNode node, String action, final ShardFetchRequest request, SearchTask task,
private void sendExecuteFetch(Transport.Connection connection, String action, final ShardFetchRequest request, SearchTask task,
final ActionListener<FetchSearchResult> listener) {
transportService.sendChildRequest(node, action, request, task,
transportService.sendChildRequest(connection, action, request, task,
new ActionListenerResponseHandler<>(listener, FetchSearchResult::new));
}
public RemoteClusterService getRemoteClusterService() {
return remoteClusterService;
}
static class ScrollFreeContextRequest extends TransportRequest {
private long id;
@ -265,6 +278,7 @@ public class SearchTransportService extends AbstractComponent {
channel.sendResponse(new SearchFreeContextResponse(freed));
}
});
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_SCROLL_ACTION_NAME, SearchFreeContextResponse::new);
transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, SearchFreeContextRequest::new, ThreadPool.Names.SAME,
new TaskAwareTransportRequestHandler<SearchFreeContextRequest>() {
@Override
@ -273,6 +287,7 @@ public class SearchTransportService extends AbstractComponent {
channel.sendResponse(new SearchFreeContextResponse(freed));
}
});
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME, SearchFreeContextResponse::new);
transportService.registerRequestHandler(CLEAR_SCROLL_CONTEXTS_ACTION_NAME, () -> TransportRequest.Empty.INSTANCE,
ThreadPool.Names.SAME,
new TaskAwareTransportRequestHandler<TransportRequest.Empty>() {
@ -282,6 +297,9 @@ public class SearchTransportService extends AbstractComponent {
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
});
TransportActionProxy.registerProxyAction(transportService, CLEAR_SCROLL_CONTEXTS_ACTION_NAME,
() -> TransportResponse.Empty.INSTANCE);
transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH,
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
@Override
@ -291,6 +309,8 @@ public class SearchTransportService extends AbstractComponent {
}
});
TransportActionProxy.registerProxyAction(transportService, DFS_ACTION_NAME, DfsSearchResult::new);
transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH,
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
@Override
@ -299,6 +319,8 @@ public class SearchTransportService extends AbstractComponent {
channel.sendResponse(result);
}
});
TransportActionProxy.registerProxyAction(transportService, QUERY_ACTION_NAME, QuerySearchResult::new);
transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH,
new TaskAwareTransportRequestHandler<QuerySearchRequest>() {
@Override
@ -307,6 +329,8 @@ public class SearchTransportService extends AbstractComponent {
channel.sendResponse(result);
}
});
TransportActionProxy.registerProxyAction(transportService, QUERY_ID_ACTION_NAME, QuerySearchResult::new);
transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH,
new TaskAwareTransportRequestHandler<InternalScrollSearchRequest>() {
@Override
@ -315,6 +339,8 @@ public class SearchTransportService extends AbstractComponent {
channel.sendResponse(result);
}
});
TransportActionProxy.registerProxyAction(transportService, QUERY_SCROLL_ACTION_NAME, ScrollQuerySearchResult::new);
transportService.registerRequestHandler(QUERY_FETCH_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH,
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
@Override
@ -323,6 +349,8 @@ public class SearchTransportService extends AbstractComponent {
channel.sendResponse(result);
}
});
TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_ACTION_NAME, QueryFetchSearchResult::new);
transportService.registerRequestHandler(QUERY_QUERY_FETCH_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH,
new TaskAwareTransportRequestHandler<QuerySearchRequest>() {
@Override
@ -331,6 +359,8 @@ public class SearchTransportService extends AbstractComponent {
channel.sendResponse(result);
}
});
TransportActionProxy.registerProxyAction(transportService, QUERY_QUERY_FETCH_ACTION_NAME, QueryFetchSearchResult::new);
transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH,
new TaskAwareTransportRequestHandler<InternalScrollSearchRequest>() {
@Override
@ -339,6 +369,8 @@ public class SearchTransportService extends AbstractComponent {
channel.sendResponse(result);
}
});
TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_SCROLL_ACTION_NAME, ScrollQueryFetchSearchResult::new);
transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ShardFetchRequest::new, ThreadPool.Names.SEARCH,
new TaskAwareTransportRequestHandler<ShardFetchRequest>() {
@Override
@ -347,6 +379,8 @@ public class SearchTransportService extends AbstractComponent {
channel.sendResponse(result);
}
});
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_SCROLL_ACTION_NAME, FetchSearchResult::new);
transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ShardFetchSearchRequest::new, ThreadPool.Names.SEARCH,
new TaskAwareTransportRequestHandler<ShardFetchSearchRequest>() {
@Override
@ -355,6 +389,24 @@ public class SearchTransportService extends AbstractComponent {
channel.sendResponse(result);
}
});
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, FetchSearchResult::new);
}
Transport.Connection getConnection(DiscoveryNode node) {
return transportService.getConnection(node);
}
@Override
protected void doStart() {
// here we start to connect to the remote clusters
remoteClusterService.initializeRemoteClusters();
}
@Override
protected void doStop() {}
@Override
protected void doClose() throws IOException {
remoteClusterService.close();
}
}
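As a lifecycle component the service now has an explicit start/close contract. A hedged usage sketch, with constructor arguments as in this diff and the surrounding setup omitted:

    SearchTransportService searchTransportService =
        new SearchTransportService(settings, clusterSettings, transportService);
    searchTransportService.start();   // connects to any configured remote clusters (doStart)
    RemoteClusterService remoteClusterService = searchTransportService.getRemoteClusterService();
    // ... the send* methods are used while searches execute ...
    searchTransportService.close();   // tears down the remote cluster connections (doClose)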

View File

@ -26,8 +26,11 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
@ -38,10 +41,13 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
@ -58,6 +64,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
private final ClusterService clusterService;
private final SearchTransportService searchTransportService;
private final RemoteClusterService remoteClusterService;
private final SearchPhaseController searchPhaseController;
private final SearchService searchService;
@ -69,12 +76,14 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
super(settings, SearchAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SearchRequest::new);
this.searchPhaseController = searchPhaseController;
this.searchTransportService = searchTransportService;
this.remoteClusterService = searchTransportService.getRemoteClusterService();
SearchTransportService.registerRequestHandler(transportService, searchService);
this.clusterService = clusterService;
this.searchService = searchService;
}
private Map<String, AliasFilter> buildPerIndexAliasFilter(SearchRequest request, ClusterState clusterState, Index[] concreteIndices) {
private Map<String, AliasFilter> buildPerIndexAliasFilter(SearchRequest request, ClusterState clusterState,
Index[] concreteIndices, Map<String, AliasFilter> remoteAliasMap) {
final Map<String, AliasFilter> aliasFilterMap = new HashMap<>();
for (Index index : concreteIndices) {
clusterState.blocks().indexBlockedRaiseException(ClusterBlockLevel.READ, index.getName());
@ -82,6 +91,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
assert aliasFilter != null;
aliasFilterMap.put(index.getUUID(), aliasFilter);
}
aliasFilterMap.putAll(remoteAliasMap);
return aliasFilterMap;
}
@ -104,7 +114,6 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
concreteIndexBoosts.putIfAbsent(concreteIndex.getUUID(), ib.getBoost());
}
}
return Collections.unmodifiableMap(concreteIndexBoosts);
}
@ -112,23 +121,62 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
// pure paranoia: if time goes backwards we are at least positive
final long startTimeInMillis = Math.max(0, System.currentTimeMillis());
ClusterState clusterState = clusterService.state();
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
final String[] localIndices;
final Map<String, List<String>> remoteClusterIndices;
final ClusterState clusterState = clusterService.state();
if (remoteClusterService.isCrossClusterSearchEnabled()) {
remoteClusterIndices = remoteClusterService.groupClusterIndices(searchRequest.indices(),
// empty string is not allowed
idx -> indexNameExpressionResolver.hasIndexOrAlias(idx, clusterState));
List<String> remove = remoteClusterIndices.remove(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY);
localIndices = remove == null ? Strings.EMPTY_ARRAY : remove.toArray(new String[remove.size()]);
} else {
remoteClusterIndices = Collections.emptyMap();
localIndices = searchRequest.indices();
}
if (remoteClusterIndices.isEmpty()) {
executeSearch((SearchTask)task, startTimeInMillis, searchRequest, localIndices, Collections.emptyList(),
(nodeId) -> null, clusterState, Collections.emptyMap(), listener);
} else {
remoteClusterService.collectSearchShards(searchRequest, remoteClusterIndices,
ActionListener.wrap((searchShardsResponses) -> {
List<ShardIterator> remoteShardIterators = new ArrayList<>();
Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();
Function<String, Transport.Connection> connectionFunction = remoteClusterService.processRemoteShards(
searchShardsResponses, remoteShardIterators, remoteAliasFilters);
executeSearch((SearchTask)task, startTimeInMillis, searchRequest, localIndices, remoteShardIterators,
connectionFunction, clusterState, remoteAliasFilters, listener);
}, listener::onFailure));
}
}
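For illustration, a hedged sketch of what grouping cluster-qualified expressions amounts to. This is not the actual RemoteClusterService.groupClusterIndices implementation; the method name and the "_local_" group key are placeholders:

    static Map<String, List<String>> groupByCluster(String[] indices, Predicate<String> isLocal) {
        Map<String, List<String>> groups = new HashMap<>();
        for (String expression : indices) {
            int i = expression.indexOf(':');
            // a plain expression, or one that matches a local index or alias, stays local
            boolean remote = i > 0 && isLocal.test(expression) == false;
            String cluster = remote ? expression.substring(0, i) : "_local_";
            String index = remote ? expression.substring(i + 1) : expression;
            groups.computeIfAbsent(cluster, k -> new ArrayList<>()).add(index);
        }
        return groups;
    }

Given new String[] {"local_index", "logs:archive*"} and a predicate that only matches local_index, this yields {_local_=[local_index], logs=[archive*]}.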
private void executeSearch(SearchTask task, long startTimeInMillis, SearchRequest searchRequest, String[] localIndices,
List<ShardIterator> remoteShardIterators, Function<String, Transport.Connection> remoteConnections,
ClusterState clusterState, Map<String, AliasFilter> remoteAliasMap,
ActionListener<SearchResponse> listener) {
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
// TODO: I think startTime() should become part of ActionRequest and that should be used both for index name
// date math expressions and $now in scripts. This way all apis will deal with now in the same way instead
// of just for the _search api
Index[] indices = indexNameExpressionResolver.concreteIndices(clusterState, searchRequest.indicesOptions(),
startTimeInMillis, searchRequest.indices());
Map<String, AliasFilter> aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices);
final Index[] indices;
if (localIndices.length == 0 && remoteShardIterators.size() > 0) {
indices = Index.EMPTY_ARRAY; // don't search on _all if only remote indices were specified
} else {
indices = indexNameExpressionResolver.concreteIndices(clusterState, searchRequest.indicesOptions(),
startTimeInMillis, localIndices);
}
Map<String, AliasFilter> aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteAliasMap);
Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(),
searchRequest.indices());
String[] concreteIndices = new String[indices.length];
for (int i = 0; i < indices.length; i++) {
concreteIndices[i] = indices[i].getName();
}
GroupShardsIterator shardIterators = clusterService.operationRouting().searchShards(clusterState, concreteIndices, routingMap,
GroupShardsIterator localShardsIterator = clusterService.operationRouting().searchShards(clusterState, concreteIndices, routingMap,
searchRequest.preference());
GroupShardsIterator shardIterators = mergeShardsIterators(localShardsIterator, remoteShardIterators);
failIfOverShardCountLimit(clusterService, shardIterators.size());
Map<String, Float> concreteIndexBoosts = resolveIndexBoosts(searchRequest, clusterState);
@ -150,41 +198,69 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
}
}
searchAsyncAction((SearchTask)task, searchRequest, shardIterators, startTimeInMillis, clusterState,
final DiscoveryNodes nodes = clusterState.nodes();
Function<String, Transport.Connection> connectionLookup = (nodeId) -> {
final DiscoveryNode discoveryNode = nodes.get(nodeId);
final Transport.Connection connection;
if (discoveryNode != null) {
connection = searchTransportService.getConnection(discoveryNode);
} else {
connection = remoteConnections.apply(nodeId);
}
if (connection == null) {
throw new IllegalStateException("no node found for id: " + nodeId);
}
return connection;
};
searchAsyncAction(task, searchRequest, shardIterators, startTimeInMillis, connectionLookup, clusterState.version(),
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, listener).start();
}
private static GroupShardsIterator mergeShardsIterators(GroupShardsIterator localShardsIterator,
List<ShardIterator> remoteShardIterators) {
if (remoteShardIterators.isEmpty()) {
return localShardsIterator;
}
List<ShardIterator> shards = new ArrayList<>();
for (ShardIterator shardIterator : remoteShardIterators) {
shards.add(shardIterator);
}
for (ShardIterator shardIterator : localShardsIterator) {
shards.add(shardIterator);
}
return new GroupShardsIterator(shards);
}
@Override
protected final void doExecute(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
throw new UnsupportedOperationException("the task parameter is required");
}
private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchRequest searchRequest, GroupShardsIterator shardIterators,
long startTime, ClusterState state, Map<String, AliasFilter> aliasFilter,
long startTime, Function<String, Transport.Connection> connectionLookup,
long clusterStateVersion, Map<String, AliasFilter> aliasFilter,
Map<String, Float> concreteIndexBoosts,
ActionListener<SearchResponse> listener) {
final Function<String, DiscoveryNode> nodesLookup = state.nodes()::get;
final long clusterStateVersion = state.version();
Executor executor = threadPool.executor(ThreadPool.Names.SEARCH);
AbstractSearchAsyncAction searchAsyncAction;
switch(searchRequest.searchType()) {
case DFS_QUERY_THEN_FETCH:
searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService, nodesLookup,
searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators, startTime,
clusterStateVersion, task);
break;
case QUERY_THEN_FETCH:
searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, nodesLookup,
searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators, startTime,
clusterStateVersion, task);
break;
case DFS_QUERY_AND_FETCH:
searchAsyncAction = new SearchDfsQueryAndFetchAsyncAction(logger, searchTransportService, nodesLookup,
searchAsyncAction = new SearchDfsQueryAndFetchAsyncAction(logger, searchTransportService, connectionLookup,
aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators, startTime,
clusterStateVersion, task);
break;
case QUERY_AND_FETCH:
searchAsyncAction = new SearchQueryAndFetchAsyncAction(logger, searchTransportService, nodesLookup,
searchAsyncAction = new SearchQueryAndFetchAsyncAction(logger, searchTransportService, connectionLookup,
aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators, startTime,
clusterStateVersion, task);
break;
@ -194,7 +270,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
return searchAsyncAction;
}
private void failIfOverShardCountLimit(ClusterService clusterService, int shardCount) {
private static void failIfOverShardCountLimit(ClusterService clusterService, int shardCount) {
final long shardCountLimit = clusterService.getClusterSettings().get(SHARD_COUNT_LIMIT_SETTING);
if (shardCount > shardCountLimit) {
throw new IllegalArgumentException("Trying to query " + shardCount + " shards, which is over the limit of "

View File

@ -19,6 +19,7 @@
package org.elasticsearch.common.settings;
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
import org.elasticsearch.action.search.RemoteClusterService;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.DestructiveOperations;
@ -54,9 +55,9 @@ import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.discovery.zen.FaultDetection;
import org.elasticsearch.discovery.zen.UnicastZenPing;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.GatewayService;
@ -253,6 +254,10 @@ public final class ClusterSettings extends AbstractScopedSettings {
SearchService.DEFAULT_SEARCH_TIMEOUT_SETTING,
ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING,
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
RemoteClusterService.REMOTE_CLUSTERS_SEEDS,
RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER,
RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING,
RemoteClusterService.REMOTE_NODE_ATTRIBUTE,
TransportService.TRACE_LOG_EXCLUDE_SETTING,
TransportService.TRACE_LOG_INCLUDE_SETTING,
TransportCloseIndexAction.CLUSTER_INDICES_CLOSE_ENABLE_SETTING,
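Registering the four new settings makes them known to validation and, for the seeds, to dynamic updates. A hedged sketch of the seed setting in use; only the search.remote.<alias>.seeds key format is spelled out in this commit, the concrete keys of the other three constants live in RemoteClusterService:

    Settings settings = Settings.builder()
        .put("search.remote.my_cluster.seeds", "127.0.0.1:9300,127.0.0.1:9301")  // alias -> seed list
        .build();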

View File

@ -42,6 +42,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
@ -481,7 +482,7 @@ public class Setting<T> extends ToXContentToBytes {
public Map<AbstractScopedSettings.SettingUpdater<T>, T> getValue(Settings current, Settings previous) {
// we collect all concrete keys and then delegate to the actual setting for validation and settings extraction
final Map<AbstractScopedSettings.SettingUpdater<T>, T> result = new IdentityHashMap<>();
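// (the distinct() added below de-duplicates keys that appear in both the current and previous
// settings, so each concrete setting gets exactly one updater)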
Stream.concat(matchStream(current), matchStream(previous)).forEach(aKey -> {
Stream.concat(matchStream(current), matchStream(previous)).distinct().forEach(aKey -> {
String namespace = key.getNamespace(aKey);
AbstractScopedSettings.SettingUpdater<T> updater =
getConcreteSetting(aKey).newUpdater((v) -> consumer.accept(namespace, v), logger,
@ -505,6 +506,18 @@ public class Setting<T> extends ToXContentToBytes {
};
}
@Override
public T get(Settings settings) {
throw new UnsupportedOperationException("affix settings can't return values" +
" use #getConcreteSetting to obtain a concrete setting");
}
@Override
public String getRaw(Settings settings) {
throw new UnsupportedOperationException("affix settings can't return values" +
" use #getConcreteSetting to obtain a concrete setting");
}
@Override
public Setting<T> getConcreteSetting(String key) {
if (match(key)) {
@ -518,6 +531,22 @@ public class Setting<T> extends ToXContentToBytes {
public void diff(Settings.Builder builder, Settings source, Settings defaultSettings) {
matchStream(defaultSettings).forEach((key) -> getConcreteSetting(key).diff(builder, source, defaultSettings));
}
/**
* Returns the namespace for a concrete setting. I.e. an affix setting with prefix <tt>search.</tt> and suffix <tt>username</tt>
* will return <tt>remote</tt> as the namespace for the setting <tt>search.remote.username</tt>
*/
public String getNamespace(Setting<T> concreteSetting) {
return key.getNamespace(concreteSetting.getKey());
}
/**
* Returns a stream of all concrete setting instances for the given settings. AffixSetting is only a specification, concrete
* settings depend on an actual set of setting keys.
*/
public Stream<Setting<T>> getAllConcreteSettings(Settings settings) {
return matchStream(settings).distinct().map(this::getConcreteSetting);
}
}
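A hedged sketch of the two new helpers working together; the affix setting construction is elided since its factory is not part of this excerpt, and SEEDS stands in for a setting like RemoteClusterService.REMOTE_CLUSTERS_SEEDS:

    Settings settings = Settings.builder()
        .put("search.remote.my_cluster.seeds", "127.0.0.1:9300")
        .build();
    SEEDS.getAllConcreteSettings(settings).forEach(concrete -> {
        String clusterAlias = SEEDS.getNamespace(concrete);  // -> "my_cluster"
        // concrete.get(settings) then yields the configured seed list for that alias
    });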

View File

@ -448,7 +448,8 @@ public class Node implements Closeable {
b.bind(IndicesService.class).toInstance(indicesService);
b.bind(SearchService.class).toInstance(newSearchService(clusterService, indicesService,
threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase()));
b.bind(SearchTransportService.class).toInstance(new SearchTransportService(settings, transportService));
b.bind(SearchTransportService.class).toInstance(new SearchTransportService(settings,
settingsModule.getClusterSettings(), transportService));
b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(settings, bigArrays,
scriptModule.getScriptService()));
b.bind(Transport.class).toInstance(transport);
@ -643,12 +644,16 @@ public class Node implements Closeable {
}
}
if (NetworkModule.HTTP_ENABLED.get(settings)) {
injector.getInstance(HttpServerTransport.class).start();
}
// start nodes now, after the http server, because it may take some time
tribeService.startNodes();
// starts connecting to remote clusters if any cluster is configured
SearchTransportService searchTransportService = injector.getInstance(SearchTransportService.class);
searchTransportService.start();
if (WRITE_PORTS_FIELD_SETTING.get(settings)) {
if (NetworkModule.HTTP_ENABLED.get(settings)) {
@ -692,6 +697,7 @@ public class Node implements Closeable {
injector.getInstance(GatewayService.class).stop();
injector.getInstance(SearchService.class).stop();
injector.getInstance(TransportService.class).stop();
injector.getInstance(SearchTransportService.class).stop();
pluginLifecycleComponents.forEach(LifecycleComponent::stop);
// we should stop this last since it waits for resources to get released
@ -753,6 +759,8 @@ public class Node implements Closeable {
toClose.add(injector.getInstance(SearchService.class));
toClose.add(() -> stopWatch.stop().start("transport"));
toClose.add(injector.getInstance(TransportService.class));
toClose.add(() -> stopWatch.stop().start("search_transport_service"));
toClose.add(injector.getInstance(SearchTransportService.class));
for (LifecycleComponent plugin : pluginLifecycleComponents) {
toClose.add(() -> stopWatch.stop().start("plugin(" + plugin.getClass().getName() + ")"));

View File

@ -128,4 +128,13 @@ public final class AliasFilter implements Writeable {
public int hashCode() {
return Objects.hash(aliases, filter, reparseAliases);
}
@Override
public String toString() {
return "AliasFilter{" +
"aliases=" + Arrays.toString(aliases) +
", filter=" + filter +
", reparseAliases=" + reparseAliases +
'}';
}
}

View File

@ -198,7 +198,7 @@ public final class ConnectionProfile {
*/
<T> T getChannel(T[] channels) {
if (length == 0) {
throw new IllegalStateException("can't select channel size is 0");
throw new IllegalStateException("can't select channel size is 0 for types: " + types);
}
assert channels.length >= offset + length : "illegal size: " + channels.length + " expected >= " + (offset + length);
return channels[offset + Math.floorMod(counter.incrementAndGet(), length)];

View File

@ -0,0 +1,150 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.io.UncheckedIOException;
/**
* TransportActionProxy allows an arbitrary action to be executed on a defined target node while the initial request is sent to a second
* node that acts as a request proxy to the target node. This is useful if a node is not directly connected to a target node but is
* connected to an intermediate node that establishes a transitive connection.
*/
public final class TransportActionProxy {
private TransportActionProxy() {} // no instance
private static class ProxyRequestHandler<T extends ProxyRequest> implements TransportRequestHandler<T> {
private final TransportService service;
private final String action;
private final Supplier<TransportResponse> responseFactory;
ProxyRequestHandler(TransportService service, String action, Supplier<TransportResponse> responseFactory) {
this.service = service;
this.action = action;
this.responseFactory = responseFactory;
}
@Override
public void messageReceived(T request, TransportChannel channel) throws Exception {
DiscoveryNode targetNode = request.targetNode;
TransportRequest wrappedRequest = request.wrapped;
service.sendRequest(targetNode, action, wrappedRequest, new ProxyResponseHandler<>(channel, responseFactory));
}
}
private static class ProxyResponseHandler<T extends TransportResponse> implements TransportResponseHandler<T> {
private final Supplier<T> responseFactory;
private final TransportChannel channel;
ProxyResponseHandler(TransportChannel channel, Supplier<T> responseFactory) {
this.responseFactory = responseFactory;
this.channel = channel;
}
@Override
public T newInstance() {
return responseFactory.get();
}
@Override
public void handleResponse(T response) {
try {
channel.sendResponse(response);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
@Override
public void handleException(TransportException exp) {
try {
channel.sendResponse(exp);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}
static class ProxyRequest<T extends TransportRequest> extends TransportRequest {
T wrapped;
Supplier<T> supplier;
DiscoveryNode targetNode;
ProxyRequest(Supplier<T> supplier) {
this.supplier = supplier;
}
ProxyRequest(T wrapped, DiscoveryNode targetNode) {
this.wrapped = wrapped;
this.targetNode = targetNode;
}
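// wire format: the target node is serialized before the wrapped request, and readFrom
// mirrors writeTo so the proxy node can reconstruct both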
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
targetNode = new DiscoveryNode(in);
wrapped = supplier.get();
wrapped.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
targetNode.writeTo(out);
wrapped.writeTo(out);
}
}
/**
* Registers a proxy request handler that allows forwarding requests for the given action to another node.
*/
public static void registerProxyAction(TransportService service, String action, Supplier<TransportResponse> responseSupplier) {
RequestHandlerRegistry requestHandler = service.getRequestHandler(action);
service.registerRequestHandler(getProxyAction(action), () -> new ProxyRequest(requestHandler::newRequest), ThreadPool.Names.SAME,
true, false, new ProxyRequestHandler<>(service, action, responseSupplier));
}
/**
* Returns the corresponding proxy action for the given action
*/
public static String getProxyAction(String action) {
return "internal:transport/proxy/" + action;
}
/**
* Wraps the actual request in a proxy request object that encodes the target node.
*/
public static TransportRequest wrapRequest(DiscoveryNode node, TransportRequest request) {
return new ProxyRequest<>(request, node);
}
}
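Putting the pieces together, a hedged sketch of the round trip; the node and request variables are placeholders and the response handler is elided:

    // on every node, done once at registration time (see SearchTransportService above):
    TransportActionProxy.registerProxyAction(transportService, action, QuerySearchResult::new);

    // on a node that is connected to proxyNode but not to targetNode:
    transportService.sendRequest(proxyNode, TransportActionProxy.getProxyAction(action),
        TransportActionProxy.wrapRequest(targetNode, request), responseHandler);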

View File

@ -63,6 +63,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Stream;
@ -163,9 +164,6 @@ public class TransportService extends AbstractLifecycleComponent {
}
}
/**
* Returns the local node representation
*/
public DiscoveryNode getLocalNode() {
return localNode;
}
@ -341,6 +339,25 @@ public class TransportService extends AbstractLifecycleComponent {
public DiscoveryNode handshake(
final Transport.Connection connection,
final long handshakeTimeout) throws ConnectTransportException {
return handshake(connection, handshakeTimeout, clusterName::equals);
}
/**
* Executes a high-level handshake using the given connection
* and returns the discovery node of the node the connection
* was established with. The handshake will fail if the cluster
* name on the target node doesn't match the local cluster name.
*
* @param connection the connection to a specific node
* @param handshakeTimeout handshake timeout
* @param clusterNamePredicate cluster name validation predicate
* @return the connected node
* @throws ConnectTransportException if the connection failed
* @throws IllegalStateException if the handshake failed
*/
public DiscoveryNode handshake(
final Transport.Connection connection,
final long handshakeTimeout, Predicate<ClusterName> clusterNamePredicate) throws ConnectTransportException {
final HandshakeResponse response;
final DiscoveryNode node = connection.getNode();
try {
@ -358,7 +375,7 @@ public class TransportService extends AbstractLifecycleComponent {
throw new IllegalStateException("handshake failed with " + node, e);
}
if (!Objects.equals(clusterName, response.clusterName)) {
if (!clusterNamePredicate.test(response.clusterName)) {
throw new IllegalStateException("handshake failed, mismatched cluster name [" + response.clusterName + "] - " + node);
} else if (response.version.isCompatible(localNode.getVersion()) == false) {
throw new IllegalStateException("handshake failed, incompatible version [" + response.version + "] - " + node);
@ -486,19 +503,18 @@ public class TransportService extends AbstractLifecycleComponent {
}
}
public <T extends TransportResponse> void sendChildRequest(final DiscoveryNode node, final String action,
public <T extends TransportResponse> void sendChildRequest(final Transport.Connection connection, final String action,
final TransportRequest request, final Task parentTask,
final TransportResponseHandler<T> handler) {
sendChildRequest(node, action, request, parentTask, TransportRequestOptions.EMPTY, handler);
sendChildRequest(connection, action, request, parentTask, TransportRequestOptions.EMPTY, handler);
}
public <T extends TransportResponse> void sendChildRequest(final DiscoveryNode node, final String action,
public <T extends TransportResponse> void sendChildRequest(final Transport.Connection connection, final String action,
final TransportRequest request, final Task parentTask,
final TransportRequestOptions options,
final TransportResponseHandler<T> handler) {
request.setParentTask(localNode.getId(), parentTask.getId());
try {
final Transport.Connection connection = getConnection(node);
sendRequest(connection, action, request, options, handler);
} catch (TaskCancelledException ex) {
// The parent task is already cancelled - just fail the request
@ -1134,6 +1150,13 @@ public class TransportService extends AbstractLifecycleComponent {
}
}
/**
* Returns the internal thread pool
*/
public ThreadPool getThreadPool() {
return threadPool;
}
private boolean isLocalNode(DiscoveryNode discoveryNode) {
return Objects.requireNonNull(discoveryNode, "discovery node must not be null").equals(localNode);
}

View File

@ -0,0 +1,151 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class ActionListenerTests extends ESTestCase {
public void testWrap() {
AtomicReference<Boolean> reference = new AtomicReference<>();
AtomicReference<Exception> exReference = new AtomicReference<>();
CheckedConsumer<Boolean, ? extends Exception> handler = (o) -> {
if (Boolean.FALSE.equals(o)) {
throw new IllegalArgumentException("must not be false");
}
reference.set(o);
};
ActionListener<Boolean> wrap = ActionListener.wrap(handler, exReference::set);
wrap.onResponse(Boolean.FALSE);
assertNull(reference.get());
assertNotNull(exReference.get());
assertEquals("must not be false", exReference.get().getMessage());
exReference.set(null);
wrap.onResponse(Boolean.TRUE);
assertTrue(reference.get());
assertNull(exReference.get());
}
public void testOnResponse() {
final int numListeners = randomIntBetween(1, 20);
List<AtomicReference<Boolean>> refList = new ArrayList<>();
List<AtomicReference<Exception>> excList = new ArrayList<>();
List<ActionListener<Boolean>> listeners = new ArrayList<>();
List<Boolean> failOnTrue = new ArrayList<>();
AtomicInteger exceptionCounter = new AtomicInteger(0);
for (int i = 0; i < numListeners; i++) {
boolean doFailOnTrue = rarely();
failOnTrue.add(doFailOnTrue);
AtomicReference<Boolean> reference = new AtomicReference<>();
AtomicReference<Exception> exReference = new AtomicReference<>();
refList.add(reference);
excList.add(exReference);
CheckedConsumer<Boolean, ? extends Exception> handler = (o) -> {
if (Boolean.FALSE.equals(o)) {
throw new IllegalArgumentException("must not be false " + exceptionCounter.getAndIncrement());
}
if (doFailOnTrue) {
throw new IllegalStateException("must not be true");
}
reference.set(o);
};
listeners.add(ActionListener.wrap(handler, exReference::set));
}
ActionListener.onResponse(listeners, Boolean.TRUE);
for (int i = 0; i < numListeners; i++) {
if (failOnTrue.get(i) == false) {
assertTrue("listener index " + i, refList.get(i).get());
refList.get(i).set(null);
} else {
assertNull("listener index " + i, refList.get(i).get());
}
}
for (int i = 0; i < numListeners; i++) {
if (failOnTrue.get(i) == false) {
assertNull("listener index " + i, excList.get(i).get());
} else {
assertEquals("listener index " + i, "must not be true", excList.get(i).get().getMessage());
}
}
ActionListener.onResponse(listeners, Boolean.FALSE);
for (int i = 0; i < numListeners; i++) {
assertNull("listener index " + i, refList.get(i).get());
}
assertEquals(numListeners, exceptionCounter.get());
for (int i = 0; i < numListeners; i++) {
assertNotNull(excList.get(i).get());
assertEquals("listener index " + i, "must not be false " + i, excList.get(i).get().getMessage());
}
}
public void testOnFailure() {
final int numListeners = randomIntBetween(1, 20);
List<AtomicReference<Boolean>> refList = new ArrayList<>();
List<AtomicReference<Exception>> excList = new ArrayList<>();
List<ActionListener<Boolean>> listeners = new ArrayList<>();
final int listenerToFail = randomBoolean() ? -1 : randomIntBetween(0, numListeners-1);
for (int i = 0; i < numListeners; i++) {
AtomicReference<Boolean> reference = new AtomicReference<>();
AtomicReference<Exception> exReference = new AtomicReference<>();
refList.add(reference);
excList.add(exReference);
boolean fail = i == listenerToFail;
CheckedConsumer<Boolean, ? extends Exception> handler = (o) -> {
reference.set(o);
};
listeners.add(ActionListener.wrap(handler, (e) -> {
exReference.set(e);
if (fail) {
throw new RuntimeException("double boom");
}
}));
}
try {
ActionListener.onFailure(listeners, new Exception("booom"));
assertTrue("unexpected succces listener to fail: " + listenerToFail, listenerToFail == -1);
} catch (RuntimeException ex) {
assertTrue("listener to fail: " + listenerToFail, listenerToFail >= 0);
assertNotNull(ex.getCause());
assertEquals("double boom", ex.getCause().getMessage());
}
for (int i = 0; i < numListeners; i++) {
assertNull("listener index " + i, refList.get(i).get());
}
for (int i = 0; i < numListeners; i++) {
assertEquals("listener index " + i, "booom", excList.get(i).get().getMessage());
}
}
}

View File

@ -0,0 +1,511 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.mocksocket.MockServerSocket;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportConnectionListener;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.channels.AlreadyConnectedException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
public class RemoteClusterConnectionTests extends ESTestCase {
private final ThreadPool threadPool = new TestThreadPool(getClass().getName());
@Override
public void tearDown() throws Exception {
super.tearDown();
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
private MockTransportService startTransport(String id, List<DiscoveryNode> knownNodes, Version version) {
return startTransport(id, knownNodes, version, threadPool);
}
public static MockTransportService startTransport(String id, List<DiscoveryNode> knownNodes, Version version, ThreadPool threadPool) {
boolean success = false;
MockTransportService newService = MockTransportService.createNewService(Settings.EMPTY, version, threadPool, null);
try {
newService.registerRequestHandler(ClusterSearchShardsAction.NAME, ClusterSearchShardsRequest::new, ThreadPool.Names.SAME,
(request, channel) -> {
channel.sendResponse(new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0],
knownNodes.toArray(new DiscoveryNode[0]), Collections.emptyMap()));
});
newService.registerRequestHandler(ClusterStateAction.NAME, ClusterStateRequest::new, ThreadPool.Names.SAME,
(request, channel) -> {
DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
for (DiscoveryNode node : knownNodes) {
builder.add(node);
}
ClusterState build = ClusterState.builder(ClusterName.DEFAULT).nodes(builder.build()).build();
channel.sendResponse(new ClusterStateResponse(ClusterName.DEFAULT, build));
});
newService.start();
newService.acceptIncomingRequests();
success = true;
return newService;
} finally {
if (success == false) {
newService.close();
}
}
}
public void testDiscoverSingleNode() throws Exception {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode();
knownNodes.add(seedTransport.getLocalDiscoNode());
knownNodes.add(discoverableTransport.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
updateSeedNodes(connection, Arrays.asList(seedNode));
assertTrue(service.nodeConnected(seedNode));
assertTrue(service.nodeConnected(discoverableNode));
assertTrue(connection.assertNoRunningConnections());
}
}
}
}
public void testDiscoverSingleNodeWithIncompatibleSeed() throws Exception {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
MockTransportService incompatibleTransport = startTransport("incompat_seed_node", knownNodes, Version.fromString("2.0.0"));
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode();
DiscoveryNode incompatibleSeedNode = incompatibleTransport.getLocalDiscoNode();
knownNodes.add(seedTransport.getLocalDiscoNode());
knownNodes.add(discoverableTransport.getLocalDiscoNode());
knownNodes.add(incompatibleTransport.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());
List<DiscoveryNode> seedNodes = Arrays.asList(incompatibleSeedNode, seedNode);
Collections.shuffle(seedNodes, random());
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
seedNodes, service, Integer.MAX_VALUE, n -> true)) {
updateSeedNodes(connection, seedNodes);
assertTrue(service.nodeConnected(seedNode));
assertTrue(service.nodeConnected(discoverableNode));
assertFalse(service.nodeConnected(incompatibleSeedNode));
assertTrue(connection.assertNoRunningConnections());
}
}
}
}
public void testNodeDisconnected() throws Exception {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT);
MockTransportService spareTransport = startTransport("spare_node", knownNodes, Version.CURRENT)) {
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode();
DiscoveryNode spareNode = spareTransport.getLocalDiscoNode();
knownNodes.add(seedTransport.getLocalDiscoNode());
knownNodes.add(discoverableTransport.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
updateSeedNodes(connection, Arrays.asList(seedNode));
assertTrue(service.nodeConnected(seedNode));
assertTrue(service.nodeConnected(discoverableNode));
assertFalse(service.nodeConnected(spareNode));
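// make the spare node discoverable so the connection can fail over to it after the disconnect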
knownNodes.add(spareNode);
CountDownLatch latchDisconnect = new CountDownLatch(1);
CountDownLatch latchConnected = new CountDownLatch(1);
service.addConnectionListener(new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node) {
if (node.equals(discoverableNode)) {
latchDisconnect.countDown();
}
}
@Override
public void onNodeConnected(DiscoveryNode node) {
if (node.equals(spareNode)) {
latchConnected.countDown();
}
}
});
discoverableTransport.close();
// now make sure we try to connect to other nodes once we get disconnected
assertTrue(latchDisconnect.await(10, TimeUnit.SECONDS));
assertTrue(latchConnected.await(10, TimeUnit.SECONDS));
assertTrue(service.nodeConnected(spareNode));
}
}
}
}
public void testFilterDiscoveredNodes() throws Exception {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode();
knownNodes.add(seedTransport.getLocalDiscoNode());
knownNodes.add(discoverableTransport.getLocalDiscoNode());
DiscoveryNode rejectedNode = randomBoolean() ? seedNode : discoverableNode;
Collections.shuffle(knownNodes, random());
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> n.equals(rejectedNode) == false)) {
updateSeedNodes(connection, Arrays.asList(seedNode));
if (rejectedNode.equals(seedNode)) {
assertFalse(service.nodeConnected(seedNode));
assertTrue(service.nodeConnected(discoverableNode));
} else {
assertTrue(service.nodeConnected(seedNode));
assertFalse(service.nodeConnected(discoverableNode));
}
assertTrue(connection.assertNoRunningConnections());
}
}
}
}
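// drives the asynchronous seed node update synchronously and rethrows any failure reported to the listener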
private void updateSeedNodes(RemoteClusterConnection connection, List<DiscoveryNode> seedNodes) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<>();
ActionListener<Void> listener = ActionListener.wrap(x -> latch.countDown(), x -> {
exceptionAtomicReference.set(x);
latch.countDown();
});
connection.updateSeedNodes(seedNodes, listener);
latch.await();
if (exceptionAtomicReference.get() != null) {
throw exceptionAtomicReference.get();
}
}
public void testConnectWithIncompatibleTransports() throws Exception {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.fromString("2.0.0"))) {
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
knownNodes.add(seedTransport.getLocalDiscoNode());
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
expectThrows(Exception.class, () -> updateSeedNodes(connection, Arrays.asList(seedNode)));
assertFalse(service.nodeConnected(seedNode));
assertTrue(connection.assertNoRunningConnections());
}
}
}
}
@SuppressForbidden(reason = "calls getLocalHost here but it's fine in this case")
public void testSlowNodeCanBeCanceled() throws IOException, InterruptedException {
try (ServerSocket socket = new MockServerSocket()) {
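// a raw server socket whose accept thread never completes a transport handshake, so connecting hangs until cancelled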
socket.bind(new InetSocketAddress(InetAddress.getLocalHost(), 0), 1);
socket.setReuseAddress(true);
DiscoveryNode seedNode = new DiscoveryNode("TEST", new TransportAddress(socket.getInetAddress(),
socket.getLocalPort()), emptyMap(),
emptySet(), Version.CURRENT);
CountDownLatch acceptedLatch = new CountDownLatch(1);
CountDownLatch closeRemote = new CountDownLatch(1);
Thread t = new Thread() {
@Override
public void run() {
try (Socket accept = socket.accept()) {
acceptedLatch.countDown();
closeRemote.await();
} catch (IOException e) {
// that's fine, the server socket may be closed concurrently
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
};
t.start();
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
CountDownLatch listenerCalled = new CountDownLatch(1);
AtomicReference<Exception> exceptionReference = new AtomicReference<>();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
ActionListener<Void> listener = ActionListener.wrap(x -> {
listenerCalled.countDown();
fail("expected exception");
}, x -> {
exceptionReference.set(x);
listenerCalled.countDown();
});
connection.updateSeedNodes(Arrays.asList(seedNode), listener);
acceptedLatch.await();
connection.close(); // now close it, this should trigger an interrupt on the socket and we can move on
assertTrue(connection.assertNoRunningConnections());
}
closeRemote.countDown();
listenerCalled.await();
assertNotNull(exceptionReference.get());
expectThrows(CancellableThreads.ExecutionCancelledException.class, () -> {throw exceptionReference.get();});
}
}
}
public void testFetchShards() throws Exception {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
knownNodes.add(seedTransport.getLocalDiscoNode());
knownNodes.add(discoverableTransport.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
if (randomBoolean()) {
updateSeedNodes(connection, Arrays.asList(seedNode));
}
SearchRequest request = new SearchRequest("test-index");
CountDownLatch responseLatch = new CountDownLatch(1);
AtomicReference<ClusterSearchShardsResponse> reference = new AtomicReference<>();
AtomicReference<Exception> failReference = new AtomicReference<>();
ActionListener<ClusterSearchShardsResponse> shardsListener = ActionListener.wrap(
x -> {
reference.set(x);
responseLatch.countDown();
},
x -> {
failReference.set(x);
responseLatch.countDown();
});
connection.fetchSearchShards(request, Arrays.asList("test-index"), shardsListener);
responseLatch.await();
assertNull(failReference.get());
assertNotNull(reference.get());
ClusterSearchShardsResponse clusterSearchShardsResponse = reference.get();
assertEquals(knownNodes, Arrays.asList(clusterSearchShardsResponse.getNodes()));
assertTrue(connection.assertNoRunningConnections());
}
}
}
}
public void testTriggerUpdatesConcurrently() throws IOException, InterruptedException {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
MockTransportService seedTransport1 = startTransport("seed_node_1", knownNodes, Version.CURRENT);
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode();
DiscoveryNode seedNode1 = seedTransport1.getLocalDiscoNode();
knownNodes.add(seedTransport.getLocalDiscoNode());
knownNodes.add(discoverableTransport.getLocalDiscoNode());
knownNodes.add(seedTransport1.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());
List<DiscoveryNode> seedNodes = Arrays.asList(seedNode1, seedNode);
Collections.shuffle(seedNodes, random());
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
seedNodes, service, Integer.MAX_VALUE, n -> true)) {
int numThreads = randomIntBetween(4, 10);
Thread[] threads = new Thread[numThreads];
CyclicBarrier barrier = new CyclicBarrier(numThreads);
for (int i = 0; i < threads.length; i++) {
final int numConnectionAttempts = randomIntBetween(10, 200);
threads[i] = new Thread() {
@Override
public void run() {
try {
barrier.await();
CountDownLatch latch = new CountDownLatch(numConnectionAttempts);
for (int i = 0; i < numConnectionAttempts; i++) {
AtomicBoolean executed = new AtomicBoolean(false);
ActionListener<Void> listener = ActionListener.wrap(x -> {
assertTrue(executed.compareAndSet(false, true));
latch.countDown();
}, x -> {
assertTrue(executed.compareAndSet(false, true));
latch.countDown();
if (x instanceof RejectedExecutionException) {
// that's fine
} else {
throw new AssertionError(x);
}
});
connection.updateSeedNodes(seedNodes, listener);
}
latch.await();
} catch (Exception ex) {
throw new AssertionError(ex);
}
}
};
threads[i].start();
}
for (int i = 0; i < threads.length; i++) {
threads[i].join();
}
assertTrue(service.nodeConnected(seedNode));
assertTrue(service.nodeConnected(discoverableNode));
assertTrue(service.nodeConnected(seedNode1));
assertTrue(connection.assertNoRunningConnections());
}
}
}
}
public void testCloseWhileConcurrentlyConnecting() throws IOException, InterruptedException, BrokenBarrierException {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
MockTransportService seedTransport1 = startTransport("seed_node_1", knownNodes, Version.CURRENT);
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
DiscoveryNode seedNode1 = seedTransport1.getLocalDiscoNode();
knownNodes.add(seedTransport.getLocalDiscoNode());
knownNodes.add(discoverableTransport.getLocalDiscoNode());
knownNodes.add(seedTransport1.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());
List<DiscoveryNode> seedNodes = Arrays.asList(seedNode1, seedNode);
Collections.shuffle(seedNodes, random());
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
seedNodes, service, Integer.MAX_VALUE, n -> true)) {
int numThreads = randomIntBetween(4, 10);
Thread[] threads = new Thread[numThreads];
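// numThreads + 1 parties: the main thread joins the barrier below before closing the connection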
CyclicBarrier barrier = new CyclicBarrier(numThreads + 1);
for (int i = 0; i < threads.length; i++) {
final int numConnectionAttempts = randomIntBetween(10, 100);
threads[i] = new Thread() {
@Override
public void run() {
try {
barrier.await();
CountDownLatch latch = new CountDownLatch(numConnectionAttempts);
for (int i = 0; i < numConnectionAttempts; i++) {
AtomicReference<RuntimeException> executed = new AtomicReference<>();
ActionListener<Void> listener = ActionListener.wrap(
x -> {
if (executed.compareAndSet(null, new RuntimeException())) {
latch.countDown();
} else {
throw new AssertionError("shit's been called twice", executed.get());
}
},
x -> {
if (executed.compareAndSet(null, new RuntimeException())) {
latch.countDown();
} else {
throw new AssertionError("shit's been called twice", executed.get());
}
if (x instanceof RejectedExecutionException || x instanceof AlreadyClosedException
|| x instanceof CancellableThreads.ExecutionCancelledException) {
// that's fine
} else {
throw new AssertionError(x);
}
});
connection.updateSeedNodes(seedNodes, listener);
}
latch.await();
} catch (Exception ex) {
throw new AssertionError(ex);
}
}
};
threads[i].start();
}
barrier.await();
connection.close();
}
}
}
}
}

View File

@ -0,0 +1,250 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
public class RemoteClusterServiceTests extends ESTestCase {
private final ThreadPool threadPool = new TestThreadPool(getClass().getName());
@Override
public void tearDown() throws Exception {
super.tearDown();
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
private MockTransportService startTransport(String id, List<DiscoveryNode> knownNodes, Version version) {
return RemoteClusterConnectionTests.startTransport(id, knownNodes, version, threadPool);
}
public void testSettingsAreRegistered() {
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CLUSTERS_SEEDS));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_NODE_ATTRIBUTE));
}
public void testRemoteClusterSeedSetting() {
// simple validation
Settings settings = Settings.builder()
.put("search.remote.foo.seeds", "192.168.0.1:8080")
.put("search.remote.bar.seed", "[::1]:9090").build();
RemoteClusterService.REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings).forEach(setting -> setting.get(settings));
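// a seed address without a port must be rejected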
Settings brokenSettings = Settings.builder()
.put("search.remote.foo.seeds", "192.168.0.1").build();
expectThrows(IllegalArgumentException.class, () ->
RemoteClusterService.REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(brokenSettings).forEach(setting -> setting.get(brokenSettings)));
}
public void testBuiltRemoteClustersSeeds() throws Exception {
Map<String, List<DiscoveryNode>> map = RemoteClusterService.buildRemoteClustersSeeds(
Settings.builder().put("search.remote.foo.seeds", "192.168.0.1:8080").put("search.remote.bar.seeds", "[::1]:9090").build());
assertEquals(2, map.size());
assertTrue(map.containsKey("foo"));
assertTrue(map.containsKey("bar"));
assertEquals(1, map.get("foo").size());
assertEquals(1, map.get("bar").size());
DiscoveryNode foo = map.get("foo").get(0);
assertEquals(foo.getAddress(), new TransportAddress(new InetSocketAddress(InetAddress.getByName("192.168.0.1"), 8080)));
assertEquals(foo.getId(), "foo#192.168.0.1:8080");
assertEquals(foo.getVersion(), Version.CURRENT.minimumCompatibilityVersion());
DiscoveryNode bar = map.get("bar").get(0);
assertEquals(bar.getAddress(), new TransportAddress(new InetSocketAddress(InetAddress.getByName("[::1]"), 9090)));
assertEquals(bar.getId(), "bar#[::1]:9090");
assertEquals(bar.getVersion(), Version.CURRENT.minimumCompatibilityVersion());
}
public void testGroupClusterIndices() throws IOException {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT);
MockTransportService otherSeedTransport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) {
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
DiscoveryNode otherSeedNode = otherSeedTransport.getLocalDiscoNode();
knownNodes.add(seedTransport.getLocalDiscoNode());
knownNodes.add(otherSeedTransport.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());
try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool,
null)) {
transportService.start();
transportService.acceptIncomingRequests();
Settings.Builder builder = Settings.builder();
builder.putArray("search.remote.cluster_1.seeds", seedNode.getAddress().toString());
builder.putArray("search.remote.cluster_2.seeds", otherSeedNode.getAddress().toString());
try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
assertTrue(service.isCrossClusterSearchEnabled());
assertTrue(service.isRemoteClusterRegistered("cluster_1"));
assertTrue(service.isRemoteClusterRegistered("cluster_2"));
assertFalse(service.isRemoteClusterRegistered("foo"));
Map<String, List<String>> perClusterIndices = service.groupClusterIndices(new String[]{"foo:bar", "cluster_1:bar",
"cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo"}, i -> false);
String[] localIndices = perClusterIndices.computeIfAbsent(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY,
k -> Collections.emptyList()).toArray(new String[0]);
assertNotNull(perClusterIndices.remove(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY));
assertArrayEquals(new String[]{"foo:bar", "foo"}, localIndices);
assertEquals(2, perClusterIndices.size());
assertEquals(Arrays.asList("bar", "test"), perClusterIndices.get("cluster_1"));
assertEquals(Arrays.asList("foo:bar", "foo*"), perClusterIndices.get("cluster_2"));
IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () ->
service.groupClusterIndices(new String[]{"foo:bar", "cluster_1:bar",
"cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo"}, i -> "cluster_1:bar".equals(i)));
assertEquals("Can not filter indices; index cluster_1:bar exists but there is also a remote cluster named:" +
" cluster_1", iae.getMessage());
}
}
}
}
public void testIncrementallyAddClusters() throws IOException {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT);
MockTransportService otherSeedTransport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) {
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
DiscoveryNode otherSeedNode = otherSeedTransport.getLocalDiscoNode();
knownNodes.add(seedTransport.getLocalDiscoNode());
knownNodes.add(otherSeedTransport.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());
try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool,
null)) {
transportService.start();
transportService.acceptIncomingRequests();
Settings.Builder builder = Settings.builder();
builder.putArray("search.remote.cluster_1.seeds", seedNode.getAddress().toString());
builder.putArray("search.remote.cluster_2.seeds", otherSeedNode.getAddress().toString());
try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
assertFalse(service.isCrossClusterSearchEnabled());
service.updateRemoteCluster("cluster_1", Collections.singletonList(seedNode.getAddress().address()));
assertTrue(service.isCrossClusterSearchEnabled());
assertTrue(service.isRemoteClusterRegistered("cluster_1"));
service.updateRemoteCluster("cluster_2", Collections.singletonList(otherSeedNode.getAddress().address()));
assertTrue(service.isCrossClusterSearchEnabled());
assertTrue(service.isRemoteClusterRegistered("cluster_1"));
assertTrue(service.isRemoteClusterRegistered("cluster_2"));
service.updateRemoteCluster("cluster_2", Collections.emptyList());
assertFalse(service.isRemoteClusterRegistered("cluster_2"));
IllegalArgumentException iae = expectThrows(IllegalArgumentException.class,
() -> service.updateRemoteCluster(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, Collections.emptyList()));
assertEquals("remote clusters must not have the empty string as its key", iae.getMessage());
}
}
}
}
public void testProcessRemoteShards() throws IOException {
try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, null)) {
assertFalse(service.isCrossClusterSearchEnabled());
List<ShardIterator> iteratorList = new ArrayList<>();
Map<String, ClusterSearchShardsResponse> searchShardsResponseMap = new HashMap<>();
DiscoveryNode[] nodes = new DiscoveryNode[] {
new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT),
new DiscoveryNode("node2", buildNewFakeTransportAddress(), Version.CURRENT)
};
Map<String, AliasFilter> indicesAndAliases = new HashMap<>();
indicesAndAliases.put("foo", new AliasFilter(new TermsQueryBuilder("foo", "bar"), Strings.EMPTY_ARRAY));
indicesAndAliases.put("bar", new AliasFilter(new MatchAllQueryBuilder(), Strings.EMPTY_ARRAY));
ClusterSearchShardsGroup[] groups = new ClusterSearchShardsGroup[] {
new ClusterSearchShardsGroup(new ShardId("foo", "foo_id", 0),
new ShardRouting[] {TestShardRouting.newShardRouting("foo", 0, "node1", true, ShardRoutingState.STARTED),
TestShardRouting.newShardRouting("foo", 0, "node2", false, ShardRoutingState.STARTED)}),
new ClusterSearchShardsGroup(new ShardId("foo", "foo_id", 1),
new ShardRouting[] {TestShardRouting.newShardRouting("foo", 0, "node1", true, ShardRoutingState.STARTED),
TestShardRouting.newShardRouting("foo", 1, "node2", false, ShardRoutingState.STARTED)}),
new ClusterSearchShardsGroup(new ShardId("bar", "bar_id", 0),
new ShardRouting[] {TestShardRouting.newShardRouting("bar", 0, "node2", true, ShardRoutingState.STARTED),
TestShardRouting.newShardRouting("bar", 0, "node1", false, ShardRoutingState.STARTED)})
};
searchShardsResponseMap.put("test_cluster_1", new ClusterSearchShardsResponse(groups, nodes, indicesAndAliases));
Map<String, AliasFilter> remoteAliases = new HashMap<>();
service.processRemoteShards(searchShardsResponseMap, iteratorList, remoteAliases);
assertEquals(3, iteratorList.size());
for (ShardIterator iterator : iteratorList) {
if (iterator.shardId().getIndexName().endsWith("foo")) {
assertTrue(iterator.shardId().getId() == 0 || iterator.shardId().getId() == 1);
assertEquals("test_cluster_1:foo", iterator.shardId().getIndexName());
ShardRouting shardRouting = iterator.nextOrNull();
assertNotNull(shardRouting);
assertEquals(shardRouting.getIndexName(), "foo");
shardRouting = iterator.nextOrNull();
assertNotNull(shardRouting);
assertEquals(shardRouting.getIndexName(), "foo");
assertNull(iterator.nextOrNull());
} else {
assertEquals(0, iterator.shardId().getId());
assertEquals("test_cluster_1:bar", iterator.shardId().getIndexName());
ShardRouting shardRouting = iterator.nextOrNull();
assertNotNull(shardRouting);
assertEquals(shardRouting.getIndexName(), "bar");
shardRouting = iterator.nextOrNull();
assertNotNull(shardRouting);
assertEquals(shardRouting.getIndexName(), "bar");
assertNull(iterator.nextOrNull());
}
}
assertEquals(2, remoteAliases.size());
assertTrue(remoteAliases.toString(), remoteAliases.containsKey("foo_id"));
assertTrue(remoteAliases.toString(), remoteAliases.containsKey("bar_id"));
assertEquals(new TermsQueryBuilder("foo", "bar"), remoteAliases.get("foo_id").getQueryBuilder());
assertEquals(new MatchAllQueryBuilder(), remoteAliases.get("bar_id").getQueryBuilder());
}
}
}

View File

@ -30,6 +30,7 @@ import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
@ -38,6 +39,10 @@ import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import java.io.IOException;
import java.util.ArrayList;
@ -76,26 +81,30 @@ public class SearchAsyncActionTests extends ESTestCase {
AtomicInteger contextIdGenerator = new AtomicInteger(0);
GroupShardsIterator shardsIter = getShardsIter("idx", randomIntBetween(1, 10), randomBoolean(), primaryNode, replicaNode);
AtomicInteger numFreedContext = new AtomicInteger();
SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, null) {
SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY,
Collections.singleton(RemoteClusterService.REMOTE_CLUSTERS_SEEDS)), null) {
@Override
public void sendFreeContext(DiscoveryNode node, long contextId, SearchRequest request) {
public void sendFreeContext(Transport.Connection connection, long contextId, SearchRequest request) {
numFreedContext.incrementAndGet();
assertTrue(nodeToContextMap.containsKey(node));
assertTrue(nodeToContextMap.get(node).remove(contextId));
assertTrue(nodeToContextMap.containsKey(connection.getNode()));
assertTrue(nodeToContextMap.get(connection.getNode()).remove(contextId));
}
};
Map<String, DiscoveryNode> lookup = new HashMap<>();
lookup.put(primaryNode.getId(), primaryNode);
Map<String, Transport.Connection> lookup = new HashMap<>();
lookup.put(primaryNode.getId(), new MockConnection(primaryNode));
lookup.put(replicaNode.getId(), new MockConnection(replicaNode));
Map<String, AliasFilter> aliasFilters = Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY));
AbstractSearchAsyncAction asyncAction = new AbstractSearchAsyncAction<TestSearchPhaseResult>(logger, transportService, lookup::get,
aliasFilters, Collections.emptyMap(), null, request, responseListener, shardsIter, 0, 0, null) {
TestSearchResponse response = new TestSearchResponse();
@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener listener) {
protected void sendExecuteFirstPhase(Transport.Connection connection, ShardSearchTransportRequest request,
ActionListener listener) {
assertTrue("shard: " + request.shardId() + " has been queried twice", response.queried.add(request.shardId()));
TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult(contextIdGenerator.incrementAndGet(), node);
Set<Long> ids = nodeToContextMap.computeIfAbsent(node, (n) -> new HashSet<>());
TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult(contextIdGenerator.incrementAndGet(),
connection.getNode());
Set<Long> ids = nodeToContextMap.computeIfAbsent(connection.getNode(), (n) -> new HashSet<>());
ids.add(testSearchPhaseResult.id);
if (randomBoolean()) {
listener.onResponse(testSearchPhaseResult);
@ -109,7 +118,7 @@ public class SearchAsyncActionTests extends ESTestCase {
for (int i = 0; i < firstResults.length(); i++) {
TestSearchPhaseResult result = firstResults.get(i);
assertEquals(result.node.getId(), result.shardTarget().getNodeId());
sendReleaseSearchContext(result.id(), result.node);
sendReleaseSearchContext(result.id(), new MockConnection(result.node));
}
responseListener.onResponse(response);
latch.countDown();
@ -130,10 +139,13 @@ public class SearchAsyncActionTests extends ESTestCase {
latch.await();
assertNotNull(response.get());
assertFalse(nodeToContextMap.isEmpty());
assertTrue(nodeToContextMap.containsKey(primaryNode));
assertTrue(nodeToContextMap.toString(), nodeToContextMap.containsKey(primaryNode) || nodeToContextMap.containsKey(replicaNode));
assertEquals(shardsIter.size(), numFreedContext.get());
assertTrue(nodeToContextMap.get(primaryNode).toString(), nodeToContextMap.get(primaryNode).isEmpty());
if (nodeToContextMap.containsKey(primaryNode)) {
assertTrue(nodeToContextMap.get(primaryNode).toString(), nodeToContextMap.get(primaryNode).isEmpty());
} else {
assertTrue(nodeToContextMap.get(replicaNode).toString(), nodeToContextMap.get(replicaNode).isEmpty());
}
}
private GroupShardsIterator getShardsIter(String index, int numShards, boolean doReplicas, DiscoveryNode primaryNode,
@ -211,4 +223,29 @@ public class SearchAsyncActionTests extends ESTestCase {
}
}
public final class MockConnection implements Transport.Connection {
private final DiscoveryNode node;
public MockConnection(DiscoveryNode node) {
this.node = node;
}
@Override
public DiscoveryNode getNode() {
return node;
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
throw new UnsupportedOperationException();
}
@Override
public void close() throws IOException {
throw new UnsupportedOperationException();
}
}
}

View File

@ -31,6 +31,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@ -475,6 +477,33 @@ public class SettingTests extends ESTestCase {
assertFalse(listAffixSetting.match("foo"));
}
public void testGetAllConcreteSettings() {
Setting.AffixSetting<List<String>> listAffixSetting = Setting.affixKeySetting("foo.", "bar",
(key) -> Setting.listSetting(key, Collections.emptyList(), Function.identity(), Property.NodeScope));
Settings settings = Settings.builder()
.putArray("foo.1.bar", "1", "2")
.putArray("foo.2.bar", "3", "4", "5")
.putArray("foo.bar", "6")
.putArray("some.other", "6")
.putArray("foo.3.bar", "6")
.build();
Stream<Setting<List<String>>> allConcreteSettings = listAffixSetting.getAllConcreteSettings(settings);
Map<String, List<String>> collect = allConcreteSettings.collect(Collectors.toMap(Setting::getKey, (s) -> s.get(settings)));
assertEquals(3, collect.size());
assertEquals(Arrays.asList("1", "2"), collect.get("foo.1.bar"));
assertEquals(Arrays.asList("3", "4", "5"), collect.get("foo.2.bar"));
assertEquals(Arrays.asList("6"), collect.get("foo.3.bar"));
}
public void testAffixSettingsFailOnGet() {
Setting.AffixSetting<List<String>> listAffixSetting = Setting.affixKeySetting("foo.", "bar",
(key) -> Setting.listSetting(key, Collections.singletonList("testelement"), Function.identity(), Property.NodeScope));
expectThrows(UnsupportedOperationException.class, () -> listAffixSetting.get(Settings.EMPTY));
expectThrows(UnsupportedOperationException.class, () -> listAffixSetting.getRaw(Settings.EMPTY));
assertEquals(Collections.singletonList("testelement"), listAffixSetting.getDefault(Settings.EMPTY));
assertEquals("[\"testelement\"]", listAffixSetting.getDefaultRaw(Settings.EMPTY));
}
public void testMinMaxInt() {
Setting<Integer> integerSetting = Setting.intSetting("foo.bar", 1, 0, 10, Property.NodeScope);

View File

@ -52,6 +52,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFa
import static org.hamcrest.Matchers.containsString;
public class SimpleSearchIT extends ESIntegTestCase {
public void testSearchNullIndex() {
expectThrows(NullPointerException.class,
() -> client().prepareSearch((String) null).setQuery(QueryBuilders.termQuery("_id", "XXX1")).get());
@ -417,7 +418,8 @@ public class SimpleSearchIT extends ESIntegTestCase {
client().prepareSearch("idx").setQuery(QueryBuilders.regexpQuery("num", "34")).get();
fail("SearchPhaseExecutionException should have been thrown");
} catch (SearchPhaseExecutionException ex) {
assertThat(ex.getCause().getCause().getMessage(), containsString("Can only use regexp queries on keyword and text fields"));
assertThat(ex.getRootCause().getMessage(),
containsString("Can only use regexp queries on keyword and text fields"));
}
}

View File

@ -0,0 +1,252 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Before;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
public class TransportActionProxyTests extends ESTestCase {
protected ThreadPool threadPool;
// we always use a non-alpha or -beta version here, otherwise minimumCompatibilityVersion would differ for the two versions used
private static final Version CURRENT_VERSION = Version.fromString(String.valueOf(Version.CURRENT.major) + ".0.0");
protected static final Version version0 = CURRENT_VERSION.minimumCompatibilityVersion();
protected DiscoveryNode nodeA;
protected MockTransportService serviceA;
protected static final Version version1 = Version.fromId(CURRENT_VERSION.id + 1);
protected DiscoveryNode nodeB;
protected MockTransportService serviceB;
protected DiscoveryNode nodeC;
protected MockTransportService serviceC;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool(getClass().getName());
serviceA = buildService(version0); // runs the minimum compatibility version of CURRENT_VERSION
nodeA = new DiscoveryNode("TS_A", serviceA.boundAddress().publishAddress(), emptyMap(), emptySet(), version0);
serviceB = buildService(version1); // runs a version just above CURRENT_VERSION
nodeB = new DiscoveryNode("TS_B", serviceB.boundAddress().publishAddress(), emptyMap(), emptySet(), version1);
serviceC = buildService(version1); // same version as serviceB
nodeC = new DiscoveryNode("TS_C", serviceC.boundAddress().publishAddress(), emptyMap(), emptySet(), version1);
}
@Override
public void tearDown() throws Exception {
super.tearDown();
IOUtils.close(serviceA, serviceB, serviceC, () -> {
try {
terminate(threadPool);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
private MockTransportService buildService(final Version version) {
MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, version, threadPool, null);
service.start();
service.acceptIncomingRequests();
return service;
}
public void testSendMessage() throws InterruptedException {
serviceA.registerRequestHandler("/test", SimpleTestRequest::new, ThreadPool.Names.SAME,
(request, channel) -> {
assertEquals(request.sourceNode, "TS_A");
SimpleTestResponse response = new SimpleTestResponse();
response.targetNode = "TS_A";
channel.sendResponse(response);
});
TransportActionProxy.registerProxyAction(serviceA, "/test", SimpleTestResponse::new);
serviceA.connectToNode(nodeB);
serviceB.registerRequestHandler("/test", SimpleTestRequest::new, ThreadPool.Names.SAME,
(request, channel) -> {
assertEquals(request.sourceNode, "TS_A");
SimpleTestResponse response = new SimpleTestResponse();
response.targetNode = "TS_B";
channel.sendResponse(response);
});
TransportActionProxy.registerProxyAction(serviceB, "/test", SimpleTestResponse::new);
serviceB.connectToNode(nodeC);
serviceC.registerRequestHandler("/test", SimpleTestRequest::new, ThreadPool.Names.SAME,
(request, channel) -> {
assertEquals(request.sourceNode, "TS_A");
SimpleTestResponse response = new SimpleTestResponse();
response.targetNode = "TS_C";
channel.sendResponse(response);
});
TransportActionProxy.registerProxyAction(serviceC, "/test", SimpleTestResponse::new);
CountDownLatch latch = new CountDownLatch(1);
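// send via B with a proxy action that forwards the request on to C; the response flows back through B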
serviceA.sendRequest(nodeB, TransportActionProxy.getProxyAction("/test"), TransportActionProxy.wrapRequest(nodeC,
new SimpleTestRequest("TS_A")), new TransportResponseHandler<SimpleTestResponse>() {
@Override
public SimpleTestResponse newInstance() {
return new SimpleTestResponse();
}
@Override
public void handleResponse(SimpleTestResponse response) {
try {
assertEquals("TS_C", response.targetNode);
} finally {
latch.countDown();
}
}
@Override
public void handleException(TransportException exp) {
try {
throw new AssertionError(exp);
} finally {
latch.countDown();
}
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
latch.await();
}
public void testException() throws InterruptedException {
serviceA.registerRequestHandler("/test", SimpleTestRequest::new, ThreadPool.Names.SAME,
(request, channel) -> {
assertEquals(request.sourceNode, "TS_A");
SimpleTestResponse response = new SimpleTestResponse();
response.targetNode = "TS_A";
channel.sendResponse(response);
});
TransportActionProxy.registerProxyAction(serviceA, "/test", SimpleTestResponse::new);
serviceA.connectToNode(nodeB);
serviceB.registerRequestHandler("/test", SimpleTestRequest::new, ThreadPool.Names.SAME,
(request, channel) -> {
assertEquals(request.sourceNode, "TS_A");
SimpleTestResponse response = new SimpleTestResponse();
response.targetNode = "TS_B";
channel.sendResponse(response);
});
TransportActionProxy.registerProxyAction(serviceB, "/test", SimpleTestResponse::new);
serviceB.connectToNode(nodeC);
serviceC.registerRequestHandler("/test", SimpleTestRequest::new, ThreadPool.Names.SAME,
(request, channel) -> {
throw new ElasticsearchException("greetings from TS_C");
});
TransportActionProxy.registerProxyAction(serviceC, "/test", SimpleTestResponse::new);
CountDownLatch latch = new CountDownLatch(1);
serviceA.sendRequest(nodeB, TransportActionProxy.getProxyAction("/test"), TransportActionProxy.wrapRequest(nodeC,
new SimpleTestRequest("TS_A")), new TransportResponseHandler<SimpleTestResponse>() {
@Override
public SimpleTestResponse newInstance() {
return new SimpleTestResponse();
}
@Override
public void handleResponse(SimpleTestResponse response) {
try {
fail("expected exception");
} finally {
latch.countDown();
}
}
@Override
public void handleException(TransportException exp) {
try {
Throwable cause = ExceptionsHelper.unwrapCause(exp);
assertEquals("greetings from TS_C", cause.getMessage());
} finally {
latch.countDown();
}
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
latch.await();
}
public static class SimpleTestRequest extends TransportRequest {
String sourceNode;
public SimpleTestRequest(String sourceNode) {
this.sourceNode = sourceNode;
}
public SimpleTestRequest() {}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
sourceNode = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(sourceNode);
}
}
public static class SimpleTestResponse extends TransportResponse {
String targetNode;
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
targetNode = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(targetNode);
}
}
}

View File

@ -145,6 +145,7 @@ buildRestTests.expectedUnconvertedCandidates = [
'reference/modules/scripting/security.asciidoc',
'reference/modules/scripting/using.asciidoc',
'reference/modules/transport.asciidoc',
'reference/modules/cross-cluster-search.asciidoc', // this is hard to test since we need 2 clusters -- maybe we can trick it into referencing itself...
'reference/query-dsl/exists-query.asciidoc',
'reference/query-dsl/function-score-query.asciidoc',
'reference/query-dsl/geo-shape-query.asciidoc',

View File

@ -77,6 +77,11 @@ The modules in this section are:
A tribe node joins one or more clusters and acts as a federated
client across them.
<<modules-cross-cluster-search, Cross cluster Search>>::
Cross cluster search enables executing search requests across more than one cluster without joining them,
acting as a federated client across them.
--

View File

@ -0,0 +1,117 @@
[[modules-cross-cluster-search]]
== Cross cluster search
The _cross cluster search_ feature allows any node to act as a federated client across
multiple clusters. In contrast to the _tribe_ feature, a _cross cluster search_ node won't
join the remote cluster; instead it connects to the remote clusters in a lightweight fashion in order to execute
federated search requests.
Cross cluster search works by configuring remote clusters in the cluster state and connecting only to a
limited number of nodes in each remote cluster. Each remote cluster is referenced by a name and a list of seed nodes.
Those seed nodes are used to discover other nodes eligible as so-called _gateway nodes_. Each node in a cluster that
has remote clusters configured connects to one or more _gateway nodes_ and uses them to federate search requests to
the remote cluster.
Remote clusters can either be configured as part of the `elasticsearch.yml` file or be dynamically updated via
the <<cluster-update-settings,cluster settings API>>. If a remote cluster is configured via `elasticsearch.yml`, only
the nodes with that configuration will connect to the remote cluster, in which case federated search requests
will have to be sent specifically to those nodes. Remote clusters set via the
<<cluster-update-settings,cluster settings API>> will be available on every node in the cluster, as shown in the
example below.
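A minimal sketch of registering a remote cluster dynamically (the alias `cluster_one` and the seed address are
illustrative placeholders):
[source,js]
--------------------------------------------------
PUT _cluster/settings
{
  "persistent": {
    "search.remote.cluster_one.seeds": "127.0.0.1:9300"
  }
}
--------------------------------------------------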
The `elasticsearch.yml` config file for a _cross cluster search_ node just needs to list the
remote clusters that should be connected to, for instance:
[source,yaml]
--------------------------------
search:
remote:
cluster_one: <1>
seeds: 127.0.0.1:9300
cluster_two: <1>
seeds: 127.0.0.1:9301
--------------------------------
<1> `cluster_one` and `cluster_two` are arbitrary cluster aliases representing the connection to each cluster.
These names are subsequently used to distinguish between local and remote indices.
[float]
=== Using cross cluster search
To search the `twitter` index on remote cluster `cluster_one` the index name must be prefixed with the cluster alias
separated by a `:` character:
[source,js]
--------------------------------------------------
POST /cluster_one:twitter/tweet/_search
{
  "query": {
    "match_all": {}
  }
}
--------------------------------------------------
In contrast to the `tribe` feature, cross cluster search can also search indices with the same name on different
clusters:
[source,js]
--------------------------------------------------
POST /cluster_one:twitter,twitter/tweet/_search
{
  "query": {
    "match_all": {}
  }
}
--------------------------------------------------
Search results are disambiguated the same way as the indices are disambiguated in the request: even if index names are
identical, these indices are treated as different indices when results are merged. All results retrieved from a
remote index are prefixed with their remote cluster name:
[source,js]
--------------------------------------------------
{
"took" : 89,
"timed_out" : false,
"_shards" : {
"total" : 10,
"successful" : 10,
"failed" : 0
},
"hits" : {
"total" : 2,
"max_score" : 1.0,
"hits" : [
{
"_index" : "cluster_one:twitter",
"_type" : "tweet",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"user" : "kimchy",
"postDate" : "2009-11-15T14:12:12",
"message" : "trying out Elasticsearch"
}
},
{
"_index" : "twitter",
"_type" : "tweet",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"user" : "kimchy",
"postDate" : "2009-11-15T14:12:12",
"message" : "trying out Elasticsearch"
}
}
]
}
}
--------------------------------------------------
[float]
=== Cross cluster search settings
* `search.remote.connections_per_cluster` - the number of nodes to connect to per remote cluster. The default is `3`.
* `search.remote.initial_connect_timeout` - the time to wait for remote connections to be established when the node
starts. The default is `30s`.
* `search.remote.node.attr` - a node attribute to filter out nodes that are eligible as a gateway node in the
remote cluster. For instance, a node can have a node attribute `node.attr.gateway: true` such that only nodes with this
attribute will be connected to if `search.remote.node.attr` is set to `gateway`, as in the sketch below.
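For example, a minimal sketch of dedicating gateway nodes via a node attribute (the attribute name `gateway` is an
illustrative placeholder):
[source,yaml]
--------------------------------
# elasticsearch.yml on the eligible nodes in the remote cluster
node.attr.gateway: true
# elasticsearch.yml on the cross cluster search node
search.remote.node.attr: gateway
--------------------------------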

View File

@ -0,0 +1,52 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import org.elasticsearch.gradle.test.RestIntegTestTask
apply plugin: 'elasticsearch.standalone-test'
task remoteClusterTest(type: RestIntegTestTask) {
mustRunAfter(precommit)
cluster {
distribution = 'zip'
numNodes = 2
clusterName = 'remote-cluster'
}
systemProperty 'tests.rest.suite', 'remote_cluster'
}
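// the mixed cluster below is statically seeded with the first node of the remote cluster started above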
task mixedClusterTest(type: RestIntegTestTask) {
dependsOn(remoteClusterTest)
cluster {
distribution = 'zip'
setting 'search.remote.my_remote_cluster.seeds', "\"${-> remoteClusterTest.nodes.get(0).transportUri()}\""
setting 'search.remote.connections_per_cluster', 1
}
systemProperty 'tests.rest.suite', 'multi_cluster'
finalizedBy 'remoteClusterTest#node0.stop','remoteClusterTest#node1.stop'
}
task integTest {
dependsOn = [mixedClusterTest]
}
test.enabled = false // no unit tests for multi-cluster-search, only the rest integration test
check.dependsOn(integTest)

View File

@ -0,0 +1,47 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.upgrades;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite;
import org.apache.lucene.util.TimeUnits;
import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;
import java.io.IOException;
@TimeoutSuite(millis = 5 * TimeUnits.MINUTE) // to account for very slow VMs
public class MultiClusterSearchYamlTestSuiteIT extends ESClientYamlSuiteTestCase {
@Override
protected boolean preserveIndicesUponCompletion() {
return true;
}
public MultiClusterSearchYamlTestSuiteIT(ClientYamlTestCandidate testCandidate) {
super(testCandidate);
}
@ParametersFactory
public static Iterable<Object[]> parameters() throws IOException {
return createParameters();
}
}

View File

@ -0,0 +1,131 @@
---
"Index data and search on the mixed cluster":
- do:
indices.create:
index: test_index
body:
settings:
index:
number_of_shards: 2
number_of_replicas: 0
- do:
bulk:
refresh: true
body:
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "local_cluster", "filter_field": 0}'
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "local_cluster", "filter_field": 1}'
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "local_cluster", "filter_field": 0}'
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "local_cluster", "filter_field": 1}'
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "local_cluster", "filter_field": 0}'
- do:
search:
index: test_index,my_remote_cluster:test_index
body:
aggs:
cluster:
terms:
field: f1.keyword
- match: { _shards.total: 5 }
- match: { hits.total: 11 }
- length: { aggregations.cluster.buckets: 2 }
- match: { aggregations.cluster.buckets.0.key: "remote_cluster" }
- match: { aggregations.cluster.buckets.0.doc_count: 6 }
- match: { aggregations.cluster.buckets.1.key: "local_cluster" }
- match: { aggregations.cluster.buckets.1.doc_count: 5 }
- do:
search:
index: test_index,my_remote_cluster:test_index
body:
query:
term:
f1: remote_cluster
aggs:
cluster:
terms:
field: f1.keyword
- match: { _shards.total: 5 }
- match: { hits.total: 6}
- match: { hits.hits.0._index: "my_remote_cluster:test_index"}
- length: { aggregations.cluster.buckets: 1 }
- match: { aggregations.cluster.buckets.0.key: "remote_cluster" }
- match: { aggregations.cluster.buckets.0.doc_count: 6 }
- do:
search:
index: my_remote_cluster:test_index
body:
aggs:
cluster:
terms:
field: f1.keyword
- match: { _shards.total: 3 }
- match: { hits.total: 6}
- match: { hits.hits.0._index: "my_remote_cluster:test_index"}
- length: { aggregations.cluster.buckets: 1 }
- match: { aggregations.cluster.buckets.0.key: "remote_cluster" }
- match: { aggregations.cluster.buckets.0.doc_count: 6 }
- do:
search:
index: test_index
body:
aggs:
cluster:
terms:
field: f1.keyword
- match: { _shards.total: 2 }
- match: { hits.total: 5}
- match: { hits.hits.0._index: "test_index"}
- length: { aggregations.cluster.buckets: 1 }
- match: { aggregations.cluster.buckets.0.key: "local_cluster" }
- match: { aggregations.cluster.buckets.0.doc_count: 5 }
---
"Add transient remote cluster based on the preset cluster":
- do:
cluster.get_settings:
include_defaults: true
- set: { defaults.search.remote.my_remote_cluster.seeds.0: remote_ip }
- do:
cluster.put_settings:
flat_settings: true
body:
transient:
search.remote.test_remote_cluster.seeds: $remote_ip
- match: {transient: {search.remote.test_remote_cluster.seeds: $remote_ip}}
- do:
search:
index: test_remote_cluster:test_index
- match: { _shards.total: 3 }
- match: { hits.total: 6 }
- match: { hits.hits.0._index: "test_remote_cluster:test_index" }
---
"Search an filtered alias on the remote cluster":
- do:
search:
index: my_remote_cluster:aliased_test_index
- match: { _shards.total: 3 }
- match: { hits.total: 2 }
- match: { hits.hits.0._source.filter_field: 1 }
- match: { hits.hits.0._index: "my_remote_cluster:test_index" }

View File

@ -0,0 +1,60 @@
---
"Index data and search on the old cluster":
- do:
indices.create:
index: test_index
body:
settings:
index:
number_of_shards: 3
number_of_replicas: 0
aliases:
aliased_test_index: # we use this alias in the multi cluster test to verify filtered aliases work
filter:
term:
filter_field : 1
- do:
bulk:
refresh: true
body:
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "remote_cluster", "filter_field": 0}'
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "remote_cluster", "filter_field": 1}'
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "remote_cluster", "filter_field": 0}'
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "remote_cluster", "filter_field": 1}'
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "remote_cluster", "filter_field": 0}'
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "remote_cluster", "filter_field": 0}'
- do:
search:
index: test_index
body:
aggs:
cluster:
terms:
field: f1.keyword
- match: { _shards.total: 3 }
- match: { hits.total: 6 }
- length: { aggregations.cluster.buckets: 1 }
- match: { aggregations.cluster.buckets.0.key: "remote_cluster" }
- match: { aggregations.cluster.buckets.0.doc_count: 6 }
- do:
search:
index: aliased_test_index
- match: { _shards.total: 3 }
- match: { hits.total: 2 }
- match: { hits.hits.0._source.filter_field: 1 }
- match: { hits.hits.0._index: "test_index" }

View File

@ -56,6 +56,7 @@ List projects = [
'qa:evil-tests',
'qa:no-bootstrap-tests',
'qa:rolling-upgrade',
'qa:multi-cluster-search',
'qa:smoke-test-client',
'qa:smoke-test-http',
'qa:smoke-test-ingest-with-all-dependencies',

View File

@ -763,4 +763,8 @@ public final class MockTransportService extends TransportService {
assert openConnections.size() == 0 : "still open connections: " + openConnections;
}
}
public DiscoveryNode getLocalDiscoNode() {
return this.getLocalNode();
}
}