[ML] allow close/stop for jobs/datafeeds with missing configs (#51888) (#51997)

If the configs are removed (by some horrific means), we should still allow tasks to be cleaned up easily.

Datafeeds and jobs with missing configs are now visible in their respective _stats calls and can be stopped/closed.
Benjamin Trent, 2020-02-06 12:10:18 -05:00, committed by GitHub
parent 03fb5cdaae
commit 846f87a26e
13 changed files with 589 additions and 139 deletions
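The practical upshot: once a config document has been deleted out from under a running task, a regular close/stop fails the config lookup, but a force request still tears the task down. A minimal sketch of the calls, mirroring the test helpers below (the IDs and the test-style client() handle are placeholders):

CloseJobAction.Request closeRequest = new CloseJobAction.Request("job-with-missing-config");
closeRequest.setForce(true); // skip the config lookup and remove the persistent task
client().execute(CloseJobAction.INSTANCE, closeRequest).actionGet();

StopDatafeedAction.Request stopRequest = new StopDatafeedAction.Request("datafeed-with-missing-config"); // hypothetical ID
stopRequest.setForce(true);
client().execute(StopDatafeedAction.INSTANCE, stopRequest).actionGet();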

View File

@@ -0,0 +1,241 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
import org.junit.After;
import java.util.Collections;
import java.util.List;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeedBuilder;
import static org.hamcrest.CoreMatchers.equalTo;
public class JobAndDatafeedResilienceIT extends MlNativeAutodetectIntegTestCase {
private String index = "empty_index";
@After
public void cleanUpTest() {
cleanUp();
}
public void testCloseOpenJobWithMissingConfig() throws Exception {
final String jobId = "job-with-missing-config";
Job.Builder job = createJob(jobId, TimeValue.timeValueMinutes(5), "count", null);
putJob(job);
openJob(job.getId());
client().prepareDelete(AnomalyDetectorsIndexFields.CONFIG_INDEX, "_doc", Job.documentId(jobId)).get();
client().admin().indices().prepareRefresh(AnomalyDetectorsIndexFields.CONFIG_INDEX).get();
ElasticsearchException ex = expectThrows(ElasticsearchException.class, () -> {
CloseJobAction.Request request = new CloseJobAction.Request(jobId);
request.setAllowNoJobs(false);
client().execute(CloseJobAction.INSTANCE, request).actionGet();
});
assertThat(ex.getMessage(), equalTo("No known job with id 'job-with-missing-config'"));
forceCloseJob(jobId);
assertBusy(() ->
assertThat(client().admin()
.cluster()
.prepareListTasks()
.setActions(MlTasks.JOB_TASK_NAME + "[c]")
.get()
.getTasks()
.size(), equalTo(0))
);
}
public void testStopStartedDatafeedWithMissingConfig() throws Exception {
client().admin().indices().prepareCreate(index)
.addMapping("type", "time", "type=date", "value", "type=long")
.get();
final String jobId = "job-with-missing-datafeed-with-config";
Job.Builder job = createJob(jobId, TimeValue.timeValueMinutes(5), "count", null);
DatafeedConfig.Builder datafeedConfigBuilder =
createDatafeedBuilder(job.getId() + "-datafeed", job.getId(), Collections.singletonList(index));
DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
registerJob(job);
putJob(job);
openJob(job.getId());
registerDatafeed(datafeedConfig);
putDatafeed(datafeedConfig);
startDatafeed(datafeedConfig.getId(), 0L, null);
client().prepareDelete(AnomalyDetectorsIndexFields.CONFIG_INDEX, "_doc", DatafeedConfig.documentId(datafeedConfig.getId())).get();
client().admin().indices().prepareRefresh(AnomalyDetectorsIndexFields.CONFIG_INDEX).get();
ElasticsearchException ex = expectThrows(ElasticsearchException.class, () -> {
StopDatafeedAction.Request request = new StopDatafeedAction.Request(datafeedConfig.getId());
request.setAllowNoDatafeeds(false);
client().execute(StopDatafeedAction.INSTANCE, request).actionGet();
});
assertThat(ex.getMessage(), equalTo("No datafeed with id [job-with-missing-datafeed-with-config-datafeed] exists"));
forceStopDatafeed(datafeedConfig.getId());
assertBusy(() ->
assertThat(client().admin()
.cluster()
.prepareListTasks()
.setActions(MlTasks.DATAFEED_TASK_NAME + "[c]")
.get()
.getTasks()
.size(), equalTo(0))
);
closeJob(jobId);
waitUntilJobIsClosed(jobId);
}
public void testGetJobStats() throws Exception {
final String jobId1 = "job-with-missing-config-stats";
final String jobId2 = "job-with-config-stats";
Job.Builder job1 = createJob(jobId1, TimeValue.timeValueMinutes(5), "count", null);
Job.Builder job2 = createJob(jobId2, TimeValue.timeValueMinutes(5), "count", null);
putJob(job1);
openJob(job1.getId());
registerJob(job2);
putJob(job2);
openJob(job2.getId());
client().prepareDelete(AnomalyDetectorsIndexFields.CONFIG_INDEX, "_doc", Job.documentId(jobId1)).get();
client().admin().indices().prepareRefresh(AnomalyDetectorsIndexFields.CONFIG_INDEX).get();
List<GetJobsStatsAction.Response.JobStats> jobStats = client().execute(GetJobsStatsAction.INSTANCE,
new GetJobsStatsAction.Request("*"))
.get()
.getResponse()
.results();
assertThat(jobStats.size(), equalTo(2));
assertThat(jobStats.get(0).getJobId(), equalTo(jobId2));
assertThat(jobStats.get(1).getJobId(), equalTo(jobId1));
forceCloseJob(jobId1);
closeJob(jobId2);
assertBusy(() ->
assertThat(client().admin()
.cluster()
.prepareListTasks()
.setActions(MlTasks.JOB_TASK_NAME + "[c]")
.get()
.getTasks()
.size(), equalTo(0))
);
}
public void testGetDatafeedStats() throws Exception {
client().admin().indices().prepareCreate(index)
.addMapping("type", "time", "type=date", "value", "type=long")
.get();
final String jobId1 = "job-with-datafeed-missing-config-stats";
final String jobId2 = "job-with-datafeed-config-stats";
Job.Builder job1 = createJob(jobId1, TimeValue.timeValueMinutes(5), "count", null);
Job.Builder job2 = createJob(jobId2, TimeValue.timeValueMinutes(5), "count", null);
registerJob(job1);
putJob(job1);
openJob(job1.getId());
registerJob(job2);
putJob(job2);
openJob(job2.getId());
DatafeedConfig.Builder datafeedConfigBuilder1 =
createDatafeedBuilder(job1.getId() + "-datafeed", job1.getId(), Collections.singletonList(index));
DatafeedConfig datafeedConfig1 = datafeedConfigBuilder1.build();
putDatafeed(datafeedConfig1);
startDatafeed(datafeedConfig1.getId(), 0L, null);
DatafeedConfig.Builder datafeedConfigBuilder2 =
createDatafeedBuilder(job2.getId() + "-datafeed", job2.getId(), Collections.singletonList(index));
DatafeedConfig datafeedConfig2 = datafeedConfigBuilder2.build();
putDatafeed(datafeedConfig2);
startDatafeed(datafeedConfig2.getId(), 0L, null);
client().prepareDelete(AnomalyDetectorsIndexFields.CONFIG_INDEX, "_doc", DatafeedConfig.documentId(datafeedConfig1.getId())).get();
client().admin().indices().prepareRefresh(AnomalyDetectorsIndexFields.CONFIG_INDEX).get();
List<GetDatafeedsStatsAction.Response.DatafeedStats> dfStats = client().execute(GetDatafeedsStatsAction.INSTANCE,
new GetDatafeedsStatsAction.Request("*"))
.get()
.getResponse()
.results();
assertThat(dfStats.size(), equalTo(2));
assertThat(dfStats.get(0).getDatafeedId(), equalTo(datafeedConfig2.getId()));
assertThat(dfStats.get(1).getDatafeedId(), equalTo(datafeedConfig1.getId()));
forceStopDatafeed(datafeedConfig1.getId());
stopDatafeed(datafeedConfig2.getId());
assertBusy(() ->
assertThat(client().admin()
.cluster()
.prepareListTasks()
.setActions(MlTasks.DATAFEED_TASK_NAME + "[c]")
.get()
.getTasks()
.size(), equalTo(0))
);
closeJob(jobId1);
closeJob(jobId2);
waitUntilJobIsClosed(jobId1);
waitUntilJobIsClosed(jobId2);
}
private CloseJobAction.Response forceCloseJob(String jobId) {
CloseJobAction.Request request = new CloseJobAction.Request(jobId);
request.setForce(true);
return client().execute(CloseJobAction.INSTANCE, request).actionGet();
}
private StopDatafeedAction.Response forceStopDatafeed(String datafeedId) {
StopDatafeedAction.Request request = new StopDatafeedAction.Request(datafeedId);
request.setForce(true);
return client().execute(StopDatafeedAction.INSTANCE, request).actionGet();
}
private Job.Builder createJob(String id, TimeValue bucketSpan, String function, String field) {
return createJob(id, bucketSpan, function, field, null);
}
private Job.Builder createJob(String id, TimeValue bucketSpan, String function, String field, String summaryCountField) {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
dataDescription.setTimeField("time");
dataDescription.setTimeFormat(DataDescription.EPOCH_MS);
Detector.Builder d = new Detector.Builder(function, field);
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build()));
analysisConfig.setBucketSpan(bucketSpan);
analysisConfig.setSummaryCountFieldName(summaryCountField);
Job.Builder builder = new Job.Builder();
builder.setId(id);
builder.setAnalysisConfig(analysisConfig);
builder.setDataDescription(dataDescription);
return builder;
}
}

View File

@@ -15,7 +15,6 @@ import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
@@ -55,7 +54,6 @@ public class TransportCloseJobAction extends TransportTasksAction<TransportOpenJ
private static final Logger logger = LogManager.getLogger(TransportCloseJobAction.class);
private final ThreadPool threadPool;
private final Client client;
private final ClusterService clusterService;
private final AnomalyDetectionAuditor auditor;
private final PersistentTasksService persistentTasksService;
@@ -64,14 +62,13 @@ public class TransportCloseJobAction extends TransportTasksAction<TransportOpenJ
@Inject
public TransportCloseJobAction(TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters,
ClusterService clusterService, Client client, AnomalyDetectionAuditor auditor,
ClusterService clusterService, AnomalyDetectionAuditor auditor,
PersistentTasksService persistentTasksService, JobConfigProvider jobConfigProvider,
DatafeedConfigProvider datafeedConfigProvider) {
// We fork in innerTaskOperation(...), so we can use ThreadPool.Names.SAME here:
super(CloseJobAction.NAME, clusterService, transportService, actionFilters,
CloseJobAction.Request::new, CloseJobAction.Response::new, CloseJobAction.Response::new, ThreadPool.Names.SAME);
this.threadPool = threadPool;
this.client = client;
this.clusterService = clusterService;
this.auditor = auditor;
this.persistentTasksService = persistentTasksService;
@@ -110,7 +107,12 @@ public class TransportCloseJobAction extends TransportTasksAction<TransportOpenJ
*/
PersistentTasksCustomMetaData tasksMetaData = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
jobConfigProvider.expandJobsIds(request.getJobId(), request.allowNoJobs(), true, ActionListener.wrap(
jobConfigProvider.expandJobsIds(request.getJobId(),
request.allowNoJobs(),
true,
tasksMetaData,
request.isForce(),
ActionListener.wrap(
expandedJobIds -> {
validate(expandedJobIds, request.isForce(), tasksMetaData, ActionListener.wrap(
response -> {
@@ -128,9 +130,7 @@ public class TransportCloseJobAction extends TransportTasksAction<TransportOpenJ
Set<String> executorNodes = new HashSet<>();
PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
for (String resolvedJobId : request.getOpenJobIds()) {
PersistentTasksCustomMetaData.PersistentTask<?> jobTask =
MlTasks.getJobTask(resolvedJobId, tasks);
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlTasks.getJobTask(resolvedJobId, tasks);
if (jobTask == null) {
// This should not happen, because openJobIds was
// derived from the same tasks metadata as jobTask

View File

@@ -209,7 +209,17 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
auditor.info(request.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DELETING, taskId));
markJobAsDeletingIfNotUsed(request.getJobId(), markAsDeletingListener);
},
e -> finalListener.onFailure(e));
e -> {
if (request.isForce()
&& MlTasks.getJobTask(request.getJobId(), state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE)) != null) {
logger.info(
"[{}] config is missing but task exists. Attempting to delete tasks and stop process",
request.getJobId());
forceDeleteJob(parentTaskClient, request, finalListener);
} else {
finalListener.onFailure(e);
}
});
// First check that the job exists, because we don't want to audit
// the beginning of its deletion if it didn't exist in the first place

View File

@@ -32,6 +32,9 @@ import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.function.Function;
import java.util.stream.Collectors;
public class TransportGetDatafeedsStatsAction extends TransportMasterNodeReadAction<GetDatafeedsStatsAction.Request,
@@ -67,31 +70,40 @@
protected void masterOperation(GetDatafeedsStatsAction.Request request, ClusterState state,
ActionListener<GetDatafeedsStatsAction.Response> listener) throws Exception {
logger.debug("Get stats for datafeed '{}'", request.getDatafeedId());
final PersistentTasksCustomMetaData tasksInProgress = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
ActionListener<SortedSet<String>> expandIdsListener = ActionListener.wrap(
expandedIds -> {
datafeedConfigProvider.expandDatafeedConfigs(
request.getDatafeedId(),
request.allowNoDatafeeds(),
// The allowNoDatafeeds request parameter was already taken into account when the IDs were expanded with the tasks earlier,
// so allow no datafeeds here in case the config is gone
true,
ActionListener.wrap(
datafeedBuilders -> {
List<String> jobIds =
datafeedBuilders.stream()
Map<String, DatafeedConfig> existingConfigs = datafeedBuilders.stream()
.map(DatafeedConfig.Builder::build)
.collect(Collectors.toMap(DatafeedConfig::getId, Function.identity()));
List<String> jobIds = existingConfigs.values()
.stream()
.map(DatafeedConfig::getJobId)
.collect(Collectors.toList());
jobResultsProvider.datafeedTimingStats(
jobIds,
timingStatsByJobId -> {
PersistentTasksCustomMetaData tasksInProgress = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
List<GetDatafeedsStatsAction.Response.DatafeedStats> results =
datafeedBuilders.stream()
.map(DatafeedConfig.Builder::build)
.map(
datafeed -> getDatafeedStats(
datafeed.getId(),
List<GetDatafeedsStatsAction.Response.DatafeedStats> results = expandedIds.stream()
.map(datafeedId -> {
DatafeedConfig config = existingConfigs.get(datafeedId);
String jobId = config == null ? null : config.getJobId();
DatafeedTimingStats timingStats = jobId == null ? null : timingStatsByJobId.get(jobId);
return buildDatafeedStats(
datafeedId,
state,
tasksInProgress,
datafeed.getJobId(),
timingStatsByJobId.get(datafeed.getJobId())))
jobId,
timingStats
);
})
.collect(Collectors.toList());
QueryPage<GetDatafeedsStatsAction.Response.DatafeedStats> statsPage =
new QueryPage<>(results, results.size(), DatafeedConfig.RESULTS_FIELD);
@@ -101,9 +113,19 @@ public class TransportGetDatafeedsStatsAction extends TransportMasterNodeReadAct
},
listener::onFailure)
);
},
listener::onFailure
);
// This might also include datafeed tasks that exist but no longer have a config
datafeedConfigProvider.expandDatafeedIds(request.getDatafeedId(),
request.allowNoDatafeeds(),
tasksInProgress,
true,
expandIdsListener);
}
private static GetDatafeedsStatsAction.Response.DatafeedStats getDatafeedStats(String datafeedId,
private static GetDatafeedsStatsAction.Response.DatafeedStats buildDatafeedStats(String datafeedId,
ClusterState state,
PersistentTasksCustomMetaData tasks,
String jobId,
@@ -116,7 +138,7 @@ public class TransportGetDatafeedsStatsAction extends TransportMasterNodeReadAct
node = state.nodes().get(task.getExecutorNode());
explanation = task.getAssignment().getExplanation();
}
if (timingStats == null) {
if (timingStats == null && jobId != null) {
timingStats = new DatafeedTimingStats(jobId);
}
return new GetDatafeedsStatsAction.Response.DatafeedStats(datafeedId, datafeedState, node, explanation, timingStats);

View File

@@ -75,7 +75,10 @@ public class TransportGetJobsStatsAction extends TransportTasksAction<TransportO
protected void doExecute(Task task, GetJobsStatsAction.Request request, ActionListener<GetJobsStatsAction.Response> finalListener) {
logger.debug("Get stats for job [{}]", request.getJobId());
jobConfigProvider.expandJobsIds(request.getJobId(), request.allowNoJobs(), true, ActionListener.wrap(
ClusterState state = clusterService.state();
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
// If a config has been deleted but its task is still around, we still want to include that task in the stats call
jobConfigProvider.expandJobsIds(request.getJobId(), request.allowNoJobs(), true, tasks, true, ActionListener.wrap(
expandedIds -> {
request.setExpandedJobsIds(new ArrayList<>(expandedIds));
ActionListener<GetJobsStatsAction.Response> jobStatsListener = ActionListener.wrap(

View File

@@ -11,7 +11,6 @@ import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
@@ -44,9 +43,6 @@ public class TransportIsolateDatafeedAction extends TransportTasksAction<Transpo
return;
}
String executorNode = datafeedTask.getExecutorNode();
DiscoveryNodes nodes = state.nodes();
request.setNodes(datafeedTask.getExecutorNode());
super.doExecute(task, request, listener);
}

View File

@@ -10,6 +10,7 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
@@ -27,6 +28,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.persistent.AllocatedPersistentTask;
@@ -447,7 +449,15 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
executeAsyncWithOrigin(client, ML_ORIGIN, FinalizeJobExecutionAction.INSTANCE, finalizeRequest,
ActionListener.wrap(
response -> task.markAsCompleted(),
e -> logger.error("error finalizing job [" + jobId + "]", e)
e -> {
logger.error("error finalizing job [" + jobId + "]", e);
Throwable unwrapped = ExceptionsHelper.unwrapCause(e);
if (unwrapped instanceof DocumentMissingException || unwrapped instanceof ResourceNotFoundException) {
task.markAsCompleted();
} else {
task.markAsFailed(e);
}
}
));
} else {
task.markAsCompleted();

View File

@@ -131,10 +131,13 @@ public class TransportStopDatafeedAction extends TransportTasksAction<TransportS
new ActionListenerResponseHandler<>(listener, StopDatafeedAction.Response::new));
}
} else {
datafeedConfigProvider.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds(), ActionListener.wrap(
expandedIds -> {
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
datafeedConfigProvider.expandDatafeedIds(request.getDatafeedId(),
request.allowNoDatafeeds(),
tasks,
request.isForce(),
ActionListener.wrap(
expandedIds -> {
List<String> startedDatafeeds = new ArrayList<>();
List<String> stoppingDatafeeds = new ArrayList<>();
List<String> notStoppedDatafeeds = new ArrayList<>();
@@ -257,7 +260,9 @@ public class TransportStopDatafeedAction extends TransportTasksAction<TransportS
protected void taskOperation(StopDatafeedAction.Request request, TransportStartDatafeedAction.DatafeedTask datafeedTask,
ActionListener<StopDatafeedAction.Response> listener) {
DatafeedState taskState = DatafeedState.STOPPING;
datafeedTask.updatePersistentTaskState(taskState, ActionListener.wrap(task -> {
datafeedTask.updatePersistentTaskState(taskState,
ActionListener.wrap(
task -> {
// we need to fork because we are now on a network threadpool
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() {
@Override

View File

@@ -42,9 +42,12 @@ import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.index.query.WildcardQueryBuilder;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.action.util.ExpandedIdsMatcher;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate;
import org.elasticsearch.xpack.core.ml.job.config.Job;
@@ -350,9 +353,15 @@ public class DatafeedConfigProvider {
* @param allowNoDatafeeds if {@code false}, an error is thrown when no name matches the {@code expression}.
* This only applies to wildcard expressions; if {@code expression} is not a
* wildcard then setting this to true will not suppress the exception
* @param tasks The current tasks meta-data. For expanding IDs when datafeeds might have missing configurations
* @param allowMissingConfigs If a datafeed has a task, but is missing a config, allow the ID to be expanded via the existing task
* @param listener The expanded datafeed IDs listener
*/
public void expandDatafeedIds(String expression, boolean allowNoDatafeeds, ActionListener<SortedSet<String>> listener) {
public void expandDatafeedIds(String expression,
boolean allowNoDatafeeds,
PersistentTasksCustomMetaData tasks,
boolean allowMissingConfigs,
ActionListener<SortedSet<String>> listener) {
String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildDatafeedIdQuery(tokens));
sourceBuilder.sort(DatafeedConfig.ID.getPreferredName());
@@ -366,6 +375,7 @@
.request();
ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoDatafeeds);
Collection<String> matchingStartedDatafeedIds = matchingDatafeedIdsWithTasks(tokens, tasks);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest,
ActionListener.<SearchResponse>wrap(
@@ -375,6 +385,9 @@
for (SearchHit hit : hits) {
datafeedIds.add(hit.field(DatafeedConfig.ID.getPreferredName()).getValue());
}
if (allowMissingConfigs) {
datafeedIds.addAll(matchingStartedDatafeedIds);
}
requiredMatches.filterMatchedIds(datafeedIds);
if (requiredMatches.hasUnmatchedIds()) {
@@ -391,10 +404,10 @@
}
/**
* The same logic as {@link #expandDatafeedIds(String, boolean, ActionListener)} but
* The same logic as {@link #expandDatafeedIds(String, boolean, PersistentTasksCustomMetaData, boolean, ActionListener)} but
* the full datafeed configuration is returned.
*
* See {@link #expandDatafeedIds(String, boolean, ActionListener)}
* See {@link #expandDatafeedIds(String, boolean, PersistentTasksCustomMetaData, boolean, ActionListener)}
*
* @param expression the expression to resolve
* @param allowNoDatafeeds if {@code false}, an error is thrown when no name matches the {@code expression}.
@@ -478,6 +491,30 @@
return boolQueryBuilder;
}
static Collection<String> matchingDatafeedIdsWithTasks(String[] datafeedIdPatterns, PersistentTasksCustomMetaData tasksMetaData) {
Set<String> startedDatafeedIds = MlTasks.startedDatafeedIds(tasksMetaData);
if (startedDatafeedIds.isEmpty()) {
return Collections.emptyList();
}
if (Strings.isAllOrWildcard(datafeedIdPatterns)) {
return startedDatafeedIds;
}
List<String> matchingDatafeedIds = new ArrayList<>();
for (String datafeedIdPattern : datafeedIdPatterns) {
if (startedDatafeedIds.contains(datafeedIdPattern)) {
matchingDatafeedIds.add(datafeedIdPattern);
} else if (Regex.isSimpleMatchPattern(datafeedIdPattern)) {
for (String startedDatafeedId : startedDatafeedIds) {
if (Regex.simpleMatch(datafeedIdPattern, startedDatafeedId)) {
matchingDatafeedIds.add(startedDatafeedId);
}
}
}
}
return matchingDatafeedIds;
}
private QueryBuilder buildDatafeedJobIdsQuery(Collection<String> jobIds) {
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
boolQueryBuilder.filter(new TermQueryBuilder(DatafeedConfig.CONFIG_TYPE.getPreferredName(), DatafeedConfig.TYPE));
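As a rough usage sketch of the new expandDatafeedIds signature (the tasks metadata is read from cluster state, as in the TransportStopDatafeedAction hunk above; the expression, logger, and listener bodies are placeholders):

PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
datafeedConfigProvider.expandDatafeedIds(
    "feed-*",  // expression, may contain wildcards
    true,      // allowNoDatafeeds: an unmatched wildcard is not an error
    tasks,     // current persistent tasks, so task-only datafeeds can still match
    true,      // allowMissingConfigs: include IDs that exist only as tasks
    ActionListener.wrap(
        expandedIds -> logger.info("resolved datafeeds {}", expandedIds), // SortedSet<String>
        e -> logger.error("failed to expand datafeed IDs", e)));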

View File

@@ -29,6 +29,7 @@ import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateAction;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.regex.Regex;
@@ -50,11 +51,13 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.index.query.WildcardQueryBuilder;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.core.action.util.ExpandedIdsMatcher;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
@@ -502,9 +505,17 @@ public class JobConfigProvider {
* This only applies to wildcard expressions; if {@code expression} is not a
* wildcard then setting this to true will not suppress the exception
* @param excludeDeleting If true exclude jobs marked as deleting
* @param tasksCustomMetaData The current persistent task metadata.
* For resolving job IDs that have tasks but, for some reason, don't have configs
* @param allowMissingConfigs If a job has a task, but is missing a config, allow the ID to be expanded via the existing task
* @param listener The expanded job Ids listener
*/
public void expandJobsIds(String expression, boolean allowNoJobs, boolean excludeDeleting, ActionListener<SortedSet<String>> listener) {
public void expandJobsIds(String expression,
boolean allowNoJobs,
boolean excludeDeleting,
@Nullable PersistentTasksCustomMetaData tasksCustomMetaData,
boolean allowMissingConfigs,
ActionListener<SortedSet<String>> listener) {
String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens, excludeDeleting));
sourceBuilder.sort(Job.ID.getPreferredName());
@@ -519,6 +530,7 @@
.request();
ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoJobs);
Set<String> openMatchingJobs = matchingJobIdsWithTasks(tokens, tasksCustomMetaData);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest,
ActionListener.<SearchResponse>wrap(
@@ -533,7 +545,9 @@
groupsIds.addAll(groups.stream().map(Object::toString).collect(Collectors.toList()));
}
}
if (allowMissingConfigs) {
jobIds.addAll(openMatchingJobs);
}
groupsIds.addAll(jobIds);
requiredMatches.filterMatchedIds(groupsIds);
if (requiredMatches.hasUnmatchedIds()) {
@@ -550,10 +564,10 @@
}
/**
* The same logic as {@link #expandJobsIds(String, boolean, boolean, ActionListener)} but
* The same logic as {@link #expandJobsIds(String, boolean, boolean, PersistentTasksCustomMetaData, boolean, ActionListener)} but
* the full anomaly detector job configuration is returned.
*
* See {@link #expandJobsIds(String, boolean, boolean, ActionListener)}
* See {@link #expandJobsIds(String, boolean, boolean, PersistentTasksCustomMetaData, boolean, ActionListener)}
*
* @param expression the expression to resolve
* @param allowNoJobs if {@code false}, an error is thrown when no name matches the {@code expression}.
@@ -611,7 +625,7 @@
/**
* Expands the list of job group Ids to the set of jobs which are members of the groups.
* Unlike {@link #expandJobsIds(String, boolean, boolean, ActionListener)} it is not an error
* Unlike {@link #expandJobsIds(String, boolean, boolean, PersistentTasksCustomMetaData, boolean, ActionListener)} it is not an error
* if a group Id does not exist.
* Wildcard expansion of group Ids is not supported.
*
@@ -735,6 +749,31 @@
));
}
static Set<String> matchingJobIdsWithTasks(String[] jobIdPatterns, PersistentTasksCustomMetaData tasksMetaData) {
Set<String> openJobs = MlTasks.openJobIds(tasksMetaData);
if (openJobs.isEmpty()) {
return Collections.emptySet();
}
if (Strings.isAllOrWildcard(jobIdPatterns)) {
return openJobs;
}
Set<String> matchingJobIds = new HashSet<>();
for (String jobIdPattern : jobIdPatterns) {
if (openJobs.contains(jobIdPattern)) {
matchingJobIds.add(jobIdPattern);
} else if (Regex.isSimpleMatchPattern(jobIdPattern)) {
for (String openJobId : openJobs) {
if (Regex.simpleMatch(jobIdPattern, openJobId)) {
matchingJobIds.add(openJobId);
}
}
}
}
return matchingJobIds;
}
private void parseJobLenientlyFromSource(BytesReference source, ActionListener<Job.Builder> jobListener) {
try (InputStream stream = source.streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
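The job-side call takes the same shape, with the extra excludeDeleting flag; its tasks argument is @Nullable, in which case only stored configs are matched (again a sketch with placeholder identifiers):

jobConfigProvider.expandJobsIds(
    "job-*",  // expression
    true,     // allowNoJobs
    true,     // excludeDeleting: skip jobs already marked as deleting
    tasks,    // @Nullable PersistentTasksCustomMetaData from cluster state
    true,     // allowMissingConfigs
    ActionListener.wrap(
        expandedIds -> logger.info("resolved jobs {}", expandedIds),
        e -> logger.error("failed to expand job IDs", e)));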

View File

@@ -8,7 +8,6 @@ package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
@@ -276,7 +275,7 @@ public class TransportCloseJobActionTests extends ESTestCase {
private TransportCloseJobAction createAction() {
return new TransportCloseJobAction(mock(TransportService.class), mock(ThreadPool.class), mock(ActionFilters.class),
clusterService, mock(Client.class), mock(AnomalyDetectionAuditor.class), mock(PersistentTasksService.class),
clusterService, mock(AnomalyDetectionAuditor.class), mock(PersistentTasksService.class),
jobConfigProvider, datafeedConfigProvider);
}
@@ -293,11 +292,11 @@ public class TransportCloseJobActionTests extends ESTestCase {
@SuppressWarnings("unchecked")
private void mockJobConfigProviderExpandIds(Set<String> expandedIds) {
doAnswer(invocation -> {
ActionListener<Set<String>> listener = (ActionListener<Set<String>>) invocation.getArguments()[3];
ActionListener<Set<String>> listener = (ActionListener<Set<String>>) invocation.getArguments()[5];
listener.onResponse(expandedIds);
return null;
}).when(jobConfigProvider).expandJobsIds(any(), anyBoolean(), anyBoolean(), any(ActionListener.class));
}).when(jobConfigProvider).expandJobsIds(any(), anyBoolean(), anyBoolean(), any(), anyBoolean(), any(ActionListener.class));
}
}

View File

@@ -11,8 +11,11 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
@@ -33,6 +36,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
@@ -40,6 +44,8 @@ import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.Is.is;
public class DatafeedConfigProviderIT extends MlSingleNodeTestCase {
private DatafeedConfigProvider datafeedConfigProvider;
@@ -206,7 +212,7 @@ public class DatafeedConfigProviderIT extends MlSingleNodeTestCase {
AtomicReference<SortedSet<String>> datafeedIdsHolder = new AtomicReference<>();
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("_all", false, actionListener),
blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("_all", false, null, false, actionListener),
datafeedIdsHolder, exceptionHolder);
assertNull(datafeedIdsHolder.get());
@@ -215,7 +221,7 @@ public class DatafeedConfigProviderIT extends MlSingleNodeTestCase {
assertThat(exceptionHolder.get().getMessage(), containsString("No datafeed with id [*] exists"));
exceptionHolder.set(null);
blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("_all", true, actionListener),
blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("_all", true, null, false,actionListener),
datafeedIdsHolder, exceptionHolder);
assertNotNull(datafeedIdsHolder.get());
assertNull(exceptionHolder.get());
@@ -245,24 +251,28 @@ public class DatafeedConfigProviderIT extends MlSingleNodeTestCase {
client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.configIndexName()).get();
// Test job IDs only
// Test datafeed IDs only
SortedSet<String> expandedIds =
blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("foo*", true, actionListener));
blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("foo*", true, null, false, actionListener));
assertEquals(new TreeSet<>(Arrays.asList("foo-1", "foo-2")), expandedIds);
expandedIds = blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("*-1", true, actionListener));
expandedIds = blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("*-1", true,null, false, actionListener));
assertEquals(new TreeSet<>(Arrays.asList("bar-1", "foo-1")), expandedIds);
expandedIds = blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("bar*", true, actionListener));
expandedIds = blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("bar*", true, null, false, actionListener));
assertEquals(new TreeSet<>(Arrays.asList("bar-1", "bar-2")), expandedIds);
expandedIds = blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("b*r-1", true, actionListener));
expandedIds = blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("b*r-1", true, null, false, actionListener));
assertEquals(new TreeSet<>(Collections.singletonList("bar-1")), expandedIds);
expandedIds = blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("bar-1,foo*", true, actionListener));
expandedIds = blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("bar-1,foo*",
true,
null,
false,
actionListener));
assertEquals(new TreeSet<>(Arrays.asList("bar-1", "foo-1", "foo-2")), expandedIds);
// Test full job config
// Test full datafeed config
List<DatafeedConfig.Builder> expandedDatafeedBuilders =
blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedConfigs("foo*", true, actionListener));
List<DatafeedConfig> expandedDatafeeds =
@@ -290,6 +300,31 @@ public class DatafeedConfigProviderIT extends MlSingleNodeTestCase {
assertThat(expandedDatafeeds, containsInAnyOrder(bar1, foo1, foo2));
}
public void testExpandDatafeedsWithTaskData() throws Exception {
putDatafeedConfig(createDatafeedConfig("foo-2", "j2"), Collections.emptyMap());
client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.configIndexName()).get();
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
tasksBuilder.addTask(MlTasks.datafeedTaskId("foo-1"),
MlTasks.DATAFEED_TASK_NAME, new StartDatafeedAction.DatafeedParams("foo-1", 0L),
new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment"));
PersistentTasksCustomMetaData tasks = tasksBuilder.build();
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
AtomicReference<SortedSet<String>> datafeedIdsHolder = new AtomicReference<>();
// Test datafeed IDs only
SortedSet<String> expandedIds =
blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("foo*", false, tasks, true, actionListener));
assertEquals(new TreeSet<>(Arrays.asList("foo-1", "foo-2")), expandedIds);
blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("foo-1*,foo-2*", false, tasks, false, actionListener),
datafeedIdsHolder,
exceptionHolder);
assertThat(exceptionHolder.get(), is(not(nullValue())));
assertEquals(ResourceNotFoundException.class, exceptionHolder.get().getClass());
assertThat(exceptionHolder.get().getMessage(), containsString("No datafeed with id [foo-1*] exists"));
}
public void testFindDatafeedsForJobIds() throws Exception {
putDatafeedConfig(createDatafeedConfig("foo-1", "j1"), Collections.emptyMap());
putDatafeedConfig(createDatafeedConfig("foo-2", "j2"), Collections.emptyMap());

View File

@@ -12,10 +12,13 @@ import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
@@ -42,6 +45,7 @@ import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
@@ -49,6 +53,7 @@ import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
public class JobConfigProviderIT extends MlSingleNodeTestCase {
@@ -256,7 +261,7 @@ public class JobConfigProviderIT extends MlSingleNodeTestCase {
AtomicReference<SortedSet<String>> jobIdsHolder = new AtomicReference<>();
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
blockingCall(actionListener -> jobConfigProvider.expandJobsIds("_all", false, true, actionListener),
blockingCall(actionListener -> jobConfigProvider.expandJobsIds("_all", false, true, null, false, actionListener),
jobIdsHolder, exceptionHolder);
assertNull(jobIdsHolder.get());
@@ -265,7 +270,7 @@ public class JobConfigProviderIT extends MlSingleNodeTestCase {
assertThat(exceptionHolder.get().getMessage(), containsString("No known job with id"));
exceptionHolder.set(null);
blockingCall(actionListener -> jobConfigProvider.expandJobsIds("_all", true, false, actionListener),
blockingCall(actionListener -> jobConfigProvider.expandJobsIds("_all", true, false, null, false, actionListener),
jobIdsHolder, exceptionHolder);
assertNotNull(jobIdsHolder.get());
assertNull(exceptionHolder.get());
@@ -296,21 +301,31 @@ public class JobConfigProviderIT extends MlSingleNodeTestCase {
// Job Ids
SortedSet<String> expandedIds = blockingCall(actionListener ->
jobConfigProvider.expandJobsIds("_all", true, false, actionListener));
jobConfigProvider.expandJobsIds("_all", true, false, null, false, actionListener));
assertEquals(new TreeSet<>(Arrays.asList("tom", "dick", "harry", "harry-jnr")), expandedIds);
expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("*", true, true, actionListener));
expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("*", true, true, null, false, actionListener));
assertEquals(new TreeSet<>(Arrays.asList("tom", "dick", "harry", "harry-jnr")), expandedIds);
expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("tom,harry", true, false, actionListener));
expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("tom,harry",
true,
false,
null,
false,
actionListener));
assertEquals(new TreeSet<>(Arrays.asList("tom", "harry")), expandedIds);
expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("harry-group,tom", true, false, actionListener));
expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("harry-group,tom",
true,
false,
null,
false,
actionListener));
assertEquals(new TreeSet<>(Arrays.asList("harry", "harry-jnr", "tom")), expandedIds);
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
AtomicReference<SortedSet<String>> jobIdsHolder = new AtomicReference<>();
blockingCall(actionListener -> jobConfigProvider.expandJobsIds("tom,missing1,missing2", true, false, actionListener),
blockingCall(actionListener -> jobConfigProvider.expandJobsIds("tom,missing1,missing2", true, false, null, false, actionListener),
jobIdsHolder, exceptionHolder);
assertNull(jobIdsHolder.get());
assertNotNull(exceptionHolder.get());
@@ -357,16 +372,21 @@ public class JobConfigProviderIT extends MlSingleNodeTestCase {
client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.configIndexName()).get();
// Test job IDs only
SortedSet<String> expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("foo*", true, true, actionListener));
SortedSet<String> expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("foo*",
true,
true,
null,
false,
actionListener));
assertEquals(new TreeSet<>(Arrays.asList("foo-1", "foo-2")), expandedIds);
expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("*-1", true, true,actionListener));
expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("*-1", true, true, null, false,actionListener));
assertEquals(new TreeSet<>(Arrays.asList("bar-1", "foo-1")), expandedIds);
expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("bar*", true, true, actionListener));
expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("bar*", true, true, null, false, actionListener));
assertEquals(new TreeSet<>(Arrays.asList("bar-1", "bar-2", "nbar")), expandedIds);
expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("b*r-1", true, true, actionListener));
expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("b*r-1", true, true, null, false, actionListener));
assertEquals(new TreeSet<>(Collections.singletonList("bar-1")), expandedIds);
// Test full job config
@@ -399,16 +419,21 @@ public class JobConfigProviderIT extends MlSingleNodeTestCase {
client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.configIndexName()).get();
SortedSet<String> expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("foo*", true, true, actionListener));
SortedSet<String> expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("foo*",
true,
true,
null,
false,
actionListener));
assertEquals(new TreeSet<>(Arrays.asList("foo-1", "foo-2")), expandedIds);
expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("foo*", true, false, actionListener));
expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("foo*", true, false, null, false, actionListener));
assertEquals(new TreeSet<>(Arrays.asList("foo-1", "foo-2", "foo-deleting")), expandedIds);
expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("*", true, true, actionListener));
expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("*", true, true, null, false, actionListener));
assertEquals(new TreeSet<>(Arrays.asList("foo-1", "foo-2", "bar")), expandedIds);
expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("*", true, false, actionListener));
expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("*", true, false, null, false, actionListener));
assertEquals(new TreeSet<>(Arrays.asList("foo-1", "foo-2", "foo-deleting", "bar")), expandedIds);
List<Job.Builder> expandedJobsBuilders =
@@ -419,6 +444,34 @@ public class JobConfigProviderIT extends MlSingleNodeTestCase {
assertThat(expandedJobsBuilders, hasSize(3));
}
public void testExpandJobIdsWithTaskData() throws Exception {
putJob(createJob("foo-1", null));
putJob(createJob("bar", null));
client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.configIndexName()).get();
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
tasksBuilder.addTask(MlTasks.jobTaskId("foo-2"),
MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo-2"),
new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment"));
PersistentTasksCustomMetaData tasks = tasksBuilder.build();
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
AtomicReference<SortedSet<String>> jobIdsHolder = new AtomicReference<>();
// Test job IDs only
SortedSet<String> expandedIds =
blockingCall(actionListener -> jobConfigProvider.expandJobsIds("foo*", false, false, tasks, true, actionListener));
assertEquals(new TreeSet<>(Arrays.asList("foo-1", "foo-2")), expandedIds);
blockingCall(actionListener -> jobConfigProvider.expandJobsIds("foo-1*,foo-2*", false, false, tasks, false, actionListener),
jobIdsHolder,
exceptionHolder);
assertThat(exceptionHolder.get(), is(not(nullValue())));
assertEquals(ResourceNotFoundException.class, exceptionHolder.get().getClass());
assertThat(exceptionHolder.get().getMessage(), containsString("No known job with id 'foo-2*'"));
}
public void testExpandGroups() throws Exception {
putJob(createJob("apples", Collections.singletonList("fruit")));
putJob(createJob("pears", Collections.singletonList("fruit")));