Added open and close job APIs.

* A job now has the following statuses: OPENING, OPENED, CLOSING, CLOSED and FAILED.
* The open job and close job APIs wait until the job gets into a OPENED or CLOSED state.
* The post data api no longer lazily opens a job and fails if the job has not been opened.
* When a job gets into a failed state also the reason is recorded in the allocation.
* Removed pause and resume APIs.
* Made `max_running_jobs` setting dynamically updatedable.

Original commit: elastic/x-pack-elasticsearch@3485ec5317
This commit is contained in:
Martijn van Groningen 2016-12-05 08:29:57 +01:00
parent f960eea4b1
commit 570cde7a6a
44 changed files with 993 additions and 977 deletions

View File

@ -38,14 +38,13 @@ import org.elasticsearch.xpack.prelert.action.GetJobsAction;
import org.elasticsearch.xpack.prelert.action.GetListAction;
import org.elasticsearch.xpack.prelert.action.GetModelSnapshotsAction;
import org.elasticsearch.xpack.prelert.action.GetRecordsAction;
import org.elasticsearch.xpack.prelert.action.PauseJobAction;
import org.elasticsearch.xpack.prelert.action.PostDataAction;
import org.elasticsearch.xpack.prelert.action.PostDataCloseAction;
import org.elasticsearch.xpack.prelert.action.CloseJobAction;
import org.elasticsearch.xpack.prelert.action.PostDataFlushAction;
import org.elasticsearch.xpack.prelert.action.PutJobAction;
import org.elasticsearch.xpack.prelert.action.PutListAction;
import org.elasticsearch.xpack.prelert.action.PutModelSnapshotDescriptionAction;
import org.elasticsearch.xpack.prelert.action.ResumeJobAction;
import org.elasticsearch.xpack.prelert.action.OpenJobAction;
import org.elasticsearch.xpack.prelert.action.RevertModelSnapshotAction;
import org.elasticsearch.xpack.prelert.action.StartJobSchedulerAction;
import org.elasticsearch.xpack.prelert.action.StopJobSchedulerAction;
@ -76,14 +75,13 @@ import org.elasticsearch.xpack.prelert.job.scheduler.http.HttpDataExtractorFacto
import org.elasticsearch.xpack.prelert.job.status.StatusReporter;
import org.elasticsearch.xpack.prelert.job.usage.UsageReporter;
import org.elasticsearch.xpack.prelert.rest.data.RestPostDataAction;
import org.elasticsearch.xpack.prelert.rest.data.RestPostDataCloseAction;
import org.elasticsearch.xpack.prelert.rest.job.RestCloseJobAction;
import org.elasticsearch.xpack.prelert.rest.data.RestPostDataFlushAction;
import org.elasticsearch.xpack.prelert.rest.results.RestGetInfluencersAction;
import org.elasticsearch.xpack.prelert.rest.job.RestDeleteJobAction;
import org.elasticsearch.xpack.prelert.rest.job.RestGetJobsAction;
import org.elasticsearch.xpack.prelert.rest.job.RestPauseJobAction;
import org.elasticsearch.xpack.prelert.rest.job.RestPutJobsAction;
import org.elasticsearch.xpack.prelert.rest.job.RestResumeJobAction;
import org.elasticsearch.xpack.prelert.rest.job.RestOpenJobAction;
import org.elasticsearch.xpack.prelert.rest.list.RestGetListAction;
import org.elasticsearch.xpack.prelert.rest.list.RestPutListAction;
import org.elasticsearch.xpack.prelert.rest.modelsnapshots.RestDeleteModelSnapshotAction;
@ -142,7 +140,8 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
ProcessCtrl.MAX_ANOMALY_RECORDS_SETTING,
StatusReporter.ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING,
StatusReporter.ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING,
UsageReporter.UPDATE_INTERVAL_SETTING));
UsageReporter.UPDATE_INTERVAL_SETTING,
AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE));
}
@Override
@ -175,7 +174,7 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
}
AutodetectResultsParser autodetectResultsParser = new AutodetectResultsParser(settings, parseFieldMatcherSupplier);
DataProcessor dataProcessor = new AutodetectProcessManager(settings, client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobDataCountsPersister, autodetectResultsParser, processFactory);
jobResultsPersister, jobDataCountsPersister, autodetectResultsParser, processFactory, clusterService.getClusterSettings());
ScheduledJobService scheduledJobService = new ScheduledJobService(threadPool, client, jobProvider, dataProcessor,
// norelease: we will no longer need to pass the client here after we switch to a client based data extractor
new HttpDataExtractorFactory(client),
@ -198,15 +197,14 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
RestGetJobsAction.class,
RestPutJobsAction.class,
RestDeleteJobAction.class,
RestPauseJobAction.class,
RestResumeJobAction.class,
RestOpenJobAction.class,
RestGetListAction.class,
RestPutListAction.class,
RestGetInfluencersAction.class,
RestGetRecordsAction.class,
RestGetBucketsAction.class,
RestPostDataAction.class,
RestPostDataCloseAction.class,
RestCloseJobAction.class,
RestPostDataFlushAction.class,
RestValidateDetectorAction.class,
RestValidateTransformAction.class,
@ -227,8 +225,7 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
new ActionHandler<>(GetJobsAction.INSTANCE, GetJobsAction.TransportAction.class),
new ActionHandler<>(PutJobAction.INSTANCE, PutJobAction.TransportAction.class),
new ActionHandler<>(DeleteJobAction.INSTANCE, DeleteJobAction.TransportAction.class),
new ActionHandler<>(PauseJobAction.INSTANCE, PauseJobAction.TransportAction.class),
new ActionHandler<>(ResumeJobAction.INSTANCE, ResumeJobAction.TransportAction.class),
new ActionHandler<>(OpenJobAction.INSTANCE, OpenJobAction.TransportAction.class),
new ActionHandler<>(UpdateJobStatusAction.INSTANCE, UpdateJobStatusAction.TransportAction.class),
new ActionHandler<>(UpdateJobSchedulerStatusAction.INSTANCE, UpdateJobSchedulerStatusAction.TransportAction.class),
new ActionHandler<>(GetListAction.INSTANCE, GetListAction.TransportAction.class),
@ -237,7 +234,7 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
new ActionHandler<>(GetInfluencersAction.INSTANCE, GetInfluencersAction.TransportAction.class),
new ActionHandler<>(GetRecordsAction.INSTANCE, GetRecordsAction.TransportAction.class),
new ActionHandler<>(PostDataAction.INSTANCE, PostDataAction.TransportAction.class),
new ActionHandler<>(PostDataCloseAction.INSTANCE, PostDataCloseAction.TransportAction.class),
new ActionHandler<>(CloseJobAction.INSTANCE, CloseJobAction.TransportAction.class),
new ActionHandler<>(PostDataFlushAction.INSTANCE, PostDataFlushAction.TransportAction.class),
new ActionHandler<>(ValidateDetectorAction.INSTANCE, ValidateDetectorAction.TransportAction.class),
new ActionHandler<>(ValidateTransformAction.INSTANCE, ValidateTransformAction.TransportAction.class),

View File

@ -38,13 +38,12 @@ import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.Objects;
public class PostDataCloseAction extends Action<PostDataCloseAction.Request, PostDataCloseAction.Response,
PostDataCloseAction.RequestBuilder> {
public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobAction.Response, CloseJobAction.RequestBuilder> {
public static final PostDataCloseAction INSTANCE = new PostDataCloseAction();
public static final String NAME = "cluster:admin/prelert/data/post/close";
public static final CloseJobAction INSTANCE = new CloseJobAction();
public static final String NAME = "cluster:admin/prelert/job/close";
private PostDataCloseAction() {
private CloseJobAction() {
super(NAME);
}
@ -61,6 +60,7 @@ public class PostDataCloseAction extends Action<PostDataCloseAction.Request, Pos
public static class Request extends AcknowledgedRequest<Request> {
private String jobId;
private TimeValue closeTimeout = TimeValue.timeValueMinutes(30);
Request() {}
@ -72,6 +72,14 @@ public class PostDataCloseAction extends Action<PostDataCloseAction.Request, Pos
return jobId;
}
public TimeValue getCloseTimeout() {
return closeTimeout;
}
public void setCloseTimeout(TimeValue closeTimeout) {
this.closeTimeout = closeTimeout;
}
@Override
public ActionRequestValidationException validate() {
return null;
@ -81,17 +89,19 @@ public class PostDataCloseAction extends Action<PostDataCloseAction.Request, Pos
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
jobId = in.readString();
closeTimeout = new TimeValue(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(jobId);
closeTimeout.writeTo(out);
}
@Override
public int hashCode() {
return Objects.hash(jobId);
return Objects.hash(jobId, closeTimeout);
}
@Override
@ -103,13 +113,14 @@ public class PostDataCloseAction extends Action<PostDataCloseAction.Request, Pos
return false;
}
Request other = (Request) obj;
return Objects.equals(jobId, other.jobId);
return Objects.equals(jobId, other.jobId) &&
Objects.equals(closeTimeout, other.closeTimeout);
}
}
static class RequestBuilder extends MasterNodeOperationRequestBuilder<Request, Response, RequestBuilder> {
public RequestBuilder(ElasticsearchClient client, PostDataCloseAction action) {
public RequestBuilder(ElasticsearchClient client, CloseJobAction action) {
super(client, action, new Request());
}
}
@ -144,7 +155,7 @@ public class PostDataCloseAction extends Action<PostDataCloseAction.Request, Pos
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
JobManager jobManager) {
super(settings, PostDataCloseAction.NAME, transportService, clusterService, threadPool, actionFilters,
super(settings, CloseJobAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
this.jobManager = jobManager;
}

View File

@ -0,0 +1,279 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.action;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.manager.JobManager;
import org.elasticsearch.xpack.prelert.job.metadata.Allocation;
import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.Objects;
public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.Response, OpenJobAction.RequestBuilder> {
public static final OpenJobAction INSTANCE = new OpenJobAction();
public static final String NAME = "cluster:admin/prelert/job/open";
private OpenJobAction() {
super(NAME);
}
@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client, this);
}
@Override
public Response newResponse() {
return new Response();
}
public static class Request extends AcknowledgedRequest<Request> {
private String jobId;
private boolean ignoreDowntime;
private TimeValue openTimeout = TimeValue.timeValueMinutes(30);
public Request(String jobId) {
this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());
}
Request() {}
public String getJobId() {
return jobId;
}
public void setJobId(String jobId) {
this.jobId = jobId;
}
public boolean isIgnoreDowntime() {
return ignoreDowntime;
}
public void setIgnoreDowntime(boolean ignoreDowntime) {
this.ignoreDowntime = ignoreDowntime;
}
public TimeValue getOpenTimeout() {
return openTimeout;
}
public void setOpenTimeout(TimeValue openTimeout) {
this.openTimeout = openTimeout;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
jobId = in.readString();
ignoreDowntime = in.readBoolean();
openTimeout = new TimeValue(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(jobId);
out.writeBoolean(ignoreDowntime);
openTimeout.writeTo(out);
}
@Override
public int hashCode() {
return Objects.hash(jobId, ignoreDowntime, openTimeout);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || obj.getClass() != getClass()) {
return false;
}
OpenJobAction.Request other = (OpenJobAction.Request) obj;
return Objects.equals(jobId, other.jobId) &&
Objects.equals(ignoreDowntime, other.ignoreDowntime) &&
Objects.equals(openTimeout, other.openTimeout);
}
}
static class RequestBuilder extends MasterNodeOperationRequestBuilder<Request, Response, RequestBuilder> {
public RequestBuilder(ElasticsearchClient client, OpenJobAction action) {
super(client, action, new Request());
}
}
public static class Response extends AcknowledgedResponse {
public Response(boolean acknowledged) {
super(acknowledged);
}
private Response() {}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
readAcknowledged(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeAcknowledged(out);
}
}
public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
private final JobManager jobManager;
@Inject
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
JobManager jobManager) {
super(settings, OpenJobAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
this.jobManager = jobManager;
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected Response newResponse() {
return new Response();
}
@Override
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
ActionListener<Response> delegateListener = new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
respondWhenJobIsOpened(request, listener);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
};
jobManager.openJob(request, delegateListener);
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
private void respondWhenJobIsOpened(Request request, ActionListener<Response> listener) {
ClusterStateObserver observer = new ClusterStateObserver(clusterService, logger, threadPool.getThreadContext());
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
String jobId = request.getJobId();
PrelertMetadata metadata = state.getMetaData().custom(PrelertMetadata.TYPE);
if (metadata != null) {
Allocation allocation = metadata.getAllocations().get(jobId);
if (allocation != null) {
if (allocation.getStatus() == JobStatus.OPENED) {
listener.onResponse(new Response(true));
} else {
String message = "[" + jobId + "] expected job status [" + JobStatus.OPENED + "], but got [" +
allocation.getStatus() + "], reason [" + allocation.getStatusReason() + "]";
listener.onFailure(new ElasticsearchStatusException(message, RestStatus.CONFLICT));
}
}
}
listener.onFailure(new IllegalStateException("no allocation for job [" + jobId + "]"));
}
@Override
public void onClusterServiceClose() {
listener.onFailure(new IllegalStateException("Cluster service closed while waiting for job [" + request
+ "] status to change to [" + JobStatus.OPENED + "]"));
}
@Override
public void onTimeout(TimeValue timeout) {
listener.onFailure(new IllegalStateException(
"Timeout expired while waiting for job [" + request + "] status to change to [" + JobStatus.OPENED + "]"));
}
}, new JobOpenedChangePredicate(request.getJobId()), request.openTimeout);
}
private class JobOpenedChangePredicate implements ClusterStateObserver.ChangePredicate {
private final String jobId;
JobOpenedChangePredicate(String jobId) {
this.jobId = jobId;
}
@Override
public boolean apply(ClusterState previousState, ClusterState.ClusterStateStatus previousStatus, ClusterState newState,
ClusterState.ClusterStateStatus newStatus) {
return apply(newState);
}
@Override
public boolean apply(ClusterChangedEvent changedEvent) {
return apply(changedEvent.state());
}
boolean apply(ClusterState newState) {
PrelertMetadata metadata = newState.getMetaData().custom(PrelertMetadata.TYPE);
if (metadata != null) {
Allocation allocation = metadata.getAllocations().get(jobId);
if (allocation != null) {
return allocation.getStatus().isAnyOf(JobStatus.OPENED, JobStatus.FAILED);
}
}
return false;
}
}
}
}

View File

