EQL: Use Point In Time inside sequences (#62276)

Use the newly introduced PIT API to have a consistent view of the data
while doing sequence matching, which involves multiple calls, aka
repeatable reads and thus avoid race conditions or any in-flight updates
on the data.

(cherry picked from commit daa72fc3c71fd36afb55278021ff6bbc591ef148)
This commit is contained in:
Costin Leau 2020-09-15 15:22:41 +03:00 committed by Costin Leau
parent faf96c175e
commit 03d2395183
14 changed files with 212 additions and 67 deletions

View File

@ -5,6 +5,7 @@ dependencies {
api project(':test:framework')
api project(path: xpackModule('core'), configuration: 'default')
api project(path: xpackModule('core'), configuration: 'testArtifacts')
api project(path: xpackModule('ql'), configuration: 'testArtifacts')
// TOML parser for EqlActionIT tests
api 'io.ous:jtoml:2.0.0'

View File

@ -6,7 +6,6 @@
package org.elasticsearch.test.eql;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.Build;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
@ -14,13 +13,14 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.BeforeClass;
import org.junit.After;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.xpack.ql.TestUtils.assertNoSearchContexts;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
@ -31,6 +31,11 @@ public abstract class CommonEqlRestTestCase extends ESRestTestCase {
private static final String defaultValidationIndexName = "eql_search_validation_test";
private static final String validQuery = "process where user = 'SYSTEM'";
@After
public void checkSearchContent() throws Exception {
assertNoSearchContexts(client());
}
private static final String[][] testBadRequests = {
{null, "request body or source parameter is required"},
{"{}", "query is null or empty"},
@ -43,14 +48,9 @@ public abstract class CommonEqlRestTestCase extends ESRestTestCase {
{"{\"query\": \"" + validQuery + "\", \"filter\": {}}", "query malformed, empty clause found"}
};
@BeforeClass
public static void checkForSnapshot() {
assumeTrue("Only works on snapshot builds for now", Build.CURRENT.isSnapshot());
}
public void testBadRequests() throws Exception {
createIndex(defaultValidationIndexName, Settings.EMPTY);
final String contentType = "application/json";
for (String[] test : testBadRequests) {
final String endpoint = "/" + defaultValidationIndexName + "/_eql/search";
@ -64,7 +64,7 @@ public abstract class CommonEqlRestTestCase extends ESRestTestCase {
assertThat(EntityUtils.toString(response.getEntity()), containsString(test[1]));
assertThat(response.getStatusLine().getStatusCode(), is(400));
}
deleteIndex(defaultValidationIndexName);
}

View File

@ -9,8 +9,8 @@ package org.elasticsearch.xpack.eql.execution.assembler;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
import org.elasticsearch.xpack.eql.execution.search.BasicQueryClient;
import org.elasticsearch.xpack.eql.execution.search.Limit;
import org.elasticsearch.xpack.eql.execution.search.PITAwareQueryClient;
import org.elasticsearch.xpack.eql.execution.search.QueryRequest;
import org.elasticsearch.xpack.eql.execution.search.RuntimeUtils;
import org.elasticsearch.xpack.eql.execution.search.extractor.FieldHitExtractor;
@ -49,7 +49,7 @@ public class ExecutionManager {
TimeValue maxSpan,
Limit limit) {
FieldExtractorRegistry extractorRegistry = new FieldExtractorRegistry();
boolean descending = direction == OrderDirection.DESC;
// fields
@ -61,7 +61,7 @@ public class ExecutionManager {
// secondary criteriam
List<Criterion<BoxedQueryRequest>> criteria = new ArrayList<>(plans.size() - 1);
// build a criterion for each query
for (int i = 0; i < plans.size(); i++) {
List<Attribute> keys = listOfKeys.get(i);
@ -85,11 +85,11 @@ public class ExecutionManager {
}
}
}
int completionStage = criteria.size() - 1;
SequenceMatcher matcher = new SequenceMatcher(completionStage, descending, maxSpan, limit);
TumblingWindow w = new TumblingWindow(new BasicQueryClient(session),
TumblingWindow w = new TumblingWindow(new PITAwareQueryClient(session),
criteria.subList(0, completionStage),
criteria.get(completionStage),
matcher);
@ -116,4 +116,4 @@ public class ExecutionManager {
}
return extractors;
}
}
}

View File

@ -33,8 +33,8 @@ public class BasicQueryClient implements QueryClient {
private static final Logger log = RuntimeUtils.QUERY_LOG;
private final EqlConfiguration cfg;
private final Client client;
private final String[] indices;
final Client client;
final String[] indices;
public BasicQueryClient(EqlSession eqlSession) {
this.cfg = eqlSession.configuration();
@ -56,7 +56,11 @@ public class BasicQueryClient implements QueryClient {
}
SearchRequest search = prepareRequest(client, searchSource, false, indices);
client.search(search, new BasicListener(listener));
search(search, new BasicListener(listener));
}
protected void search(SearchRequest search, ActionListener<SearchResponse> listener) {
client.search(search, listener);
}
@Override
@ -77,11 +81,11 @@ public class BasicQueryClient implements QueryClient {
requestBuilder.add(item);
}
}
final int listSize = sz;
client.multiGet(requestBuilder.request(), wrap(r -> {
List<List<GetResponse>> hits = new ArrayList<>(r.getResponses().length / listSize);
List<GetResponse> sequence = new ArrayList<>(listSize);
int counter = 0;
@ -104,4 +108,4 @@ public class BasicQueryClient implements QueryClient {
}, listener::onFailure));
}
}
}

View File

@ -0,0 +1,89 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.execution.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction;
import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest;
import org.elasticsearch.xpack.core.search.action.ClosePointInTimeResponse;
import org.elasticsearch.xpack.core.search.action.OpenPointInTimeAction;
import org.elasticsearch.xpack.core.search.action.OpenPointInTimeRequest;
import org.elasticsearch.xpack.core.search.action.OpenPointInTimeResponse;
import org.elasticsearch.xpack.eql.session.EqlSession;
import org.elasticsearch.xpack.ql.index.IndexResolver;
import static org.elasticsearch.action.ActionListener.wrap;
import static org.elasticsearch.xpack.ql.util.ActionListeners.map;
/**
* Extension of basic query, adding Point-in-Time awareness.
* Opens a point-in-time, uses it for all queries and closes it when disposed,
* freeing consumer from doing any special management for it.
*/
public class PITAwareQueryClient extends BasicQueryClient {
private String pitId;
private final TimeValue keepAlive;
public PITAwareQueryClient(EqlSession eqlSession) {
super(eqlSession);
this.keepAlive = eqlSession.configuration().requestTimeout();
}
@Override
protected void search(SearchRequest search, ActionListener<SearchResponse> listener) {
// no pitId, ask for one
if (pitId == null) {
openPIT(wrap(r -> {
pitId = r;
searchWithPIT(search, listener);
}, listener::onFailure));
}
else {
searchWithPIT(search, listener);
}
}
private void searchWithPIT(SearchRequest search, ActionListener<SearchResponse> listener) {
// don't increase the keep alive
search.source().pointInTimeBuilder(new SearchSourceBuilder.PointInTimeBuilder(pitId, null));
// get the pid on each request
super.search(search, wrap(r -> {
pitId = r.pointInTimeId();
listener.onResponse(r);
},
// always close PIT in case of exceptions
e -> {
if (pitId != null) {
close(wrap(b -> {}, listener::onFailure));
}
listener.onFailure(e);
}));
}
private void openPIT(ActionListener<String> listener) {
OpenPointInTimeRequest request = new OpenPointInTimeRequest(
indices,
IndexResolver.FIELD_CAPS_INDICES_OPTIONS,
keepAlive,
null,
null
);
client.execute(OpenPointInTimeAction.INSTANCE, request, map(listener, OpenPointInTimeResponse::getSearchContextId));
}
@Override
public void close(ActionListener<Boolean> listener) {
client.execute(ClosePointInTimeAction.INSTANCE, new ClosePointInTimeRequest(pitId),
map(listener, ClosePointInTimeResponse::isSucceeded));
pitId = null;
}
}

View File

@ -20,4 +20,6 @@ public interface QueryClient {
void query(QueryRequest request, ActionListener<SearchResponse> listener);
void get(Iterable<List<HitReference>> refs, ActionListener<List<List<GetResponse>>> listener);
default void close(ActionListener<Boolean> closed) {}
}

View File

@ -23,6 +23,7 @@ import org.elasticsearch.xpack.eql.session.EmptyPayload;
import org.elasticsearch.xpack.eql.session.Payload;
import org.elasticsearch.xpack.eql.session.Payload.Type;
import org.elasticsearch.xpack.eql.util.ReversedIterator;
import org.elasticsearch.xpack.ql.util.ActionListeners;
import java.util.Iterator;
import java.util.List;
@ -296,14 +297,20 @@ public class TumblingWindow implements Executable {
if (completed.isEmpty()) {
listener.onResponse(new EmptyPayload(Type.SEQUENCE, timeTook()));
matcher.clear();
close(listener);
return;
}
client.get(hits(completed), wrap(hits -> {
listener.onResponse(new SequencePayload(completed, hits, false, timeTook()));
client.get(hits(completed), ActionListeners.map(listener, hits -> {
SequencePayload payload = new SequencePayload(completed, hits, false, timeTook());
close(listener);
return payload;
}));
}
private void close(ActionListener<Payload> listener) {
matcher.clear();
}, listener::onFailure));
client.close(ActionListener.delegateFailure(listener, (l, r) -> {}));
}
private TimeValue timeTook() {
@ -371,4 +378,4 @@ public class TumblingWindow implements Executable {
};
};
}
}
}

