Online updates to the running autodetect process (elastic/elasticsearch#886)

* Methods to update the running process with new settings

* Task to update the running autodetect process

* Don’t start process update task if not config specified

Original commit: elastic/x-pack-elasticsearch@4364b141b5
This commit is contained in:
David Kyle 2017-02-08 14:19:24 +00:00 committed by GitHub
parent 639c02a45e
commit 9dc4a2f31c
23 changed files with 520 additions and 170 deletions

View File

@ -63,6 +63,7 @@ import org.elasticsearch.xpack.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.ml.action.UpdateJobStateAction; import org.elasticsearch.xpack.ml.action.UpdateJobStateAction;
import org.elasticsearch.xpack.ml.action.UpdateJobAction; import org.elasticsearch.xpack.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.ml.action.UpdateModelSnapshotAction; import org.elasticsearch.xpack.ml.action.UpdateModelSnapshotAction;
import org.elasticsearch.xpack.ml.action.UpdateProcessAction;
import org.elasticsearch.xpack.ml.action.ValidateDetectorAction; import org.elasticsearch.xpack.ml.action.ValidateDetectorAction;
import org.elasticsearch.xpack.ml.action.ValidateJobConfigAction; import org.elasticsearch.xpack.ml.action.ValidateJobConfigAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunner; import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunner;
@ -328,7 +329,8 @@ public class MlPlugin extends Plugin implements ActionPlugin {
new ActionHandler<>(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class), new ActionHandler<>(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class),
new ActionHandler<>(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class), new ActionHandler<>(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class),
new ActionHandler<>(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class), new ActionHandler<>(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class),
new ActionHandler<>(MlDeleteByQueryAction.INSTANCE, MlDeleteByQueryAction.TransportAction.class) new ActionHandler<>(MlDeleteByQueryAction.INSTANCE, MlDeleteByQueryAction.TransportAction.class),
new ActionHandler<>(UpdateProcessAction.INSTANCE, UpdateProcessAction.TransportAction.class)
); );
} }

View File

@ -145,10 +145,6 @@ public class PostDataAction extends Action<PostDataAction.Request, PostDataActio
super(jobId); super(jobId);
} }
public String getJobId() {
return jobId;
}
public String getResetStart() { public String getResetStart() {
return resetStart; return resetStart;
} }

View File

@ -12,6 +12,7 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockException;
@ -132,14 +133,16 @@ public class UpdateJobAction extends Action<UpdateJobAction.Request, PutJobActio
public static class TransportAction extends TransportMasterNodeAction<UpdateJobAction.Request, PutJobAction.Response> { public static class TransportAction extends TransportMasterNodeAction<UpdateJobAction.Request, PutJobAction.Response> {
private final JobManager jobManager; private final JobManager jobManager;
private final Client client;
@Inject @Inject
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService, public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
IndexNameExpressionResolver indexNameExpressionResolver, JobManager jobManager) { JobManager jobManager, Client client) {
super(settings, UpdateJobAction.NAME, transportService, clusterService, threadPool, actionFilters, super(settings, UpdateJobAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, UpdateJobAction.Request::new); indexNameExpressionResolver, UpdateJobAction.Request::new);
this.jobManager = jobManager; this.jobManager = jobManager;
this.client = client;
} }
@Override @Override
@ -153,13 +156,38 @@ public class UpdateJobAction extends Action<UpdateJobAction.Request, PutJobActio
} }
@Override @Override
protected void masterOperation(UpdateJobAction.Request request, ClusterState state, protected void masterOperation(Request request, ClusterState state,
ActionListener<PutJobAction.Response> listener) throws Exception { ActionListener<PutJobAction.Response> listener) throws Exception {
if (request.getJobId().equals(Job.ALL)) { if (request.getJobId().equals(Job.ALL)) {
throw new IllegalArgumentException("Job Id " + Job.ALL + " cannot be for update"); throw new IllegalArgumentException("Job Id " + Job.ALL + " cannot be for update");
} }
jobManager.updateJob(request.getJobId(), request.getJobUpdate(), request, listener); ActionListener<PutJobAction.Response> wrappedListener = listener;
if (request.getJobUpdate().isAutodetectProcessUpdate()) {
wrappedListener = ActionListener.wrap(
response -> updateProcess(request, response, listener),
listener::onFailure);
}
jobManager.updateJob(request.getJobId(), request.getJobUpdate(), request, wrappedListener);
}
private void updateProcess(Request request, PutJobAction.Response updateConfigResponse,
ActionListener<PutJobAction.Response> listener) {
UpdateProcessAction.Request updateProcessRequest = new UpdateProcessAction.Request(request.getJobId(),
request.getJobUpdate().getModelDebugConfig(), request.getJobUpdate().getDetectorUpdates());
client.execute(UpdateProcessAction.INSTANCE, updateProcessRequest, new ActionListener<UpdateProcessAction.Response>() {
@Override
public void onResponse(UpdateProcessAction.Response response) {
listener.onResponse(updateConfigResponse);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
} }
@Override @Override

View File

@ -0,0 +1,216 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.StatusToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MlPlugin;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
public class UpdateProcessAction extends
Action<UpdateProcessAction.Request, UpdateProcessAction.Response, UpdateProcessAction.RequestBuilder> {
public static final UpdateProcessAction INSTANCE = new UpdateProcessAction();
public static final String NAME = "cluster:admin/ml/job/update/process";
private UpdateProcessAction() {
super(NAME);
}
@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client, this);
}
@Override
public Response newResponse() {
return new Response();
}
static class RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder> {
RequestBuilder(ElasticsearchClient client, UpdateProcessAction action) {
super(client, action, new Request());
}
}
public static class Response extends BaseTasksResponse implements StatusToXContentObject, Writeable {
private boolean isUpdated;
private Response() {
this.isUpdated = true;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
isUpdated = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(isUpdated);
}
@Override
public RestStatus status() {
return RestStatus.ACCEPTED;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("updated", isUpdated);
builder.endObject();
return builder;
}
@Override
public int hashCode() {
return Objects.hashCode(isUpdated);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Response other = (Response) obj;
return this.isUpdated == other.isUpdated;
}
}
public static class Request extends TransportJobTaskAction.JobTaskRequest<Request> {
private ModelDebugConfig modelDebugConfig;
private List<JobUpdate.DetectorUpdate> detectorUpdates;
Request() {
}
public Request(String jobId, ModelDebugConfig modelDebugConfig, List<JobUpdate.DetectorUpdate> detectorUpdates) {
super(jobId);
this.modelDebugConfig = modelDebugConfig;
this.detectorUpdates = detectorUpdates;
}
public ModelDebugConfig getModelDebugConfig() {
return modelDebugConfig;
}
public List<JobUpdate.DetectorUpdate> getDetectorUpdates() {
return detectorUpdates;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
modelDebugConfig = in.readOptionalWriteable(ModelDebugConfig::new);
if (in.readBoolean()) {
in.readList(JobUpdate.DetectorUpdate::new);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalWriteable(modelDebugConfig);
boolean hasDetectorUpdates = detectorUpdates != null;
out.writeBoolean(hasDetectorUpdates);
if (hasDetectorUpdates) {
out.writeList(detectorUpdates);
}
}
@Override
public int hashCode() {
return Objects.hash(getJobId(), modelDebugConfig, detectorUpdates);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Request other = (Request) obj;
return Objects.equals(getJobId(), other.getJobId()) &&
Objects.equals(modelDebugConfig, other.modelDebugConfig) &&
Objects.equals(detectorUpdates, other.detectorUpdates);
}
}
public static class TransportAction extends TransportJobTaskAction<InternalOpenJobAction.JobTask, Request, Response> {
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ClusterService clusterService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
JobManager jobManager, AutodetectProcessManager processManager) {
super(settings, NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
Request::new, Response::new, MlPlugin.THREAD_POOL_NAME, jobManager, processManager, Request::getJobId);
}
@Override
protected Response readTaskResponse(StreamInput in) throws IOException {
Response response = new Response();
response.readFrom(in);
return response;
}
@Override
protected void taskOperation(Request request, InternalOpenJobAction.JobTask task, ActionListener<Response> listener) {
threadPool.executor(MlPlugin.THREAD_POOL_NAME).execute(() -> {
try {
if (request.getModelDebugConfig() != null) {
processManager.writeUpdateModelDebugMessage(request.getJobId(), request.getModelDebugConfig());
}
if (request.getDetectorUpdates() != null) {
for (JobUpdate.DetectorUpdate update : request.getDetectorUpdates()) {
processManager.writeUpdateDetectorRulesMessage(request.getJobId(), update.getIndex(), update.getRules());
}
}
listener.onResponse(new Response());
} catch (Exception e) {
listener.onFailure(e);
}
});
}
}
}

View File

@ -152,6 +152,10 @@ public class JobUpdate implements Writeable, ToXContent {
return customSettings; return customSettings;
} }
public boolean isAutodetectProcessUpdate() {
return modelDebugConfig != null || detectorUpdates != null;
}
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(); builder.startObject();

View File

