From 570cde7a6ade9790aa32e79f1c1d71ff042f9a4f Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 5 Dec 2016 08:29:57 +0100 Subject: [PATCH] Added open and close job APIs. * A job now has the following statuses: OPENING, OPENED, CLOSING, CLOSED and FAILED. * The open job and close job APIs wait until the job gets into a OPENED or CLOSED state. * The post data api no longer lazily opens a job and fails if the job has not been opened. * When a job gets into a failed state also the reason is recorded in the allocation. * Removed pause and resume APIs. * Made `max_running_jobs` setting dynamically updatedable. Original commit: elastic/x-pack-elasticsearch@3485ec5317f2d9c621f9b8dbf2675455c6ae6f91 --- .../xpack/prelert/PrelertPlugin.java | 25 +- ...taCloseAction.java => CloseJobAction.java} | 29 +- .../xpack/prelert/action/OpenJobAction.java | 279 ++++++++++++++++++ .../xpack/prelert/action/PauseJobAction.java | 169 ----------- .../xpack/prelert/action/ResumeJobAction.java | 169 ----------- .../prelert/action/UpdateJobStatusAction.java | 11 + .../elasticsearch/xpack/prelert/job/Job.java | 2 +- .../xpack/prelert/job/JobStatus.java | 2 +- .../xpack/prelert/job/SchedulerState.java | 2 +- .../xpack/prelert/job/data/DataProcessor.java | 8 +- .../job/manager/AutodetectProcessManager.java | 108 ++++--- .../xpack/prelert/job/manager/JobManager.java | 74 ++--- .../prelert/job/metadata/Allocation.java | 93 ++++-- .../prelert/job/metadata/JobAllocator.java | 20 +- .../job/metadata/JobLifeCycleService.java | 97 +++--- .../prelert/job/metadata/PrelertMetadata.java | 77 +++-- .../output/AutodetectResultsParser.java | 2 +- .../rest/data/RestPostDataCloseAction.java | 41 --- ...JobAction.java => RestCloseJobAction.java} | 22 +- ...eJobAction.java => RestOpenJobAction.java} | 21 +- ...s.java => CloseJobActionRequestTests.java} | 4 +- ...ts.java => OpenJobActionRequestTests.java} | 4 +- .../prelert/action/ResumeJobRequestTests.java | 22 -- .../xpack/prelert/action/ScheduledJobsIT.java | 8 +- .../prelert/integration/PrelertJobIT.java | 68 ----- .../prelert/integration/ScheduledJobIT.java | 15 +- .../prelert/integration/TooManyJobsIT.java | 73 ++--- .../xpack/prelert/job/JobStatusTests.java | 33 +-- .../AutodetectProcessManagerTests.java | 136 +++++++-- .../prelert/job/manager/JobManagerTests.java | 17 +- .../prelert/job/metadata/AllocationTests.java | 13 +- .../job/metadata/JobAllocatorTests.java | 23 +- .../metadata/JobLifeCycleServiceTests.java | 146 +++++---- .../job/metadata/PrelertMetadataTests.java | 38 +-- .../scheduler/ScheduledJobServiceTests.java | 16 +- ...data.json => xpack.prelert.close_job.json} | 6 +- .../api/xpack.prelert.open_job.json | 25 ++ .../api/xpack.prelert.pause_job.json | 17 -- .../api/xpack.prelert.post_data.json | 4 - .../api/xpack.prelert.resume_job.json | 17 -- .../test/delete_model_snapshot.yaml | 8 + .../rest-api-spec/test/jobs_get_stats.yaml | 11 +- .../rest-api-spec/test/post_data.yaml | 6 +- .../test/revert_model_snapshot.yaml | 9 + 44 files changed, 993 insertions(+), 977 deletions(-) rename elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/{PostDataCloseAction.java => CloseJobAction.java} (88%) create mode 100644 elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/OpenJobAction.java delete mode 100644 elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/PauseJobAction.java delete mode 100644 elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/ResumeJobAction.java delete mode 100644 elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/data/RestPostDataCloseAction.java rename elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/job/{RestPauseJobAction.java => RestCloseJobAction.java} (55%) rename elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/job/{RestResumeJobAction.java => RestOpenJobAction.java} (52%) rename elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/{PauseJobRequestTests.java => CloseJobActionRequestTests.java} (80%) rename elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/{PostDataCloseRequestTests.java => OpenJobActionRequestTests.java} (81%) delete mode 100644 elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/ResumeJobRequestTests.java rename elasticsearch/src/test/resources/rest-api-spec/api/{xpack.prelert.close_data.json => xpack.prelert.close_job.json} (63%) create mode 100644 elasticsearch/src/test/resources/rest-api-spec/api/xpack.prelert.open_job.json delete mode 100644 elasticsearch/src/test/resources/rest-api-spec/api/xpack.prelert.pause_job.json delete mode 100644 elasticsearch/src/test/resources/rest-api-spec/api/xpack.prelert.resume_job.json diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java index 87e94f6c753..3b59b9c62ac 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java @@ -38,14 +38,13 @@ import org.elasticsearch.xpack.prelert.action.GetJobsAction; import org.elasticsearch.xpack.prelert.action.GetListAction; import org.elasticsearch.xpack.prelert.action.GetModelSnapshotsAction; import org.elasticsearch.xpack.prelert.action.GetRecordsAction; -import org.elasticsearch.xpack.prelert.action.PauseJobAction; import org.elasticsearch.xpack.prelert.action.PostDataAction; -import org.elasticsearch.xpack.prelert.action.PostDataCloseAction; +import org.elasticsearch.xpack.prelert.action.CloseJobAction; import org.elasticsearch.xpack.prelert.action.PostDataFlushAction; import org.elasticsearch.xpack.prelert.action.PutJobAction; import org.elasticsearch.xpack.prelert.action.PutListAction; import org.elasticsearch.xpack.prelert.action.PutModelSnapshotDescriptionAction; -import org.elasticsearch.xpack.prelert.action.ResumeJobAction; +import org.elasticsearch.xpack.prelert.action.OpenJobAction; import org.elasticsearch.xpack.prelert.action.RevertModelSnapshotAction; import org.elasticsearch.xpack.prelert.action.StartJobSchedulerAction; import org.elasticsearch.xpack.prelert.action.StopJobSchedulerAction; @@ -76,14 +75,13 @@ import org.elasticsearch.xpack.prelert.job.scheduler.http.HttpDataExtractorFacto import org.elasticsearch.xpack.prelert.job.status.StatusReporter; import org.elasticsearch.xpack.prelert.job.usage.UsageReporter; import org.elasticsearch.xpack.prelert.rest.data.RestPostDataAction; -import org.elasticsearch.xpack.prelert.rest.data.RestPostDataCloseAction; +import org.elasticsearch.xpack.prelert.rest.job.RestCloseJobAction; import org.elasticsearch.xpack.prelert.rest.data.RestPostDataFlushAction; import org.elasticsearch.xpack.prelert.rest.results.RestGetInfluencersAction; import org.elasticsearch.xpack.prelert.rest.job.RestDeleteJobAction; import org.elasticsearch.xpack.prelert.rest.job.RestGetJobsAction; -import org.elasticsearch.xpack.prelert.rest.job.RestPauseJobAction; import org.elasticsearch.xpack.prelert.rest.job.RestPutJobsAction; -import org.elasticsearch.xpack.prelert.rest.job.RestResumeJobAction; +import org.elasticsearch.xpack.prelert.rest.job.RestOpenJobAction; import org.elasticsearch.xpack.prelert.rest.list.RestGetListAction; import org.elasticsearch.xpack.prelert.rest.list.RestPutListAction; import org.elasticsearch.xpack.prelert.rest.modelsnapshots.RestDeleteModelSnapshotAction; @@ -142,7 +140,8 @@ public class PrelertPlugin extends Plugin implements ActionPlugin { ProcessCtrl.MAX_ANOMALY_RECORDS_SETTING, StatusReporter.ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING, StatusReporter.ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING, - UsageReporter.UPDATE_INTERVAL_SETTING)); + UsageReporter.UPDATE_INTERVAL_SETTING, + AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE)); } @Override @@ -175,7 +174,7 @@ public class PrelertPlugin extends Plugin implements ActionPlugin { } AutodetectResultsParser autodetectResultsParser = new AutodetectResultsParser(settings, parseFieldMatcherSupplier); DataProcessor dataProcessor = new AutodetectProcessManager(settings, client, threadPool, jobManager, jobProvider, - jobResultsPersister, jobDataCountsPersister, autodetectResultsParser, processFactory); + jobResultsPersister, jobDataCountsPersister, autodetectResultsParser, processFactory, clusterService.getClusterSettings()); ScheduledJobService scheduledJobService = new ScheduledJobService(threadPool, client, jobProvider, dataProcessor, // norelease: we will no longer need to pass the client here after we switch to a client based data extractor new HttpDataExtractorFactory(client), @@ -198,15 +197,14 @@ public class PrelertPlugin extends Plugin implements ActionPlugin { RestGetJobsAction.class, RestPutJobsAction.class, RestDeleteJobAction.class, - RestPauseJobAction.class, - RestResumeJobAction.class, + RestOpenJobAction.class, RestGetListAction.class, RestPutListAction.class, RestGetInfluencersAction.class, RestGetRecordsAction.class, RestGetBucketsAction.class, RestPostDataAction.class, - RestPostDataCloseAction.class, + RestCloseJobAction.class, RestPostDataFlushAction.class, RestValidateDetectorAction.class, RestValidateTransformAction.class, @@ -227,8 +225,7 @@ public class PrelertPlugin extends Plugin implements ActionPlugin { new ActionHandler<>(GetJobsAction.INSTANCE, GetJobsAction.TransportAction.class), new ActionHandler<>(PutJobAction.INSTANCE, PutJobAction.TransportAction.class), new ActionHandler<>(DeleteJobAction.INSTANCE, DeleteJobAction.TransportAction.class), - new ActionHandler<>(PauseJobAction.INSTANCE, PauseJobAction.TransportAction.class), - new ActionHandler<>(ResumeJobAction.INSTANCE, ResumeJobAction.TransportAction.class), + new ActionHandler<>(OpenJobAction.INSTANCE, OpenJobAction.TransportAction.class), new ActionHandler<>(UpdateJobStatusAction.INSTANCE, UpdateJobStatusAction.TransportAction.class), new ActionHandler<>(UpdateJobSchedulerStatusAction.INSTANCE, UpdateJobSchedulerStatusAction.TransportAction.class), new ActionHandler<>(GetListAction.INSTANCE, GetListAction.TransportAction.class), @@ -237,7 +234,7 @@ public class PrelertPlugin extends Plugin implements ActionPlugin { new ActionHandler<>(GetInfluencersAction.INSTANCE, GetInfluencersAction.TransportAction.class), new ActionHandler<>(GetRecordsAction.INSTANCE, GetRecordsAction.TransportAction.class), new ActionHandler<>(PostDataAction.INSTANCE, PostDataAction.TransportAction.class), - new ActionHandler<>(PostDataCloseAction.INSTANCE, PostDataCloseAction.TransportAction.class), + new ActionHandler<>(CloseJobAction.INSTANCE, CloseJobAction.TransportAction.class), new ActionHandler<>(PostDataFlushAction.INSTANCE, PostDataFlushAction.TransportAction.class), new ActionHandler<>(ValidateDetectorAction.INSTANCE, ValidateDetectorAction.TransportAction.class), new ActionHandler<>(ValidateTransformAction.INSTANCE, ValidateTransformAction.TransportAction.class), diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/PostDataCloseAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/CloseJobAction.java similarity index 88% rename from elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/PostDataCloseAction.java rename to elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/CloseJobAction.java index 6795cff036b..0426af06210 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/PostDataCloseAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/CloseJobAction.java @@ -38,13 +38,12 @@ import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper; import java.io.IOException; import java.util.Objects; -public class PostDataCloseAction extends Action { +public class CloseJobAction extends Action { - public static final PostDataCloseAction INSTANCE = new PostDataCloseAction(); - public static final String NAME = "cluster:admin/prelert/data/post/close"; + public static final CloseJobAction INSTANCE = new CloseJobAction(); + public static final String NAME = "cluster:admin/prelert/job/close"; - private PostDataCloseAction() { + private CloseJobAction() { super(NAME); } @@ -61,6 +60,7 @@ public class PostDataCloseAction extends Action { private String jobId; + private TimeValue closeTimeout = TimeValue.timeValueMinutes(30); Request() {} @@ -72,6 +72,14 @@ public class PostDataCloseAction extends Action { - public RequestBuilder(ElasticsearchClient client, PostDataCloseAction action) { + public RequestBuilder(ElasticsearchClient client, CloseJobAction action) { super(client, action, new Request()); } } @@ -144,7 +155,7 @@ public class PostDataCloseAction extends Action { + + public static final OpenJobAction INSTANCE = new OpenJobAction(); + public static final String NAME = "cluster:admin/prelert/job/open"; + + private OpenJobAction() { + super(NAME); + } + + @Override + public RequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new RequestBuilder(client, this); + } + + @Override + public Response newResponse() { + return new Response(); + } + + public static class Request extends AcknowledgedRequest { + + private String jobId; + private boolean ignoreDowntime; + private TimeValue openTimeout = TimeValue.timeValueMinutes(30); + + public Request(String jobId) { + this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName()); + } + + Request() {} + + public String getJobId() { + return jobId; + } + + public void setJobId(String jobId) { + this.jobId = jobId; + } + + public boolean isIgnoreDowntime() { + return ignoreDowntime; + } + + public void setIgnoreDowntime(boolean ignoreDowntime) { + this.ignoreDowntime = ignoreDowntime; + } + + public TimeValue getOpenTimeout() { + return openTimeout; + } + + public void setOpenTimeout(TimeValue openTimeout) { + this.openTimeout = openTimeout; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + jobId = in.readString(); + ignoreDowntime = in.readBoolean(); + openTimeout = new TimeValue(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(jobId); + out.writeBoolean(ignoreDowntime); + openTimeout.writeTo(out); + } + + @Override + public int hashCode() { + return Objects.hash(jobId, ignoreDowntime, openTimeout); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || obj.getClass() != getClass()) { + return false; + } + OpenJobAction.Request other = (OpenJobAction.Request) obj; + return Objects.equals(jobId, other.jobId) && + Objects.equals(ignoreDowntime, other.ignoreDowntime) && + Objects.equals(openTimeout, other.openTimeout); + } + } + + static class RequestBuilder extends MasterNodeOperationRequestBuilder { + + public RequestBuilder(ElasticsearchClient client, OpenJobAction action) { + super(client, action, new Request()); + } + } + + public static class Response extends AcknowledgedResponse { + + public Response(boolean acknowledged) { + super(acknowledged); + } + + private Response() {} + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + readAcknowledged(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + writeAcknowledged(out); + } + } + + public static class TransportAction extends TransportMasterNodeAction { + + private final JobManager jobManager; + + @Inject + public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService, + ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + JobManager jobManager) { + super(settings, OpenJobAction.NAME, transportService, clusterService, threadPool, actionFilters, + indexNameExpressionResolver, Request::new); + this.jobManager = jobManager; + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected Response newResponse() { + return new Response(); + } + + @Override + protected void masterOperation(Request request, ClusterState state, ActionListener listener) throws Exception { + ActionListener delegateListener = new ActionListener() { + + @Override + public void onResponse(Response response) { + respondWhenJobIsOpened(request, listener); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }; + jobManager.openJob(request, delegateListener); + } + + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + + private void respondWhenJobIsOpened(Request request, ActionListener listener) { + ClusterStateObserver observer = new ClusterStateObserver(clusterService, logger, threadPool.getThreadContext()); + observer.waitForNextChange(new ClusterStateObserver.Listener() { + @Override + public void onNewClusterState(ClusterState state) { + String jobId = request.getJobId(); + PrelertMetadata metadata = state.getMetaData().custom(PrelertMetadata.TYPE); + if (metadata != null) { + Allocation allocation = metadata.getAllocations().get(jobId); + if (allocation != null) { + if (allocation.getStatus() == JobStatus.OPENED) { + listener.onResponse(new Response(true)); + } else { + String message = "[" + jobId + "] expected job status [" + JobStatus.OPENED + "], but got [" + + allocation.getStatus() + "], reason [" + allocation.getStatusReason() + "]"; + listener.onFailure(new ElasticsearchStatusException(message, RestStatus.CONFLICT)); + } + } + } + listener.onFailure(new IllegalStateException("no allocation for job [" + jobId + "]")); + } + + @Override + public void onClusterServiceClose() { + listener.onFailure(new IllegalStateException("Cluster service closed while waiting for job [" + request + + "] status to change to [" + JobStatus.OPENED + "]")); + } + + @Override + public void onTimeout(TimeValue timeout) { + listener.onFailure(new IllegalStateException( + "Timeout expired while waiting for job [" + request + "] status to change to [" + JobStatus.OPENED + "]")); + } + }, new JobOpenedChangePredicate(request.getJobId()), request.openTimeout); + } + + private class JobOpenedChangePredicate implements ClusterStateObserver.ChangePredicate { + + private final String jobId; + + JobOpenedChangePredicate(String jobId) { + this.jobId = jobId; + } + + @Override + public boolean apply(ClusterState previousState, ClusterState.ClusterStateStatus previousStatus, ClusterState newState, + ClusterState.ClusterStateStatus newStatus) { + return apply(newState); + } + + @Override + public boolean apply(ClusterChangedEvent changedEvent) { + return apply(changedEvent.state()); + } + + boolean apply(ClusterState newState) { + PrelertMetadata metadata = newState.getMetaData().custom(PrelertMetadata.TYPE); + if (metadata != null) { + Allocation allocation = metadata.getAllocations().get(jobId); + if (allocation != null) { + return allocation.getStatus().isAnyOf(JobStatus.OPENED, JobStatus.FAILED); + } + } + return false; + } + } + } +} diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/PauseJobAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/PauseJobAction.java deleted file mode 100644 index 74ebfa24304..00000000000 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/PauseJobAction.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.prelert.action; - -import org.elasticsearch.action.Action; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.AcknowledgedRequest; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; -import org.elasticsearch.action.support.master.TransportMasterNodeAction; -import org.elasticsearch.client.ElasticsearchClient; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.prelert.job.Job; -import org.elasticsearch.xpack.prelert.job.manager.JobManager; -import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper; - -import java.io.IOException; -import java.util.Objects; - -public class PauseJobAction extends Action { - - public static final PauseJobAction INSTANCE = new PauseJobAction(); - public static final String NAME = "cluster:admin/prelert/job/pause"; - - private PauseJobAction() { - super(NAME); - } - - @Override - public RequestBuilder newRequestBuilder(ElasticsearchClient client) { - return new RequestBuilder(client, this); - } - - @Override - public Response newResponse() { - return new Response(); - } - - public static class Request extends AcknowledgedRequest { - - private String jobId; - - public Request(String jobId) { - this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName()); - } - - Request() {} - - public String getJobId() { - return jobId; - } - - public void setJobId(String jobId) { - this.jobId = jobId; - } - - @Override - public ActionRequestValidationException validate() { - return null; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - jobId = in.readString(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeString(jobId); - } - - @Override - public int hashCode() { - return Objects.hash(jobId); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null || obj.getClass() != getClass()) { - return false; - } - Request other = (Request) obj; - return Objects.equals(jobId, other.jobId); - } - } - - static class RequestBuilder extends MasterNodeOperationRequestBuilder { - - public RequestBuilder(ElasticsearchClient client, PauseJobAction action) { - super(client, action, new Request()); - } - } - - public static class Response extends AcknowledgedResponse { - - public Response(boolean acknowledged) { - super(acknowledged); - } - - private Response() {} - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - readAcknowledged(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - writeAcknowledged(out); - } - } - - public static class TransportAction extends TransportMasterNodeAction { - - private final JobManager jobManager; - - @Inject - public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService, - ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - JobManager jobManager) { - super(settings, PauseJobAction.NAME, transportService, clusterService, threadPool, actionFilters, - indexNameExpressionResolver, Request::new); - this.jobManager = jobManager; - } - - @Override - protected String executor() { - return ThreadPool.Names.SAME; - } - - @Override - protected Response newResponse() { - return new Response(); - } - - @Override - protected void masterOperation(Request request, ClusterState state, ActionListener listener) throws Exception { - logger.info("Pausing job " + request.getJobId()); - jobManager.pauseJob(request, listener); - } - - @Override - protected ClusterBlockException checkBlock(Request request, ClusterState state) { - return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); - } - } -} diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/ResumeJobAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/ResumeJobAction.java deleted file mode 100644 index a40a878dd89..00000000000 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/ResumeJobAction.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.prelert.action; - -import org.elasticsearch.action.Action; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.AcknowledgedRequest; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; -import org.elasticsearch.action.support.master.TransportMasterNodeAction; -import org.elasticsearch.client.ElasticsearchClient; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.prelert.job.Job; -import org.elasticsearch.xpack.prelert.job.manager.JobManager; -import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper; - -import java.io.IOException; -import java.util.Objects; - -public class ResumeJobAction extends Action { - - public static final ResumeJobAction INSTANCE = new ResumeJobAction(); - public static final String NAME = "cluster:admin/prelert/job/resume"; - - private ResumeJobAction() { - super(NAME); - } - - @Override - public RequestBuilder newRequestBuilder(ElasticsearchClient client) { - return new RequestBuilder(client, this); - } - - @Override - public Response newResponse() { - return new Response(); - } - - public static class Request extends AcknowledgedRequest { - - private String jobId; - - public Request(String jobId) { - this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName()); - } - - Request() {} - - public String getJobId() { - return jobId; - } - - public void setJobId(String jobId) { - this.jobId = jobId; - } - - @Override - public ActionRequestValidationException validate() { - return null; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - jobId = in.readString(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeString(jobId); - } - - @Override - public int hashCode() { - return Objects.hash(jobId); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null || obj.getClass() != getClass()) { - return false; - } - ResumeJobAction.Request other = (ResumeJobAction.Request) obj; - return Objects.equals(jobId, other.jobId); - } - } - - static class RequestBuilder extends MasterNodeOperationRequestBuilder { - - public RequestBuilder(ElasticsearchClient client, ResumeJobAction action) { - super(client, action, new Request()); - } - } - - public static class Response extends AcknowledgedResponse { - - public Response(boolean acknowledged) { - super(acknowledged); - } - - private Response() {} - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - readAcknowledged(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - writeAcknowledged(out); - } - } - - public static class TransportAction extends TransportMasterNodeAction { - - private final JobManager jobManager; - - @Inject - public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService, - ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - JobManager jobManager) { - super(settings, ResumeJobAction.NAME, transportService, clusterService, threadPool, actionFilters, - indexNameExpressionResolver, Request::new); - this.jobManager = jobManager; - } - - @Override - protected String executor() { - return ThreadPool.Names.SAME; - } - - @Override - protected Response newResponse() { - return new Response(); - } - - @Override - protected void masterOperation(Request request, ClusterState state, ActionListener listener) throws Exception { - logger.info("Resuming job " + request.getJobId()); - jobManager.resumeJob(request, listener); - } - - @Override - protected ClusterBlockException checkBlock(Request request, ClusterState state) { - return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); - } - } -} diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/UpdateJobStatusAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/UpdateJobStatusAction.java index 7ac9cb73364..07480066da6 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/UpdateJobStatusAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/UpdateJobStatusAction.java @@ -58,6 +58,7 @@ public class UpdateJobStatusAction private String jobId; private JobStatus status; + private String reason; public Request(String jobId, JobStatus status) { this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName()); @@ -82,6 +83,14 @@ public class UpdateJobStatusAction this.status = status; } + public String getReason() { + return reason; + } + + public void setReason(String reason) { + this.reason = reason; + } + @Override public ActionRequestValidationException validate() { return null; @@ -92,6 +101,7 @@ public class UpdateJobStatusAction super.readFrom(in); jobId = in.readString(); status = JobStatus.fromStream(in); + reason = in.readOptionalString(); } @Override @@ -99,6 +109,7 @@ public class UpdateJobStatusAction super.writeTo(out); out.writeString(jobId); status.writeTo(out); + out.writeOptionalString(reason); } @Override diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/Job.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/Job.java index 19a1777ea9f..6f1f30f5136 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/Job.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/Job.java @@ -39,7 +39,7 @@ import java.util.regex.Pattern; /** * This class represents a configured and created Job. The creation time is set * to the time the object was constructed, Status is set to - * {@link JobStatus#RUNNING} and the finished time and last data time fields are + * {@link JobStatus#OPENING} and the finished time and last data time fields are * {@code null} until the job has seen some data or it is finished respectively. * If the job was created to read data from a list of files FileUrls will be a * non-empty list else the expects data to be streamed to it. diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/JobStatus.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/JobStatus.java index 0de9c984e58..7874d952250 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/JobStatus.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/JobStatus.java @@ -20,7 +20,7 @@ import java.util.Locale; */ public enum JobStatus implements Writeable { - RUNNING, CLOSING, CLOSED, FAILED, PAUSING, PAUSED; + CLOSING, CLOSED, OPENING, OPENED, FAILED; public static JobStatus fromString(String name) { return valueOf(name.trim().toUpperCase(Locale.ROOT)); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/SchedulerState.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/SchedulerState.java index 82c2f18dc08..2f7f438d910 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/SchedulerState.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/SchedulerState.java @@ -29,7 +29,7 @@ public class SchedulerState extends ToXContentToBytes implements Writeable { public static final ParseField END_TIME_MILLIS = new ParseField("end"); public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( - TYPE_FIELD.getPreferredName(), a -> new SchedulerState((JobSchedulerStatus) a[0], (long) a[1], (Long) a[2])); + TYPE_FIELD.getPreferredName(), a -> new SchedulerState((JobSchedulerStatus) a[0], (Long) a[1], (Long) a[2])); static { PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> JobSchedulerStatus.fromString(p.text()), STATUS, diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/data/DataProcessor.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/data/DataProcessor.java index f0d8d5ade81..57d5dd08e2a 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/data/DataProcessor.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/data/DataProcessor.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.prelert.job.data; import org.elasticsearch.xpack.prelert.job.DataCounts; -import org.elasticsearch.xpack.prelert.job.JobStatus; import org.elasticsearch.xpack.prelert.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.prelert.job.process.autodetect.params.InterimResultsParams; @@ -47,11 +46,12 @@ public interface DataProcessor { */ void flushJob(String jobId, InterimResultsParams interimResultsParams); + void openJob(String jobId, boolean ignoreDowntime); + /** * Stop the running job and mark it as finished.
+ * @param jobId The job to stop * - * @param jobId The job to stop - * @param nextStatus The final status to set when analytical process has stopped */ - void closeJob(String jobId, JobStatus nextStatus); + void closeJob(String jobId); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManager.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManager.java index ce95d6028f7..183ba64fa41 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManager.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManager.java @@ -10,6 +10,7 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.rest.RestStatus; @@ -53,12 +54,11 @@ import java.util.function.Supplier; public class AutodetectProcessManager extends AbstractComponent implements DataProcessor { - // TODO (norelease) to be reconsidered + // TODO (norelease) default needs to be reconsidered public static final Setting MAX_RUNNING_JOBS_PER_NODE = - Setting.intSetting("max_running_jobs", 10, Setting.Property.NodeScope); + Setting.intSetting("max_running_jobs", 10, 1, 128, Setting.Property.NodeScope, Setting.Property.Dynamic); private final Client client; - private final int maxRunningJobs; private final ThreadPool threadPool; private final JobManager jobManager; private final JobProvider jobProvider; @@ -72,14 +72,16 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP private final ConcurrentMap autoDetectCommunicatorByJob; + private volatile int maxAllowedRunningJobs; + public AutodetectProcessManager(Settings settings, Client client, ThreadPool threadPool, JobManager jobManager, - JobProvider jobProvider, JobResultsPersister jobResultsPersister, - JobDataCountsPersister jobDataCountsPersister, AutodetectResultsParser parser, - AutodetectProcessFactory autodetectProcessFactory) { + JobProvider jobProvider, JobResultsPersister jobResultsPersister, + JobDataCountsPersister jobDataCountsPersister, AutodetectResultsParser parser, + AutodetectProcessFactory autodetectProcessFactory, ClusterSettings clusterSettings) { super(settings); this.client = client; this.threadPool = threadPool; - this.maxRunningJobs = MAX_RUNNING_JOBS_PER_NODE.get(settings); + this.maxAllowedRunningJobs = MAX_RUNNING_JOBS_PER_NODE.get(settings); this.parser = parser; this.autodetectProcessFactory = autodetectProcessFactory; this.jobManager = jobManager; @@ -91,20 +93,21 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP this.jobDataCountsPersister = jobDataCountsPersister; this.autoDetectCommunicatorByJob = new ConcurrentHashMap<>(); + clusterSettings.addSettingsUpdateConsumer(MAX_RUNNING_JOBS_PER_NODE, val -> maxAllowedRunningJobs = val); } @Override public DataCounts processData(String jobId, InputStream input, DataLoadParams params, Supplier cancelled) { Allocation allocation = jobManager.getJobAllocation(jobId); - if (allocation.getStatus().isAnyOf(JobStatus.PAUSING, JobStatus.PAUSED)) { - return new DataCounts(jobId); + if (allocation.getStatus() != JobStatus.OPENED) { + throw new IllegalArgumentException("job [" + jobId + "] status is [" + allocation.getStatus() + "], but must be [" + + JobStatus.OPENED + "] for processing data"); } - AutodetectCommunicator communicator = autoDetectCommunicatorByJob.computeIfAbsent(jobId, id -> { - AutodetectCommunicator c = create(id, params.isIgnoreDowntime()); - setJobStatus(jobId, JobStatus.RUNNING); - return c; - }); + AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId); + if (communicator == null) { + throw new IllegalStateException("job [" + jobId + "] with status [" + allocation.getStatus() + "] hasn't been started"); + } try { return communicator.writeToJob(input, params, cancelled); // TODO check for errors from autodetect @@ -120,10 +123,47 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP } } + @Override + public void flushJob(String jobId, InterimResultsParams params) { + logger.debug("Flushing job {}", jobId); + AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId); + if (communicator == null) { + logger.debug("Cannot flush: no active autodetect process for job {}", jobId); + return; + } + try { + communicator.flushJob(params); + // TODO check for errors from autodetect + } catch (IOException ioe) { + String msg = String.format(Locale.ROOT, "[%s] exception while flushing job", jobId); + logger.warn(msg); + throw ExceptionsHelper.serverError(msg, ioe); + } + } + + public void writeUpdateConfigMessage(String jobId, String config) throws IOException { + AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId); + if (communicator == null) { + logger.debug("Cannot update config: no active autodetect process for job {}", jobId); + return; + } + communicator.writeUpdateConfigMessage(config); + // TODO check for errors from autodetect + } + + @Override + public void openJob(String jobId, boolean ignoreDowntime) { + autoDetectCommunicatorByJob.computeIfAbsent(jobId, id -> { + AutodetectCommunicator communicator = create(id, ignoreDowntime); + setJobStatus(jobId, JobStatus.OPENED); + return communicator; + }); + } + AutodetectCommunicator create(String jobId, boolean ignoreDowntime) { - if (autoDetectCommunicatorByJob.size() == maxRunningJobs) { - throw new ElasticsearchStatusException("max running job capacity [" + maxRunningJobs + "] reached", - RestStatus.FORBIDDEN); + if (autoDetectCommunicatorByJob.size() == maxAllowedRunningJobs) { + throw new ElasticsearchStatusException("max running job capacity [" + maxAllowedRunningJobs + "] reached", + RestStatus.CONFLICT); } // TODO norelease, once we remove black hole process @@ -154,35 +194,7 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP } @Override - public void flushJob(String jobId, InterimResultsParams params) { - logger.debug("Flushing job {}", jobId); - AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId); - if (communicator == null) { - logger.debug("Cannot flush: no active autodetect process for job {}", jobId); - return; - } - try { - communicator.flushJob(params); - // TODO check for errors from autodetect - } catch (IOException ioe) { - String msg = String.format(Locale.ROOT, "Exception flushing process for job %s", jobId); - logger.warn(msg); - throw ExceptionsHelper.serverError(msg, ioe); - } - } - - public void writeUpdateConfigMessage(String jobId, String config) throws IOException { - AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId); - if (communicator == null) { - logger.debug("Cannot update config: no active autodetect process for job {}", jobId); - return; - } - communicator.writeUpdateConfigMessage(config); - // TODO check for errors from autodetect - } - - @Override - public void closeJob(String jobId, JobStatus nextStatus) { + public void closeJob(String jobId) { logger.debug("Closing job {}", jobId); AutodetectCommunicator communicator = autoDetectCommunicatorByJob.remove(jobId); if (communicator == null) { @@ -192,14 +204,14 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP try { communicator.close(); - setJobStatus(jobId, nextStatus); + setJobStatus(jobId, JobStatus.CLOSED); } catch (Exception e) { logger.warn("Exception closing stopped process input stream", e); throw ExceptionsHelper.serverError("Exception closing stopped process input stream", e); } } - int numberOfRunningJobs() { + int numberOfOpenJobs() { return autoDetectCommunicatorByJob.size(); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/JobManager.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/JobManager.java index da8a8dffa66..aea17699bae 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/JobManager.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/JobManager.java @@ -16,9 +16,8 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.xpack.prelert.action.DeleteJobAction; -import org.elasticsearch.xpack.prelert.action.PauseJobAction; import org.elasticsearch.xpack.prelert.action.PutJobAction; -import org.elasticsearch.xpack.prelert.action.ResumeJobAction; +import org.elasticsearch.xpack.prelert.action.OpenJobAction; import org.elasticsearch.xpack.prelert.action.RevertModelSnapshotAction; import org.elasticsearch.xpack.prelert.action.StartJobSchedulerAction; import org.elasticsearch.xpack.prelert.action.StopJobSchedulerAction; @@ -284,7 +283,7 @@ public class JobManager extends AbstractComponent { if (schedulerState != null && schedulerState.getStatus() != JobSchedulerStatus.STOPPED) { throw ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.JOB_CANNOT_DELETE_WHILE_SCHEDULER_RUNS, jobId)); } - if (!allocation.getStatus().isAnyOf(JobStatus.CLOSED, JobStatus.PAUSED, JobStatus.FAILED)) { + if (!allocation.getStatus().isAnyOf(JobStatus.CLOSED, JobStatus.FAILED)) { throw ExceptionsHelper.conflictStatusException(Messages.getMessage( Messages.JOB_CANNOT_DELETE_WHILE_RUNNING, jobId, allocation.getStatus())); } @@ -446,52 +445,21 @@ public class JobManager extends AbstractComponent { }); } - public void pauseJob(PauseJobAction.Request request, ActionListener actionListener) { - clusterService.submitStateUpdateTask("pause-job-" + request.getJobId(), - new AckedClusterStateUpdateTask(request, actionListener) { - - @Override - protected PauseJobAction.Response newResponse(boolean acknowledged) { - return new PauseJobAction.Response(acknowledged); - } - - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - Job job = getJobOrThrowIfUnknown(currentState, request.getJobId()); - Allocation allocation = getAllocation(currentState, job.getId()); - checkJobIsNotScheduled(job); - if (!allocation.getStatus().isAnyOf(JobStatus.RUNNING, JobStatus.CLOSED)) { - throw ExceptionsHelper.conflictStatusException( - Messages.getMessage(Messages.JOB_CANNOT_PAUSE, job.getId(), allocation.getStatus())); - } - - ClusterState newState = innerSetJobStatus(job.getId(), JobStatus.PAUSING, currentState); - Job.Builder jobBuilder = new Job.Builder(job); - jobBuilder.setIgnoreDowntime(IgnoreDowntime.ONCE); - return innerPutJob(jobBuilder.build(), true, newState); - } - }); - } - - public void resumeJob(ResumeJobAction.Request request, ActionListener actionListener) { - clusterService.submitStateUpdateTask("resume-job-" + request.getJobId(), - new AckedClusterStateUpdateTask(request, actionListener) { + public void openJob(OpenJobAction.Request request, ActionListener actionListener) { + clusterService.submitStateUpdateTask("open-job-" + request.getJobId(), + new AckedClusterStateUpdateTask(request, actionListener) { @Override public ClusterState execute(ClusterState currentState) throws Exception { - getJobOrThrowIfUnknown(request.getJobId()); - Allocation allocation = getJobAllocation(request.getJobId()); - if (allocation.getStatus() != JobStatus.PAUSED) { - throw ExceptionsHelper.conflictStatusException( - Messages.getMessage(Messages.JOB_CANNOT_RESUME, request.getJobId(), allocation.getStatus())); - } - Allocation.Builder builder = new Allocation.Builder(allocation); - builder.setStatus(JobStatus.CLOSED); - return innerUpdateAllocation(builder.build(), currentState); + PrelertMetadata.Builder builder = new PrelertMetadata.Builder(currentState.metaData().custom(PrelertMetadata.TYPE)); + builder.createAllocation(request.getJobId(), request.isIgnoreDowntime()); + return ClusterState.builder(currentState) + .metaData(MetaData.builder(currentState.metaData()).putCustom(PrelertMetadata.TYPE, builder.build())) + .build(); } @Override - protected ResumeJobAction.Response newResponse(boolean acknowledged) { - return new ResumeJobAction.Response(acknowledged); + protected OpenJobAction.Response newResponse(boolean acknowledged) { + return new OpenJobAction.Response(acknowledged); } }); } @@ -506,7 +474,11 @@ public class JobManager extends AbstractComponent { @Override public ClusterState execute(ClusterState currentState) throws Exception { - return innerSetJobStatus(request.getJobId(), request.getStatus(), currentState); + PrelertMetadata.Builder builder = new PrelertMetadata.Builder(currentState.metaData().custom(PrelertMetadata.TYPE)); + builder.updateStatus(request.getJobId(), request.getStatus(), request.getReason()); + return ClusterState.builder(currentState) + .metaData(MetaData.builder(currentState.metaData()).putCustom(PrelertMetadata.TYPE, builder.build())) + .build(); } @Override @@ -542,16 +514,4 @@ public class JobManager extends AbstractComponent { jobResultsPersister.commitWrites(jobId); } - private ClusterState innerSetJobStatus(String jobId, JobStatus newStatus, ClusterState currentState) { - Allocation allocation = getJobAllocation(jobId); - Allocation.Builder builder = new Allocation.Builder(allocation); - builder.setStatus(newStatus); - return innerUpdateAllocation(builder.build(), currentState); - } - - private void checkJobIsNotScheduled(Job job) { - if (job.getSchedulerConfig() != null) { - throw ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.REST_ACTION_NOT_ALLOWED_FOR_SCHEDULED_JOB)); - } - } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/Allocation.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/Allocation.java index ec3bdc9b8a0..223609f3146 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/Allocation.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/Allocation.java @@ -27,36 +27,47 @@ public class Allocation extends AbstractDiffable implements ToXConte private static final ParseField NODE_ID_FIELD = new ParseField("node_id"); private static final ParseField JOB_ID_FIELD = new ParseField("job_id"); + private static final ParseField IGNORE_DOWNTIME_FIELD = new ParseField("ignore_downtime"); public static final ParseField STATUS = new ParseField("status"); + public static final ParseField STATUS_REASON = new ParseField("status_reason"); public static final ParseField SCHEDULER_STATE = new ParseField("scheduler_state"); - static final Allocation PROTO = new Allocation(null, null, null, null); + static final Allocation PROTO = new Allocation(null, null, false, null, null, null); static final ObjectParser PARSER = new ObjectParser<>("allocation", Builder::new); static { PARSER.declareString(Builder::setNodeId, NODE_ID_FIELD); PARSER.declareString(Builder::setJobId, JOB_ID_FIELD); + PARSER.declareBoolean(Builder::setIgnoreDowntime, IGNORE_DOWNTIME_FIELD); PARSER.declareField(Builder::setStatus, (p, c) -> JobStatus.fromString(p.text()), STATUS, ObjectParser.ValueType.STRING); + PARSER.declareString(Builder::setStatusReason, STATUS_REASON); PARSER.declareObject(Builder::setSchedulerState, SchedulerState.PARSER, SCHEDULER_STATE); } private final String nodeId; private final String jobId; + private final boolean ignoreDowntime; private final JobStatus status; + private final String statusReason; private final SchedulerState schedulerState; - public Allocation(String nodeId, String jobId, JobStatus status, SchedulerState schedulerState) { + public Allocation(String nodeId, String jobId, boolean ignoreDowntime, JobStatus status, String statusReason, + SchedulerState schedulerState) { this.nodeId = nodeId; this.jobId = jobId; + this.ignoreDowntime = ignoreDowntime; this.status = status; + this.statusReason = statusReason; this.schedulerState = schedulerState; } public Allocation(StreamInput in) throws IOException { this.nodeId = in.readString(); this.jobId = in.readString(); + this.ignoreDowntime = in.readBoolean(); this.status = JobStatus.fromStream(in); + this.statusReason = in.readOptionalString(); this.schedulerState = in.readOptionalWriteable(SchedulerState::new); } @@ -68,10 +79,23 @@ public class Allocation extends AbstractDiffable implements ToXConte return jobId; } + /** + * @return Whether to ignore downtime at startup. + * + * When the job status is set to STARTED, to ignoreDowntime will be set to false. + */ + public boolean isIgnoreDowntime() { + return ignoreDowntime; + } + public JobStatus getStatus() { return status; } + public String getStatusReason() { + return statusReason; + } + public SchedulerState getSchedulerState() { return schedulerState; } @@ -85,7 +109,9 @@ public class Allocation extends AbstractDiffable implements ToXConte public void writeTo(StreamOutput out) throws IOException { out.writeString(nodeId); out.writeString(jobId); + out.writeBoolean(ignoreDowntime); status.writeTo(out); + out.writeOptionalString(statusReason); out.writeOptionalWriteable(schedulerState); } @@ -94,7 +120,11 @@ public class Allocation extends AbstractDiffable implements ToXConte builder.startObject(); builder.field(NODE_ID_FIELD.getPreferredName(), nodeId); builder.field(JOB_ID_FIELD.getPreferredName(), jobId); + builder.field(IGNORE_DOWNTIME_FIELD.getPreferredName(), ignoreDowntime); builder.field(STATUS.getPreferredName(), status); + if (statusReason != null) { + builder.field(STATUS_REASON.getPreferredName(), statusReason); + } if (schedulerState != null) { builder.field(SCHEDULER_STATE.getPreferredName(), schedulerState); } @@ -109,13 +139,15 @@ public class Allocation extends AbstractDiffable implements ToXConte Allocation that = (Allocation) o; return Objects.equals(nodeId, that.nodeId) && Objects.equals(jobId, that.jobId) && + Objects.equals(ignoreDowntime, that.ignoreDowntime) && Objects.equals(status, that.status) && + Objects.equals(statusReason, that.statusReason) && Objects.equals(schedulerState, that.schedulerState); } @Override public int hashCode() { - return Objects.hash(nodeId, jobId, status, schedulerState); + return Objects.hash(nodeId, jobId, ignoreDowntime, status, statusReason, schedulerState); } // Class alreadt extends from AbstractDiffable, so copied from ToXContentToBytes#toString() @@ -137,7 +169,9 @@ public class Allocation extends AbstractDiffable implements ToXConte private String nodeId; private String jobId; + private boolean ignoreDowntime; private JobStatus status; + private String statusReason; private SchedulerState schedulerState; public Builder() { @@ -146,7 +180,9 @@ public class Allocation extends AbstractDiffable implements ToXConte public Builder(Allocation allocation) { this.nodeId = allocation.nodeId; this.jobId = allocation.jobId; + this.ignoreDowntime = allocation.ignoreDowntime; this.status = allocation.status; + this.statusReason = allocation.statusReason; this.schedulerState = allocation.schedulerState; } @@ -158,35 +194,39 @@ public class Allocation extends AbstractDiffable implements ToXConte this.jobId = jobId; } + public void setIgnoreDowntime(boolean ignoreDownTime) { + this.ignoreDowntime = ignoreDownTime; + } + @SuppressWarnings("incomplete-switch") public void setStatus(JobStatus newStatus) { - switch (newStatus) { - case CLOSING: - if (this.status == JobStatus.CLOSED) { - throw new IllegalArgumentException("[" + jobId + "][" + status +"] job already closed"); - } - if (this.status == JobStatus.CLOSING) { - throw new IllegalArgumentException("[" + jobId + "][" + status +"] job already closing"); - } - break; - case PAUSING: - if (this.status == JobStatus.CLOSED) { - throw new IllegalArgumentException("[" + jobId + "][" + status +"] can't pause a job that is closed"); - } - if (this.status == JobStatus.CLOSING) { - throw new IllegalArgumentException("[" + jobId + "][" + status +"] can't pause a job that is closing"); - } - if (this.status == JobStatus.PAUSING) { - throw new IllegalArgumentException("[" + jobId + "][" + status +"] can't pause a job that is pausing"); - } - if (this.status == JobStatus.PAUSED) { - throw new IllegalArgumentException("[" + jobId + "][" + status +"] can't pause a job that is already paused"); + if (this.status != null) { + switch (newStatus) { + case CLOSING: + if (this.status != JobStatus.OPENED) { + throw new IllegalArgumentException("[" + jobId + "] expected status [" + JobStatus.OPENED + + "], but got [" + status +"]"); + } + break; + case OPENING: + if (this.status.isAnyOf(JobStatus.CLOSED, JobStatus.FAILED)) { + throw new IllegalArgumentException("[" + jobId + "] expected status [" + JobStatus.CLOSED + + "] or [" + JobStatus.FAILED + "], but got [" + status +"]"); + } + break; + case OPENED: + ignoreDowntime = false; + break; } } this.status = newStatus; } + public void setStatusReason(String statusReason) { + this.statusReason = statusReason; + } + public void setSchedulerState(SchedulerState schedulerState) { JobSchedulerStatus currentSchedulerStatus = this.schedulerState == null ? JobSchedulerStatus.STOPPED : this.schedulerState.getStatus(); @@ -225,10 +265,7 @@ public class Allocation extends AbstractDiffable implements ToXConte } public Allocation build() { - if (status == null) { - status = JobStatus.CLOSED; - } - return new Allocation(nodeId, jobId, status, schedulerState); + return new Allocation(nodeId, jobId, ignoreDowntime, status, statusReason, schedulerState); } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/JobAllocator.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/JobAllocator.java index 8fe2e393a91..31b3a234736 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/JobAllocator.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/JobAllocator.java @@ -32,7 +32,7 @@ public class JobAllocator extends AbstractComponent implements ClusterStateListe clusterService.add(this); } - ClusterState allocateJobs(ClusterState current) { + ClusterState assignJobsToNodes(ClusterState current) { if (shouldAllocate(current) == false) { // returning same instance, so no cluster state update is performed return current; @@ -48,16 +48,8 @@ public class JobAllocator extends AbstractComponent implements ClusterStateListe PrelertMetadata.Builder builder = new PrelertMetadata.Builder(prelertMetadata); DiscoveryNode prelertNode = nodes.getMasterNode(); // prelert is now always master node - for (String jobId : prelertMetadata.getJobs().keySet()) { - if (prelertMetadata.getAllocations().containsKey(jobId) == false) { - boolean addSchedulderState = prelertMetadata.getJobs().get(jobId).getSchedulerConfig() != null; - if (addSchedulderState) { - builder.putAllocationWithScheduler(prelertNode.getId(), jobId); - } - else { - builder.putAllocation(prelertNode.getId(), jobId); - } - } + for (String jobId : prelertMetadata.getAllocations().keySet()) { + builder.assignToNode(jobId, prelertNode.getId()); } return ClusterState.builder(current) @@ -71,8 +63,8 @@ public class JobAllocator extends AbstractComponent implements ClusterStateListe return false; } - for (String jobId : prelertMetadata.getJobs().keySet()) { - if (prelertMetadata.getAllocations().containsKey(jobId) == false) { + for (Allocation allocation : prelertMetadata.getAllocations().values()) { + if (allocation.getNodeId() == null) { return true; } } @@ -87,7 +79,7 @@ public class JobAllocator extends AbstractComponent implements ClusterStateListe clusterService.submitStateUpdateTask("allocate_jobs", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { - return allocateJobs(currentState); + return assignJobsToNodes(currentState); } @Override diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/JobLifeCycleService.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/JobLifeCycleService.java index 137aa1a8e84..e8d4035940a 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/JobLifeCycleService.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/JobLifeCycleService.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.prelert.job.metadata; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; @@ -21,7 +20,6 @@ import org.elasticsearch.xpack.prelert.job.SchedulerState; import org.elasticsearch.xpack.prelert.job.data.DataProcessor; import org.elasticsearch.xpack.prelert.job.scheduler.ScheduledJobService; -import java.util.Collections; import java.util.HashSet; import java.util.Objects; import java.util.Set; @@ -29,10 +27,10 @@ import java.util.concurrent.Executor; public class JobLifeCycleService extends AbstractComponent implements ClusterStateListener { - volatile Set localAllocatedJobs = Collections.emptySet(); + volatile Set localAssignedJobs = new HashSet<>(); private final Client client; private final ScheduledJobService scheduledJobService; - private DataProcessor dataProcessor; + private final DataProcessor dataProcessor; private final Executor executor; public JobLifeCycleService(Settings settings, Client client, ClusterService clusterService, ScheduledJobService scheduledJobService, @@ -54,7 +52,7 @@ public class JobLifeCycleService extends AbstractComponent implements ClusterSta } // Single volatile read: - Set localAllocatedJobs = this.localAllocatedJobs; + Set localAssignedJobs = this.localAssignedJobs; DiscoveryNode localNode = event.state().nodes().getLocalNode(); for (Allocation allocation : prelertMetadata.getAllocations().values()) { @@ -63,10 +61,10 @@ public class JobLifeCycleService extends AbstractComponent implements ClusterSta } } - for (String localAllocatedJob : localAllocatedJobs) { + for (String localAllocatedJob : localAssignedJobs) { Allocation allocation = prelertMetadata.getAllocations().get(localAllocatedJob); if (allocation != null) { - if (localNode.getId().equals(allocation.getNodeId()) == false) { + if (localNode.getId().equals(allocation.getNodeId()) && allocation.getStatus() == JobStatus.CLOSING) { stopJob(localAllocatedJob); } } else { @@ -77,35 +75,15 @@ public class JobLifeCycleService extends AbstractComponent implements ClusterSta private void handleLocallyAllocatedJob(PrelertMetadata prelertMetadata, Allocation allocation) { Job job = prelertMetadata.getJobs().get(allocation.getJobId()); - if (localAllocatedJobs.contains(allocation.getJobId()) == false) { - startJob(job); + if (localAssignedJobs.contains(allocation.getJobId()) == false) { + if (allocation.getStatus() == JobStatus.OPENING) { + startJob(allocation); + } } - handleJobStatusChange(job, allocation.getStatus()); handleSchedulerStatusChange(job, allocation); } - private void handleJobStatusChange(Job job, JobStatus status) { - switch (status) { - case PAUSING: - executor.execute(() -> pauseJob(job)); - break; - case RUNNING: - break; - case CLOSING: - executor.execute(() -> closeJob(job)); - break; - case CLOSED: - break; - case PAUSED: - break; - case FAILED: - break; - default: - throw new IllegalStateException("Unknown job status [" + status + "]"); - } - } - private void handleSchedulerStatusChange(Job job, Allocation allocation) { SchedulerState schedulerState = allocation.getSchedulerState(); if (schedulerState != null) { @@ -126,54 +104,47 @@ public class JobLifeCycleService extends AbstractComponent implements ClusterSta } } - void startJob(Job job) { - logger.info("Starting job [" + job.getId() + "]"); - // noop now, but should delegate to a task / ProcessManager that actually starts the job + void startJob(Allocation allocation) { + logger.info("Starting job [" + allocation.getJobId() + "]"); + executor.execute(() -> { + try { + dataProcessor.openJob(allocation.getJobId(), allocation.isIgnoreDowntime()); + } catch (Exception e) { + logger.error("Failed to close job [" + allocation.getJobId() + "]", e); + updateJobStatus(allocation.getJobId(), JobStatus.FAILED, "failed to open, " + e.getMessage()); + } + }); // update which jobs are now allocated locally - Set newSet = new HashSet<>(localAllocatedJobs); - newSet.add(job.getId()); - localAllocatedJobs = newSet; + Set newSet = new HashSet<>(localAssignedJobs); + newSet.add(allocation.getJobId()); + localAssignedJobs = newSet; } void stopJob(String jobId) { logger.info("Stopping job [" + jobId + "]"); - // noop now, but should delegate to a task / ProcessManager that actually stops the job + executor.execute(() -> { + try { + dataProcessor.closeJob(jobId); + } catch (Exception e) { + logger.error("Failed to close job [" + jobId + "]", e); + updateJobStatus(jobId, JobStatus.FAILED, "failed to close, " + e.getMessage()); + } + }); // update which jobs are now allocated locally - Set newSet = new HashSet<>(localAllocatedJobs); + Set newSet = new HashSet<>(localAssignedJobs); newSet.remove(jobId); - localAllocatedJobs = newSet; + localAssignedJobs = newSet; } - private void closeJob(Job job) { - try { - // NORELEASE Ensure this also removes the job auto-close timeout task - dataProcessor.closeJob(job.getId(), JobStatus.CLOSED); - } catch (ElasticsearchException e) { - logger.error("Failed to close job [" + job.getId() + "]", e); - updateJobStatus(job.getId(), JobStatus.FAILED); - } - } - - private void pauseJob(Job job) { - try { - // NORELEASE Ensure this also removes the job auto-close timeout task - dataProcessor.closeJob(job.getId(), JobStatus.PAUSED); - } catch (ElasticsearchException e) { - logger.error("Failed to close job [" + job.getId() + "] while pausing", e); - updateJobStatus(job.getId(), JobStatus.FAILED); - } - } - - private void updateJobStatus(String jobId, JobStatus status) { + private void updateJobStatus(String jobId, JobStatus status, String reason) { UpdateJobStatusAction.Request request = new UpdateJobStatusAction.Request(jobId, status); + request.setReason(reason); client.execute(UpdateJobStatusAction.INSTANCE, request, new ActionListener() { @Override public void onResponse(UpdateJobStatusAction.Response response) { logger.info("Successfully set job status to [{}] for job [{}]", status, jobId); - // NORELEASE Audit job paused - // audit(jobId).info(Messages.getMessage(Messages.JOB_AUDIT_PAUSED)); } @Override diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadata.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadata.java index 887f1bc1ff6..e77bac9c1fa 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadata.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadata.java @@ -9,6 +9,7 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.DiffableUtils; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.ParseFieldMatcherSupplier; @@ -215,23 +216,6 @@ public class PrelertMetadata implements MetaData.Custom { return this; } - public Builder putAllocation(String nodeId, String jobId) { - Allocation.Builder builder = new Allocation.Builder(); - builder.setJobId(jobId); - builder.setNodeId(nodeId); - this.allocations.put(jobId, builder.build()); - return this; - } - - public Builder putAllocationWithScheduler(String nodeId, String jobId) { - Allocation.Builder builder = new Allocation.Builder(); - builder.setJobId(jobId); - builder.setNodeId(nodeId); - builder.setSchedulerState(new SchedulerState(JobSchedulerStatus.STOPPED, null, null)); - this.allocations.put(jobId, builder.build()); - return this; - } - public Builder updateAllocation(String jobId, Allocation updated) { Allocation previous = this.allocations.put(jobId, updated); if (previous == null) { @@ -264,6 +248,65 @@ public class PrelertMetadata implements MetaData.Custom { public PrelertMetadata build() { return new PrelertMetadata(jobs, allocations); } + + public Builder createAllocation(String jobId, boolean ignoreDowntime) { + Job job = jobs.get(jobId); + if (job == null) { + throw ExceptionsHelper.missingJobException(jobId); + } + + Allocation allocation = allocations.get(jobId); + Allocation.Builder builder; + if (allocation == null) { + builder = new Allocation.Builder(); + builder.setJobId(jobId); + boolean addSchedulderState = job.getSchedulerConfig() != null; + if (addSchedulderState) { + builder.setSchedulerState(new SchedulerState(JobSchedulerStatus.STOPPED, null, null)); + } + } else { + if (allocation.getStatus() != JobStatus.CLOSED) { + throw ExceptionsHelper.conflictStatusException("[" + jobId + "] expected status [" + JobStatus.CLOSED + + "], but got [" + allocation.getStatus() +"]"); + } + builder = new Allocation.Builder(allocation); + } + + builder.setStatus(JobStatus.OPENING); + builder.setIgnoreDowntime(ignoreDowntime); + allocations.put(jobId, builder.build()); + return this; + } + + public Builder assignToNode(String jobId, String nodeId) { + Allocation allocation = allocations.get(jobId); + if (allocation == null) { + throw new IllegalStateException("[" + jobId + "] no allocation to assign to node [" + nodeId + "]"); + } + Allocation.Builder builder = new Allocation.Builder(allocation); + builder.setNodeId(nodeId); + allocations.put(jobId, builder.build()); + return this; + } + + public Builder updateStatus(String jobId, JobStatus jobStatus, @Nullable String reason) { + Allocation previous = allocations.get(jobId); + if (previous == null) { + throw new IllegalStateException("[" + jobId + "] no allocation exist to update the status to [" + jobStatus + "]"); + } + Allocation.Builder builder = new Allocation.Builder(previous); + builder.setStatus(jobStatus); + if (reason != null) { + builder.setStatusReason(reason); + } + if (previous.getStatus() != jobStatus && jobStatus == JobStatus.CLOSED) { + Job.Builder job = new Job.Builder(this.jobs.get(jobId)); + job.setFinishedTime(new Date()); + this.jobs.put(job.getId(), job.build()); + } + allocations.put(jobId, builder.build()); + return this; + } } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutodetectResultsParser.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutodetectResultsParser.java index 30d2004500b..f26c6c4d337 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutodetectResultsParser.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutodetectResultsParser.java @@ -34,7 +34,7 @@ public class AutodetectResultsParser extends AbstractComponent { this.parseFieldMatcherSupplier = parseFieldMatcherSupplier; } - CloseableIterator parseResults(InputStream in) throws ElasticsearchParseException { + public CloseableIterator parseResults(InputStream in) throws ElasticsearchParseException { try { XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(in); XContentParser.Token token = parser.nextToken(); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/data/RestPostDataCloseAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/data/RestPostDataCloseAction.java deleted file mode 100644 index 5d46b679041..00000000000 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/data/RestPostDataCloseAction.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.prelert.rest.data; - -import org.elasticsearch.client.node.NodeClient; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.rest.BaseRestHandler; -import org.elasticsearch.rest.RestController; -import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.rest.action.AcknowledgedRestListener; -import org.elasticsearch.xpack.prelert.PrelertPlugin; -import org.elasticsearch.xpack.prelert.action.PostDataCloseAction; -import org.elasticsearch.xpack.prelert.job.Job; - -import java.io.IOException; - -public class RestPostDataCloseAction extends BaseRestHandler { - - private final PostDataCloseAction.TransportAction transportPostDataCloseAction; - - @Inject - public RestPostDataCloseAction(Settings settings, RestController controller, - PostDataCloseAction.TransportAction transportPostDataCloseAction) { - super(settings); - this.transportPostDataCloseAction = transportPostDataCloseAction; - controller.registerHandler(RestRequest.Method.POST, PrelertPlugin.BASE_PATH - + "data/{" + Job.ID.getPreferredName() + "}/_close", this); - } - - @Override - protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { - PostDataCloseAction.Request postDataCloseRequest = new PostDataCloseAction.Request( - restRequest.param(Job.ID.getPreferredName())); - - return channel -> transportPostDataCloseAction.execute(postDataCloseRequest, new AcknowledgedRestListener<>(channel)); - } -} diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/job/RestPauseJobAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/job/RestCloseJobAction.java similarity index 55% rename from elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/job/RestPauseJobAction.java rename to elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/job/RestCloseJobAction.java index 9ca9678bc78..f567d87f1c3 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/job/RestPauseJobAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/job/RestCloseJobAction.java @@ -8,31 +8,35 @@ package org.elasticsearch.xpack.prelert.rest.job; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.AcknowledgedRestListener; import org.elasticsearch.xpack.prelert.PrelertPlugin; -import org.elasticsearch.xpack.prelert.action.PauseJobAction; +import org.elasticsearch.xpack.prelert.action.CloseJobAction; import org.elasticsearch.xpack.prelert.job.Job; import java.io.IOException; -public class RestPauseJobAction extends BaseRestHandler { +public class RestCloseJobAction extends BaseRestHandler { - private final PauseJobAction.TransportAction transportPauseJobAction; + private final CloseJobAction.TransportAction closeJobAction; @Inject - public RestPauseJobAction(Settings settings, RestController controller, PauseJobAction.TransportAction transportPauseJobAction) { + public RestCloseJobAction(Settings settings, RestController controller, CloseJobAction.TransportAction closeJobAction) { super(settings); - this.transportPauseJobAction = transportPauseJobAction; - controller.registerHandler(RestRequest.Method.POST, PrelertPlugin.BASE_PATH + "jobs/{" + Job.ID.getPreferredName() + "}/_pause", - this); + this.closeJobAction = closeJobAction; + controller.registerHandler(RestRequest.Method.POST, PrelertPlugin.BASE_PATH + + "data/{" + Job.ID.getPreferredName() + "}/_close", this); } @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { - PauseJobAction.Request request = new PauseJobAction.Request(restRequest.param(Job.ID.getPreferredName())); - return channel -> transportPauseJobAction.execute(request, new AcknowledgedRestListener<>(channel)); + CloseJobAction.Request request = new CloseJobAction.Request(restRequest.param(Job.ID.getPreferredName())); + if (restRequest.hasParam("close_timeout")) { + request.setCloseTimeout(TimeValue.parseTimeValue(restRequest.param("close_timeout"), "close_timeout")); + } + return channel -> closeJobAction.execute(request, new AcknowledgedRestListener<>(channel)); } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/job/RestResumeJobAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/job/RestOpenJobAction.java similarity index 52% rename from elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/job/RestResumeJobAction.java rename to elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/job/RestOpenJobAction.java index 5490de4ce3d..1c2cc7b621b 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/job/RestResumeJobAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/job/RestOpenJobAction.java @@ -8,31 +8,36 @@ package org.elasticsearch.xpack.prelert.rest.job; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.AcknowledgedRestListener; import org.elasticsearch.xpack.prelert.PrelertPlugin; -import org.elasticsearch.xpack.prelert.action.ResumeJobAction; +import org.elasticsearch.xpack.prelert.action.OpenJobAction; import org.elasticsearch.xpack.prelert.job.Job; import java.io.IOException; -public class RestResumeJobAction extends BaseRestHandler { +public class RestOpenJobAction extends BaseRestHandler { - private final ResumeJobAction.TransportAction transportResumeJobAction; + private final OpenJobAction.TransportAction openJobAction; @Inject - public RestResumeJobAction(Settings settings, RestController controller, ResumeJobAction.TransportAction transportResumeJobAction) { + public RestOpenJobAction(Settings settings, RestController controller, OpenJobAction.TransportAction openJobAction) { super(settings); - this.transportResumeJobAction = transportResumeJobAction; - controller.registerHandler(RestRequest.Method.POST, PrelertPlugin.BASE_PATH + "jobs/{" + Job.ID.getPreferredName() + "}/_resume", + this.openJobAction = openJobAction; + controller.registerHandler(RestRequest.Method.POST, PrelertPlugin.BASE_PATH + "data/{" + Job.ID.getPreferredName() + "}/_open", this); } @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { - ResumeJobAction.Request request = new ResumeJobAction.Request(restRequest.param(Job.ID.getPreferredName())); - return channel -> transportResumeJobAction.execute(request, new AcknowledgedRestListener<>(channel)); + OpenJobAction.Request request = new OpenJobAction.Request(restRequest.param(Job.ID.getPreferredName())); + request.setIgnoreDowntime(restRequest.paramAsBoolean("ignore_downtime", false)); + if (restRequest.hasParam("open_timeout")) { + request.setOpenTimeout(TimeValue.parseTimeValue(restRequest.param("open_timeout"), "open_timeout")); + } + return channel -> openJobAction.execute(request, new AcknowledgedRestListener<>(channel)); } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/PauseJobRequestTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/CloseJobActionRequestTests.java similarity index 80% rename from elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/PauseJobRequestTests.java rename to elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/CloseJobActionRequestTests.java index dd50a11fed1..4ff88b1662c 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/PauseJobRequestTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/CloseJobActionRequestTests.java @@ -5,10 +5,10 @@ */ package org.elasticsearch.xpack.prelert.action; -import org.elasticsearch.xpack.prelert.action.PauseJobAction.Request; import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase; +import org.elasticsearch.xpack.prelert.action.CloseJobAction.Request; -public class PauseJobRequestTests extends AbstractStreamableTestCase { +public class CloseJobActionRequestTests extends AbstractStreamableTestCase { @Override protected Request createTestInstance() { diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/PostDataCloseRequestTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/OpenJobActionRequestTests.java similarity index 81% rename from elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/PostDataCloseRequestTests.java rename to elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/OpenJobActionRequestTests.java index f938ddf047d..92082e96c24 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/PostDataCloseRequestTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/OpenJobActionRequestTests.java @@ -5,10 +5,10 @@ */ package org.elasticsearch.xpack.prelert.action; +import org.elasticsearch.xpack.prelert.action.OpenJobAction.Request; import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase; -import org.elasticsearch.xpack.prelert.action.PostDataCloseAction.Request; -public class PostDataCloseRequestTests extends AbstractStreamableTestCase { +public class OpenJobActionRequestTests extends AbstractStreamableTestCase { @Override protected Request createTestInstance() { diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/ResumeJobRequestTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/ResumeJobRequestTests.java deleted file mode 100644 index b0930604883..00000000000 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/ResumeJobRequestTests.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.prelert.action; - -import org.elasticsearch.xpack.prelert.action.ResumeJobAction.Request; -import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase; - -public class ResumeJobRequestTests extends AbstractStreamableTestCase { - - @Override - protected Request createTestInstance() { - return new Request(randomAsciiOfLengthBetween(1, 20)); - } - - @Override - protected Request createBlankInstance() { - return new Request(); - } -} \ No newline at end of file diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/ScheduledJobsIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/ScheduledJobsIT.java index 5339056e6f2..9ea51b4cc1b 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/ScheduledJobsIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/ScheduledJobsIT.java @@ -72,6 +72,8 @@ public class ScheduledJobsIT extends ESIntegTestCase { PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build(true)); PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get(); assertTrue(putJobResponse.isAcknowledged()); + OpenJobAction.Response openJobResponse = client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId())).get(); + assertTrue(openJobResponse.isAcknowledged()); SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTING, 0L, now); StartJobSchedulerAction.Request startSchedulerRequest = new StartJobSchedulerAction.Request("_job_id", schedulerState); @@ -102,6 +104,8 @@ public class ScheduledJobsIT extends ESIntegTestCase { PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build(true)); PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get(); assertTrue(putJobResponse.isAcknowledged()); + OpenJobAction.Response openJobResponse = client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId())).get(); + assertTrue(openJobResponse.isAcknowledged()); SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTING, 0L, null); StartJobSchedulerAction.Request startSchedulerRequest = new StartJobSchedulerAction.Request("_job_id", schedulerState); @@ -211,8 +215,8 @@ public class ScheduledJobsIT extends ESIntegTestCase { // ignore } try { - PostDataCloseAction.Response response = - client.execute(PostDataCloseAction.INSTANCE, new PostDataCloseAction.Request(jobId)).get(); + CloseJobAction.Response response = + client.execute(CloseJobAction.INSTANCE, new CloseJobAction.Request(jobId)).get(); assertTrue(response.isAcknowledged()); } catch (Exception e) { // ignore diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/PrelertJobIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/PrelertJobIT.java index b41296bc754..2aa1521f3d7 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/PrelertJobIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/PrelertJobIT.java @@ -241,74 +241,6 @@ public class PrelertJobIT extends ESRestTestCase { assertThat(responseAsString, containsString("\"count\":1")); } - public void testPauseAndResumeJob() throws Exception { - createFarequoteJob(); - - ResponseException e = expectThrows(ResponseException.class, - () -> client().performRequest("post", PrelertPlugin.BASE_PATH + "jobs/farequote/_pause")); - assertThat(e.getMessage(), containsString("[farequote][CLOSED] can't pause a job that is closed")); - - client().performRequest("post", PrelertPlugin.BASE_PATH + "data/farequote/", Collections.emptyMap(), - new StringEntity("time,airline,responsetime,sourcetype\n" + - "2014-06-23 00:00:00Z,AAL,132.2046,farequote")); - assertBusy(() -> { - try { - Response getJobResponse = client().performRequest("get", PrelertPlugin.BASE_PATH + "jobs/farequote", - Collections.singletonMap("metric", "status")); - assertThat(responseEntityToString(getJobResponse), containsString("\"status\":\"RUNNING\"")); - } catch (Exception e1) { - throw new RuntimeException(e1); - } - }); - - client().performRequest("post", PrelertPlugin.BASE_PATH + "jobs/farequote/_pause"); - assertBusy(() -> { - try { - Response response = client().performRequest("get", PrelertPlugin.BASE_PATH + "jobs/farequote", - Collections.singletonMap("metric", "config,status")); - String responseEntityToString = responseEntityToString(response); - assertThat(responseEntityToString, containsString("\"ignore_downtime\":\"ONCE\"")); - assertThat(responseEntityToString, containsString("\"status\":\"PAUSED\"")); - } catch (Exception e1) { - fail(); - } - }); - - e = expectThrows(ResponseException.class, - () -> client().performRequest("post", PrelertPlugin.BASE_PATH + "jobs/farequote/_pause")); - assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(409)); - assertThat(e.getMessage(), containsString("Cannot pause job 'farequote' while its status is PAUSED")); - - client().performRequest("post", PrelertPlugin.BASE_PATH + "jobs/farequote/_resume"); - client().performRequest("post", PrelertPlugin.BASE_PATH + "data/farequote/", Collections.emptyMap(), - new StringEntity("time,airline,responsetime,sourcetype\n" + - "2014-06-23 00:00:00Z,AAL,132.2046,farequote")); - assertBusy(() -> { - try { - Response getJobResponse = client().performRequest("get", PrelertPlugin.BASE_PATH + "jobs/farequote", - Collections.singletonMap("metric", "status")); - assertThat(responseEntityToString(getJobResponse), containsString("\"status\":\"RUNNING\"")); - } catch (Exception e1) { - throw new RuntimeException(e1); - } - }); - - e = expectThrows(ResponseException.class, - () -> client().performRequest("post", PrelertPlugin.BASE_PATH + "jobs/farequote/_resume")); - assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(409)); - assertThat(e.getMessage(), containsString("Cannot resume job 'farequote' while its status is RUNNING")); - } - - public void testResumeJob_GivenJobIsClosed() throws Exception { - createFarequoteJob(); - - ResponseException e = expectThrows(ResponseException.class, - () -> client().performRequest("post", PrelertPlugin.BASE_PATH + "jobs/farequote/_resume")); - - assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(409)); - assertThat(e.getMessage(), containsString("Cannot resume job 'farequote' while its status is CLOSED")); - } - private Response addBucketResult(String jobId, String timestamp, long bucketSpan) throws Exception { try { client().performRequest("put", "prelertresults-" + jobId, Collections.emptyMap(), new StringEntity(RESULT_MAPPING)); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/ScheduledJobIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/ScheduledJobIT.java index e241b2f1be8..61f31f44c14 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/ScheduledJobIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/ScheduledJobIT.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.xpack.prelert.PrelertPlugin; +import org.elasticsearch.xpack.prelert.action.ScheduledJobsIT; import org.junit.After; import java.io.BufferedReader; @@ -52,6 +53,7 @@ public class ScheduledJobIT extends ESRestTestCase { String jobId = "_id2"; createAirlineDataIndex(); createScheduledJob(jobId); + openJob(client(), jobId); Response startSchedulerRequest = client().performRequest("post", PrelertPlugin.BASE_PATH + "schedulers/" + jobId + "/_start?start=2016-06-01T00:00:00Z&end=2016-06-02T00:00:00Z"); @@ -76,6 +78,7 @@ public class ScheduledJobIT extends ESRestTestCase { String jobId = "_id3"; createAirlineDataIndex(); createScheduledJob(jobId); + openJob(client(), jobId); Response response = client().performRequest("post", PrelertPlugin.BASE_PATH + "schedulers/" + jobId + "/_start?start=2016-06-01T00:00:00Z"); @@ -85,9 +88,9 @@ public class ScheduledJobIT extends ESRestTestCase { assertBusy(() -> { try { Response getJobResponse = client().performRequest("get", PrelertPlugin.BASE_PATH + "jobs/" + jobId, - Collections.singletonMap("metric", "status,data_counts")); + Collections.singletonMap("metric", "data_counts,status")); String responseAsString = responseEntityToString(getJobResponse); - assertThat(responseAsString, containsString("\"status\":\"RUNNING\"")); + assertThat(responseAsString, containsString("\"status\":\"OPENED\"")); assertThat(responseAsString, containsString("\"input_record_count\":2")); } catch (Exception e1) { throw new RuntimeException(e1); @@ -215,11 +218,17 @@ public class ScheduledJobIT extends ESRestTestCase { // ignore } try { - client.performRequest("POST", "/_xpack/prelert/data/" + jobId + "/_close"); + Response response = client.performRequest("POST", "/_xpack/prelert/data/" + jobId + "/_close"); + assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); } catch (Exception e) { // ignore } client.performRequest("DELETE", "/_xpack/prelert/jobs/" + jobId); } } + + public static void openJob(RestClient client, String jobId) throws IOException { + Response response = client.performRequest("post", PrelertPlugin.BASE_PATH + "data/" + jobId + "/_open"); + assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); + } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/TooManyJobsIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/TooManyJobsIT.java index cb1b1743834..4ef4b4287ff 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/TooManyJobsIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/TooManyJobsIT.java @@ -7,20 +7,20 @@ package org.elasticsearch.xpack.prelert.integration; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.xpack.prelert.PrelertPlugin; -import org.elasticsearch.xpack.prelert.action.GetJobsAction; -import org.elasticsearch.xpack.prelert.action.PostDataAction; +import org.elasticsearch.xpack.prelert.action.OpenJobAction; import org.elasticsearch.xpack.prelert.action.PutJobAction; import org.elasticsearch.xpack.prelert.action.ScheduledJobsIT; import org.elasticsearch.xpack.prelert.job.AnalysisConfig; import org.elasticsearch.xpack.prelert.job.DataDescription; import org.elasticsearch.xpack.prelert.job.Detector; import org.elasticsearch.xpack.prelert.job.Job; -import org.elasticsearch.xpack.prelert.job.JobStatus; +import org.elasticsearch.xpack.prelert.job.manager.AutodetectProcessManager; import org.junit.After; import java.util.Collection; @@ -42,70 +42,55 @@ public class TooManyJobsIT extends ESIntegTestCase { @After public void clearPrelertMetadata() throws Exception { ScheduledJobsIT.clearPrelertMetadata(client()); + client().admin().cluster().prepareUpdateSettings() + .setPersistentSettings( + Settings.builder().put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), (String) null) + ).get(); } public void testCannotStartTooManyAnalyticalProcesses() throws Exception { - String jsonLine = "{\"time\": \"0\"}"; - int maxNumJobs = 1000; - for (int i = 1; i <= maxNumJobs; i++) { + int maxRunningJobsPerNode = randomIntBetween(1, 16); + logger.info("Setting [{}] to [{}]", AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), maxRunningJobsPerNode); + client().admin().cluster().prepareUpdateSettings() + .setPersistentSettings(Settings.builder() + .put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), maxRunningJobsPerNode) + ).get(); + for (int i = 1; i <= (maxRunningJobsPerNode + 1); i++) { Job.Builder job = createJob(Integer.toString(i)); PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build(true)); PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get(); assertTrue(putJobResponse.isAcknowledged()); - assertBusy(() -> { - try { - GetJobsAction.Request getJobRequest = new GetJobsAction.Request(); - getJobRequest.setJobId(job.getId()); - getJobRequest.status(true); - GetJobsAction.Response response = client().execute(GetJobsAction.INSTANCE, getJobRequest).get(); - GetJobsAction.Response.JobInfo jobInfo = response.getResponse().results().get(0); - assertNotNull(jobInfo); - assertEquals(JobStatus.CLOSED, jobInfo.getStatus()); - } catch (Exception e) { - fail("failure " + e.getMessage()); - } - }); - // triggers creating autodetect process: - PostDataAction.Request postDataRequest = new PostDataAction.Request(job.getId()); - postDataRequest.setContent(new BytesArray(jsonLine)); try { - PostDataAction.Response postDataResponse = client().execute(PostDataAction.INSTANCE, postDataRequest).get(); - assertEquals(1, postDataResponse.getDataCounts().getInputRecordCount()); - logger.info("Posted data {} times", i); + OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId()); + openJobRequest.setOpenTimeout(TimeValue.timeValueSeconds(10)); + OpenJobAction.Response openJobResponse = client().execute(OpenJobAction.INSTANCE, openJobRequest) + .get(); + assertTrue(openJobResponse.isAcknowledged()); + logger.info("Opened {}th job", i); } catch (Exception e) { Throwable cause = ExceptionsHelper.unwrapCause(e.getCause()); if (ElasticsearchStatusException.class.equals(cause.getClass()) == false) { logger.warn("Unexpected cause", e); } assertEquals(ElasticsearchStatusException.class, cause.getClass()); - assertEquals(RestStatus.FORBIDDEN, ((ElasticsearchStatusException) cause).status()); - logger.info("good news everybody --> reached threadpool capacity after starting {}th analytical process", i); + assertEquals(RestStatus.CONFLICT, ((ElasticsearchStatusException) cause).status()); + assertEquals("[" + (maxRunningJobsPerNode + 1) + "] expected job status [OPENED], but got [FAILED], reason " + + "[failed to open, max running job capacity [" + maxRunningJobsPerNode + "] reached]", cause.getMessage()); + logger.info("good news everybody --> reached maximum number of allowed opened jobs, after trying to open the {}th job", i); // now manually clean things up and see if we can succeed to start one new job clearPrelertMetadata(); putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get(); assertTrue(putJobResponse.isAcknowledged()); - assertBusy(() -> { - try { - GetJobsAction.Request getJobRequest = new GetJobsAction.Request(); - getJobRequest.setJobId(job.getId()); - getJobRequest.status(true); - GetJobsAction.Response response = client().execute(GetJobsAction.INSTANCE, getJobRequest).get(); - GetJobsAction.Response.JobInfo jobInfo = response.getResponse().results().get(0); - assertNotNull(jobInfo); - assertEquals(JobStatus.CLOSED, jobInfo.getStatus()); - } catch (Exception e1) { - fail("failure " + e1.getMessage()); - } - }); - PostDataAction.Response postDataResponse = client().execute(PostDataAction.INSTANCE, postDataRequest).get(); - assertEquals(1, postDataResponse.getDataCounts().getInputRecordCount()); + OpenJobAction.Response openJobResponse = client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId())) + .get(); + assertTrue(openJobResponse.isAcknowledged()); return; } } - fail("shouldn't be able to add [" + maxNumJobs + "] jobs"); + fail("shouldn't be able to add more than [" + maxRunningJobsPerNode + "] jobs"); } private Job.Builder createJob(String id) { diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/JobStatusTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/JobStatusTests.java index afcd067bbdb..8ef72c1648f 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/JobStatusTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/JobStatusTests.java @@ -13,30 +13,27 @@ public class JobStatusTests extends ESTestCase { assertEquals(JobStatus.fromString("closed"), JobStatus.CLOSED); assertEquals(JobStatus.fromString("closing"), JobStatus.CLOSING); assertEquals(JobStatus.fromString("failed"), JobStatus.FAILED); - assertEquals(JobStatus.fromString("paused"), JobStatus.PAUSED); - assertEquals(JobStatus.fromString("pausing"), JobStatus.PAUSING); - assertEquals(JobStatus.fromString("running"), JobStatus.RUNNING); + assertEquals(JobStatus.fromString("opening"), JobStatus.OPENING); + assertEquals(JobStatus.fromString("opened"), JobStatus.OPENED); } public void testValidOrdinals() { - assertEquals(0, JobStatus.RUNNING.ordinal()); - assertEquals(1, JobStatus.CLOSING.ordinal()); - assertEquals(2, JobStatus.CLOSED.ordinal()); - assertEquals(3, JobStatus.FAILED.ordinal()); - assertEquals(4, JobStatus.PAUSING.ordinal()); - assertEquals(5, JobStatus.PAUSED.ordinal()); + assertEquals(0, JobStatus.CLOSING.ordinal()); + assertEquals(1, JobStatus.CLOSED.ordinal()); + assertEquals(2, JobStatus.OPENING.ordinal()); + assertEquals(3, JobStatus.OPENED.ordinal()); + assertEquals(4, JobStatus.FAILED.ordinal()); } public void testIsAnyOf() { - assertFalse(JobStatus.RUNNING.isAnyOf()); - assertFalse(JobStatus.RUNNING.isAnyOf(JobStatus.CLOSED, JobStatus.CLOSING, JobStatus.FAILED, - JobStatus.PAUSED, JobStatus.PAUSING)); - assertFalse(JobStatus.CLOSED.isAnyOf(JobStatus.RUNNING, JobStatus.CLOSING, JobStatus.FAILED, - JobStatus.PAUSED, JobStatus.PAUSING)); + assertFalse(JobStatus.OPENED.isAnyOf()); + assertFalse(JobStatus.OPENED.isAnyOf(JobStatus.CLOSED, JobStatus.CLOSING, JobStatus.FAILED, + JobStatus.OPENING)); + assertFalse(JobStatus.CLOSED.isAnyOf(JobStatus.CLOSING, JobStatus.FAILED, JobStatus.OPENING, JobStatus.OPENED)); - assertTrue(JobStatus.RUNNING.isAnyOf(JobStatus.RUNNING)); - assertTrue(JobStatus.RUNNING.isAnyOf(JobStatus.RUNNING, JobStatus.CLOSED)); - assertTrue(JobStatus.PAUSED.isAnyOf(JobStatus.PAUSED, JobStatus.PAUSING)); - assertTrue(JobStatus.PAUSING.isAnyOf(JobStatus.PAUSED, JobStatus.PAUSING)); + assertTrue(JobStatus.OPENED.isAnyOf(JobStatus.OPENED)); + assertTrue(JobStatus.OPENED.isAnyOf(JobStatus.OPENED, JobStatus.CLOSED)); + assertTrue(JobStatus.CLOSING.isAnyOf(JobStatus.CLOSED, JobStatus.CLOSING)); + assertTrue(JobStatus.CLOSED.isAnyOf(JobStatus.CLOSED, JobStatus.CLOSING)); } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManagerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManagerTests.java index ca7ac809389..8f0085cea63 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManagerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManagerTests.java @@ -6,11 +6,16 @@ package org.elasticsearch.xpack.prelert.job.manager; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.client.Client; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.prelert.PrelertPlugin; +import org.elasticsearch.xpack.prelert.action.UpdateJobStatusAction; import org.elasticsearch.xpack.prelert.job.AnalysisConfig; import org.elasticsearch.xpack.prelert.job.DataCounts; import org.elasticsearch.xpack.prelert.job.DataDescription; @@ -29,6 +34,8 @@ import org.elasticsearch.xpack.prelert.job.process.autodetect.output.AutodetectR import org.elasticsearch.xpack.prelert.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.prelert.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.prelert.job.process.autodetect.params.TimeRange; +import org.elasticsearch.xpack.prelert.job.results.AutodetectResult; +import org.elasticsearch.xpack.prelert.utils.CloseableIterator; import org.junit.Before; import org.mockito.Mockito; @@ -37,9 +44,12 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.Collections; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.function.Supplier; +import static org.elasticsearch.mock.orig.Mockito.doAnswer; import static org.elasticsearch.mock.orig.Mockito.doThrow; import static org.elasticsearch.mock.orig.Mockito.times; import static org.elasticsearch.mock.orig.Mockito.verify; @@ -48,6 +58,7 @@ import static org.hamcrest.core.IsEqual.equalTo; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -70,17 +81,83 @@ public class AutodetectProcessManagerTests extends ESTestCase { jobProvider = mock(JobProvider.class); jobResultsPersister = mock(JobResultsPersister.class); jobDataCountsPersister = mock(JobDataCountsPersister.class); - givenAllocationWithStatus(JobStatus.CLOSED); + givenAllocationWithStatus(JobStatus.OPENED); } - public void testCreateProcessBySubmittingData() { + public void testOpenJob() { + Client client = mock(Client.class); + AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); + when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(createJobDetails("foo")); + AutodetectProcessManager manager = createManager(communicator, client); + + manager.openJob("foo", false); + assertEquals(1, manager.numberOfOpenJobs()); + assertTrue(manager.jobHasActiveAutodetectProcess("foo")); + UpdateJobStatusAction.Request expectedRequest = new UpdateJobStatusAction.Request("foo", JobStatus.OPENED); + verify(client).execute(eq(UpdateJobStatusAction.INSTANCE), eq(expectedRequest), any()); + } + + public void testOpenJob_exceedMaxNumJobs() { + when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(createJobDetails("foo")); + when(jobProvider.dataCounts("foo")).thenReturn(new DataCounts("foo")); + when(jobManager.getJobOrThrowIfUnknown("bar")).thenReturn(createJobDetails("bar")); + when(jobProvider.dataCounts("bar")).thenReturn(new DataCounts("bar")); + when(jobManager.getJobOrThrowIfUnknown("baz")).thenReturn(createJobDetails("baz")); + when(jobProvider.dataCounts("baz")).thenReturn(new DataCounts("baz")); + when(jobManager.getJobOrThrowIfUnknown("foobar")).thenReturn(createJobDetails("foobar")); + when(jobProvider.dataCounts("foobar")).thenReturn(new DataCounts("foobar")); + + Client client = mock(Client.class); + ThreadPool threadPool = mock(ThreadPool.class); + ThreadPool.Cancellable cancellable = mock(ThreadPool.Cancellable.class); + when(threadPool.scheduleWithFixedDelay(any(), any(), any())).thenReturn(cancellable); + ExecutorService executorService = mock(ExecutorService.class); + doAnswer(invocation -> { + ((Runnable) invocation.getArguments()[0]).run(); + return null; + }).when(executorService).execute(any(Runnable.class)); + when(threadPool.executor(PrelertPlugin.AUTODETECT_PROCESS_THREAD_POOL_NAME)).thenReturn(executorService); + AutodetectResultsParser parser = mock(AutodetectResultsParser.class); + @SuppressWarnings("unchecked") + CloseableIterator iterator = mock(CloseableIterator.class); + when(iterator.hasNext()).thenReturn(false); + when(parser.parseResults(any())).thenReturn(iterator); + AutodetectProcess autodetectProcess = mock(AutodetectProcess.class); + when(autodetectProcess.isProcessAlive()).thenReturn(true); + when(autodetectProcess.getPersistStream()).thenReturn(new ByteArrayInputStream(new byte[0])); + AutodetectProcessFactory autodetectProcessFactory = (j, i, e) -> autodetectProcess; + Settings.Builder settings = Settings.builder(); + settings.put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), 3); + Set> settingSet = new HashSet<>(); + settingSet.addAll(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + settingSet.add(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE); + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, settingSet); + AutodetectProcessManager manager = new AutodetectProcessManager(settings.build(), client, threadPool, + jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory, clusterSettings); + + manager.openJob("foo", false); + manager.openJob("bar", false); + manager.openJob("baz", false); + assertEquals(3, manager.numberOfOpenJobs()); + + Exception e = expectThrows(ElasticsearchStatusException.class, () -> manager.openJob("foobar", false)); + assertEquals("max running job capacity [3] reached", e.getMessage()); + + manager.closeJob("baz"); + assertEquals(2, manager.numberOfOpenJobs()); + manager.openJob("foobar", false); + assertEquals(3, manager.numberOfOpenJobs()); + } + + public void testProcessData() { AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); AutodetectProcessManager manager = createManager(communicator); - assertEquals(0, manager.numberOfRunningJobs()); + assertEquals(0, manager.numberOfOpenJobs()); DataLoadParams params = new DataLoadParams(TimeRange.builder().build()); + manager.openJob("foo", false); manager.processData("foo", createInputStream(""), params, () -> false); - assertEquals(1, manager.numberOfRunningJobs()); + assertEquals(1, manager.numberOfOpenJobs()); } public void testProcessDataThrowsElasticsearchStatusException_onIoException() throws Exception { @@ -92,6 +169,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { Supplier cancellable = () -> false; doThrow(new IOException("blah")).when(communicator).writeToJob(inputStream, params, cancellable); + manager.openJob("foo", false); ESTestCase.expectThrows(ElasticsearchException.class, () -> manager.processData("foo", inputStream, params, cancellable)); } @@ -100,14 +178,15 @@ public class AutodetectProcessManagerTests extends ESTestCase { AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(createJobDetails("foo")); AutodetectProcessManager manager = createManager(communicator); - assertEquals(0, manager.numberOfRunningJobs()); + assertEquals(0, manager.numberOfOpenJobs()); + manager.openJob("foo", false); manager.processData("foo", createInputStream(""), mock(DataLoadParams.class), () -> false); // job is created - assertEquals(1, manager.numberOfRunningJobs()); - manager.closeJob("foo", JobStatus.CLOSED); - assertEquals(0, manager.numberOfRunningJobs()); + assertEquals(1, manager.numberOfOpenJobs()); + manager.closeJob("foo"); + assertEquals(0, manager.numberOfOpenJobs()); } public void testBucketResetMessageIsSent() throws IOException { @@ -117,6 +196,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { Supplier cancellable = () -> false; DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1000").endTime("2000").build(), true); InputStream inputStream = createInputStream(""); + manager.openJob("foo", false); manager.processData("foo", inputStream, params, cancellable); verify(communicator).writeToJob(inputStream, params, cancellable); } @@ -127,6 +207,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(createJobDetails("foo")); InputStream inputStream = createInputStream(""); + manager.openJob("foo", false); manager.processData("foo", inputStream, mock(DataLoadParams.class), () -> false); InterimResultsParams params = InterimResultsParams.builder().build(); @@ -143,7 +224,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { doThrow(new IOException("blah")).when(communicator).flushJob(params); ElasticsearchException e = ESTestCase.expectThrows(ElasticsearchException.class, () -> manager.flushJob("foo", params)); - assertEquals("Exception flushing process for job foo", e.getMessage()); + assertEquals("[foo] exception while flushing job", e.getMessage()); } public void testWriteUpdateConfigMessage() throws IOException { @@ -158,34 +239,25 @@ public class AutodetectProcessManagerTests extends ESTestCase { AutodetectProcessManager manager = createManager(communicator); assertFalse(manager.jobHasActiveAutodetectProcess("foo")); + manager.openJob("foo", false); manager.processData("foo", createInputStream(""), mock(DataLoadParams.class), () -> false); assertTrue(manager.jobHasActiveAutodetectProcess("foo")); assertFalse(manager.jobHasActiveAutodetectProcess("bar")); } - public void testProcessData_GivenPausingJob() { + public void testProcessData_GivenStatusNotStarted() throws IOException { AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); + when(communicator.writeToJob(any(), any(), any())).thenReturn(new DataCounts("foo")); AutodetectProcessManager manager = createManager(communicator); Job job = createJobDetails("foo"); when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(job); - givenAllocationWithStatus(JobStatus.PAUSING); + givenAllocationWithStatus(JobStatus.OPENED); InputStream inputStream = createInputStream(""); - DataCounts dataCounts = manager.processData("foo", inputStream, mock(DataLoadParams.class), () -> false); - - assertThat(dataCounts, equalTo(new DataCounts("foo"))); - } - - public void testProcessData_GivenPausedJob() { - AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); - Job job = createJobDetails("foo"); - when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(job); - givenAllocationWithStatus(JobStatus.PAUSED); - AutodetectProcessManager manager = createManager(communicator); - InputStream inputStream = createInputStream(""); + manager.openJob("foo", false); DataCounts dataCounts = manager.processData("foo", inputStream, mock(DataLoadParams.class), () -> false); assertThat(dataCounts, equalTo(new DataCounts("foo"))); @@ -200,11 +272,15 @@ public class AutodetectProcessManagerTests extends ESTestCase { when(jobManager.getJobOrThrowIfUnknown("_id")).thenReturn(createJobDetails("_id")); when(jobProvider.dataCounts("_id")).thenReturn(new DataCounts("_id")); + Set> settingSet = new HashSet<>(); + settingSet.addAll(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + settingSet.add(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE); + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, settingSet); AutodetectResultsParser parser = mock(AutodetectResultsParser.class); AutodetectProcess autodetectProcess = mock(AutodetectProcess.class); AutodetectProcessFactory autodetectProcessFactory = (j, i, e) -> autodetectProcess; AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, - jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory); + jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory, clusterSettings); expectThrows(EsRejectedExecutionException.class, () -> manager.create("_id", false)); verify(autodetectProcess, times(1)).close(); @@ -212,18 +288,25 @@ public class AutodetectProcessManagerTests extends ESTestCase { private void givenAllocationWithStatus(JobStatus status) { Allocation.Builder allocation = new Allocation.Builder(); - allocation.setStatus(JobStatus.RUNNING); // from running we can go to other statuses allocation.setStatus(status); when(jobManager.getJobAllocation("foo")).thenReturn(allocation.build()); } private AutodetectProcessManager createManager(AutodetectCommunicator communicator) { Client client = mock(Client.class); + return createManager(communicator, client); + } + + private AutodetectProcessManager createManager(AutodetectCommunicator communicator, Client client) { ThreadPool threadPool = mock(ThreadPool.class); AutodetectResultsParser parser = mock(AutodetectResultsParser.class); AutodetectProcessFactory autodetectProcessFactory = mock(AutodetectProcessFactory.class); + Set> settingSet = new HashSet<>(); + settingSet.addAll(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + settingSet.add(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE); + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, settingSet); AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, - jobProvider, jobResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory); + jobProvider, jobResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory, clusterSettings); manager = spy(manager); doReturn(communicator).when(manager).create(any(), anyBoolean()); return manager; @@ -231,6 +314,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { private AutodetectProcessManager createManagerAndCallProcessData(AutodetectCommunicator communicator, String jobId) { AutodetectProcessManager manager = createManager(communicator); + manager.openJob(jobId, false); manager.processData(jobId, createInputStream(""), mock(DataLoadParams.class), () -> false); return manager; } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/JobManagerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/JobManagerTests.java index 75809ac0045..d7786ae6fd1 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/JobManagerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/JobManagerTests.java @@ -43,13 +43,12 @@ public class JobManagerTests extends ESTestCase { private ClusterService clusterService; private JobProvider jobProvider; - private Auditor auditor; @Before public void setupMocks() { clusterService = mock(ClusterService.class); jobProvider = mock(JobProvider.class); - auditor = mock(Auditor.class); + Auditor auditor = mock(Auditor.class); when(jobProvider.audit(anyString())).thenReturn(auditor); } @@ -65,7 +64,7 @@ public class JobManagerTests extends ESTestCase { } public void testFilter() { - Set running = new HashSet(Arrays.asList("henry", "dim", "dave")); + Set running = new HashSet<>(Arrays.asList("henry", "dim", "dave")); Set diff = new HashSet<>(Arrays.asList("dave", "tom")).stream().filter((s) -> !running.contains(s)) .collect(Collectors.toCollection(HashSet::new)); @@ -85,7 +84,7 @@ public class JobManagerTests extends ESTestCase { assertThat(prelertMetadata.getJobs().containsKey("foo"), is(false)); } - public void testRemoveJobFromClusterState_GivenJobIsRunning() { + public void testRemoveJobFromClusterState_GivenJobIsOpened() { JobManager jobManager = createJobManager(); ClusterState clusterState = createClusterState(); Job job = buildJobBuilder("foo").build(); @@ -93,9 +92,10 @@ public class JobManagerTests extends ESTestCase { Allocation.Builder allocation = new Allocation.Builder(); allocation.setNodeId("myNode"); allocation.setJobId(job.getId()); - allocation.setStatus(JobStatus.RUNNING); + allocation.setStatus(JobStatus.OPENING); PrelertMetadata.Builder newMetadata = new PrelertMetadata.Builder(clusterState.metaData().custom(PrelertMetadata.TYPE)); - newMetadata.putAllocation("myNode", job.getId()); + newMetadata.createAllocation(job.getId(), false); + newMetadata.assignToNode(job.getId(), "myNode"); newMetadata.updateAllocation(job.getId(), allocation.build()); ClusterState jobRunningClusterState = new ClusterState.Builder(clusterState) @@ -104,7 +104,7 @@ public class JobManagerTests extends ESTestCase { ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> jobManager.removeJobFromClusterState("foo", jobRunningClusterState)); assertThat(e.status(), equalTo(RestStatus.CONFLICT)); - assertThat(e.getMessage(), equalTo("Cannot delete job 'foo' while it is RUNNING")); + assertThat(e.getMessage(), equalTo("Cannot delete job 'foo' while it is OPENING")); } public void testRemoveJobFromClusterState_jobMissing() { @@ -137,7 +137,8 @@ public class JobManagerTests extends ESTestCase { Job job = buildJobBuilder("foo").build(); PrelertMetadata prelertMetadata = new PrelertMetadata.Builder() .putJob(job, false) - .putAllocation("nodeId", "foo") + .createAllocation("foo", false) + .assignToNode("foo", "nodeId") .build(); ClusterState cs = ClusterState.builder(new ClusterName("_name")) .metaData(MetaData.builder().putCustom(PrelertMetadata.TYPE, prelertMetadata)).build(); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/AllocationTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/AllocationTests.java index a8c3fcc2cfb..8fb8c22e034 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/AllocationTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/AllocationTests.java @@ -19,9 +19,11 @@ public class AllocationTests extends AbstractSerializingTestCase { protected Allocation createTestInstance() { String nodeId = randomAsciiOfLength(10); String jobId = randomAsciiOfLength(10); + boolean ignoreDowntime = randomBoolean(); JobStatus jobStatus = randomFrom(JobStatus.values()); + String statusReason = randomBoolean() ? randomAsciiOfLength(10) : null; SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTING, randomPositiveLong(), randomPositiveLong()); - return new Allocation(nodeId, jobId, jobStatus, schedulerState); + return new Allocation(nodeId, jobId, ignoreDowntime, jobStatus, statusReason, schedulerState); } @Override @@ -34,4 +36,13 @@ public class AllocationTests extends AbstractSerializingTestCase { return Allocation.PARSER.apply(parser, () -> matcher).build(); } + public void testUnsetIgnoreDownTime() { + Allocation allocation = new Allocation("_node_id", "_job_id", true, JobStatus.OPENING, null, null); + assertTrue(allocation.isIgnoreDowntime()); + Allocation.Builder builder = new Allocation.Builder(allocation); + builder.setStatus(JobStatus.OPENED); + allocation = builder.build(); + assertFalse(allocation.isIgnoreDowntime()); + } + } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobAllocatorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobAllocatorTests.java index f658c69f46b..6632893d890 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobAllocatorTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobAllocatorTests.java @@ -56,33 +56,35 @@ public class JobAllocatorTests extends ESTestCase { PrelertMetadata.Builder pmBuilder = new PrelertMetadata.Builder(cs.metaData().custom(PrelertMetadata.TYPE)); pmBuilder.putJob((buildJobBuilder("_job_id").build()), false); + pmBuilder.createAllocation("_job_id", false); cs = ClusterState.builder(cs).metaData(MetaData.builder() .putCustom(PrelertMetadata.TYPE, pmBuilder.build())) .build(); assertTrue("A unassigned job, so we should allocate", jobAllocator.shouldAllocate(cs)); pmBuilder = new PrelertMetadata.Builder(cs.metaData().custom(PrelertMetadata.TYPE)); - pmBuilder.putAllocation("_node_id", "_job_id"); + pmBuilder.assignToNode("_job_id", "_node_id"); cs = ClusterState.builder(cs).metaData(MetaData.builder() .putCustom(PrelertMetadata.TYPE, pmBuilder.build())) .build(); assertFalse("Job is allocate, so nothing to allocate", jobAllocator.shouldAllocate(cs)); } - public void testAllocateJobs() { + public void testAssignJobsToNodes() { PrelertMetadata.Builder pmBuilder = new PrelertMetadata.Builder(); pmBuilder.putJob(buildJobBuilder("_job_id").build(), false); + pmBuilder.createAllocation("_job_id", false); ClusterState cs1 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder() .putCustom(PrelertMetadata.TYPE, pmBuilder.build())) .nodes(DiscoveryNodes.builder() .add(new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.CURRENT)) .masterNodeId("_node_id")) .build(); - ClusterState result1 = jobAllocator.allocateJobs(cs1); + ClusterState result1 = jobAllocator.assignJobsToNodes(cs1); PrelertMetadata pm = result1.metaData().custom(PrelertMetadata.TYPE); assertEquals("_job_id must be allocated to _node_id", pm.getAllocations().get("_job_id").getNodeId(), "_node_id"); - ClusterState result2 = jobAllocator.allocateJobs(result1); + ClusterState result2 = jobAllocator.assignJobsToNodes(result1); assertSame("job has been allocated, same instance must be returned", result1, result2); ClusterState cs2 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder() @@ -95,13 +97,13 @@ public class JobAllocatorTests extends ESTestCase { ) .build(); // should fail, prelert only support single node for now - expectThrows(IllegalStateException.class, () -> jobAllocator.allocateJobs(cs2)); + expectThrows(IllegalStateException.class, () -> jobAllocator.assignJobsToNodes(cs2)); ClusterState cs3 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder() .putCustom(PrelertMetadata.TYPE, pmBuilder.build())) .build(); // we need to have at least one node - expectThrows(IllegalStateException.class, () -> jobAllocator.allocateJobs(cs3)); + expectThrows(IllegalStateException.class, () -> jobAllocator.assignJobsToNodes(cs3)); pmBuilder = new PrelertMetadata.Builder(result1.getMetaData().custom(PrelertMetadata.TYPE)); pmBuilder.removeJob("_job_id"); @@ -111,7 +113,7 @@ public class JobAllocatorTests extends ESTestCase { .add(new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.CURRENT)) .masterNodeId("_node_id")) .build(); - ClusterState result3 = jobAllocator.allocateJobs(cs4); + ClusterState result3 = jobAllocator.assignJobsToNodes(cs4); pm = result3.metaData().custom(PrelertMetadata.TYPE); assertNull("_job_id must be unallocated, because job has been removed", pm.getAllocations().get("_job_id")); } @@ -152,7 +154,8 @@ public class JobAllocatorTests extends ESTestCase { // add an allocated job PrelertMetadata.Builder pmBuilder = new PrelertMetadata.Builder(); pmBuilder.putJob(buildJobBuilder("_id").build(), false); - pmBuilder.putAllocation("_id", "_id"); + pmBuilder.createAllocation("_id", false); + pmBuilder.assignToNode("_id", "_node_id"); cs = ClusterState.builder(new ClusterName("_name")) .nodes(DiscoveryNodes.builder() .add(new DiscoveryNode("_id", new LocalTransportAddress("_id"), Version.CURRENT)) @@ -168,6 +171,7 @@ public class JobAllocatorTests extends ESTestCase { // make job not allocated pmBuilder = new PrelertMetadata.Builder(); pmBuilder.putJob(buildJobBuilder("_job_id").build(), false); + pmBuilder.createAllocation("_job_id", false); cs = ClusterState.builder(new ClusterName("_name")) .nodes(DiscoveryNodes.builder() .add(new DiscoveryNode("_id", new LocalTransportAddress("_id"), Version.CURRENT)) @@ -194,6 +198,7 @@ public class JobAllocatorTests extends ESTestCase { jobBuilder.setDataDescription(dataDescriptionBuilder); pmBuilder.putJob(jobBuilder.build(), false); + pmBuilder.createAllocation("_job_id", false); ClusterState cs = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder() .putCustom(PrelertMetadata.TYPE, pmBuilder.build())) @@ -204,7 +209,7 @@ public class JobAllocatorTests extends ESTestCase { .build(); - ClusterState clusterStateWithAllocation = jobAllocator.allocateJobs(cs); + ClusterState clusterStateWithAllocation = jobAllocator.assignJobsToNodes(cs); PrelertMetadata metadata = clusterStateWithAllocation.metaData().custom(PrelertMetadata.TYPE); assertEquals(JobSchedulerStatus.STOPPED, metadata.getAllocations().get("_job_id").getSchedulerState().getStatus()); } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobLifeCycleServiceTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobLifeCycleServiceTests.java index 6b8baf7b6a0..5a3c8424d03 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobLifeCycleServiceTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobLifeCycleServiceTests.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.prelert.job.metadata; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; @@ -19,48 +18,51 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.prelert.action.UpdateJobStatusAction; -import org.elasticsearch.xpack.prelert.job.Job; import org.elasticsearch.xpack.prelert.job.JobStatus; import org.elasticsearch.xpack.prelert.job.data.DataProcessor; import org.elasticsearch.xpack.prelert.job.scheduler.ScheduledJobService; import org.junit.Before; -import org.mockito.Mockito; import static org.elasticsearch.xpack.prelert.job.JobTests.buildJobBuilder; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; public class JobLifeCycleServiceTests extends ESTestCase { - private ClusterService clusterService; - private ScheduledJobService scheduledJobService; private DataProcessor dataProcessor; private Client client; private JobLifeCycleService jobLifeCycleService; @Before public void instantiateJobAllocator() { - clusterService = Mockito.mock(ClusterService.class); - scheduledJobService = Mockito.mock(ScheduledJobService.class); - dataProcessor = Mockito.mock(DataProcessor.class); - client = Mockito.mock(Client.class); + ClusterService clusterService = mock(ClusterService.class); + ScheduledJobService scheduledJobService = mock(ScheduledJobService.class); + dataProcessor = mock(DataProcessor.class); + client = mock(Client.class); jobLifeCycleService = new JobLifeCycleService(Settings.EMPTY, client, clusterService, scheduledJobService, dataProcessor, Runnable::run); } public void testStartStop() { - jobLifeCycleService.startJob(buildJobBuilder("_job_id").build()); - assertTrue(jobLifeCycleService.localAllocatedJobs.contains("_job_id")); + Allocation.Builder allocation = new Allocation.Builder(); + allocation.setJobId("_job_id"); + jobLifeCycleService.startJob(allocation.build()); + assertTrue(jobLifeCycleService.localAssignedJobs.contains("_job_id")); + verify(dataProcessor).openJob("_job_id", false); + jobLifeCycleService.stopJob("_job_id"); - assertTrue(jobLifeCycleService.localAllocatedJobs.isEmpty()); + assertTrue(jobLifeCycleService.localAssignedJobs.isEmpty()); + verify(dataProcessor).closeJob("_job_id"); } - public void testClusterChanged() { + public void testClusterChanged_startJob() { PrelertMetadata.Builder pmBuilder = new PrelertMetadata.Builder(); pmBuilder.putJob(buildJobBuilder("_job_id").build(), false); - pmBuilder.putAllocation("_node_id", "_job_id"); + pmBuilder.createAllocation("_job_id", false); ClusterState cs1 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder() .putCustom(PrelertMetadata.TYPE, pmBuilder.build())) .nodes(DiscoveryNodes.builder() @@ -68,66 +70,110 @@ public class JobLifeCycleServiceTests extends ESTestCase { .localNodeId("_node_id")) .build(); jobLifeCycleService.clusterChanged(new ClusterChangedEvent("_source", cs1, cs1)); + assertFalse("not allocated to a node", + jobLifeCycleService.localAssignedJobs.contains("_job_id")); + + pmBuilder = new PrelertMetadata.Builder(); + pmBuilder.putJob(buildJobBuilder("_job_id").build(), false); + pmBuilder.createAllocation("_job_id", false); + pmBuilder.updateStatus("_job_id", JobStatus.OPENED, null); + cs1 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder() + .putCustom(PrelertMetadata.TYPE, pmBuilder.build())) + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.CURRENT)) + .localNodeId("_node_id")) + .build(); + jobLifeCycleService.clusterChanged(new ClusterChangedEvent("_source", cs1, cs1)); + assertFalse("Status not started", + jobLifeCycleService.localAssignedJobs.contains("_job_id")); + + pmBuilder = new PrelertMetadata.Builder(); + pmBuilder.putJob(buildJobBuilder("_job_id").build(), false); + pmBuilder.createAllocation("_job_id", false); + pmBuilder.assignToNode("_job_id", "_node_id"); + cs1 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder() + .putCustom(PrelertMetadata.TYPE, pmBuilder.build())) + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.CURRENT)) + .localNodeId("_node_id")) + .build(); + jobLifeCycleService.clusterChanged(new ClusterChangedEvent("_source", cs1, cs1)); assertTrue("Expect allocation, because job allocation says _job_id should be allocated locally", - jobLifeCycleService.localAllocatedJobs.contains("_job_id")); + jobLifeCycleService.localAssignedJobs.contains("_job_id")); + verify(dataProcessor, times(1)).openJob("_job_id", false); - pmBuilder.removeJob("_job_id"); - ClusterState cs2 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder() - .putCustom(PrelertMetadata.TYPE, pmBuilder.build())) - .nodes(DiscoveryNodes.builder() - .add(new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.CURRENT)) - .localNodeId("_node_id")) - .build(); - jobLifeCycleService.clusterChanged(new ClusterChangedEvent("_source", cs2, cs1)); - assertFalse("Expect no allocation, because the job has been removed", jobLifeCycleService.localAllocatedJobs.contains("_job_id")); + jobLifeCycleService.clusterChanged(new ClusterChangedEvent("_source", cs1, cs1)); + verify(dataProcessor, times(1)).openJob("_job_id", false); } - public void testClusterChanged_GivenJobIsPausing() { + public void testClusterChanged_stopJob() { + jobLifeCycleService.localAssignedJobs.add("_job_id"); + PrelertMetadata.Builder pmBuilder = new PrelertMetadata.Builder(); - Job.Builder job = buildJobBuilder("foo"); - pmBuilder.putJob(job.build(), false); - pmBuilder.putAllocation("_node_id", "foo"); - Allocation.Builder allocation = new Allocation.Builder(); - allocation.setJobId("foo"); - allocation.setNodeId("_node_id"); - allocation.setStatus(JobStatus.RUNNING); // from running we can go to other statuses - allocation.setStatus(JobStatus.PAUSING); - pmBuilder.updateAllocation("foo", allocation.build()); + pmBuilder.putJob(buildJobBuilder("_job_id").build(), false); + pmBuilder.createAllocation("_job_id", false); ClusterState cs1 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder() .putCustom(PrelertMetadata.TYPE, pmBuilder.build())) .nodes(DiscoveryNodes.builder() .add(new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.CURRENT)) .localNodeId("_node_id")) .build(); - jobLifeCycleService.clusterChanged(new ClusterChangedEvent("_source", cs1, cs1)); + assertEquals("Status is not closing, so nothing happened", jobLifeCycleService.localAssignedJobs.size(), 1); - verify(dataProcessor).closeJob("foo", JobStatus.PAUSED); + pmBuilder = new PrelertMetadata.Builder(); + pmBuilder.putJob(buildJobBuilder("_job_id").build(), false); + pmBuilder.createAllocation("_job_id", false); + pmBuilder.updateStatus("_job_id", JobStatus.OPENED, null); + pmBuilder.updateStatus("_job_id", JobStatus.CLOSING, null); + pmBuilder.assignToNode("_job_id", "_node_id"); + cs1 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder() + .putCustom(PrelertMetadata.TYPE, pmBuilder.build())) + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.CURRENT)) + .localNodeId("_node_id")) + .build(); + jobLifeCycleService.clusterChanged(new ClusterChangedEvent("_source", cs1, cs1)); + assertEquals(jobLifeCycleService.localAssignedJobs.size(), 0); + verify(dataProcessor, times(1)).closeJob("_job_id"); } - public void testClusterChanged_GivenJobIsPausingAndCloseJobThrows() { + public void testClusterChanged_allocationRemovedStopJob() { + jobLifeCycleService.localAssignedJobs.add("_job_id"); + PrelertMetadata.Builder pmBuilder = new PrelertMetadata.Builder(); - Job.Builder job = buildJobBuilder("foo"); - pmBuilder.putJob(job.build(), false); - pmBuilder.putAllocation("_node_id", "foo"); - Allocation.Builder allocation = new Allocation.Builder(); - allocation.setJobId("foo"); - allocation.setNodeId("_node_id"); - allocation.setStatus(JobStatus.RUNNING); // from running we can go to other statuses - allocation.setStatus(JobStatus.PAUSING); - pmBuilder.updateAllocation("foo", allocation.build()); + pmBuilder.putJob(buildJobBuilder("_job_id").build(), false); ClusterState cs1 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder() .putCustom(PrelertMetadata.TYPE, pmBuilder.build())) .nodes(DiscoveryNodes.builder() .add(new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.CURRENT)) .localNodeId("_node_id")) .build(); - doThrow(new ElasticsearchException("")).when(dataProcessor).closeJob("foo", JobStatus.PAUSED); - jobLifeCycleService.clusterChanged(new ClusterChangedEvent("_source", cs1, cs1)); + assertEquals(jobLifeCycleService.localAssignedJobs.size(), 0); + verify(dataProcessor, times(1)).closeJob("_job_id"); + } - verify(dataProcessor).closeJob("foo", JobStatus.PAUSED); - UpdateJobStatusAction.Request expectedRequest = new UpdateJobStatusAction.Request("foo", JobStatus.FAILED); + public void testStart_openJobFails() { + doThrow(new RuntimeException("error")).when(dataProcessor).openJob("_job_id", false); + Allocation.Builder allocation = new Allocation.Builder(); + allocation.setJobId("_job_id"); + jobLifeCycleService.startJob(allocation.build()); + assertTrue(jobLifeCycleService.localAssignedJobs.contains("_job_id")); + verify(dataProcessor).openJob("_job_id", false); + UpdateJobStatusAction.Request expectedRequest = new UpdateJobStatusAction.Request("_job_id", JobStatus.FAILED); + expectedRequest.setReason("failed to open, error"); + verify(client).execute(eq(UpdateJobStatusAction.INSTANCE), eq(expectedRequest), any()); + } + + public void testStart_closeJobFails() { + jobLifeCycleService.localAssignedJobs.add("_job_id"); + doThrow(new RuntimeException("error")).when(dataProcessor).closeJob("_job_id"); + jobLifeCycleService.stopJob("_job_id"); + assertEquals(jobLifeCycleService.localAssignedJobs.size(), 0); + verify(dataProcessor).closeJob("_job_id"); + UpdateJobStatusAction.Request expectedRequest = new UpdateJobStatusAction.Request("_job_id", JobStatus.FAILED); + expectedRequest.setReason("failed to close, error"); verify(client).execute(eq(UpdateJobStatusAction.INSTANCE), eq(expectedRequest), any()); } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadataTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadataTests.java index 4b6cfa9a710..d564b3bac71 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadataTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadataTests.java @@ -41,9 +41,12 @@ public class PrelertMetadataTests extends ESTestCase { builder.putJob(job2, false); builder.putJob(job3, false); - builder.putAllocation(job1.getId(), "node1"); - builder.putAllocation(job2.getId(), "node1"); - builder.putAllocation(job3.getId(), "node1"); + builder.createAllocation(job1.getId(), false); + builder.assignToNode(job1.getId(), "node1"); + builder.createAllocation(job2.getId(), false); + builder.assignToNode(job2.getId(), "node1"); + builder.createAllocation(job3.getId(), false); + builder.assignToNode(job3.getId(), "node1"); PrelertMetadata expected = builder.build(); ByteArrayOutputStream out = new ByteArrayOutputStream(); @@ -65,9 +68,12 @@ public class PrelertMetadataTests extends ESTestCase { builder.putJob(job2, false); builder.putJob(job3, false); - builder.putAllocation(job1.getId(), "node1"); - builder.putAllocation(job2.getId(), "node1"); - builder.putAllocation(job3.getId(), "node1"); + builder.createAllocation(job1.getId(), false); + builder.assignToNode(job1.getId(), "node1"); + builder.createAllocation(job2.getId(), false); + builder.assignToNode(job2.getId(), "node1"); + builder.createAllocation(job3.getId(), false); + builder.assignToNode(job3.getId(), "node1"); PrelertMetadata expected = builder.build(); @@ -104,19 +110,15 @@ public class PrelertMetadataTests extends ESTestCase { public void testUpdateAllocation_setFinishedTime() { PrelertMetadata.Builder builder = new PrelertMetadata.Builder(); builder.putJob(buildJobBuilder("_job_id").build(), false); - builder.putAllocation("_node_id", "_job_id"); - PrelertMetadata prelertMetadata = builder.build(); + builder.createAllocation("_job_id", false); - builder = new PrelertMetadata.Builder(prelertMetadata); - Allocation.Builder allocation = new Allocation.Builder(); - allocation.setJobId("_job_id"); - allocation.setNodeId("_node_id"); - allocation.setStatus(JobStatus.RUNNING); - builder.updateAllocation("_job_id", allocation.build()); - assertThat(builder.build().getJobs().get("_job_id").getFinishedTime(), nullValue()); - allocation.setStatus(JobStatus.CLOSED); - builder.updateAllocation("_job_id", allocation.build()); - assertThat(builder.build().getJobs().get("_job_id").getFinishedTime(), notNullValue()); + builder.updateStatus("_job_id", JobStatus.OPENED, null); + PrelertMetadata prelertMetadata = builder.build(); + assertThat(prelertMetadata.getJobs().get("_job_id").getFinishedTime(), nullValue()); + + builder.updateStatus("_job_id", JobStatus.CLOSED, null); + prelertMetadata = builder.build(); + assertThat(prelertMetadata.getJobs().get("_job_id").getFinishedTime(), notNullValue()); } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobServiceTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobServiceTests.java index de264d3519e..a00b76e6860 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobServiceTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobServiceTests.java @@ -92,8 +92,8 @@ public class ScheduledJobServiceTests extends ESTestCase { public void testStart_GivenNewlyCreatedJobLoopBack() throws IOException { Job.Builder builder = createScheduledJob(); - Allocation allocation = - new Allocation("_nodeId", "foo", JobStatus.RUNNING, new SchedulerState(JobSchedulerStatus.STARTING, 0L, 60000L)); + Allocation allocation = new Allocation("_nodeId", "foo", false, JobStatus.OPENED, null, + new SchedulerState(JobSchedulerStatus.STARTING, 0L, 60000L)); DataCounts dataCounts = new DataCounts("foo", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0)); when(jobManager.getJobAllocation("foo")).thenReturn(allocation); @@ -115,8 +115,8 @@ public class ScheduledJobServiceTests extends ESTestCase { public void testStart_GivenNewlyCreatedJobLoopBackAndRealtime() throws IOException { Job.Builder builder = createScheduledJob(); - Allocation allocation = - new Allocation("_nodeId", "foo", JobStatus.RUNNING, new SchedulerState(JobSchedulerStatus.STARTING, 0L, null)); + Allocation allocation = new Allocation("_nodeId", "foo", false, JobStatus.OPENED, null, + new SchedulerState(JobSchedulerStatus.STARTING, 0L, null)); DataCounts dataCounts = new DataCounts("foo", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0)); when(jobManager.getJobAllocation("foo")).thenReturn(allocation); @@ -132,8 +132,8 @@ public class ScheduledJobServiceTests extends ESTestCase { verify(threadPool, times(1)).executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME); verify(threadPool, times(1)).schedule(eq(new TimeValue(480100)), eq(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME), any()); - allocation = new Allocation(allocation.getNodeId(), allocation.getJobId(), allocation.getStatus(), - new SchedulerState(JobSchedulerStatus.STOPPING, 0L, 60000L)); + allocation = new Allocation(allocation.getNodeId(), allocation.getJobId(), false, allocation.getStatus(), + null, new SchedulerState(JobSchedulerStatus.STOPPING, 0L, 60000L)); scheduledJobService.stop(allocation); verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STOPPED)), any()); } @@ -146,7 +146,7 @@ public class ScheduledJobServiceTests extends ESTestCase { public void testStop_GivenStartedScheduledJob() throws IOException { Job.Builder builder = createScheduledJob(); Allocation allocation1 = - new Allocation("_nodeId", "foo", JobStatus.RUNNING, new SchedulerState(JobSchedulerStatus.STARTED, 0L, null)); + new Allocation("_nodeId", "foo", false, JobStatus.OPENED, null, new SchedulerState(JobSchedulerStatus.STARTED, 0L, null)); when(jobManager.getJobAllocation("foo")).thenReturn(allocation1); DataExtractor dataExtractor = mock(DataExtractor.class); @@ -160,7 +160,7 @@ public class ScheduledJobServiceTests extends ESTestCase { // Properly stop it to avoid leaking threads in the test Allocation allocation2 = - new Allocation("_nodeId", "foo", JobStatus.RUNNING, new SchedulerState(JobSchedulerStatus.STOPPING, 0L, null)); + new Allocation("_nodeId", "foo", false, JobStatus.OPENED, null, new SchedulerState(JobSchedulerStatus.STOPPING, 0L, null)); scheduledJobService.registry.put("foo", scheduledJobService.createJobScheduler(builder.build())); scheduledJobService.stop(allocation2); diff --git a/elasticsearch/src/test/resources/rest-api-spec/api/xpack.prelert.close_data.json b/elasticsearch/src/test/resources/rest-api-spec/api/xpack.prelert.close_job.json similarity index 63% rename from elasticsearch/src/test/resources/rest-api-spec/api/xpack.prelert.close_data.json rename to elasticsearch/src/test/resources/rest-api-spec/api/xpack.prelert.close_job.json index 135a7fa8de0..752ad57052c 100644 --- a/elasticsearch/src/test/resources/rest-api-spec/api/xpack.prelert.close_data.json +++ b/elasticsearch/src/test/resources/rest-api-spec/api/xpack.prelert.close_job.json @@ -1,5 +1,5 @@ { - "xpack.prelert.close_data": { + "xpack.prelert.close_job": { "methods": [ "POST" ], "url": { "path": "/_xpack/prelert/data/{job_id}/_close", @@ -9,6 +9,10 @@ "type": "string", "required": true, "description": "The name of the job to close" + }, + "close_timeout": { + "type": "time", + "description": "Controls the time to wait until a job has closed. Default to 30 minutes" } } } diff --git a/elasticsearch/src/test/resources/rest-api-spec/api/xpack.prelert.open_job.json b/elasticsearch/src/test/resources/rest-api-spec/api/xpack.prelert.open_job.json new file mode 100644 index 00000000000..7d82d694c2f --- /dev/null +++ b/elasticsearch/src/test/resources/rest-api-spec/api/xpack.prelert.open_job.json @@ -0,0 +1,25 @@ +{ + "xpack.prelert.open_job": { + "methods": [ "POST" ], + "url": { + "path": "/_xpack/prelert/data/{job_id}/_open", + "paths": [ "/_xpack/prelert/data/{job_id}/_open" ], + "parts": { + "job_id": { + "type": "string", + "required": true, + "description": "The ID of the job to open" + }, + "ignore_downtime": { + "type": "boolean", + "description": "Controls if gaps in data are treated as anomalous or as a maintenance window after a job re-start" + }, + "open_timeout": { + "type": "time", + "description": "Controls the time to wait until a job has opened. Default to 30 minutes" + } + } + }, + "body": null + } +} diff --git a/elasticsearch/src/test/resources/rest-api-spec/api/xpack.prelert.pause_job.json b/elasticsearch/src/test/resources/rest-api-spec/api/xpack.prelert.pause_job.json deleted file mode 100644 index a20c64e2472..00000000000 --- a/elasticsearch/src/test/resources/rest-api-spec/api/xpack.prelert.pause_job.json +++ /dev/null @@ -1,17 +0,0 @@ -{ - "xpack.prelert.pause_job": { - "methods": [ "POST" ], - "url": { - "path": "/_xpack/prelert/jobs/{job_id}/_pause", - "paths": [ "/_xpack/prelert/jobs/{job_id}/_pause" ], - "parts": { - "job_id": { - "type": "string", - "required": true, - "description": "The ID of the job to pause" - } - } - }, - "body": null - } -} diff --git a/elasticsearch/src/test/resources/rest-api-spec/api/xpack.prelert.post_data.json b/elasticsearch/src/test/resources/rest-api-spec/api/xpack.prelert.post_data.json index d44e6577fca..fda693821cb 100644 --- a/elasticsearch/src/test/resources/rest-api-spec/api/xpack.prelert.post_data.json +++ b/elasticsearch/src/test/resources/rest-api-spec/api/xpack.prelert.post_data.json @@ -12,10 +12,6 @@ } }, "params": { - "ignore_downtime": { - "type": "boolean", - "description": "Controls if gaps in data are treated as anomalous or as a maintenance window after a job re-start" - }, "reset_start": { "type": "string", "description": "Optional parameter to specify the start of the bucket resetting range" diff --git a/elasticsearch/src/test/resources/rest-api-spec/api/xpack.prelert.resume_job.json b/elasticsearch/src/test/resources/rest-api-spec/api/xpack.prelert.resume_job.json deleted file mode 100644 index b58ac755250..00000000000 --- a/elasticsearch/src/test/resources/rest-api-spec/api/xpack.prelert.resume_job.json +++ /dev/null @@ -1,17 +0,0 @@ -{ - "xpack.prelert.resume_job": { - "methods": [ "POST" ], - "url": { - "path": "/_xpack/prelert/jobs/{job_id}/_resume", - "paths": [ "/_xpack/prelert/jobs/{job_id}/_resume" ], - "parts": { - "job_id": { - "type": "string", - "required": true, - "description": "The ID of the job to resume" - } - } - }, - "body": null - } -} diff --git a/elasticsearch/src/test/resources/rest-api-spec/test/delete_model_snapshot.yaml b/elasticsearch/src/test/resources/rest-api-spec/test/delete_model_snapshot.yaml index b91345e7c3d..f44d1b5ffe2 100644 --- a/elasticsearch/src/test/resources/rest-api-spec/test/delete_model_snapshot.yaml +++ b/elasticsearch/src/test/resources/rest-api-spec/test/delete_model_snapshot.yaml @@ -16,6 +16,14 @@ setup: } } + - do: + xpack.prelert.open_job: + job_id: foo + + - do: + xpack.prelert.close_job: + job_id: foo + - do: index: index: prelertresults-foo diff --git a/elasticsearch/src/test/resources/rest-api-spec/test/jobs_get_stats.yaml b/elasticsearch/src/test/resources/rest-api-spec/test/jobs_get_stats.yaml index 4de0bf69443..000a3cb5008 100644 --- a/elasticsearch/src/test/resources/rest-api-spec/test/jobs_get_stats.yaml +++ b/elasticsearch/src/test/resources/rest-api-spec/test/jobs_get_stats.yaml @@ -16,6 +16,10 @@ setup: } } + - do: + xpack.prelert.open_job: + job_id: job-stats-test + - do: xpack.prelert.put_job: body: > @@ -36,6 +40,9 @@ setup: "types":["response"] } } + - do: + xpack.prelert.open_job: + job_id: scheduled-job - do: index: @@ -119,7 +126,7 @@ setup: - is_false: jobs.0.data_counts - is_false: jobs.0.model_size_stats - is_false: jobs.0.scheduler_state - - match: { jobs.0.status: RUNNING } + - match: { jobs.0.status: OPENED } - do: xpack.prelert.get_jobs: @@ -129,7 +136,7 @@ setup: - is_true: jobs.0.data_counts - is_false: jobs.0.scheduler_state - match: { jobs.0.job_id: job-stats-test } - - match: { jobs.0.status: RUNNING } + - match: { jobs.0.status: OPENED } - do: xpack.prelert.get_jobs: diff --git a/elasticsearch/src/test/resources/rest-api-spec/test/post_data.yaml b/elasticsearch/src/test/resources/rest-api-spec/test/post_data.yaml index 9633feb3f1f..fe5190c88a8 100644 --- a/elasticsearch/src/test/resources/rest-api-spec/test/post_data.yaml +++ b/elasticsearch/src/test/resources/rest-api-spec/test/post_data.yaml @@ -16,6 +16,10 @@ setup: } } + - do: + xpack.prelert.open_job: + job_id: farequote + --- "Test POST data job api, flush, close and verify DataCounts doc": - do: @@ -41,7 +45,7 @@ setup: - match: { acknowledged: true } - do: - xpack.prelert.close_data: + xpack.prelert.close_job: job_id: farequote - match: { acknowledged: true } diff --git a/elasticsearch/src/test/resources/rest-api-spec/test/revert_model_snapshot.yaml b/elasticsearch/src/test/resources/rest-api-spec/test/revert_model_snapshot.yaml index e625090ff1b..6b2dee8b7b8 100644 --- a/elasticsearch/src/test/resources/rest-api-spec/test/revert_model_snapshot.yaml +++ b/elasticsearch/src/test/resources/rest-api-spec/test/revert_model_snapshot.yaml @@ -15,6 +15,15 @@ setup: "time_format":"yyyy-MM-dd HH:mm:ssX" } } + + - do: + xpack.prelert.open_job: + job_id: foo + + - do: + xpack.prelert.close_job: + job_id: foo + - do: index: index: prelertresults-foo