diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlStrings.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlStrings.java index dac30084086..b520a97200d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlStrings.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlStrings.java @@ -6,7 +6,12 @@ package org.elasticsearch.xpack.core.ml.utils; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.regex.Regex; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.Set; import java.util.regex.Pattern; /** @@ -94,4 +99,33 @@ public final class MlStrings { } return fieldPath.substring(0, lastIndexOfDot); } + + /** + * Given a collection of strings and some patterns, it finds the strings that match against at least one pattern. + * @param patterns the patterns may contain wildcards + * @param items the collections of strings + * @return the strings from {@code items} that match against at least one pattern + */ + public static Set findMatching(String[] patterns, Set items) { + if (items.isEmpty()) { + return Collections.emptySet(); + } + if (Strings.isAllOrWildcard(patterns)) { + return items; + } + + Set matchingItems = new LinkedHashSet<>(); + for (String pattern : patterns) { + if (items.contains(pattern)) { + matchingItems.add(pattern); + } else if (Regex.isSimpleMatchPattern(pattern)) { + for (String item : items) { + if (Regex.simpleMatch(pattern, item)) { + matchingItems.add(item); + } + } + } + } + return matchingItems; + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsAction.java index 123e038c5ca..41cfcef56d9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsAction.java @@ -29,6 +29,7 @@ import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.action.util.ExpandedIdsMatcher; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.StopDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; @@ -36,6 +37,7 @@ import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.core.ml.utils.MlStrings; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask; import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider; @@ -92,11 +94,11 @@ public class TransportStopDataFrameAnalyticsAction logger.debug("Received request to stop data frame analytics [{}]", request.getId()); ActionListener> expandedIdsListener = ActionListener.wrap( - expandedIds -> { - logger.debug("Resolved data frame analytics to stop: {}", expandedIds); + idsToStop -> { + logger.debug("Resolved data frame analytics to stop: {}", idsToStop); PersistentTasksCustomMetadata tasks = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); - AnalyticsByTaskState analyticsByTaskState = AnalyticsByTaskState.build(expandedIds, tasks); + AnalyticsByTaskState analyticsByTaskState = AnalyticsByTaskState.build(idsToStop, tasks); if (analyticsByTaskState.isEmpty()) { listener.onResponse(new StopDataFrameAnalyticsAction.Response(true)); @@ -112,26 +114,51 @@ public class TransportStopDataFrameAnalyticsAction listener::onFailure ); - expandIds(state, request, expandedIdsListener); + findIdsToStop(state, request, expandedIdsListener); } - private void expandIds(ClusterState clusterState, StopDataFrameAnalyticsAction.Request request, - ActionListener> expandedIdsListener) { - ActionListener> configsListener = ActionListener.wrap( - configs -> { - Set matchingIds = configs.stream().map(DataFrameAnalyticsConfig::getId).collect(Collectors.toSet()); - PersistentTasksCustomMetadata tasksMetadata = clusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); - Set startedIds = tasksMetadata == null ? Collections.emptySet() : tasksMetadata.tasks().stream() - .filter(t -> t.getId().startsWith(MlTasks.DATA_FRAME_ANALYTICS_TASK_ID_PREFIX)) - .map(t -> t.getId().replaceFirst(MlTasks.DATA_FRAME_ANALYTICS_TASK_ID_PREFIX, "")) - .collect(Collectors.toSet()); + private void findIdsToStop(ClusterState clusterState, StopDataFrameAnalyticsAction.Request request, + ActionListener> expandedIdsListener) { + Set startedIds = getAllStartedIds(clusterState); + + ActionListener> matchingIdsListener = ActionListener.wrap( + matchingIds -> { startedIds.retainAll(matchingIds); expandedIdsListener.onResponse(startedIds); }, expandedIdsListener::onFailure ); - configProvider.getMultiple(request.getId(), request.allowNoMatch(), configsListener); + if (request.isForce()) { + matchAllStartedIds(request, startedIds, matchingIdsListener); + } else { + configProvider.getMultiple(request.getId(), request.allowNoMatch(), ActionListener.wrap( + configs -> matchingIdsListener.onResponse( + configs.stream().map(DataFrameAnalyticsConfig::getId).collect(Collectors.toSet())), + matchingIdsListener::onFailure + )); + } + } + + private static Set getAllStartedIds(ClusterState clusterState) { + PersistentTasksCustomMetadata tasksMetadata = clusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); + return tasksMetadata == null ? Collections.emptySet() : tasksMetadata.tasks().stream() + .filter(t -> t.getId().startsWith(MlTasks.DATA_FRAME_ANALYTICS_TASK_ID_PREFIX)) + .map(t -> t.getId().replaceFirst(MlTasks.DATA_FRAME_ANALYTICS_TASK_ID_PREFIX, "")) + .collect(Collectors.toSet()); + } + + private void matchAllStartedIds(StopDataFrameAnalyticsAction.Request request, Set startedIds, + ActionListener> matchingIdsListener) { + String[] tokens = ExpandedIdsMatcher.tokenizeExpression(request.getId()); + ExpandedIdsMatcher expandedIdsMatcher = new ExpandedIdsMatcher(tokens, request.allowNoMatch()); + expandedIdsMatcher.filterMatchedIds(startedIds); + if (expandedIdsMatcher.hasUnmatchedIds()) { + matchingIdsListener.onFailure(ExceptionsHelper.missingDataFrameAnalytics(expandedIdsMatcher.unmatchedIdsString())); + return; + } + Set matchingStartedIds = MlStrings.findMatching(tokens, startedIds); + matchingIdsListener.onResponse(matchingStartedIds); } private void normalStop(Task task, StopDataFrameAnalyticsAction.Request request, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java index dd07688264f..f1c51fc2fd2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java @@ -53,6 +53,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.core.ml.utils.MlStrings; import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; import org.elasticsearch.xpack.core.action.util.ExpandedIdsMatcher; @@ -492,27 +493,7 @@ public class DatafeedConfigProvider { } static Collection matchingDatafeedIdsWithTasks(String[] datafeedIdPatterns, PersistentTasksCustomMetadata tasksMetadata) { - Set startedDatafeedIds = MlTasks.startedDatafeedIds(tasksMetadata); - if (startedDatafeedIds.isEmpty()) { - return Collections.emptyList() ; - } - if (Strings.isAllOrWildcard(datafeedIdPatterns)) { - return startedDatafeedIds; - } - - List matchingDatafeedIds = new ArrayList<>(); - for (String datafeedIdPattern : datafeedIdPatterns) { - if (startedDatafeedIds.contains(datafeedIdPattern)) { - matchingDatafeedIds.add(datafeedIdPattern); - } else if (Regex.isSimpleMatchPattern(datafeedIdPattern)) { - for (String startedDatafeedId : startedDatafeedIds) { - if (Regex.simpleMatch(datafeedIdPattern, startedDatafeedId)) { - matchingDatafeedIds.add(startedDatafeedId); - } - } - } - } - return matchingDatafeedIds; + return MlStrings.findMatching(datafeedIdPatterns, MlTasks.startedDatafeedIds(tasksMetadata)); } private QueryBuilder buildDatafeedJobIdsQuery(Collection jobIds) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java index 0835f4d9227..63bb516db02 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java @@ -66,12 +66,14 @@ import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.core.ml.utils.MlStrings; import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -529,7 +531,7 @@ public class JobConfigProvider { .request(); ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoJobs); - Set openMatchingJobs = matchingJobIdsWithTasks(tokens, tasksCustomMetadata); + Collection openMatchingJobs = matchingJobIdsWithTasks(tokens, tasksCustomMetadata); executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, ActionListener.wrap( @@ -748,28 +750,8 @@ public class JobConfigProvider { )); } - static Set matchingJobIdsWithTasks(String[] jobIdPatterns, PersistentTasksCustomMetadata tasksMetadata) { - Set openjobs = MlTasks.openJobIds(tasksMetadata); - if (openjobs.isEmpty()) { - return Collections.emptySet(); - } - if (Strings.isAllOrWildcard(jobIdPatterns)) { - return openjobs; - } - - Set matchingJobIds = new HashSet<>(); - for (String jobIdPattern : jobIdPatterns) { - if (openjobs.contains(jobIdPattern)) { - matchingJobIds.add(jobIdPattern); - } else if (Regex.isSimpleMatchPattern(jobIdPattern)) { - for (String openJobId : openjobs) { - if (Regex.simpleMatch(jobIdPattern, openJobId)) { - matchingJobIds.add(openJobId); - } - } - } - } - return matchingJobIds; + static Collection matchingJobIdsWithTasks(String[] jobIdPatterns, PersistentTasksCustomMetadata tasksMetadata) { + return MlStrings.findMatching(jobIdPatterns, MlTasks.openJobIds(tasksMetadata)); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/MlStringsTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/MlStringsTests.java index fed7e79655a..f4e638fc919 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/MlStringsTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/MlStringsTests.java @@ -9,11 +9,19 @@ package org.elasticsearch.xpack.ml.utils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.utils.MlStrings; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; public class MlStringsTests extends ESTestCase { + public void testDoubleQuoteIfNotAlphaNumeric() { assertEquals("foo2", MlStrings.doubleQuoteIfNotAlphaNumeric("foo2")); assertEquals("\"fo o\"", MlStrings.doubleQuoteIfNotAlphaNumeric("fo o")); @@ -46,4 +54,26 @@ public class MlStringsTests extends ESTestCase { assertThat(MlStrings.hasValidLengthForId(randomAlphaOfLength(64)), is(true)); assertThat(MlStrings.hasValidLengthForId(randomAlphaOfLength(65)), is(false)); } + + public void testFindMatching_GivenEmptyItems() { + assertThat(MlStrings.findMatching(new String[0], Collections.emptySet()), is(empty())); + } + + public void testFindMatching_GivenAllPattern() { + assertThat(MlStrings.findMatching(new String[] {"_all"}, new HashSet<>(Arrays.asList("a", "b"))), contains("a", "b")); + } + + public void testFindMatching_GivenWildcardPattern() { + assertThat(MlStrings.findMatching(new String[] {"*"}, new HashSet<>(Arrays.asList("a", "b"))), contains("a", "b")); + } + + public void testFindMatching_GivenMixedPatterns() { + assertThat(MlStrings.findMatching(new String[] {"concrete", "wild-*"}, new HashSet<>( + Arrays.asList("a", "concrete", "con*", "wild-1", "wild-2"))), contains("concrete", "wild-1", "wild-2")); + } + + public void testFindMatching_GivenItemMatchedByTwoPatterns() { + Set matching = MlStrings.findMatching(new String[]{"a*", "ab*"}, new HashSet<>(Collections.singletonList("abc"))); + assertThat(matching, contains("abc")); + } }