From 9fc3c7790521d91d2b491ce2140fff7892457223 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Wed, 23 Nov 2016 16:00:36 +0000 Subject: [PATCH] Allow job delete only when job is not running (elastic/elasticsearch#357) Original commit: elastic/x-pack-elasticsearch@f2959fe2ba212010c72b036c451b976c93440d1b --- .../xpack/prelert/job/manager/JobManager.java | 6 +++- .../xpack/prelert/job/messages/Messages.java | 2 +- .../job/messages/prelert_messages.properties | 4 +-- .../prelert/integration/ScheduledJobIT.java | 11 ++++++++ .../prelert/job/manager/JobManagerTests.java | 28 +++++++++++++++++++ 5 files changed, 47 insertions(+), 4 deletions(-) diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/JobManager.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/JobManager.java index d329c8a5718..1d24842d88c 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/JobManager.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/JobManager.java @@ -245,7 +245,7 @@ public class JobManager { public void deleteJob(DeleteJobAction.Request request, ActionListener actionListener) { String jobId = request.getJobId(); LOGGER.debug("Deleting job '" + jobId + "'"); - // NORELEASE: Should first gracefully stop any running process + ActionListener delegateListener = new ActionListener() { @Override public void onResponse(Boolean jobDeleted) { @@ -293,6 +293,10 @@ public class JobManager { if (schedulerState != null && schedulerState.getStatus() != JobSchedulerStatus.STOPPED) { throw ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.JOB_CANNOT_DELETE_WHILE_SCHEDULER_RUNS, jobId)); } + if (!allocation.getStatus().isAnyOf(JobStatus.CLOSED, JobStatus.PAUSED, JobStatus.FAILED)) { + throw ExceptionsHelper.conflictStatusException(Messages.getMessage( + Messages.JOB_CANNOT_DELETE_WHILE_RUNNING, jobId, allocation.getStatus())); + } } ClusterState.Builder newState = ClusterState.builder(currentState); newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(PrelertMetadata.TYPE, builder.build()).build()); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/messages/Messages.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/messages/Messages.java index f1ab944871a..a58f73ca49d 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/messages/Messages.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/messages/Messages.java @@ -62,6 +62,7 @@ public final class Messages public static final String SYSTEM_AUDIT_STARTED = "system.audit.started"; public static final String SYSTEM_AUDIT_SHUTDOWN = "system.audit.shutdown"; + public static final String JOB_CANNOT_DELETE_WHILE_RUNNING = "job.cannot.delete.while.running"; public static final String JOB_CANNOT_DELETE_WHILE_SCHEDULER_RUNS = "job.cannot.delete.while.scheduler.runs"; public static final String JOB_CANNOT_PAUSE = "job.cannot.pause"; public static final String JOB_CANNOT_RESUME = "job.cannot.resume"; @@ -198,7 +199,6 @@ public final class Messages public static final String JOB_CONFIG_SCHEDULER_MULTIPLE_AGGREGATIONS = "job.config.scheduler.multiple.aggregations"; public static final String JOB_DATA_CONCURRENT_USE_CLOSE = "job.data.concurrent.use.close"; - public static final String JOB_DATA_CONCURRENT_USE_DELETE = "job.data.concurrent.use.delete"; public static final String JOB_DATA_CONCURRENT_USE_FLUSH = "job.data.concurrent.use.flush"; public static final String JOB_DATA_CONCURRENT_USE_PAUSE = "job.data.concurrent.use.pause"; public static final String JOB_DATA_CONCURRENT_USE_RESUME = "job.data.concurrent.use.resume"; diff --git a/elasticsearch/src/main/resources/org/elasticsearch/xpack/prelert/job/messages/prelert_messages.properties b/elasticsearch/src/main/resources/org/elasticsearch/xpack/prelert/job/messages/prelert_messages.properties index c713096e23f..9ff1356f512 100644 --- a/elasticsearch/src/main/resources/org/elasticsearch/xpack/prelert/job/messages/prelert_messages.properties +++ b/elasticsearch/src/main/resources/org/elasticsearch/xpack/prelert/job/messages/prelert_messages.properties @@ -43,7 +43,8 @@ job.audit.scheduler.recovered = Scheduler has recovered data extraction and anal system.audit.started = System started system.audit.shutdown = System shut down -job.cannot.delete.while.scheduler.runs = Cannot delete job {0} while the scheduler is running +job.cannot.delete.while.running = Cannot delete job ''{0}'' while it is {1} +job.cannot.delete.while.scheduler.runs = Cannot delete job ''{0}'' while the scheduler is running job.cannot.pause = Cannot pause job ''{0}'' while its status is {1} job.cannot.resume = Cannot resume job ''{0}'' while its status is {1} @@ -149,7 +150,6 @@ job.config.scheduler.multiple.passwords = Both password and encryptedPassword we job.config.scheduler.multiple.aggregations = Both aggregations and aggs were specified - please just specify one job.data.concurrent.use.close = Cannot close job {0} while another connection {2}is {1} the job -job.data.concurrent.use.delete = Cannot delete job {0} while another connection {2}is {1} the job job.data.concurrent.use.flush = Cannot flush job {0} while another connection {2}is {1} the job job.data.concurrent.use.pause = Cannot pause job {0} while another connection {2}is {1} the job job.data.concurrent.use.resume = Cannot resume job {0} while another connection {2}is {1} the job diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/ScheduledJobIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/ScheduledJobIT.java index 1f0f86905a7..60d97afb58d 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/ScheduledJobIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/ScheduledJobIT.java @@ -7,6 +7,8 @@ package org.elasticsearch.xpack.prelert.integration; import org.apache.http.HttpHost; import org.apache.http.entity.StringEntity; +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; @@ -89,12 +91,21 @@ public class ScheduledJobIT extends ESRestTestCase { } }); + ResponseException e = expectThrows(ResponseException.class, + () -> client().performRequest("delete", PrelertPlugin.BASE_PATH + "jobs/scheduled")); + response = e.getResponse(); + assertThat(response.getStatusLine().getStatusCode(), equalTo(409)); + assertThat(responseEntityToString(response), containsString("Cannot delete job 'scheduled' while the scheduler is running")); + response = client().performRequest("post", PrelertPlugin.BASE_PATH + "schedulers/scheduled/_stop"); assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); assertThat(responseEntityToString(response), equalTo("{\"acknowledged\":true}")); waitForSchedulerToBeStopped(); + response = client().performRequest("delete", PrelertPlugin.BASE_PATH + "jobs/scheduled"); + assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); + assertThat(responseEntityToString(response), equalTo("{\"acknowledged\":true}")); } private void createAirlineDataIndex() throws Exception { diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/JobManagerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/JobManagerTests.java index a7e3a133184..88ccd07327f 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/JobManagerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/JobManagerTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.prelert.job.manager; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.cluster.ClusterName; @@ -13,9 +14,14 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.prelert.job.Job; +import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; +import org.elasticsearch.xpack.prelert.job.JobStatus; +import org.elasticsearch.xpack.prelert.job.SchedulerState; import org.elasticsearch.xpack.prelert.job.audit.Auditor; +import org.elasticsearch.xpack.prelert.job.metadata.Allocation; import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata; import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.prelert.job.persistence.JobProvider; @@ -84,6 +90,28 @@ public class JobManagerTests extends ESTestCase { assertThat(prelertMetadata.getJobs().containsKey("foo"), is(false)); } + public void testRemoveJobFromClusterState_GivenJobIsRunning() { + JobManager jobManager = createJobManager(); + ClusterState clusterState = createClusterState(); + Job job = buildJobBuilder("foo").build(); + clusterState = jobManager.innerPutJob(job, false, clusterState); + Allocation.Builder allocation = new Allocation.Builder(); + allocation.setNodeId("myNode"); + allocation.setJobId(job.getId()); + allocation.setStatus(JobStatus.RUNNING); + PrelertMetadata.Builder newMetadata = new PrelertMetadata.Builder(clusterState.metaData().custom(PrelertMetadata.TYPE)); + newMetadata.putAllocation("myNode", job.getId()); + newMetadata.updateAllocation(job.getId(), allocation.build()); + + ClusterState jobRunningClusterState = new ClusterState.Builder(clusterState) + .metaData(MetaData.builder().putCustom(PrelertMetadata.TYPE, newMetadata.build())).build(); + + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, + () -> jobManager.removeJobFromClusterState("foo", jobRunningClusterState)); + assertThat(e.status(), equalTo(RestStatus.CONFLICT)); + assertThat(e.getMessage(), equalTo("Cannot delete job 'foo' while it is RUNNING")); + } + public void testRemoveJobFromClusterState_jobMissing() { JobManager jobManager = createJobManager(); ClusterState clusterState = createClusterState();