Integrate DBQ into job deletion process (elastic/elasticsearch#691)

A JobStorageDeletionTask is created, which supervises the physical deletion of the job. This
task is a child of the DeleteJob action. After the delete-by-query (DBQ) finishes, the normal
flow resumes: the physical index is deleted and the job is removed from the cluster state.

Original commit: elastic/x-pack-elasticsearch@5d6f694408
Zachary Tong 2017-02-06 14:34:36 -05:00 committed by GitHub
parent a0b37a2510
commit a1a5d590b6
5 changed files with 244 additions and 54 deletions
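For orientation, the sketch below models the callback chain this commit sets up, as described in the message above: Step 0 marks the job DELETING, Step 1 lets the JobStorageDeletionTask delete the physical storage (delete-by-query, then the index), Step 2 removes the job from the cluster state, and Step 3 answers the original request. It is a standalone illustration using plain java.util.function.Consumer callbacks; the class and variable names are invented for the example, while the real code threads the same steps through CheckedConsumer and ActionListener.wrap, as the JobManager and JobStorageDeletionTask diffs below show.

import java.util.function.Consumer;

// Illustrative sketch only (names invented for this example); it mirrors the
// steps of the new deletion flow without any Elasticsearch dependencies.
public class DeleteJobFlowSketch {

    public static void main(String[] args) {
        // Step 3. When the job has been removed from the cluster state, answer the caller.
        Consumer<Boolean> apiResponseHandler =
                acknowledged -> System.out.println("DeleteJob response, acknowledged=" + acknowledged);

        // Step 2. When the physical storage is gone, remove the job from the cluster state.
        Consumer<Boolean> deleteJobStateHandler = storageDeleted -> {
            System.out.println("Removing job from cluster state (storage deleted=" + storageDeleted + ")");
            apiResponseHandler.accept(storageDeleted);
        };

        // Step 1. The JobStorageDeletionTask supervises physical deletion: a delete-by-query
        // over the job's results, then deletion of the physical index, then it hands control
        // back to the chain regardless of DBQ bulk failures or a missing index.
        Consumer<Consumer<Boolean>> jobStorageDeletionTask = onFinished -> {
            System.out.println("DeleteByQuery on job results, then delete physical index");
            onFinished.accept(true);
        };

        // Step 0. Mark the job as DELETING, then kick off the chain.
        System.out.println("Setting job state to DELETING");
        jobStorageDeletionTask.accept(deleteJobStateHandler);
    }
}

In the real commit, a failure at any step is routed to the action listener's onFailure handler rather than continuing down the chain.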

File: MlPlugin.java

@@ -51,6 +51,7 @@ import org.elasticsearch.xpack.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.ml.action.GetModelSnapshotsAction;
import org.elasticsearch.xpack.ml.action.GetRecordsAction;
import org.elasticsearch.xpack.ml.action.InternalOpenJobAction;
import org.elasticsearch.xpack.ml.action.MlDeleteByQueryAction;
import org.elasticsearch.xpack.ml.action.OpenJobAction;
import org.elasticsearch.xpack.ml.action.PostDataAction;
import org.elasticsearch.xpack.ml.action.PutDatafeedAction;
@@ -326,7 +327,8 @@ public class MlPlugin extends Plugin implements ActionPlugin {
new ActionHandler<>(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class),
new ActionHandler<>(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class),
new ActionHandler<>(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class),
new ActionHandler<>(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class)
new ActionHandler<>(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class),
new ActionHandler<>(MlDeleteByQueryAction.INSTANCE, MlDeleteByQueryAction.TransportAction.class)
);
}

File: DeleteJobAction.java

@@ -30,6 +30,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.persistence.JobStorageDeletionTask;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.IOException;
@@ -79,7 +80,7 @@ public class DeleteJobAction extends Action<DeleteJobAction.Request, DeleteJobAc
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new Task(id, type, action, "delete-job-" + jobId, parentTaskId);
return new JobStorageDeletionTask(id, type, action, "delete-job-" + jobId, parentTaskId);
}
@Override
@@ -110,6 +111,7 @@ public class DeleteJobAction extends Action<DeleteJobAction.Request, DeleteJobAc
DeleteJobAction.Request other = (DeleteJobAction.Request) obj;
return Objects.equals(jobId, other.jobId);
}
}
static class RequestBuilder extends MasterNodeOperationRequestBuilder<Request, Response, RequestBuilder> {
@@ -167,7 +169,7 @@ public class DeleteJobAction extends Action<DeleteJobAction.Request, DeleteJobAc
@Override
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
jobManager.deleteJob(client, request, listener);
jobManager.deleteJob(request, client, (JobStorageDeletionTask) task, taskManager, listener);
}
@Override

File: MlDeleteByQueryAction.java (new file)

@@ -0,0 +1,113 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.byscroll.AbstractBulkByScrollRequestBuilder;
import org.elasticsearch.action.bulk.byscroll.AsyncDeleteByQueryAction;
import org.elasticsearch.action.bulk.byscroll.BulkByScrollParallelizationHelper;
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
import org.elasticsearch.action.bulk.byscroll.DeleteByQueryRequest;
import org.elasticsearch.action.bulk.byscroll.ParentBulkByScrollTask;
import org.elasticsearch.action.bulk.byscroll.WorkingBulkByScrollTask;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
public class MlDeleteByQueryAction extends Action<DeleteByQueryRequest, BulkByScrollResponse,
MlDeleteByQueryAction.MlDeleteByQueryRequestBuilder> {
public static final MlDeleteByQueryAction INSTANCE = new MlDeleteByQueryAction();
public static final String NAME = "indices:data/write/delete/mlbyquery";
private MlDeleteByQueryAction() {
super(NAME);
}
@Override
public MlDeleteByQueryRequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new MlDeleteByQueryRequestBuilder(client, this);
}
@Override
public BulkByScrollResponse newResponse() {
return new BulkByScrollResponse();
}
public static class MlDeleteByQueryRequestBuilder extends
AbstractBulkByScrollRequestBuilder<DeleteByQueryRequest, MlDeleteByQueryRequestBuilder> {
public MlDeleteByQueryRequestBuilder(ElasticsearchClient client,
Action<DeleteByQueryRequest, BulkByScrollResponse, MlDeleteByQueryRequestBuilder> action) {
this(client, action, new SearchRequestBuilder(client, SearchAction.INSTANCE));
}
private MlDeleteByQueryRequestBuilder(ElasticsearchClient client,
Action<DeleteByQueryRequest, BulkByScrollResponse, MlDeleteByQueryRequestBuilder> action,
SearchRequestBuilder search) {
super(client, action, search, new DeleteByQueryRequest(search.request()));
}
@Override
protected MlDeleteByQueryRequestBuilder self() {
return this;
}
@Override
public MlDeleteByQueryRequestBuilder abortOnVersionConflict(boolean abortOnVersionConflict) {
request.setAbortOnVersionConflict(abortOnVersionConflict);
return this;
}
}
public static class TransportAction extends HandledTransportAction<DeleteByQueryRequest, BulkByScrollResponse> {
private final Client client;
private final ScriptService scriptService;
private final ClusterService clusterService;
@Inject
public TransportAction(Settings settings, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver resolver, Client client, TransportService transportService,
ScriptService scriptService, ClusterService clusterService) {
super(settings, MlDeleteByQueryAction.NAME, threadPool, transportService, actionFilters, resolver, DeleteByQueryRequest::new);
this.client = client;
this.scriptService = scriptService;
this.clusterService = clusterService;
}
@Override
public void doExecute(Task task, DeleteByQueryRequest request, ActionListener<BulkByScrollResponse> listener) {
if (request.getSlices() > 1) {
BulkByScrollParallelizationHelper.startSlices(client, taskManager, MlDeleteByQueryAction.INSTANCE,
clusterService.localNode().getId(), (ParentBulkByScrollTask) task, request, listener);
} else {
ClusterState state = clusterService.state();
ParentTaskAssigningClient client = new ParentTaskAssigningClient(this.client, clusterService.localNode(), task);
new AsyncDeleteByQueryAction((WorkingBulkByScrollTask) task, logger, client, threadPool, request, scriptService, state,
listener).start();
}
}
@Override
protected void doExecute(DeleteByQueryRequest request, ActionListener<BulkByScrollResponse> listener) {
throw new UnsupportedOperationException("task required");
}
}
}

File: JobManager.java

@@ -8,7 +8,6 @@ package org.elasticsearch.xpack.ml.job;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
@@ -18,7 +17,6 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.xpack.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.ml.action.PutJobAction;
import org.elasticsearch.xpack.ml.action.RevertModelSnapshotAction;
@@ -34,6 +32,7 @@ import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobStorageDeletionTask;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.ml.notifications.Auditor;
@@ -229,14 +228,17 @@ public class JobManager extends AbstractComponent {
return buildNewClusterState(currentState, builder);
}
public void deleteJob(Client client, DeleteJobAction.Request request, ActionListener<DeleteJobAction.Response> actionListener) {
public void deleteJob(DeleteJobAction.Request request, Client client, JobStorageDeletionTask task,
ActionListener<DeleteJobAction.Response> actionListener) {
String jobId = request.getJobId();
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
logger.debug("Deleting job '" + jobId + "'");
// Step 3. Listen for the Cluster State job state change
// Chain acknowledged state onto original actionListener
CheckedConsumer<Boolean, Exception> deleteStateConsumer = jobDeleted -> {
// Step 3. When the job has been removed from the cluster state, return a response
// -------
CheckedConsumer<Boolean, Exception> apiResponseHandler = jobDeleted -> {
if (jobDeleted) {
logger.info("Job [" + jobId + "] deleted.");
actionListener.onResponse(new DeleteJobAction.Response(true));
@@ -246,64 +248,41 @@
}
};
// Step 2. When the physical storage has been deleted, remove from Cluster State
// -------
CheckedConsumer<Boolean, Exception> deleteJobStateHandler = response -> clusterService.submitStateUpdateTask("delete-job-" + jobId,
new AckedClusterStateUpdateTask<Boolean>(request, ActionListener.wrap(apiResponseHandler, actionListener::onFailure)) {
// Step 2. Listen for the Deleted Index response
// If successful, delete from cluster state and chain onto deleteStateListener
CheckedConsumer<Boolean, Exception> deleteIndexConsumer = response -> {
clusterService.submitStateUpdateTask("delete-job-" + jobId,
new AckedClusterStateUpdateTask<Boolean>(request, ActionListener.wrap(deleteStateConsumer, actionListener::onFailure)) {
@Override
protected Boolean newResponse(boolean acknowledged) {
return acknowledged && response;
}
@Override
protected Boolean newResponse(boolean acknowledged) {
return acknowledged && response;
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return removeJobFromState(jobId, currentState);
}
});
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return removeJobFromState(jobId, currentState);
}
});
};
// Step 1. Update the CS to DELETING
// If successful, attempt to delete the physical index and chain
// onto deleteIndexConsumer
CheckedConsumer<UpdateJobStateAction.Response, Exception> updateConsumer = response -> {
// Sucessfully updated the state to DELETING, begin actually deleting
// Step 1. When the job's status updates to DELETING, begin deleting the physical storage
// -------
CheckedConsumer<UpdateJobStateAction.Response, Exception> updateHandler = response -> {
// Successfully updated the status to DELETING, begin actually deleting
if (response.isAcknowledged()) {
logger.info("Job [" + jobId + "] set to [" + JobState.DELETING + "]");
} else {
logger.warn("Job [" + jobId + "] change to [" + JobState.DELETING + "] was not acknowledged.");
}
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
client.admin().indices().delete(deleteIndexRequest, ActionListener.wrap(deleteIndexResponse -> {
logger.info("Deleting index [" + indexName + "] successful");
// This task manages the physical deletion of the job (removing the results, then the index)
task.delete(jobId, indexName, client, deleteJobStateHandler::accept, actionListener::onFailure);
if (deleteIndexResponse.isAcknowledged()) {
logger.info("Index deletion acknowledged");
} else {
logger.warn("Index deletion not acknowledged");
}
deleteIndexConsumer.accept(deleteIndexResponse.isAcknowledged());
}, e -> {
if (e instanceof IndexNotFoundException) {
logger.warn("Physical index [" + indexName + "] not found. Continuing to delete job.");
try {
deleteIndexConsumer.accept(false);
} catch (Exception e1) {
actionListener.onFailure(e1);
}
} else {
// all other exceptions should die
actionListener.onFailure(e);
}
}));
};
// Step 0. Kick off the chain of callbacks with the initial UpdateStatus call
// -------
UpdateJobStateAction.Request updateStateListener = new UpdateJobStateAction.Request(jobId, JobState.DELETING);
setJobState(updateStateListener, ActionListener.wrap(updateConsumer, actionListener::onFailure));
setJobState(updateStateListener, ActionListener.wrap(updateHandler, actionListener::onFailure));
}

File: JobStorageDeletionTask.java (new file)

@@ -0,0 +1,94 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.persistence;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.action.bulk.byscroll.DeleteByQueryRequest;
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.ml.action.MlDeleteByQueryAction;
import java.util.function.Consumer;
public class JobStorageDeletionTask extends Task {
private final Logger logger;
public JobStorageDeletionTask(long id, String type, String action, String description, TaskId parentTask) {
super(id, type, action, description, parentTask);
this.logger = Loggers.getLogger(getClass());
}
public void delete(String jobId, String indexName, Client client,
CheckedConsumer<Boolean, Exception> finishedHandler,
Consumer<Exception> failureHandler) {
// Step 2. Regardless of if the DBQ succeeds, we delete the physical index
// -------
CheckedConsumer<BulkByScrollResponse, Exception> dbqHandler = bulkByScrollResponse -> {
if (bulkByScrollResponse.isTimedOut()) {
logger.warn("DeleteByQuery for index [" + indexName + "] timed out. Continuing to delete index.");
}
if (!bulkByScrollResponse.getBulkFailures().isEmpty()) {
logger.warn("[" + bulkByScrollResponse.getBulkFailures().size()
+ "] failures encountered while running DeleteByQuery on index [" + indexName + "]. "
+ "Continuing to delete index");
}
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
client.admin().indices().delete(deleteIndexRequest, ActionListener.wrap(deleteIndexResponse -> {
logger.info("Deleting index [" + indexName + "] successful");
if (deleteIndexResponse.isAcknowledged()) {
logger.info("Index deletion acknowledged");
} else {
logger.warn("Index deletion not acknowledged");
}
finishedHandler.accept(deleteIndexResponse.isAcknowledged());
}, missingIndexHandler(indexName, finishedHandler, failureHandler)));
};
// Step 1. DeleteByQuery on the index, matching all docs with the right job_id
// -------
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.source(new SearchSourceBuilder().query(new TermQueryBuilder("job_id", jobId)));
DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest);
request.setSlices(5);
client.execute(MlDeleteByQueryAction.INSTANCE, request,
ActionListener.wrap(dbqHandler, missingIndexHandler(indexName, finishedHandler, failureHandler)));
}
// If the index doesn't exist, we need to catch the exception and carry onwards so that the cluster
// state is properly updated
private Consumer<Exception> missingIndexHandler(String indexName, CheckedConsumer<Boolean, Exception> finishedHandler,
Consumer<Exception> failureHandler) {
return e -> {
if (e instanceof IndexNotFoundException) {
logger.warn("Physical index [" + indexName + "] not found. Continuing to delete job.");
try {
finishedHandler.accept(false);
} catch (Exception e1) {
failureHandler.accept(e1);
}
} else {
// all other exceptions should die
failureHandler.accept(e);
}
};
}
}