[ML] Validate that no documents exist with the new job_id (elastic/x-pack-elasticsearch#1624)

* Validate that no documents exist with the new job_id

Original commit: elastic/x-pack-elasticsearch@acdfb7b5a9
This commit is contained in:
David Kyle 2017-06-05 10:04:18 +01:00
parent 955968c53c
commit 5f76bbd58d
4 changed files with 193 additions and 3 deletions

View File

@ -163,7 +163,12 @@ public class JobManager extends AbstractComponent {
public void putJob(PutJobAction.Request request, ClusterState state, ActionListener<PutJobAction.Response> actionListener) { public void putJob(PutJobAction.Request request, ClusterState state, ActionListener<PutJobAction.Response> actionListener) {
Job job = request.getJobBuilder().build(new Date()); Job job = request.getJobBuilder().build(new Date());
jobProvider.createJobResultIndex(job, state, new ActionListener<Boolean>() { MlMetadata currentMlMetadata = state.metaData().custom(MlMetadata.TYPE);
if (currentMlMetadata.getJobs().containsKey(job.getId())) {
actionListener.onFailure(ExceptionsHelper.jobAlreadyExists(job.getId()));
}
ActionListener<Boolean> putJobListener = new ActionListener<Boolean>() {
@Override @Override
public void onResponse(Boolean indicesCreated) { public void onResponse(Boolean indicesCreated) {
@ -192,7 +197,16 @@ public class JobManager extends AbstractComponent {
} }
} }
}); };
ActionListener<Boolean> checkForLeftOverDocs = ActionListener.wrap(
response -> {
jobProvider.createJobResultIndex(job, state, putJobListener);
},
actionListener::onFailure
);
jobProvider.checkForLeftOverDocuments(job, checkForLeftOverDocs);
} }
public void updateJob(String jobId, JobUpdate jobUpdate, AckedRequest request, ActionListener<PutJobAction.Response> actionListener) { public void updateJob(String jobId, JobUpdate jobUpdate, AckedRequest request, ActionListener<PutJobAction.Response> actionListener) {

View File

@ -111,6 +111,65 @@ public class JobProvider {
this.settings = settings; this.settings = settings;
} }
/**
* Check that a previously deleted job with the same Id has not left any result
* or categorizer state documents due to a failed delete. Any left over results would
* appear to be part of the new job.
*
* We can't check for model state as the Id is based on the snapshot Id which is
* a timestamp and so unpredictable however, it is unlikely a new job would have
* the same snapshot Id as an old one.
*
* @param job Job configuration
* @param listener The ActionListener
*/
public void checkForLeftOverDocuments(Job job, ActionListener<Boolean> listener) {
String resultsIndexName = job.getResultsIndexName();
SearchRequestBuilder stateDocSearch = client.prepareSearch(AnomalyDetectorsIndex.jobStateIndexName())
.setQuery(QueryBuilders.idsQuery().addIds(CategorizerState.documentId(job.getId(), 1),
CategorizerState.v54DocumentId(job.getId(), 1)))
.setIndicesOptions(IndicesOptions.lenientExpandOpen());
SearchRequestBuilder quantilesDocSearch = client.prepareSearch(AnomalyDetectorsIndex.jobStateIndexName())
.setQuery(QueryBuilders.idsQuery().addIds(Quantiles.documentId(job.getId()), Quantiles.v54DocumentId(job.getId())))
.setIndicesOptions(IndicesOptions.lenientExpandOpen());
SearchRequestBuilder resultDocSearch = client.prepareSearch(resultsIndexName)
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(QueryBuilders.termQuery(Job.ID.getPreferredName(), job.getId()))
.setSize(1);
ActionListener<MultiSearchResponse> searchResponseActionListener = new ActionListener<MultiSearchResponse>() {
@Override
public void onResponse(MultiSearchResponse searchResponse) {
for (MultiSearchResponse.Item itemResponse : searchResponse.getResponses()) {
if (itemResponse.getResponse().getHits().getTotalHits() > 0) {
listener.onFailure(ExceptionsHelper.conflictStatusException(
"Result and/or state documents exist for a prior job with Id [" + job.getId() + "]. " +
"Please create the job with a different Id"));
return;
}
}
listener.onResponse(true);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
};
client.prepareMultiSearch()
.add(stateDocSearch)
.add(resultDocSearch)
.add(quantilesDocSearch)
.execute(searchResponseActionListener);
}
/** /**
* Create the Elasticsearch index and the mappings * Create the Elasticsearch index and the mappings
*/ */

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.xpack.ml.job; package org.elasticsearch.xpack.ml.job;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
@ -137,7 +138,9 @@ public class JobManagerTests extends ESTestCase {
return null; return null;
}).when(jobProvider).createJobResultIndex(requestCaptor.capture(), any(ClusterState.class), any(ActionListener.class)); }).when(jobProvider).createJobResultIndex(requestCaptor.capture(), any(ClusterState.class), any(ActionListener.class));
jobManager.putJob(putJobRequest, mock(ClusterState.class), new ActionListener<PutJobAction.Response>() { ClusterState clusterState = createClusterState();
jobManager.putJob(putJobRequest, clusterState, new ActionListener<PutJobAction.Response>() {
@Override @Override
public void onResponse(PutJobAction.Response response) { public void onResponse(PutJobAction.Response response) {
Job job = requestCaptor.getValue(); Job job = requestCaptor.getValue();
@ -155,6 +158,29 @@ public class JobManagerTests extends ESTestCase {
}); });
} }
public void testPutJob_ThrowsIfJobExists() {
JobManager jobManager = createJobManager();
PutJobAction.Request putJobRequest = new PutJobAction.Request(createJob());
MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
mlMetadata.putJob(buildJobBuilder("foo").build(), false);
ClusterState clusterState = ClusterState.builder(new ClusterName("name"))
.metaData(MetaData.builder().putCustom(MlMetadata.TYPE, mlMetadata.build())).build();
expectThrows(ResourceAlreadyExistsException.class, () ->
jobManager.putJob(putJobRequest, clusterState, new ActionListener<PutJobAction.Response>() {
@Override
public void onResponse(PutJobAction.Response response) {
}
@Override
public void onFailure(Exception e) {
fail(e.toString());
}
}));
}
private Job.Builder createJob() { private Job.Builder createJob() {
Detector.Builder d1 = new Detector.Builder("info_content", "domain"); Detector.Builder d1 = new Detector.Builder("info_content", "domain");
d1.setOverFieldName("client"); d1.setOverFieldName("client");

View File

@ -551,3 +551,94 @@
job_id: jobs-crud-close-a-closed-job job_id: jobs-crud-close-a-closed-job
force: true force: true
- match: { closed: true } - match: { closed: true }
---
"Test cannot create job with existing categorizer state document":
- do:
index:
index: .ml-state
type: doc
id: jobs-crud-existing-docs_categorizer_state#1
body:
key: value
- do:
indices.refresh: {}
- do:
catch: /status_exception/
xpack.ml.put_job:
job_id: jobs-crud-existing-docs
body: >
{
"analysis_config" : {
"bucket_span": "1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
}
}
---
"Test cannot create job with existing quantiles document":
- do:
index:
index: .ml-state
type: doc
id: jobs-crud-existing-docs_quantiles
body:
key: value
- do:
indices.refresh: {}
- do:
catch: /status_exception/
xpack.ml.put_job:
job_id: jobs-crud-existing-docs
body: >
{
"analysis_config" : {
"bucket_span": "1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
}
}
---
"Test cannot create job with existing result document":
- do:
index:
index: .ml-anomalies-shared
type: doc
id: "jobs-crud-existing-result-docs_1464739200000_1"
body:
{
"job_id": "jobs-crud-existing-result-docs",
"result_type": "bucket",
"timestamp": "2016-06-01T00:00:00Z",
"anomaly_score": 90.0,
"bucket_span":1
}
- do:
indices.refresh: {}
- do:
catch: /status_exception/
xpack.ml.put_job:
job_id: jobs-crud-existing-result-docs
body: >
{
"analysis_config" : {
"bucket_span": "1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
}
}