@ -1,169 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.manager.JobManager;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.Objects;
public class PauseJobAction extends Action<PauseJobAction.Request, PauseJobAction.Response, PauseJobAction.RequestBuilder> {
public static final PauseJobAction INSTANCE = new PauseJobAction();
public static final String NAME = "cluster:admin/prelert/job/pause";
private PauseJobAction() {
super(NAME);
}
@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client, this);
}
@Override
public Response newResponse() {
return new Response();
}
public static class Request extends AcknowledgedRequest<Request> {
private String jobId;
public Request(String jobId) {
this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());
}
Request() {}
public String getJobId() {
return jobId;
}
public void setJobId(String jobId) {
this.jobId = jobId;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
jobId = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(jobId);
}
@Override
public int hashCode() {
return Objects.hash(jobId);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || obj.getClass() != getClass()) {
return false;
}
Request other = (Request) obj;
return Objects.equals(jobId, other.jobId);
}
}
static class RequestBuilder extends MasterNodeOperationRequestBuilder<Request, Response, RequestBuilder> {
public RequestBuilder(ElasticsearchClient client, PauseJobAction action) {
super(client, action, new Request());
}
}
public static class Response extends AcknowledgedResponse {
public Response(boolean acknowledged) {
super(acknowledged);
}
private Response() {}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
readAcknowledged(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeAcknowledged(out);
}
}
public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
private final JobManager jobManager;
@Inject
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
JobManager jobManager) {
super(settings, PauseJobAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
this.jobManager = jobManager;
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected Response newResponse() {
return new Response();
}
@Override
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
logger.info("Pausing job " + request.getJobId());
jobManager.pauseJob(request, listener);
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
}
}

View File

@ -1,169 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.manager.JobManager;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.Objects;
public class ResumeJobAction extends Action<ResumeJobAction.Request, ResumeJobAction.Response, ResumeJobAction.RequestBuilder> {
public static final ResumeJobAction INSTANCE = new ResumeJobAction();
public static final String NAME = "cluster:admin/prelert/job/resume";
private ResumeJobAction() {
super(NAME);
}
@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client, this);
}
@Override
public Response newResponse() {
return new Response();
}
public static class Request extends AcknowledgedRequest<Request> {
private String jobId;
public Request(String jobId) {
this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());
}
Request() {}
public String getJobId() {
return jobId;
}
public void setJobId(String jobId) {
this.jobId = jobId;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
jobId = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(jobId);
}
@Override
public int hashCode() {
return Objects.hash(jobId);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || obj.getClass() != getClass()) {
return false;
}
ResumeJobAction.Request other = (ResumeJobAction.Request) obj;
return Objects.equals(jobId, other.jobId);
}
}
static class RequestBuilder extends MasterNodeOperationRequestBuilder<Request, Response, RequestBuilder> {
public RequestBuilder(ElasticsearchClient client, ResumeJobAction action) {
super(client, action, new Request());
}
}
public static class Response extends AcknowledgedResponse {
public Response(boolean acknowledged) {
super(acknowledged);
}
private Response() {}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
readAcknowledged(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeAcknowledged(out);
}
}
public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
private final JobManager jobManager;
@Inject
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
JobManager jobManager) {
super(settings, ResumeJobAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
this.jobManager = jobManager;
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected Response newResponse() {
return new Response();
}
@Override
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
logger.info("Resuming job " + request.getJobId());
jobManager.resumeJob(request, listener);
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
}
}

View File

@ -58,6 +58,7 @@ public class UpdateJobStatusAction
private String jobId;
private JobStatus status;
private String reason;
public Request(String jobId, JobStatus status) {
this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());
@ -82,6 +83,14 @@ public class UpdateJobStatusAction
this.status = status;
}
public String getReason() {
return reason;
}
public void setReason(String reason) {
this.reason = reason;
}
@Override
public ActionRequestValidationException validate() {
return null;
@ -92,6 +101,7 @@ public class UpdateJobStatusAction
super.readFrom(in);
jobId = in.readString();
status = JobStatus.fromStream(in);
reason = in.readOptionalString();
}
@Override
@ -99,6 +109,7 @@ public class UpdateJobStatusAction
super.writeTo(out);
out.writeString(jobId);
status.writeTo(out);
out.writeOptionalString(reason);
}
@Override

View File