View File

@ -119,8 +119,9 @@ public class SequenceExec extends PhysicalPlan {
@Override
public void execute(EqlSession session, ActionListener<Payload> listener) {
new ExecutionManager(session).assemble(keys(), children(), timestamp(), tiebreaker(), direction, maxSpan, limit()).execute(
listener);
new ExecutionManager(session)
.assemble(keys(), children(), timestamp(), tiebreaker(), direction, maxSpan, limit())
.execute(listener);
}
@Override
@ -146,4 +147,4 @@ public class SequenceExec extends PhysicalPlan {
&& Objects.equals(children(), other.children())
&& Objects.equals(keys, other.keys);
}
}
}

View File

@ -0,0 +1,25 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ql.util;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.CheckedFunction;
import java.util.function.Consumer;
public class ActionListeners {
private ActionListeners() {}
/**
* Combination of {@link ActionListener#wrap(CheckedConsumer, Consumer)} and {@link ActionListener#map(ActionListener, CheckedFunction)}
*/
public static <T, Response> ActionListener<Response> map(ActionListener<T> delegate, CheckedFunction<Response, T, Exception> fn) {
return ActionListener.wrap(r -> delegate.onResponse(fn.apply(r)), delegate::onFailure);
}
}

View File

@ -6,6 +6,11 @@
package org.elasticsearch.xpack.ql;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.ql.expression.Expression;
import org.elasticsearch.xpack.ql.expression.Literal;
import org.elasticsearch.xpack.ql.expression.predicate.Range;
@ -20,11 +25,15 @@ import org.elasticsearch.xpack.ql.session.Configuration;
import org.elasticsearch.xpack.ql.tree.Source;
import org.elasticsearch.xpack.ql.type.DataTypes;
import java.io.IOException;
import java.io.InputStream;
import java.time.ZoneId;
import java.util.Map;
import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength;
import static org.elasticsearch.test.ESTestCase.randomZone;
import static org.elasticsearch.xpack.ql.tree.Source.EMPTY;
import static org.junit.Assert.assertEquals;
public final class TestUtils {
@ -91,4 +100,40 @@ public final class TestUtils {
public static Range rangeOf(Expression value, Expression lower, boolean includeLower, Expression upper, boolean includeUpper) {
return new Range(EMPTY, value, lower, includeLower, upper, includeUpper, randomZone());
}
//
// Common methods / assertions
//
public static void assertNoSearchContexts(RestClient client) throws IOException {
Map<String, Object> stats = searchStats(client);
@SuppressWarnings("unchecked")
Map<String, Object> indicesStats = (Map<String, Object>) stats.get("indices");
for (String index : indicesStats.keySet()) {
if (index.startsWith(".") == false) { // We are not interested in internal indices
assertEquals(index + " should have no search contexts", 0, getOpenContexts(stats, index));
}
}
}
public static int getNumberOfSearchContexts(RestClient client, String index) throws IOException {
return getOpenContexts(searchStats(client), index);
}
private static Map<String, Object> searchStats(RestClient client) throws IOException {
Response response = client.performRequest(new Request("GET", "/_stats/search"));
try (InputStream content = response.getEntity().getContent()) {
return XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false);
}
}
@SuppressWarnings("unchecked")
private static int getOpenContexts(Map<String, Object> stats, String index) {
stats = (Map<String, Object>) stats.get("indices");
stats = (Map<String, Object>) stats.get(index);
stats = (Map<String, Object>) stats.get("total");
stats = (Map<String, Object>) stats.get("search");
return (Integer) stats.get("open_contexts");
}
}

