DELETING job status cluster state changes (elastic/elasticsearch#612)

Deleting a job now starts a three-step process:

1. Job status updated to DELETING
2. Physical index is deleted
3. Job removed from cluster state

When jobs are in DELETING, they cannot be modified/updated/changed at all.  Only jobs that are DELETING can actually be removed from the CS.

Original commit: elastic/x-pack-elasticsearch@2cd99a240c
This commit is contained in:
Zachary Tong 2017-01-04 16:43:24 -05:00 committed by GitHub
parent 090c7e792a
commit 23aef2679d
11 changed files with 171 additions and 56 deletions

View File

@ -26,6 +26,7 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchRequestParsers;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
@ -211,6 +212,7 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
// norelease: we will no longer need to pass the client here after we switch to a client based data extractor
new HttpDataExtractorFactory(client, searchRequestParsers),
System::currentTimeMillis);
TaskManager taskManager = new TaskManager(settings);
JobLifeCycleService jobLifeCycleService =
new JobLifeCycleService(settings, client, clusterService, dataProcessor, threadPool.generic());
@ -235,7 +237,8 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
dataProcessor,
new PrelertInitializationService(settings, threadPool, clusterService, jobProvider),
jobDataCountsPersister,
scheduledJobRunner
scheduledJobRunner,
taskManager
);
}

View File

@ -13,6 +13,7 @@ import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
@ -23,6 +24,9 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.prelert.job.Job;
@ -74,6 +78,11 @@ public class DeleteJobAction extends Action<DeleteJobAction.Request, DeleteJobAc
return null;
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new Task(id, type, action, "delete-job-" + jobId, parentTaskId);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
@ -135,14 +144,18 @@ public class DeleteJobAction extends Action<DeleteJobAction.Request, DeleteJobAc
public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
private final JobManager jobManager;
private final Client client;
private final TaskManager taskManager;
@Inject
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
JobManager jobManager) {
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, JobManager jobManager,
Client client, TaskManager taskManager) {
super(settings, DeleteJobAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
this.jobManager = jobManager;
this.client = client;
this.taskManager = taskManager;
}
@Override
@ -155,14 +168,27 @@ public class DeleteJobAction extends Action<DeleteJobAction.Request, DeleteJobAc
return new Response();
}
@Override
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
jobManager.deleteJob(client, request, ActionListener.wrap(response -> {
taskManager.unregister(task);
listener.onResponse(response);
}, e -> {
taskManager.unregister(task);
listener.onFailure(e);
}));
}
@Override
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
jobManager.deleteJob(request, listener);
throw new UnsupportedOperationException("the Task parameter is required");
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
}
}

View File

