Add job update endpoint (elastic/elasticsearch#854)

* Remove redundant code

* Add job update endpoint

* Support updating detector description & rules

* Fix merge conflicts

* Use toStrings and fix race condition in update

* Revert to using xpack.ml.support.AbstractSerializingTestCase

Original commit: elastic/x-pack-elasticsearch@771ada0572
This commit is contained in:
David Kyle 2017-02-03 14:22:36 +00:00 committed by GitHub
parent 2883b00b7c
commit 70b8129b78
12 changed files with 864 additions and 19 deletions

View File

@ -60,6 +60,7 @@ import org.elasticsearch.xpack.ml.action.RevertModelSnapshotAction;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.ml.action.UpdateJobStateAction;
import org.elasticsearch.xpack.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.ml.action.UpdateModelSnapshotAction;
import org.elasticsearch.xpack.ml.action.ValidateDetectorAction;
import org.elasticsearch.xpack.ml.action.ValidateJobConfigAction;
@ -98,6 +99,7 @@ import org.elasticsearch.xpack.ml.rest.job.RestGetJobStatsAction;
import org.elasticsearch.xpack.ml.rest.job.RestGetJobsAction;
import org.elasticsearch.xpack.ml.rest.job.RestOpenJobAction;
import org.elasticsearch.xpack.ml.rest.job.RestPostDataAction;
import org.elasticsearch.xpack.ml.rest.job.RestPostJobUpdateAction;
import org.elasticsearch.xpack.ml.rest.job.RestPutJobAction;
import org.elasticsearch.xpack.ml.rest.modelsnapshots.RestDeleteModelSnapshotAction;
import org.elasticsearch.xpack.ml.rest.modelsnapshots.RestGetModelSnapshotsAction;
@ -259,6 +261,7 @@ public class MlPlugin extends Plugin implements ActionPlugin {
new RestGetJobsAction(settings, restController),
new RestGetJobStatsAction(settings, restController),
new RestPutJobAction(settings, restController),
new RestPostJobUpdateAction(settings, restController),
new RestDeleteJobAction(settings, restController),
new RestOpenJobAction(settings, restController),
new RestGetFiltersAction(settings, restController),
@ -295,6 +298,7 @@ public class MlPlugin extends Plugin implements ActionPlugin {
new ActionHandler<>(GetJobsAction.INSTANCE, GetJobsAction.TransportAction.class),
new ActionHandler<>(GetJobsStatsAction.INSTANCE, GetJobsStatsAction.TransportAction.class),
new ActionHandler<>(PutJobAction.INSTANCE, PutJobAction.TransportAction.class),
new ActionHandler<>(UpdateJobAction.INSTANCE, UpdateJobAction.TransportAction.class),
new ActionHandler<>(DeleteJobAction.INSTANCE, DeleteJobAction.TransportAction.class),
new ActionHandler<>(OpenJobAction.INSTANCE, OpenJobAction.TransportAction.class),
new ActionHandler<>(InternalOpenJobAction.INSTANCE, InternalOpenJobAction.TransportAction.class),

View File

@ -214,7 +214,6 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
validate(mlMetadata, request.getJobId());
InternalOpenJobAction.Request internalRequest = new InternalOpenJobAction.Request(request.jobId);
internalRequest.setIgnoreDowntime(internalRequest.isIgnoreDowntime());
internalOpenJobAction.execute(internalRequest, LoggingTaskListener.instance());
observer.waitForState(request.getJobId(), request.openTimeout, JobState.OPENED, e -> {
if (e != null) {

View File

@ -0,0 +1,170 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import java.io.IOException;
import java.util.Objects;
public class UpdateJobAction extends Action<UpdateJobAction.Request, PutJobAction.Response, UpdateJobAction.RequestBuilder> {
public static final UpdateJobAction INSTANCE = new UpdateJobAction();
public static final String NAME = "cluster:admin/ml/job/update";
private UpdateJobAction() {
super(NAME);
}
@Override
public UpdateJobAction.RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new UpdateJobAction.RequestBuilder(client, this);
}
@Override
public PutJobAction.Response newResponse() {
return new PutJobAction.Response();
}
public static class Request extends AcknowledgedRequest<UpdateJobAction.Request> implements ToXContent {
public static UpdateJobAction.Request parseRequest(String jobId, XContentParser parser) {
JobUpdate update = JobUpdate.PARSER.apply(parser, null);
return new UpdateJobAction.Request(jobId, update);
}
private String jobId;
private JobUpdate update;
public Request(String jobId, JobUpdate update) {
this.jobId = jobId;
this.update = update;
}
Request() {
}
public String getJobId() {
return jobId;
}
public JobUpdate getJobUpdate() {
return update;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
update = new JobUpdate(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
update.writeTo(out);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
update.toXContent(builder, params);
return builder;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
UpdateJobAction.Request request = (UpdateJobAction.Request) o;
return Objects.equals(update, request.update);
}
@Override
public int hashCode() {
return Objects.hash(update);
}
@Override
public final String toString() {
return Strings.toString(this);
}
}
public static class RequestBuilder extends MasterNodeOperationRequestBuilder<Request, PutJobAction.Response, RequestBuilder> {
public RequestBuilder(ElasticsearchClient client, UpdateJobAction action) {
super(client, action, new Request());
}
}
public static class TransportAction extends TransportMasterNodeAction<UpdateJobAction.Request, PutJobAction.Response> {
private final JobManager jobManager;
@Inject
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, JobManager jobManager) {
super(settings, UpdateJobAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, UpdateJobAction.Request::new);
this.jobManager = jobManager;
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected PutJobAction.Response newResponse() {
return new PutJobAction.Response();
}
@Override
protected void masterOperation(UpdateJobAction.Request request, ClusterState state,
ActionListener<PutJobAction.Response> listener) throws Exception {
if (request.getJobId().equals(Job.ALL)) {
throw new IllegalArgumentException("Job Id " + Job.ALL + " cannot be for update");
}
jobManager.updateJob(request.getJobId(), request.getJobUpdate(), request, listener);
}
@Override
protected ClusterBlockException checkBlock(UpdateJobAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
}
}

View File

@ -12,6 +12,7 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.AckedRequest;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedConsumer;
@ -26,6 +27,7 @@ import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.config.IgnoreDowntime;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.metadata.Allocation;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
@ -202,13 +204,31 @@ public class JobManager extends AbstractComponent {
});
}
public void updateJob(String jobId, JobUpdate jobUpdate, AckedRequest request, ActionListener<PutJobAction.Response> actionListener) {
clusterService.submitStateUpdateTask("update-job-" + jobId,
new AckedClusterStateUpdateTask<PutJobAction.Response>(request, actionListener) {
private Job updatedJob;
@Override
protected PutJobAction.Response newResponse(boolean acknowledged) {
return new PutJobAction.Response(acknowledged, updatedJob);
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
Job job = getJob(jobId, currentState).results().get(0);
updatedJob = jobUpdate.mergeWithJob(job);
return updateClusterState(updatedJob, true, currentState);
}
});
}
ClusterState updateClusterState(Job job, boolean overwrite, ClusterState currentState) {
MlMetadata.Builder builder = createMlMetadataBuilder(currentState);
builder.putJob(job, overwrite);
return buildNewClusterState(currentState, builder);
}
public void deleteJob(Client client, DeleteJobAction.Request request, ActionListener<DeleteJobAction.Response> actionListener) {
String jobId = request.getJobId();
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);

View File

@ -51,7 +51,7 @@ public class AnalysisConfig extends ToXContentToBytes implements Writeable {
private static final ParseField BUCKET_SPAN = new ParseField("bucket_span");
private static final ParseField BATCH_SPAN = new ParseField("batch_span");
private static final ParseField CATEGORIZATION_FIELD_NAME = new ParseField("categorization_field_name");
private static final ParseField CATEGORIZATION_FILTERS = new ParseField("categorization_filters");
public static final ParseField CATEGORIZATION_FILTERS = new ParseField("categorization_filters");
private static final ParseField LATENCY = new ParseField("latency");
private static final ParseField PERIOD = new ParseField("period");
private static final ParseField SUMMARY_COUNT_FIELD_NAME = new ParseField("summary_count_field_name");

View File

@ -26,7 +26,6 @@ import java.util.stream.Collectors;
public class DetectionRule extends ToXContentToBytes implements Writeable {
public static final ParseField DETECTION_RULE_FIELD = new ParseField("detection_rule");
public static final ParseField RULE_ACTION_FIELD = new ParseField("rule_action");
public static final ParseField TARGET_FIELD_NAME_FIELD = new ParseField("target_field_name");
public static final ParseField TARGET_FIELD_VALUE_FIELD = new ParseField("target_field_value");
public static final ParseField CONDITIONS_CONNECTIVE_FIELD = new ParseField("conditions_connective");

View File

@ -77,7 +77,6 @@ public class Detector extends ToXContentToBytes implements Writeable {
}
}
public static final ParseField DETECTOR_FIELD = new ParseField("detector");
public static final ParseField DETECTOR_DESCRIPTION_FIELD = new ParseField("detector_description");
public static final ParseField FUNCTION_FIELD = new ParseField("function");
public static final ParseField FIELD_NAME_FIELD = new ParseField("field_name");

View File

@ -0,0 +1,376 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.config;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.internal.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class JobUpdate implements Writeable, ToXContent {
public static final ParseField DETECTORS = new ParseField("detectors");
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<JobUpdate, Void> PARSER =
new ConstructingObjectParser<>("job_update", a -> new JobUpdate((String) a[0], (List<DetectorUpdate>) a[1],
(ModelDebugConfig) a[2], (AnalysisLimits) a[3], (Long) a[4], (Long) a[5], (Long) a[6], (Long) a[7],
(List<String>) a[8], (Map<String, Object>) a[9]));
static {
PARSER.declareStringOrNull(ConstructingObjectParser.optionalConstructorArg(), Job.DESCRIPTION);
PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), DetectorUpdate.PARSER, DETECTORS);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), ModelDebugConfig.PARSER, Job.MODEL_DEBUG_CONFIG);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), AnalysisLimits.PARSER, Job.ANALYSIS_LIMITS);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), Job.BACKGROUND_PERSIST_INTERVAL);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), Job.RENORMALIZATION_WINDOW_DAYS);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), Job.RESULTS_RETENTION_DAYS);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), Job.MODEL_SNAPSHOT_RETENTION_DAYS);
PARSER.declareStringArray(ConstructingObjectParser.optionalConstructorArg(), AnalysisConfig.CATEGORIZATION_FILTERS);
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.map(), Job.CUSTOM_SETTINGS,
ObjectParser.ValueType.OBJECT);
}
private final String description;
private final List<DetectorUpdate> detectorUpdates;
private final ModelDebugConfig modelDebugConfig;
private final AnalysisLimits analysisLimits;
private final Long renormalizationWindowDays;
private final Long backgroundPersistInterval;
private final Long modelSnapshotRetentionDays;
private final Long resultsRetentionDays;
private final List<String> categorizationFilters;
private final Map<String, Object> customSettings;
public JobUpdate(@Nullable String description, @Nullable List<DetectorUpdate> detectorUpdates,
@Nullable ModelDebugConfig modelDebugConfig, @Nullable AnalysisLimits analysisLimits,
@Nullable Long backgroundPersistInterval, @Nullable Long renormalizationWindowDays, @Nullable Long resultsRetentionDays,
@Nullable Long modelSnapshotRetentionDays, @Nullable List<String> categorisationFilters,
@Nullable Map<String, Object> customSettings) {
this.description = description;
this.detectorUpdates = detectorUpdates;
this.modelDebugConfig = modelDebugConfig;
this.analysisLimits = analysisLimits;
this.renormalizationWindowDays = renormalizationWindowDays;
this.backgroundPersistInterval = backgroundPersistInterval;
this.modelSnapshotRetentionDays = modelSnapshotRetentionDays;
this.resultsRetentionDays = resultsRetentionDays;
this.categorizationFilters = categorisationFilters;
this.customSettings = customSettings;
}
public JobUpdate(StreamInput in) throws IOException {
description = in.readOptionalString();
if (in.readBoolean()) {
detectorUpdates = in.readList(DetectorUpdate::new);
} else {
detectorUpdates = null;
}
modelDebugConfig = in.readOptionalWriteable(ModelDebugConfig::new);
analysisLimits = in.readOptionalWriteable(AnalysisLimits::new);
renormalizationWindowDays = in.readOptionalLong();
backgroundPersistInterval = in.readOptionalLong();
modelSnapshotRetentionDays = in.readOptionalLong();
resultsRetentionDays = in.readOptionalLong();
if (in.readBoolean()) {
categorizationFilters = in.readList(StreamInput::readString);
} else {
categorizationFilters = null;
}
customSettings = in.readMap();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(description);
out.writeBoolean(detectorUpdates != null);
if (detectorUpdates != null) {
out.writeList(detectorUpdates);
}
out.writeOptionalWriteable(modelDebugConfig);
out.writeOptionalWriteable(analysisLimits);
out.writeOptionalLong(renormalizationWindowDays);
out.writeOptionalLong(backgroundPersistInterval);
out.writeOptionalLong(modelSnapshotRetentionDays);
out.writeOptionalLong(resultsRetentionDays);
out.writeBoolean(categorizationFilters != null);
if (categorizationFilters != null) {
out.writeStringList(categorizationFilters);
}
out.writeMap(customSettings);
}
public String getDescription() {
return description;
}
public List<DetectorUpdate> getDetectorUpdates() {
return detectorUpdates;
}
public ModelDebugConfig getModelDebugConfig() {
return modelDebugConfig;
}
public AnalysisLimits getAnalysisLimits() {
return analysisLimits;
}
public Long getRenormalizationWindowDays() {
return renormalizationWindowDays;
}
public Long getBackgroundPersistInterval() {
return backgroundPersistInterval;
}
public Long getModelSnapshotRetentionDays() {
return modelSnapshotRetentionDays;
}
public Long getResultsRetentionDays() {
return resultsRetentionDays;
}
public List<String> getCategorizationFilters() {
return categorizationFilters;
}
public Map<String, Object> getCustomSettings() {
return customSettings;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (description != null) {
builder.field(Job.DESCRIPTION.getPreferredName(), description);
}
if (detectorUpdates != null) {
builder.field(DETECTORS.getPreferredName(), detectorUpdates);
}
if (modelDebugConfig != null) {
builder.field(Job.MODEL_DEBUG_CONFIG.getPreferredName(), modelDebugConfig);
}
if (analysisLimits != null) {
builder.field(Job.ANALYSIS_LIMITS.getPreferredName(), analysisLimits);
}
if (renormalizationWindowDays != null) {
builder.field(Job.RENORMALIZATION_WINDOW_DAYS.getPreferredName(), renormalizationWindowDays);
}
if (backgroundPersistInterval != null) {
builder.field(Job.BACKGROUND_PERSIST_INTERVAL.getPreferredName(), backgroundPersistInterval);
}
if (modelSnapshotRetentionDays != null) {
builder.field(Job.MODEL_SNAPSHOT_RETENTION_DAYS.getPreferredName(), modelSnapshotRetentionDays);
}
if (resultsRetentionDays != null) {
builder.field(Job.RESULTS_RETENTION_DAYS.getPreferredName(), resultsRetentionDays);
}
if (categorizationFilters != null) {
builder.field(AnalysisConfig.CATEGORIZATION_FILTERS.getPreferredName(), categorizationFilters);
}
if (customSettings != null) {
builder.field(Job.CUSTOM_SETTINGS.getPreferredName(), customSettings);
}
builder.endObject();
return builder;
}
/**
* Updates {@code source} with the new values in this object returning a new {@link Job}.
*
* @param source Source job to be updated
* @return A new job equivalent to {@code source} updated.
*/
public Job mergeWithJob(Job source) {
Job.Builder builder = new Job.Builder(source);
if (description != null) {
builder.setDescription(description);
}
if (detectorUpdates != null && detectorUpdates.isEmpty() == false) {
AnalysisConfig ac = source.getAnalysisConfig();
int numDetectors = ac.getDetectors().size();
for (DetectorUpdate dd : detectorUpdates) {
if (dd.getIndex() >= numDetectors) {
throw new IllegalArgumentException("Detector index is >= the number of detectors");
}
Detector.Builder detectorbuilder = new Detector.Builder(ac.getDetectors().get(dd.getIndex()));
if (dd.getDescription() != null) {
detectorbuilder.setDetectorDescription(dd.getDescription());
}
if (dd.getRules() != null) {
detectorbuilder.setDetectorRules(dd.getRules());
}
ac.getDetectors().set(dd.getIndex(), detectorbuilder.build());
}
AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(ac);
builder.setAnalysisConfig(acBuilder);
}
if (modelDebugConfig != null) {
builder.setModelDebugConfig(modelDebugConfig);
}
if (analysisLimits != null) {
builder.setAnalysisLimits(analysisLimits);
}
if (renormalizationWindowDays != null) {
builder.setRenormalizationWindowDays(renormalizationWindowDays);
}
if (backgroundPersistInterval != null) {
builder.setBackgroundPersistInterval(backgroundPersistInterval);
}
if (modelSnapshotRetentionDays != null) {
builder.setModelSnapshotRetentionDays(modelSnapshotRetentionDays);
}
if (resultsRetentionDays != null) {
builder.setResultsRetentionDays(resultsRetentionDays);
}
if (categorizationFilters != null) {
AnalysisConfig.Builder analysisConfigBuilder = new AnalysisConfig.Builder(source.getAnalysisConfig());
analysisConfigBuilder.setCategorizationFilters(categorizationFilters);
builder.setAnalysisConfig(analysisConfigBuilder);
}
if (customSettings != null) {
builder.setCustomSettings(customSettings);
}
return builder.build();
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other instanceof JobUpdate == false) {
return false;
}
JobUpdate that = (JobUpdate) other;
return Objects.equals(this.description, that.description)
&& Objects.equals(this.detectorUpdates, that.detectorUpdates)
&& Objects.equals(this.modelDebugConfig, that.modelDebugConfig)
&& Objects.equals(this.analysisLimits, that.analysisLimits)
&& Objects.equals(this.renormalizationWindowDays, that.renormalizationWindowDays)
&& Objects.equals(this.backgroundPersistInterval, that.backgroundPersistInterval)
&& Objects.equals(this.modelSnapshotRetentionDays, that.modelSnapshotRetentionDays)
&& Objects.equals(this.resultsRetentionDays, that.resultsRetentionDays)
&& Objects.equals(this.categorizationFilters, that.categorizationFilters)
&& Objects.equals(this.customSettings, that.customSettings);
}
@Override
public int hashCode() {
return Objects.hash(description, detectorUpdates, modelDebugConfig, analysisLimits, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, categorizationFilters, customSettings);
}
public static class DetectorUpdate implements Writeable, ToXContent {
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<DetectorUpdate, Void> PARSER =
new ConstructingObjectParser<>("detector_update", a -> new DetectorUpdate((int) a[0], (String) a[1],
(List<DetectionRule>) a[2]));
public static final ParseField INDEX = new ParseField("index");
public static final ParseField RULES = new ParseField("rules");
static {
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), INDEX);
PARSER.declareStringOrNull(ConstructingObjectParser.optionalConstructorArg(), Job.DESCRIPTION);
PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), DetectionRule.PARSER, RULES);
}
private int index;
private String description;
private List<DetectionRule> rules;
public DetectorUpdate(int index, String description, List<DetectionRule> rules) {
this.index = index;
this.description = description;
this.rules = rules;
}
public DetectorUpdate(StreamInput in) throws IOException {
index = in.readInt();
description = in.readOptionalString();
if (in.readBoolean()) {
rules = in.readList(DetectionRule::new);
} else {
rules = null;
}
}
public int getIndex() {
return index;
}
public String getDescription() {
return description;
}
public List<DetectionRule> getRules() {
return rules;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeInt(index);
out.writeOptionalString(description);
out.writeBoolean(rules != null);
if (rules != null) {
out.writeList(rules);
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(INDEX.getPreferredName(), index);
if (description != null) {
builder.field(Job.DESCRIPTION.getPreferredName(), description);
}
if (rules != null) {
builder.field(RULES.getPreferredName(), rules);
}
builder.endObject();
return builder;
}
@Override
public int hashCode() {
return Objects.hash(index, description, rules);
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other instanceof DetectorUpdate == false) {
return false;
}
DetectorUpdate that = (DetectorUpdate) other;
return this.index == that.index && Objects.equals(this.description, that.description)
&& Objects.equals(this.rules, that.rules);
}
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.rest.job;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.ml.MlPlugin;
import org.elasticsearch.xpack.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.ml.job.config.Job;
import java.io.IOException;
public class RestPostJobUpdateAction extends BaseRestHandler {
public RestPostJobUpdateAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(RestRequest.Method.POST,
MlPlugin.BASE_PATH + "anomaly_detectors/{" + Job.ID.getPreferredName() + "}/_update", this);
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
String jobId = restRequest.param(Job.ID.getPreferredName());
XContentParser parser = restRequest.contentParser();
UpdateJobAction.Request updateJobRequest = UpdateJobAction.Request.parseRequest(jobId, parser);
return channel -> client.execute(UpdateJobAction.INSTANCE, updateJobRequest, new RestToXContentListener<>(channel));
}
}

