EQL: Fetch sequence documents using Point-In-Time (#62469)

To preserve the PIT semantics, the retrieval of results has moved from
using multi-get to using an idsQuery.

(cherry picked from commit 1c2362fcf2be62ce568b3772924abce7331ef23c)
This commit is contained in:
Costin Leau 2020-09-16 20:01:16 +03:00 committed by Costin Leau
parent 5da922064f
commit ceaf96061c
6 changed files with 66 additions and 59 deletions

View File

@ -8,24 +8,28 @@ package org.elasticsearch.xpack.eql.execution.search;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequest.Item;
import org.elasticsearch.action.get.MultiGetRequestBuilder;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.IdsQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.xpack.eql.session.EqlConfiguration;
import org.elasticsearch.xpack.eql.session.EqlSession;
import org.elasticsearch.xpack.ql.util.ActionListeners;
import org.elasticsearch.xpack.ql.util.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.elasticsearch.action.ActionListener.wrap;
import static org.elasticsearch.index.query.QueryBuilders.idsQuery;
import static org.elasticsearch.xpack.eql.execution.search.RuntimeUtils.prepareRequest;
public class BasicQueryClient implements QueryClient {
@ -64,48 +68,52 @@ public class BasicQueryClient implements QueryClient {
}
@Override
public void get(Iterable<List<HitReference>> refs, ActionListener<List<List<GetResponse>>> listener) {
MultiGetRequestBuilder requestBuilder = client.prepareMultiGet();
// no need for real-time
requestBuilder.setRealtime(false)
.setRefresh(false);
public void fetchHits(Iterable<List<HitReference>> refs, ActionListener<List<List<SearchHit>>> listener) {
IdsQueryBuilder idsQuery = idsQuery();
int sz = 0;
int innerListSize = 0;
Set<String> indices = new HashSet<>();
// associate each reference with its own
final Map<HitReference, List<Integer>> referenceToPosition = new HashMap<>();
int counter = 0;
for (List<HitReference> list : refs) {
sz = list.size();
innerListSize = list.size();
for (HitReference ref : list) {
Item item = new Item(ref.index(), ref.id());
// make sure to get the whole source
item.fetchSourceContext(FetchSourceContext.FETCH_SOURCE);
requestBuilder.add(item);
idsQuery.addIds(ref.id());
indices.add(ref.index());
// remember the reference position
List<Integer> positions = referenceToPosition.computeIfAbsent(ref, v -> new ArrayList<>(1));
positions.add(counter++);
}
}
final int listSize = sz;
client.multiGet(requestBuilder.request(), wrap(r -> {
List<List<GetResponse>> hits = new ArrayList<>(r.getResponses().length / listSize);
SearchSourceBuilder builder = SearchSourceBuilder.searchSource()
// make sure to fetch the whole source
.fetchSource(FetchSourceContext.FETCH_SOURCE)
.trackTotalHits(false)
.trackScores(false)
.query(idsQuery);
List<GetResponse> sequence = new ArrayList<>(listSize);
int counter = 0;
for (MultiGetItemResponse mgr : r.getResponses()) {
if (mgr.isFailed()) {
listener.onFailure(mgr.getFailure().getFailure());
return;
final int listSize = innerListSize;
final int topListSize = counter / listSize;
// pre-allocate the response matrix
@SuppressWarnings({"rawtypes", "unchecked"})
List<SearchHit>[] hits = new List[topListSize];
for (int i = 0; i < hits.length; i++) {
hits[i] = Arrays.asList(new SearchHit[listSize]);
}
final List<List<SearchHit>> seq = Arrays.asList(hits);
sequence.add(mgr.getResponse());
SearchRequest search = prepareRequest(client, builder, false, indices.toArray(new String[0]));
if (++counter == listSize) {
counter = 0;
hits.add(sequence);
sequence = new ArrayList<>(listSize);
search(search, ActionListeners.map(listener, r -> {
for (SearchHit hit : RuntimeUtils.searchHits(r)) {
List<Integer> positions = referenceToPosition.get(new HitReference(hit));
positions.forEach(pos -> seq.get(pos / listSize).set(pos % listSize, hit));
}
return seq;
}));
}
// send the results
listener.onResponse(hits);
}, listener::onFailure));
}
}

View File

@ -7,8 +7,8 @@
package org.elasticsearch.xpack.eql.execution.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.SearchHit;
import java.util.List;
@ -19,7 +19,7 @@ public interface QueryClient {
void query(QueryRequest request, ActionListener<SearchResponse> listener);
void get(Iterable<List<HitReference>> refs, ActionListener<List<List<GetResponse>>> listener);
default void close(ActionListener<Boolean> closed) {}
void fetchHits(Iterable<List<HitReference>> refs, ActionListener<List<List<SearchHit>>> listener);
}

View File

@ -64,7 +64,7 @@ public final class RuntimeUtils {
}
return extractors;
}
public static HitExtractor createExtractor(FieldExtraction ref, EqlConfiguration cfg) {
if (ref instanceof SearchHitFieldRef) {
SearchHitFieldRef f = (SearchHitFieldRef) ref;
@ -94,7 +94,7 @@ public final class RuntimeUtils {
throw new EqlIllegalArgumentException("Unexpected value reference {}", ref.getClass());
}
public static SearchRequest prepareRequest(Client client,
SearchSourceBuilder source,
@ -111,4 +111,4 @@ public final class RuntimeUtils {
public static List<SearchHit> searchHits(SearchResponse response) {
return Arrays.asList(response.getHits().getHits());
}
}
}

View File

@ -6,8 +6,8 @@
package org.elasticsearch.xpack.eql.execution.sequence;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.eql.action.EqlSearchResponse.Event;
import org.elasticsearch.xpack.eql.execution.payload.AbstractPayload;
@ -18,16 +18,16 @@ class SequencePayload extends AbstractPayload {
private final List<org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence> values;
SequencePayload(List<Sequence> sequences, List<List<GetResponse>> docs, boolean timedOut, TimeValue timeTook) {
SequencePayload(List<Sequence> sequences, List<List<SearchHit>> docs, boolean timedOut, TimeValue timeTook) {
super(timedOut, timeTook);
values = new ArrayList<>(sequences.size());
for (int i = 0; i < sequences.size(); i++) {
Sequence s = sequences.get(i);
List<GetResponse> hits = docs.get(i);
List<SearchHit> hits = docs.get(i);
List<Event> events = new ArrayList<>(hits.size());
for (GetResponse hit : hits) {
events.add(new Event(hit.getIndex(), hit.getId(), hit.getSourceAsBytesRef()));
for (SearchHit hit : hits) {
events.add(new Event(hit.getIndex(), hit.getId(), hit.getSourceRef()));
}
values.add(new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence(s.key().asList(), events));
}

View File

@ -301,8 +301,9 @@ public class TumblingWindow implements Executable {
return;
}
client.get(hits(completed), ActionListeners.map(listener, hits -> {
SequencePayload payload = new SequencePayload(completed, hits, false, timeTook());
// get results through search (to keep using PIT)
client.fetchHits(hits(completed), ActionListeners.map(listener, listOfHits -> {
SequencePayload payload = new SequencePayload(completed, listOfHits, false, timeTook());
close(listener);
return payload;
}));

View File

@ -12,7 +12,6 @@ import org.apache.lucene.search.TotalHits;
import org.apache.lucene.search.TotalHits.Relation;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchResponse.Clusters;
import org.elasticsearch.action.search.SearchResponseSections;
@ -82,7 +81,7 @@ public class SequenceSpecTests extends ESTestCase {
return (long) hit.docId();
}
}
static class KeyExtractor extends EmptyHitExtractor {
@Override
public String extract(SearchHit hit) {
@ -172,7 +171,7 @@ public class SequenceSpecTests extends ESTestCase {
r.searchSource().size(Integer.MAX_VALUE);
}
Map<Integer, Tuple<String, String>> evs = ordinal != Integer.MAX_VALUE ? events.get(ordinal) : emptyMap();
EventsAsHits eah = new EventsAsHits(evs);
SearchHits searchHits = new SearchHits(eah.hits.toArray(new SearchHit[0]), new TotalHits(eah.hits.size(), Relation.EQUAL_TO),
0.0f);
@ -182,8 +181,7 @@ public class SequenceSpecTests extends ESTestCase {
}
@Override
public void get(Iterable<List<HitReference>> refs, ActionListener<List<List<GetResponse>>> listener) {
//no-op
public void fetchHits(Iterable<List<HitReference>> refs, ActionListener<List<List<SearchHit>>> listener) {
}
}
@ -202,7 +200,7 @@ public class SequenceSpecTests extends ESTestCase {
public static Iterable<Object[]> parameters() throws Exception {
return SeriesUtils.readSpec("/sequences.series-spec");
}
public void test() {
int stages = events.size();
List<Criterion<BoxedQueryRequest>> criteria = new ArrayList<>(stages);
@ -214,7 +212,7 @@ public class SequenceSpecTests extends ESTestCase {
// convert the results through a test specific payload
SequenceMatcher matcher = new SequenceMatcher(stages, false, TimeValue.MINUS_ONE, null);
QueryClient testClient = new TestQueryClient();
TumblingWindow window = new TumblingWindow(testClient, criteria, null, matcher);
@ -229,7 +227,7 @@ public class SequenceSpecTests extends ESTestCase {
String prefix = "Line " + lineNumber + ":";
assertNotNull(prefix + "no matches found", seq);
assertEquals(prefix + "different sequences matched ", matches.size(), seq.size());
for (int i = 0; i < seq.size(); i++) {
Sequence s = seq.get(i);
List<String> match = matches.get(i);
@ -253,4 +251,4 @@ public class SequenceSpecTests extends ESTestCase {
assertEquals(prefix, match.toString(), keys + values);
}
}
}
}