View File

@ -11,6 +11,8 @@ dependencies {
// JDBC testing dependencies
api project(path: xpackModule('sql:jdbc'))
// Common utilities from QL
api project(path: xpackModule('ql'), configuration: 'testArtifacts')
api "net.sourceforge.csvjdbc:csvjdbc:${csvjdbcVersion}"
@ -65,6 +67,7 @@ subprojects {
transitive = false
}
testImplementation project(":test:framework")
testRuntimeOnly project(path: xpackModule('ql'), configuration: 'testArtifacts')
// JDBC testing dependencies
testRuntimeOnly "net.sourceforge.csvjdbc:csvjdbc:${csvjdbcVersion}"

View File

@ -17,7 +17,7 @@ import org.junit.Before;
import java.io.IOException;
import static org.elasticsearch.xpack.sql.qa.rest.RestSqlTestCase.assertNoSearchContexts;
import static org.elasticsearch.xpack.ql.TestUtils.assertNoSearchContexts;
public abstract class CliIntegrationTestCase extends ESRestTestCase {
/**
@ -46,7 +46,7 @@ public abstract class CliIntegrationTestCase extends ESRestTestCase {
return;
}
cli.close();
assertNoSearchContexts();
assertNoSearchContexts(client());
}
/**

View File

@ -28,14 +28,14 @@ import java.util.Properties;
import java.util.Set;
import static org.elasticsearch.xpack.sql.qa.jdbc.JdbcTestUtils.JDBC_TIMEZONE;
import static org.elasticsearch.xpack.sql.qa.rest.RestSqlTestCase.assertNoSearchContexts;
import static org.elasticsearch.xpack.ql.TestUtils.assertNoSearchContexts;
public abstract class JdbcIntegrationTestCase extends ESRestTestCase {
@After
public void checkSearchContent() throws Exception {
// Some context might linger due to fire and forget nature of scroll cleanup
assertNoSearchContexts();
assertNoSearchContexts(client());
}
/**

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.sql.qa.rest;
import com.fasterxml.jackson.core.io.JsonStringEncoder;
import org.apache.http.HttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
@ -19,7 +18,6 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.NotEqualMessageBuilder;
import org.elasticsearch.xpack.sql.proto.Mode;
@ -48,6 +46,7 @@ import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.xpack.ql.TestUtils.getNumberOfSearchContexts;
import static org.hamcrest.Matchers.containsString;
/**
@ -1039,7 +1038,7 @@ public abstract class RestSqlTestCase extends BaseRestSqlTestCase implements Err
);
assertEquals(true, response.get("succeeded"));
assertEquals(0, getNumberOfSearchContexts("test"));
assertEquals(0, getNumberOfSearchContexts(client(), "test"));
}
private Tuple<String, String> runSqlAsText(String sql, String accept) throws IOException {
@ -1088,35 +1087,4 @@ public abstract class RestSqlTestCase extends BaseRestSqlTestCase implements Err
fail("Response does not match:\n" + message.toString());
}
}
public static int getNumberOfSearchContexts(String index) throws IOException {
return getOpenContexts(searchStats(), index);
}
public static void assertNoSearchContexts() throws IOException {
Map<String, Object> stats = searchStats();
@SuppressWarnings("unchecked")
Map<String, Object> indicesStats = (Map<String, Object>) stats.get("indices");
for (String index : indicesStats.keySet()) {
if (index.startsWith(".") == false) { // We are not interested in internal indices
assertEquals(index + " should have no search contexts", 0, getOpenContexts(stats, index));
}
}
}
@SuppressWarnings("unchecked")
private static int getOpenContexts(Map<String, Object> stats, String index) {
stats = (Map<String, Object>) stats.get("indices");
stats = (Map<String, Object>) stats.get(index);
stats = (Map<String, Object>) stats.get("total");
stats = (Map<String, Object>) stats.get("search");
return (Integer) stats.get("open_contexts");
}
private static Map<String, Object> searchStats() throws IOException {
Response response = client().performRequest(new Request("GET", "/_stats/search"));
try (InputStream content = response.getEntity().getContent()) {
return XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false);
}
}
}