EQL: Make queries using Point-In-Time rely on index filtering (#63161)

Point-In-Time queries cannot be ran on individual indices but on all.
Thus all PIT queries move their index from the request level to a filter
so this condition is fulfilled while keeping the query scoped
accordingly.

Fix #63132

(cherry picked from commit c8eb4f724d5dcc0fcc172c6219ecfbc1dc1fbbae)
This commit is contained in:
Costin Leau 2020-10-05 14:13:50 +03:00 committed by Costin Leau
parent b4a1199e87
commit 4f593bdd69
1 changed files with 29 additions and 6 deletions

View File

@ -11,8 +11,14 @@ import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.builder.PointInTimeBuilder; import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction; import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction;
import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest; import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest;
import org.elasticsearch.xpack.core.search.action.ClosePointInTimeResponse; import org.elasticsearch.xpack.core.search.action.ClosePointInTimeResponse;
@ -24,6 +30,8 @@ import org.elasticsearch.xpack.ql.index.IndexResolver;
import java.util.function.Function; import java.util.function.Function;
import static org.elasticsearch.action.ActionListener.wrap; import static org.elasticsearch.action.ActionListener.wrap;
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.index.query.QueryBuilders.termsQuery;
import static org.elasticsearch.xpack.ql.util.ActionListeners.map; import static org.elasticsearch.xpack.ql.util.ActionListeners.map;
/** /**
@ -54,11 +62,10 @@ public class PITAwareQueryClient extends BasicQueryClient {
} }
} }
private void searchWithPIT(SearchRequest search, ActionListener<SearchResponse> listener) { private void searchWithPIT(SearchRequest request, ActionListener<SearchResponse> listener) {
// don't increase the keep alive makeRequestPITCompatible(request);
search.source().pointInTimeBuilder(new PointInTimeBuilder(pitId));
// get the pid on each response // get the pid on each response
super.search(search, pitListener(SearchResponse::pointInTimeId, listener)); super.search(request, pitListener(SearchResponse::pointInTimeId, listener));
} }
@Override @Override
@ -72,9 +79,8 @@ public class PITAwareQueryClient extends BasicQueryClient {
} }
private void searchWithPIT(MultiSearchRequest search, ActionListener<MultiSearchResponse> listener) { private void searchWithPIT(MultiSearchRequest search, ActionListener<MultiSearchResponse> listener) {
// don't increase the keep alive
for (SearchRequest request : search.requests()) { for (SearchRequest request : search.requests()) {
request.source().pointInTimeBuilder(new PointInTimeBuilder(pitId)); makeRequestPITCompatible(request);
} }
// get the pid on each request // get the pid on each request
@ -91,6 +97,23 @@ public class PITAwareQueryClient extends BasicQueryClient {
}, listener)); }, listener));
} }
private void makeRequestPITCompatible(SearchRequest request) {
SearchSourceBuilder source = request.source();
// don't increase the keep alive
source.pointInTimeBuilder(new PointInTimeBuilder(pitId));
// move the indices from the search request to a index filter - see #63132
String[] indices = request.indices();
if (CollectionUtils.isEmpty(indices) == false) {
request.indices(Strings.EMPTY_ARRAY);
BoolQueryBuilder indexFilter = boolQuery().filter(termsQuery(GetResult._INDEX, indices));
QueryBuilder query = source.query();
if (query != null) {
indexFilter.must(query);
}
source.query(indexFilter);
}
}
// listener handing the extraction of new PIT and closing in case of exceptions // listener handing the extraction of new PIT and closing in case of exceptions
private <Response> ActionListener<Response> pitListener(Function<Response, String> pitIdExtractor, ActionListener<Response> listener) { private <Response> ActionListener<Response> pitListener(Function<Response, String> pitIdExtractor, ActionListener<Response> listener) {
return wrap(r -> { return wrap(r -> {