Prevent `can_match` requests from sending to incompatible nodes (#25705)
With cross cluster search we can potentially proxy `can_match` requests to nodes that don't have the endpoint. This might not cause any problem from a functional perspecitve but will cause ugly error messages on the target node. This commit will cause an IAE if we try to talk to an incompatible node via a proxy. Relates to #25704
This commit is contained in:
parent
11477a608f
commit
0e5d324c36
|
@ -105,8 +105,18 @@ public class SearchTransportService extends AbstractComponent {
|
||||||
|
|
||||||
public void sendCanMatch(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task, final
|
public void sendCanMatch(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task, final
|
||||||
ActionListener<CanMatchResponse> listener) {
|
ActionListener<CanMatchResponse> listener) {
|
||||||
transportService.sendChildRequest(connection, QUERY_CAN_MATCH_NAME, request, task,
|
if (connection.getNode().getVersion().onOrAfter(Version.CURRENT.minimumCompatibilityVersion())) {
|
||||||
TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, CanMatchResponse::new));
|
transportService.sendChildRequest(connection, QUERY_CAN_MATCH_NAME, request, task,
|
||||||
|
TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, CanMatchResponse::new));
|
||||||
|
} else {
|
||||||
|
// this might look weird but if we are in a CrossClusterSearch environment we can get a connection
|
||||||
|
// to a pre 5.latest node which is proxied by a 5.latest node under the hood since we are only compatible with 5.latest
|
||||||
|
// instead of sending the request we shortcut it here and let the caller deal with this -- see #25704
|
||||||
|
// also failing the request instead of returning a fake answer might trigger a retry on a replica which might be on a
|
||||||
|
// compatible node
|
||||||
|
throw new IllegalArgumentException("can_match is not supported on pre "+ Version.CURRENT.minimumCompatibilityVersion() +
|
||||||
|
" nodes");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void sendClearAllScrollContexts(Transport.Connection connection, final ActionListener<TransportResponse> listener) {
|
public void sendClearAllScrollContexts(Transport.Connection connection, final ActionListener<TransportResponse> listener) {
|
||||||
|
|
|
@ -860,7 +860,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
||||||
} else {
|
} else {
|
||||||
AggregatorFactories.Builder aggregations = source.aggregations();
|
AggregatorFactories.Builder aggregations = source.aggregations();
|
||||||
if (aggregations != null) {
|
if (aggregations != null) {
|
||||||
if (aggregations.mustVisiteAllDocs()) {
|
if (aggregations.mustVisitAllDocs()) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -286,7 +286,7 @@ public class AggregatorFactories {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean mustVisiteAllDocs() {
|
public boolean mustVisitAllDocs() {
|
||||||
for (AggregationBuilder builder : aggregationBuilders) {
|
for (AggregationBuilder builder : aggregationBuilders) {
|
||||||
if (builder instanceof GlobalAggregationBuilder) {
|
if (builder instanceof GlobalAggregationBuilder) {
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -30,6 +30,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||||
import org.elasticsearch.search.internal.AliasFilter;
|
import org.elasticsearch.search.internal.AliasFilter;
|
||||||
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
|
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
import org.elasticsearch.test.VersionUtils;
|
||||||
import org.elasticsearch.transport.Transport;
|
import org.elasticsearch.transport.Transport;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -102,6 +103,18 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testOldNodesTriggerException() {
|
||||||
|
SearchTransportService searchTransportService = new SearchTransportService(
|
||||||
|
Settings.builder().put("search.remote.connect", false).build(), null);
|
||||||
|
DiscoveryNode node = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), VersionUtils.getPreviousVersion(Version
|
||||||
|
.CURRENT.minimumCompatibilityVersion()));
|
||||||
|
SearchAsyncActionTests.MockConnection mockConnection = new SearchAsyncActionTests.MockConnection(node);
|
||||||
|
IllegalArgumentException illegalArgumentException = expectThrows(IllegalArgumentException.class,
|
||||||
|
() -> searchTransportService.sendCanMatch(mockConnection, null, null, null));
|
||||||
|
assertEquals("can_match is not supported on pre " + Version
|
||||||
|
.CURRENT.minimumCompatibilityVersion() + " nodes", illegalArgumentException.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
public void testFilterWithFailure() throws InterruptedException {
|
public void testFilterWithFailure() throws InterruptedException {
|
||||||
final TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(0, System.nanoTime(),
|
final TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(0, System.nanoTime(),
|
||||||
System::nanoTime);
|
System::nanoTime);
|
||||||
|
@ -117,13 +130,18 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
|
||||||
@Override
|
@Override
|
||||||
public void sendCanMatch(Transport.Connection connection, ShardSearchTransportRequest request, SearchTask task,
|
public void sendCanMatch(Transport.Connection connection, ShardSearchTransportRequest request, SearchTask task,
|
||||||
ActionListener<CanMatchResponse> listener) {
|
ActionListener<CanMatchResponse> listener) {
|
||||||
new Thread(() -> {
|
boolean throwException = request.shardId().id() != 0;
|
||||||
if (request.shardId().id() == 0) {
|
if (throwException && randomBoolean()) {
|
||||||
listener.onResponse(new CanMatchResponse(shard1));
|
throw new IllegalArgumentException("boom");
|
||||||
} else {
|
} else {
|
||||||
listener.onFailure(new NullPointerException());
|
new Thread(() -> {
|
||||||
}
|
if (throwException == false) {
|
||||||
}).start();
|
listener.onResponse(new CanMatchResponse(shard1));
|
||||||
|
} else {
|
||||||
|
listener.onFailure(new NullPointerException());
|
||||||
|
}
|
||||||
|
}).start();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue