Make close job a master node action (elastic/elasticsearch#362)
* Make close job a master node action The close job action now: - Is a master node action - Sets the job status to CLOSING - Waits for the job status to change to CLOSED before it responds JobLifeCycleService picks up on a job status change to CLOSING and closes the job. At the end, it sets the job status to CLOSED. * Assert job status is closed after close in integration test This also correctly passes an ActionListener to JobManager.setJobStatus in order to ensure the job status request has completed and to properly propagate failures. Original commit: elastic/x-pack-elasticsearch@1546c77fca
This commit is contained in:
parent
3f0b13cda9
commit
26e3ca9155
|
@ -7,28 +7,38 @@ package org.elasticsearch.xpack.prelert.action;
|
|||
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionRequestBuilder;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.HandledTransportAction;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedRequest;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateObserver;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.prelert.job.manager.AutodetectProcessManager;
|
||||
import org.elasticsearch.xpack.prelert.job.JobStatus;
|
||||
import org.elasticsearch.xpack.prelert.job.manager.JobManager;
|
||||
import org.elasticsearch.xpack.prelert.job.metadata.Allocation;
|
||||
import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
|
||||
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
public class PostDataCloseAction extends Action<PostDataCloseAction.Request, PostDataCloseAction.Response,
|
||||
PostDataCloseAction.RequestBuilder> {
|
||||
PostDataCloseAction.RequestBuilder> {
|
||||
|
||||
public static final PostDataCloseAction INSTANCE = new PostDataCloseAction();
|
||||
public static final String NAME = "cluster:admin/prelert/data/post/close";
|
||||
|
@ -47,7 +57,7 @@ PostDataCloseAction.RequestBuilder> {
|
|||
return new Response();
|
||||
}
|
||||
|
||||
public static class Request extends ActionRequest {
|
||||
public static class Request extends AcknowledgedRequest<Request> {
|
||||
|
||||
private String jobId;
|
||||
|
||||
|
@ -96,7 +106,7 @@ PostDataCloseAction.RequestBuilder> {
|
|||
}
|
||||
}
|
||||
|
||||
static class RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder> {
|
||||
static class RequestBuilder extends MasterNodeOperationRequestBuilder<Request, Response, RequestBuilder> {
|
||||
|
||||
public RequestBuilder(ElasticsearchClient client, PostDataCloseAction action) {
|
||||
super(client, action, new Request());
|
||||
|
@ -125,25 +135,104 @@ PostDataCloseAction.RequestBuilder> {
|
|||
}
|
||||
}
|
||||
|
||||
// NORELEASE This should be a master node operation that updates the job's state
|
||||
public static class TransportAction extends HandledTransportAction<Request, Response> {
|
||||
public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
|
||||
|
||||
private final AutodetectProcessManager processManager;
|
||||
private final JobManager jobManager;
|
||||
|
||||
@Inject
|
||||
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver, AutodetectProcessManager processManager) {
|
||||
super(settings, PostDataCloseAction.NAME, false, threadPool, transportService, actionFilters,
|
||||
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
|
||||
ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
JobManager jobManager) {
|
||||
super(settings, PostDataCloseAction.NAME, transportService, clusterService, threadPool, actionFilters,
|
||||
indexNameExpressionResolver, Request::new);
|
||||
|
||||
this.processManager = processManager;
|
||||
this.jobManager = jobManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected final void doExecute(Request request, ActionListener<Response> listener) {
|
||||
processManager.closeJob(request.getJobId());
|
||||
listener.onResponse(new Response(true));
|
||||
protected String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Response newResponse() {
|
||||
return new Response();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
|
||||
UpdateJobStatusAction.Request updateStatusRequest = new UpdateJobStatusAction.Request(request.getJobId(), JobStatus.CLOSING);
|
||||
|
||||
ActionListener<UpdateJobStatusAction.Response> delegateListener = new ActionListener<UpdateJobStatusAction.Response>() {
|
||||
|
||||
@Override
|
||||
public void onResponse(UpdateJobStatusAction.Response response) {
|
||||
respondWhenJobIsClosed(request.getJobId(), listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
};
|
||||
|
||||
jobManager.setJobStatus(updateStatusRequest, delegateListener);
|
||||
}
|
||||
|
||||
private void respondWhenJobIsClosed(String jobId, ActionListener<Response> listener) {
|
||||
ClusterStateObserver observer = new ClusterStateObserver(clusterService, logger, threadPool.getThreadContext());
|
||||
observer.waitForNextChange(new ClusterStateObserver.Listener() {
|
||||
@Override
|
||||
public void onNewClusterState(ClusterState state) {
|
||||
listener.onResponse(new Response(true));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClusterServiceClose() {
|
||||
listener.onFailure(new IllegalStateException("Cluster service closed while waiting for job [" + jobId
|
||||
+ "] status to change to [" + JobStatus.CLOSED + "]"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTimeout(TimeValue timeout) {
|
||||
listener.onFailure(new IllegalStateException(
|
||||
"Timeout expired while waiting for job [" + jobId + "] status to change to [" + JobStatus.CLOSED + "]"));
|
||||
}
|
||||
}, new JobClosedChangePredicate(jobId), TimeValue.timeValueMinutes(30));
|
||||
}
|
||||
|
||||
private class JobClosedChangePredicate implements ClusterStateObserver.ChangePredicate {
|
||||
|
||||
private final String jobId;
|
||||
|
||||
JobClosedChangePredicate(String jobId) {
|
||||
this.jobId = jobId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean apply(ClusterState previousState, ClusterState.ClusterStateStatus previousStatus, ClusterState newState,
|
||||
ClusterState.ClusterStateStatus newStatus) {
|
||||
return apply(newState);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean apply(ClusterChangedEvent changedEvent) {
|
||||
return apply(changedEvent.state());
|
||||
}
|
||||
|
||||
boolean apply(ClusterState newState) {
|
||||
PrelertMetadata metadata = newState.getMetaData().custom(PrelertMetadata.TYPE);
|
||||
if (metadata != null) {
|
||||
Allocation allocation = metadata.getAllocations().get(jobId);
|
||||
return allocation != null && allocation.getStatus() == JobStatus.CLOSED;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@Override
|
||||
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
|
||||
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -172,7 +172,7 @@ public class UpdateJobStatusAction
|
|||
|
||||
@Override
|
||||
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
|
||||
jobManager.setJobStatus(request.getJobId(), request.getStatus());
|
||||
jobManager.setJobStatus(request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.elasticsearch.xpack.prelert.action.ResumeJobAction;
|
|||
import org.elasticsearch.xpack.prelert.action.RevertModelSnapshotAction;
|
||||
import org.elasticsearch.xpack.prelert.action.StartJobSchedulerAction;
|
||||
import org.elasticsearch.xpack.prelert.action.StopJobSchedulerAction;
|
||||
import org.elasticsearch.xpack.prelert.action.UpdateJobStatusAction;
|
||||
import org.elasticsearch.xpack.prelert.job.DataCounts;
|
||||
import org.elasticsearch.xpack.prelert.job.IgnoreDowntime;
|
||||
import org.elasticsearch.xpack.prelert.job.Job;
|
||||
|
@ -525,19 +526,20 @@ public class JobManager {
|
|||
return getJobAllocation(jobId).getStatus();
|
||||
}
|
||||
|
||||
public void setJobStatus(String jobId, JobStatus newStatus) {
|
||||
clusterService.submitStateUpdateTask("set-paused-status-job-" + jobId, new ClusterStateUpdateTask() {
|
||||
public void setJobStatus(UpdateJobStatusAction.Request request, ActionListener<UpdateJobStatusAction.Response> actionListener) {
|
||||
clusterService.submitStateUpdateTask("set-paused-status-job-" + request.getJobId(),
|
||||
new AckedClusterStateUpdateTask<UpdateJobStatusAction.Response>(request, actionListener) {
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
return innerSetJobStatus(jobId, newStatus, currentState);
|
||||
}
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
return innerSetJobStatus(request.getJobId(), request.getStatus(), currentState);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
LOGGER.error("Error updating job status: source=[" + source + "], new status [" + newStatus + "]", e);
|
||||
}
|
||||
});
|
||||
@Override
|
||||
protected UpdateJobStatusAction.Response newResponse(boolean acknowledged) {
|
||||
return new UpdateJobStatusAction.Response(acknowledged);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private ClusterState innerSetJobStatus(String jobId, JobStatus newStatus, ClusterState currentState) {
|
||||
|
|
|
@ -93,6 +93,7 @@ public class JobLifeCycleService extends AbstractComponent implements ClusterSta
|
|||
case RUNNING:
|
||||
break;
|
||||
case CLOSING:
|
||||
executor.execute(() -> closeJob(job));
|
||||
break;
|
||||
case CLOSED:
|
||||
break;
|
||||
|
@ -143,6 +144,18 @@ public class JobLifeCycleService extends AbstractComponent implements ClusterSta
|
|||
localAllocatedJobs = newSet;
|
||||
}
|
||||
|
||||
private void closeJob(Job job) {
|
||||
try {
|
||||
// NORELEASE Ensure this also removes the job auto-close timeout task
|
||||
dataProcessor.closeJob(job.getId());
|
||||
} catch (ElasticsearchException e) {
|
||||
logger.error("Failed to close job [" + job.getId() + "]", e);
|
||||
updateJobStatus(job.getId(), JobStatus.FAILED);
|
||||
return;
|
||||
}
|
||||
updateJobStatus(job.getId(), JobStatus.CLOSED);
|
||||
}
|
||||
|
||||
private void pauseJob(Job job) {
|
||||
try {
|
||||
// NORELEASE Ensure this also removes the job auto-close timeout task
|
||||
|
|
|
@ -45,6 +45,12 @@ setup:
|
|||
jobId: farequote
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
xpack.prelert.get_jobs:
|
||||
job_id: farequote
|
||||
metric: status
|
||||
- match: { jobs.0.status: "CLOSED" }
|
||||
|
||||
- do:
|
||||
get:
|
||||
index: prelertresults-farequote
|
||||
|
|
Loading…
Reference in New Issue