From 5f2285a8b3abb3a7dc5130acaf1d3b5c2eb63d06 Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Thu, 16 Jul 2020 00:52:35 +0300 Subject: [PATCH] EQL: Fix bug in returning results (#59673) Using serialization/deserialization when dealing with non-trivial documents causes the process to get stuck not to mention it is expensive. Use a much more simple approach at the expense of losing information (we're just interested in the source after all). (cherry picked from commit e1659822db7ce1390ba9bbfb21768e24a0907dff) --- .../execution/search/BasicQueryClient.java | 31 +++++-------------- .../execution/sequence/TumblingWindow.java | 2 ++ 2 files changed, 10 insertions(+), 23 deletions(-) diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicQueryClient.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicQueryClient.java index 1e7930b0576..671966ef071 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicQueryClient.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicQueryClient.java @@ -8,17 +8,13 @@ package org.elasticsearch.xpack.eql.execution.search; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.MultiGetItemResponse; import org.elasticsearch.action.get.MultiGetRequest.Item; import org.elasticsearch.action.get.MultiGetRequestBuilder; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.Client; -import org.elasticsearch.common.io.stream.InputStreamStreamInput; -import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.text.Text; -import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchShardTarget; @@ -30,8 +26,6 @@ import org.elasticsearch.xpack.eql.session.EqlSession; import org.elasticsearch.xpack.eql.session.Payload; import org.elasticsearch.xpack.ql.util.StringUtils; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; import java.util.ArrayList; import java.util.List; @@ -94,12 +88,6 @@ public class BasicQueryClient implements QueryClient { List sequence = new ArrayList<>(listSize); - // copy streams - reused across the whole loop - PipedInputStream in = new PipedInputStream(); - PipedOutputStream out = new PipedOutputStream(in); - StreamOutput so = new OutputStreamStreamOutput(out); - StreamInput si = new InputStreamStreamInput(in); - int counter = 0; Text type = new Text("_doc"); for (MultiGetItemResponse mgr : r.getResponses()) { @@ -107,18 +95,15 @@ public class BasicQueryClient implements QueryClient { listener.onFailure(mgr.getFailure().getFailure()); return; } - // HACK: the only way to get GetResult is to serialize it and then load it back :( - mgr.getResponse().writeTo(so); - GetResult result = new GetResult(si); - SearchHit hit = new SearchHit(-1, result.getId(), type, result.getDocumentFields(), result.getMetadataFields()); - hit.sourceRef(result.internalSourceRef()); + GetResponse response = mgr.getResponse(); + SearchHit hit = new SearchHit(-1, response.getId(), type, null, null); + hit.sourceRef(response.getSourceInternal()); // need to create these objects to set the index - hit.shard(new SearchShardTarget(null, new ShardId(result.getIndex(), "", -1), null, null)); - - hit.setSeqNo(result.getSeqNo()); - hit.setPrimaryTerm(result.getPrimaryTerm()); - hit.version(result.getVersion()); + hit.shard(new SearchShardTarget(null, new ShardId(response.getIndex(), "", -1), null, null)); + hit.setSeqNo(response.getSeqNo()); + hit.setPrimaryTerm(response.getPrimaryTerm()); + hit.version(response.getVersion()); sequence.add(hit); diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/TumblingWindow.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/TumblingWindow.java index 848c3fe3526..984a9bf7ed0 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/TumblingWindow.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/TumblingWindow.java @@ -288,6 +288,8 @@ public class TumblingWindow implements Executable { private void payload(ActionListener listener) { List completed = matcher.completed(); + log.trace("Sending payload for [{}] sequences", completed.size()); + if (completed.isEmpty()) { listener.onResponse(new EmptyPayload(Type.SEQUENCE, timeTook())); matcher.clear();