[ML] Create job index before adding job to cluster state (elastic/x-pack-elasticsearch#943)
Original commit: elastic/x-pack-elasticsearch@1b586c7763
This commit is contained in:
parent
8a87a91897
commit
5716b2bf16
|
@ -166,37 +166,36 @@ public class JobManager extends AbstractComponent {
|
|||
public void putJob(PutJobAction.Request request, ClusterState state, ActionListener<PutJobAction.Response> actionListener) {
|
||||
Job job = request.getJob().build(new Date());
|
||||
|
||||
ActionListener<Boolean> createResultsIndexListener = ActionListener.wrap(jobSaved ->
|
||||
jobProvider.createJobResultIndex(job, state, new ActionListener<Boolean>() {
|
||||
jobProvider.createJobResultIndex(job, state, new ActionListener<Boolean>() {
|
||||
@Override
|
||||
public void onResponse(Boolean indicesCreated) {
|
||||
auditor.info(job.getId(), Messages.getMessage(Messages.JOB_AUDIT_CREATED));
|
||||
|
||||
// Also I wonder if we need to audit log infra
|
||||
// structure in ml as when we merge into xpack
|
||||
// we can use its audit trailing. See:
|
||||
// https://github.com/elastic/prelert-legacy/issues/48
|
||||
actionListener.onResponse(new PutJobAction.Response(jobSaved && indicesCreated, job));
|
||||
clusterService.submitStateUpdateTask("put-job-" + job.getId(),
|
||||
new AckedClusterStateUpdateTask<PutJobAction.Response>(request, actionListener) {
|
||||
@Override
|
||||
protected PutJobAction.Response newResponse(boolean acknowledged) {
|
||||
return new PutJobAction.Response(acknowledged, job);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
return updateClusterState(job, false, currentState);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
actionListener.onFailure(e);
|
||||
if (e instanceof IllegalArgumentException
|
||||
&& e.getMessage().matches("mapper \\[.*\\] of different type, current_type \\[.*\\], merged_type \\[.*\\]")) {
|
||||
actionListener.onFailure(new IllegalArgumentException(Messages.JOB_CONFIG_MAPPING_TYPE_CLASH, e));
|
||||
} else {
|
||||
actionListener.onFailure(e);
|
||||
}
|
||||
|
||||
}
|
||||
}), actionListener::onFailure);
|
||||
|
||||
clusterService.submitStateUpdateTask("put-job-" + job.getId(),
|
||||
new AckedClusterStateUpdateTask<Boolean>(request, createResultsIndexListener) {
|
||||
@Override
|
||||
protected Boolean newResponse(boolean acknowledged) {
|
||||
return acknowledged;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
return updateClusterState(job, false, currentState);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
public void updateJob(String jobId, JobUpdate jobUpdate, AckedRequest request, ActionListener<PutJobAction.Response> actionListener) {
|
||||
|
|
|
@ -123,6 +123,9 @@ public final class Messages {
|
|||
"{0} and {1} cannot be the same: ''{2}''";
|
||||
public static final String JOB_CONFIG_DETECTOR_COUNT_DISALLOWED =
|
||||
"''count'' is not a permitted value for {0}";
|
||||
public static final String JOB_CONFIG_MAPPING_TYPE_CLASH =
|
||||
"A field has a different mapping type to an existing field with the same name. " +
|
||||
"Use the 'results_index_name' setting to assign the job to another index";
|
||||
|
||||
public static final String JOB_UNKNOWN_ID = "No known job with id ''{0}''";
|
||||
|
||||
|
|
|
@ -404,6 +404,33 @@ public class MlJobIT extends ESRestTestCase {
|
|||
assertThat(responseAsString, containsString(byFieldName2));
|
||||
}
|
||||
|
||||
public void testCreateJob_WithClashingFieldMappingsFails() throws Exception {
|
||||
String jobTemplate = "{\n" +
|
||||
" \"analysis_config\" : {\n" +
|
||||
" \"detectors\" :[{\"function\":\"metric\",\"field_name\":\"metric\", \"by_field_name\":\"%s\"}]\n" +
|
||||
" }" +
|
||||
"}";
|
||||
|
||||
String jobId1 = "job-with-response-field";
|
||||
String byFieldName1 = "response";
|
||||
String jobId2 = "job-will-fail-with-mapping-error-on-response-field";
|
||||
String byFieldName2 = "response.time";
|
||||
String jobConfig = String.format(Locale.ROOT, jobTemplate, byFieldName1);
|
||||
|
||||
Response response = client().performRequest("put", MachineLearning.BASE_PATH
|
||||
+ "anomaly_detectors/" + jobId1, Collections.emptyMap(), new StringEntity(jobConfig, ContentType.APPLICATION_JSON));
|
||||
assertEquals(200, response.getStatusLine().getStatusCode());
|
||||
|
||||
final String failingJobConfig = String.format(Locale.ROOT, jobTemplate, byFieldName2);
|
||||
ResponseException e = expectThrows(ResponseException.class,
|
||||
() -> client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId2,
|
||||
Collections.emptyMap(), new StringEntity(failingJobConfig, ContentType.APPLICATION_JSON)));
|
||||
|
||||
assertThat(e.getMessage(),
|
||||
containsString("A field has a different mapping type to an existing field with the same name. " +
|
||||
"Use the 'results_index_name' setting to assign the job to another index"));
|
||||
}
|
||||
|
||||
public void testDeleteJob() throws Exception {
|
||||
String jobId = "foo";
|
||||
String indexName = AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndex.RESULTS_INDEX_DEFAULT;
|
||||
|
|
Loading…
Reference in New Issue