View File

@ -0,0 +1,142 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.config;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
@Override
protected JobUpdate createTestInstance() {
String description = null;
if (randomBoolean()) {
description = randomAsciiOfLength(20);
}
List<JobUpdate.DetectorUpdate> detectorUpdates = null;
if (randomBoolean()) {
int size = randomInt(10);
detectorUpdates = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
String detectorDescription = null;
if (randomBoolean()) {
detectorDescription = randomAsciiOfLength(12);
}
List<DetectionRule> detectionRules = null;
if (randomBoolean()) {
detectionRules = new ArrayList<>();
Condition condition = new Condition(Operator.GT, "5");
detectionRules.add(new DetectionRule("foo", null, Connective.OR, Collections.singletonList(
new RuleCondition(RuleConditionType.NUMERICAL_ACTUAL, null, null, condition, null))));
}
detectorUpdates.add(new JobUpdate.DetectorUpdate(i, detectorDescription, detectionRules));
}
}
ModelDebugConfig modelDebugConfig = null;
if (randomBoolean()) {
modelDebugConfig = new ModelDebugConfig(randomDouble(), randomAsciiOfLength(10));
}
AnalysisLimits analysisLimits = null;
if (randomBoolean()) {
analysisLimits = new AnalysisLimits(randomNonNegativeLong(), randomNonNegativeLong());
}
Long renormalizationWindowDays = null;
if (randomBoolean()) {
renormalizationWindowDays = randomNonNegativeLong();
}
Long backgroundPersistInterval = null;
if (randomBoolean()) {
backgroundPersistInterval = randomNonNegativeLong();
}
Long modelSnapshotRetentionDays = null;
if (randomBoolean()) {
modelSnapshotRetentionDays = randomNonNegativeLong();
}
Long resultsRetentionDays = null;
if (randomBoolean()) {
resultsRetentionDays = randomNonNegativeLong();
}
List<String> categorizationFilters = null;
if (randomBoolean()) {
categorizationFilters = Arrays.asList(generateRandomStringArray(10, 10, false));
}
Map<String, Object> customSettings = null;
if (randomBoolean()) {
customSettings = Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10));
}
return new JobUpdate(description, detectorUpdates, modelDebugConfig, analysisLimits, backgroundPersistInterval,
renormalizationWindowDays, resultsRetentionDays, modelSnapshotRetentionDays, categorizationFilters, customSettings);
}
@Override
protected Writeable.Reader<JobUpdate> instanceReader() {
return JobUpdate::new;
}
@Override
protected JobUpdate parseInstance(XContentParser parser) {
return JobUpdate.PARSER.apply(parser, null);
}
public void testMergeWithJob() {
List<JobUpdate.DetectorUpdate> detectorUpdates = new ArrayList<>();
List<DetectionRule> detectionRules1 = Collections.singletonList(new DetectionRule("client", null, Connective.OR,
Collections.singletonList(
new RuleCondition(RuleConditionType.NUMERICAL_ACTUAL, null, null, new Condition(Operator.GT, "5"), null))));
detectorUpdates.add(new JobUpdate.DetectorUpdate(0, "description-1", detectionRules1));
List<DetectionRule> detectionRules2 = Collections.singletonList(new DetectionRule("host", null, Connective.OR,
Collections.singletonList(
new RuleCondition(RuleConditionType.NUMERICAL_ACTUAL, null, null, new Condition(Operator.GT, "5"), null))));
detectorUpdates.add(new JobUpdate.DetectorUpdate(1, "description-2", detectionRules2));
ModelDebugConfig modelDebugConfig = new ModelDebugConfig(randomDouble(), randomAsciiOfLength(10));
AnalysisLimits analysisLimits = new AnalysisLimits(randomNonNegativeLong(), randomNonNegativeLong());
List<String> categorizationFilters = Arrays.asList(generateRandomStringArray(10, 10, false));
Map<String, Object> customSettings = Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10));
JobUpdate update = new JobUpdate("updated_description", detectorUpdates, modelDebugConfig,
analysisLimits, randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
categorizationFilters, customSettings);
Job.Builder jobBuilder = new Job.Builder("foo");
Detector.Builder d1 = new Detector.Builder("info_content", "domain");
d1.setOverFieldName("client");
Detector.Builder d2 = new Detector.Builder("min", "field");
d2.setOverFieldName("host");
AnalysisConfig.Builder ac = new AnalysisConfig.Builder(Arrays.asList(d1.build(), d2.build()));
ac.setCategorizationFieldName("cat_field");
jobBuilder.setAnalysisConfig(ac);
Job updatedJob = update.mergeWithJob(jobBuilder.build());
assertEquals(update.getDescription(), updatedJob.getDescription());
assertEquals(update.getModelDebugConfig(), updatedJob.getModelDebugConfig());
assertEquals(update.getAnalysisLimits(), updatedJob.getAnalysisLimits());
assertEquals(update.getRenormalizationWindowDays(), updatedJob.getRenormalizationWindowDays());
assertEquals(update.getBackgroundPersistInterval(), updatedJob.getBackgroundPersistInterval());
assertEquals(update.getModelSnapshotRetentionDays(), updatedJob.getModelSnapshotRetentionDays());
assertEquals(update.getResultsRetentionDays(), updatedJob.getResultsRetentionDays());
assertEquals(update.getCategorizationFilters(), updatedJob.getAnalysisConfig().getCategorizationFilters());
assertEquals(update.getCustomSettings(), updatedJob.getCustomSettings());
for (JobUpdate.DetectorUpdate detectorUpdate : update.getDetectorUpdates()) {
assertNotNull(updatedJob.getAnalysisConfig().getDetectors().get(detectorUpdate.getIndex()).getDetectorDescription());
assertEquals(detectorUpdate.getDescription(),
updatedJob.getAnalysisConfig().getDetectors().get(detectorUpdate.getIndex()).getDetectorDescription());
assertNotNull(updatedJob.getAnalysisConfig().getDetectors().get(detectorUpdate.getIndex()).getDetectorDescription());
assertEquals(detectorUpdate.getRules(),
updatedJob.getAnalysisConfig().getDetectors().get(detectorUpdate.getIndex()).getDetectorRules());
}
}
}

