From 60b1c67409c6f47b56eca0a11ca74b9b672e10bf Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Fri, 8 May 2020 13:54:44 +0300 Subject: [PATCH] [7.x][ML] Allow stopping DF analytics whose config is missing (#56360) (#56408) It is possible that the config document for a data frame analytics job is deleted from the config index. If that is the case the user is unable to stop a running job because we attempt to retrieve the config and that will throw. This commit changes that. When the request is forced, we do not expand the requested ids based on the existing configs but from the list of running tasks instead. Backport of #56360 --- .../xpack/core/ml/utils/MlStrings.java | 34 +++++++++++ ...TransportStopDataFrameAnalyticsAction.java | 57 ++++++++++++++----- .../persistence/DatafeedConfigProvider.java | 23 +------- .../ml/job/persistence/JobConfigProvider.java | 28 ++------- .../xpack/ml/utils/MlStringsTests.java | 30 ++++++++++ 5 files changed, 113 insertions(+), 59 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlStrings.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlStrings.java index dac30084086..b520a97200d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlStrings.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlStrings.java @@ -6,7 +6,12 @@ package org.elasticsearch.xpack.core.ml.utils; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.regex.Regex; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.Set; import java.util.regex.Pattern; /** @@ -94,4 +99,33 @@ public final class MlStrings { } return fieldPath.substring(0, lastIndexOfDot); } + + /** + * Given a collection of strings and some patterns, it finds the strings that match against at least one pattern. + * @param patterns the patterns may contain wildcards + * @param items the collections of strings + * @return the strings from {@code items} that match against at least one pattern + */ + public static Set findMatching(String[] patterns, Set items) { + if (items.isEmpty()) { + return Collections.emptySet(); + } + if (Strings.isAllOrWildcard(patterns)) { + return items; + } + + Set matchingItems = new LinkedHashSet<>(); + for (String pattern : patterns) { + if (items.contains(pattern)) { + matchingItems.add(pattern); + } else if (Regex.isSimpleMatchPattern(pattern)) { + for (String item : items) { + if (Regex.simpleMatch(pattern, item)) { + matchingItems.add(item); + } + } + } + } + return matchingItems; + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsAction.java index 123e038c5ca..41cfcef56d9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsAction.java @@ -29,6 +29,7 @@ import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.action.util.ExpandedIdsMatcher; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.StopDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; @@ -36,6 +37,7 @@ import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.core.ml.utils.MlStrings; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask; import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider; @@ -92,11 +94,11 @@ public class TransportStopDataFrameAnalyticsAction logger.debug("Received request to stop data frame analytics [{}]", request.getId()); ActionListener> expandedIdsListener = ActionListener.wrap( - expandedIds -> { - logger.debug("Resolved data frame analytics to stop: {}", expandedIds); + idsToStop -> { + logger.debug("Resolved data frame analytics to stop: {}", idsToStop); PersistentTasksCustomMetadata tasks = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); - AnalyticsByTaskState analyticsByTaskState = AnalyticsByTaskState.build(expandedIds, tasks); + AnalyticsByTaskState analyticsByTaskState = AnalyticsByTaskState.build(idsToStop, tasks); if (analyticsByTaskState.isEmpty()) { listener.onResponse(new StopDataFrameAnalyticsAction.Response(true)); @@ -112,26 +114,51 @@ public class TransportStopDataFrameAnalyticsAction listener::onFailure ); - expandIds(state, request, expandedIdsListener); + findIdsToStop(state, request, expandedIdsListener); } - private void expandIds(ClusterState clusterState, StopDataFrameAnalyticsAction.Request request, - ActionListener> expandedIdsListener) { - ActionListener> configsListener = ActionListener.wrap( - configs -> { - Set matchingIds = configs.stream().map(DataFrameAnalyticsConfig::getId).collect(Collectors.toSet()); - PersistentTasksCustomMetadata tasksMetadata = clusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); - Set startedIds = tasksMetadata == null ? Collections.emptySet() : tasksMetadata.tasks().stream() - .filter(t -> t.getId().startsWith(MlTasks.DATA_FRAME_ANALYTICS_TASK_ID_PREFIX)) - .map(t -> t.getId().replaceFirst(MlTasks.DATA_FRAME_ANALYTICS_TASK_ID_PREFIX, "")) - .collect(Collectors.toSet()); + private void findIdsToStop(ClusterState clusterState, StopDataFrameAnalyticsAction.Request request, + ActionListener> expandedIdsListener) { + Set startedIds = getAllStartedIds(clusterState); + + ActionListener> matchingIdsListener = ActionListener.wrap( + matchingIds -> { startedIds.retainAll(matchingIds); expandedIdsListener.onResponse(startedIds); }, expandedIdsListener::onFailure ); - configProvider.getMultiple(request.getId(), request.allowNoMatch(), configsListener); + if (request.isForce()) { + matchAllStartedIds(request, startedIds, matchingIdsListener); + } else { + configProvider.getMultiple(request.getId(), request.allowNoMatch(), ActionListener.wrap( + configs -> matchingIdsListener.onResponse( + configs.stream().map(DataFrameAnalyticsConfig::getId).collect(Collectors.toSet())), + matchingIdsListener::onFailure + )); + } + } + + private static Set getAllStartedIds(ClusterState clusterState) { + PersistentTasksCustomMetadata tasksMetadata = clusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); + return tasksMetadata == null ? Collections.emptySet() : tasksMetadata.tasks().stream() + .filter(t -> t.getId().startsWith(MlTasks.DATA_FRAME_ANALYTICS_TASK_ID_PREFIX)) + .map(t -> t.getId().replaceFirst(MlTasks.DATA_FRAME_ANALYTICS_TASK_ID_PREFIX, "")) + .collect(Collectors.toSet()); + } + + private void matchAllStartedIds(StopDataFrameAnalyticsAction.Request request, Set startedIds, + ActionListener> matchingIdsListener) { + String[] tokens = ExpandedIdsMatcher.tokenizeExpression(request.getId()); + ExpandedIdsMatcher expandedIdsMatcher = new ExpandedIdsMatcher(tokens, request.allowNoMatch()); + expandedIdsMatcher.filterMatchedIds(startedIds); + if (expandedIdsMatcher.hasUnmatchedIds()) { + matchingIdsListener.onFailure(ExceptionsHelper.missingDataFrameAnalytics(expandedIdsMatcher.unmatchedIdsString())); + return; + } + Set matchingStartedIds = MlStrings.findMatching(tokens, startedIds); + matchingIdsListener.onResponse(matchingStartedIds); } private void normalStop(Task task, StopDataFrameAnalyticsAction.Request request, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java index dd07688264f..f1c51fc2fd2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java @@ -53,6 +53,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.core.ml.utils.MlStrings; import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; import org.elasticsearch.xpack.core.action.util.ExpandedIdsMatcher; @@ -492,27 +493,7 @@ public class DatafeedConfigProvider { } static Collection matchingDatafeedIdsWithTasks(String[] datafeedIdPatterns, PersistentTasksCustomMetadata tasksMetadata) { - Set startedDatafeedIds = MlTasks.startedDatafeedIds(tasksMetadata); - if (startedDatafeedIds.isEmpty()) { - return Collections.emptyList() ; - } - if (Strings.isAllOrWildcard(datafeedIdPatterns)) { - return startedDatafeedIds; - } - - List matchingDatafeedIds = new ArrayList<>(); - for (String datafeedIdPattern : datafeedIdPatterns) { - if (startedDatafeedIds.contains(datafeedIdPattern)) { - matchingDatafeedIds.add(datafeedIdPattern); - } else if (Regex.isSimpleMatchPattern(datafeedIdPattern)) { - for (String startedDatafeedId : startedDatafeedIds) { - if (Regex.simpleMatch(datafeedIdPattern, startedDatafeedId)) { - matchingDatafeedIds.add(startedDatafeedId); - } - } - } - } - return matchingDatafeedIds; + return MlStrings.findMatching(datafeedIdPatterns, MlTasks.startedDatafeedIds(tasksMetadata)); } private QueryBuilder buildDatafeedJobIdsQuery(Collection jobIds) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java index 0835f4d9227..63bb516db02 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java @@ -66,12 +66,14 @@ import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.core.ml.utils.MlStrings; import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -529,7 +531,7 @@ public class JobConfigProvider { .request(); ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoJobs); - Set openMatchingJobs = matchingJobIdsWithTasks(tokens, tasksCustomMetadata); + Collection openMatchingJobs = matchingJobIdsWithTasks(tokens, tasksCustomMetadata); executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, ActionListener.wrap( @@ -748,28 +750,8 @@ public class JobConfigProvider { )); } - static Set matchingJobIdsWithTasks(String[] jobIdPatterns, PersistentTasksCustomMetadata tasksMetadata) { - Set openjobs = MlTasks.openJobIds(tasksMetadata); - if (openjobs.isEmpty()) { - return Collections.emptySet(); - } - if (Strings.isAllOrWildcard(jobIdPatterns)) { - return openjobs; - } - - Set matchingJobIds = new HashSet<>(); - for (String jobIdPattern : jobIdPatterns) { - if (openjobs.contains(jobIdPattern)) { - matchingJobIds.add(jobIdPattern); - } else if (Regex.isSimpleMatchPattern(jobIdPattern)) { - for (String openJobId : openjobs) { - if (Regex.simpleMatch(jobIdPattern, openJobId)) { - matchingJobIds.add(openJobId); - } - } - } - } - return matchingJobIds; + static Collection matchingJobIdsWithTasks(String[] jobIdPatterns, PersistentTasksCustomMetadata tasksMetadata) { + return MlStrings.findMatching(jobIdPatterns, MlTasks.openJobIds(tasksMetadata)); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/MlStringsTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/MlStringsTests.java index fed7e79655a..f4e638fc919 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/MlStringsTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/MlStringsTests.java @@ -9,11 +9,19 @@ package org.elasticsearch.xpack.ml.utils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.utils.MlStrings; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; public class MlStringsTests extends ESTestCase { + public void testDoubleQuoteIfNotAlphaNumeric() { assertEquals("foo2", MlStrings.doubleQuoteIfNotAlphaNumeric("foo2")); assertEquals("\"fo o\"", MlStrings.doubleQuoteIfNotAlphaNumeric("fo o")); @@ -46,4 +54,26 @@ public class MlStringsTests extends ESTestCase { assertThat(MlStrings.hasValidLengthForId(randomAlphaOfLength(64)), is(true)); assertThat(MlStrings.hasValidLengthForId(randomAlphaOfLength(65)), is(false)); } + + public void testFindMatching_GivenEmptyItems() { + assertThat(MlStrings.findMatching(new String[0], Collections.emptySet()), is(empty())); + } + + public void testFindMatching_GivenAllPattern() { + assertThat(MlStrings.findMatching(new String[] {"_all"}, new HashSet<>(Arrays.asList("a", "b"))), contains("a", "b")); + } + + public void testFindMatching_GivenWildcardPattern() { + assertThat(MlStrings.findMatching(new String[] {"*"}, new HashSet<>(Arrays.asList("a", "b"))), contains("a", "b")); + } + + public void testFindMatching_GivenMixedPatterns() { + assertThat(MlStrings.findMatching(new String[] {"concrete", "wild-*"}, new HashSet<>( + Arrays.asList("a", "concrete", "con*", "wild-1", "wild-2"))), contains("concrete", "wild-1", "wild-2")); + } + + public void testFindMatching_GivenItemMatchedByTwoPatterns() { + Set matching = MlStrings.findMatching(new String[]{"a*", "ab*"}, new HashSet<>(Collections.singletonList("abc"))); + assertThat(matching, contains("abc")); + } }