[7.x][ML] Allow stopping DF analytics whose config is missing (#56360) (#56408)

It is possible that the config document for a data frame
analytics job is deleted from the config index. If that is
the case the user is unable to stop a running job because
we attempt to retrieve the config and that will throw.

This commit changes that. When the request is forced,
we do not expand the requested ids based on the existing
configs but from the list of running tasks instead.

Backport of #56360
This commit is contained in:
Dimitris Athanasiou 2020-05-08 13:54:44 +03:00 committed by GitHub
parent e2e4c3179c
commit 60b1c67409
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 113 additions and 59 deletions

View File

@ -6,7 +6,12 @@
package org.elasticsearch.xpack.core.ml.utils;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.regex.Regex;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.regex.Pattern;
/**
@ -94,4 +99,33 @@ public final class MlStrings {
}
return fieldPath.substring(0, lastIndexOfDot);
}
/**
* Given a collection of strings and some patterns, it finds the strings that match against at least one pattern.
* @param patterns the patterns may contain wildcards
* @param items the collections of strings
* @return the strings from {@code items} that match against at least one pattern
*/
public static Set<String> findMatching(String[] patterns, Set<String> items) {
if (items.isEmpty()) {
return Collections.emptySet();
}
if (Strings.isAllOrWildcard(patterns)) {
return items;
}
Set<String> matchingItems = new LinkedHashSet<>();
for (String pattern : patterns) {
if (items.contains(pattern)) {
matchingItems.add(pattern);
} else if (Regex.isSimpleMatchPattern(pattern)) {
for (String item : items) {
if (Regex.simpleMatch(pattern, item)) {
matchingItems.add(item);
}
}
}
}
return matchingItems;
}
}

View File