@ -20,92 +20,40 @@ import java.util.Locale;
import java.util.Objects; import java.util.Objects;
public class ModelDebugConfig extends ToXContentToBytes implements Writeable { public class ModelDebugConfig extends ToXContentToBytes implements Writeable {
/**
* Enum of the acceptable output destinations.
*/
public enum DebugDestination implements Writeable {
FILE, DATA_STORE;
/**
* Case-insensitive from string method. Works with FILE, File, file,
* etc.
*
* @param value
* String representation
* @return The output destination
*/
public static DebugDestination fromString(String value) {
return DebugDestination.valueOf(value.toUpperCase(Locale.ROOT));
}
public static DebugDestination readFromStream(StreamInput in) throws IOException {
int ordinal = in.readVInt();
if (ordinal < 0 || ordinal >= values().length) {
throw new IOException("Unknown DebugDestination ordinal [" + ordinal + "]");
}
return values()[ordinal];
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(ordinal());
}
@Override
public String toString() {
return name().toLowerCase(Locale.ROOT);
}
}
private static final double MAX_PERCENTILE = 100.0; private static final double MAX_PERCENTILE = 100.0;
private static final ParseField TYPE_FIELD = new ParseField("model_debug_config"); private static final ParseField TYPE_FIELD = new ParseField("model_debug_config");
private static final ParseField WRITE_TO_FIELD = new ParseField("write_to"); public static final ParseField BOUNDS_PERCENTILE_FIELD = new ParseField("bounds_percentile");
private static final ParseField BOUNDS_PERCENTILE_FIELD = new ParseField("bounds_percentile"); public static final ParseField TERMS_FIELD = new ParseField("terms");
private static final ParseField TERMS_FIELD = new ParseField("terms");
public static final ConstructingObjectParser<ModelDebugConfig, Void> PARSER = new ConstructingObjectParser<>( public static final ConstructingObjectParser<ModelDebugConfig, Void> PARSER = new ConstructingObjectParser<>(
TYPE_FIELD.getPreferredName(), a -> { TYPE_FIELD.getPreferredName(), a -> new ModelDebugConfig((Double) a[0], (String) a[1]));
if (a[0] == null) {
return new ModelDebugConfig((Double) a[1], (String) a[2]);
} else {
return new ModelDebugConfig((DebugDestination) a[0], (Double) a[1], (String) a[2]);
}
});
static { static {
PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> DebugDestination.fromString(p.text()), WRITE_TO_FIELD,
ValueType.STRING);
PARSER.declareDouble(ConstructingObjectParser.constructorArg(), BOUNDS_PERCENTILE_FIELD); PARSER.declareDouble(ConstructingObjectParser.constructorArg(), BOUNDS_PERCENTILE_FIELD);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), TERMS_FIELD); PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), TERMS_FIELD);
} }
private final DebugDestination writeTo;
private final double boundsPercentile; private final double boundsPercentile;
private final String terms; private final String terms;
public ModelDebugConfig(double boundsPercentile, String terms) { public ModelDebugConfig(double boundsPercentile, String terms) {
this(DebugDestination.FILE, boundsPercentile, terms);
}
public ModelDebugConfig(DebugDestination writeTo, double boundsPercentile, String terms) {
if (boundsPercentile < 0.0 || boundsPercentile > MAX_PERCENTILE) { if (boundsPercentile < 0.0 || boundsPercentile > MAX_PERCENTILE) {
String msg = Messages.getMessage(Messages.JOB_CONFIG_MODEL_DEBUG_CONFIG_INVALID_BOUNDS_PERCENTILE); String msg = Messages.getMessage(Messages.JOB_CONFIG_MODEL_DEBUG_CONFIG_INVALID_BOUNDS_PERCENTILE);
throw new IllegalArgumentException(msg); throw new IllegalArgumentException(msg);
} }
this.writeTo = writeTo;
this.boundsPercentile = boundsPercentile; this.boundsPercentile = boundsPercentile;
this.terms = terms; this.terms = terms;
} }
public ModelDebugConfig(StreamInput in) throws IOException { public ModelDebugConfig(StreamInput in) throws IOException {
writeTo = in.readOptionalWriteable(DebugDestination::readFromStream);
boundsPercentile = in.readDouble(); boundsPercentile = in.readDouble();
terms = in.readOptionalString(); terms = in.readOptionalString();
} }
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(writeTo);
out.writeDouble(boundsPercentile); out.writeDouble(boundsPercentile);
out.writeOptionalString(terms); out.writeOptionalString(terms);
} }
@ -113,9 +61,6 @@ public class ModelDebugConfig extends ToXContentToBytes implements Writeable {
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(); builder.startObject();
if (writeTo != null) {
builder.field(WRITE_TO_FIELD.getPreferredName(), writeTo);
}
builder.field(BOUNDS_PERCENTILE_FIELD.getPreferredName(), boundsPercentile); builder.field(BOUNDS_PERCENTILE_FIELD.getPreferredName(), boundsPercentile);
if (terms != null) { if (terms != null) {
builder.field(TERMS_FIELD.getPreferredName(), terms); builder.field(TERMS_FIELD.getPreferredName(), terms);
@ -124,10 +69,6 @@ public class ModelDebugConfig extends ToXContentToBytes implements Writeable {
return builder; return builder;
} }
public DebugDestination getWriteTo() {
return this.writeTo;
}
public double getBoundsPercentile() { public double getBoundsPercentile() {
return this.boundsPercentile; return this.boundsPercentile;
} }
@ -147,12 +88,11 @@ public class ModelDebugConfig extends ToXContentToBytes implements Writeable {
} }
ModelDebugConfig that = (ModelDebugConfig) other; ModelDebugConfig that = (ModelDebugConfig) other;
return Objects.equals(this.writeTo, that.writeTo) && Objects.equals(this.boundsPercentile, that.boundsPercentile) return Objects.equals(this.boundsPercentile, that.boundsPercentile) && Objects.equals(this.terms, that.terms);
&& Objects.equals(this.terms, that.terms);
} }
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(this.writeTo, boundsPercentile, terms); return Objects.hash(boundsPercentile, terms);
} }
} }

View File

@ -161,9 +161,6 @@ public final class Messages {
public static final String JOB_DATA_CONCURRENT_USE_CLOSE = "job.data.concurrent.use.close"; public static final String JOB_DATA_CONCURRENT_USE_CLOSE = "job.data.concurrent.use.close";
public static final String JOB_DATA_CONCURRENT_USE_FLUSH = "job.data.concurrent.use.flush"; public static final String JOB_DATA_CONCURRENT_USE_FLUSH = "job.data.concurrent.use.flush";
public static final String JOB_DATA_CONCURRENT_USE_PAUSE = "job.data.concurrent.use.pause";
public static final String JOB_DATA_CONCURRENT_USE_RESUME = "job.data.concurrent.use.resume";
public static final String JOB_DATA_CONCURRENT_USE_REVERT = "job.data.concurrent.use.revert";
public static final String JOB_DATA_CONCURRENT_USE_UPDATE = "job.data.concurrent.use.update"; public static final String JOB_DATA_CONCURRENT_USE_UPDATE = "job.data.concurrent.use.update";
public static final String JOB_DATA_CONCURRENT_USE_UPLOAD = "job.data.concurrent.use.upload"; public static final String JOB_DATA_CONCURRENT_USE_UPLOAD = "job.data.concurrent.use.upload";

View File

@ -12,7 +12,9 @@ import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig;
import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.process.CountingInputStream; import org.elasticsearch.xpack.ml.job.process.CountingInputStream;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
@ -30,6 +32,7 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.time.Duration; import java.time.Duration;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -92,9 +95,17 @@ public class AutodetectCommunicator implements Closeable {
}, true); }, true);
} }
public void writeUpdateConfigMessage(String config) throws IOException {
public void writeUpdateModelDebugMessage(ModelDebugConfig config) throws IOException {
checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPDATE, job.getId()), () -> { checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPDATE, job.getId()), () -> {
autodetectProcess.writeUpdateConfigMessage(config); autodetectProcess.writeUpdateModelDebugMessage(config);
return null;
}, false);
}
public void writeUpdateDetectorRulesMessage(int detectorIndex, List<DetectionRule> rules) throws IOException {
checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPDATE, job.getId()), () -> {
autodetectProcess.writeUpdateDetectorRulesMessage(detectorIndex, rules);
return null; return null;
}, false); }, false);
} }

