From 0e5d324c36b66668e7d5b5bdf679cf5640dd8253 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 13 Jul 2017 14:59:41 +0200 Subject: [PATCH] Prevent `can_match` requests from sending to incompatible nodes (#25705) With cross cluster search we can potentially proxy `can_match` requests to nodes that don't have the endpoint. This might not cause any problem from a functional perspecitve but will cause ugly error messages on the target node. This commit will cause an IAE if we try to talk to an incompatible node via a proxy. Relates to #25704 --- .../action/search/SearchTransportService.java | 14 ++++++-- .../elasticsearch/search/SearchService.java | 2 +- .../aggregations/AggregatorFactories.java | 2 +- .../CanMatchPreFilterSearchPhaseTests.java | 32 +++++++++++++++---- 4 files changed, 39 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index bbe9f65fca3..3bd1b125c9f 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -105,8 +105,18 @@ public class SearchTransportService extends AbstractComponent { public void sendCanMatch(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task, final ActionListener listener) { - transportService.sendChildRequest(connection, QUERY_CAN_MATCH_NAME, request, task, - TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, CanMatchResponse::new)); + if (connection.getNode().getVersion().onOrAfter(Version.CURRENT.minimumCompatibilityVersion())) { + transportService.sendChildRequest(connection, QUERY_CAN_MATCH_NAME, request, task, + TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, CanMatchResponse::new)); + } else { + // this might look weird but if we are in a CrossClusterSearch environment we can get a connection + // to a pre 5.latest node which is proxied by a 5.latest node under the hood since we are only compatible with 5.latest + // instead of sending the request we shortcut it here and let the caller deal with this -- see #25704 + // also failing the request instead of returning a fake answer might trigger a retry on a replica which might be on a + // compatible node + throw new IllegalArgumentException("can_match is not supported on pre "+ Version.CURRENT.minimumCompatibilityVersion() + + " nodes"); + } } public void sendClearAllScrollContexts(Transport.Connection connection, final ActionListener listener) { diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java index 707312cd281..662638ae1a5 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchService.java +++ b/core/src/main/java/org/elasticsearch/search/SearchService.java @@ -860,7 +860,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv } else { AggregatorFactories.Builder aggregations = source.aggregations(); if (aggregations != null) { - if (aggregations.mustVisiteAllDocs()) { + if (aggregations.mustVisitAllDocs()) { return false; } } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java b/core/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java index f995c484d82..579ff4bdad1 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java @@ -286,7 +286,7 @@ public class AggregatorFactories { } } - public boolean mustVisiteAllDocs() { + public boolean mustVisitAllDocs() { for (AggregationBuilder builder : aggregationBuilders) { if (builder instanceof GlobalAggregationBuilder) { return true; diff --git a/core/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java b/core/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java index 7c393b41c07..8d2ef87fcbf 100644 --- a/core/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.ShardSearchTransportRequest; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; import org.elasticsearch.transport.Transport; import java.io.IOException; @@ -102,6 +103,18 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase { } } + public void testOldNodesTriggerException() { + SearchTransportService searchTransportService = new SearchTransportService( + Settings.builder().put("search.remote.connect", false).build(), null); + DiscoveryNode node = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), VersionUtils.getPreviousVersion(Version + .CURRENT.minimumCompatibilityVersion())); + SearchAsyncActionTests.MockConnection mockConnection = new SearchAsyncActionTests.MockConnection(node); + IllegalArgumentException illegalArgumentException = expectThrows(IllegalArgumentException.class, + () -> searchTransportService.sendCanMatch(mockConnection, null, null, null)); + assertEquals("can_match is not supported on pre " + Version + .CURRENT.minimumCompatibilityVersion() + " nodes", illegalArgumentException.getMessage()); + } + public void testFilterWithFailure() throws InterruptedException { final TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(0, System.nanoTime(), System::nanoTime); @@ -117,13 +130,18 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase { @Override public void sendCanMatch(Transport.Connection connection, ShardSearchTransportRequest request, SearchTask task, ActionListener listener) { - new Thread(() -> { - if (request.shardId().id() == 0) { - listener.onResponse(new CanMatchResponse(shard1)); - } else { - listener.onFailure(new NullPointerException()); - } - }).start(); + boolean throwException = request.shardId().id() != 0; + if (throwException && randomBoolean()) { + throw new IllegalArgumentException("boom"); + } else { + new Thread(() -> { + if (throwException == false) { + listener.onResponse(new CanMatchResponse(shard1)); + } else { + listener.onFailure(new NullPointerException()); + } + }).start(); + } } };