diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/JobAndDatafeedResilienceIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/JobAndDatafeedResilienceIT.java
new file mode 100644
index 00000000000..b7167ea118b
--- /dev/null
+++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/JobAndDatafeedResilienceIT.java
@@ -0,0 +1,241 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.integration;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.xpack.core.ml.MlTasks;
+import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
+import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
+import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
+import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
+import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
+import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
+import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
+import org.elasticsearch.xpack.core.ml.job.config.Detector;
+import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
+import org.junit.After;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeedBuilder;
+import static org.hamcrest.CoreMatchers.equalTo;
+
+public class JobAndDatafeedResilienceIT extends MlNativeAutodetectIntegTestCase {
+
+    private String index = "empty_index";
+
+    @After
+    public void cleanUpTest() {
+        cleanUp();
+    }
+
+    public void testCloseOpenJobWithMissingConfig() throws Exception {
+        final String jobId = "job-with-missing-config";
+        Job.Builder job = createJob(jobId, TimeValue.timeValueMinutes(5), "count", null);
+
+        putJob(job);
+        openJob(job.getId());
+
+        client().prepareDelete(AnomalyDetectorsIndexFields.CONFIG_INDEX, "_doc", Job.documentId(jobId)).get();
+        client().admin().indices().prepareRefresh(AnomalyDetectorsIndexFields.CONFIG_INDEX).get();
+
+        ElasticsearchException ex = expectThrows(ElasticsearchException.class, () -> {
+            CloseJobAction.Request request = new CloseJobAction.Request(jobId);
+            request.setAllowNoJobs(false);
+            client().execute(CloseJobAction.INSTANCE, request).actionGet();
+        });
+        assertThat(ex.getMessage(), equalTo("No known job with id 'job-with-missing-config'"));
+
+        forceCloseJob(jobId);
+        assertBusy(() ->
+            assertThat(client().admin()
+                .cluster()
+                .prepareListTasks()
+                .setActions(MlTasks.JOB_TASK_NAME + "[c]")
+                .get()
+                .getTasks()
+                .size(), equalTo(0))
+        );
+    }
+
+    public void testStopStartedDatafeedWithMissingConfig() throws Exception {
+        client().admin().indices().prepareCreate(index)
+            .addMapping("type", "time", "type=date", "value", "type=long")
+            .get();
+        final String jobId = "job-with-missing-datafeed-with-config";
+        Job.Builder job = createJob(jobId, TimeValue.timeValueMinutes(5), "count", null);
+
+        DatafeedConfig.Builder datafeedConfigBuilder =
+            createDatafeedBuilder(job.getId() + "-datafeed", job.getId(), Collections.singletonList(index));
+        DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
+        registerJob(job);
+        putJob(job);
+        openJob(job.getId());
+
+        registerDatafeed(datafeedConfig);
+        putDatafeed(datafeedConfig);
+        startDatafeed(datafeedConfig.getId(), 0L, null);
+
+        client().prepareDelete(AnomalyDetectorsIndexFields.CONFIG_INDEX, "_doc", DatafeedConfig.documentId(datafeedConfig.getId())).get();
+        client().admin().indices().prepareRefresh(AnomalyDetectorsIndexFields.CONFIG_INDEX).get();
+
+        ElasticsearchException ex = expectThrows(ElasticsearchException.class, () -> {
+            StopDatafeedAction.Request request = new StopDatafeedAction.Request(datafeedConfig.getId());
+            request.setAllowNoDatafeeds(false);
+            client().execute(StopDatafeedAction.INSTANCE, request).actionGet();
+        });
+        assertThat(ex.getMessage(), equalTo("No datafeed with id [job-with-missing-datafeed-with-config-datafeed] exists"));
+
+        forceStopDatafeed(datafeedConfig.getId());
+        assertBusy(() ->
+            assertThat(client().admin()
+                .cluster()
+                .prepareListTasks()
+                .setActions(MlTasks.DATAFEED_TASK_NAME + "[c]")
+                .get()
+                .getTasks()
+                .size(), equalTo(0))
+        );
+        closeJob(jobId);
+        waitUntilJobIsClosed(jobId);
+    }
+
+    public void testGetJobStats() throws Exception {
+        final String jobId1 = "job-with-missing-config-stats";
+        final String jobId2 = "job-with-config-stats";
+
+        Job.Builder job1 = createJob(jobId1, TimeValue.timeValueMinutes(5), "count", null);
+        Job.Builder job2 = createJob(jobId2, TimeValue.timeValueMinutes(5), "count", null);
+
+        putJob(job1);
+        openJob(job1.getId());
+        registerJob(job2);
+        putJob(job2);
+        openJob(job2.getId());
+
+        client().prepareDelete(AnomalyDetectorsIndexFields.CONFIG_INDEX, "_doc", Job.documentId(jobId1)).get();
+        client().admin().indices().prepareRefresh(AnomalyDetectorsIndexFields.CONFIG_INDEX).get();
+
+        List<GetJobsStatsAction.Response.JobStats> jobStats = client().execute(GetJobsStatsAction.INSTANCE,
+            new GetJobsStatsAction.Request("*"))
+            .get()
+            .getResponse()
+            .results();
+        assertThat(jobStats.size(), equalTo(2));
+        assertThat(jobStats.get(0).getJobId(), equalTo(jobId2));
+        assertThat(jobStats.get(1).getJobId(), equalTo(jobId1));
+        forceCloseJob(jobId1);
+        closeJob(jobId2);
+        assertBusy(() ->
+            assertThat(client().admin()
+                .cluster()
+                .prepareListTasks()
+                .setActions(MlTasks.JOB_TASK_NAME + "[c]")
+                .get()
+                .getTasks()
+                .size(), equalTo(0))
+        );
+    }
+
+    public void testGetDatafeedStats() throws Exception {
+        client().admin().indices().prepareCreate(index)
+            .addMapping("type", "time", "type=date", "value", "type=long")
+            .get();
+        final String jobId1 = "job-with-datafeed-missing-config-stats";
+        final String jobId2 = "job-with-datafeed-config-stats";
+
+        Job.Builder job1 = createJob(jobId1, TimeValue.timeValueMinutes(5), "count", null);
+        Job.Builder job2 = createJob(jobId2, TimeValue.timeValueMinutes(5), "count", null);
+
+        registerJob(job1);
+        putJob(job1);
+        openJob(job1.getId());
+        registerJob(job2);
+        putJob(job2);
+        openJob(job2.getId());
+
+        DatafeedConfig.Builder datafeedConfigBuilder1 =
+            createDatafeedBuilder(job1.getId() + "-datafeed", job1.getId(), Collections.singletonList(index));
+        DatafeedConfig datafeedConfig1 = datafeedConfigBuilder1.build();
+
+        putDatafeed(datafeedConfig1);
+        startDatafeed(datafeedConfig1.getId(), 0L, null);
+
+        DatafeedConfig.Builder datafeedConfigBuilder2 =
+            createDatafeedBuilder(job2.getId() + "-datafeed", job2.getId(), Collections.singletonList(index));
+        DatafeedConfig datafeedConfig2 = datafeedConfigBuilder2.build();
+
+        putDatafeed(datafeedConfig2);
+        startDatafeed(datafeedConfig2.getId(), 0L, null);
+
+        client().prepareDelete(AnomalyDetectorsIndexFields.CONFIG_INDEX, "_doc", DatafeedConfig.documentId(datafeedConfig1.getId())).get();
+        client().admin().indices().prepareRefresh(AnomalyDetectorsIndexFields.CONFIG_INDEX).get();
+
+        List<GetDatafeedsStatsAction.Response.DatafeedStats> dfStats = client().execute(GetDatafeedsStatsAction.INSTANCE,
+            new GetDatafeedsStatsAction.Request("*"))
+            .get()
+            .getResponse()
+            .results();
+        assertThat(dfStats.size(), equalTo(2));
+        assertThat(dfStats.get(0).getDatafeedId(), equalTo(datafeedConfig2.getId()));
+        assertThat(dfStats.get(1).getDatafeedId(), equalTo(datafeedConfig1.getId()));
+
+        forceStopDatafeed(datafeedConfig1.getId());
+        stopDatafeed(datafeedConfig2.getId());
+        assertBusy(() ->
+            assertThat(client().admin()
+                .cluster()
+                .prepareListTasks()
+                .setActions(MlTasks.DATAFEED_TASK_NAME + "[c]")
+                .get()
+                .getTasks()
+                .size(), equalTo(0))
+        );
+        closeJob(jobId1);
+        closeJob(jobId2);
+        waitUntilJobIsClosed(jobId1);
+        waitUntilJobIsClosed(jobId2);
+    }
+
+    private CloseJobAction.Response forceCloseJob(String jobId) {
+        CloseJobAction.Request request = new CloseJobAction.Request(jobId);
+        request.setForce(true);
+        return client().execute(CloseJobAction.INSTANCE, request).actionGet();
+    }
+
+    private StopDatafeedAction.Response forceStopDatafeed(String datafeedId) {
+        StopDatafeedAction.Request request = new StopDatafeedAction.Request(datafeedId);
+        request.setForce(true);
+        return client().execute(StopDatafeedAction.INSTANCE, request).actionGet();
+    }
+
+    private Job.Builder createJob(String id, TimeValue bucketSpan, String function, String field) {
+        return createJob(id, bucketSpan, function, field, null);
+    }
+
+    private Job.Builder createJob(String id, TimeValue bucketSpan, String function, String field, String summaryCountField) {
+        DataDescription.Builder dataDescription = new DataDescription.Builder();
+        dataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
+        dataDescription.setTimeField("time");
+        dataDescription.setTimeFormat(DataDescription.EPOCH_MS);
+
+        Detector.Builder d = new Detector.Builder(function, field);
+        AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build()));
+        analysisConfig.setBucketSpan(bucketSpan);
+        analysisConfig.setSummaryCountFieldName(summaryCountField);
+
+        Job.Builder builder = new Job.Builder();
+        builder.setId(id);
+        builder.setAnalysisConfig(analysisConfig);
+        builder.setDataDescription(dataDescription);
+        return builder;
+    }
+
+}
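
The tests above pin down one behaviour: when a config document has been deleted out from under a
running task, a regular close/stop fails as if the job never existed, and only the force variant
cleans up the orphaned persistent task. A minimal sketch of that fallback, assuming it lives in a
test class like the one above (so client() is in scope); the helper name closeJobOrForce is
illustrative and not part of this change:

    // Try a graceful close first; fall back to force when the config doc is missing.
    private CloseJobAction.Response closeJobOrForce(String jobId) {
        CloseJobAction.Request request = new CloseJobAction.Request(jobId);
        try {
            return client().execute(CloseJobAction.INSTANCE, request).actionGet();
        } catch (ElasticsearchException e) {
            // "No known job with id ..." => the config is gone but the task may linger.
            request.setForce(true);
            return client().execute(CloseJobAction.INSTANCE, request).actionGet();
        }
    }
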
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java
index c79ee507bd9..3ea419c7e81 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java
@@ -15,7 +15,6 @@ import org.elasticsearch.action.FailedNodeException;
 import org.elasticsearch.action.TaskOperationFailure;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.tasks.TransportTasksAction;
-import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterService;
@@ -55,7 +54,6 @@ public class TransportCloseJobAction extends TransportTasksAction {
 
         validate(expandedJobIds, request.isForce(), tasksMetaData, ActionListener.wrap(
-            response -> {
-                request.setOpenJobIds(response.openJobIds.toArray(new String[0]));
-                if (response.openJobIds.isEmpty() && response.closingJobIds.isEmpty()) {
-                    listener.onResponse(new CloseJobAction.Response(true));
-                    return;
-                }
+                response -> {
+                    request.setOpenJobIds(response.openJobIds.toArray(new String[0]));
+                    if (response.openJobIds.isEmpty() && response.closingJobIds.isEmpty()) {
+                        listener.onResponse(new CloseJobAction.Response(true));
+                        return;
+                    }
 
-            if (request.isForce()) {
-                List<String> jobIdsToForceClose = new ArrayList<>(response.openJobIds);
-                jobIdsToForceClose.addAll(response.closingJobIds);
-                forceCloseJob(state, request, jobIdsToForceClose, listener);
-            } else {
-                Set<String> executorNodes = new HashSet<>();
-                PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
-                for (String resolvedJobId : request.getOpenJobIds()) {
-                    PersistentTasksCustomMetaData.PersistentTask<?> jobTask =
-                        MlTasks.getJobTask(resolvedJobId, tasks);
-
-                    if (jobTask == null) {
-                        // This should not happen, because openJobIds was
-                        // derived from the same tasks metadata as jobTask
-                        String msg = "Requested job [" + resolvedJobId
-                            + "] be stopped, but job's task could not be found.";
-                        assert jobTask != null : msg;
-                        logger.error(msg);
-                    } else if (jobTask.isAssigned()) {
-                        executorNodes.add(jobTask.getExecutorNode());
-                    } else {
-                        // This is the easy case - the job is not currently assigned to a node, so can
-                        // be gracefully stopped simply by removing its persistent task. (Usually a
-                        // graceful stop cannot be achieved by simply removing the persistent task, but
-                        // if the job has no running code then graceful/forceful are basically the same.)
-                        // The listener here can be a no-op, as waitForJobClosed() already waits for
-                        // these persistent tasks to disappear.
-                        persistentTasksService.sendRemoveRequest(jobTask.getId(),
-                            ActionListener.wrap(
-                                r -> logger.trace("[{}] removed task to close unassigned job", resolvedJobId),
-                                e -> logger.error("[" + resolvedJobId
-                                    + "] failed to remove task to close unassigned job", e)
-                            ));
-                    }
+                    if (request.isForce()) {
+                        List<String> jobIdsToForceClose = new ArrayList<>(response.openJobIds);
+                        jobIdsToForceClose.addAll(response.closingJobIds);
+                        forceCloseJob(state, request, jobIdsToForceClose, listener);
+                    } else {
+                        Set<String> executorNodes = new HashSet<>();
+                        PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
+                        for (String resolvedJobId : request.getOpenJobIds()) {
+                            PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlTasks.getJobTask(resolvedJobId, tasks);
+                            if (jobTask == null) {
+                                // This should not happen, because openJobIds was
+                                // derived from the same tasks metadata as jobTask
+                                String msg = "Requested job [" + resolvedJobId
+                                    + "] be stopped, but job's task could not be found.";
+                                assert jobTask != null : msg;
+                                logger.error(msg);
+                            } else if (jobTask.isAssigned()) {
+                                executorNodes.add(jobTask.getExecutorNode());
+                            } else {
+                                // This is the easy case - the job is not currently assigned to a node, so can
+                                // be gracefully stopped simply by removing its persistent task. (Usually a
+                                // graceful stop cannot be achieved by simply removing the persistent task, but
+                                // if the job has no running code then graceful/forceful are basically the same.)
+                                // The listener here can be a no-op, as waitForJobClosed() already waits for
+                                // these persistent tasks to disappear.
+                                persistentTasksService.sendRemoveRequest(jobTask.getId(),
+                                    ActionListener.wrap(
+                                        r -> logger.trace("[{}] removed task to close unassigned job", resolvedJobId),
+                                        e -> logger.error("[" + resolvedJobId
+                                            + "] failed to remove task to close unassigned job", e)
+                                    ));
+                            }
                         }
-                request.setNodes(executorNodes.toArray(new String[0]));
-
-                normalCloseJob(state, task, request, response.openJobIds, response.closingJobIds, listener);
-            }
+                        request.setNodes(executorNodes.toArray(new String[0]));
+
+                        normalCloseJob(state, task, request, response.openJobIds, response.closingJobIds, listener);
+                    }
             },
-            listener::onFailure
-        ));
             },
-                listener::onFailure
+                    listener::onFailure
+                ));
             },
-            listener::onFailure
+                listener::onFailure
         ));
     }
 }
@@ -189,7 +189,7 @@ public class TransportCloseJobAction extends TransportTasksAction
 
     void validate(Collection<String> expandedJobIds, boolean forceClose, PersistentTasksCustomMetaData tasksMetaData,
-                  ActionListener listener) {
+                      ActionListener listener) {
 
         checkDatafeedsHaveStopped(expandedJobIds, tasksMetaData, ActionListener.wrap(
             response -> {
@@ -221,14 +221,14 @@ public class TransportCloseJobAction extends TransportTasksAction
 
     void checkDatafeedsHaveStopped(Collection<String> jobIds, PersistentTasksCustomMetaData tasksMetaData,
-                                   ActionListener listener) {
+                                       ActionListener listener) {
         datafeedConfigProvider.findDatafeedsForJobIds(jobIds, ActionListener.wrap(
                 datafeedIds -> {
                     for (String datafeedId : datafeedIds) {
                         DatafeedState datafeedState = MlTasks.getDatafeedState(datafeedId, tasksMetaData);
                         if (datafeedState != DatafeedState.STOPPED) {
                             listener.onFailure(ExceptionsHelper.conflictStatusException(
-                                "cannot close job datafeed [{}] hasn't been stopped", datafeedId));
+                                    "cannot close job datafeed [{}] hasn't been stopped", datafeedId));
                             return;
                         }
                     }
@@ -239,7 +239,7 @@ public class TransportCloseJobAction extends TransportTasksAction
 
     static void addJobAccordingToState(String jobId, PersistentTasksCustomMetaData tasksMetaData,
-                                       List<String> openJobs, List<String> closingJobs, List<String> failedJobs) {
+                                           List<String> openJobs, List<String> closingJobs, List<String> failedJobs) {
 
         JobState jobState = MlTasks.getJobState(jobId, tasksMetaData);
         switch (jobState) {
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java
index cdd90b0bb69..260b6171a64 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java
@@ -209,7 +209,17 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction
 
-            e -> finalListener.onFailure(e));
+            e -> {
+                if (request.isForce()
+                    && MlTasks.getJobTask(request.getJobId(), state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE)) != null) {
+                    logger.info(
+                        "[{}] config is missing but task exists. Attempting to delete tasks and stop process",
+                        request.getJobId());
+                    forceDeleteJob(parentTaskClient, request, finalListener);
+                } else {
+                    finalListener.onFailure(e);
+                }
+            });
 
         // First check that the job exists, because we don't want to audit
         // the beginning of its deletion if it didn't exist in the first place
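
This force-delete fallback and the job-finalization change further down in the patch rest on the
same classification: during a force operation, "the config/document is already gone" is not a
failure, it means there is simply nothing left to clean up. A condensed sketch of that test,
assuming the ExceptionsHelper.unwrapCause helper and the two exception types named in this patch;
the method name isMissingConfigError is illustrative, not part of the change:

    // Returns true when the failure only says the config no longer exists, which a
    // force operation should treat as "nothing left to do" rather than as an error.
    static boolean isMissingConfigError(Exception e) {
        Throwable unwrapped = ExceptionsHelper.unwrapCause(e);
        return unwrapped instanceof DocumentMissingException
            || unwrapped instanceof ResourceNotFoundException;
    }
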
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java
index 7e6f45d0fbe..150742b7609 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java
@@ -32,6 +32,9 @@ import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public class TransportGetDatafeedsStatsAction extends TransportMasterNodeReadAction
 
     protected void masterOperation(GetDatafeedsStatsAction.Request request, ClusterState state,
                                    ActionListener<GetDatafeedsStatsAction.Response> listener) throws Exception {
         logger.debug("Get stats for datafeed '{}'", request.getDatafeedId());
 
+        final PersistentTasksCustomMetaData tasksInProgress = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
+        ActionListener<SortedSet<String>> expandIdsListener = ActionListener.wrap(
+            expandedIds -> {
+                datafeedConfigProvider.expandDatafeedConfigs(
+                    request.getDatafeedId(),
+                    // Already took into account the request parameter when we expanded the IDs with the tasks earlier
+                    // Should allow for no datafeeds in case the config is gone
+                    true,
+                    ActionListener.wrap(
+                        datafeedBuilders -> {
+                            Map<String, DatafeedConfig> existingConfigs = datafeedBuilders.stream()
+                                .map(DatafeedConfig.Builder::build)
+                                .collect(Collectors.toMap(DatafeedConfig::getId, Function.identity()));
 
-        datafeedConfigProvider.expandDatafeedConfigs(
-            request.getDatafeedId(),
-            request.allowNoDatafeeds(),
-            ActionListener.wrap(
-                datafeedBuilders -> {
-                    List<String> jobIds =
-                        datafeedBuilders.stream()
-                            .map(DatafeedConfig.Builder::build)
-                            .map(DatafeedConfig::getJobId)
-                            .collect(Collectors.toList());
-                    jobResultsProvider.datafeedTimingStats(
-                        jobIds,
-                        timingStatsByJobId -> {
-                            PersistentTasksCustomMetaData tasksInProgress = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
-                            List<GetDatafeedsStatsAction.Response.DatafeedStats> results =
-                                datafeedBuilders.stream()
-                                    .map(DatafeedConfig.Builder::build)
-                                    .map(
-                                        datafeed -> getDatafeedStats(
-                                            datafeed.getId(),
-                                            state,
-                                            tasksInProgress,
-                                            datafeed.getJobId(),
-                                            timingStatsByJobId.get(datafeed.getJobId())))
-                                    .collect(Collectors.toList());
-                            QueryPage<GetDatafeedsStatsAction.Response.DatafeedStats> statsPage =
-                                new QueryPage<>(results, results.size(), DatafeedConfig.RESULTS_FIELD);
-                            listener.onResponse(new GetDatafeedsStatsAction.Response(statsPage));
+                            List<String> jobIds = existingConfigs.values()
+                                .stream()
+                                .map(DatafeedConfig::getJobId)
+                                .collect(Collectors.toList());
+                            jobResultsProvider.datafeedTimingStats(
+                                jobIds,
+                                timingStatsByJobId -> {
+                                    List<GetDatafeedsStatsAction.Response.DatafeedStats> results = expandedIds.stream()
+                                        .map(datafeedId -> {
+                                            DatafeedConfig config = existingConfigs.get(datafeedId);
+                                            String jobId = config == null ? null : config.getJobId();
+                                            DatafeedTimingStats timingStats = jobId == null ? null : timingStatsByJobId.get(jobId);
+                                            return buildDatafeedStats(
+                                                datafeedId,
+                                                state,
+                                                tasksInProgress,
+                                                jobId,
+                                                timingStats
+                                            );
+                                        })
+                                        .collect(Collectors.toList());
+                                    QueryPage<GetDatafeedsStatsAction.Response.DatafeedStats> statsPage =
+                                        new QueryPage<>(results, results.size(), DatafeedConfig.RESULTS_FIELD);
+                                    listener.onResponse(new GetDatafeedsStatsAction.Response(statsPage));
+                                },
+                                listener::onFailure);
                         },
-                        listener::onFailure);
-                },
-                listener::onFailure)
+                        listener::onFailure)
+                );
+            },
+            listener::onFailure
         );
+
+        // This might also include datafeed tasks that exist but no longer have a config
+        datafeedConfigProvider.expandDatafeedIds(request.getDatafeedId(),
+            request.allowNoDatafeeds(),
+            tasksInProgress,
+            true,
+            expandIdsListener);
     }
 
-    private static GetDatafeedsStatsAction.Response.DatafeedStats getDatafeedStats(String datafeedId,
-                                                                                   ClusterState state,
-                                                                                   PersistentTasksCustomMetaData tasks,
-                                                                                   String jobId,
-                                                                                   DatafeedTimingStats timingStats) {
+    private static GetDatafeedsStatsAction.Response.DatafeedStats buildDatafeedStats(String datafeedId,
+                                                                                     ClusterState state,
+                                                                                     PersistentTasksCustomMetaData tasks,
+                                                                                     String jobId,
+                                                                                     DatafeedTimingStats timingStats) {
         PersistentTasksCustomMetaData.PersistentTask<?> task = MlTasks.getDatafeedTask(datafeedId, tasks);
         DatafeedState datafeedState = MlTasks.getDatafeedState(datafeedId, tasks);
         DiscoveryNode node = null;
@@ -116,7 +138,7 @@ public class TransportGetDatafeedsStatsAction extends TransportMasterNodeReadAct
             node = state.nodes().get(task.getExecutorNode());
             explanation = task.getAssignment().getExplanation();
         }
-        if (timingStats == null) {
+        if (timingStats == null && jobId != null) {
             timingStats = new DatafeedTimingStats(jobId);
         }
         return new GetDatafeedsStatsAction.Response.DatafeedStats(datafeedId, datafeedState, node, explanation, timingStats);
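
The reworked masterOperation is easiest to read as a pipeline: expand the requested expression to
IDs (now including IDs that exist only as started tasks), load whatever configs still exist, then
build one stats entry per ID, passing nulls for the job and timing stats of a config-less
datafeed. A simplified, synchronous model of that join, self-contained with stand-in types
(DatafeedStatsJoin and Config are illustrative, not part of the patch):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeSet;

    public class DatafeedStatsJoin {
        // Minimal stand-in for a datafeed config: just the owning job ID.
        static class Config {
            final String jobId;
            Config(String jobId) { this.jobId = jobId; }
        }

        // One row per expanded ID; "-" marks a datafeed whose config document is gone.
        static List<String> describe(TreeSet<String> expandedIds, Map<String, Config> existingConfigs) {
            List<String> rows = new ArrayList<>();
            for (String datafeedId : expandedIds) {
                Config config = existingConfigs.get(datafeedId); // null when only the task remains
                String jobId = config == null ? "-" : config.jobId;
                rows.add(datafeedId + " -> job " + jobId);
            }
            return rows;
        }

        public static void main(String[] args) {
            TreeSet<String> ids = new TreeSet<>();
            ids.add("df-with-config");
            ids.add("df-task-only");
            Map<String, Config> configs = new HashMap<>();
            configs.put("df-with-config", new Config("job-1"));
            // Prints [df-task-only -> job -, df-with-config -> job job-1]
            System.out.println(describe(ids, configs));
        }
    }
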
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java
index d11abcba7e5..00581c9a2c2 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java
@@ -75,7 +75,10 @@ public class TransportGetJobsStatsAction extends TransportTasksAction
                                    ActionListener<GetJobsStatsAction.Response> finalListener) {
 
         logger.debug("Get stats for job [{}]", request.getJobId());
-        jobConfigProvider.expandJobsIds(request.getJobId(), request.allowNoJobs(), true, ActionListener.wrap(
+        ClusterState state = clusterService.state();
+        PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
+        // If there are deleted configs, but the task is still around, we probably want to return the tasks in the stats call
+        jobConfigProvider.expandJobsIds(request.getJobId(), request.allowNoJobs(), true, tasks, true, ActionListener.wrap(
             expandedIds -> {
                 request.setExpandedJobsIds(new ArrayList<>(expandedIds));
                 ActionListener<GetJobsStatsAction.Response> jobStatsListener = ActionListener.wrap(
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportIsolateDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportIsolateDatafeedAction.java
index 07ab911ebc3..d9c4a380234 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportIsolateDatafeedAction.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportIsolateDatafeedAction.java
@@ -11,7 +11,6 @@ import org.elasticsearch.action.TaskOperationFailure;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.tasks.TransportTasksAction;
 import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
@@ -44,9 +43,6 @@ public class TransportIsolateDatafeedAction extends TransportTasksAction
                 ActionListener.wrap(
                     response -> task.markAsCompleted(),
-                    e -> logger.error("error finalizing job [" + jobId + "]", e)
+                    e -> {
+                        logger.error("error finalizing job [" + jobId + "]", e);
+                        Throwable unwrapped = ExceptionsHelper.unwrapCause(e);
+                        if (unwrapped instanceof DocumentMissingException || unwrapped instanceof ResourceNotFoundException) {
+                            task.markAsCompleted();
+                        } else {
+                            task.markAsFailed(e);
+                        }
+                    }
             ));
         } else {
             task.markAsCompleted();
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java
index d90f175e46b..67e499598ec 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java
@@ -131,10 +131,13 @@ public class TransportStopDatafeedAction extends TransportTasksAction
                     new ActionListenerResponseHandler<>(listener, StopDatafeedAction.Response::new));
             }
         } else {
-            datafeedConfigProvider.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds(), ActionListener.wrap(
+            PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
+            datafeedConfigProvider.expandDatafeedIds(request.getDatafeedId(),
+                request.allowNoDatafeeds(),
+                tasks,
+                request.isForce(),
+                ActionListener.wrap(
                 expandedIds -> {
-                    PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
-
                     List<String> startedDatafeeds = new ArrayList<>();
                     List<String> stoppingDatafeeds = new ArrayList<>();
                     List<String> notStoppedDatafeeds = new ArrayList<>();
@@ -257,7 +260,9 @@ public class TransportStopDatafeedAction extends TransportTasksAction
                                          ActionListener<StopDatafeedAction.Response> listener) {
         DatafeedState taskState = DatafeedState.STOPPING;
-        datafeedTask.updatePersistentTaskState(taskState, ActionListener.wrap(task -> {
+        datafeedTask.updatePersistentTaskState(taskState,
+            ActionListener.wrap(
+                task -> {
             // we need to fork because we are now on a network threadpool
             threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() {
                 @Override
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java
index bcdf3753028..62202ffb799 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java
@@ -42,9 +42,12 @@ import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.TermQueryBuilder;
 import org.elasticsearch.index.query.TermsQueryBuilder;
 import org.elasticsearch.index.query.WildcardQueryBuilder;
+import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.xpack.core.ClientHelper;
+import org.elasticsearch.xpack.core.action.util.ExpandedIdsMatcher;
+import org.elasticsearch.xpack.core.ml.MlTasks;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
@@ -350,9 +353,15 @@ public class DatafeedConfigProvider {
      * @param allowNoDatafeeds if {@code false}, an error is thrown when no name matches the {@code expression}.
      *                         This only applies to wild card expressions, if {@code expression} is not a
      *                         wildcard then setting this true will not suppress the exception
+     * @param tasks The current tasks meta-data. For expanding IDs when datafeeds might have missing configurations
+     * @param allowMissingConfigs If a datafeed has a task, but is missing a config, allow the ID to be expanded via the existing task
      * @param listener The expanded datafeed IDs listener
      */
-    public void expandDatafeedIds(String expression, boolean allowNoDatafeeds, ActionListener<SortedSet<String>> listener) {
+    public void expandDatafeedIds(String expression,
+                                  boolean allowNoDatafeeds,
+                                  PersistentTasksCustomMetaData tasks,
+                                  boolean allowMissingConfigs,
+                                  ActionListener<SortedSet<String>> listener) {
         String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression);
         SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildDatafeedIdQuery(tokens));
         sourceBuilder.sort(DatafeedConfig.ID.getPreferredName());
@@ -366,6 +375,7 @@ public class DatafeedConfigProvider {
             .request();
 
         ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoDatafeeds);
+        Collection<String> matchingStartedDatafeedIds = matchingDatafeedIdsWithTasks(tokens, tasks);
 
         executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest,
             ActionListener.wrap(
@@ -375,6 +385,9 @@ public class DatafeedConfigProvider {
                     for (SearchHit hit : hits) {
                         datafeedIds.add(hit.field(DatafeedConfig.ID.getPreferredName()).getValue());
                     }
+                    if (allowMissingConfigs) {
+                        datafeedIds.addAll(matchingStartedDatafeedIds);
+                    }
 
                     requiredMatches.filterMatchedIds(datafeedIds);
                     if (requiredMatches.hasUnmatchedIds()) {
@@ -391,10 +404,10 @@ public class DatafeedConfigProvider {
     }
 
     /**
-     * The same logic as {@link #expandDatafeedIds(String, boolean, ActionListener)} but
+     * The same logic as {@link #expandDatafeedIds(String, boolean, PersistentTasksCustomMetaData, boolean, ActionListener)} but
      * the full datafeed configuration is returned.
      *
-     * See {@link #expandDatafeedIds(String, boolean, ActionListener)}
+     * See {@link #expandDatafeedIds(String, boolean, PersistentTasksCustomMetaData, boolean, ActionListener)}
      *
      * @param expression the expression to resolve
     * @param allowNoDatafeeds if {@code false}, an error is thrown when no name matches the {@code expression}.
@@ -478,6 +491,30 @@ public class DatafeedConfigProvider {
         return boolQueryBuilder;
     }
 
+    static Collection<String> matchingDatafeedIdsWithTasks(String[] datafeedIdPatterns, PersistentTasksCustomMetaData tasksMetaData) {
+        Set<String> startedDatafeedIds = MlTasks.startedDatafeedIds(tasksMetaData);
+        if (startedDatafeedIds.isEmpty()) {
+            return Collections.emptyList();
+        }
+        if (Strings.isAllOrWildcard(datafeedIdPatterns)) {
+            return startedDatafeedIds;
+        }
+
+        List<String> matchingDatafeedIds = new ArrayList<>();
+        for (String datafeedIdPattern : datafeedIdPatterns) {
+            if (startedDatafeedIds.contains(datafeedIdPattern)) {
+                matchingDatafeedIds.add(datafeedIdPattern);
+            } else if (Regex.isSimpleMatchPattern(datafeedIdPattern)) {
+                for (String startedDatafeedId : startedDatafeedIds) {
+                    if (Regex.simpleMatch(datafeedIdPattern, startedDatafeedId)) {
+                        matchingDatafeedIds.add(startedDatafeedId);
+                    }
+                }
+            }
+        }
+        return matchingDatafeedIds;
+    }
+
     private QueryBuilder buildDatafeedJobIdsQuery(Collection<String> jobIds) {
         BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
         boolQueryBuilder.filter(new TermQueryBuilder(DatafeedConfig.CONFIG_TYPE.getPreferredName(), DatafeedConfig.TYPE));
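
The matching rules above are worth pinning down with a concrete case: an exact pattern matches a
started-task ID directly, a wildcard pattern is applied per started ID, and "_all"/"*" short-
circuits to every started ID. A self-contained sketch of the same selection logic, with a trivial
glob matcher standing in for Elasticsearch's Regex.simpleMatch (assumption: single trailing-star
patterns only, which is all this demo uses):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Set;
    import java.util.TreeSet;

    public class TaskIdMatching {
        // Stand-in for Regex.simpleMatch, restricted to "prefix*" patterns.
        static boolean simpleMatch(String pattern, String id) {
            return pattern.endsWith("*")
                ? id.startsWith(pattern.substring(0, pattern.length() - 1))
                : pattern.equals(id);
        }

        static List<String> matching(String[] patterns, Set<String> startedIds) {
            List<String> matches = new ArrayList<>();
            for (String pattern : patterns) {
                if (startedIds.contains(pattern)) {
                    matches.add(pattern);                  // exact hit on a task-backed ID
                } else if (pattern.contains("*")) {
                    for (String id : startedIds) {
                        if (simpleMatch(pattern, id)) {
                            matches.add(id);               // wildcard hit
                        }
                    }
                }
            }
            return matches;
        }

        public static void main(String[] args) {
            Set<String> started = new TreeSet<>(Arrays.asList("foo-1", "bar-1"));
            // Prints [foo-1]: "foo*" picks up the task-only datafeed even with no config doc.
            System.out.println(matching(new String[] {"foo*"}, started));
        }
    }
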
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java
index 91fdfdbc26d..dcea7bf72b0 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java
@@ -29,6 +29,7 @@ import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.action.update.UpdateAction;
 import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.regex.Regex;
@@ -50,11 +51,13 @@ import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.query.TermQueryBuilder;
 import org.elasticsearch.index.query.TermsQueryBuilder;
 import org.elasticsearch.index.query.WildcardQueryBuilder;
+import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
 import org.elasticsearch.search.sort.SortOrder;
 import org.elasticsearch.xpack.core.action.util.ExpandedIdsMatcher;
+import org.elasticsearch.xpack.core.ml.MlTasks;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator;
 import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
@@ -502,9 +505,17 @@ public class JobConfigProvider {
      *                    This only applies to wild card expressions, if {@code expression} is not a
      *                    wildcard then setting this true will not suppress the exception
      * @param excludeDeleting If true exclude jobs marked as deleting
+     * @param tasksCustomMetaData The current persistent task metadata.
+     *                            For resolving jobIds that have tasks, but for some reason, don't have configs
+     * @param allowMissingConfigs If a job has a task, but is missing a config, allow the ID to be expanded via the existing task
      * @param listener The expanded job Ids listener
      */
-    public void expandJobsIds(String expression, boolean allowNoJobs, boolean excludeDeleting, ActionListener<SortedSet<String>> listener) {
+    public void expandJobsIds(String expression,
+                              boolean allowNoJobs,
+                              boolean excludeDeleting,
+                              @Nullable PersistentTasksCustomMetaData tasksCustomMetaData,
+                              boolean allowMissingConfigs,
+                              ActionListener<SortedSet<String>> listener) {
         String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression);
         SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens, excludeDeleting));
         sourceBuilder.sort(Job.ID.getPreferredName());
@@ -519,6 +530,7 @@ public class JobConfigProvider {
             .request();
 
         ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoJobs);
+        Set<String> openMatchingJobs = matchingJobIdsWithTasks(tokens, tasksCustomMetaData);
 
         executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest,
             ActionListener.wrap(
@@ -533,7 +545,9 @@ public class JobConfigProvider {
                         groupsIds.addAll(groups.stream().map(Object::toString).collect(Collectors.toList()));
                     }
                 }
-
+                if (allowMissingConfigs) {
+                    jobIds.addAll(openMatchingJobs);
+                }
                 groupsIds.addAll(jobIds);
                 requiredMatches.filterMatchedIds(groupsIds);
                 if (requiredMatches.hasUnmatchedIds()) {
@@ -550,10 +564,10 @@ public class JobConfigProvider {
     }
 
     /**
-     * The same logic as {@link #expandJobsIds(String, boolean, boolean, ActionListener)} but
+     * The same logic as {@link #expandJobsIds(String, boolean, boolean, PersistentTasksCustomMetaData, boolean, ActionListener)} but
      * the full anomaly detector job configuration is returned.
     *
-     * See {@link #expandJobsIds(String, boolean, boolean, ActionListener)}
+     * See {@link #expandJobsIds(String, boolean, boolean, PersistentTasksCustomMetaData, boolean, ActionListener)}
      *
      * @param expression the expression to resolve
      * @param allowNoJobs if {@code false}, an error is thrown when no name matches the {@code expression}.
@@ -611,7 +625,7 @@ public class JobConfigProvider {
 
     /**
      * Expands the list of job group Ids to the set of jobs which are members of the groups.
-     * Unlike {@link #expandJobsIds(String, boolean, boolean, ActionListener)} it is not an error
+     * Unlike {@link #expandJobsIds(String, boolean, boolean, PersistentTasksCustomMetaData, boolean, ActionListener)} it is not an error
      * if a group Id does not exist.
      * Wildcard expansion of group Ids is not supported.
      *
@@ -735,6 +749,31 @@ public class JobConfigProvider {
         ));
     }
 
+    static Set<String> matchingJobIdsWithTasks(String[] jobIdPatterns, PersistentTasksCustomMetaData tasksMetaData) {
+        Set<String> openjobs = MlTasks.openJobIds(tasksMetaData);
+        if (openjobs.isEmpty()) {
+            return Collections.emptySet();
+        }
+        if (Strings.isAllOrWildcard(jobIdPatterns)) {
+            return openjobs;
+        }
+
+        Set<String> matchingJobIds = new HashSet<>();
+        for (String jobIdPattern : jobIdPatterns) {
+            if (openjobs.contains(jobIdPattern)) {
+                matchingJobIds.add(jobIdPattern);
+            } else if (Regex.isSimpleMatchPattern(jobIdPattern)) {
+                for (String openJobId : openjobs) {
+                    if (Regex.simpleMatch(jobIdPattern, openJobId)) {
+                        matchingJobIds.add(openJobId);
+                    }
+                }
+            }
+        }
+        return matchingJobIds;
+    }
+
+
     private void parseJobLenientlyFromSource(BytesReference source, ActionListener<Job.Builder> jobListener)  {
         try (InputStream stream = source.streamInput();
              XContentParser parser = XContentFactory.xContent(XContentType.JSON)
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java
index 61059e08480..998dfd32d70 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java
@@ -8,7 +8,6 @@ package org.elasticsearch.xpack.ml.action;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
-import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.MetaData;
@@ -276,7 +275,7 @@ public class TransportCloseJobActionTests extends ESTestCase {
 
     private TransportCloseJobAction createAction() {
         return new TransportCloseJobAction(mock(TransportService.class), mock(ThreadPool.class), mock(ActionFilters.class),
-            clusterService, mock(Client.class), mock(AnomalyDetectionAuditor.class), mock(PersistentTasksService.class),
+            clusterService, mock(AnomalyDetectionAuditor.class), mock(PersistentTasksService.class),
             jobConfigProvider, datafeedConfigProvider);
     }
 
@@ -293,11 +292,11 @@ public class TransportCloseJobActionTests extends ESTestCase {
 
     @SuppressWarnings("unchecked")
     private void mockJobConfigProviderExpandIds(Set<String> expandedIds) {
         doAnswer(invocation -> {
-            ActionListener<SortedSet<String>> listener = (ActionListener<SortedSet<String>>) invocation.getArguments()[3];
+            ActionListener<SortedSet<String>> listener = (ActionListener<SortedSet<String>>) invocation.getArguments()[5];
             listener.onResponse(expandedIds);
             return null;
-        }).when(jobConfigProvider).expandJobsIds(any(), anyBoolean(), anyBoolean(), any(ActionListener.class));
+        }).when(jobConfigProvider).expandJobsIds(any(), anyBoolean(), anyBoolean(), any(), anyBoolean(), any(ActionListener.class));
     }
 }
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java
index 9496f4ca0d8..c031b78f64a 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java
@@ -11,8 +11,11 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.delete.DeleteResponse;
 import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.xpack.core.ClientHelper;
+import org.elasticsearch.xpack.core.ml.MlTasks;
+import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
@@ -33,6 +36,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
+import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
@@ -40,6 +44,8 @@ import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.core.Is.is;
 
 public class DatafeedConfigProviderIT extends MlSingleNodeTestCase {
     private DatafeedConfigProvider datafeedConfigProvider;
@@ -206,7 +212,7 @@ public class DatafeedConfigProviderIT extends MlSingleNodeTestCase {
         AtomicReference<SortedSet<String>> datafeedIdsHolder = new AtomicReference<>();
         AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
 
-        blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("_all", false, actionListener),
+        blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("_all", false, null, false, actionListener),
             datafeedIdsHolder, exceptionHolder);
         assertNull(datafeedIdsHolder.get());
@@ -215,7 +221,7 @@ public class DatafeedConfigProviderIT extends MlSingleNodeTestCase {
         assertThat(exceptionHolder.get().getMessage(), containsString("No datafeed with id [*] exists"));
 
         exceptionHolder.set(null);
-        blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("_all", true, actionListener),
+        blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("_all", true, null, false, actionListener),
             datafeedIdsHolder, exceptionHolder);
         assertNotNull(datafeedIdsHolder.get());
         assertNull(exceptionHolder.get());
@@ -245,24 +251,28 @@ public class DatafeedConfigProviderIT extends MlSingleNodeTestCase {
 
         client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.configIndexName()).get();
 
-        // Test job IDs only
+        // Test datafeed IDs only
         SortedSet<String> expandedIds =
-            blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("foo*", true, actionListener));
+            blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("foo*", true, null, false, actionListener));
         assertEquals(new TreeSet<>(Arrays.asList("foo-1", "foo-2")), expandedIds);
 
-        expandedIds = blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("*-1", true, actionListener));
+        expandedIds = blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("*-1", true, null, false, actionListener));
        assertEquals(new TreeSet<>(Arrays.asList("bar-1", "foo-1")), expandedIds);
 
-        expandedIds = blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("bar*", true, actionListener));
+        expandedIds = blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("bar*", true, null, false, actionListener));
        assertEquals(new TreeSet<>(Arrays.asList("bar-1", "bar-2")), expandedIds);
 
-        expandedIds = blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("b*r-1", true, actionListener));
+        expandedIds = blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("b*r-1", true, null, false, actionListener));
         assertEquals(new TreeSet<>(Collections.singletonList("bar-1")), expandedIds);
 
-        expandedIds = blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("bar-1,foo*", true, actionListener));
+        expandedIds = blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("bar-1,foo*",
+            true,
+            null,
+            false,
+            actionListener));
         assertEquals(new TreeSet<>(Arrays.asList("bar-1", "foo-1", "foo-2")), expandedIds);
 
-        // Test full job config
+        // Test full datafeed config
         List<DatafeedConfig.Builder> expandedDatafeedBuilders =
             blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedConfigs("foo*", true, actionListener));
         List<DatafeedConfig> expandedDatafeeds =
@@ -290,6 +300,31 @@ public class DatafeedConfigProviderIT extends MlSingleNodeTestCase {
         assertThat(expandedDatafeeds, containsInAnyOrder(bar1, foo1, foo2));
     }
 
+    public void testExpandDatafeedsWithTaskData() throws Exception {
+        putDatafeedConfig(createDatafeedConfig("foo-2", "j2"), Collections.emptyMap());
+        client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.configIndexName()).get();
+
+        PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
+        tasksBuilder.addTask(MlTasks.datafeedTaskId("foo-1"),
+            MlTasks.DATAFEED_TASK_NAME, new StartDatafeedAction.DatafeedParams("foo-1", 0L),
+            new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment"));
+
+        PersistentTasksCustomMetaData tasks = tasksBuilder.build();
+        AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
+        AtomicReference<SortedSet<String>> datafeedIdsHolder = new AtomicReference<>();
+        // Test datafeed IDs only
+        SortedSet<String> expandedIds =
+            blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("foo*", false, tasks, true, actionListener));
+        assertEquals(new TreeSet<>(Arrays.asList("foo-1", "foo-2")), expandedIds);
+
+        blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("foo-1*,foo-2*", false, tasks, false, actionListener),
+            datafeedIdsHolder,
+            exceptionHolder);
+        assertThat(exceptionHolder.get(), is(not(nullValue())));
+        assertEquals(ResourceNotFoundException.class, exceptionHolder.get().getClass());
+        assertThat(exceptionHolder.get().getMessage(), containsString("No datafeed with id [foo-1*] exists"));
+    }
+
     public void testFindDatafeedsForJobIds() throws Exception {
         putDatafeedConfig(createDatafeedConfig("foo-1", "j1"), Collections.emptyMap());
         putDatafeedConfig(createDatafeedConfig("foo-2", "j2"), Collections.emptyMap());
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java
index b1879eb07f1..371063b47d2 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java
@@ -12,10 +12,13 @@ import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.delete.DeleteResponse;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
 import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
+import org.elasticsearch.xpack.core.ml.MlTasks;
+import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
 import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
@@ -42,6 +45,7 @@ import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
+import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
@@ -49,6 +53,7 @@ import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 
 public class JobConfigProviderIT extends MlSingleNodeTestCase {
@@ -256,7 +261,7 @@ public class JobConfigProviderIT extends MlSingleNodeTestCase {
         AtomicReference<SortedSet<String>> jobIdsHolder = new AtomicReference<>();
         AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
 
-        blockingCall(actionListener -> jobConfigProvider.expandJobsIds("_all", false, true, actionListener),
+        blockingCall(actionListener -> jobConfigProvider.expandJobsIds("_all", false, true, null, false, actionListener),
             jobIdsHolder, exceptionHolder);
         assertNull(jobIdsHolder.get());
@@ -265,7 +270,7 @@ public class JobConfigProviderIT extends MlSingleNodeTestCase {
         assertThat(exceptionHolder.get().getMessage(), containsString("No known job with id"));
 
         exceptionHolder.set(null);
-        blockingCall(actionListener -> jobConfigProvider.expandJobsIds("_all", true, false, actionListener),
+        blockingCall(actionListener -> jobConfigProvider.expandJobsIds("_all", true, false, null, false, actionListener),
             jobIdsHolder, exceptionHolder);
         assertNotNull(jobIdsHolder.get());
         assertNull(exceptionHolder.get());
@@ -296,21 +301,31 @@ public class JobConfigProviderIT extends MlSingleNodeTestCase {
 
         // Job Ids
         SortedSet<String> expandedIds = blockingCall(actionListener ->
-            jobConfigProvider.expandJobsIds("_all", true, false, actionListener));
+            jobConfigProvider.expandJobsIds("_all", true, false, null, false, actionListener));
         assertEquals(new TreeSet<>(Arrays.asList("tom", "dick", "harry", "harry-jnr")), expandedIds);
 
-        expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("*", true, true, actionListener));
+        expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("*", true, true, null, false, actionListener));
         assertEquals(new TreeSet<>(Arrays.asList("tom", "dick", "harry", "harry-jnr")), expandedIds);
 
-        expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("tom,harry", true, false, actionListener));
+        expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("tom,harry",
+            true,
+            false,
+            null,
+            false,
+            actionListener));
         assertEquals(new TreeSet<>(Arrays.asList("tom", "harry")), expandedIds);
 
-        expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("harry-group,tom", true, false, actionListener));
+        expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("harry-group,tom",
+            true,
+            false,
+            null,
+            false,
+            actionListener));
         assertEquals(new TreeSet<>(Arrays.asList("harry", "harry-jnr", "tom")), expandedIds);
 
         AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
         AtomicReference<SortedSet<String>> jobIdsHolder = new AtomicReference<>();
-        blockingCall(actionListener -> jobConfigProvider.expandJobsIds("tom,missing1,missing2", true, false, actionListener),
+        blockingCall(actionListener -> jobConfigProvider.expandJobsIds("tom,missing1,missing2", true, false, null, false, actionListener),
             jobIdsHolder, exceptionHolder);
         assertNull(jobIdsHolder.get());
         assertNotNull(exceptionHolder.get());
@@ -357,16 +372,21 @@ public class JobConfigProviderIT extends MlSingleNodeTestCase {
 
         client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.configIndexName()).get();
 
         // Test job IDs only
-        SortedSet<String> expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("foo*", true, true, actionListener));
+        SortedSet<String> expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("foo*",
+            true,
+            true,
+            null,
+            false,
+            actionListener));
         assertEquals(new TreeSet<>(Arrays.asList("foo-1", "foo-2")), expandedIds);
 
-        expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("*-1", true, true, actionListener));
+        expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("*-1", true, true, null, false, actionListener));
         assertEquals(new TreeSet<>(Arrays.asList("bar-1", "foo-1")), expandedIds);
 
-        expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("bar*", true, true, actionListener));
+        expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("bar*", true, true, null, false, actionListener));
         assertEquals(new TreeSet<>(Arrays.asList("bar-1", "bar-2", "nbar")), expandedIds);
 
-        expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("b*r-1", true, true, actionListener));
+        expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("b*r-1", true, true, null, false, actionListener));
         assertEquals(new TreeSet<>(Collections.singletonList("bar-1")), expandedIds);
 
         // Test full job config
@@ -399,16 +419,21 @@ public class JobConfigProviderIT extends MlSingleNodeTestCase {
 
         client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.configIndexName()).get();
 
-        SortedSet<String> expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("foo*", true, true, actionListener));
+        SortedSet<String> expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("foo*",
+            true,
+            true,
+            null,
+            false,
+            actionListener));
         assertEquals(new TreeSet<>(Arrays.asList("foo-1", "foo-2")), expandedIds);
 
-        expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("foo*", true, false, actionListener));
+        expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("foo*", true, false, null, false, actionListener));
         assertEquals(new TreeSet<>(Arrays.asList("foo-1", "foo-2", "foo-deleting")), expandedIds);
 
-        expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("*", true, true, actionListener));
+        expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("*", true, true, null, false, actionListener));
         assertEquals(new TreeSet<>(Arrays.asList("foo-1", "foo-2", "bar")), expandedIds);
 
-        expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("*", true, false, actionListener));
+        expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("*", true, false, null, false, actionListener));
         assertEquals(new TreeSet<>(Arrays.asList("foo-1", "foo-2", "foo-deleting", "bar")), expandedIds);
 
         List<Job.Builder> expandedJobsBuilders =
@@ -419,6 +444,34 @@ public class JobConfigProviderIT extends MlSingleNodeTestCase {
         assertThat(expandedJobsBuilders, hasSize(3));
     }
 
+    public void testExpandJobIdsWithTaskData() throws Exception {
+        putJob(createJob("foo-1", null));
+        putJob(createJob("bar", null));
+
+        client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.configIndexName()).get();
+
+        PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
+        tasksBuilder.addTask(MlTasks.jobTaskId("foo-2"),
+            MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo-2"),
+            new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment"));
+
+        PersistentTasksCustomMetaData tasks = tasksBuilder.build();
+
+        AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
+        AtomicReference<SortedSet<String>> jobIdsHolder = new AtomicReference<>();
+        // Test job IDs only
+        SortedSet<String> expandedIds =
+            blockingCall(actionListener -> jobConfigProvider.expandJobsIds("foo*", false, false, tasks, true, actionListener));
+        assertEquals(new TreeSet<>(Arrays.asList("foo-1", "foo-2")), expandedIds);
+
+        blockingCall(actionListener -> jobConfigProvider.expandJobsIds("foo-1*,foo-2*", false, false, tasks, false, actionListener),
+            jobIdsHolder,
+            exceptionHolder);
+        assertThat(exceptionHolder.get(), is(not(nullValue())));
+        assertEquals(ResourceNotFoundException.class, exceptionHolder.get().getClass());
+        assertThat(exceptionHolder.get().getMessage(), containsString("No known job with id 'foo-2*'"));
+    }
+
     public void testExpandGroups() throws Exception {
         putJob(createJob("apples", Collections.singletonList("fruit")));
         putJob(createJob("pears", Collections.singletonList("fruit")));