@ -29,6 +29,7 @@ import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.action.util.ExpandedIdsMatcher;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.StopDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
@ -36,6 +37,7 @@ import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.MlStrings;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
@ -92,11 +94,11 @@ public class TransportStopDataFrameAnalyticsAction
logger.debug("Received request to stop data frame analytics [{}]", request.getId());
ActionListener<Set<String>> expandedIdsListener = ActionListener.wrap(
expandedIds -> {
logger.debug("Resolved data frame analytics to stop: {}", expandedIds);
idsToStop -> {
logger.debug("Resolved data frame analytics to stop: {}", idsToStop);
PersistentTasksCustomMetadata tasks = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
AnalyticsByTaskState analyticsByTaskState = AnalyticsByTaskState.build(expandedIds, tasks);
AnalyticsByTaskState analyticsByTaskState = AnalyticsByTaskState.build(idsToStop, tasks);
if (analyticsByTaskState.isEmpty()) {
listener.onResponse(new StopDataFrameAnalyticsAction.Response(true));
@ -112,26 +114,51 @@ public class TransportStopDataFrameAnalyticsAction
listener::onFailure
);
expandIds(state, request, expandedIdsListener);
findIdsToStop(state, request, expandedIdsListener);
}
private void expandIds(ClusterState clusterState, StopDataFrameAnalyticsAction.Request request,
private void findIdsToStop(ClusterState clusterState, StopDataFrameAnalyticsAction.Request request,
ActionListener<Set<String>> expandedIdsListener) {
ActionListener<List<DataFrameAnalyticsConfig>> configsListener = ActionListener.wrap(
configs -> {
Set<String> matchingIds = configs.stream().map(DataFrameAnalyticsConfig::getId).collect(Collectors.toSet());
PersistentTasksCustomMetadata tasksMetadata = clusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
Set<String> startedIds = tasksMetadata == null ? Collections.emptySet() : tasksMetadata.tasks().stream()
.filter(t -> t.getId().startsWith(MlTasks.DATA_FRAME_ANALYTICS_TASK_ID_PREFIX))
.map(t -> t.getId().replaceFirst(MlTasks.DATA_FRAME_ANALYTICS_TASK_ID_PREFIX, ""))
.collect(Collectors.toSet());
Set<String> startedIds = getAllStartedIds(clusterState);
ActionListener<Set<String>> matchingIdsListener = ActionListener.wrap(
matchingIds -> {
startedIds.retainAll(matchingIds);
expandedIdsListener.onResponse(startedIds);
},
expandedIdsListener::onFailure
);
configProvider.getMultiple(request.getId(), request.allowNoMatch(), configsListener);
if (request.isForce()) {
matchAllStartedIds(request, startedIds, matchingIdsListener);
} else {
configProvider.getMultiple(request.getId(), request.allowNoMatch(), ActionListener.wrap(
configs -> matchingIdsListener.onResponse(
configs.stream().map(DataFrameAnalyticsConfig::getId).collect(Collectors.toSet())),
matchingIdsListener::onFailure
));
}
}
private static Set<String> getAllStartedIds(ClusterState clusterState) {
PersistentTasksCustomMetadata tasksMetadata = clusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
return tasksMetadata == null ? Collections.emptySet() : tasksMetadata.tasks().stream()
.filter(t -> t.getId().startsWith(MlTasks.DATA_FRAME_ANALYTICS_TASK_ID_PREFIX))
.map(t -> t.getId().replaceFirst(MlTasks.DATA_FRAME_ANALYTICS_TASK_ID_PREFIX, ""))
.collect(Collectors.toSet());
}
private void matchAllStartedIds(StopDataFrameAnalyticsAction.Request request, Set<String> startedIds,
ActionListener<Set<String>> matchingIdsListener) {
String[] tokens = ExpandedIdsMatcher.tokenizeExpression(request.getId());
ExpandedIdsMatcher expandedIdsMatcher = new ExpandedIdsMatcher(tokens, request.allowNoMatch());
expandedIdsMatcher.filterMatchedIds(startedIds);
if (expandedIdsMatcher.hasUnmatchedIds()) {
matchingIdsListener.onFailure(ExceptionsHelper.missingDataFrameAnalytics(expandedIdsMatcher.unmatchedIdsString()));
return;
}
Set<String> matchingStartedIds = MlStrings.findMatching(tokens, startedIds);
matchingIdsListener.onResponse(matchingStartedIds);
}
private void normalStop(Task task, StopDataFrameAnalyticsAction.Request request,

View File

@ -53,6 +53,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.MlStrings;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
import org.elasticsearch.xpack.core.action.util.ExpandedIdsMatcher;
@ -492,27 +493,7 @@ public class DatafeedConfigProvider {
}
static Collection<String> matchingDatafeedIdsWithTasks(String[] datafeedIdPatterns, PersistentTasksCustomMetadata tasksMetadata) {
Set<String> startedDatafeedIds = MlTasks.startedDatafeedIds(tasksMetadata);
if (startedDatafeedIds.isEmpty()) {
return Collections.emptyList() ;
}
if (Strings.isAllOrWildcard(datafeedIdPatterns)) {
return startedDatafeedIds;
}
List<String> matchingDatafeedIds = new ArrayList<>();
for (String datafeedIdPattern : datafeedIdPatterns) {
if (startedDatafeedIds.contains(datafeedIdPattern)) {
matchingDatafeedIds.add(datafeedIdPattern);
} else if (Regex.isSimpleMatchPattern(datafeedIdPattern)) {
for (String startedDatafeedId : startedDatafeedIds) {
if (Regex.simpleMatch(datafeedIdPattern, startedDatafeedId)) {
matchingDatafeedIds.add(startedDatafeedId);
}
}
}
}
return matchingDatafeedIds;
return MlStrings.findMatching(datafeedIdPatterns, MlTasks.startedDatafeedIds(tasksMetadata));
}
private QueryBuilder buildDatafeedJobIdsQuery(Collection<String> jobIds) {

View File

@ -66,12 +66,14 @@ import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.MlStrings;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -529,7 +531,7 @@ public class JobConfigProvider {
.request();
ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoJobs);
Set<String> openMatchingJobs = matchingJobIdsWithTasks(tokens, tasksCustomMetadata);
Collection<String> openMatchingJobs = matchingJobIdsWithTasks(tokens, tasksCustomMetadata);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest,
ActionListener.<SearchResponse>wrap(
@ -748,28 +750,8 @@ public class JobConfigProvider {
));
}
static Set<String> matchingJobIdsWithTasks(String[] jobIdPatterns, PersistentTasksCustomMetadata tasksMetadata) {
Set<String> openjobs = MlTasks.openJobIds(tasksMetadata);
if (openjobs.isEmpty()) {
return Collections.emptySet();
}
if (Strings.isAllOrWildcard(jobIdPatterns)) {
return openjobs;
}
Set<String> matchingJobIds = new HashSet<>();
for (String jobIdPattern : jobIdPatterns) {
if (openjobs.contains(jobIdPattern)) {
matchingJobIds.add(jobIdPattern);
} else if (Regex.isSimpleMatchPattern(jobIdPattern)) {
for (String openJobId : openjobs) {
if (Regex.simpleMatch(jobIdPattern, openJobId)) {
matchingJobIds.add(openJobId);
}
}
}
}
return matchingJobIds;
static Collection<String> matchingJobIdsWithTasks(String[] jobIdPatterns, PersistentTasksCustomMetadata tasksMetadata) {
return MlStrings.findMatching(jobIdPatterns, MlTasks.openJobIds(tasksMetadata));
}

View File

@ -9,11 +9,19 @@ package org.elasticsearch.xpack.ml.utils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.utils.MlStrings;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
public class MlStringsTests extends ESTestCase {
public void testDoubleQuoteIfNotAlphaNumeric() {
assertEquals("foo2", MlStrings.doubleQuoteIfNotAlphaNumeric("foo2"));
assertEquals("\"fo o\"", MlStrings.doubleQuoteIfNotAlphaNumeric("fo o"));
@ -46,4 +54,26 @@ public class MlStringsTests extends ESTestCase {
assertThat(MlStrings.hasValidLengthForId(randomAlphaOfLength(64)), is(true));
assertThat(MlStrings.hasValidLengthForId(randomAlphaOfLength(65)), is(false));
}
public void testFindMatching_GivenEmptyItems() {
assertThat(MlStrings.findMatching(new String[0], Collections.emptySet()), is(empty()));
}
public void testFindMatching_GivenAllPattern() {
assertThat(MlStrings.findMatching(new String[] {"_all"}, new HashSet<>(Arrays.asList("a", "b"))), contains("a", "b"));
}
public void testFindMatching_GivenWildcardPattern() {
assertThat(MlStrings.findMatching(new String[] {"*"}, new HashSet<>(Arrays.asList("a", "b"))), contains("a", "b"));
}
public void testFindMatching_GivenMixedPatterns() {
assertThat(MlStrings.findMatching(new String[] {"concrete", "wild-*"}, new HashSet<>(
Arrays.asList("a", "concrete", "con*", "wild-1", "wild-2"))), contains("concrete", "wild-1", "wild-2"));
}
public void testFindMatching_GivenItemMatchedByTwoPatterns() {
Set<String> matching = MlStrings.findMatching(new String[]{"a*", "ab*"}, new HashSet<>(Collections.singletonList("abc")));
assertThat(matching, contains("abc"));
}
}