From 23aef2679ddf745568b62fe9eb8a008f2aa1e6e9 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Wed, 4 Jan 2017 16:43:24 -0500 Subject: [PATCH] DELETING job status cluster state changes (elastic/elasticsearch#612) Deleting a job now starts a three-step process: 1. Job status updated to DELETING 2. Physical index is deleted 3. Job removed from cluster state When jobs are in DELETING, they cannot be modified/updated/changed at all. Only jobs that are DELETING can actually be removed from the CS. Original commit: elastic/x-pack-elasticsearch@2cd99a240cd4f5d343bdca4d79ef3143638d4f70 --- .../xpack/prelert/PrelertPlugin.java | 5 +- .../xpack/prelert/action/DeleteJobAction.java | 34 ++++++- .../xpack/prelert/job/JobStatus.java | 2 +- .../xpack/prelert/job/manager/JobManager.java | 99 ++++++++++++------- .../prelert/job/metadata/PrelertMetadata.java | 34 +++++-- .../prelert/integration/PrelertJobIT.java | 3 +- .../prelert/integration/ScheduledJobIT.java | 5 +- .../prelert/job/manager/JobManagerTests.java | 2 +- .../job/metadata/JobAllocatorTests.java | 4 +- .../metadata/JobLifeCycleServiceTests.java | 26 ++++- .../job/metadata/PrelertMetadataTests.java | 13 ++- 11 files changed, 171 insertions(+), 56 deletions(-) diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java index 8a291f586e3..5599252135e 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java @@ -26,6 +26,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestHandler; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.SearchRequestParsers; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ExecutorBuilder; import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; @@ -211,6 +212,7 @@ public class PrelertPlugin extends Plugin implements ActionPlugin { // norelease: we will no longer need to pass the client here after we switch to a client based data extractor new HttpDataExtractorFactory(client, searchRequestParsers), System::currentTimeMillis); + TaskManager taskManager = new TaskManager(settings); JobLifeCycleService jobLifeCycleService = new JobLifeCycleService(settings, client, clusterService, dataProcessor, threadPool.generic()); @@ -235,7 +237,8 @@ public class PrelertPlugin extends Plugin implements ActionPlugin { dataProcessor, new PrelertInitializationService(settings, threadPool, clusterService, jobProvider), jobDataCountsPersister, - scheduledJobRunner + scheduledJobRunner, + taskManager ); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/DeleteJobAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/DeleteJobAction.java index 4adf4e3f1f5..7fea4cff2f7 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/DeleteJobAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/DeleteJobAction.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.client.Client; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; @@ -23,6 +24,9 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.prelert.job.Job; @@ -74,6 +78,11 @@ public class DeleteJobAction extends Action { private final JobManager jobManager; + private final Client client; + private final TaskManager taskManager; @Inject - public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService, - ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - JobManager jobManager) { + public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, JobManager jobManager, + Client client, TaskManager taskManager) { super(settings, DeleteJobAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, Request::new); this.jobManager = jobManager; + this.client = client; + this.taskManager = taskManager; } @Override @@ -155,14 +168,27 @@ public class DeleteJobAction extends Action listener) throws Exception { + jobManager.deleteJob(client, request, ActionListener.wrap(response -> { + taskManager.unregister(task); + listener.onResponse(response); + }, e -> { + taskManager.unregister(task); + listener.onFailure(e); + })); + } + @Override protected void masterOperation(Request request, ClusterState state, ActionListener listener) throws Exception { - jobManager.deleteJob(request, listener); + throw new UnsupportedOperationException("the Task parameter is required"); } @Override protected ClusterBlockException checkBlock(Request request, ClusterState state) { return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); } + + } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/JobStatus.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/JobStatus.java index 7874d952250..b2ac8a110fc 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/JobStatus.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/JobStatus.java @@ -20,7 +20,7 @@ import java.util.Locale; */ public enum JobStatus implements Writeable { - CLOSING, CLOSED, OPENING, OPENED, FAILED; + CLOSING, CLOSED, OPENING, OPENED, FAILED, DELETING; public static JobStatus fromString(String name) { return valueOf(name.trim().toUpperCase(Locale.ROOT)); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/JobManager.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/JobManager.java index 21cf860e592..939c1a16054 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/JobManager.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/JobManager.java @@ -9,10 +9,14 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; @@ -196,7 +200,8 @@ public class JobManager extends AbstractComponent { public ClusterState execute(ClusterState currentState) throws Exception { ClusterState cs = updateClusterState(job, request.isOverwrite(), currentState); if (currentState.metaData().index(AnomalyDetectorsIndex.jobResultsIndexName(job.getIndexName())) != null) { - throw new ResourceAlreadyExistsException(Messages.getMessage(Messages.JOB_INDEX_ALREADY_EXISTS, job.getIndexName())); + throw new ResourceAlreadyExistsException(Messages.getMessage(Messages.JOB_INDEX_ALREADY_EXISTS, + AnomalyDetectorsIndex.jobResultsIndexName(job.getIndexName()))); } return cs; } @@ -209,51 +214,79 @@ public class JobManager extends AbstractComponent { return buildNewClusterState(currentState, builder); } - /** - * Deletes a job. - * - * The clean-up involves: - *
    - *
  • Deleting the index containing job results
  • - *
  • Deleting the job logs
  • - *
  • Removing the job from the cluster state
  • - *
- * - * @param request - * the delete job request - * @param actionListener - * the action listener - */ - public void deleteJob(DeleteJobAction.Request request, ActionListener actionListener) { + + public void deleteJob(Client client, DeleteJobAction.Request request, ActionListener actionListener) { + String jobId = request.getJobId(); + String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); LOGGER.debug("Deleting job '" + jobId + "'"); - ActionListener delegateListener = ActionListener.wrap(jobDeleted -> { + + // Step 3. Listen for the Cluster State status change + // Chain acknowledged status onto original actionListener + CheckedConsumer deleteStatusConsumer = jobDeleted -> { if (jobDeleted) { - jobProvider.deleteJobRelatedIndices(request.getJobId(), actionListener); - audit(jobId).info(Messages.getMessage(Messages.JOB_AUDIT_DELETED)); + logger.info("Job [" + jobId + "] deleted."); + actionListener.onResponse(new DeleteJobAction.Response(true)); + + //nocommit: needs #626, because otherwise the audit message re-creates the index + // we just deleted. :) + //audit(jobId).info(Messages.getMessage(Messages.JOB_AUDIT_DELETED)); } else { actionListener.onResponse(new DeleteJobAction.Response(false)); } - }, actionListener::onFailure); - clusterService.submitStateUpdateTask("delete-job-" + jobId, - new AckedClusterStateUpdateTask(request, delegateListener) { + }; - @Override - protected Boolean newResponse(boolean acknowledged) { - return acknowledged; + + // Step 2. Listen for the Deleted Index response + // If successful, delete from cluster state and chain onto deleteStatusListener + CheckedConsumer deleteIndexConsumer = response -> { + logger.info("Deleting index [" + indexName + "] successful"); + + if (response.isAcknowledged()) { + logger.info("Index deletion acknowledged"); + } else { + logger.warn("Index deletion not acknowledged"); + } + clusterService.submitStateUpdateTask("delete-job-" + jobId, + new AckedClusterStateUpdateTask(request, ActionListener.wrap(deleteStatusConsumer, actionListener::onFailure)) { + + @Override + protected Boolean newResponse(boolean acknowledged) { + return acknowledged && response.isAcknowledged(); + } + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + return removeJobFromState(jobId, currentState); + } + }); + + }; + + // Step 1. Update the CS to DELETING + // If successful, attempt to delete the physical index and chain + // onto deleteIndexConsumer + CheckedConsumer updateConsumer = response -> { + // Sucessfully updated the status to DELETING, begin actually deleting + if (response.isAcknowledged()) { + logger.info("Job [" + jobId + "] set to [" + JobStatus.DELETING + "]"); + } else { + logger.warn("Job [" + jobId + "] change to [" + JobStatus.DELETING + "] was not acknowledged."); } - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - return removeJobFromClusterState(jobId, currentState); - } - }); + DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName); + client.admin().indices().delete(deleteIndexRequest, ActionListener.wrap(deleteIndexConsumer, actionListener::onFailure)); + }; + + UpdateJobStatusAction.Request updateStatusListener = new UpdateJobStatusAction.Request(jobId, JobStatus.DELETING); + setJobStatus(updateStatusListener, ActionListener.wrap(updateConsumer, actionListener::onFailure)); + } - ClusterState removeJobFromClusterState(String jobId, ClusterState currentState) { + ClusterState removeJobFromState(String jobId, ClusterState currentState) { PrelertMetadata.Builder builder = createPrelertMetadataBuilder(currentState); - builder.removeJob(jobId); + builder.deleteJob(jobId); return buildNewClusterState(currentState, builder); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadata.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadata.java index b211437a43d..380e3a001c5 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadata.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadata.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.prelert.job.metadata; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.cluster.Diff; @@ -19,6 +20,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchRequestParsers; import org.elasticsearch.xpack.prelert.job.Job; import org.elasticsearch.xpack.prelert.job.JobStatus; @@ -234,8 +236,10 @@ public class PrelertMetadata implements MetaData.Custom { return this; } - public Builder removeJob(String jobId) { - if (jobs.remove(jobId) == null) { + public Builder deleteJob(String jobId) { + + Job job = jobs.remove(jobId); + if (job == null) { throw new ResourceNotFoundException("job [" + jobId + "] does not exist"); } @@ -247,10 +251,12 @@ public class PrelertMetadata implements MetaData.Custom { Allocation previousAllocation = this.allocations.remove(jobId); if (previousAllocation != null) { - if (!previousAllocation.getStatus().isAnyOf(JobStatus.CLOSED, JobStatus.FAILED)) { - throw ExceptionsHelper.conflictStatusException(Messages.getMessage( - Messages.JOB_CANNOT_DELETE_WHILE_RUNNING, jobId, previousAllocation.getStatus())); + if (!previousAllocation.getStatus().equals(JobStatus.DELETING)) { + throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because it is in [" + + previousAllocation.getStatus() + "] state. Must be in [" + JobStatus.DELETING + "] state."); } + } else { + throw new ResourceNotFoundException("No Cluster State found for job [" + jobId + "]"); } return this; @@ -347,6 +353,22 @@ public class PrelertMetadata implements MetaData.Custom { if (previous == null) { throw new IllegalStateException("[" + jobId + "] no allocation exist to update the status to [" + jobStatus + "]"); } + + // Cannot update the status to DELETING if there are schedulers attached + if (jobStatus.equals(JobStatus.DELETING)) { + Optional scheduler = getSchedulerByJobId(jobId); + if (scheduler.isPresent()) { + throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] while scheduler [" + + scheduler.get().getId() + "] refers to it"); + } + } + + + // Once a job goes into Deleting, it cannot be changed + if (previous.getStatus().equals(JobStatus.DELETING)) { + throw new ElasticsearchStatusException("Cannot change status of job [" + jobId + "] to [" + jobStatus + "] because " + + "it is currently in [" + JobStatus.DELETING + "] status.", RestStatus.CONFLICT); + } Allocation.Builder builder = new Allocation.Builder(previous); builder.setStatus(jobStatus); if (reason != null) { @@ -404,4 +426,4 @@ public class PrelertMetadata implements MetaData.Custom { } } -} +} \ No newline at end of file diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/PrelertJobIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/PrelertJobIT.java index 05256c18b2d..c6468b7cd03 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/PrelertJobIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/PrelertJobIT.java @@ -107,7 +107,8 @@ public class PrelertJobIT extends ESRestTestCase { "\"time_field\":\"time\",\n" + " \"time_format\":\"yyyy-MM-dd HH:mm:ssX\"\n" + " }\n" + "}"; - return client().performRequest("put", PrelertPlugin.BASE_PATH + "anomaly_detectors", Collections.emptyMap(), new StringEntity(job)); + return client().performRequest("put", PrelertPlugin.BASE_PATH + "anomaly_detectors" , + Collections.emptyMap(), new StringEntity(job)); } public void testGetBucketResults() throws Exception { diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/ScheduledJobIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/ScheduledJobIT.java index 1396a59918f..c8eff21fda8 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/ScheduledJobIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/ScheduledJobIT.java @@ -115,7 +115,8 @@ public class ScheduledJobIT extends ESRestTestCase { + " \"time_field\":\"time\",\n" + " \"time_format\":\"yyyy-MM-dd'T'HH:mm:ssX\"\n" + " }\n" + "}"; - return client().performRequest("put", PrelertPlugin.BASE_PATH + "anomaly_detectors", Collections.emptyMap(), new StringEntity(job)); + return client().performRequest("put", PrelertPlugin.BASE_PATH + "anomaly_detectors", + Collections.emptyMap(), new StringEntity(job)); } private Response createScheduler(String schedulerId, String jobId) throws IOException { @@ -137,7 +138,7 @@ public class ScheduledJobIT extends ESRestTestCase { } @After - public void clearPrelertState() throws IOException { + public void clearPrelertState() throws Exception { new PrelertRestTestStateCleaner(client(), this).clearPrelertMetadata(); } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/JobManagerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/JobManagerTests.java index 0700316b345..12110ab4d21 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/JobManagerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/JobManagerTests.java @@ -185,7 +185,7 @@ public class JobManagerTests extends ESTestCase { } })); - assertEquals("Cannot create index 'my-special-place' as it already exists", e.getMessage()); + assertEquals("Cannot create index '.ml-anomalies-my-special-place' as it already exists", e.getMessage()); } private JobManager createJobManager() { diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobAllocatorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobAllocatorTests.java index c92cf78463c..992d529a53b 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobAllocatorTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobAllocatorTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.prelert.job.JobStatus; import org.junit.Before; import java.util.concurrent.ExecutorService; @@ -99,7 +100,8 @@ public class JobAllocatorTests extends ESTestCase { expectThrows(IllegalStateException.class, () -> jobAllocator.assignJobsToNodes(cs3)); pmBuilder = new PrelertMetadata.Builder(result1.getMetaData().custom(PrelertMetadata.TYPE)); - pmBuilder.removeJob("my_job_id"); + pmBuilder.updateStatus("my_job_id", JobStatus.DELETING, null); + pmBuilder.deleteJob("my_job_id"); ClusterState cs4 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder() .putCustom(PrelertMetadata.TYPE, pmBuilder.build())) .nodes(DiscoveryNodes.builder() diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobLifeCycleServiceTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobLifeCycleServiceTests.java index 0c9a9e8ea77..5acaf79f4ed 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobLifeCycleServiceTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobLifeCycleServiceTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.prelert.job.metadata; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.Version; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; @@ -131,12 +132,12 @@ public class JobLifeCycleServiceTests extends ESTestCase { verify(dataProcessor, times(1)).closeJob("my_job_id"); } - public void testClusterChanged_allocationRemovedStopJob() { + public void testClusterChanged_allocationDeletingJob() { jobLifeCycleService.localAssignedJobs.add("my_job_id"); PrelertMetadata.Builder pmBuilder = new PrelertMetadata.Builder(); pmBuilder.putJob(buildJobBuilder("my_job_id").build(), false); - pmBuilder.removeJob("my_job_id"); + pmBuilder.updateStatus("my_job_id", JobStatus.DELETING, null); ClusterState cs1 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder() .putCustom(PrelertMetadata.TYPE, pmBuilder.build())) .nodes(DiscoveryNodes.builder() @@ -144,10 +145,31 @@ public class JobLifeCycleServiceTests extends ESTestCase { .localNodeId("_node_id")) .build(); jobLifeCycleService.clusterChanged(new ClusterChangedEvent("_source", cs1, cs1)); + assertEquals(jobLifeCycleService.localAssignedJobs.size(), 1); + + + pmBuilder.deleteJob("my_job_id"); + ClusterState cs2 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder() + .putCustom(PrelertMetadata.TYPE, pmBuilder.build())) + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.CURRENT)) + .localNodeId("_node_id")) + .build(); + jobLifeCycleService.clusterChanged(new ClusterChangedEvent("_source", cs2, cs1)); + assertEquals(jobLifeCycleService.localAssignedJobs.size(), 0); verify(dataProcessor, times(1)).closeJob("my_job_id"); } + public void testClusterChanged_allocationDeletingClosedJob() { + jobLifeCycleService.localAssignedJobs.add("my_job_id"); + + PrelertMetadata.Builder pmBuilder = new PrelertMetadata.Builder(); + pmBuilder.putJob(buildJobBuilder("my_job_id").build(), false); + + expectThrows(ElasticsearchStatusException.class, () -> pmBuilder.deleteJob("my_job_id")); + } + public void testStart_openJobFails() { doThrow(new RuntimeException("error")).when(dataProcessor).openJob("my_job_id", false); Allocation.Builder allocation = new Allocation.Builder(); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadataTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadataTests.java index fb230d6984f..19555e1c253 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadataTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadataTests.java @@ -131,7 +131,12 @@ public class PrelertMetadataTests extends AbstractSerializingTestCase builder2.removeJob("1")); + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> builder2.deleteJob("1")); assertThat(e.status(), equalTo(RestStatus.CONFLICT)); } @@ -162,7 +167,7 @@ public class PrelertMetadataTests extends AbstractSerializingTestCase builder.removeJob(job1.getId())); + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> builder.deleteJob(job1.getId())); assertThat(e.status(), equalTo(RestStatus.CONFLICT)); String expectedMsg = "Cannot delete job [" + job1.getId() + "] while scheduler [" + schedulerConfig1.getId() + "] refers to it"; assertThat(e.getMessage(), equalTo(expectedMsg)); @@ -170,7 +175,7 @@ public class PrelertMetadataTests extends AbstractSerializingTestCase builder1.removeJob("1")); + expectThrows(ResourceNotFoundException.class, () -> builder1.deleteJob("1")); } public void testCrudScheduler() {