diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index f5ab7d93a4b..25c8598dc7d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -163,7 +163,12 @@ public class JobManager extends AbstractComponent { public void putJob(PutJobAction.Request request, ClusterState state, ActionListener actionListener) { Job job = request.getJobBuilder().build(new Date()); - jobProvider.createJobResultIndex(job, state, new ActionListener() { + MlMetadata currentMlMetadata = state.metaData().custom(MlMetadata.TYPE); + if (currentMlMetadata.getJobs().containsKey(job.getId())) { + actionListener.onFailure(ExceptionsHelper.jobAlreadyExists(job.getId())); + } + + ActionListener putJobListener = new ActionListener() { @Override public void onResponse(Boolean indicesCreated) { @@ -192,7 +197,16 @@ public class JobManager extends AbstractComponent { } } - }); + }; + + ActionListener checkForLeftOverDocs = ActionListener.wrap( + response -> { + jobProvider.createJobResultIndex(job, state, putJobListener); + }, + actionListener::onFailure + ); + + jobProvider.checkForLeftOverDocuments(job, checkForLeftOverDocs); } public void updateJob(String jobId, JobUpdate jobUpdate, AckedRequest request, ActionListener actionListener) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java index cd7c2958b2c..1902818d900 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java @@ -111,6 +111,65 @@ public class JobProvider { this.settings = settings; } + /** + * Check that a previously deleted job with the same Id has not left any result + * or categorizer state documents due to a failed delete. Any left over results would + * appear to be part of the new job. + * + * We can't check for model state as the Id is based on the snapshot Id which is + * a timestamp and so unpredictable however, it is unlikely a new job would have + * the same snapshot Id as an old one. + * + * @param job Job configuration + * @param listener The ActionListener + */ + public void checkForLeftOverDocuments(Job job, ActionListener listener) { + + String resultsIndexName = job.getResultsIndexName(); + SearchRequestBuilder stateDocSearch = client.prepareSearch(AnomalyDetectorsIndex.jobStateIndexName()) + .setQuery(QueryBuilders.idsQuery().addIds(CategorizerState.documentId(job.getId(), 1), + CategorizerState.v54DocumentId(job.getId(), 1))) + .setIndicesOptions(IndicesOptions.lenientExpandOpen()); + + SearchRequestBuilder quantilesDocSearch = client.prepareSearch(AnomalyDetectorsIndex.jobStateIndexName()) + .setQuery(QueryBuilders.idsQuery().addIds(Quantiles.documentId(job.getId()), Quantiles.v54DocumentId(job.getId()))) + .setIndicesOptions(IndicesOptions.lenientExpandOpen()); + + SearchRequestBuilder resultDocSearch = client.prepareSearch(resultsIndexName) + .setIndicesOptions(IndicesOptions.lenientExpandOpen()) + .setQuery(QueryBuilders.termQuery(Job.ID.getPreferredName(), job.getId())) + .setSize(1); + + + ActionListener searchResponseActionListener = new ActionListener() { + @Override + public void onResponse(MultiSearchResponse searchResponse) { + for (MultiSearchResponse.Item itemResponse : searchResponse.getResponses()) { + if (itemResponse.getResponse().getHits().getTotalHits() > 0) { + listener.onFailure(ExceptionsHelper.conflictStatusException( + "Result and/or state documents exist for a prior job with Id [" + job.getId() + "]. " + + "Please create the job with a different Id")); + return; + } + } + + listener.onResponse(true); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }; + + client.prepareMultiSearch() + .add(stateDocSearch) + .add(resultDocSearch) + .add(quantilesDocSearch) + .execute(searchResponseActionListener); + } + + /** * Create the Elasticsearch index and the mappings */ diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java index 000576256ba..5606ad8abcc 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.job; +import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; @@ -137,7 +138,9 @@ public class JobManagerTests extends ESTestCase { return null; }).when(jobProvider).createJobResultIndex(requestCaptor.capture(), any(ClusterState.class), any(ActionListener.class)); - jobManager.putJob(putJobRequest, mock(ClusterState.class), new ActionListener() { + ClusterState clusterState = createClusterState(); + + jobManager.putJob(putJobRequest, clusterState, new ActionListener() { @Override public void onResponse(PutJobAction.Response response) { Job job = requestCaptor.getValue(); @@ -155,6 +158,29 @@ public class JobManagerTests extends ESTestCase { }); } + public void testPutJob_ThrowsIfJobExists() { + JobManager jobManager = createJobManager(); + PutJobAction.Request putJobRequest = new PutJobAction.Request(createJob()); + + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + mlMetadata.putJob(buildJobBuilder("foo").build(), false); + ClusterState clusterState = ClusterState.builder(new ClusterName("name")) + .metaData(MetaData.builder().putCustom(MlMetadata.TYPE, mlMetadata.build())).build(); + + expectThrows(ResourceAlreadyExistsException.class, () -> + jobManager.putJob(putJobRequest, clusterState, new ActionListener() { + @Override + public void onResponse(PutJobAction.Response response) { + + } + + @Override + public void onFailure(Exception e) { + fail(e.toString()); + } + })); + } + private Job.Builder createJob() { Detector.Builder d1 = new Detector.Builder("info_content", "domain"); d1.setOverFieldName("client"); diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml index 9df0200da39..9a0ce4f3f50 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml @@ -551,3 +551,94 @@ job_id: jobs-crud-close-a-closed-job force: true - match: { closed: true } + +--- +"Test cannot create job with existing categorizer state document": + + - do: + index: + index: .ml-state + type: doc + id: jobs-crud-existing-docs_categorizer_state#1 + body: + key: value + + - do: + indices.refresh: {} + + - do: + catch: /status_exception/ + xpack.ml.put_job: + job_id: jobs-crud-existing-docs + body: > + { + "analysis_config" : { + "bucket_span": "1h", + "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] + }, + "data_description" : { + } + } + +--- +"Test cannot create job with existing quantiles document": + + - do: + index: + index: .ml-state + type: doc + id: jobs-crud-existing-docs_quantiles + body: + key: value + + - do: + indices.refresh: {} + + - do: + catch: /status_exception/ + xpack.ml.put_job: + job_id: jobs-crud-existing-docs + body: > + { + "analysis_config" : { + "bucket_span": "1h", + "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] + }, + "data_description" : { + } + } + + +--- +"Test cannot create job with existing result document": + + - do: + index: + index: .ml-anomalies-shared + type: doc + id: "jobs-crud-existing-result-docs_1464739200000_1" + body: + { + "job_id": "jobs-crud-existing-result-docs", + "result_type": "bucket", + "timestamp": "2016-06-01T00:00:00Z", + "anomaly_score": 90.0, + "bucket_span":1 + } + + - do: + indices.refresh: {} + + - do: + catch: /status_exception/ + xpack.ml.put_job: + job_id: jobs-crud-existing-result-docs + body: > + { + "analysis_config" : { + "bucket_span": "1h", + "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] + }, + "data_description" : { + } + }