@ -39,7 +39,7 @@ import java.util.regex.Pattern;
/**
* This class represents a configured and created Job. The creation time is set
* to the time the object was constructed, Status is set to
* {@link JobStatus#RUNNING} and the finished time and last data time fields are
* {@link JobStatus#OPENING} and the finished time and last data time fields are
* {@code null} until the job has seen some data or it is finished respectively.
* If the job was created to read data from a list of files FileUrls will be a
* non-empty list else the expects data to be streamed to it.

View File

@ -20,7 +20,7 @@ import java.util.Locale;
*/
public enum JobStatus implements Writeable {
RUNNING, CLOSING, CLOSED, FAILED, PAUSING, PAUSED;
CLOSING, CLOSED, OPENING, OPENED, FAILED;
public static JobStatus fromString(String name) {
return valueOf(name.trim().toUpperCase(Locale.ROOT));

View File

@ -29,7 +29,7 @@ public class SchedulerState extends ToXContentToBytes implements Writeable {
public static final ParseField END_TIME_MILLIS = new ParseField("end");
public static final ConstructingObjectParser<SchedulerState, ParseFieldMatcherSupplier> PARSER = new ConstructingObjectParser<>(
TYPE_FIELD.getPreferredName(), a -> new SchedulerState((JobSchedulerStatus) a[0], (long) a[1], (Long) a[2]));
TYPE_FIELD.getPreferredName(), a -> new SchedulerState((JobSchedulerStatus) a[0], (Long) a[1], (Long) a[2]));
static {
PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> JobSchedulerStatus.fromString(p.text()), STATUS,

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.prelert.job.data;
import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.prelert.job.process.autodetect.params.InterimResultsParams;
@ -47,11 +46,12 @@ public interface DataProcessor {
*/
void flushJob(String jobId, InterimResultsParams interimResultsParams);
void openJob(String jobId, boolean ignoreDowntime);
/**
* Stop the running job and mark it as finished.<br>
* @param jobId The job to stop
*
* @param jobId The job to stop
* @param nextStatus The final status to set when analytical process has stopped
*/
void closeJob(String jobId, JobStatus nextStatus);
void closeJob(String jobId);
}

View File

@ -10,6 +10,7 @@ import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestStatus;
@ -53,12 +54,11 @@ import java.util.function.Supplier;
public class AutodetectProcessManager extends AbstractComponent implements DataProcessor {
// TODO (norelease) to be reconsidered
// TODO (norelease) default needs to be reconsidered
public static final Setting<Integer> MAX_RUNNING_JOBS_PER_NODE =
Setting.intSetting("max_running_jobs", 10, Setting.Property.NodeScope);
Setting.intSetting("max_running_jobs", 10, 1, 128, Setting.Property.NodeScope, Setting.Property.Dynamic);
private final Client client;
private final int maxRunningJobs;
private final ThreadPool threadPool;
private final JobManager jobManager;
private final JobProvider jobProvider;
@ -72,14 +72,16 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
private final ConcurrentMap<String, AutodetectCommunicator> autoDetectCommunicatorByJob;
private volatile int maxAllowedRunningJobs;
public AutodetectProcessManager(Settings settings, Client client, ThreadPool threadPool, JobManager jobManager,
JobProvider jobProvider, JobResultsPersister jobResultsPersister,
JobDataCountsPersister jobDataCountsPersister, AutodetectResultsParser parser,
AutodetectProcessFactory autodetectProcessFactory) {
JobProvider jobProvider, JobResultsPersister jobResultsPersister,
JobDataCountsPersister jobDataCountsPersister, AutodetectResultsParser parser,
AutodetectProcessFactory autodetectProcessFactory, ClusterSettings clusterSettings) {
super(settings);
this.client = client;
this.threadPool = threadPool;
this.maxRunningJobs = MAX_RUNNING_JOBS_PER_NODE.get(settings);
this.maxAllowedRunningJobs = MAX_RUNNING_JOBS_PER_NODE.get(settings);
this.parser = parser;
this.autodetectProcessFactory = autodetectProcessFactory;
this.jobManager = jobManager;
@ -91,20 +93,21 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
this.jobDataCountsPersister = jobDataCountsPersister;
this.autoDetectCommunicatorByJob = new ConcurrentHashMap<>();
clusterSettings.addSettingsUpdateConsumer(MAX_RUNNING_JOBS_PER_NODE, val -> maxAllowedRunningJobs = val);
}
@Override
public DataCounts processData(String jobId, InputStream input, DataLoadParams params, Supplier<Boolean> cancelled) {
Allocation allocation = jobManager.getJobAllocation(jobId);
if (allocation.getStatus().isAnyOf(JobStatus.PAUSING, JobStatus.PAUSED)) {
return new DataCounts(jobId);
if (allocation.getStatus() != JobStatus.OPENED) {
throw new IllegalArgumentException("job [" + jobId + "] status is [" + allocation.getStatus() + "], but must be ["
+ JobStatus.OPENED + "] for processing data");
}
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.computeIfAbsent(jobId, id -> {
AutodetectCommunicator c = create(id, params.isIgnoreDowntime());
setJobStatus(jobId, JobStatus.RUNNING);
return c;
});
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId);
if (communicator == null) {
throw new IllegalStateException("job [" + jobId + "] with status [" + allocation.getStatus() + "] hasn't been started");
}
try {
return communicator.writeToJob(input, params, cancelled);
// TODO check for errors from autodetect
@ -120,10 +123,47 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
}
}
@Override
public void flushJob(String jobId, InterimResultsParams params) {
logger.debug("Flushing job {}", jobId);
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId);
if (communicator == null) {
logger.debug("Cannot flush: no active autodetect process for job {}", jobId);
return;
}
try {
communicator.flushJob(params);
// TODO check for errors from autodetect
} catch (IOException ioe) {
String msg = String.format(Locale.ROOT, "[%s] exception while flushing job", jobId);
logger.warn(msg);
throw ExceptionsHelper.serverError(msg, ioe);
}
}
public void writeUpdateConfigMessage(String jobId, String config) throws IOException {
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId);
if (communicator == null) {
logger.debug("Cannot update config: no active autodetect process for job {}", jobId);
return;
}
communicator.writeUpdateConfigMessage(config);
// TODO check for errors from autodetect
}
@Override
public void openJob(String jobId, boolean ignoreDowntime) {
autoDetectCommunicatorByJob.computeIfAbsent(jobId, id -> {
AutodetectCommunicator communicator = create(id, ignoreDowntime);
setJobStatus(jobId, JobStatus.OPENED);
return communicator;
});
}
AutodetectCommunicator create(String jobId, boolean ignoreDowntime) {
if (autoDetectCommunicatorByJob.size() == maxRunningJobs) {
throw new ElasticsearchStatusException("max running job capacity [" + maxRunningJobs + "] reached",
RestStatus.FORBIDDEN);
if (autoDetectCommunicatorByJob.size() == maxAllowedRunningJobs) {
throw new ElasticsearchStatusException("max running job capacity [" + maxAllowedRunningJobs + "] reached",
RestStatus.CONFLICT);
}
// TODO norelease, once we remove black hole process
@ -154,35 +194,7 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
}
@Override
public void flushJob(String jobId, InterimResultsParams params) {
logger.debug("Flushing job {}", jobId);
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId);
if (communicator == null) {
logger.debug("Cannot flush: no active autodetect process for job {}", jobId);
return;
}
try {
communicator.flushJob(params);
// TODO check for errors from autodetect
} catch (IOException ioe) {
String msg = String.format(Locale.ROOT, "Exception flushing process for job %s", jobId);
logger.warn(msg);
throw ExceptionsHelper.serverError(msg, ioe);
}
}
public void writeUpdateConfigMessage(String jobId, String config) throws IOException {
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId);
if (communicator == null) {
logger.debug("Cannot update config: no active autodetect process for job {}", jobId);
return;
}
communicator.writeUpdateConfigMessage(config);
// TODO check for errors from autodetect
}
@Override
public void closeJob(String jobId, JobStatus nextStatus) {
public void closeJob(String jobId) {
logger.debug("Closing job {}", jobId);
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.remove(jobId);
if (communicator == null) {
@ -192,14 +204,14 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
try {
communicator.close();
setJobStatus(jobId, nextStatus);
setJobStatus(jobId, JobStatus.CLOSED);
} catch (Exception e) {
logger.warn("Exception closing stopped process input stream", e);
throw ExceptionsHelper.serverError("Exception closing stopped process input stream", e);
}
}
int numberOfRunningJobs() {
int numberOfOpenJobs() {
return autoDetectCommunicatorByJob.size();
}

View File

@ -16,9 +16,8 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.prelert.action.DeleteJobAction;
import org.elasticsearch.xpack.prelert.action.PauseJobAction;
import org.elasticsearch.xpack.prelert.action.PutJobAction;
import org.elasticsearch.xpack.prelert.action.ResumeJobAction;
import org.elasticsearch.xpack.prelert.action.OpenJobAction;
import org.elasticsearch.xpack.prelert.action.RevertModelSnapshotAction;
import org.elasticsearch.xpack.prelert.action.StartJobSchedulerAction;
import org.elasticsearch.xpack.prelert.action.StopJobSchedulerAction;
@ -284,7 +283,7 @@ public class JobManager extends AbstractComponent {
if (schedulerState != null && schedulerState.getStatus() != JobSchedulerStatus.STOPPED) {
throw ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.JOB_CANNOT_DELETE_WHILE_SCHEDULER_RUNS, jobId));
}
if (!allocation.getStatus().isAnyOf(JobStatus.CLOSED, JobStatus.PAUSED, JobStatus.FAILED)) {
if (!allocation.getStatus().isAnyOf(JobStatus.CLOSED, JobStatus.FAILED)) {
throw ExceptionsHelper.conflictStatusException(Messages.getMessage(
Messages.JOB_CANNOT_DELETE_WHILE_RUNNING, jobId, allocation.getStatus()));
}
@ -446,52 +445,21 @@ public class JobManager extends AbstractComponent {
});
}
public void pauseJob(PauseJobAction.Request request, ActionListener<PauseJobAction.Response> actionListener) {
clusterService.submitStateUpdateTask("pause-job-" + request.getJobId(),
new AckedClusterStateUpdateTask<PauseJobAction.Response>(request, actionListener) {
@Override
protected PauseJobAction.Response newResponse(boolean acknowledged) {
return new PauseJobAction.Response(acknowledged);
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
Job job = getJobOrThrowIfUnknown(currentState, request.getJobId());
Allocation allocation = getAllocation(currentState, job.getId());
checkJobIsNotScheduled(job);
if (!allocation.getStatus().isAnyOf(JobStatus.RUNNING, JobStatus.CLOSED)) {
throw ExceptionsHelper.conflictStatusException(
Messages.getMessage(Messages.JOB_CANNOT_PAUSE, job.getId(), allocation.getStatus()));
}
ClusterState newState = innerSetJobStatus(job.getId(), JobStatus.PAUSING, currentState);
Job.Builder jobBuilder = new Job.Builder(job);
jobBuilder.setIgnoreDowntime(IgnoreDowntime.ONCE);
return innerPutJob(jobBuilder.build(), true, newState);
}
});
}
public void resumeJob(ResumeJobAction.Request request, ActionListener<ResumeJobAction.Response> actionListener) {
clusterService.submitStateUpdateTask("resume-job-" + request.getJobId(),
new AckedClusterStateUpdateTask<ResumeJobAction.Response>(request, actionListener) {
public void openJob(OpenJobAction.Request request, ActionListener<OpenJobAction.Response> actionListener) {
clusterService.submitStateUpdateTask("open-job-" + request.getJobId(),
new AckedClusterStateUpdateTask<OpenJobAction.Response>(request, actionListener) {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
getJobOrThrowIfUnknown(request.getJobId());
Allocation allocation = getJobAllocation(request.getJobId());
if (allocation.getStatus() != JobStatus.PAUSED) {
throw ExceptionsHelper.conflictStatusException(
Messages.getMessage(Messages.JOB_CANNOT_RESUME, request.getJobId(), allocation.getStatus()));
}
Allocation.Builder builder = new Allocation.Builder(allocation);
builder.setStatus(JobStatus.CLOSED);
return innerUpdateAllocation(builder.build(), currentState);
PrelertMetadata.Builder builder = new PrelertMetadata.Builder(currentState.metaData().custom(PrelertMetadata.TYPE));
builder.createAllocation(request.getJobId(), request.isIgnoreDowntime());
return ClusterState.builder(currentState)
.metaData(MetaData.builder(currentState.metaData()).putCustom(PrelertMetadata.TYPE, builder.build()))
.build();
}
@Override
protected ResumeJobAction.Response newResponse(boolean acknowledged) {
return new ResumeJobAction.Response(acknowledged);
protected OpenJobAction.Response newResponse(boolean acknowledged) {
return new OpenJobAction.Response(acknowledged);
}
});
}
@ -506,7 +474,11 @@ public class JobManager extends AbstractComponent {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return innerSetJobStatus(request.getJobId(), request.getStatus(), currentState);
PrelertMetadata.Builder builder = new PrelertMetadata.Builder(currentState.metaData().custom(PrelertMetadata.TYPE));
builder.updateStatus(request.getJobId(), request.getStatus(), request.getReason());
return ClusterState.builder(currentState)
.metaData(MetaData.builder(currentState.metaData()).putCustom(PrelertMetadata.TYPE, builder.build()))
.build();
}
@Override
@ -542,16 +514,4 @@ public class JobManager extends AbstractComponent {
jobResultsPersister.commitWrites(jobId);
}
private ClusterState innerSetJobStatus(String jobId, JobStatus newStatus, ClusterState currentState) {
Allocation allocation = getJobAllocation(jobId);
Allocation.Builder builder = new Allocation.Builder(allocation);
builder.setStatus(newStatus);
return innerUpdateAllocation(builder.build(), currentState);
}
private void checkJobIsNotScheduled(Job job) {
if (job.getSchedulerConfig() != null) {
throw ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.REST_ACTION_NOT_ALLOWED_FOR_SCHEDULED_JOB));
}
}
}

View File

@ -27,36 +27,47 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
private static final ParseField NODE_ID_FIELD = new ParseField("node_id");
private static final ParseField JOB_ID_FIELD = new ParseField("job_id");
private static final ParseField IGNORE_DOWNTIME_FIELD = new ParseField("ignore_downtime");
public static final ParseField STATUS = new ParseField("status");
public static final ParseField STATUS_REASON = new ParseField("status_reason");
public static final ParseField SCHEDULER_STATE = new ParseField("scheduler_state");
static final Allocation PROTO = new Allocation(null, null, null, null);
static final Allocation PROTO = new Allocation(null, null, false, null, null, null);
static final ObjectParser<Builder, ParseFieldMatcherSupplier> PARSER = new ObjectParser<>("allocation", Builder::new);
static {
PARSER.declareString(Builder::setNodeId, NODE_ID_FIELD);
PARSER.declareString(Builder::setJobId, JOB_ID_FIELD);
PARSER.declareBoolean(Builder::setIgnoreDowntime, IGNORE_DOWNTIME_FIELD);
PARSER.declareField(Builder::setStatus, (p, c) -> JobStatus.fromString(p.text()), STATUS, ObjectParser.ValueType.STRING);
PARSER.declareString(Builder::setStatusReason, STATUS_REASON);
PARSER.declareObject(Builder::setSchedulerState, SchedulerState.PARSER, SCHEDULER_STATE);
}
private final String nodeId;
private final String jobId;
private final boolean ignoreDowntime;
private final JobStatus status;
private final String statusReason;
private final SchedulerState schedulerState;
public Allocation(String nodeId, String jobId, JobStatus status, SchedulerState schedulerState) {
public Allocation(String nodeId, String jobId, boolean ignoreDowntime, JobStatus status, String statusReason,
SchedulerState schedulerState) {
this.nodeId = nodeId;
this.jobId = jobId;
this.ignoreDowntime = ignoreDowntime;
this.status = status;
this.statusReason = statusReason;
this.schedulerState = schedulerState;
}
public Allocation(StreamInput in) throws IOException {
this.nodeId = in.readString();
this.jobId = in.readString();
this.ignoreDowntime = in.readBoolean();
this.status = JobStatus.fromStream(in);
this.statusReason = in.readOptionalString();
this.schedulerState = in.readOptionalWriteable(SchedulerState::new);
}
@ -68,10 +79,23 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
return jobId;
}
/**
* @return Whether to ignore downtime at startup.
*
* When the job status is set to STARTED, to ignoreDowntime will be set to false.
*/
public boolean isIgnoreDowntime() {
return ignoreDowntime;
}
public JobStatus getStatus() {
return status;
}
public String getStatusReason() {
return statusReason;
}
public SchedulerState getSchedulerState() {
return schedulerState;
}
@ -85,7 +109,9 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
public void writeTo(StreamOutput out) throws IOException {
out.writeString(nodeId);
out.writeString(jobId);
out.writeBoolean(ignoreDowntime);
status.writeTo(out);
out.writeOptionalString(statusReason);
out.writeOptionalWriteable(schedulerState);
}
@ -94,7 +120,11 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
builder.startObject();
builder.field(NODE_ID_FIELD.getPreferredName(), nodeId);
builder.field(JOB_ID_FIELD.getPreferredName(), jobId);
builder.field(IGNORE_DOWNTIME_FIELD.getPreferredName(), ignoreDowntime);
builder.field(STATUS.getPreferredName(), status);
if (statusReason != null) {
builder.field(STATUS_REASON.getPreferredName(), statusReason);
}
if (schedulerState != null) {
builder.field(SCHEDULER_STATE.getPreferredName(), schedulerState);
}
@ -109,13 +139,15 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
Allocation that = (Allocation) o;
return Objects.equals(nodeId, that.nodeId) &&
Objects.equals(jobId, that.jobId) &&
Objects.equals(ignoreDowntime, that.ignoreDowntime) &&
Objects.equals(status, that.status) &&
Objects.equals(statusReason, that.statusReason) &&
Objects.equals(schedulerState, that.schedulerState);
}
@Override
public int hashCode() {
return Objects.hash(nodeId, jobId, status, schedulerState);
return Objects.hash(nodeId, jobId, ignoreDowntime, status, statusReason, schedulerState);
}
// Class alreadt extends from AbstractDiffable, so copied from ToXContentToBytes#toString()
@ -137,7 +169,9 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
private String nodeId;
private String jobId;
private boolean ignoreDowntime;
private JobStatus status;
private String statusReason;
private SchedulerState schedulerState;
public Builder() {
@ -146,7 +180,9 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
public Builder(Allocation allocation) {
this.nodeId = allocation.nodeId;
this.jobId = allocation.jobId;
this.ignoreDowntime = allocation.ignoreDowntime;
this.status = allocation.status;
this.statusReason = allocation.statusReason;
this.schedulerState = allocation.schedulerState;
}
@ -158,35 +194,39 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
this.jobId = jobId;
}
public void setIgnoreDowntime(boolean ignoreDownTime) {
this.ignoreDowntime = ignoreDownTime;
}
@SuppressWarnings("incomplete-switch")
public void setStatus(JobStatus newStatus) {
switch (newStatus) {
case CLOSING:
if (this.status == JobStatus.CLOSED) {
throw new IllegalArgumentException("[" + jobId + "][" + status +"] job already closed");
}
if (this.status == JobStatus.CLOSING) {
throw new IllegalArgumentException("[" + jobId + "][" + status +"] job already closing");
}
break;
case PAUSING:
if (this.status == JobStatus.CLOSED) {
throw new IllegalArgumentException("[" + jobId + "][" + status +"] can't pause a job that is closed");
}
if (this.status == JobStatus.CLOSING) {
throw new IllegalArgumentException("[" + jobId + "][" + status +"] can't pause a job that is closing");
}
if (this.status == JobStatus.PAUSING) {
throw new IllegalArgumentException("[" + jobId + "][" + status +"] can't pause a job that is pausing");
}
if (this.status == JobStatus.PAUSED) {
throw new IllegalArgumentException("[" + jobId + "][" + status +"] can't pause a job that is already paused");
if (this.status != null) {
switch (newStatus) {
case CLOSING:
if (this.status != JobStatus.OPENED) {
throw new IllegalArgumentException("[" + jobId + "] expected status [" + JobStatus.OPENED
+ "], but got [" + status +"]");
}
break;
case OPENING:
if (this.status.isAnyOf(JobStatus.CLOSED, JobStatus.FAILED)) {
throw new IllegalArgumentException("[" + jobId + "] expected status [" + JobStatus.CLOSED
+ "] or [" + JobStatus.FAILED + "], but got [" + status +"]");
}
break;
case OPENED:
ignoreDowntime = false;
break;
}
}
this.status = newStatus;
}
public void setStatusReason(String statusReason) {
this.statusReason = statusReason;
}
public void setSchedulerState(SchedulerState schedulerState) {
JobSchedulerStatus currentSchedulerStatus = this.schedulerState == null ?
JobSchedulerStatus.STOPPED : this.schedulerState.getStatus();
@ -225,10 +265,7 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
}
public Allocation build() {
if (status == null) {
status = JobStatus.CLOSED;
}
return new Allocation(nodeId, jobId, status, schedulerState);
return new Allocation(nodeId, jobId, ignoreDowntime, status, statusReason, schedulerState);
}
}

View File

@ -32,7 +32,7 @@ public class JobAllocator extends AbstractComponent implements ClusterStateListe
clusterService.add(this);
}
ClusterState allocateJobs(ClusterState current) {
ClusterState assignJobsToNodes(ClusterState current) {
if (shouldAllocate(current) == false) {
// returning same instance, so no cluster state update is performed
return current;
@ -48,16 +48,8 @@ public class JobAllocator extends AbstractComponent implements ClusterStateListe
PrelertMetadata.Builder builder = new PrelertMetadata.Builder(prelertMetadata);
DiscoveryNode prelertNode = nodes.getMasterNode(); // prelert is now always master node
for (String jobId : prelertMetadata.getJobs().keySet()) {
if (prelertMetadata.getAllocations().containsKey(jobId) == false) {
boolean addSchedulderState = prelertMetadata.getJobs().get(jobId).getSchedulerConfig() != null;
if (addSchedulderState) {
builder.putAllocationWithScheduler(prelertNode.getId(), jobId);
}
else {
builder.putAllocation(prelertNode.getId(), jobId);
}
}
for (String jobId : prelertMetadata.getAllocations().keySet()) {
builder.assignToNode(jobId, prelertNode.getId());
}
return ClusterState.builder(current)
@ -71,8 +63,8 @@ public class JobAllocator extends AbstractComponent implements ClusterStateListe
return false;
}
for (String jobId : prelertMetadata.getJobs().keySet()) {
if (prelertMetadata.getAllocations().containsKey(jobId) == false) {
for (Allocation allocation : prelertMetadata.getAllocations().values()) {
if (allocation.getNodeId() == null) {
return true;
}
}
@ -87,7 +79,7 @@ public class JobAllocator extends AbstractComponent implements ClusterStateListe
clusterService.submitStateUpdateTask("allocate_jobs", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return allocateJobs(currentState);
return assignJobsToNodes(currentState);
}
@Override

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.prelert.job.metadata;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
@ -21,7 +20,6 @@ import org.elasticsearch.xpack.prelert.job.SchedulerState;
import org.elasticsearch.xpack.prelert.job.data.DataProcessor;
import org.elasticsearch.xpack.prelert.job.scheduler.ScheduledJobService;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
@ -29,10 +27,10 @@ import java.util.concurrent.Executor;
public class JobLifeCycleService extends AbstractComponent implements ClusterStateListener {
volatile Set<String> localAllocatedJobs = Collections.emptySet();
volatile Set<String> localAssignedJobs = new HashSet<>();
private final Client client;
private final ScheduledJobService scheduledJobService;
private DataProcessor dataProcessor;
private final DataProcessor dataProcessor;
private final Executor executor;
public JobLifeCycleService(Settings settings, Client client, ClusterService clusterService, ScheduledJobService scheduledJobService,
@ -54,7 +52,7 @@ public class JobLifeCycleService extends AbstractComponent implements ClusterSta
}
// Single volatile read:
Set<String> localAllocatedJobs = this.localAllocatedJobs;
Set<String> localAssignedJobs = this.localAssignedJobs;
DiscoveryNode localNode = event.state().nodes().getLocalNode();
for (Allocation allocation : prelertMetadata.getAllocations().values()) {
@ -63,10 +61,10 @@ public class JobLifeCycleService extends AbstractComponent implements ClusterSta
}
}
for (String localAllocatedJob : localAllocatedJobs) {
for (String localAllocatedJob : localAssignedJobs) {
Allocation allocation = prelertMetadata.getAllocations().get(localAllocatedJob);
if (allocation != null) {
if (localNode.getId().equals(allocation.getNodeId()) == false) {
if (localNode.getId().equals(allocation.getNodeId()) && allocation.getStatus() == JobStatus.CLOSING) {
stopJob(localAllocatedJob);
}
} else {
@ -77,35 +75,15 @@ public class JobLifeCycleService extends AbstractComponent implements ClusterSta
private void handleLocallyAllocatedJob(PrelertMetadata prelertMetadata, Allocation allocation) {
Job job = prelertMetadata.getJobs().get(allocation.getJobId());
if (localAllocatedJobs.contains(allocation.getJobId()) == false) {
startJob(job);
if (localAssignedJobs.contains(allocation.getJobId()) == false) {
if (allocation.getStatus() == JobStatus.OPENING) {
startJob(allocation);
}
}
handleJobStatusChange(job, allocation.getStatus());
handleSchedulerStatusChange(job, allocation);
}
private void handleJobStatusChange(Job job, JobStatus status) {
switch (status) {
case PAUSING:
executor.execute(() -> pauseJob(job));
break;
case RUNNING:
break;
case CLOSING:
executor.execute(() -> closeJob(job));
break;
case CLOSED:
break;
case PAUSED:
break;
case FAILED:
break;
default:
throw new IllegalStateException("Unknown job status [" + status + "]");
}
}
private void handleSchedulerStatusChange(Job job, Allocation allocation) {
SchedulerState schedulerState = allocation.getSchedulerState();
if (schedulerState != null) {
@ -126,54 +104,47 @@ public class JobLifeCycleService extends AbstractComponent implements ClusterSta
}
}
void startJob(Job job) {
logger.info("Starting job [" + job.getId() + "]");
// noop now, but should delegate to a task / ProcessManager that actually starts the job
void startJob(Allocation allocation) {
logger.info("Starting job [" + allocation.getJobId() + "]");
executor.execute(() -> {
try {
dataProcessor.openJob(allocation.getJobId(), allocation.isIgnoreDowntime());
} catch (Exception e) {
logger.error("Failed to close job [" + allocation.getJobId() + "]", e);
updateJobStatus(allocation.getJobId(), JobStatus.FAILED, "failed to open, " + e.getMessage());
}
});
// update which jobs are now allocated locally
Set<String> newSet = new HashSet<>(localAllocatedJobs);
newSet.add(job.getId());
localAllocatedJobs = newSet;
Set<String> newSet = new HashSet<>(localAssignedJobs);
newSet.add(allocation.getJobId());
localAssignedJobs = newSet;
}
void stopJob(String jobId) {
logger.info("Stopping job [" + jobId + "]");
// noop now, but should delegate to a task / ProcessManager that actually stops the job
executor.execute(() -> {
try {
dataProcessor.closeJob(jobId);
} catch (Exception e) {
logger.error("Failed to close job [" + jobId + "]", e);
updateJobStatus(jobId, JobStatus.FAILED, "failed to close, " + e.getMessage());
}
});
// update which jobs are now allocated locally
Set<String> newSet = new HashSet<>(localAllocatedJobs);
Set<String> newSet = new HashSet<>(localAssignedJobs);
newSet.remove(jobId);
localAllocatedJobs = newSet;
localAssignedJobs = newSet;
}
private void closeJob(Job job) {
try {
// NORELEASE Ensure this also removes the job auto-close timeout task
dataProcessor.closeJob(job.getId(), JobStatus.CLOSED);
} catch (ElasticsearchException e) {
logger.error("Failed to close job [" + job.getId() + "]", e);
updateJobStatus(job.getId(), JobStatus.FAILED);
}
}
private void pauseJob(Job job) {
try {
// NORELEASE Ensure this also removes the job auto-close timeout task
dataProcessor.closeJob(job.getId(), JobStatus.PAUSED);
} catch (ElasticsearchException e) {
logger.error("Failed to close job [" + job.getId() + "] while pausing", e);
updateJobStatus(job.getId(), JobStatus.FAILED);
}
}
private void updateJobStatus(String jobId, JobStatus status) {
private void updateJobStatus(String jobId, JobStatus status, String reason) {
UpdateJobStatusAction.Request request = new UpdateJobStatusAction.Request(jobId, status);
request.setReason(reason);
client.execute(UpdateJobStatusAction.INSTANCE, request, new ActionListener<UpdateJobStatusAction.Response>() {
@Override
public void onResponse(UpdateJobStatusAction.Response response) {
logger.info("Successfully set job status to [{}] for job [{}]", status, jobId);
// NORELEASE Audit job paused
// audit(jobId).info(Messages.getMessage(Messages.JOB_AUDIT_PAUSED));
}
@Override

View File

@ -9,6 +9,7 @@ import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.DiffableUtils;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.ParseFieldMatcherSupplier;
@ -215,23 +216,6 @@ public class PrelertMetadata implements MetaData.Custom {
return this;
}
public Builder putAllocation(String nodeId, String jobId) {
Allocation.Builder builder = new Allocation.Builder();
builder.setJobId(jobId);
builder.setNodeId(nodeId);
this.allocations.put(jobId, builder.build());
return this;
}
public Builder putAllocationWithScheduler(String nodeId, String jobId) {
Allocation.Builder builder = new Allocation.Builder();
builder.setJobId(jobId);
builder.setNodeId(nodeId);
builder.setSchedulerState(new SchedulerState(JobSchedulerStatus.STOPPED, null, null));
this.allocations.put(jobId, builder.build());
return this;
}
public Builder updateAllocation(String jobId, Allocation updated) {
Allocation previous = this.allocations.put(jobId, updated);
if (previous == null) {
@ -264,6 +248,65 @@ public class PrelertMetadata implements MetaData.Custom {
public PrelertMetadata build() {
return new PrelertMetadata(jobs, allocations);
}
public Builder createAllocation(String jobId, boolean ignoreDowntime) {
Job job = jobs.get(jobId);
if (job == null) {
throw ExceptionsHelper.missingJobException(jobId);
}
Allocation allocation = allocations.get(jobId);
Allocation.Builder builder;
if (allocation == null) {
builder = new Allocation.Builder();
builder.setJobId(jobId);
boolean addSchedulderState = job.getSchedulerConfig() != null;
if (addSchedulderState) {
builder.setSchedulerState(new SchedulerState(JobSchedulerStatus.STOPPED, null, null));
}
} else {
if (allocation.getStatus() != JobStatus.CLOSED) {
throw ExceptionsHelper.conflictStatusException("[" + jobId + "] expected status [" + JobStatus.CLOSED
+ "], but got [" + allocation.getStatus() +"]");
}
builder = new Allocation.Builder(allocation);
}
builder.setStatus(JobStatus.OPENING);
builder.setIgnoreDowntime(ignoreDowntime);
allocations.put(jobId, builder.build());
return this;
}
public Builder assignToNode(String jobId, String nodeId) {
Allocation allocation = allocations.get(jobId);
if (allocation == null) {
throw new IllegalStateException("[" + jobId + "] no allocation to assign to node [" + nodeId + "]");
}
Allocation.Builder builder = new Allocation.Builder(allocation);
builder.setNodeId(nodeId);
allocations.put(jobId, builder.build());
return this;
}
public Builder updateStatus(String jobId, JobStatus jobStatus, @Nullable String reason) {
Allocation previous = allocations.get(jobId);
if (previous == null) {
throw new IllegalStateException("[" + jobId + "] no allocation exist to update the status to [" + jobStatus + "]");
}
Allocation.Builder builder = new Allocation.Builder(previous);
builder.setStatus(jobStatus);
if (reason != null) {
builder.setStatusReason(reason);
}
if (previous.getStatus() != jobStatus && jobStatus == JobStatus.CLOSED) {
Job.Builder job = new Job.Builder(this.jobs.get(jobId));
job.setFinishedTime(new Date());
this.jobs.put(job.getId(), job.build());
}
allocations.put(jobId, builder.build());
return this;
}
}
}

View File

@ -34,7 +34,7 @@ public class AutodetectResultsParser extends AbstractComponent {
this.parseFieldMatcherSupplier = parseFieldMatcherSupplier;
}
CloseableIterator<AutodetectResult> parseResults(InputStream in) throws ElasticsearchParseException {
public CloseableIterator<AutodetectResult> parseResults(InputStream in) throws ElasticsearchParseException {
try {
XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(in);
XContentParser.Token token = parser.nextToken();

View File

@ -1,41 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.rest.data;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.AcknowledgedRestListener;
import org.elasticsearch.xpack.prelert.PrelertPlugin;
import org.elasticsearch.xpack.prelert.action.PostDataCloseAction;
import org.elasticsearch.xpack.prelert.job.Job;
import java.io.IOException;
public class RestPostDataCloseAction extends BaseRestHandler {
private final PostDataCloseAction.TransportAction transportPostDataCloseAction;
@Inject
public RestPostDataCloseAction(Settings settings, RestController controller,
PostDataCloseAction.TransportAction transportPostDataCloseAction) {
super(settings);
this.transportPostDataCloseAction = transportPostDataCloseAction;
controller.registerHandler(RestRequest.Method.POST, PrelertPlugin.BASE_PATH
+ "data/{" + Job.ID.getPreferredName() + "}/_close", this);
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
PostDataCloseAction.Request postDataCloseRequest = new PostDataCloseAction.Request(
restRequest.param(Job.ID.getPreferredName()));
return channel -> transportPostDataCloseAction.execute(postDataCloseRequest, new AcknowledgedRestListener<>(channel));
}
}

View File

@ -8,31 +8,35 @@ package org.elasticsearch.xpack.prelert.rest.job;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.AcknowledgedRestListener;
import org.elasticsearch.xpack.prelert.PrelertPlugin;
import org.elasticsearch.xpack.prelert.action.PauseJobAction;
import org.elasticsearch.xpack.prelert.action.CloseJobAction;
import org.elasticsearch.xpack.prelert.job.Job;
import java.io.IOException;
public class RestPauseJobAction extends BaseRestHandler {
public class RestCloseJobAction extends BaseRestHandler {
private final PauseJobAction.TransportAction transportPauseJobAction;
private final CloseJobAction.TransportAction closeJobAction;
@Inject
public RestPauseJobAction(Settings settings, RestController controller, PauseJobAction.TransportAction transportPauseJobAction) {
public RestCloseJobAction(Settings settings, RestController controller, CloseJobAction.TransportAction closeJobAction) {
super(settings);
this.transportPauseJobAction = transportPauseJobAction;
controller.registerHandler(RestRequest.Method.POST, PrelertPlugin.BASE_PATH + "jobs/{" + Job.ID.getPreferredName() + "}/_pause",
this);
this.closeJobAction = closeJobAction;
controller.registerHandler(RestRequest.Method.POST, PrelertPlugin.BASE_PATH
+ "data/{" + Job.ID.getPreferredName() + "}/_close", this);
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
PauseJobAction.Request request = new PauseJobAction.Request(restRequest.param(Job.ID.getPreferredName()));
return channel -> transportPauseJobAction.execute(request, new AcknowledgedRestListener<>(channel));
CloseJobAction.Request request = new CloseJobAction.Request(restRequest.param(Job.ID.getPreferredName()));
if (restRequest.hasParam("close_timeout")) {
request.setCloseTimeout(TimeValue.parseTimeValue(restRequest.param("close_timeout"), "close_timeout"));
}
return channel -> closeJobAction.execute(request, new AcknowledgedRestListener<>(channel));
}
}

View File

@ -8,31 +8,36 @@ package org.elasticsearch.xpack.prelert.rest.job;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.AcknowledgedRestListener;
import org.elasticsearch.xpack.prelert.PrelertPlugin;
import org.elasticsearch.xpack.prelert.action.ResumeJobAction;
import org.elasticsearch.xpack.prelert.action.OpenJobAction;
import org.elasticsearch.xpack.prelert.job.Job;
import java.io.IOException;
public class RestResumeJobAction extends BaseRestHandler {
public class RestOpenJobAction extends BaseRestHandler {
private final ResumeJobAction.TransportAction transportResumeJobAction;
private final OpenJobAction.TransportAction openJobAction;
@Inject
public RestResumeJobAction(Settings settings, RestController controller, ResumeJobAction.TransportAction transportResumeJobAction) {
public RestOpenJobAction(Settings settings, RestController controller, OpenJobAction.TransportAction openJobAction) {
super(settings);
this.transportResumeJobAction = transportResumeJobAction;
controller.registerHandler(RestRequest.Method.POST, PrelertPlugin.BASE_PATH + "jobs/{" + Job.ID.getPreferredName() + "}/_resume",
this.openJobAction = openJobAction;
controller.registerHandler(RestRequest.Method.POST, PrelertPlugin.BASE_PATH + "data/{" + Job.ID.getPreferredName() + "}/_open",
this);
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
ResumeJobAction.Request request = new ResumeJobAction.Request(restRequest.param(Job.ID.getPreferredName()));
return channel -> transportResumeJobAction.execute(request, new AcknowledgedRestListener<>(channel));
OpenJobAction.Request request = new OpenJobAction.Request(restRequest.param(Job.ID.getPreferredName()));
request.setIgnoreDowntime(restRequest.paramAsBoolean("ignore_downtime", false));
if (restRequest.hasParam("open_timeout")) {
request.setOpenTimeout(TimeValue.parseTimeValue(restRequest.param("open_timeout"), "open_timeout"));
}
return channel -> openJobAction.execute(request, new AcknowledgedRestListener<>(channel));
}
}

View File

@ -5,10 +5,10 @@
*/
package org.elasticsearch.xpack.prelert.action;
import org.elasticsearch.xpack.prelert.action.PauseJobAction.Request;
import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase;
import org.elasticsearch.xpack.prelert.action.CloseJobAction.Request;
public class PauseJobRequestTests extends AbstractStreamableTestCase<Request> {
public class CloseJobActionRequestTests extends AbstractStreamableTestCase<Request> {
@Override
protected Request createTestInstance() {

View File

@ -5,10 +5,10 @@
*/
package org.elasticsearch.xpack.prelert.action;
import org.elasticsearch.xpack.prelert.action.OpenJobAction.Request;
import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase;
import org.elasticsearch.xpack.prelert.action.PostDataCloseAction.Request;
public class PostDataCloseRequestTests extends AbstractStreamableTestCase<Request> {
public class OpenJobActionRequestTests extends AbstractStreamableTestCase<Request> {
@Override
protected Request createTestInstance() {

View File

@ -1,22 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.action;
import org.elasticsearch.xpack.prelert.action.ResumeJobAction.Request;
import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase;
public class ResumeJobRequestTests extends AbstractStreamableTestCase<Request> {
@Override
protected Request createTestInstance() {
return new Request(randomAsciiOfLengthBetween(1, 20));
}
@Override
protected Request createBlankInstance() {
return new Request();
}
}

View File

@ -72,6 +72,8 @@ public class ScheduledJobsIT extends ESIntegTestCase {
PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build(true));
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
assertTrue(putJobResponse.isAcknowledged());
OpenJobAction.Response openJobResponse = client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId())).get();
assertTrue(openJobResponse.isAcknowledged());
SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTING, 0L, now);
StartJobSchedulerAction.Request startSchedulerRequest = new StartJobSchedulerAction.Request("_job_id", schedulerState);
@ -102,6 +104,8 @@ public class ScheduledJobsIT extends ESIntegTestCase {
PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build(true));
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
assertTrue(putJobResponse.isAcknowledged());
OpenJobAction.Response openJobResponse = client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId())).get();
assertTrue(openJobResponse.isAcknowledged());
SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTING, 0L, null);
StartJobSchedulerAction.Request startSchedulerRequest = new StartJobSchedulerAction.Request("_job_id", schedulerState);
@ -211,8 +215,8 @@ public class ScheduledJobsIT extends ESIntegTestCase {
// ignore
}
try {
PostDataCloseAction.Response response =
client.execute(PostDataCloseAction.INSTANCE, new PostDataCloseAction.Request(jobId)).get();
CloseJobAction.Response response =
client.execute(CloseJobAction.INSTANCE, new CloseJobAction.Request(jobId)).get();
assertTrue(response.isAcknowledged());
} catch (Exception e) {
// ignore

View File

@ -241,74 +241,6 @@ public class PrelertJobIT extends ESRestTestCase {
assertThat(responseAsString, containsString("\"count\":1"));
}
public void testPauseAndResumeJob() throws Exception {
createFarequoteJob();
ResponseException e = expectThrows(ResponseException.class,
() -> client().performRequest("post", PrelertPlugin.BASE_PATH + "jobs/farequote/_pause"));
assertThat(e.getMessage(), containsString("[farequote][CLOSED] can't pause a job that is closed"));
client().performRequest("post", PrelertPlugin.BASE_PATH + "data/farequote/", Collections.emptyMap(),
new StringEntity("time,airline,responsetime,sourcetype\n" +
"2014-06-23 00:00:00Z,AAL,132.2046,farequote"));
assertBusy(() -> {
try {
Response getJobResponse = client().performRequest("get", PrelertPlugin.BASE_PATH + "jobs/farequote",
Collections.singletonMap("metric", "status"));
assertThat(responseEntityToString(getJobResponse), containsString("\"status\":\"RUNNING\""));
} catch (Exception e1) {
throw new RuntimeException(e1);
}
});
client().performRequest("post", PrelertPlugin.BASE_PATH + "jobs/farequote/_pause");
assertBusy(() -> {
try {
Response response = client().performRequest("get", PrelertPlugin.BASE_PATH + "jobs/farequote",
Collections.singletonMap("metric", "config,status"));
String responseEntityToString = responseEntityToString(response);
assertThat(responseEntityToString, containsString("\"ignore_downtime\":\"ONCE\""));
assertThat(responseEntityToString, containsString("\"status\":\"PAUSED\""));
} catch (Exception e1) {
fail();
}
});
e = expectThrows(ResponseException.class,
() -> client().performRequest("post", PrelertPlugin.BASE_PATH + "jobs/farequote/_pause"));
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(409));
assertThat(e.getMessage(), containsString("Cannot pause job 'farequote' while its status is PAUSED"));
client().performRequest("post", PrelertPlugin.BASE_PATH + "jobs/farequote/_resume");
client().performRequest("post", PrelertPlugin.BASE_PATH + "data/farequote/", Collections.emptyMap(),
new StringEntity("time,airline,responsetime,sourcetype\n" +
"2014-06-23 00:00:00Z,AAL,132.2046,farequote"));
assertBusy(() -> {
try {
Response getJobResponse = client().performRequest("get", PrelertPlugin.BASE_PATH + "jobs/farequote",
Collections.singletonMap("metric", "status"));
assertThat(responseEntityToString(getJobResponse), containsString("\"status\":\"RUNNING\""));
} catch (Exception e1) {
throw new RuntimeException(e1);
}
});
e = expectThrows(ResponseException.class,
() -> client().performRequest("post", PrelertPlugin.BASE_PATH + "jobs/farequote/_resume"));
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(409));
assertThat(e.getMessage(), containsString("Cannot resume job 'farequote' while its status is RUNNING"));
}
public void testResumeJob_GivenJobIsClosed() throws Exception {
createFarequoteJob();
ResponseException e = expectThrows(ResponseException.class,
() -> client().performRequest("post", PrelertPlugin.BASE_PATH + "jobs/farequote/_resume"));
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(409));
assertThat(e.getMessage(), containsString("Cannot resume job 'farequote' while its status is CLOSED"));
}
private Response addBucketResult(String jobId, String timestamp, long bucketSpan) throws Exception {
try {
client().performRequest("put", "prelertresults-" + jobId, Collections.emptyMap(), new StringEntity(RESULT_MAPPING));

View File

@ -15,6 +15,7 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xpack.prelert.PrelertPlugin;
import org.elasticsearch.xpack.prelert.action.ScheduledJobsIT;
import org.junit.After;
import java.io.BufferedReader;
@ -52,6 +53,7 @@ public class ScheduledJobIT extends ESRestTestCase {
String jobId = "_id2";
createAirlineDataIndex();
createScheduledJob(jobId);
openJob(client(), jobId);
Response startSchedulerRequest = client().performRequest("post",
PrelertPlugin.BASE_PATH + "schedulers/" + jobId + "/_start?start=2016-06-01T00:00:00Z&end=2016-06-02T00:00:00Z");
@ -76,6 +78,7 @@ public class ScheduledJobIT extends ESRestTestCase {
String jobId = "_id3";
createAirlineDataIndex();
createScheduledJob(jobId);
openJob(client(), jobId);
Response response = client().performRequest("post",
PrelertPlugin.BASE_PATH + "schedulers/" + jobId + "/_start?start=2016-06-01T00:00:00Z");
@ -85,9 +88,9 @@ public class ScheduledJobIT extends ESRestTestCase {
assertBusy(() -> {
try {
Response getJobResponse = client().performRequest("get", PrelertPlugin.BASE_PATH + "jobs/" + jobId,
Collections.singletonMap("metric", "status,data_counts"));
Collections.singletonMap("metric", "data_counts,status"));
String responseAsString = responseEntityToString(getJobResponse);
assertThat(responseAsString, containsString("\"status\":\"RUNNING\""));
assertThat(responseAsString, containsString("\"status\":\"OPENED\""));
assertThat(responseAsString, containsString("\"input_record_count\":2"));
} catch (Exception e1) {
throw new RuntimeException(e1);
@ -215,11 +218,17 @@ public class ScheduledJobIT extends ESRestTestCase {
// ignore
}
try {
client.performRequest("POST", "/_xpack/prelert/data/" + jobId + "/_close");
Response response = client.performRequest("POST", "/_xpack/prelert/data/" + jobId + "/_close");
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
} catch (Exception e) {
// ignore
}
client.performRequest("DELETE", "/_xpack/prelert/jobs/" + jobId);
}
}
public static void openJob(RestClient client, String jobId) throws IOException {
Response response = client.performRequest("post", PrelertPlugin.BASE_PATH + "data/" + jobId + "/_open");
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
}
}

View File

@ -7,20 +7,20 @@ package org.elasticsearch.xpack.prelert.integration;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.prelert.PrelertPlugin;
import org.elasticsearch.xpack.prelert.action.GetJobsAction;
import org.elasticsearch.xpack.prelert.action.PostDataAction;
import org.elasticsearch.xpack.prelert.action.OpenJobAction;
import org.elasticsearch.xpack.prelert.action.PutJobAction;
import org.elasticsearch.xpack.prelert.action.ScheduledJobsIT;
import org.elasticsearch.xpack.prelert.job.AnalysisConfig;
import org.elasticsearch.xpack.prelert.job.DataDescription;
import org.elasticsearch.xpack.prelert.job.Detector;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.manager.AutodetectProcessManager;
import org.junit.After;
import java.util.Collection;
@ -42,70 +42,55 @@ public class TooManyJobsIT extends ESIntegTestCase {
@After
public void clearPrelertMetadata() throws Exception {
ScheduledJobsIT.clearPrelertMetadata(client());
client().admin().cluster().prepareUpdateSettings()
.setPersistentSettings(
Settings.builder().put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), (String) null)
).get();
}
public void testCannotStartTooManyAnalyticalProcesses() throws Exception {
String jsonLine = "{\"time\": \"0\"}";
int maxNumJobs = 1000;
for (int i = 1; i <= maxNumJobs; i++) {
int maxRunningJobsPerNode = randomIntBetween(1, 16);
logger.info("Setting [{}] to [{}]", AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), maxRunningJobsPerNode);
client().admin().cluster().prepareUpdateSettings()
.setPersistentSettings(Settings.builder()
.put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), maxRunningJobsPerNode)
).get();
for (int i = 1; i <= (maxRunningJobsPerNode + 1); i++) {
Job.Builder job = createJob(Integer.toString(i));
PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build(true));
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
assertTrue(putJobResponse.isAcknowledged());
assertBusy(() -> {
try {
GetJobsAction.Request getJobRequest = new GetJobsAction.Request();
getJobRequest.setJobId(job.getId());
getJobRequest.status(true);
GetJobsAction.Response response = client().execute(GetJobsAction.INSTANCE, getJobRequest).get();
GetJobsAction.Response.JobInfo jobInfo = response.getResponse().results().get(0);
assertNotNull(jobInfo);
assertEquals(JobStatus.CLOSED, jobInfo.getStatus());
} catch (Exception e) {
fail("failure " + e.getMessage());
}
});
// triggers creating autodetect process:
PostDataAction.Request postDataRequest = new PostDataAction.Request(job.getId());
postDataRequest.setContent(new BytesArray(jsonLine));
try {
PostDataAction.Response postDataResponse = client().execute(PostDataAction.INSTANCE, postDataRequest).get();
assertEquals(1, postDataResponse.getDataCounts().getInputRecordCount());
logger.info("Posted data {} times", i);
OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId());
openJobRequest.setOpenTimeout(TimeValue.timeValueSeconds(10));
OpenJobAction.Response openJobResponse = client().execute(OpenJobAction.INSTANCE, openJobRequest)
.get();
assertTrue(openJobResponse.isAcknowledged());
logger.info("Opened {}th job", i);
} catch (Exception e) {
Throwable cause = ExceptionsHelper.unwrapCause(e.getCause());
if (ElasticsearchStatusException.class.equals(cause.getClass()) == false) {
logger.warn("Unexpected cause", e);
}
assertEquals(ElasticsearchStatusException.class, cause.getClass());
assertEquals(RestStatus.FORBIDDEN, ((ElasticsearchStatusException) cause).status());
logger.info("good news everybody --> reached threadpool capacity after starting {}th analytical process", i);
assertEquals(RestStatus.CONFLICT, ((ElasticsearchStatusException) cause).status());
assertEquals("[" + (maxRunningJobsPerNode + 1) + "] expected job status [OPENED], but got [FAILED], reason " +
"[failed to open, max running job capacity [" + maxRunningJobsPerNode + "] reached]", cause.getMessage());
logger.info("good news everybody --> reached maximum number of allowed opened jobs, after trying to open the {}th job", i);
// now manually clean things up and see if we can succeed to start one new job
clearPrelertMetadata();
putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
assertTrue(putJobResponse.isAcknowledged());
assertBusy(() -> {
try {
GetJobsAction.Request getJobRequest = new GetJobsAction.Request();
getJobRequest.setJobId(job.getId());
getJobRequest.status(true);
GetJobsAction.Response response = client().execute(GetJobsAction.INSTANCE, getJobRequest).get();
GetJobsAction.Response.JobInfo jobInfo = response.getResponse().results().get(0);
assertNotNull(jobInfo);
assertEquals(JobStatus.CLOSED, jobInfo.getStatus());
} catch (Exception e1) {
fail("failure " + e1.getMessage());
}
});
PostDataAction.Response postDataResponse = client().execute(PostDataAction.INSTANCE, postDataRequest).get();
assertEquals(1, postDataResponse.getDataCounts().getInputRecordCount());
OpenJobAction.Response openJobResponse = client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId()))
.get();
assertTrue(openJobResponse.isAcknowledged());
return;
}
}
fail("shouldn't be able to add [" + maxNumJobs + "] jobs");
fail("shouldn't be able to add more than [" + maxRunningJobsPerNode + "] jobs");
}
private Job.Builder createJob(String id) {

View File

@ -13,30 +13,27 @@ public class JobStatusTests extends ESTestCase {
assertEquals(JobStatus.fromString("closed"), JobStatus.CLOSED);
assertEquals(JobStatus.fromString("closing"), JobStatus.CLOSING);
assertEquals(JobStatus.fromString("failed"), JobStatus.FAILED);
assertEquals(JobStatus.fromString("paused"), JobStatus.PAUSED);
assertEquals(JobStatus.fromString("pausing"), JobStatus.PAUSING);
assertEquals(JobStatus.fromString("running"), JobStatus.RUNNING);
assertEquals(JobStatus.fromString("opening"), JobStatus.OPENING);
assertEquals(JobStatus.fromString("opened"), JobStatus.OPENED);
}
public void testValidOrdinals() {
assertEquals(0, JobStatus.RUNNING.ordinal());
assertEquals(1, JobStatus.CLOSING.ordinal());
assertEquals(2, JobStatus.CLOSED.ordinal());
assertEquals(3, JobStatus.FAILED.ordinal());
assertEquals(4, JobStatus.PAUSING.ordinal());
assertEquals(5, JobStatus.PAUSED.ordinal());
assertEquals(0, JobStatus.CLOSING.ordinal());
assertEquals(1, JobStatus.CLOSED.ordinal());
assertEquals(2, JobStatus.OPENING.ordinal());
assertEquals(3, JobStatus.OPENED.ordinal());
assertEquals(4, JobStatus.FAILED.ordinal());
}
public void testIsAnyOf() {
assertFalse(JobStatus.RUNNING.isAnyOf());
assertFalse(JobStatus.RUNNING.isAnyOf(JobStatus.CLOSED, JobStatus.CLOSING, JobStatus.FAILED,
JobStatus.PAUSED, JobStatus.PAUSING));
assertFalse(JobStatus.CLOSED.isAnyOf(JobStatus.RUNNING, JobStatus.CLOSING, JobStatus.FAILED,
JobStatus.PAUSED, JobStatus.PAUSING));
assertFalse(JobStatus.OPENED.isAnyOf());
assertFalse(JobStatus.OPENED.isAnyOf(JobStatus.CLOSED, JobStatus.CLOSING, JobStatus.FAILED,
JobStatus.OPENING));
assertFalse(JobStatus.CLOSED.isAnyOf(JobStatus.CLOSING, JobStatus.FAILED, JobStatus.OPENING, JobStatus.OPENED));
assertTrue(JobStatus.RUNNING.isAnyOf(JobStatus.RUNNING));
assertTrue(JobStatus.RUNNING.isAnyOf(JobStatus.RUNNING, JobStatus.CLOSED));
assertTrue(JobStatus.PAUSED.isAnyOf(JobStatus.PAUSED, JobStatus.PAUSING));
assertTrue(JobStatus.PAUSING.isAnyOf(JobStatus.PAUSED, JobStatus.PAUSING));
assertTrue(JobStatus.OPENED.isAnyOf(JobStatus.OPENED));
assertTrue(JobStatus.OPENED.isAnyOf(JobStatus.OPENED, JobStatus.CLOSED));
assertTrue(JobStatus.CLOSING.isAnyOf(JobStatus.CLOSED, JobStatus.CLOSING));
assertTrue(JobStatus.CLOSED.isAnyOf(JobStatus.CLOSED, JobStatus.CLOSING));
}
}

View File

@ -6,11 +6,16 @@
package org.elasticsearch.xpack.prelert.job.manager;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.prelert.PrelertPlugin;
import org.elasticsearch.xpack.prelert.action.UpdateJobStatusAction;
import org.elasticsearch.xpack.prelert.job.AnalysisConfig;
import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.DataDescription;
@ -29,6 +34,8 @@ import org.elasticsearch.xpack.prelert.job.process.autodetect.output.AutodetectR
import org.elasticsearch.xpack.prelert.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.prelert.job.process.autodetect.params.InterimResultsParams;
import org.elasticsearch.xpack.prelert.job.process.autodetect.params.TimeRange;
import org.elasticsearch.xpack.prelert.job.results.AutodetectResult;
import org.elasticsearch.xpack.prelert.utils.CloseableIterator;
import org.junit.Before;
import org.mockito.Mockito;
@ -37,9 +44,12 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import static org.elasticsearch.mock.orig.Mockito.doAnswer;
import static org.elasticsearch.mock.orig.Mockito.doThrow;
import static org.elasticsearch.mock.orig.Mockito.times;
import static org.elasticsearch.mock.orig.Mockito.verify;
@ -48,6 +58,7 @@ import static org.hamcrest.core.IsEqual.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@ -70,17 +81,83 @@ public class AutodetectProcessManagerTests extends ESTestCase {
jobProvider = mock(JobProvider.class);
jobResultsPersister = mock(JobResultsPersister.class);
jobDataCountsPersister = mock(JobDataCountsPersister.class);
givenAllocationWithStatus(JobStatus.CLOSED);
givenAllocationWithStatus(JobStatus.OPENED);
}
public void testCreateProcessBySubmittingData() {
public void testOpenJob() {
Client client = mock(Client.class);
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(createJobDetails("foo"));
AutodetectProcessManager manager = createManager(communicator, client);
manager.openJob("foo", false);
assertEquals(1, manager.numberOfOpenJobs());
assertTrue(manager.jobHasActiveAutodetectProcess("foo"));
UpdateJobStatusAction.Request expectedRequest = new UpdateJobStatusAction.Request("foo", JobStatus.OPENED);
verify(client).execute(eq(UpdateJobStatusAction.INSTANCE), eq(expectedRequest), any());
}
public void testOpenJob_exceedMaxNumJobs() {
when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(createJobDetails("foo"));
when(jobProvider.dataCounts("foo")).thenReturn(new DataCounts("foo"));
when(jobManager.getJobOrThrowIfUnknown("bar")).thenReturn(createJobDetails("bar"));
when(jobProvider.dataCounts("bar")).thenReturn(new DataCounts("bar"));
when(jobManager.getJobOrThrowIfUnknown("baz")).thenReturn(createJobDetails("baz"));
when(jobProvider.dataCounts("baz")).thenReturn(new DataCounts("baz"));
when(jobManager.getJobOrThrowIfUnknown("foobar")).thenReturn(createJobDetails("foobar"));
when(jobProvider.dataCounts("foobar")).thenReturn(new DataCounts("foobar"));
Client client = mock(Client.class);
ThreadPool threadPool = mock(ThreadPool.class);
ThreadPool.Cancellable cancellable = mock(ThreadPool.Cancellable.class);
when(threadPool.scheduleWithFixedDelay(any(), any(), any())).thenReturn(cancellable);
ExecutorService executorService = mock(ExecutorService.class);
doAnswer(invocation -> {
((Runnable) invocation.getArguments()[0]).run();
return null;
}).when(executorService).execute(any(Runnable.class));
when(threadPool.executor(PrelertPlugin.AUTODETECT_PROCESS_THREAD_POOL_NAME)).thenReturn(executorService);
AutodetectResultsParser parser = mock(AutodetectResultsParser.class);
@SuppressWarnings("unchecked")
CloseableIterator<AutodetectResult> iterator = mock(CloseableIterator.class);
when(iterator.hasNext()).thenReturn(false);
when(parser.parseResults(any())).thenReturn(iterator);
AutodetectProcess autodetectProcess = mock(AutodetectProcess.class);
when(autodetectProcess.isProcessAlive()).thenReturn(true);
when(autodetectProcess.getPersistStream()).thenReturn(new ByteArrayInputStream(new byte[0]));
AutodetectProcessFactory autodetectProcessFactory = (j, i, e) -> autodetectProcess;
Settings.Builder settings = Settings.builder();
settings.put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), 3);
Set<Setting<?>> settingSet = new HashSet<>();
settingSet.addAll(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
settingSet.add(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE);
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, settingSet);
AutodetectProcessManager manager = new AutodetectProcessManager(settings.build(), client, threadPool,
jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory, clusterSettings);
manager.openJob("foo", false);
manager.openJob("bar", false);
manager.openJob("baz", false);
assertEquals(3, manager.numberOfOpenJobs());
Exception e = expectThrows(ElasticsearchStatusException.class, () -> manager.openJob("foobar", false));
assertEquals("max running job capacity [3] reached", e.getMessage());
manager.closeJob("baz");
assertEquals(2, manager.numberOfOpenJobs());
manager.openJob("foobar", false);
assertEquals(3, manager.numberOfOpenJobs());
}
public void testProcessData() {
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
AutodetectProcessManager manager = createManager(communicator);
assertEquals(0, manager.numberOfRunningJobs());
assertEquals(0, manager.numberOfOpenJobs());
DataLoadParams params = new DataLoadParams(TimeRange.builder().build());
manager.openJob("foo", false);
manager.processData("foo", createInputStream(""), params, () -> false);
assertEquals(1, manager.numberOfRunningJobs());
assertEquals(1, manager.numberOfOpenJobs());
}
public void testProcessDataThrowsElasticsearchStatusException_onIoException() throws Exception {
@ -92,6 +169,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
Supplier<Boolean> cancellable = () -> false;
doThrow(new IOException("blah")).when(communicator).writeToJob(inputStream, params, cancellable);
manager.openJob("foo", false);
ESTestCase.expectThrows(ElasticsearchException.class,
() -> manager.processData("foo", inputStream, params, cancellable));
}
@ -100,14 +178,15 @@ public class AutodetectProcessManagerTests extends ESTestCase {
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(createJobDetails("foo"));
AutodetectProcessManager manager = createManager(communicator);
assertEquals(0, manager.numberOfRunningJobs());
assertEquals(0, manager.numberOfOpenJobs());
manager.openJob("foo", false);
manager.processData("foo", createInputStream(""), mock(DataLoadParams.class), () -> false);
// job is created
assertEquals(1, manager.numberOfRunningJobs());
manager.closeJob("foo", JobStatus.CLOSED);
assertEquals(0, manager.numberOfRunningJobs());
assertEquals(1, manager.numberOfOpenJobs());
manager.closeJob("foo");
assertEquals(0, manager.numberOfOpenJobs());
}
public void testBucketResetMessageIsSent() throws IOException {
@ -117,6 +196,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
Supplier<Boolean> cancellable = () -> false;
DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1000").endTime("2000").build(), true);
InputStream inputStream = createInputStream("");
manager.openJob("foo", false);
manager.processData("foo", inputStream, params, cancellable);
verify(communicator).writeToJob(inputStream, params, cancellable);
}
@ -127,6 +207,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(createJobDetails("foo"));
InputStream inputStream = createInputStream("");
manager.openJob("foo", false);
manager.processData("foo", inputStream, mock(DataLoadParams.class), () -> false);
InterimResultsParams params = InterimResultsParams.builder().build();
@ -143,7 +224,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
doThrow(new IOException("blah")).when(communicator).flushJob(params);
ElasticsearchException e = ESTestCase.expectThrows(ElasticsearchException.class, () -> manager.flushJob("foo", params));
assertEquals("Exception flushing process for job foo", e.getMessage());
assertEquals("[foo] exception while flushing job", e.getMessage());
}
public void testWriteUpdateConfigMessage() throws IOException {
@ -158,34 +239,25 @@ public class AutodetectProcessManagerTests extends ESTestCase {
AutodetectProcessManager manager = createManager(communicator);
assertFalse(manager.jobHasActiveAutodetectProcess("foo"));
manager.openJob("foo", false);
manager.processData("foo", createInputStream(""), mock(DataLoadParams.class), () -> false);
assertTrue(manager.jobHasActiveAutodetectProcess("foo"));
assertFalse(manager.jobHasActiveAutodetectProcess("bar"));
}
public void testProcessData_GivenPausingJob() {
public void testProcessData_GivenStatusNotStarted() throws IOException {
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
when(communicator.writeToJob(any(), any(), any())).thenReturn(new DataCounts("foo"));
AutodetectProcessManager manager = createManager(communicator);
Job job = createJobDetails("foo");
when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(job);
givenAllocationWithStatus(JobStatus.PAUSING);
givenAllocationWithStatus(JobStatus.OPENED);
InputStream inputStream = createInputStream("");
DataCounts dataCounts = manager.processData("foo", inputStream, mock(DataLoadParams.class), () -> false);
assertThat(dataCounts, equalTo(new DataCounts("foo")));
}
public void testProcessData_GivenPausedJob() {
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
Job job = createJobDetails("foo");
when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(job);
givenAllocationWithStatus(JobStatus.PAUSED);
AutodetectProcessManager manager = createManager(communicator);
InputStream inputStream = createInputStream("");
manager.openJob("foo", false);
DataCounts dataCounts = manager.processData("foo", inputStream, mock(DataLoadParams.class), () -> false);
assertThat(dataCounts, equalTo(new DataCounts("foo")));
@ -200,11 +272,15 @@ public class AutodetectProcessManagerTests extends ESTestCase {
when(jobManager.getJobOrThrowIfUnknown("_id")).thenReturn(createJobDetails("_id"));
when(jobProvider.dataCounts("_id")).thenReturn(new DataCounts("_id"));
Set<Setting<?>> settingSet = new HashSet<>();
settingSet.addAll(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
settingSet.add(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE);
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, settingSet);
AutodetectResultsParser parser = mock(AutodetectResultsParser.class);
AutodetectProcess autodetectProcess = mock(AutodetectProcess.class);
AutodetectProcessFactory autodetectProcessFactory = (j, i, e) -> autodetectProcess;
AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool,
jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory);
jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory, clusterSettings);
expectThrows(EsRejectedExecutionException.class, () -> manager.create("_id", false));
verify(autodetectProcess, times(1)).close();
@ -212,18 +288,25 @@ public class AutodetectProcessManagerTests extends ESTestCase {
private void givenAllocationWithStatus(JobStatus status) {
Allocation.Builder allocation = new Allocation.Builder();
allocation.setStatus(JobStatus.RUNNING); // from running we can go to other statuses
allocation.setStatus(status);
when(jobManager.getJobAllocation("foo")).thenReturn(allocation.build());
}
private AutodetectProcessManager createManager(AutodetectCommunicator communicator) {
Client client = mock(Client.class);
return createManager(communicator, client);
}
private AutodetectProcessManager createManager(AutodetectCommunicator communicator, Client client) {
ThreadPool threadPool = mock(ThreadPool.class);
AutodetectResultsParser parser = mock(AutodetectResultsParser.class);
AutodetectProcessFactory autodetectProcessFactory = mock(AutodetectProcessFactory.class);
Set<Setting<?>> settingSet = new HashSet<>();
settingSet.addAll(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
settingSet.add(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE);
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, settingSet);
AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager,
jobProvider, jobResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory);
jobProvider, jobResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory, clusterSettings);
manager = spy(manager);
doReturn(communicator).when(manager).create(any(), anyBoolean());
return manager;
@ -231,6 +314,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
private AutodetectProcessManager createManagerAndCallProcessData(AutodetectCommunicator communicator, String jobId) {
AutodetectProcessManager manager = createManager(communicator);
manager.openJob(jobId, false);
manager.processData(jobId, createInputStream(""), mock(DataLoadParams.class), () -> false);
return manager;
}

View File

@ -43,13 +43,12 @@ public class JobManagerTests extends ESTestCase {
private ClusterService clusterService;
private JobProvider jobProvider;
private Auditor auditor;
@Before
public void setupMocks() {
clusterService = mock(ClusterService.class);
jobProvider = mock(JobProvider.class);
auditor = mock(Auditor.class);
Auditor auditor = mock(Auditor.class);
when(jobProvider.audit(anyString())).thenReturn(auditor);
}
@ -65,7 +64,7 @@ public class JobManagerTests extends ESTestCase {
}
public void testFilter() {
Set<String> running = new HashSet<String>(Arrays.asList("henry", "dim", "dave"));
Set<String> running = new HashSet<>(Arrays.asList("henry", "dim", "dave"));
Set<String> diff = new HashSet<>(Arrays.asList("dave", "tom")).stream().filter((s) -> !running.contains(s))
.collect(Collectors.toCollection(HashSet::new));
@ -85,7 +84,7 @@ public class JobManagerTests extends ESTestCase {
assertThat(prelertMetadata.getJobs().containsKey("foo"), is(false));
}
public void testRemoveJobFromClusterState_GivenJobIsRunning() {
public void testRemoveJobFromClusterState_GivenJobIsOpened() {
JobManager jobManager = createJobManager();
ClusterState clusterState = createClusterState();
Job job = buildJobBuilder("foo").build();
@ -93,9 +92,10 @@ public class JobManagerTests extends ESTestCase {
Allocation.Builder allocation = new Allocation.Builder();
allocation.setNodeId("myNode");
allocation.setJobId(job.getId());
allocation.setStatus(JobStatus.RUNNING);
allocation.setStatus(JobStatus.OPENING);
PrelertMetadata.Builder newMetadata = new PrelertMetadata.Builder(clusterState.metaData().custom(PrelertMetadata.TYPE));
newMetadata.putAllocation("myNode", job.getId());
newMetadata.createAllocation(job.getId(), false);
newMetadata.assignToNode(job.getId(), "myNode");
newMetadata.updateAllocation(job.getId(), allocation.build());
ClusterState jobRunningClusterState = new ClusterState.Builder(clusterState)
@ -104,7 +104,7 @@ public class JobManagerTests extends ESTestCase {
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> jobManager.removeJobFromClusterState("foo", jobRunningClusterState));
assertThat(e.status(), equalTo(RestStatus.CONFLICT));
assertThat(e.getMessage(), equalTo("Cannot delete job 'foo' while it is RUNNING"));
assertThat(e.getMessage(), equalTo("Cannot delete job 'foo' while it is OPENING"));
}
public void testRemoveJobFromClusterState_jobMissing() {
@ -137,7 +137,8 @@ public class JobManagerTests extends ESTestCase {
Job job = buildJobBuilder("foo").build();
PrelertMetadata prelertMetadata = new PrelertMetadata.Builder()
.putJob(job, false)
.putAllocation("nodeId", "foo")
.createAllocation("foo", false)
.assignToNode("foo", "nodeId")
.build();
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(PrelertMetadata.TYPE, prelertMetadata)).build();

View File

@ -19,9 +19,11 @@ public class AllocationTests extends AbstractSerializingTestCase<Allocation> {
protected Allocation createTestInstance() {
String nodeId = randomAsciiOfLength(10);
String jobId = randomAsciiOfLength(10);
boolean ignoreDowntime = randomBoolean();
JobStatus jobStatus = randomFrom(JobStatus.values());
String statusReason = randomBoolean() ? randomAsciiOfLength(10) : null;
SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTING, randomPositiveLong(), randomPositiveLong());
return new Allocation(nodeId, jobId, jobStatus, schedulerState);
return new Allocation(nodeId, jobId, ignoreDowntime, jobStatus, statusReason, schedulerState);
}
@Override
@ -34,4 +36,13 @@ public class AllocationTests extends AbstractSerializingTestCase<Allocation> {
return Allocation.PARSER.apply(parser, () -> matcher).build();
}
public void testUnsetIgnoreDownTime() {
Allocation allocation = new Allocation("_node_id", "_job_id", true, JobStatus.OPENING, null, null);
assertTrue(allocation.isIgnoreDowntime());
Allocation.Builder builder = new Allocation.Builder(allocation);
builder.setStatus(JobStatus.OPENED);
allocation = builder.build();
assertFalse(allocation.isIgnoreDowntime());
}
}

View File

@ -56,33 +56,35 @@ public class JobAllocatorTests extends ESTestCase {
PrelertMetadata.Builder pmBuilder = new PrelertMetadata.Builder(cs.metaData().custom(PrelertMetadata.TYPE));
pmBuilder.putJob((buildJobBuilder("_job_id").build()), false);
pmBuilder.createAllocation("_job_id", false);
cs = ClusterState.builder(cs).metaData(MetaData.builder()
.putCustom(PrelertMetadata.TYPE, pmBuilder.build()))
.build();
assertTrue("A unassigned job, so we should allocate", jobAllocator.shouldAllocate(cs));
pmBuilder = new PrelertMetadata.Builder(cs.metaData().custom(PrelertMetadata.TYPE));
pmBuilder.putAllocation("_node_id", "_job_id");
pmBuilder.assignToNode("_job_id", "_node_id");
cs = ClusterState.builder(cs).metaData(MetaData.builder()
.putCustom(PrelertMetadata.TYPE, pmBuilder.build()))
.build();
assertFalse("Job is allocate, so nothing to allocate", jobAllocator.shouldAllocate(cs));
}
public void testAllocateJobs() {
public void testAssignJobsToNodes() {
PrelertMetadata.Builder pmBuilder = new PrelertMetadata.Builder();
pmBuilder.putJob(buildJobBuilder("_job_id").build(), false);
pmBuilder.createAllocation("_job_id", false);
ClusterState cs1 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder()
.putCustom(PrelertMetadata.TYPE, pmBuilder.build()))
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.CURRENT))
.masterNodeId("_node_id"))
.build();
ClusterState result1 = jobAllocator.allocateJobs(cs1);
ClusterState result1 = jobAllocator.assignJobsToNodes(cs1);
PrelertMetadata pm = result1.metaData().custom(PrelertMetadata.TYPE);
assertEquals("_job_id must be allocated to _node_id", pm.getAllocations().get("_job_id").getNodeId(), "_node_id");
ClusterState result2 = jobAllocator.allocateJobs(result1);
ClusterState result2 = jobAllocator.assignJobsToNodes(result1);
assertSame("job has been allocated, same instance must be returned", result1, result2);
ClusterState cs2 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder()
@ -95,13 +97,13 @@ public class JobAllocatorTests extends ESTestCase {
)
.build();
// should fail, prelert only support single node for now
expectThrows(IllegalStateException.class, () -> jobAllocator.allocateJobs(cs2));
expectThrows(IllegalStateException.class, () -> jobAllocator.assignJobsToNodes(cs2));
ClusterState cs3 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder()
.putCustom(PrelertMetadata.TYPE, pmBuilder.build()))
.build();
// we need to have at least one node
expectThrows(IllegalStateException.class, () -> jobAllocator.allocateJobs(cs3));
expectThrows(IllegalStateException.class, () -> jobAllocator.assignJobsToNodes(cs3));
pmBuilder = new PrelertMetadata.Builder(result1.getMetaData().custom(PrelertMetadata.TYPE));
pmBuilder.removeJob("_job_id");
@ -111,7 +113,7 @@ public class JobAllocatorTests extends ESTestCase {
.add(new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.CURRENT))
.masterNodeId("_node_id"))
.build();
ClusterState result3 = jobAllocator.allocateJobs(cs4);
ClusterState result3 = jobAllocator.assignJobsToNodes(cs4);
pm = result3.metaData().custom(PrelertMetadata.TYPE);
assertNull("_job_id must be unallocated, because job has been removed", pm.getAllocations().get("_job_id"));
}
@ -152,7 +154,8 @@ public class JobAllocatorTests extends ESTestCase {
// add an allocated job
PrelertMetadata.Builder pmBuilder = new PrelertMetadata.Builder();
pmBuilder.putJob(buildJobBuilder("_id").build(), false);
pmBuilder.putAllocation("_id", "_id");
pmBuilder.createAllocation("_id", false);
pmBuilder.assignToNode("_id", "_node_id");
cs = ClusterState.builder(new ClusterName("_name"))
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_id", new LocalTransportAddress("_id"), Version.CURRENT))
@ -168,6 +171,7 @@ public class JobAllocatorTests extends ESTestCase {
// make job not allocated
pmBuilder = new PrelertMetadata.Builder();
pmBuilder.putJob(buildJobBuilder("_job_id").build(), false);
pmBuilder.createAllocation("_job_id", false);
cs = ClusterState.builder(new ClusterName("_name"))
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_id", new LocalTransportAddress("_id"), Version.CURRENT))
@ -194,6 +198,7 @@ public class JobAllocatorTests extends ESTestCase {
jobBuilder.setDataDescription(dataDescriptionBuilder);
pmBuilder.putJob(jobBuilder.build(), false);
pmBuilder.createAllocation("_job_id", false);
ClusterState cs = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder()
.putCustom(PrelertMetadata.TYPE, pmBuilder.build()))
@ -204,7 +209,7 @@ public class JobAllocatorTests extends ESTestCase {
.build();
ClusterState clusterStateWithAllocation = jobAllocator.allocateJobs(cs);
ClusterState clusterStateWithAllocation = jobAllocator.assignJobsToNodes(cs);
PrelertMetadata metadata = clusterStateWithAllocation.metaData().custom(PrelertMetadata.TYPE);
assertEquals(JobSchedulerStatus.STOPPED, metadata.getAllocations().get("_job_id").getSchedulerState().getStatus());
}

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.prelert.job.metadata;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
@ -19,48 +18,51 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.prelert.action.UpdateJobStatusAction;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.data.DataProcessor;
import org.elasticsearch.xpack.prelert.job.scheduler.ScheduledJobService;
import org.junit.Before;
import org.mockito.Mockito;
import static org.elasticsearch.xpack.prelert.job.JobTests.buildJobBuilder;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
public class JobLifeCycleServiceTests extends ESTestCase {
private ClusterService clusterService;
private ScheduledJobService scheduledJobService;
private DataProcessor dataProcessor;
private Client client;
private JobLifeCycleService jobLifeCycleService;
@Before
public void instantiateJobAllocator() {
clusterService = Mockito.mock(ClusterService.class);
scheduledJobService = Mockito.mock(ScheduledJobService.class);
dataProcessor = Mockito.mock(DataProcessor.class);
client = Mockito.mock(Client.class);
ClusterService clusterService = mock(ClusterService.class);
ScheduledJobService scheduledJobService = mock(ScheduledJobService.class);
dataProcessor = mock(DataProcessor.class);
client = mock(Client.class);
jobLifeCycleService = new JobLifeCycleService(Settings.EMPTY, client, clusterService, scheduledJobService, dataProcessor,
Runnable::run);
}
public void testStartStop() {
jobLifeCycleService.startJob(buildJobBuilder("_job_id").build());
assertTrue(jobLifeCycleService.localAllocatedJobs.contains("_job_id"));
Allocation.Builder allocation = new Allocation.Builder();
allocation.setJobId("_job_id");
jobLifeCycleService.startJob(allocation.build());
assertTrue(jobLifeCycleService.localAssignedJobs.contains("_job_id"));
verify(dataProcessor).openJob("_job_id", false);
jobLifeCycleService.stopJob("_job_id");
assertTrue(jobLifeCycleService.localAllocatedJobs.isEmpty());
assertTrue(jobLifeCycleService.localAssignedJobs.isEmpty());
verify(dataProcessor).closeJob("_job_id");
}
public void testClusterChanged() {
public void testClusterChanged_startJob() {
PrelertMetadata.Builder pmBuilder = new PrelertMetadata.Builder();
pmBuilder.putJob(buildJobBuilder("_job_id").build(), false);
pmBuilder.putAllocation("_node_id", "_job_id");
pmBuilder.createAllocation("_job_id", false);
ClusterState cs1 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder()
.putCustom(PrelertMetadata.TYPE, pmBuilder.build()))
.nodes(DiscoveryNodes.builder()
@ -68,66 +70,110 @@ public class JobLifeCycleServiceTests extends ESTestCase {
.localNodeId("_node_id"))
.build();
jobLifeCycleService.clusterChanged(new ClusterChangedEvent("_source", cs1, cs1));
assertFalse("not allocated to a node",
jobLifeCycleService.localAssignedJobs.contains("_job_id"));
pmBuilder = new PrelertMetadata.Builder();
pmBuilder.putJob(buildJobBuilder("_job_id").build(), false);
pmBuilder.createAllocation("_job_id", false);
pmBuilder.updateStatus("_job_id", JobStatus.OPENED, null);
cs1 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder()
.putCustom(PrelertMetadata.TYPE, pmBuilder.build()))
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.CURRENT))
.localNodeId("_node_id"))
.build();
jobLifeCycleService.clusterChanged(new ClusterChangedEvent("_source", cs1, cs1));
assertFalse("Status not started",
jobLifeCycleService.localAssignedJobs.contains("_job_id"));
pmBuilder = new PrelertMetadata.Builder();
pmBuilder.putJob(buildJobBuilder("_job_id").build(), false);
pmBuilder.createAllocation("_job_id", false);
pmBuilder.assignToNode("_job_id", "_node_id");
cs1 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder()
.putCustom(PrelertMetadata.TYPE, pmBuilder.build()))
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.CURRENT))
.localNodeId("_node_id"))
.build();
jobLifeCycleService.clusterChanged(new ClusterChangedEvent("_source", cs1, cs1));
assertTrue("Expect allocation, because job allocation says _job_id should be allocated locally",
jobLifeCycleService.localAllocatedJobs.contains("_job_id"));
jobLifeCycleService.localAssignedJobs.contains("_job_id"));
verify(dataProcessor, times(1)).openJob("_job_id", false);
pmBuilder.removeJob("_job_id");
ClusterState cs2 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder()
.putCustom(PrelertMetadata.TYPE, pmBuilder.build()))
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.CURRENT))
.localNodeId("_node_id"))
.build();
jobLifeCycleService.clusterChanged(new ClusterChangedEvent("_source", cs2, cs1));
assertFalse("Expect no allocation, because the job has been removed", jobLifeCycleService.localAllocatedJobs.contains("_job_id"));
jobLifeCycleService.clusterChanged(new ClusterChangedEvent("_source", cs1, cs1));
verify(dataProcessor, times(1)).openJob("_job_id", false);
}
public void testClusterChanged_GivenJobIsPausing() {
public void testClusterChanged_stopJob() {
jobLifeCycleService.localAssignedJobs.add("_job_id");
PrelertMetadata.Builder pmBuilder = new PrelertMetadata.Builder();
Job.Builder job = buildJobBuilder("foo");
pmBuilder.putJob(job.build(), false);
pmBuilder.putAllocation("_node_id", "foo");
Allocation.Builder allocation = new Allocation.Builder();
allocation.setJobId("foo");
allocation.setNodeId("_node_id");
allocation.setStatus(JobStatus.RUNNING); // from running we can go to other statuses
allocation.setStatus(JobStatus.PAUSING);
pmBuilder.updateAllocation("foo", allocation.build());
pmBuilder.putJob(buildJobBuilder("_job_id").build(), false);
pmBuilder.createAllocation("_job_id", false);
ClusterState cs1 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder()
.putCustom(PrelertMetadata.TYPE, pmBuilder.build()))
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.CURRENT))
.localNodeId("_node_id"))
.build();
jobLifeCycleService.clusterChanged(new ClusterChangedEvent("_source", cs1, cs1));
assertEquals("Status is not closing, so nothing happened", jobLifeCycleService.localAssignedJobs.size(), 1);
verify(dataProcessor).closeJob("foo", JobStatus.PAUSED);
pmBuilder = new PrelertMetadata.Builder();
pmBuilder.putJob(buildJobBuilder("_job_id").build(), false);
pmBuilder.createAllocation("_job_id", false);
pmBuilder.updateStatus("_job_id", JobStatus.OPENED, null);
pmBuilder.updateStatus("_job_id", JobStatus.CLOSING, null);
pmBuilder.assignToNode("_job_id", "_node_id");
cs1 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder()
.putCustom(PrelertMetadata.TYPE, pmBuilder.build()))
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.CURRENT))
.localNodeId("_node_id"))
.build();
jobLifeCycleService.clusterChanged(new ClusterChangedEvent("_source", cs1, cs1));
assertEquals(jobLifeCycleService.localAssignedJobs.size(), 0);
verify(dataProcessor, times(1)).closeJob("_job_id");
}
public void testClusterChanged_GivenJobIsPausingAndCloseJobThrows() {
public void testClusterChanged_allocationRemovedStopJob() {
jobLifeCycleService.localAssignedJobs.add("_job_id");
PrelertMetadata.Builder pmBuilder = new PrelertMetadata.Builder();
Job.Builder job = buildJobBuilder("foo");
pmBuilder.putJob(job.build(), false);
pmBuilder.putAllocation("_node_id", "foo");
Allocation.Builder allocation = new Allocation.Builder();
allocation.setJobId("foo");
allocation.setNodeId("_node_id");
allocation.setStatus(JobStatus.RUNNING); // from running we can go to other statuses
allocation.setStatus(JobStatus.PAUSING);
pmBuilder.updateAllocation("foo", allocation.build());
pmBuilder.putJob(buildJobBuilder("_job_id").build(), false);
ClusterState cs1 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder()
.putCustom(PrelertMetadata.TYPE, pmBuilder.build()))
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.CURRENT))
.localNodeId("_node_id"))
.build();
doThrow(new ElasticsearchException("")).when(dataProcessor).closeJob("foo", JobStatus.PAUSED);
jobLifeCycleService.clusterChanged(new ClusterChangedEvent("_source", cs1, cs1));
assertEquals(jobLifeCycleService.localAssignedJobs.size(), 0);
verify(dataProcessor, times(1)).closeJob("_job_id");
}
verify(dataProcessor).closeJob("foo", JobStatus.PAUSED);
UpdateJobStatusAction.Request expectedRequest = new UpdateJobStatusAction.Request("foo", JobStatus.FAILED);
public void testStart_openJobFails() {
doThrow(new RuntimeException("error")).when(dataProcessor).openJob("_job_id", false);
Allocation.Builder allocation = new Allocation.Builder();
allocation.setJobId("_job_id");
jobLifeCycleService.startJob(allocation.build());
assertTrue(jobLifeCycleService.localAssignedJobs.contains("_job_id"));
verify(dataProcessor).openJob("_job_id", false);
UpdateJobStatusAction.Request expectedRequest = new UpdateJobStatusAction.Request("_job_id", JobStatus.FAILED);
expectedRequest.setReason("failed to open, error");
verify(client).execute(eq(UpdateJobStatusAction.INSTANCE), eq(expectedRequest), any());
}
public void testStart_closeJobFails() {
jobLifeCycleService.localAssignedJobs.add("_job_id");
doThrow(new RuntimeException("error")).when(dataProcessor).closeJob("_job_id");
jobLifeCycleService.stopJob("_job_id");
assertEquals(jobLifeCycleService.localAssignedJobs.size(), 0);
verify(dataProcessor).closeJob("_job_id");
UpdateJobStatusAction.Request expectedRequest = new UpdateJobStatusAction.Request("_job_id", JobStatus.FAILED);
expectedRequest.setReason("failed to close, error");
verify(client).execute(eq(UpdateJobStatusAction.INSTANCE), eq(expectedRequest), any());
}
}

