Allow job delete only when job is not running (elastic/elasticsearch#357)

Original commit: elastic/x-pack-elasticsearch@f2959fe2ba
This commit is contained in:
Dimitris Athanasiou 2016-11-23 16:00:36 +00:00 committed by GitHub
parent 4dc20467cb
commit 9fc3c77905
5 changed files with 47 additions and 4 deletions

View File

@ -245,7 +245,7 @@ public class JobManager {
public void deleteJob(DeleteJobAction.Request request, ActionListener<DeleteJobAction.Response> actionListener) {
String jobId = request.getJobId();
LOGGER.debug("Deleting job '" + jobId + "'");
// NORELEASE: Should first gracefully stop any running process
ActionListener<Boolean> delegateListener = new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean jobDeleted) {
@ -293,6 +293,10 @@ public class JobManager {
if (schedulerState != null && schedulerState.getStatus() != JobSchedulerStatus.STOPPED) {
throw ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.JOB_CANNOT_DELETE_WHILE_SCHEDULER_RUNS, jobId));
}
if (!allocation.getStatus().isAnyOf(JobStatus.CLOSED, JobStatus.PAUSED, JobStatus.FAILED)) {
throw ExceptionsHelper.conflictStatusException(Messages.getMessage(
Messages.JOB_CANNOT_DELETE_WHILE_RUNNING, jobId, allocation.getStatus()));
}
}
ClusterState.Builder newState = ClusterState.builder(currentState);
newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(PrelertMetadata.TYPE, builder.build()).build());

View File

@ -62,6 +62,7 @@ public final class Messages
public static final String SYSTEM_AUDIT_STARTED = "system.audit.started";
public static final String SYSTEM_AUDIT_SHUTDOWN = "system.audit.shutdown";
public static final String JOB_CANNOT_DELETE_WHILE_RUNNING = "job.cannot.delete.while.running";
public static final String JOB_CANNOT_DELETE_WHILE_SCHEDULER_RUNS = "job.cannot.delete.while.scheduler.runs";
public static final String JOB_CANNOT_PAUSE = "job.cannot.pause";
public static final String JOB_CANNOT_RESUME = "job.cannot.resume";
@ -198,7 +199,6 @@ public final class Messages
public static final String JOB_CONFIG_SCHEDULER_MULTIPLE_AGGREGATIONS = "job.config.scheduler.multiple.aggregations";
public static final String JOB_DATA_CONCURRENT_USE_CLOSE = "job.data.concurrent.use.close";
public static final String JOB_DATA_CONCURRENT_USE_DELETE = "job.data.concurrent.use.delete";
public static final String JOB_DATA_CONCURRENT_USE_FLUSH = "job.data.concurrent.use.flush";
public static final String JOB_DATA_CONCURRENT_USE_PAUSE = "job.data.concurrent.use.pause";
public static final String JOB_DATA_CONCURRENT_USE_RESUME = "job.data.concurrent.use.resume";

View File

@ -43,7 +43,8 @@ job.audit.scheduler.recovered = Scheduler has recovered data extraction and anal
system.audit.started = System started
system.audit.shutdown = System shut down
job.cannot.delete.while.scheduler.runs = Cannot delete job {0} while the scheduler is running
job.cannot.delete.while.running = Cannot delete job ''{0}'' while it is {1}
job.cannot.delete.while.scheduler.runs = Cannot delete job ''{0}'' while the scheduler is running
job.cannot.pause = Cannot pause job ''{0}'' while its status is {1}
job.cannot.resume = Cannot resume job ''{0}'' while its status is {1}
@ -149,7 +150,6 @@ job.config.scheduler.multiple.passwords = Both password and encryptedPassword we
job.config.scheduler.multiple.aggregations = Both aggregations and aggs were specified - please just specify one
job.data.concurrent.use.close = Cannot close job {0} while another connection {2}is {1} the job
job.data.concurrent.use.delete = Cannot delete job {0} while another connection {2}is {1} the job
job.data.concurrent.use.flush = Cannot flush job {0} while another connection {2}is {1} the job
job.data.concurrent.use.pause = Cannot pause job {0} while another connection {2}is {1} the job
job.data.concurrent.use.resume = Cannot resume job {0} while another connection {2}is {1} the job

View File

@ -7,6 +7,8 @@ package org.elasticsearch.xpack.prelert.integration;
import org.apache.http.HttpHost;
import org.apache.http.entity.StringEntity;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
@ -89,12 +91,21 @@ public class ScheduledJobIT extends ESRestTestCase {
}
});
ResponseException e = expectThrows(ResponseException.class,
() -> client().performRequest("delete", PrelertPlugin.BASE_PATH + "jobs/scheduled"));
response = e.getResponse();
assertThat(response.getStatusLine().getStatusCode(), equalTo(409));
assertThat(responseEntityToString(response), containsString("Cannot delete job 'scheduled' while the scheduler is running"));
response = client().performRequest("post", PrelertPlugin.BASE_PATH + "schedulers/scheduled/_stop");
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
assertThat(responseEntityToString(response), equalTo("{\"acknowledged\":true}"));
waitForSchedulerToBeStopped();
response = client().performRequest("delete", PrelertPlugin.BASE_PATH + "jobs/scheduled");
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
assertThat(responseEntityToString(response), equalTo("{\"acknowledged\":true}"));
}
private void createAirlineDataIndex() throws Exception {

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.prelert.job.manager;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.cluster.ClusterName;
@ -13,9 +14,14 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.SchedulerState;
import org.elasticsearch.xpack.prelert.job.audit.Auditor;
import org.elasticsearch.xpack.prelert.job.metadata.Allocation;
import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
@ -84,6 +90,28 @@ public class JobManagerTests extends ESTestCase {
assertThat(prelertMetadata.getJobs().containsKey("foo"), is(false));
}
public void testRemoveJobFromClusterState_GivenJobIsRunning() {
JobManager jobManager = createJobManager();
ClusterState clusterState = createClusterState();
Job job = buildJobBuilder("foo").build();
clusterState = jobManager.innerPutJob(job, false, clusterState);
Allocation.Builder allocation = new Allocation.Builder();
allocation.setNodeId("myNode");
allocation.setJobId(job.getId());
allocation.setStatus(JobStatus.RUNNING);
PrelertMetadata.Builder newMetadata = new PrelertMetadata.Builder(clusterState.metaData().custom(PrelertMetadata.TYPE));
newMetadata.putAllocation("myNode", job.getId());
newMetadata.updateAllocation(job.getId(), allocation.build());
ClusterState jobRunningClusterState = new ClusterState.Builder(clusterState)
.metaData(MetaData.builder().putCustom(PrelertMetadata.TYPE, newMetadata.build())).build();
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> jobManager.removeJobFromClusterState("foo", jobRunningClusterState));
assertThat(e.status(), equalTo(RestStatus.CONFLICT));
assertThat(e.getMessage(), equalTo("Cannot delete job 'foo' while it is RUNNING"));
}
public void testRemoveJobFromClusterState_jobMissing() {
JobManager jobManager = createJobManager();
ClusterState clusterState = createClusterState();