EQL: Improve request logging (#64206)

Add logging to multi-search queries
Log response count

(cherry picked from commit ee9b9d58f68e2d545d5d841e2f683ec4e96f79e6)
(cherry picked from commit 02a4c6b83475cebe715311eeba123ad6fc8d6ba1)
This commit is contained in:
Costin Leau 2020-10-27 16:17:25 +02:00 committed by Costin Leau
parent 2363c4be4b
commit 6ca0b6ae6d
3 changed files with 50 additions and 53 deletions

View File

@ -1,49 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.execution.search;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
import static org.elasticsearch.xpack.eql.execution.search.RuntimeUtils.logSearchResponse;
public class BasicListener implements ActionListener<SearchResponse> {
private static final Logger log = RuntimeUtils.QUERY_LOG;
private final ActionListener<SearchResponse> listener;
public BasicListener(ActionListener<SearchResponse> listener) {
this.listener = listener;
}
@Override
public void onResponse(SearchResponse response) {
try {
ShardSearchFailure[] failures = response.getShardFailures();
if (CollectionUtils.isEmpty(failures) == false) {
listener.onFailure(new EqlIllegalArgumentException(failures[0].reason(), failures[0].getCause()));
} else {
if (log.isTraceEnabled()) {
logSearchResponse(response, log);
}
listener.onResponse(response);
}
} catch (Exception ex) {
onFailure(ex);
}
}
@Override
public void onFailure(Exception ex) {
listener.onFailure(ex);
}
}

View File

@ -32,7 +32,9 @@ import java.util.Map;
import java.util.StringJoiner;
import static org.elasticsearch.index.query.QueryBuilders.idsQuery;
import static org.elasticsearch.xpack.eql.execution.search.RuntimeUtils.multiSearchLogListener;
import static org.elasticsearch.xpack.eql.execution.search.RuntimeUtils.prepareRequest;
import static org.elasticsearch.xpack.eql.execution.search.RuntimeUtils.searchLogListener;
public class BasicQueryClient implements QueryClient {
@ -55,7 +57,7 @@ public class BasicQueryClient implements QueryClient {
searchSource.timeout(cfg.requestTimeout());
SearchRequest search = prepareRequest(client, searchSource, false, indices);
search(search, new BasicListener(listener));
search(search, searchLogListener(listener, log));
}
protected void search(SearchRequest search, ActionListener<SearchResponse> listener) {
@ -85,7 +87,7 @@ public class BasicQueryClient implements QueryClient {
log.trace("About to execute multi-queries {} on {}", sj, indices);
}
client.multiSearch(search, listener);
client.multiSearch(search, multiSearchLogListener(listener, log));
}
@Override

View File

@ -8,9 +8,13 @@ package org.elasticsearch.xpack.eql.execution.search;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
@ -44,7 +48,45 @@ public final class RuntimeUtils {
private RuntimeUtils() {}
static void logSearchResponse(SearchResponse response, Logger logger) {
public static ActionListener<SearchResponse> searchLogListener(ActionListener<SearchResponse> listener, Logger log) {
return ActionListener.wrap(response -> {
ShardSearchFailure[] failures = response.getShardFailures();
if (CollectionUtils.isEmpty(failures) == false) {
listener.onFailure(new EqlIllegalArgumentException(failures[0].reason(), failures[0].getCause()));
return;
}
if (log.isTraceEnabled()) {
logSearchResponse(response, log);
}
listener.onResponse(response);
}, listener::onFailure);
}
public static ActionListener<MultiSearchResponse> multiSearchLogListener(ActionListener<MultiSearchResponse> listener, Logger log) {
return ActionListener.wrap(items -> {
for (MultiSearchResponse.Item item : items) {
Exception failure = item.getFailure();
SearchResponse response = item.getResponse();
if (failure == null) {
ShardSearchFailure[] failures = response.getShardFailures();
if (CollectionUtils.isEmpty(failures) == false) {
failure = new EqlIllegalArgumentException(failures[0].reason(), failures[0].getCause());
}
}
if (failure != null) {
listener.onFailure(failure);
return;
}
if (log.isTraceEnabled()) {
logSearchResponse(response, log);
}
}
listener.onResponse(items);
}, listener::onFailure);
}
private static void logSearchResponse(SearchResponse response, Logger logger) {
List<Aggregation> aggs = Collections.emptyList();
if (response.getAggregations() != null) {
aggs = response.getAggregations().asList();
@ -54,8 +96,10 @@ public final class RuntimeUtils {
aggsNames.append(aggs.get(i).getName() + (i + 1 == aggs.size() ? "" : ", "));
}
SearchHit[] hits = response.getHits().getHits();
int count = hits != null ? hits.length : 0;
logger.trace("Got search response [hits {}, {} aggregations: [{}], {} failed shards, {} skipped shards, "
+ "{} successful shards, {} total shards, took {}, timed out [{}]]", response.getHits().getTotalHits(), aggs.size(),
+ "{} successful shards, {} total shards, took {}, timed out [{}]]", count, aggs.size(),
aggsNames, response.getFailedShards(), response.getSkippedShards(), response.getSuccessfulShards(),
response.getTotalShards(), response.getTook(), response.isTimedOut());
}