View File

@ -0,0 +1,20 @@
{
"xpack.ml.update_job": {
"methods": [ "POST" ],
"url": {
"path": "/_xpack/ml/anomaly_detectors/{job_id}/_update",
"paths": [ "/_xpack/ml/anomaly_detectors/{job_id}/_update" ],
"parts": {
"job_id": {
"type": "string",
"required": true,
"description": "The ID of the job to create"
}
}
},
"body": {
"description" : "The job update settings",
"required" : true
}
}
}

View File

@ -181,6 +181,87 @@
}
}
---
"Test update job":
- do:
xpack.ml.put_job:
job_id: to-update
body: >
{
"description":"Pre update description",
"analysis_config" : {
"detectors" :[{"function":"mean","field_name":"responsetime","by_field_name":"airline"}, {"function":"count"}],
"categorization_field_name": "some_category",
"categorization_filters" : ["cat1.*", "cat2.*"]
},
"data_description" : {
"field_delimiter":","
},
"model_debug_config": {
"bounds_percentile": 95.0,
"write_to" : "data_store"
},
"analysis_limits": {
"model_memory_limit": 10
},
"renormalization_window_days": 1,
"background_persist_interval": 7200,
"model_snapshot_retention_days": 3,
"results_retention_days": 4,
"custom_settings": {
"setting1": "custom1",
"setting2": "custom2"
}
}
- match: { job_id: "to-update" }
- do:
xpack.ml.update_job:
job_id: to-update
body: >
{
"description":"Post update description",
"detectors": [{"index": 0, "rules": {"target_field_name": "airline",
"rule_conditions": [ { "condition_type": "numerical_actual",
"condition": {"operator": "gt", "value": "10" } } ] } },
{"index": 1, "description": "updated description"}],
"model_debug_config": {
"write_to" : "data_store",
"bounds_percentile": 99.0
},
"analysis_limits": {
"model_memory_limit": 20
},
"renormalization_window_days": 10,
"background_persist_interval": 10800,
"model_snapshot_retention_days": 30,
"results_retention_days": 40,
"categorization_filters" : ["cat3.*"],
"custom_settings": {
"setting3": "custom3"
}
}
- match: { job_id: "to-update" }
- match: { description: "Post update description" }
- match: { model_debug_config.bounds_percentile: 99.0 }
- match: { analysis_limits.model_memory_limit: 20 }
- match: { analysis_config.categorization_filters: ["cat3.*"] }
- match: { analysis_config.detectors.0.detector_rules.0.target_field_name: "airline" }
- match: { analysis_config.detectors.1.detector_description: "updated description" }
- match: { renormalization_window_days: 10 }
- match: { background_persist_interval: 10800 }
- match: { model_snapshot_retention_days: 30 }
- match: { results_retention_days: 40 }
- do:
catch: request
xpack.ml.put_job:
job_id: _all
body: >
{
"description":"Can't update all description"
}
---
"Test delete job that is referred by a datafeed":
- do: