SQL: Fix bug caused by empty composites (#30343)
When dealing with filtering, a composite aggregation might return empty buckets (which have been filtered) which gets sent as is to the client. Unfortunately this interprets the response as no more data instead of retrying. This now has changed and the listener keeps retrying until either the query has ended or data passes the filter. Fix #30292
This commit is contained in:
parent
3b260dcfc1
commit
65dbc17510
|
@ -113,12 +113,36 @@ public class CompositeAggregationCursor implements Cursor {
|
|||
|
||||
SearchRequest search = Querier.prepareRequest(client, query, cfg.pageTimeout(), indices);
|
||||
|
||||
client.search(search, ActionListener.wrap(r -> {
|
||||
client.search(search, new ActionListener<SearchResponse>() {
|
||||
@Override
|
||||
public void onResponse(SearchResponse r) {
|
||||
try {
|
||||
// retry
|
||||
if (shouldRetryDueToEmptyPage(r)) {
|
||||
CompositeAggregationCursor.updateCompositeAfterKey(r, search.source());
|
||||
client.search(search, this);
|
||||
return;
|
||||
}
|
||||
|
||||
updateCompositeAfterKey(r, query);
|
||||
CompositeAggsRowSet rowSet = new CompositeAggsRowSet(extractors, r, limit,
|
||||
serializeQuery(query), indices);
|
||||
CompositeAggsRowSet rowSet = new CompositeAggsRowSet(extractors, r, limit, serializeQuery(query), indices);
|
||||
listener.onResponse(rowSet);
|
||||
}, listener::onFailure));
|
||||
} catch (Exception ex) {
|
||||
listener.onFailure(ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception ex) {
|
||||
listener.onFailure(ex);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
static boolean shouldRetryDueToEmptyPage(SearchResponse response) {
|
||||
CompositeAggregation composite = getComposite(response);
|
||||
// if there are no buckets but a next page, go fetch it instead of sending an empty response to the client
|
||||
return composite != null && composite.getBuckets().isEmpty() && composite.afterKey() != null && !composite.afterKey().isEmpty();
|
||||
}
|
||||
|
||||
static CompositeAggregation getComposite(SearchResponse response) {
|
||||
|
|
|
@ -206,8 +206,15 @@ public class Querier {
|
|||
protected void handleResponse(SearchResponse response, ActionListener<SchemaRowSet> listener) {
|
||||
// there are some results
|
||||
if (response.getAggregations().asList().size() > 0) {
|
||||
CompositeAggregationCursor.updateCompositeAfterKey(response, request.source());
|
||||
|
||||
// retry
|
||||
if (CompositeAggregationCursor.shouldRetryDueToEmptyPage(response)) {
|
||||
CompositeAggregationCursor.updateCompositeAfterKey(response, request.source());
|
||||
client.search(request, this);
|
||||
return;
|
||||
}
|
||||
|
||||
CompositeAggregationCursor.updateCompositeAfterKey(response, request.source());
|
||||
byte[] nextSearch = null;
|
||||
try {
|
||||
nextSearch = CompositeAggregationCursor.serializeQuery(request.source());
|
||||
|
|
|
@ -5,13 +5,11 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.qa.sql.security;
|
||||
|
||||
import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.xpack.qa.sql.jdbc.SqlSpecTestCase;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/30292")
|
||||
public class JdbcSqlSpecIT extends SqlSpecTestCase {
|
||||
public JdbcSqlSpecIT(String fileName, String groupName, String testName, Integer lineNumber, String query) {
|
||||
super(fileName, groupName, testName, lineNumber, query);
|
||||
|
|
Loading…
Reference in New Issue