@ -20,7 +20,7 @@ import java.util.Locale;
*/
public enum JobStatus implements Writeable {
CLOSING, CLOSED, OPENING, OPENED, FAILED;
CLOSING, CLOSED, OPENING, OPENED, FAILED, DELETING;
public static JobStatus fromString(String name) {
return valueOf(name.trim().toUpperCase(Locale.ROOT));

View File

@ -9,10 +9,14 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
@ -196,7 +200,8 @@ public class JobManager extends AbstractComponent {
public ClusterState execute(ClusterState currentState) throws Exception {
ClusterState cs = updateClusterState(job, request.isOverwrite(), currentState);
if (currentState.metaData().index(AnomalyDetectorsIndex.jobResultsIndexName(job.getIndexName())) != null) {
throw new ResourceAlreadyExistsException(Messages.getMessage(Messages.JOB_INDEX_ALREADY_EXISTS, job.getIndexName()));
throw new ResourceAlreadyExistsException(Messages.getMessage(Messages.JOB_INDEX_ALREADY_EXISTS,
AnomalyDetectorsIndex.jobResultsIndexName(job.getIndexName())));
}
return cs;
}
@ -209,51 +214,79 @@ public class JobManager extends AbstractComponent {
return buildNewClusterState(currentState, builder);
}
/**
* Deletes a job.
*
* The clean-up involves:
* <ul>
* <li>Deleting the index containing job results</li>
* <li>Deleting the job logs</li>
* <li>Removing the job from the cluster state</li>
* </ul>
*
* @param request
* the delete job request
* @param actionListener
* the action listener
*/
public void deleteJob(DeleteJobAction.Request request, ActionListener<DeleteJobAction.Response> actionListener) {
public void deleteJob(Client client, DeleteJobAction.Request request, ActionListener<DeleteJobAction.Response> actionListener) {
String jobId = request.getJobId();
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
LOGGER.debug("Deleting job '" + jobId + "'");
ActionListener<Boolean> delegateListener = ActionListener.wrap(jobDeleted -> {
// Step 3. Listen for the Cluster State status change
// Chain acknowledged status onto original actionListener
CheckedConsumer<Boolean, Exception> deleteStatusConsumer = jobDeleted -> {
if (jobDeleted) {
jobProvider.deleteJobRelatedIndices(request.getJobId(), actionListener);
audit(jobId).info(Messages.getMessage(Messages.JOB_AUDIT_DELETED));
logger.info("Job [" + jobId + "] deleted.");
actionListener.onResponse(new DeleteJobAction.Response(true));
//nocommit: needs #626, because otherwise the audit message re-creates the index
// we just deleted. :)
//audit(jobId).info(Messages.getMessage(Messages.JOB_AUDIT_DELETED));
} else {
actionListener.onResponse(new DeleteJobAction.Response(false));
}
}, actionListener::onFailure);
clusterService.submitStateUpdateTask("delete-job-" + jobId,
new AckedClusterStateUpdateTask<Boolean>(request, delegateListener) {
};
@Override
protected Boolean newResponse(boolean acknowledged) {
return acknowledged;
// Step 2. Listen for the Deleted Index response
// If successful, delete from cluster state and chain onto deleteStatusListener
CheckedConsumer<DeleteIndexResponse, Exception> deleteIndexConsumer = response -> {
logger.info("Deleting index [" + indexName + "] successful");
if (response.isAcknowledged()) {
logger.info("Index deletion acknowledged");
} else {
logger.warn("Index deletion not acknowledged");
}
clusterService.submitStateUpdateTask("delete-job-" + jobId,
new AckedClusterStateUpdateTask<Boolean>(request, ActionListener.wrap(deleteStatusConsumer, actionListener::onFailure)) {
@Override
protected Boolean newResponse(boolean acknowledged) {
return acknowledged && response.isAcknowledged();
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return removeJobFromState(jobId, currentState);
}
});
};
// Step 1. Update the CS to DELETING
// If successful, attempt to delete the physical index and chain
// onto deleteIndexConsumer
CheckedConsumer<UpdateJobStatusAction.Response, Exception> updateConsumer = response -> {
// Sucessfully updated the status to DELETING, begin actually deleting
if (response.isAcknowledged()) {
logger.info("Job [" + jobId + "] set to [" + JobStatus.DELETING + "]");
} else {
logger.warn("Job [" + jobId + "] change to [" + JobStatus.DELETING + "] was not acknowledged.");
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return removeJobFromClusterState(jobId, currentState);
}
});
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
client.admin().indices().delete(deleteIndexRequest, ActionListener.wrap(deleteIndexConsumer, actionListener::onFailure));
};
UpdateJobStatusAction.Request updateStatusListener = new UpdateJobStatusAction.Request(jobId, JobStatus.DELETING);
setJobStatus(updateStatusListener, ActionListener.wrap(updateConsumer, actionListener::onFailure));
}
ClusterState removeJobFromClusterState(String jobId, ClusterState currentState) {
ClusterState removeJobFromState(String jobId, ClusterState currentState) {
PrelertMetadata.Builder builder = createPrelertMetadataBuilder(currentState);
builder.removeJob(jobId);
builder.deleteJob(jobId);
return buildNewClusterState(currentState, builder);
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.prelert.job.metadata;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.cluster.Diff;
@ -19,6 +20,7 @@ import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchRequestParsers;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobStatus;
@ -234,8 +236,10 @@ public class PrelertMetadata implements MetaData.Custom {
return this;
}
public Builder removeJob(String jobId) {
if (jobs.remove(jobId) == null) {
public Builder deleteJob(String jobId) {
Job job = jobs.remove(jobId);
if (job == null) {
throw new ResourceNotFoundException("job [" + jobId + "] does not exist");
}
@ -247,10 +251,12 @@ public class PrelertMetadata implements MetaData.Custom {
Allocation previousAllocation = this.allocations.remove(jobId);
if (previousAllocation != null) {
if (!previousAllocation.getStatus().isAnyOf(JobStatus.CLOSED, JobStatus.FAILED)) {
throw ExceptionsHelper.conflictStatusException(Messages.getMessage(
Messages.JOB_CANNOT_DELETE_WHILE_RUNNING, jobId, previousAllocation.getStatus()));
if (!previousAllocation.getStatus().equals(JobStatus.DELETING)) {
throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because it is in ["
+ previousAllocation.getStatus() + "] state. Must be in [" + JobStatus.DELETING + "] state.");
}
} else {
throw new ResourceNotFoundException("No Cluster State found for job [" + jobId + "]");
}
return this;
@ -347,6 +353,22 @@ public class PrelertMetadata implements MetaData.Custom {
if (previous == null) {
throw new IllegalStateException("[" + jobId + "] no allocation exist to update the status to [" + jobStatus + "]");
}
// Cannot update the status to DELETING if there are schedulers attached
if (jobStatus.equals(JobStatus.DELETING)) {
Optional<Scheduler> scheduler = getSchedulerByJobId(jobId);
if (scheduler.isPresent()) {
throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] while scheduler ["
+ scheduler.get().getId() + "] refers to it");
}
}
// Once a job goes into Deleting, it cannot be changed
if (previous.getStatus().equals(JobStatus.DELETING)) {
throw new ElasticsearchStatusException("Cannot change status of job [" + jobId + "] to [" + jobStatus + "] because " +
"it is currently in [" + JobStatus.DELETING + "] status.", RestStatus.CONFLICT);
}
Allocation.Builder builder = new Allocation.Builder(previous);
builder.setStatus(jobStatus);
if (reason != null) {
@ -404,4 +426,4 @@ public class PrelertMetadata implements MetaData.Custom {
}
}
}
}

View File

@ -107,7 +107,8 @@ public class PrelertJobIT extends ESRestTestCase {
"\"time_field\":\"time\",\n"
+ " \"time_format\":\"yyyy-MM-dd HH:mm:ssX\"\n" + " }\n" + "}";
return client().performRequest("put", PrelertPlugin.BASE_PATH + "anomaly_detectors", Collections.emptyMap(), new StringEntity(job));
return client().performRequest("put", PrelertPlugin.BASE_PATH + "anomaly_detectors" ,
Collections.emptyMap(), new StringEntity(job));
}
public void testGetBucketResults() throws Exception {

View File

@ -115,7 +115,8 @@ public class ScheduledJobIT extends ESRestTestCase {
+ " \"time_field\":\"time\",\n" + " \"time_format\":\"yyyy-MM-dd'T'HH:mm:ssX\"\n" + " }\n"
+ "}";
return client().performRequest("put", PrelertPlugin.BASE_PATH + "anomaly_detectors", Collections.emptyMap(), new StringEntity(job));
return client().performRequest("put", PrelertPlugin.BASE_PATH + "anomaly_detectors",
Collections.emptyMap(), new StringEntity(job));
}
private Response createScheduler(String schedulerId, String jobId) throws IOException {
@ -137,7 +138,7 @@ public class ScheduledJobIT extends ESRestTestCase {
}
@After
public void clearPrelertState() throws IOException {
public void clearPrelertState() throws Exception {
new PrelertRestTestStateCleaner(client(), this).clearPrelertMetadata();
}
}

View File

@ -185,7 +185,7 @@ public class JobManagerTests extends ESTestCase {
}
}));
assertEquals("Cannot create index 'my-special-place' as it already exists", e.getMessage());
assertEquals("Cannot create index '.ml-anomalies-my-special-place' as it already exists", e.getMessage());
}
private JobManager createJobManager() {

View File

@ -17,6 +17,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.junit.Before;
import java.util.concurrent.ExecutorService;
@ -99,7 +100,8 @@ public class JobAllocatorTests extends ESTestCase {
expectThrows(IllegalStateException.class, () -> jobAllocator.assignJobsToNodes(cs3));
pmBuilder = new PrelertMetadata.Builder(result1.getMetaData().custom(PrelertMetadata.TYPE));
pmBuilder.removeJob("my_job_id");
pmBuilder.updateStatus("my_job_id", JobStatus.DELETING, null);
pmBuilder.deleteJob("my_job_id");
ClusterState cs4 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder()
.putCustom(PrelertMetadata.TYPE, pmBuilder.build()))
.nodes(DiscoveryNodes.builder()

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.prelert.job.metadata;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.Version;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
@ -131,12 +132,12 @@ public class JobLifeCycleServiceTests extends ESTestCase {
verify(dataProcessor, times(1)).closeJob("my_job_id");
}
public void testClusterChanged_allocationRemovedStopJob() {
public void testClusterChanged_allocationDeletingJob() {
jobLifeCycleService.localAssignedJobs.add("my_job_id");
PrelertMetadata.Builder pmBuilder = new PrelertMetadata.Builder();
pmBuilder.putJob(buildJobBuilder("my_job_id").build(), false);
pmBuilder.removeJob("my_job_id");
pmBuilder.updateStatus("my_job_id", JobStatus.DELETING, null);
ClusterState cs1 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder()
.putCustom(PrelertMetadata.TYPE, pmBuilder.build()))
.nodes(DiscoveryNodes.builder()
@ -144,10 +145,31 @@ public class JobLifeCycleServiceTests extends ESTestCase {
.localNodeId("_node_id"))
.build();
jobLifeCycleService.clusterChanged(new ClusterChangedEvent("_source", cs1, cs1));
assertEquals(jobLifeCycleService.localAssignedJobs.size(), 1);
pmBuilder.deleteJob("my_job_id");
ClusterState cs2 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder()
.putCustom(PrelertMetadata.TYPE, pmBuilder.build()))
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.CURRENT))
.localNodeId("_node_id"))
.build();
jobLifeCycleService.clusterChanged(new ClusterChangedEvent("_source", cs2, cs1));
assertEquals(jobLifeCycleService.localAssignedJobs.size(), 0);
verify(dataProcessor, times(1)).closeJob("my_job_id");
}
public void testClusterChanged_allocationDeletingClosedJob() {
jobLifeCycleService.localAssignedJobs.add("my_job_id");
PrelertMetadata.Builder pmBuilder = new PrelertMetadata.Builder();
pmBuilder.putJob(buildJobBuilder("my_job_id").build(), false);
expectThrows(ElasticsearchStatusException.class, () -> pmBuilder.deleteJob("my_job_id"));
}
public void testStart_openJobFails() {
doThrow(new RuntimeException("error")).when(dataProcessor).openJob("my_job_id", false);
Allocation.Builder allocation = new Allocation.Builder();

View File

@ -131,7 +131,12 @@ public class PrelertMetadataTests extends AbstractSerializingTestCase<PrelertMet
assertThat(result.getSchedulers().get("1"), nullValue());
builder = new PrelertMetadata.Builder(result);
builder.removeJob("1");
builder.updateStatus("1", JobStatus.DELETING, null);
assertThat(result.getJobs().get("1"), sameInstance(job1));
assertThat(result.getAllocations().get("1").getStatus(), equalTo(JobStatus.CLOSED));
assertThat(result.getSchedulers().get("1"), nullValue());
builder.deleteJob("1");
result = builder.build();
assertThat(result.getJobs().get("1"), nullValue());
assertThat(result.getAllocations().get("1"), nullValue());
@ -151,7 +156,7 @@ public class PrelertMetadataTests extends AbstractSerializingTestCase<PrelertMet
assertThat(result.getSchedulers().get("1"), nullValue());
PrelertMetadata.Builder builder2 = new PrelertMetadata.Builder(result);
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> builder2.removeJob("1"));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> builder2.deleteJob("1"));
assertThat(e.status(), equalTo(RestStatus.CONFLICT));
}
@ -162,7 +167,7 @@ public class PrelertMetadataTests extends AbstractSerializingTestCase<PrelertMet
builder.putJob(job1, false);
builder.putScheduler(schedulerConfig1, mock(SearchRequestParsers.class));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> builder.removeJob(job1.getId()));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> builder.deleteJob(job1.getId()));
assertThat(e.status(), equalTo(RestStatus.CONFLICT));
String expectedMsg = "Cannot delete job [" + job1.getId() + "] while scheduler [" + schedulerConfig1.getId() + "] refers to it";
assertThat(e.getMessage(), equalTo(expectedMsg));
@ -170,7 +175,7 @@ public class PrelertMetadataTests extends AbstractSerializingTestCase<PrelertMet
public void testRemoveJob_failBecauseJobDoesNotExist() {
PrelertMetadata.Builder builder1 = new PrelertMetadata.Builder();
expectThrows(ResourceNotFoundException.class, () -> builder1.removeJob("1"));
expectThrows(ResourceNotFoundException.class, () -> builder1.deleteJob("1"));
}
public void testCrudScheduler() {