View File

@ -41,9 +41,12 @@ public class PrelertMetadataTests extends ESTestCase {
builder.putJob(job2, false);
builder.putJob(job3, false);
builder.putAllocation(job1.getId(), "node1");
builder.putAllocation(job2.getId(), "node1");
builder.putAllocation(job3.getId(), "node1");
builder.createAllocation(job1.getId(), false);
builder.assignToNode(job1.getId(), "node1");
builder.createAllocation(job2.getId(), false);
builder.assignToNode(job2.getId(), "node1");
builder.createAllocation(job3.getId(), false);
builder.assignToNode(job3.getId(), "node1");
PrelertMetadata expected = builder.build();
ByteArrayOutputStream out = new ByteArrayOutputStream();
@ -65,9 +68,12 @@ public class PrelertMetadataTests extends ESTestCase {
builder.putJob(job2, false);
builder.putJob(job3, false);
builder.putAllocation(job1.getId(), "node1");
builder.putAllocation(job2.getId(), "node1");
builder.putAllocation(job3.getId(), "node1");
builder.createAllocation(job1.getId(), false);
builder.assignToNode(job1.getId(), "node1");
builder.createAllocation(job2.getId(), false);
builder.assignToNode(job2.getId(), "node1");
builder.createAllocation(job3.getId(), false);
builder.assignToNode(job3.getId(), "node1");
PrelertMetadata expected = builder.build();
@ -104,19 +110,15 @@ public class PrelertMetadataTests extends ESTestCase {
public void testUpdateAllocation_setFinishedTime() {
PrelertMetadata.Builder builder = new PrelertMetadata.Builder();
builder.putJob(buildJobBuilder("_job_id").build(), false);
builder.putAllocation("_node_id", "_job_id");
PrelertMetadata prelertMetadata = builder.build();
builder.createAllocation("_job_id", false);
builder = new PrelertMetadata.Builder(prelertMetadata);
Allocation.Builder allocation = new Allocation.Builder();
allocation.setJobId("_job_id");
allocation.setNodeId("_node_id");
allocation.setStatus(JobStatus.RUNNING);
builder.updateAllocation("_job_id", allocation.build());
assertThat(builder.build().getJobs().get("_job_id").getFinishedTime(), nullValue());
allocation.setStatus(JobStatus.CLOSED);
builder.updateAllocation("_job_id", allocation.build());
assertThat(builder.build().getJobs().get("_job_id").getFinishedTime(), notNullValue());
builder.updateStatus("_job_id", JobStatus.OPENED, null);
PrelertMetadata prelertMetadata = builder.build();
assertThat(prelertMetadata.getJobs().get("_job_id").getFinishedTime(), nullValue());
builder.updateStatus("_job_id", JobStatus.CLOSED, null);
prelertMetadata = builder.build();
assertThat(prelertMetadata.getJobs().get("_job_id").getFinishedTime(), notNullValue());
}
}

View File

@ -92,8 +92,8 @@ public class ScheduledJobServiceTests extends ESTestCase {
public void testStart_GivenNewlyCreatedJobLoopBack() throws IOException {
Job.Builder builder = createScheduledJob();
Allocation allocation =
new Allocation("_nodeId", "foo", JobStatus.RUNNING, new SchedulerState(JobSchedulerStatus.STARTING, 0L, 60000L));
Allocation allocation = new Allocation("_nodeId", "foo", false, JobStatus.OPENED, null,
new SchedulerState(JobSchedulerStatus.STARTING, 0L, 60000L));
DataCounts dataCounts = new DataCounts("foo", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0));
when(jobManager.getJobAllocation("foo")).thenReturn(allocation);
@ -115,8 +115,8 @@ public class ScheduledJobServiceTests extends ESTestCase {
public void testStart_GivenNewlyCreatedJobLoopBackAndRealtime() throws IOException {
Job.Builder builder = createScheduledJob();
Allocation allocation =
new Allocation("_nodeId", "foo", JobStatus.RUNNING, new SchedulerState(JobSchedulerStatus.STARTING, 0L, null));
Allocation allocation = new Allocation("_nodeId", "foo", false, JobStatus.OPENED, null,
new SchedulerState(JobSchedulerStatus.STARTING, 0L, null));
DataCounts dataCounts = new DataCounts("foo", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0));
when(jobManager.getJobAllocation("foo")).thenReturn(allocation);
@ -132,8 +132,8 @@ public class ScheduledJobServiceTests extends ESTestCase {
verify(threadPool, times(1)).executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME);
verify(threadPool, times(1)).schedule(eq(new TimeValue(480100)), eq(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME), any());
allocation = new Allocation(allocation.getNodeId(), allocation.getJobId(), allocation.getStatus(),
new SchedulerState(JobSchedulerStatus.STOPPING, 0L, 60000L));
allocation = new Allocation(allocation.getNodeId(), allocation.getJobId(), false, allocation.getStatus(),
null, new SchedulerState(JobSchedulerStatus.STOPPING, 0L, 60000L));
scheduledJobService.stop(allocation);
verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STOPPED)), any());
}
@ -146,7 +146,7 @@ public class ScheduledJobServiceTests extends ESTestCase {
public void testStop_GivenStartedScheduledJob() throws IOException {
Job.Builder builder = createScheduledJob();
Allocation allocation1 =
new Allocation("_nodeId", "foo", JobStatus.RUNNING, new SchedulerState(JobSchedulerStatus.STARTED, 0L, null));
new Allocation("_nodeId", "foo", false, JobStatus.OPENED, null, new SchedulerState(JobSchedulerStatus.STARTED, 0L, null));
when(jobManager.getJobAllocation("foo")).thenReturn(allocation1);
DataExtractor dataExtractor = mock(DataExtractor.class);
@ -160,7 +160,7 @@ public class ScheduledJobServiceTests extends ESTestCase {
// Properly stop it to avoid leaking threads in the test
Allocation allocation2 =
new Allocation("_nodeId", "foo", JobStatus.RUNNING, new SchedulerState(JobSchedulerStatus.STOPPING, 0L, null));
new Allocation("_nodeId", "foo", false, JobStatus.OPENED, null, new SchedulerState(JobSchedulerStatus.STOPPING, 0L, null));
scheduledJobService.registry.put("foo", scheduledJobService.createJobScheduler(builder.build()));
scheduledJobService.stop(allocation2);

View File

@ -1,5 +1,5 @@
{
"xpack.prelert.close_data": {
"xpack.prelert.close_job": {
"methods": [ "POST" ],
"url": {
"path": "/_xpack/prelert/data/{job_id}/_close",
@ -9,6 +9,10 @@
"type": "string",
"required": true,
"description": "The name of the job to close"
},
"close_timeout": {
"type": "time",
"description": "Controls the time to wait until a job has closed. Default to 30 minutes"
}
}
}

View File

@ -0,0 +1,25 @@
{
"xpack.prelert.open_job": {
"methods": [ "POST" ],
"url": {
"path": "/_xpack/prelert/data/{job_id}/_open",
"paths": [ "/_xpack/prelert/data/{job_id}/_open" ],
"parts": {
"job_id": {
"type": "string",
"required": true,
"description": "The ID of the job to open"
},
"ignore_downtime": {
"type": "boolean",
"description": "Controls if gaps in data are treated as anomalous or as a maintenance window after a job re-start"
},
"open_timeout": {
"type": "time",
"description": "Controls the time to wait until a job has opened. Default to 30 minutes"
}
}
},
"body": null
}
}

View File

@ -1,17 +0,0 @@
{
"xpack.prelert.pause_job": {
"methods": [ "POST" ],
"url": {
"path": "/_xpack/prelert/jobs/{job_id}/_pause",
"paths": [ "/_xpack/prelert/jobs/{job_id}/_pause" ],
"parts": {
"job_id": {
"type": "string",
"required": true,
"description": "The ID of the job to pause"
}
}
},
"body": null
}
}

View File

@ -12,10 +12,6 @@
}
},
"params": {
"ignore_downtime": {
"type": "boolean",
"description": "Controls if gaps in data are treated as anomalous or as a maintenance window after a job re-start"
},
"reset_start": {
"type": "string",
"description": "Optional parameter to specify the start of the bucket resetting range"

View File

@ -1,17 +0,0 @@
{
"xpack.prelert.resume_job": {
"methods": [ "POST" ],
"url": {
"path": "/_xpack/prelert/jobs/{job_id}/_resume",
"paths": [ "/_xpack/prelert/jobs/{job_id}/_resume" ],
"parts": {
"job_id": {
"type": "string",
"required": true,
"description": "The ID of the job to resume"
}
}
},
"body": null
}
}

View File

@ -16,6 +16,14 @@ setup:
}
}
- do:
xpack.prelert.open_job:
job_id: foo
- do:
xpack.prelert.close_job:
job_id: foo
- do:
index:
index: prelertresults-foo

View File

@ -16,6 +16,10 @@ setup:
}
}
- do:
xpack.prelert.open_job:
job_id: job-stats-test
- do:
xpack.prelert.put_job:
body: >
@ -36,6 +40,9 @@ setup:
"types":["response"]
}
}
- do:
xpack.prelert.open_job:
job_id: scheduled-job
- do:
index:
@ -119,7 +126,7 @@ setup:
- is_false: jobs.0.data_counts
- is_false: jobs.0.model_size_stats
- is_false: jobs.0.scheduler_state
- match: { jobs.0.status: RUNNING }
- match: { jobs.0.status: OPENED }
- do:
xpack.prelert.get_jobs:
@ -129,7 +136,7 @@ setup:
- is_true: jobs.0.data_counts
- is_false: jobs.0.scheduler_state
- match: { jobs.0.job_id: job-stats-test }
- match: { jobs.0.status: RUNNING }
- match: { jobs.0.status: OPENED }
- do:
xpack.prelert.get_jobs:

View File

@ -16,6 +16,10 @@ setup:
}
}
- do:
xpack.prelert.open_job:
job_id: farequote
---
"Test POST data job api, flush, close and verify DataCounts doc":
- do:
@ -41,7 +45,7 @@ setup:
- match: { acknowledged: true }
- do:
xpack.prelert.close_data:
xpack.prelert.close_job:
job_id: farequote
- match: { acknowledged: true }

View File

@ -15,6 +15,15 @@ setup:
"time_format":"yyyy-MM-dd HH:mm:ssX"
}
}
- do:
xpack.prelert.open_job:
job_id: foo
- do:
xpack.prelert.close_job:
job_id: foo
- do:
index:
index: prelertresults-foo