diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequest.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequest.java index 6dc5933828b..c6537b53d6f 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequest.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequest.java @@ -10,6 +10,7 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ObjectParser; @@ -19,9 +20,12 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.query.AbstractQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.searchafter.SearchAfterBuilder; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import java.io.IOException; import java.util.Arrays; +import java.util.Map; import java.util.Objects; import java.util.function.Supplier; @@ -287,4 +291,16 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re public IndicesOptions indicesOptions() { return indicesOptions; } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new EqlSearchTask(id, type, action, () -> { + StringBuilder sb = new StringBuilder(); + sb.append("indices["); + Strings.arrayToDelimitedString(indices, ",", sb); + sb.append("], "); + sb.append(query); + return sb.toString(); + }, parentTaskId, headers); + } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchTask.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchTask.java new file mode 100644 index 00000000000..f7850a50d34 --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchTask.java @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.eql.action; + +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.TaskId; + +import java.util.Map; +import java.util.function.Supplier; + +public class EqlSearchTask extends CancellableTask { + private final Supplier descriptionSupplier; + + public EqlSearchTask(long id, String type, String action, Supplier descriptionSupplier, TaskId parentTaskId, + Map headers) { + super(id, type, action, null, parentTaskId, headers); + this.descriptionSupplier = descriptionSupplier; + } + + @Override + public boolean shouldCancelChildrenOnCancellation() { + return false; + } + + @Override + public String getDescription() { + return descriptionSupplier.get(); + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/Querier.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/Querier.java index e085cee208d..b10275cfd4a 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/Querier.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/Querier.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.xpack.eql.querydsl.container.QueryContainer; import org.elasticsearch.xpack.eql.session.Configuration; import org.elasticsearch.xpack.eql.session.EqlSession; @@ -56,7 +57,9 @@ public class Querier { if (log.isTraceEnabled()) { log.trace("About to execute query {} on {}", StringUtils.toString(sourceBuilder), index); } - + if (cfg.isCancelled()) { + throw new TaskCancelledException("cancelled"); + } SearchRequest search = prepareRequest(client, sourceBuilder, cfg.requestTimeout(), false, Strings.commaDelimitedListToStringArray(index)); @@ -93,4 +96,4 @@ public class Querier { response.getHits().getTotalHits().value, aggs.size(), aggsNames, response.getFailedShards(), response.getSkippedShards(), response.getSuccessfulShards(), response.getTotalShards(), response.getTook(), response.isTimedOut()); } -} \ No newline at end of file +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java index 469f570d0dc..b12933b34a1 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java @@ -22,6 +22,7 @@ import org.elasticsearch.xpack.core.security.SecurityContext; import org.elasticsearch.xpack.eql.action.EqlSearchAction; import org.elasticsearch.xpack.eql.action.EqlSearchRequest; import org.elasticsearch.xpack.eql.action.EqlSearchResponse; +import org.elasticsearch.xpack.eql.action.EqlSearchTask; import org.elasticsearch.xpack.eql.execution.PlanExecutor; import org.elasticsearch.xpack.eql.parser.ParserParams; import org.elasticsearch.xpack.eql.session.Configuration; @@ -49,10 +50,10 @@ public class TransportEqlSearchAction extends HandledTransportAction listener) { - operation(planExecutor, request, username(securityContext), clusterName(clusterService), listener); + operation(planExecutor, (EqlSearchTask) task, request, username(securityContext), clusterName(clusterService), listener); } - public static void operation(PlanExecutor planExecutor, EqlSearchRequest request, String username, + public static void operation(PlanExecutor planExecutor, EqlSearchTask task, EqlSearchRequest request, String username, String clusterName, ActionListener listener) { // TODO: these should be sent by the client ZoneId zoneId = DateUtils.of("Z"); @@ -67,7 +68,7 @@ public class TransportEqlSearchAction extends HandledTransportAction listener.onResponse(createResponse(r)), listener::onFailure)); } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Configuration.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Configuration.java index 864d0269a66..bf6d65fe199 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Configuration.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Configuration.java @@ -9,22 +9,24 @@ package org.elasticsearch.xpack.eql.session; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.xpack.eql.action.EqlSearchTask; import java.time.ZoneId; public class Configuration extends org.elasticsearch.xpack.ql.session.Configuration { - + private final String[] indices; private final TimeValue requestTimeout; private final int size; private final String clientId; private final boolean includeFrozenIndices; + private final EqlSearchTask task; @Nullable private QueryBuilder filter; public Configuration(String[] indices, ZoneId zi, String username, String clusterName, QueryBuilder filter, TimeValue requestTimeout, - int size, boolean includeFrozen, String clientId) { + int size, boolean includeFrozen, String clientId, EqlSearchTask task) { super(zi, username, clusterName); @@ -34,6 +36,7 @@ public class Configuration extends org.elasticsearch.xpack.ql.session.Configurat this.size = size; this.clientId = clientId; this.includeFrozenIndices = includeFrozen; + this.task = task; } public String[] indices() { @@ -59,4 +62,8 @@ public class Configuration extends org.elasticsearch.xpack.ql.session.Configurat public boolean includeFrozen() { return includeFrozenIndices; } -} \ No newline at end of file + + public boolean isCancelled() { + return task.isCancelled(); + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EqlSession.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EqlSession.java index a89eaf00b52..3f0d13f4017 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EqlSession.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EqlSession.java @@ -9,6 +9,7 @@ package org.elasticsearch.xpack.eql.session; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.common.Strings; +import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.xpack.eql.analysis.Analyzer; import org.elasticsearch.xpack.eql.analysis.PreAnalyzer; import org.elasticsearch.xpack.eql.execution.PlanExecutor; @@ -35,7 +36,7 @@ public class EqlSession { public EqlSession(Client client, Configuration cfg, IndexResolver indexResolver, PreAnalyzer preAnalyzer, Analyzer analyzer, Optimizer optimizer, Planner planner, PlanExecutor planExecutor) { - + this.client = client; this.configuration = cfg; this.indexResolver = indexResolver; @@ -60,7 +61,7 @@ public class EqlSession { public void eql(String eql, ParserParams params, ActionListener listener) { eqlExecutable(eql, params, wrap(e -> e.execute(this, listener), listener::onFailure)); } - + public void eqlExecutable(String eql, ParserParams params, ActionListener listener) { try { physicalPlan(doParse(eql, params), listener); @@ -88,7 +89,9 @@ public class EqlSession { private void preAnalyze(LogicalPlan parsed, ActionListener listener) { String indexWildcard = Strings.arrayToCommaDelimitedString(configuration.indices()); - + if(configuration.isCancelled()){ + throw new TaskCancelledException("cancelled"); + } indexResolver.resolveAsMergedMapping(indexWildcard, null, configuration.includeFrozen(), wrap(r -> { listener.onResponse(preAnalyzer.preAnalyze(parsed, r)); }, listener::onFailure)); @@ -97,4 +100,4 @@ public class EqlSession { private LogicalPlan doParse(String eql, ParserParams params) { return new EqlParser().createStatement(eql, params); } -} \ No newline at end of file +} diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/EqlTestUtils.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/EqlTestUtils.java index 774c55a4ecd..310f5c4eff1 100644 --- a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/EqlTestUtils.java +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/EqlTestUtils.java @@ -7,11 +7,16 @@ package org.elasticsearch.xpack.eql; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.xpack.eql.action.EqlSearchAction; +import org.elasticsearch.xpack.eql.action.EqlSearchTask; import org.elasticsearch.xpack.eql.session.Configuration; +import java.util.Collections; + import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength; import static org.elasticsearch.test.ESTestCase.randomBoolean; import static org.elasticsearch.test.ESTestCase.randomIntBetween; +import static org.elasticsearch.test.ESTestCase.randomLong; import static org.elasticsearch.test.ESTestCase.randomNonNegativeLong; import static org.elasticsearch.test.ESTestCase.randomZone; @@ -21,7 +26,8 @@ public final class EqlTestUtils { } public static final Configuration TEST_CFG = new Configuration(new String[]{"none"}, org.elasticsearch.xpack.ql.util.DateUtils.UTC, - "nobody", "cluster", null, TimeValue.timeValueSeconds(30), -1, false, ""); + "nobody", "cluster", null, TimeValue.timeValueSeconds(30), -1, false, "", + new EqlSearchTask(-1, "", EqlSearchAction.NAME, () -> "", null, Collections.emptyMap())); public static Configuration randomConfiguration() { return new Configuration(new String[]{randomAlphaOfLength(16)}, @@ -32,6 +38,11 @@ public final class EqlTestUtils { new TimeValue(randomNonNegativeLong()), randomIntBetween(5, 100), randomBoolean(), - randomAlphaOfLength(16)); + randomAlphaOfLength(16), + randomTask()); + } + + public static EqlSearchTask randomTask() { + return new EqlSearchTask(randomLong(), "transport", EqlSearchAction.NAME, () -> "", null, Collections.emptyMap()); } } diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/analysis/CancellationTests.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/analysis/CancellationTests.java new file mode 100644 index 00000000000..74751a039ef --- /dev/null +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/analysis/CancellationTests.java @@ -0,0 +1,125 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.eql.analysis; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.fieldcaps.FieldCapabilities; +import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.tasks.TaskCancelledException; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.eql.action.EqlSearchRequest; +import org.elasticsearch.xpack.eql.action.EqlSearchResponse; +import org.elasticsearch.xpack.eql.action.EqlSearchTask; +import org.elasticsearch.xpack.eql.execution.PlanExecutor; +import org.elasticsearch.xpack.eql.plugin.TransportEqlSearchAction; +import org.elasticsearch.xpack.ql.index.IndexResolver; +import org.elasticsearch.xpack.ql.type.DefaultDataTypeRegistry; +import org.mockito.stubbing.Answer; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonMap; +import static org.hamcrest.Matchers.instanceOf; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +public class CancellationTests extends ESTestCase { + + public void testCancellationBeforeFieldCaps() throws InterruptedException { + Client client = mock(Client.class); + EqlSearchTask task = mock(EqlSearchTask.class); + when(task.isCancelled()).thenReturn(true); + + IndexResolver indexResolver = new IndexResolver(client, randomAlphaOfLength(10), DefaultDataTypeRegistry.INSTANCE); + PlanExecutor planExecutor = new PlanExecutor(client, indexResolver, new NamedWriteableRegistry(Collections.emptyList())); + CountDownLatch countDownLatch = new CountDownLatch(1); + TransportEqlSearchAction.operation(planExecutor, task, new EqlSearchRequest().query("foo where blah"), "", "", + new ActionListener() { + @Override + public void onResponse(EqlSearchResponse eqlSearchResponse) { + fail("Shouldn't be here"); + countDownLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + assertThat(e, instanceOf(TaskCancelledException.class)); + countDownLatch.countDown(); + } + }); + countDownLatch.await(); + verify(task, times(1)).isCancelled(); + verifyNoMoreInteractions(client, task); + } + + public void testCancellationBeforeSearch() throws InterruptedException { + Client client = mock(Client.class); + + AtomicBoolean cancelled = new AtomicBoolean(false); + EqlSearchTask task = mock(EqlSearchTask.class); + when(task.isCancelled()).then(invocationOnMock -> cancelled.get()); + + String[] indices = new String[]{"endgame"}; + + FieldCapabilities fooField = + new FieldCapabilities("foo", "integer", true, true, indices, null, null, emptyMap()); + FieldCapabilities categoryField = + new FieldCapabilities("event.category", "keyword", true, true, indices, null, null, emptyMap()); + FieldCapabilities timestampField = + new FieldCapabilities("@timestamp", "date", true, true, indices, null, null, emptyMap()); + Map> fields = new HashMap<>(); + fields.put(fooField.getName(), singletonMap(fooField.getName(), fooField)); + fields.put(categoryField.getName(), singletonMap(categoryField.getName(), categoryField)); + fields.put(timestampField.getName(), singletonMap(timestampField.getName(), timestampField)); + + FieldCapabilitiesResponse fieldCapabilitiesResponse = mock(FieldCapabilitiesResponse.class); + when(fieldCapabilitiesResponse.getIndices()).thenReturn(indices); + when(fieldCapabilitiesResponse.get()).thenReturn(fields); + doAnswer((Answer) invocation -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocation.getArguments()[1]; + assertFalse(cancelled.getAndSet(true)); + listener.onResponse(fieldCapabilitiesResponse); + return null; + }).when(client).fieldCaps(any(), any()); + + + IndexResolver indexResolver = new IndexResolver(client, randomAlphaOfLength(10), DefaultDataTypeRegistry.INSTANCE); + PlanExecutor planExecutor = new PlanExecutor(client, indexResolver, new NamedWriteableRegistry(Collections.emptyList())); + CountDownLatch countDownLatch = new CountDownLatch(1); + TransportEqlSearchAction.operation(planExecutor, task, new EqlSearchRequest().indices("endgame") + .query("process where foo==3"), "", "", new ActionListener() { + @Override + public void onResponse(EqlSearchResponse eqlSearchResponse) { + fail("Shouldn't be here"); + countDownLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + assertThat(e, instanceOf(TaskCancelledException.class)); + countDownLatch.countDown(); + } + }); + countDownLatch.await(); + verify(client).fieldCaps(any(), any()); + verify(task, times(2)).isCancelled(); + verifyNoMoreInteractions(client, task); + } + +}