EQL: Fix bug in returning results (#59673)

Using serialization/deserialization when dealing with non-trivial
documents causes the process to get stuck not to mention it is expensive.
Use a much more simple approach at the expense of losing information
(we're just interested in the source after all).

(cherry picked from commit e1659822db7ce1390ba9bbfb21768e24a0907dff)
This commit is contained in:
Costin Leau 2020-07-16 00:52:35 +03:00 committed by Costin Leau
parent 43481441e9
commit 5f2285a8b3
2 changed files with 10 additions and 23 deletions

View File

@ -8,17 +8,13 @@ package org.elasticsearch.xpack.eql.execution.search;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequest.Item;
import org.elasticsearch.action.get.MultiGetRequestBuilder;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchShardTarget;
@ -30,8 +26,6 @@ import org.elasticsearch.xpack.eql.session.EqlSession;
import org.elasticsearch.xpack.eql.session.Payload;
import org.elasticsearch.xpack.ql.util.StringUtils;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.ArrayList;
import java.util.List;
@ -94,12 +88,6 @@ public class BasicQueryClient implements QueryClient {
List<SearchHit> sequence = new ArrayList<>(listSize);
// copy streams - reused across the whole loop
PipedInputStream in = new PipedInputStream();
PipedOutputStream out = new PipedOutputStream(in);
StreamOutput so = new OutputStreamStreamOutput(out);
StreamInput si = new InputStreamStreamInput(in);
int counter = 0;
Text type = new Text("_doc");
for (MultiGetItemResponse mgr : r.getResponses()) {
@ -107,18 +95,15 @@ public class BasicQueryClient implements QueryClient {
listener.onFailure(mgr.getFailure().getFailure());
return;
}
// HACK: the only way to get GetResult is to serialize it and then load it back :(
mgr.getResponse().writeTo(so);
GetResult result = new GetResult(si);
SearchHit hit = new SearchHit(-1, result.getId(), type, result.getDocumentFields(), result.getMetadataFields());
hit.sourceRef(result.internalSourceRef());
GetResponse response = mgr.getResponse();
SearchHit hit = new SearchHit(-1, response.getId(), type, null, null);
hit.sourceRef(response.getSourceInternal());
// need to create these objects to set the index
hit.shard(new SearchShardTarget(null, new ShardId(result.getIndex(), "", -1), null, null));
hit.setSeqNo(result.getSeqNo());
hit.setPrimaryTerm(result.getPrimaryTerm());
hit.version(result.getVersion());
hit.shard(new SearchShardTarget(null, new ShardId(response.getIndex(), "", -1), null, null));
hit.setSeqNo(response.getSeqNo());
hit.setPrimaryTerm(response.getPrimaryTerm());
hit.version(response.getVersion());
sequence.add(hit);

View File

@ -288,6 +288,8 @@ public class TumblingWindow implements Executable {
private void payload(ActionListener<Payload> listener) {
List<Sequence> completed = matcher.completed();
log.trace("Sending payload for [{}] sequences", completed.size());
if (completed.isEmpty()) {
listener.onResponse(new EmptyPayload(Type.SEQUENCE, timeTook()));
matcher.clear();