View File

@ -5,6 +5,8 @@
*/ */
package org.elasticsearch.xpack.ml.job.process.autodetect; package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult; import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
@ -13,6 +15,7 @@ import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
/** /**
* Interface representing the native C++ autodetect process * Interface representing the native C++ autodetect process
@ -31,17 +34,28 @@ public interface AutodetectProcess extends Closeable {
/** /**
* Write the reset buckets control message * Write the reset buckets control message
*
* @param params Reset bucket options * @param params Reset bucket options
* @throws IOException If write reset mesage fails * @throws IOException If write reset message fails
*/ */
void writeResetBucketsControlMessage(DataLoadParams params) throws IOException; void writeResetBucketsControlMessage(DataLoadParams params) throws IOException;
/** /**
* Write an update configuration message * Update the model debug configuration
* @param config Config message *
* @throws IOException If the write config message fails * @param modelDebugConfig New model debug config
* @throws IOException If the write fails
*/ */
void writeUpdateConfigMessage(String config) throws IOException; void writeUpdateModelDebugMessage(ModelDebugConfig modelDebugConfig) throws IOException;
/**
* Write message to update the detector rules
*
* @param detectorIndex Index of the detector to update
* @param rules Detector rules
* @throws IOException If the write fails
*/
void writeUpdateDetectorRulesMessage(int detectorIndex, List<DetectionRule> rules) throws IOException;
/** /**
* Flush the job pushing any stale data into autodetect. * Flush the job pushing any stale data into autodetect.

View File

@ -19,9 +19,11 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.MlPlugin; import org.elasticsearch.xpack.ml.MlPlugin;
import org.elasticsearch.xpack.ml.action.UpdateJobStateAction; import org.elasticsearch.xpack.ml.action.UpdateJobStateAction;
import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.MlFilter; import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig;
import org.elasticsearch.xpack.ml.job.metadata.Allocation; import org.elasticsearch.xpack.ml.job.metadata.Allocation;
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
@ -45,6 +47,7 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.time.Duration; import java.time.Duration;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
@ -165,14 +168,24 @@ public class AutodetectProcessManager extends AbstractComponent {
} }
} }
public void writeUpdateConfigMessage(String jobId, String config) throws IOException { public void writeUpdateModelDebugMessage(String jobId, ModelDebugConfig config) throws IOException {
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId); AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId);
if (communicator == null) { if (communicator == null) {
logger.debug("Cannot update config: no active autodetect process for job {}", jobId); logger.debug("Cannot update model debug config: no active autodetect process for job {}", jobId);
return; return;
} }
communicator.writeUpdateConfigMessage(config); communicator.writeUpdateModelDebugMessage(config);
// TODO check for errors from autodetect // TODO check for errors from autodetects
}
public void writeUpdateDetectorRulesMessage(String jobId, int detectorIndex, List<DetectionRule> rules) throws IOException {
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId);
if (communicator == null) {
logger.debug("Cannot update model debug config: no active autodetect process for job {}", jobId);
return;
}
communicator.writeUpdateDetectorRulesMessage(detectorIndex, rules);
// TODO check for errors from autodetects
} }
public void openJob(String jobId, boolean ignoreDowntime, Consumer<Exception> handler) { public void openJob(String jobId, boolean ignoreDowntime, Consumer<Exception> handler) {

View File

@ -7,6 +7,8 @@ package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams;
@ -15,6 +17,7 @@ import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
import java.io.IOException; import java.io.IOException;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
@ -47,7 +50,11 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess {
} }
@Override @Override
public void writeUpdateConfigMessage(String config) throws IOException { public void writeUpdateModelDebugMessage(ModelDebugConfig modelDebugConfig) throws IOException {
}
@Override
public void writeUpdateDetectorRulesMessage(int detectorIndex, List<DetectionRule> rules) throws IOException {
} }
/** /**

View File

@ -8,6 +8,8 @@ package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
@ -89,9 +91,15 @@ class NativeAutodetectProcess implements AutodetectProcess {
} }
@Override @Override
public void writeUpdateConfigMessage(String config) throws IOException { public void writeUpdateModelDebugMessage(ModelDebugConfig modelDebugConfig) throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfAnalysisFields); ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfAnalysisFields);
writer.writeUpdateConfigMessage(config); writer.writeUpdateModelDebugMessage(modelDebugConfig);
}
@Override
public void writeUpdateDetectorRulesMessage(int detectorIndex, List<DetectionRule> rules) throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfAnalysisFields);
writer.writeUpdateDetectorRulesMessage(detectorIndex, rules);
} }
@Override @Override

View File

@ -7,10 +7,18 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.writer;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.StringWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams;
@ -49,6 +57,10 @@ public class ControlMsgToProcessWriter {
*/ */
public static final String UPDATE_MESSAGE_CODE = "u"; public static final String UPDATE_MESSAGE_CODE = "u";
private static final String EQUALS = " = ";
private static final char NEW_LINE = '\n';
/** /**
* An number to uniquely identify each flush so that subsequent code can * An number to uniquely identify each flush so that subsequent code can
* wait for acknowledgement of the correct flush. * wait for acknowledgement of the correct flush.
@ -122,10 +134,6 @@ public class ControlMsgToProcessWriter {
return flushId; return flushId;
} }
public void writeUpdateConfigMessage(String config) throws IOException {
writeMessage(UPDATE_MESSAGE_CODE + config);
}
public void writeResetBucketsMessage(DataLoadParams params) throws IOException { public void writeResetBucketsMessage(DataLoadParams params) throws IOException {
writeControlCodeFollowedByTimeRange(RESET_BUCKETS_MESSAGE_CODE, params.getStart(), params.getEnd()); writeControlCodeFollowedByTimeRange(RESET_BUCKETS_MESSAGE_CODE, params.getStart(), params.getEnd());
} }
@ -141,6 +149,31 @@ public class ControlMsgToProcessWriter {
writeMessage(message.toString()); writeMessage(message.toString());
} }
public void writeUpdateModelDebugMessage(ModelDebugConfig modelDebugConfig) throws IOException {
StringWriter configWriter = new StringWriter();
configWriter.append(UPDATE_MESSAGE_CODE).append("[modelDebugConfig]\n");
new ModelDebugConfigWriter(modelDebugConfig, configWriter).write();
writeMessage(configWriter.toString());
}
public void writeUpdateDetectorRulesMessage(int detectorIndex, List<DetectionRule> rules) throws IOException {
StringWriter configWriter = new StringWriter();
configWriter.append(UPDATE_MESSAGE_CODE).append("[detectorRules]\n");
configWriter.append("detectorIndex=").append(Integer.toString(detectorIndex)).append("\n");
configWriter.append("rulesJson=");
XContentBuilder builder = JsonXContent.contentBuilder();
builder.startArray();
for (DetectionRule rule : rules) {
rule.toXContent(builder, ToXContent.EMPTY_PARAMS);
}
builder.endArray();
configWriter.append(builder.string());
writeMessage(configWriter.toString());
}
/** /**
* Transform the supplied control message to length encoded values and * Transform the supplied control message to length encoded values and
* write to the OutputStream. * write to the OutputStream.
@ -163,5 +196,4 @@ public class ControlMsgToProcessWriter {
// The control field comes last // The control field comes last
lengthEncodedWriter.writeField(message); lengthEncodedWriter.writeField(message);
} }
} }

View File

@ -15,9 +15,6 @@ import static org.elasticsearch.xpack.ml.job.process.autodetect.writer.WriterCon
import static org.elasticsearch.xpack.ml.job.process.autodetect.writer.WriterConstants.NEW_LINE; import static org.elasticsearch.xpack.ml.job.process.autodetect.writer.WriterConstants.NEW_LINE;
public class ModelDebugConfigWriter { public class ModelDebugConfigWriter {
private static final String WRITE_TO_STR = "writeto";
private static final String BOUNDS_PERCENTILE_STR = "boundspercentile";
private static final String TERMS_STR = "terms";
private final ModelDebugConfig modelDebugConfig; private final ModelDebugConfig modelDebugConfig;
private final Writer writer; private final Writer writer;
@ -29,23 +26,17 @@ public class ModelDebugConfigWriter {
public void write() throws IOException { public void write() throws IOException {
StringBuilder contents = new StringBuilder(); StringBuilder contents = new StringBuilder();
if (modelDebugConfig.getWriteTo() != null) {
contents.append(WRITE_TO_STR)
.append(EQUALS)
.append(modelDebugConfig.getWriteTo())
.append(NEW_LINE);
}
contents.append(BOUNDS_PERCENTILE_STR) contents.append("boundspercentile")
.append(EQUALS) .append(EQUALS)
.append(modelDebugConfig.getBoundsPercentile()) .append(modelDebugConfig.getBoundsPercentile())
.append(NEW_LINE); .append(NEW_LINE);
String terms = modelDebugConfig.getTerms(); String terms = modelDebugConfig.getTerms();
contents.append(TERMS_STR) contents.append(ModelDebugConfig.TERMS_FIELD.getPreferredName())
.append(EQUALS) .append(EQUALS)
.append(terms == null ? "" : terms) .append(terms == null ? "" : terms)
.append(NEW_LINE); .append(NEW_LINE);
writer.write(contents.toString()); writer.write(contents.toString());
} }

View File

@ -127,10 +127,7 @@ datafeed.aggregations.requires.job.with.summary.count.field = A job configured w
job.data.concurrent.use.close = Cannot close job {0} while another connection {2}is {1} the job job.data.concurrent.use.close = Cannot close job {0} while another connection {2}is {1} the job
job.data.concurrent.use.flush = Cannot flush job {0} while another connection {2}is {1} the job job.data.concurrent.use.flush = Cannot flush job {0} while another connection {2}is {1} the job
job.data.concurrent.use.pause = Cannot pause job {0} while another connection {2}is {1} the job job.data.concurrent.use.update = Cannot update job {0} while it is in use
job.data.concurrent.use.resume = Cannot resume job {0} while another connection {2}is {1} the job
job.data.concurrent.use.revert = Cannot revert model snapshot for job {0} while another connection {2}is {1} the job
job.data.concurrent.use.update = Cannot update job {0} while another connection {2}is {1} the job
job.data.concurrent.use.upload = Cannot write to job {0} while another connection {2}is {1} the job job.data.concurrent.use.upload = Cannot write to job {0} while another connection {2}is {1} the job
job.missing.quantiles = Cannot read persisted quantiles for job ''{0}'' job.missing.quantiles = Cannot read persisted quantiles for job ''{0}''

View File

@ -0,0 +1,36 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig;
import org.elasticsearch.xpack.ml.support.AbstractStreamableTestCase;
import org.elasticsearch.xpack.ml.support.AbstractStreamableXContentTestCase;
import java.util.List;
public class UpdateProcessActionRequestTests extends AbstractStreamableTestCase<UpdateProcessAction.Request> {
@Override
protected UpdateProcessAction.Request createTestInstance() {
ModelDebugConfig config = null;
if (randomBoolean()) {
config = new ModelDebugConfig(5.0, "debug,config");
}
List<JobUpdate.DetectorUpdate> updates = null;
if (randomBoolean()) {
}
return new UpdateProcessAction.Request(randomAsciiOfLength(10), config, updates);
}
@Override
protected UpdateProcessAction.Request createBlankInstance() {
return new UpdateProcessAction.Request();
}
}

View File

@ -16,6 +16,8 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import static org.mockito.Mockito.mock;
public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> { public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
@Override @Override
@ -139,4 +141,13 @@ public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
updatedJob.getAnalysisConfig().getDetectors().get(detectorUpdate.getIndex()).getDetectorRules()); updatedJob.getAnalysisConfig().getDetectors().get(detectorUpdate.getIndex()).getDetectorRules());
} }
} }
public void testIsAutodetectProcessUpdate() {
JobUpdate update = new JobUpdate(null, null, null, null, null, null, null, null, null, null);
assertFalse(update.isAutodetectProcessUpdate());
update = new JobUpdate(null, null, new ModelDebugConfig(1.0, "ff"), null, null, null, null, null, null, null);
assertTrue(update.isAutodetectProcessUpdate());
update = new JobUpdate(null, Arrays.asList(mock(JobUpdate.DetectorUpdate.class)), null, null, null, null, null, null, null, null);
assertTrue(update.isAutodetectProcessUpdate());
}
} }

View File

@ -8,36 +8,11 @@ package org.elasticsearch.xpack.ml.job.config;
import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig.DebugDestination;
import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase; import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase;
public class ModelDebugConfigTests extends AbstractSerializingTestCase<ModelDebugConfig> { public class ModelDebugConfigTests extends AbstractSerializingTestCase<ModelDebugConfig> {
public void testEquals() {
assertFalse(new ModelDebugConfig(0d, null).equals(null));
assertFalse(new ModelDebugConfig(0d, null).equals("a string"));
assertFalse(new ModelDebugConfig(80.0, "").equals(new ModelDebugConfig(81.0, "")));
assertFalse(new ModelDebugConfig(80.0, "foo").equals(new ModelDebugConfig(80.0, "bar")));
assertFalse(new ModelDebugConfig(DebugDestination.FILE, 80.0, "foo")
.equals(new ModelDebugConfig(DebugDestination.DATA_STORE, 80.0, "foo")));
ModelDebugConfig modelDebugConfig = new ModelDebugConfig(0d, null);
assertTrue(modelDebugConfig.equals(modelDebugConfig));
assertTrue(new ModelDebugConfig(0d, null).equals(new ModelDebugConfig(0d, null)));
assertTrue(new ModelDebugConfig(80.0, "foo").equals(new ModelDebugConfig(80.0, "foo")));
assertTrue(new ModelDebugConfig(DebugDestination.FILE, 80.0, "foo").equals(new ModelDebugConfig(80.0, "foo")));
assertTrue(new ModelDebugConfig(DebugDestination.DATA_STORE, 80.0, "foo")
.equals(new ModelDebugConfig(DebugDestination.DATA_STORE, 80.0, "foo")));
}
public void testHashCode() {
assertEquals(new ModelDebugConfig(80.0, "foo").hashCode(), new ModelDebugConfig(80.0, "foo").hashCode());
assertEquals(new ModelDebugConfig(DebugDestination.FILE, 80.0, "foo").hashCode(), new ModelDebugConfig(80.0, "foo").hashCode());
assertEquals(new ModelDebugConfig(DebugDestination.DATA_STORE, 80.0, "foo").hashCode(),
new ModelDebugConfig(DebugDestination.DATA_STORE, 80.0, "foo").hashCode());
}
public void testVerify_GivenBoundPercentileLessThanZero() { public void testVerify_GivenBoundPercentileLessThanZero() {
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, () -> new ModelDebugConfig(-1.0, "")); IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, () -> new ModelDebugConfig(-1.0, ""));
assertEquals(Messages.getMessage(Messages.JOB_CONFIG_MODEL_DEBUG_CONFIG_INVALID_BOUNDS_PERCENTILE, ""), e.getMessage()); assertEquals(Messages.getMessage(Messages.JOB_CONFIG_MODEL_DEBUG_CONFIG_INVALID_BOUNDS_PERCENTILE, ""), e.getMessage());
@ -55,7 +30,7 @@ public class ModelDebugConfigTests extends AbstractSerializingTestCase<ModelDebu
@Override @Override
protected ModelDebugConfig createTestInstance() { protected ModelDebugConfig createTestInstance() {
return new ModelDebugConfig(randomFrom(DebugDestination.values()), randomDouble(), randomAsciiOfLengthBetween(1, 30)); return new ModelDebugConfig(randomDouble(), randomAsciiOfLengthBetween(1, 30));
} }
@Override @Override

View File

@ -10,8 +10,10 @@ import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.Detector; import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
@ -24,6 +26,7 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.time.Duration; import java.time.Duration;
import java.util.Collections; import java.util.Collections;
import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -49,12 +52,22 @@ public class AutodetectCommunicatorTests extends ESTestCase {
} }
} }
public void tesWriteUpdateConfigMessage() throws IOException { public void tesWriteUpdateModelDebugMessage() throws IOException {
AutodetectProcess process = mockAutodetectProcessWithOutputStream(); AutodetectProcess process = mockAutodetectProcessWithOutputStream();
try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class))) { try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class))) {
String config = ""; ModelDebugConfig config = new ModelDebugConfig(10.0, "apple,peach");
communicator.writeUpdateConfigMessage(config); communicator.writeUpdateModelDebugMessage(config);
Mockito.verify(process).writeUpdateConfigMessage(config); Mockito.verify(process).writeUpdateModelDebugMessage(config);
}
}
public void testWriteUpdateDetectorRulesMessage() throws IOException {
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class))) {
List<DetectionRule> rules = Collections.singletonList(mock(DetectionRule.class));
communicator.writeUpdateDetectorRulesMessage(1, rules);
Mockito.verify(process).writeUpdateDetectorRulesMessage(1, rules);
} }
} }
@ -179,17 +192,28 @@ public class AutodetectCommunicatorTests extends ESTestCase {
communicator.close(); communicator.close();
} }
public void testWriteUpdateConfigMessageInUse() throws Exception { public void testWriteUpdateModelDebugConfigMessageInUse() throws Exception {
AutodetectProcess process = mockAutodetectProcessWithOutputStream(); AutodetectProcess process = mockAutodetectProcessWithOutputStream();
AutoDetectResultProcessor resultProcessor = mock(AutoDetectResultProcessor.class); AutoDetectResultProcessor resultProcessor = mock(AutoDetectResultProcessor.class);
when(resultProcessor.waitForFlushAcknowledgement(any(), any())).thenReturn(true);
AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor); AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor);
communicator.inUse.set(new CountDownLatch(1)); communicator.inUse.set(new CountDownLatch(1));
expectThrows(ElasticsearchStatusException.class, () -> communicator.writeUpdateConfigMessage("")); expectThrows(ElasticsearchStatusException.class, () -> communicator.writeUpdateModelDebugMessage(mock(ModelDebugConfig.class)));
communicator.inUse.set(null); communicator.inUse.set(null);
communicator.writeUpdateConfigMessage(""); communicator.writeUpdateModelDebugMessage(mock(ModelDebugConfig.class));
} }
public void testWriteUpdateDetectorRulesMessageInUse() throws Exception {
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
AutoDetectResultProcessor resultProcessor = mock(AutoDetectResultProcessor.class);
AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor);
List<DetectionRule> rules = Collections.singletonList(mock(DetectionRule.class));
communicator.inUse.set(new CountDownLatch(1));
expectThrows(ElasticsearchStatusException.class, () -> communicator.writeUpdateDetectorRulesMessage(0, rules));
communicator.inUse.set(null);
communicator.writeUpdateDetectorRulesMessage(0, rules);
}
} }

View File

@ -19,10 +19,12 @@ import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.Detector; import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.MlFilter; import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig;
import org.elasticsearch.xpack.ml.job.metadata.Allocation; import org.elasticsearch.xpack.ml.job.metadata.Allocation;
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
@ -44,6 +46,7 @@ import java.nio.charset.StandardCharsets;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -259,11 +262,20 @@ public class AutodetectProcessManagerTests extends ESTestCase {
assertEquals("[foo] exception while flushing job", e.getMessage()); assertEquals("[foo] exception while flushing job", e.getMessage());
} }
public void testWriteUpdateConfigMessage() throws IOException { public void testWriteUpdateModelDebugMessage() throws IOException {
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
AutodetectProcessManager manager = createManagerAndCallProcessData(communicator, "foo"); AutodetectProcessManager manager = createManagerAndCallProcessData(communicator, "foo");
manager.writeUpdateConfigMessage("foo", "go faster"); ModelDebugConfig debugConfig = mock(ModelDebugConfig.class);
verify(communicator).writeUpdateConfigMessage("go faster"); manager.writeUpdateModelDebugMessage("foo", debugConfig);
verify(communicator).writeUpdateModelDebugMessage(debugConfig);
}
public void testWriteUpdateDetectorRulesMessage() throws IOException {
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
AutodetectProcessManager manager = createManagerAndCallProcessData(communicator, "foo");
List<DetectionRule> rules = Collections.singletonList(mock(DetectionRule.class));
manager.writeUpdateDetectorRulesMessage("foo", 2, rules);
verify(communicator).writeUpdateDetectorRulesMessage(2, rules);
} }
public void testJobHasActiveAutodetectProcess() throws IOException { public void testJobHasActiveAutodetectProcess() throws IOException {

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
@ -144,7 +145,7 @@ public class NativeAutodetectProcessTests extends ESTestCase {
new AutodetectResultsParser(Settings.EMPTY))) { new AutodetectResultsParser(Settings.EMPTY))) {
process.start(executorService, mock(StateProcessor.class), mock(InputStream.class)); process.start(executorService, mock(StateProcessor.class), mock(InputStream.class));
process.writeUpdateConfigMessage(""); process.writeUpdateModelDebugMessage(new ModelDebugConfig(1.0, "term,s"));
process.flushStream(); process.flushStream();
String message = new String(bos.toByteArray(), StandardCharsets.UTF_8); String message = new String(bos.toByteArray(), StandardCharsets.UTF_8);

View File

@ -10,10 +10,20 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.config.Condition;
import org.elasticsearch.xpack.ml.job.config.Connective;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig;
import org.elasticsearch.xpack.ml.job.config.Operator;
import org.elasticsearch.xpack.ml.job.config.RuleCondition;
import org.elasticsearch.xpack.ml.job.config.RuleConditionType;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange; import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange;
import org.junit.Before; import org.junit.Before;
import org.mockito.InOrder; import org.mockito.InOrder;
@ -137,4 +147,41 @@ public class ControlMsgToProcessWriterTests extends ESTestCase {
inOrder.verify(lengthEncodedWriter).writeField("r0 600"); inOrder.verify(lengthEncodedWriter).writeField("r0 600");
verifyNoMoreInteractions(lengthEncodedWriter); verifyNoMoreInteractions(lengthEncodedWriter);
} }
public void testWriteUpdateModelDebugMessage() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2);
writer.writeUpdateModelDebugMessage(new ModelDebugConfig(10.0, "foo,bar"));
InOrder inOrder = inOrder(lengthEncodedWriter);
inOrder.verify(lengthEncodedWriter).writeNumFields(4);
inOrder.verify(lengthEncodedWriter, times(3)).writeField("");
inOrder.verify(lengthEncodedWriter).writeField("u[modelDebugConfig]\nboundspercentile = 10.0\nterms = foo,bar\n");
verifyNoMoreInteractions(lengthEncodedWriter);
}
public void testWriteUpdateDetectorRulesMessage() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2);
DetectionRule rule1 = new DetectionRule("targetField1", "targetValue", Connective.AND, createRule("5"));
DetectionRule rule2 = new DetectionRule("targetField2", "targetValue", Connective.AND, createRule("5"));
writer.writeUpdateDetectorRulesMessage(2, Arrays.asList(rule1, rule2));
InOrder inOrder = inOrder(lengthEncodedWriter);
inOrder.verify(lengthEncodedWriter).writeNumFields(4);
inOrder.verify(lengthEncodedWriter, times(3)).writeField("");
inOrder.verify(lengthEncodedWriter).writeField("u[detectorRules]\ndetectorIndex=2\n" +
"rulesJson=[{\"conditions_connective\":\"and\",\"rule_conditions\":" +
"[{\"condition_type\":\"numerical_actual\",\"condition\":{\"operator\":\"gt\",\"value\":\"5\"}}]," +
"\"target_field_name\":\"targetField1\",\"target_field_value\":\"targetValue\"}," +
"{\"conditions_connective\":\"and\",\"rule_conditions\":[{\"condition_type\":\"numerical_actual\"," +
"\"condition\":{\"operator\":\"gt\",\"value\":\"5\"}}]," +
"\"target_field_name\":\"targetField2\",\"target_field_value\":\"targetValue\"}]");
verifyNoMoreInteractions(lengthEncodedWriter);
}
private static List<RuleCondition> createRule(String value) {
Condition condition = new Condition(Operator.GT, value);
return Collections.singletonList(new RuleCondition(RuleConditionType.NUMERICAL_ACTUAL, null, null, condition, null));
}
} }

View File

@ -16,7 +16,6 @@ import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig; import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig;
import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig.DebugDestination;
public class ModelDebugConfigWriterTests extends ESTestCase { public class ModelDebugConfigWriterTests extends ESTestCase {
private OutputStreamWriter writer; private OutputStreamWriter writer;
@ -30,23 +29,12 @@ public class ModelDebugConfigWriterTests extends ESTestCase {
public void verifyNoMoreWriterInteractions() { public void verifyNoMoreWriterInteractions() {
verifyNoMoreInteractions(writer); verifyNoMoreInteractions(writer);
} }
public void testWrite_GivenFullConfig() throws IOException {
public void testWrite_GivenFileConfig() throws IOException {
ModelDebugConfig modelDebugConfig = new ModelDebugConfig(65.0, "foo,bar"); ModelDebugConfig modelDebugConfig = new ModelDebugConfig(65.0, "foo,bar");
ModelDebugConfigWriter writer = new ModelDebugConfigWriter(modelDebugConfig, this.writer); ModelDebugConfigWriter writer = new ModelDebugConfigWriter(modelDebugConfig, this.writer);
writer.write(); writer.write();
verify(this.writer).write("writeto = file\nboundspercentile = 65.0\nterms = foo,bar\n"); verify(this.writer).write("boundspercentile = 65.0\nterms = foo,bar\n");
} }
public void testWrite_GivenFullConfig() throws IOException {
ModelDebugConfig modelDebugConfig = new ModelDebugConfig(DebugDestination.DATA_STORE, 65.0, "foo,bar");
ModelDebugConfigWriter writer = new ModelDebugConfigWriter(modelDebugConfig, this.writer);
writer.write();
verify(this.writer).write("writeto = data_store\nboundspercentile = 65.0\nterms = foo,bar\n");
}
} }