diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/MlPlugin.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/MlPlugin.java index 8753303f2a0..d8c9908318e 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/MlPlugin.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/MlPlugin.java @@ -63,6 +63,7 @@ import org.elasticsearch.xpack.ml.action.StopDatafeedAction; import org.elasticsearch.xpack.ml.action.UpdateJobStateAction; import org.elasticsearch.xpack.ml.action.UpdateJobAction; import org.elasticsearch.xpack.ml.action.UpdateModelSnapshotAction; +import org.elasticsearch.xpack.ml.action.UpdateProcessAction; import org.elasticsearch.xpack.ml.action.ValidateDetectorAction; import org.elasticsearch.xpack.ml.action.ValidateJobConfigAction; import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunner; @@ -328,7 +329,8 @@ public class MlPlugin extends Plugin implements ActionPlugin { new ActionHandler<>(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class), new ActionHandler<>(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class), new ActionHandler<>(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class), - new ActionHandler<>(MlDeleteByQueryAction.INSTANCE, MlDeleteByQueryAction.TransportAction.class) + new ActionHandler<>(MlDeleteByQueryAction.INSTANCE, MlDeleteByQueryAction.TransportAction.class), + new ActionHandler<>(UpdateProcessAction.INSTANCE, UpdateProcessAction.TransportAction.class) ); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/PostDataAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/PostDataAction.java index c0452224b45..fc8e42f6048 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/PostDataAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/PostDataAction.java @@ -145,10 +145,6 @@ public class PostDataAction extends Action { private final JobManager jobManager; + private final Client client; @Inject public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService, - ThreadPool threadPool, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver, JobManager jobManager) { + ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + JobManager jobManager, Client client) { super(settings, UpdateJobAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, UpdateJobAction.Request::new); this.jobManager = jobManager; + this.client = client; } @Override @@ -153,13 +156,38 @@ public class UpdateJobAction extends Action listener) throws Exception { if (request.getJobId().equals(Job.ALL)) { throw new IllegalArgumentException("Job Id " + Job.ALL + " cannot be for update"); } - jobManager.updateJob(request.getJobId(), request.getJobUpdate(), request, listener); + ActionListener wrappedListener = listener; + if (request.getJobUpdate().isAutodetectProcessUpdate()) { + wrappedListener = ActionListener.wrap( + response -> updateProcess(request, response, listener), + listener::onFailure); + } + + jobManager.updateJob(request.getJobId(), request.getJobUpdate(), request, wrappedListener); + } + + private void updateProcess(Request request, PutJobAction.Response updateConfigResponse, + ActionListener listener) { + + UpdateProcessAction.Request updateProcessRequest = new UpdateProcessAction.Request(request.getJobId(), + request.getJobUpdate().getModelDebugConfig(), request.getJobUpdate().getDetectorUpdates()); + client.execute(UpdateProcessAction.INSTANCE, updateProcessRequest, new ActionListener() { + @Override + public void onResponse(UpdateProcessAction.Response response) { + listener.onResponse(updateConfigResponse); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); } @Override diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/UpdateProcessAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/UpdateProcessAction.java new file mode 100644 index 00000000000..c10084afac6 --- /dev/null +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/UpdateProcessAction.java @@ -0,0 +1,216 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.tasks.BaseTasksResponse; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.StatusToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ml.MlPlugin; +import org.elasticsearch.xpack.ml.job.JobManager; +import org.elasticsearch.xpack.ml.job.config.JobUpdate; +import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig; +import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +public class UpdateProcessAction extends + Action { + + + public static final UpdateProcessAction INSTANCE = new UpdateProcessAction(); + public static final String NAME = "cluster:admin/ml/job/update/process"; + + private UpdateProcessAction() { + super(NAME); + } + + @Override + public RequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new RequestBuilder(client, this); + } + + @Override + public Response newResponse() { + return new Response(); + } + + static class RequestBuilder extends ActionRequestBuilder { + + RequestBuilder(ElasticsearchClient client, UpdateProcessAction action) { + super(client, action, new Request()); + } + } + + public static class Response extends BaseTasksResponse implements StatusToXContentObject, Writeable { + + private boolean isUpdated; + + private Response() { + this.isUpdated = true; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + isUpdated = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(isUpdated); + } + + @Override + public RestStatus status() { + return RestStatus.ACCEPTED; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("updated", isUpdated); + builder.endObject(); + return builder; + } + + @Override + public int hashCode() { + return Objects.hashCode(isUpdated); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + Response other = (Response) obj; + + return this.isUpdated == other.isUpdated; + } + } + + public static class Request extends TransportJobTaskAction.JobTaskRequest { + + private ModelDebugConfig modelDebugConfig; + private List detectorUpdates; + + Request() { + } + + public Request(String jobId, ModelDebugConfig modelDebugConfig, List detectorUpdates) { + super(jobId); + this.modelDebugConfig = modelDebugConfig; + this.detectorUpdates = detectorUpdates; + } + + public ModelDebugConfig getModelDebugConfig() { + return modelDebugConfig; + } + + public List getDetectorUpdates() { + return detectorUpdates; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + modelDebugConfig = in.readOptionalWriteable(ModelDebugConfig::new); + if (in.readBoolean()) { + in.readList(JobUpdate.DetectorUpdate::new); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeOptionalWriteable(modelDebugConfig); + boolean hasDetectorUpdates = detectorUpdates != null; + out.writeBoolean(hasDetectorUpdates); + if (hasDetectorUpdates) { + out.writeList(detectorUpdates); + } + } + + @Override + public int hashCode() { + return Objects.hash(getJobId(), modelDebugConfig, detectorUpdates); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + Request other = (Request) obj; + + return Objects.equals(getJobId(), other.getJobId()) && + Objects.equals(modelDebugConfig, other.modelDebugConfig) && + Objects.equals(detectorUpdates, other.detectorUpdates); + } + } + + public static class TransportAction extends TransportJobTaskAction { + + @Inject + public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ClusterService clusterService, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + JobManager jobManager, AutodetectProcessManager processManager) { + super(settings, NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, + Request::new, Response::new, MlPlugin.THREAD_POOL_NAME, jobManager, processManager, Request::getJobId); + } + + @Override + protected Response readTaskResponse(StreamInput in) throws IOException { + Response response = new Response(); + response.readFrom(in); + return response; + } + + @Override + protected void taskOperation(Request request, InternalOpenJobAction.JobTask task, ActionListener listener) { + threadPool.executor(MlPlugin.THREAD_POOL_NAME).execute(() -> { + try { + if (request.getModelDebugConfig() != null) { + processManager.writeUpdateModelDebugMessage(request.getJobId(), request.getModelDebugConfig()); + } + if (request.getDetectorUpdates() != null) { + for (JobUpdate.DetectorUpdate update : request.getDetectorUpdates()) { + processManager.writeUpdateDetectorRulesMessage(request.getJobId(), update.getIndex(), update.getRules()); + } + } + + listener.onResponse(new Response()); + } catch (Exception e) { + listener.onFailure(e); + } + }); + } + } +} diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/config/JobUpdate.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/config/JobUpdate.java index ef1b2c9ef26..b022bfdf9e8 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/config/JobUpdate.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/config/JobUpdate.java @@ -152,6 +152,10 @@ public class JobUpdate implements Writeable, ToXContent { return customSettings; } + public boolean isAutodetectProcessUpdate() { + return modelDebugConfig != null || detectorUpdates != null; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/config/ModelDebugConfig.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/config/ModelDebugConfig.java index 0dfa7ed0b96..c14b87f459e 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/config/ModelDebugConfig.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/config/ModelDebugConfig.java @@ -20,92 +20,40 @@ import java.util.Locale; import java.util.Objects; public class ModelDebugConfig extends ToXContentToBytes implements Writeable { - /** - * Enum of the acceptable output destinations. - */ - public enum DebugDestination implements Writeable { - FILE, DATA_STORE; - - /** - * Case-insensitive from string method. Works with FILE, File, file, - * etc. - * - * @param value - * String representation - * @return The output destination - */ - public static DebugDestination fromString(String value) { - return DebugDestination.valueOf(value.toUpperCase(Locale.ROOT)); - } - - public static DebugDestination readFromStream(StreamInput in) throws IOException { - int ordinal = in.readVInt(); - if (ordinal < 0 || ordinal >= values().length) { - throw new IOException("Unknown DebugDestination ordinal [" + ordinal + "]"); - } - return values()[ordinal]; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeVInt(ordinal()); - } - - @Override - public String toString() { - return name().toLowerCase(Locale.ROOT); - } - } private static final double MAX_PERCENTILE = 100.0; private static final ParseField TYPE_FIELD = new ParseField("model_debug_config"); - private static final ParseField WRITE_TO_FIELD = new ParseField("write_to"); - private static final ParseField BOUNDS_PERCENTILE_FIELD = new ParseField("bounds_percentile"); - private static final ParseField TERMS_FIELD = new ParseField("terms"); + public static final ParseField BOUNDS_PERCENTILE_FIELD = new ParseField("bounds_percentile"); + public static final ParseField TERMS_FIELD = new ParseField("terms"); public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( - TYPE_FIELD.getPreferredName(), a -> { - if (a[0] == null) { - return new ModelDebugConfig((Double) a[1], (String) a[2]); - } else { - return new ModelDebugConfig((DebugDestination) a[0], (Double) a[1], (String) a[2]); - } - }); + TYPE_FIELD.getPreferredName(), a -> new ModelDebugConfig((Double) a[0], (String) a[1])); + static { - PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> DebugDestination.fromString(p.text()), WRITE_TO_FIELD, - ValueType.STRING); PARSER.declareDouble(ConstructingObjectParser.constructorArg(), BOUNDS_PERCENTILE_FIELD); PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), TERMS_FIELD); } - private final DebugDestination writeTo; private final double boundsPercentile; private final String terms; public ModelDebugConfig(double boundsPercentile, String terms) { - this(DebugDestination.FILE, boundsPercentile, terms); - } - - public ModelDebugConfig(DebugDestination writeTo, double boundsPercentile, String terms) { if (boundsPercentile < 0.0 || boundsPercentile > MAX_PERCENTILE) { String msg = Messages.getMessage(Messages.JOB_CONFIG_MODEL_DEBUG_CONFIG_INVALID_BOUNDS_PERCENTILE); throw new IllegalArgumentException(msg); } - this.writeTo = writeTo; this.boundsPercentile = boundsPercentile; this.terms = terms; } public ModelDebugConfig(StreamInput in) throws IOException { - writeTo = in.readOptionalWriteable(DebugDestination::readFromStream); boundsPercentile = in.readDouble(); terms = in.readOptionalString(); } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeOptionalWriteable(writeTo); out.writeDouble(boundsPercentile); out.writeOptionalString(terms); } @@ -113,9 +61,6 @@ public class ModelDebugConfig extends ToXContentToBytes implements Writeable { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - if (writeTo != null) { - builder.field(WRITE_TO_FIELD.getPreferredName(), writeTo); - } builder.field(BOUNDS_PERCENTILE_FIELD.getPreferredName(), boundsPercentile); if (terms != null) { builder.field(TERMS_FIELD.getPreferredName(), terms); @@ -124,10 +69,6 @@ public class ModelDebugConfig extends ToXContentToBytes implements Writeable { return builder; } - public DebugDestination getWriteTo() { - return this.writeTo; - } - public double getBoundsPercentile() { return this.boundsPercentile; } @@ -147,12 +88,11 @@ public class ModelDebugConfig extends ToXContentToBytes implements Writeable { } ModelDebugConfig that = (ModelDebugConfig) other; - return Objects.equals(this.writeTo, that.writeTo) && Objects.equals(this.boundsPercentile, that.boundsPercentile) - && Objects.equals(this.terms, that.terms); + return Objects.equals(this.boundsPercentile, that.boundsPercentile) && Objects.equals(this.terms, that.terms); } @Override public int hashCode() { - return Objects.hash(this.writeTo, boundsPercentile, terms); + return Objects.hash(boundsPercentile, terms); } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java index 514c5c34346..e1a2d428ad9 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java @@ -161,9 +161,6 @@ public final class Messages { public static final String JOB_DATA_CONCURRENT_USE_CLOSE = "job.data.concurrent.use.close"; public static final String JOB_DATA_CONCURRENT_USE_FLUSH = "job.data.concurrent.use.flush"; - public static final String JOB_DATA_CONCURRENT_USE_PAUSE = "job.data.concurrent.use.pause"; - public static final String JOB_DATA_CONCURRENT_USE_RESUME = "job.data.concurrent.use.resume"; - public static final String JOB_DATA_CONCURRENT_USE_REVERT = "job.data.concurrent.use.revert"; public static final String JOB_DATA_CONCURRENT_USE_UPDATE = "job.data.concurrent.use.update"; public static final String JOB_DATA_CONCURRENT_USE_UPLOAD = "job.data.concurrent.use.upload"; diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java index b44bfc9d987..24486e8b728 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java @@ -12,7 +12,9 @@ import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.xpack.ml.job.config.DataDescription; +import org.elasticsearch.xpack.ml.job.config.DetectionRule; import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig; import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.job.process.CountingInputStream; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; @@ -30,6 +32,7 @@ import java.io.IOException; import java.io.InputStream; import java.time.Duration; import java.time.ZonedDateTime; +import java.util.List; import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; @@ -92,9 +95,17 @@ public class AutodetectCommunicator implements Closeable { }, true); } - public void writeUpdateConfigMessage(String config) throws IOException { + + public void writeUpdateModelDebugMessage(ModelDebugConfig config) throws IOException { checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPDATE, job.getId()), () -> { - autodetectProcess.writeUpdateConfigMessage(config); + autodetectProcess.writeUpdateModelDebugMessage(config); + return null; + }, false); + } + + public void writeUpdateDetectorRulesMessage(int detectorIndex, List rules) throws IOException { + checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPDATE, job.getId()), () -> { + autodetectProcess.writeUpdateDetectorRulesMessage(detectorIndex, rules); return null; }, false); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java index 57b216cb726..cb659f287c9 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.ml.job.process.autodetect; +import org.elasticsearch.xpack.ml.job.config.DetectionRule; +import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.ml.job.results.AutodetectResult; @@ -13,6 +15,7 @@ import java.io.Closeable; import java.io.IOException; import java.time.ZonedDateTime; import java.util.Iterator; +import java.util.List; /** * Interface representing the native C++ autodetect process @@ -31,17 +34,28 @@ public interface AutodetectProcess extends Closeable { /** * Write the reset buckets control message + * * @param params Reset bucket options - * @throws IOException If write reset mesage fails + * @throws IOException If write reset message fails */ void writeResetBucketsControlMessage(DataLoadParams params) throws IOException; /** - * Write an update configuration message - * @param config Config message - * @throws IOException If the write config message fails + * Update the model debug configuration + * + * @param modelDebugConfig New model debug config + * @throws IOException If the write fails */ - void writeUpdateConfigMessage(String config) throws IOException; + void writeUpdateModelDebugMessage(ModelDebugConfig modelDebugConfig) throws IOException; + + /** + * Write message to update the detector rules + * + * @param detectorIndex Index of the detector to update + * @param rules Detector rules + * @throws IOException If the write fails + */ + void writeUpdateDetectorRulesMessage(int detectorIndex, List rules) throws IOException; /** * Flush the job pushing any stale data into autodetect. diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index ff7a582701a..a27ab36e342 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -19,9 +19,11 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.MlPlugin; import org.elasticsearch.xpack.ml.action.UpdateJobStateAction; import org.elasticsearch.xpack.ml.job.JobManager; +import org.elasticsearch.xpack.ml.job.config.DetectionRule; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.config.MlFilter; +import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig; import org.elasticsearch.xpack.ml.job.metadata.Allocation; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; @@ -45,6 +47,7 @@ import java.io.IOException; import java.io.InputStream; import java.time.Duration; import java.time.ZonedDateTime; +import java.util.List; import java.util.Locale; import java.util.Optional; import java.util.Set; @@ -165,14 +168,24 @@ public class AutodetectProcessManager extends AbstractComponent { } } - public void writeUpdateConfigMessage(String jobId, String config) throws IOException { + public void writeUpdateModelDebugMessage(String jobId, ModelDebugConfig config) throws IOException { AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId); if (communicator == null) { - logger.debug("Cannot update config: no active autodetect process for job {}", jobId); + logger.debug("Cannot update model debug config: no active autodetect process for job {}", jobId); return; } - communicator.writeUpdateConfigMessage(config); - // TODO check for errors from autodetect + communicator.writeUpdateModelDebugMessage(config); + // TODO check for errors from autodetects + } + + public void writeUpdateDetectorRulesMessage(String jobId, int detectorIndex, List rules) throws IOException { + AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId); + if (communicator == null) { + logger.debug("Cannot update model debug config: no active autodetect process for job {}", jobId); + return; + } + communicator.writeUpdateDetectorRulesMessage(detectorIndex, rules); + // TODO check for errors from autodetects } public void openJob(String jobId, boolean ignoreDowntime, Consumer handler) { diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java index 6a4cd6314d0..03e8dbd8a53 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java @@ -7,6 +7,8 @@ package org.elasticsearch.xpack.ml.job.process.autodetect; import org.apache.logging.log4j.Logger; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.xpack.ml.job.config.DetectionRule; +import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig; import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; @@ -15,6 +17,7 @@ import org.elasticsearch.xpack.ml.job.results.AutodetectResult; import java.io.IOException; import java.time.ZonedDateTime; import java.util.Iterator; +import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -47,7 +50,11 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess { } @Override - public void writeUpdateConfigMessage(String config) throws IOException { + public void writeUpdateModelDebugMessage(ModelDebugConfig modelDebugConfig) throws IOException { + } + + @Override + public void writeUpdateDetectorRulesMessage(int detectorIndex, List rules) throws IOException { } /** diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java index b3c60831d15..e624b18322f 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java @@ -8,6 +8,8 @@ package org.elasticsearch.xpack.ml.job.process.autodetect; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.xpack.ml.job.config.DetectionRule; +import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser; import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; @@ -89,9 +91,15 @@ class NativeAutodetectProcess implements AutodetectProcess { } @Override - public void writeUpdateConfigMessage(String config) throws IOException { + public void writeUpdateModelDebugMessage(ModelDebugConfig modelDebugConfig) throws IOException { ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfAnalysisFields); - writer.writeUpdateConfigMessage(config); + writer.writeUpdateModelDebugMessage(modelDebugConfig); + } + + @Override + public void writeUpdateDetectorRulesMessage(int detectorIndex, List rules) throws IOException { + ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfAnalysisFields); + writer.writeUpdateDetectorRulesMessage(detectorIndex, rules); } @Override diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java index 2450d233ee6..518a0c9e7af 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java @@ -7,10 +7,18 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.writer; import java.io.IOException; import java.io.OutputStream; +import java.io.StringWriter; import java.util.Arrays; +import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.ml.job.config.DetectionRule; +import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; @@ -49,6 +57,10 @@ public class ControlMsgToProcessWriter { */ public static final String UPDATE_MESSAGE_CODE = "u"; + + private static final String EQUALS = " = "; + private static final char NEW_LINE = '\n'; + /** * An number to uniquely identify each flush so that subsequent code can * wait for acknowledgement of the correct flush. @@ -122,10 +134,6 @@ public class ControlMsgToProcessWriter { return flushId; } - public void writeUpdateConfigMessage(String config) throws IOException { - writeMessage(UPDATE_MESSAGE_CODE + config); - } - public void writeResetBucketsMessage(DataLoadParams params) throws IOException { writeControlCodeFollowedByTimeRange(RESET_BUCKETS_MESSAGE_CODE, params.getStart(), params.getEnd()); } @@ -141,6 +149,31 @@ public class ControlMsgToProcessWriter { writeMessage(message.toString()); } + public void writeUpdateModelDebugMessage(ModelDebugConfig modelDebugConfig) throws IOException { + StringWriter configWriter = new StringWriter(); + configWriter.append(UPDATE_MESSAGE_CODE).append("[modelDebugConfig]\n"); + new ModelDebugConfigWriter(modelDebugConfig, configWriter).write(); + writeMessage(configWriter.toString()); + } + + public void writeUpdateDetectorRulesMessage(int detectorIndex, List rules) throws IOException { + StringWriter configWriter = new StringWriter(); + configWriter.append(UPDATE_MESSAGE_CODE).append("[detectorRules]\n"); + configWriter.append("detectorIndex=").append(Integer.toString(detectorIndex)).append("\n"); + + configWriter.append("rulesJson="); + + XContentBuilder builder = JsonXContent.contentBuilder(); + builder.startArray(); + for (DetectionRule rule : rules) { + rule.toXContent(builder, ToXContent.EMPTY_PARAMS); + } + builder.endArray(); + configWriter.append(builder.string()); + + writeMessage(configWriter.toString()); + } + /** * Transform the supplied control message to length encoded values and * write to the OutputStream. @@ -163,5 +196,4 @@ public class ControlMsgToProcessWriter { // The control field comes last lengthEncodedWriter.writeField(message); } - } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ModelDebugConfigWriter.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ModelDebugConfigWriter.java index 929e1f97dea..7bd44834caf 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ModelDebugConfigWriter.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ModelDebugConfigWriter.java @@ -15,9 +15,6 @@ import static org.elasticsearch.xpack.ml.job.process.autodetect.writer.WriterCon import static org.elasticsearch.xpack.ml.job.process.autodetect.writer.WriterConstants.NEW_LINE; public class ModelDebugConfigWriter { - private static final String WRITE_TO_STR = "writeto"; - private static final String BOUNDS_PERCENTILE_STR = "boundspercentile"; - private static final String TERMS_STR = "terms"; private final ModelDebugConfig modelDebugConfig; private final Writer writer; @@ -29,23 +26,17 @@ public class ModelDebugConfigWriter { public void write() throws IOException { StringBuilder contents = new StringBuilder(); - if (modelDebugConfig.getWriteTo() != null) { - contents.append(WRITE_TO_STR) - .append(EQUALS) - .append(modelDebugConfig.getWriteTo()) - .append(NEW_LINE); - } - contents.append(BOUNDS_PERCENTILE_STR) - .append(EQUALS) - .append(modelDebugConfig.getBoundsPercentile()) - .append(NEW_LINE); + contents.append("boundspercentile") + .append(EQUALS) + .append(modelDebugConfig.getBoundsPercentile()) + .append(NEW_LINE); String terms = modelDebugConfig.getTerms(); - contents.append(TERMS_STR) - .append(EQUALS) - .append(terms == null ? "" : terms) - .append(NEW_LINE); + contents.append(ModelDebugConfig.TERMS_FIELD.getPreferredName()) + .append(EQUALS) + .append(terms == null ? "" : terms) + .append(NEW_LINE); writer.write(contents.toString()); } diff --git a/elasticsearch/src/main/resources/org/elasticsearch/xpack/ml/job/messages/ml_messages.properties b/elasticsearch/src/main/resources/org/elasticsearch/xpack/ml/job/messages/ml_messages.properties index d71623103a6..3bd46d8e8ba 100644 --- a/elasticsearch/src/main/resources/org/elasticsearch/xpack/ml/job/messages/ml_messages.properties +++ b/elasticsearch/src/main/resources/org/elasticsearch/xpack/ml/job/messages/ml_messages.properties @@ -127,10 +127,7 @@ datafeed.aggregations.requires.job.with.summary.count.field = A job configured w job.data.concurrent.use.close = Cannot close job {0} while another connection {2}is {1} the job job.data.concurrent.use.flush = Cannot flush job {0} while another connection {2}is {1} the job -job.data.concurrent.use.pause = Cannot pause job {0} while another connection {2}is {1} the job -job.data.concurrent.use.resume = Cannot resume job {0} while another connection {2}is {1} the job -job.data.concurrent.use.revert = Cannot revert model snapshot for job {0} while another connection {2}is {1} the job -job.data.concurrent.use.update = Cannot update job {0} while another connection {2}is {1} the job +job.data.concurrent.use.update = Cannot update job {0} while it is in use job.data.concurrent.use.upload = Cannot write to job {0} while another connection {2}is {1} the job job.missing.quantiles = Cannot read persisted quantiles for job ''{0}'' diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/UpdateProcessActionRequestTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/UpdateProcessActionRequestTests.java new file mode 100644 index 00000000000..e8bc72a2d2d --- /dev/null +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/UpdateProcessActionRequestTests.java @@ -0,0 +1,36 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.ml.job.config.JobUpdate; +import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig; +import org.elasticsearch.xpack.ml.support.AbstractStreamableTestCase; +import org.elasticsearch.xpack.ml.support.AbstractStreamableXContentTestCase; + +import java.util.List; + +public class UpdateProcessActionRequestTests extends AbstractStreamableTestCase { + + + @Override + protected UpdateProcessAction.Request createTestInstance() { + ModelDebugConfig config = null; + if (randomBoolean()) { + config = new ModelDebugConfig(5.0, "debug,config"); + } + List updates = null; + if (randomBoolean()) { + + } + return new UpdateProcessAction.Request(randomAsciiOfLength(10), config, updates); + } + + @Override + protected UpdateProcessAction.Request createBlankInstance() { + return new UpdateProcessAction.Request(); + } +} \ No newline at end of file diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/config/JobUpdateTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/config/JobUpdateTests.java index 179310c372e..475abdf30a8 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/config/JobUpdateTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/config/JobUpdateTests.java @@ -16,6 +16,8 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import static org.mockito.Mockito.mock; + public class JobUpdateTests extends AbstractSerializingTestCase { @Override @@ -139,4 +141,13 @@ public class JobUpdateTests extends AbstractSerializingTestCase { updatedJob.getAnalysisConfig().getDetectors().get(detectorUpdate.getIndex()).getDetectorRules()); } } + + public void testIsAutodetectProcessUpdate() { + JobUpdate update = new JobUpdate(null, null, null, null, null, null, null, null, null, null); + assertFalse(update.isAutodetectProcessUpdate()); + update = new JobUpdate(null, null, new ModelDebugConfig(1.0, "ff"), null, null, null, null, null, null, null); + assertTrue(update.isAutodetectProcessUpdate()); + update = new JobUpdate(null, Arrays.asList(mock(JobUpdate.DetectorUpdate.class)), null, null, null, null, null, null, null, null); + assertTrue(update.isAutodetectProcessUpdate()); + } } \ No newline at end of file diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/config/ModelDebugConfigTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/config/ModelDebugConfigTests.java index f7ac2707173..6b70e6d61cd 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/config/ModelDebugConfigTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/config/ModelDebugConfigTests.java @@ -8,36 +8,11 @@ package org.elasticsearch.xpack.ml.job.config; import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig.DebugDestination; import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase; public class ModelDebugConfigTests extends AbstractSerializingTestCase { - public void testEquals() { - assertFalse(new ModelDebugConfig(0d, null).equals(null)); - assertFalse(new ModelDebugConfig(0d, null).equals("a string")); - assertFalse(new ModelDebugConfig(80.0, "").equals(new ModelDebugConfig(81.0, ""))); - assertFalse(new ModelDebugConfig(80.0, "foo").equals(new ModelDebugConfig(80.0, "bar"))); - assertFalse(new ModelDebugConfig(DebugDestination.FILE, 80.0, "foo") - .equals(new ModelDebugConfig(DebugDestination.DATA_STORE, 80.0, "foo"))); - - ModelDebugConfig modelDebugConfig = new ModelDebugConfig(0d, null); - assertTrue(modelDebugConfig.equals(modelDebugConfig)); - assertTrue(new ModelDebugConfig(0d, null).equals(new ModelDebugConfig(0d, null))); - assertTrue(new ModelDebugConfig(80.0, "foo").equals(new ModelDebugConfig(80.0, "foo"))); - assertTrue(new ModelDebugConfig(DebugDestination.FILE, 80.0, "foo").equals(new ModelDebugConfig(80.0, "foo"))); - assertTrue(new ModelDebugConfig(DebugDestination.DATA_STORE, 80.0, "foo") - .equals(new ModelDebugConfig(DebugDestination.DATA_STORE, 80.0, "foo"))); - } - - public void testHashCode() { - assertEquals(new ModelDebugConfig(80.0, "foo").hashCode(), new ModelDebugConfig(80.0, "foo").hashCode()); - assertEquals(new ModelDebugConfig(DebugDestination.FILE, 80.0, "foo").hashCode(), new ModelDebugConfig(80.0, "foo").hashCode()); - assertEquals(new ModelDebugConfig(DebugDestination.DATA_STORE, 80.0, "foo").hashCode(), - new ModelDebugConfig(DebugDestination.DATA_STORE, 80.0, "foo").hashCode()); - } - public void testVerify_GivenBoundPercentileLessThanZero() { IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, () -> new ModelDebugConfig(-1.0, "")); assertEquals(Messages.getMessage(Messages.JOB_CONFIG_MODEL_DEBUG_CONFIG_INVALID_BOUNDS_PERCENTILE, ""), e.getMessage()); @@ -55,7 +30,7 @@ public class ModelDebugConfigTests extends AbstractSerializingTestCase rules = Collections.singletonList(mock(DetectionRule.class)); + communicator.writeUpdateDetectorRulesMessage(1, rules); + Mockito.verify(process).writeUpdateDetectorRulesMessage(1, rules); } } @@ -179,17 +192,28 @@ public class AutodetectCommunicatorTests extends ESTestCase { communicator.close(); } - public void testWriteUpdateConfigMessageInUse() throws Exception { + public void testWriteUpdateModelDebugConfigMessageInUse() throws Exception { AutodetectProcess process = mockAutodetectProcessWithOutputStream(); AutoDetectResultProcessor resultProcessor = mock(AutoDetectResultProcessor.class); - when(resultProcessor.waitForFlushAcknowledgement(any(), any())).thenReturn(true); AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor); communicator.inUse.set(new CountDownLatch(1)); - expectThrows(ElasticsearchStatusException.class, () -> communicator.writeUpdateConfigMessage("")); + expectThrows(ElasticsearchStatusException.class, () -> communicator.writeUpdateModelDebugMessage(mock(ModelDebugConfig.class))); communicator.inUse.set(null); - communicator.writeUpdateConfigMessage(""); + communicator.writeUpdateModelDebugMessage(mock(ModelDebugConfig.class)); } + public void testWriteUpdateDetectorRulesMessageInUse() throws Exception { + AutodetectProcess process = mockAutodetectProcessWithOutputStream(); + AutoDetectResultProcessor resultProcessor = mock(AutoDetectResultProcessor.class); + AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor); + + List rules = Collections.singletonList(mock(DetectionRule.class)); + communicator.inUse.set(new CountDownLatch(1)); + expectThrows(ElasticsearchStatusException.class, () -> communicator.writeUpdateDetectorRulesMessage(0, rules)); + + communicator.inUse.set(null); + communicator.writeUpdateDetectorRulesMessage(0, rules); + } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index 0ee30257b76..3d801718960 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -19,10 +19,12 @@ import org.elasticsearch.xpack.ml.action.util.QueryPage; import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.DataDescription; +import org.elasticsearch.xpack.ml.job.config.DetectionRule; import org.elasticsearch.xpack.ml.job.config.Detector; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.config.MlFilter; +import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig; import org.elasticsearch.xpack.ml.job.metadata.Allocation; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; @@ -44,6 +46,7 @@ import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.Date; import java.util.HashSet; +import java.util.List; import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutorService; @@ -259,11 +262,20 @@ public class AutodetectProcessManagerTests extends ESTestCase { assertEquals("[foo] exception while flushing job", e.getMessage()); } - public void testWriteUpdateConfigMessage() throws IOException { + public void testWriteUpdateModelDebugMessage() throws IOException { AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); AutodetectProcessManager manager = createManagerAndCallProcessData(communicator, "foo"); - manager.writeUpdateConfigMessage("foo", "go faster"); - verify(communicator).writeUpdateConfigMessage("go faster"); + ModelDebugConfig debugConfig = mock(ModelDebugConfig.class); + manager.writeUpdateModelDebugMessage("foo", debugConfig); + verify(communicator).writeUpdateModelDebugMessage(debugConfig); + } + + public void testWriteUpdateDetectorRulesMessage() throws IOException { + AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); + AutodetectProcessManager manager = createManagerAndCallProcessData(communicator, "foo"); + List rules = Collections.singletonList(mock(DetectionRule.class)); + manager.writeUpdateDetectorRulesMessage("foo", 2, rules); + verify(communicator).writeUpdateDetectorRulesMessage(2, rules); } public void testJobHasActiveAutodetectProcess() throws IOException { diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java index a498fad7a93..a9524c39cbe 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.job.process.autodetect; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser; import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; @@ -144,7 +145,7 @@ public class NativeAutodetectProcessTests extends ESTestCase { new AutodetectResultsParser(Settings.EMPTY))) { process.start(executorService, mock(StateProcessor.class), mock(InputStream.class)); - process.writeUpdateConfigMessage(""); + process.writeUpdateModelDebugMessage(new ModelDebugConfig(1.0, "term,s")); process.flushStream(); String message = new String(bos.toByteArray(), StandardCharsets.UTF_8); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java index 22ac79c7c53..6561a4f7e26 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java @@ -10,10 +10,20 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verifyNoMoreInteractions; import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.Optional; import java.util.stream.IntStream; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ml.job.config.Condition; +import org.elasticsearch.xpack.ml.job.config.Connective; +import org.elasticsearch.xpack.ml.job.config.DetectionRule; +import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig; +import org.elasticsearch.xpack.ml.job.config.Operator; +import org.elasticsearch.xpack.ml.job.config.RuleCondition; +import org.elasticsearch.xpack.ml.job.config.RuleConditionType; import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange; import org.junit.Before; import org.mockito.InOrder; @@ -137,4 +147,41 @@ public class ControlMsgToProcessWriterTests extends ESTestCase { inOrder.verify(lengthEncodedWriter).writeField("r0 600"); verifyNoMoreInteractions(lengthEncodedWriter); } + + public void testWriteUpdateModelDebugMessage() throws IOException { + ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2); + + writer.writeUpdateModelDebugMessage(new ModelDebugConfig(10.0, "foo,bar")); + + InOrder inOrder = inOrder(lengthEncodedWriter); + inOrder.verify(lengthEncodedWriter).writeNumFields(4); + inOrder.verify(lengthEncodedWriter, times(3)).writeField(""); + inOrder.verify(lengthEncodedWriter).writeField("u[modelDebugConfig]\nboundspercentile = 10.0\nterms = foo,bar\n"); + verifyNoMoreInteractions(lengthEncodedWriter); + } + + public void testWriteUpdateDetectorRulesMessage() throws IOException { + ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2); + + DetectionRule rule1 = new DetectionRule("targetField1", "targetValue", Connective.AND, createRule("5")); + DetectionRule rule2 = new DetectionRule("targetField2", "targetValue", Connective.AND, createRule("5")); + writer.writeUpdateDetectorRulesMessage(2, Arrays.asList(rule1, rule2)); + + InOrder inOrder = inOrder(lengthEncodedWriter); + inOrder.verify(lengthEncodedWriter).writeNumFields(4); + inOrder.verify(lengthEncodedWriter, times(3)).writeField(""); + inOrder.verify(lengthEncodedWriter).writeField("u[detectorRules]\ndetectorIndex=2\n" + + "rulesJson=[{\"conditions_connective\":\"and\",\"rule_conditions\":" + + "[{\"condition_type\":\"numerical_actual\",\"condition\":{\"operator\":\"gt\",\"value\":\"5\"}}]," + + "\"target_field_name\":\"targetField1\",\"target_field_value\":\"targetValue\"}," + + "{\"conditions_connective\":\"and\",\"rule_conditions\":[{\"condition_type\":\"numerical_actual\"," + + "\"condition\":{\"operator\":\"gt\",\"value\":\"5\"}}]," + + "\"target_field_name\":\"targetField2\",\"target_field_value\":\"targetValue\"}]"); + verifyNoMoreInteractions(lengthEncodedWriter); + } + + private static List createRule(String value) { + Condition condition = new Condition(Operator.GT, value); + return Collections.singletonList(new RuleCondition(RuleConditionType.NUMERICAL_ACTUAL, null, null, condition, null)); + } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ModelDebugConfigWriterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ModelDebugConfigWriterTests.java index 833146c334b..f6554e34593 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ModelDebugConfigWriterTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ModelDebugConfigWriterTests.java @@ -16,7 +16,6 @@ import org.junit.After; import org.junit.Before; import org.mockito.Mockito; import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig; -import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig.DebugDestination; public class ModelDebugConfigWriterTests extends ESTestCase { private OutputStreamWriter writer; @@ -30,23 +29,12 @@ public class ModelDebugConfigWriterTests extends ESTestCase { public void verifyNoMoreWriterInteractions() { verifyNoMoreInteractions(writer); } - - public void testWrite_GivenFileConfig() throws IOException { + public void testWrite_GivenFullConfig() throws IOException { ModelDebugConfig modelDebugConfig = new ModelDebugConfig(65.0, "foo,bar"); ModelDebugConfigWriter writer = new ModelDebugConfigWriter(modelDebugConfig, this.writer); writer.write(); - verify(this.writer).write("writeto = file\nboundspercentile = 65.0\nterms = foo,bar\n"); + verify(this.writer).write("boundspercentile = 65.0\nterms = foo,bar\n"); } - - public void testWrite_GivenFullConfig() throws IOException { - ModelDebugConfig modelDebugConfig = new ModelDebugConfig(DebugDestination.DATA_STORE, 65.0, "foo,bar"); - ModelDebugConfigWriter writer = new ModelDebugConfigWriter(modelDebugConfig, this.writer); - - writer.write(); - - verify(this.writer).write("writeto = data_store\nboundspercentile = 65.0\nterms = foo,bar\n"); - } - }