Cross Cluster Search: preserve remote status code (#30976)
In case an error is returned when calling search_shards on a remote cluster, which will lead to throwing an exception in the coordinating node, we should make sure that the status code returned by the coordinating node is the same as the one returned by the remote cluster. Up until now a 500 - Internal Server Error was always returned. This commit changes this behaviour so that, for instance, if an index is not found, which causes a 404, a 404 is also returned by the coordinating node to the client. Closes #27461
This commit is contained in:
parent
31351ab880
commit
70749e01c4
|
@ -1,7 +1,7 @@
|
|||
---
|
||||
"Search with missing remote index pattern":
|
||||
- do:
|
||||
catch: "request"
|
||||
catch: "missing"
|
||||
search:
|
||||
index: "my_remote_cluster:foo"
|
||||
|
||||
|
@ -34,7 +34,7 @@
|
|||
- match: { aggregations.cluster.buckets.0.doc_count: 6 }
|
||||
|
||||
- do:
|
||||
catch: "request"
|
||||
catch: "missing"
|
||||
search:
|
||||
index: "my_remote_cluster:test_index,my_remote_cluster:foo"
|
||||
body:
|
||||
|
|
|
@ -215,7 +215,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
|
|||
ActionListener<Map<String, ClusterSearchShardsResponse>> listener) {
|
||||
final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size());
|
||||
final Map<String, ClusterSearchShardsResponse> searchShardsResponses = new ConcurrentHashMap<>();
|
||||
final AtomicReference<TransportException> transportException = new AtomicReference<>();
|
||||
final AtomicReference<RemoteTransportException> transportException = new AtomicReference<>();
|
||||
for (Map.Entry<String, OriginalIndices> entry : remoteIndicesByCluster.entrySet()) {
|
||||
final String clusterName = entry.getKey();
|
||||
RemoteClusterConnection remoteClusterConnection = remoteClusters.get(clusterName);
|
||||
|
@ -232,7 +232,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
|
|||
public void onResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) {
|
||||
searchShardsResponses.put(clusterName, clusterSearchShardsResponse);
|
||||
if (responsesCountDown.countDown()) {
|
||||
TransportException exception = transportException.get();
|
||||
RemoteTransportException exception = transportException.get();
|
||||
if (exception == null) {
|
||||
listener.onResponse(searchShardsResponses);
|
||||
} else {
|
||||
|
@ -243,8 +243,8 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
|
|||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
TransportException exception = new TransportException("unable to communicate with remote cluster [" +
|
||||
clusterName + "]", e);
|
||||
RemoteTransportException exception = new RemoteTransportException("error while communicating with remote cluster ["
|
||||
+ clusterName + "]", e);
|
||||
if (transportException.compareAndSet(null, exception) == false) {
|
||||
exception = transportException.accumulateAndGet(exception, (previous, current) -> {
|
||||
current.addSuppressed(previous);
|
||||
|
|
|
@ -19,15 +19,9 @@
|
|||
package org.elasticsearch.transport;
|
||||
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.Build;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.LatchedActionListener;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
|
||||
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction;
|
||||
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
|
||||
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
|
||||
|
@ -42,17 +36,16 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.BoundTransportAddress;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.CancellableThreads;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.http.HttpInfo;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.mocksocket.MockServerSocket;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
|
@ -121,8 +114,12 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
try {
|
||||
newService.registerRequestHandler(ClusterSearchShardsAction.NAME,ThreadPool.Names.SAME, ClusterSearchShardsRequest::new,
|
||||
(request, channel) -> {
|
||||
channel.sendResponse(new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0],
|
||||
knownNodes.toArray(new DiscoveryNode[0]), Collections.emptyMap()));
|
||||
if ("index_not_found".equals(request.preference())) {
|
||||
channel.sendResponse(new IndexNotFoundException("index"));
|
||||
} else {
|
||||
channel.sendResponse(new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0],
|
||||
knownNodes.toArray(new DiscoveryNode[0]), Collections.emptyMap()));
|
||||
}
|
||||
});
|
||||
newService.registerRequestHandler(ClusterStateAction.NAME, ThreadPool.Names.SAME, ClusterStateRequest::new,
|
||||
(request, channel) -> {
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.elasticsearch.common.settings.ClusterSettings;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
|
@ -469,7 +470,6 @@ public class RemoteClusterServiceTests extends ESTestCase {
|
|||
assertEquals("no such remote cluster: [no such cluster]", ex.get().getMessage());
|
||||
}
|
||||
{
|
||||
|
||||
logger.info("closing all source nodes");
|
||||
// close all targets and check for the transport level failure path
|
||||
IOUtils.close(c1N1, c1N2, c2N1, c2N2);
|
||||
|
@ -559,7 +559,20 @@ public class RemoteClusterServiceTests extends ESTestCase {
|
|||
assertEquals(1, shardsResponse.getNodes().length);
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
AtomicReference<Map<String, ClusterSearchShardsResponse>> response = new AtomicReference<>();
|
||||
AtomicReference<Exception> failure = new AtomicReference<>();
|
||||
remoteClusterService.collectSearchShards(IndicesOptions.lenientExpandOpen(), "index_not_found",
|
||||
null, remoteIndicesByCluster,
|
||||
new LatchedActionListener<>(ActionListener.wrap(response::set, failure::set), latch));
|
||||
assertTrue(latch.await(1, TimeUnit.SECONDS));
|
||||
assertNull(response.get());
|
||||
assertNotNull(failure.get());
|
||||
assertThat(failure.get(), instanceOf(RemoteTransportException.class));
|
||||
RemoteTransportException remoteTransportException = (RemoteTransportException) failure.get();
|
||||
assertEquals(RestStatus.NOT_FOUND, remoteTransportException.status());
|
||||
}
|
||||
int numDisconnectedClusters = randomIntBetween(1, numClusters);
|
||||
Set<DiscoveryNode> disconnectedNodes = new HashSet<>(numDisconnectedClusters);
|
||||
Set<Integer> disconnectedNodesIndices = new HashSet<>(numDisconnectedClusters);
|
||||
|
@ -593,8 +606,9 @@ public class RemoteClusterServiceTests extends ESTestCase {
|
|||
assertTrue(latch.await(1, TimeUnit.SECONDS));
|
||||
assertNull(response.get());
|
||||
assertNotNull(failure.get());
|
||||
assertThat(failure.get(), instanceOf(TransportException.class));
|
||||
assertThat(failure.get().getMessage(), containsString("unable to communicate with remote cluster"));
|
||||
assertThat(failure.get(), instanceOf(RemoteTransportException.class));
|
||||
assertThat(failure.get().getMessage(), containsString("error while communicating with remote cluster ["));
|
||||
assertThat(failure.get().getCause(), instanceOf(NodeDisconnectedException.class));
|
||||
}
|
||||
|
||||
//setting skip_unavailable to true for all the disconnected clusters will make the request succeed again
|
||||
|
|
|
@ -56,13 +56,13 @@ teardown:
|
|||
- match: { hits.total: 0 }
|
||||
|
||||
- do:
|
||||
catch: "request"
|
||||
catch: "forbidden"
|
||||
headers: { Authorization: "Basic am9lOnMza3JpdA==" }
|
||||
search:
|
||||
index: "*:foo-bar"
|
||||
|
||||
- do:
|
||||
catch: "request"
|
||||
catch: "forbidden"
|
||||
headers: { Authorization: "Basic am9lOnMza3JpdA==" }
|
||||
search:
|
||||
index: "my_remote_cluster:foo-bar"
|
||||
|
|
Loading…
Reference in New Issue