diff --git a/plugin/core/src/main/java/org/elasticsearch/xpack/ml/action/PutCalendarAction.java b/plugin/core/src/main/java/org/elasticsearch/xpack/ml/action/PutCalendarAction.java index e7cb59b484b..c5fc34a4563 100644 --- a/plugin/core/src/main/java/org/elasticsearch/xpack/ml/action/PutCalendarAction.java +++ b/plugin/core/src/main/java/org/elasticsearch/xpack/ml/action/PutCalendarAction.java @@ -20,12 +20,9 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.xpack.ml.calendars.Calendar; import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; -import org.elasticsearch.xpack.watcher.support.Exceptions; import java.io.IOException; import java.util.Objects; -import java.util.Set; -import java.util.function.Consumer; import static org.elasticsearch.action.ValidateActions.addValidationError; diff --git a/plugin/core/src/main/java/org/elasticsearch/xpack/ml/action/PutFilterAction.java b/plugin/core/src/main/java/org/elasticsearch/xpack/ml/action/PutFilterAction.java index c89fb4086ac..f97a493bd5f 100644 --- a/plugin/core/src/main/java/org/elasticsearch/xpack/ml/action/PutFilterAction.java +++ b/plugin/core/src/main/java/org/elasticsearch/xpack/ml/action/PutFilterAction.java @@ -139,6 +139,4 @@ public class PutFilterAction extends Action detectorUpdates; + private MlFilter filter; private boolean updateScheduledEvents = false; Request() { } - public Request(String jobId, ModelPlotConfig modelPlotConfig, List detectorUpdates, + public Request(String jobId, ModelPlotConfig modelPlotConfig, List detectorUpdates, MlFilter filter, boolean updateScheduledEvents) { super(jobId); this.modelPlotConfig = modelPlotConfig; this.detectorUpdates = detectorUpdates; + this.filter = filter; this.updateScheduledEvents = updateScheduledEvents; } @@ -132,6 +135,10 @@ public class UpdateProcessAction extends return detectorUpdates; } + public MlFilter getFilter() { + return filter; + } + public boolean isUpdateScheduledEvents() { return updateScheduledEvents; } @@ -144,6 +151,7 @@ public class UpdateProcessAction extends detectorUpdates = in.readList(JobUpdate.DetectorUpdate::new); } if (in.getVersion().onOrAfter(Version.V_6_2_0)) { + filter = in.readOptionalWriteable(MlFilter::new); updateScheduledEvents = in.readBoolean(); } } @@ -158,13 +166,14 @@ public class UpdateProcessAction extends out.writeList(detectorUpdates); } if (out.getVersion().onOrAfter(Version.V_6_2_0)) { + out.writeOptionalWriteable(filter); out.writeBoolean(updateScheduledEvents); } } @Override public int hashCode() { - return Objects.hash(getJobId(), modelPlotConfig, detectorUpdates, updateScheduledEvents); + return Objects.hash(getJobId(), modelPlotConfig, detectorUpdates, filter, updateScheduledEvents); } @Override @@ -180,8 +189,8 @@ public class UpdateProcessAction extends return Objects.equals(getJobId(), other.getJobId()) && Objects.equals(modelPlotConfig, other.modelPlotConfig) && Objects.equals(detectorUpdates, other.detectorUpdates) && + Objects.equals(filter, other.filter) && Objects.equals(updateScheduledEvents, other.updateScheduledEvents); } } - } diff --git a/plugin/core/src/main/java/org/elasticsearch/xpack/ml/job/config/JobUpdate.java b/plugin/core/src/main/java/org/elasticsearch/xpack/ml/job/config/JobUpdate.java index 1e24a2dde69..9140d20f7e3 100644 --- a/plugin/core/src/main/java/org/elasticsearch/xpack/ml/job/config/JobUpdate.java +++ b/plugin/core/src/main/java/org/elasticsearch/xpack/ml/job/config/JobUpdate.java @@ -29,7 +29,6 @@ import java.util.Objects; public class JobUpdate implements Writeable, ToXContentObject { public static final ParseField DETECTORS = new ParseField("detectors"); - public static final ParseField UPDATE_SCHEDULED_EVENTS = new ParseField("update_scheduled_events"); public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "job_update", args -> new Builder((String) args[0])); @@ -50,7 +49,6 @@ public class JobUpdate implements Writeable, ToXContentObject { PARSER.declareField(Builder::setCustomSettings, (p, c) -> p.map(), Job.CUSTOM_SETTINGS, ObjectParser.ValueType.OBJECT); PARSER.declareString(Builder::setModelSnapshotId, Job.MODEL_SNAPSHOT_ID); PARSER.declareLong(Builder::setEstablishedModelMemory, Job.ESTABLISHED_MODEL_MEMORY); - PARSER.declareBoolean(Builder::setUpdateScheduledEvents, UPDATE_SCHEDULED_EVENTS); } /** @@ -75,7 +73,6 @@ public class JobUpdate implements Writeable, ToXContentObject { private final Map customSettings; private final String modelSnapshotId; private final Long establishedModelMemory; - private final boolean updateScheduledEvents; private JobUpdate(String jobId, @Nullable List groups, @Nullable String description, @Nullable List detectorUpdates, @Nullable ModelPlotConfig modelPlotConfig, @@ -83,7 +80,7 @@ public class JobUpdate implements Writeable, ToXContentObject { @Nullable Long renormalizationWindowDays, @Nullable Long resultsRetentionDays, @Nullable Long modelSnapshotRetentionDays, @Nullable List categorisationFilters, @Nullable Map customSettings, @Nullable String modelSnapshotId, - @Nullable Long establishedModelMemory, boolean updateScheduledEvents) { + @Nullable Long establishedModelMemory) { this.jobId = jobId; this.groups = groups; this.description = description; @@ -98,7 +95,6 @@ public class JobUpdate implements Writeable, ToXContentObject { this.customSettings = customSettings; this.modelSnapshotId = modelSnapshotId; this.establishedModelMemory = establishedModelMemory; - this.updateScheduledEvents = updateScheduledEvents; } public JobUpdate(StreamInput in) throws IOException { @@ -133,12 +129,6 @@ public class JobUpdate implements Writeable, ToXContentObject { } else { establishedModelMemory = null; } - - if (in.getVersion().onOrAfter(Version.V_6_2_0)) { - updateScheduledEvents = in.readBoolean(); - } else { - updateScheduledEvents = false; - } } @Override @@ -168,10 +158,6 @@ public class JobUpdate implements Writeable, ToXContentObject { if (out.getVersion().onOrAfter(Version.V_6_1_0)) { out.writeOptionalLong(establishedModelMemory); } - - if (out.getVersion().onOrAfter(Version.V_6_2_0)) { - out.writeBoolean(updateScheduledEvents); - } } public String getJobId() { @@ -234,10 +220,6 @@ public class JobUpdate implements Writeable, ToXContentObject { return modelPlotConfig != null || detectorUpdates != null; } - public boolean isUpdateScheduledEvents() { - return updateScheduledEvents; - } - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -281,7 +263,6 @@ public class JobUpdate implements Writeable, ToXContentObject { if (establishedModelMemory != null) { builder.field(Job.ESTABLISHED_MODEL_MEMORY.getPreferredName(), establishedModelMemory); } - builder.field(UPDATE_SCHEDULED_EVENTS.getPreferredName(), updateScheduledEvents); builder.endObject(); return builder; } @@ -418,15 +399,14 @@ public class JobUpdate implements Writeable, ToXContentObject { && Objects.equals(this.categorizationFilters, that.categorizationFilters) && Objects.equals(this.customSettings, that.customSettings) && Objects.equals(this.modelSnapshotId, that.modelSnapshotId) - && Objects.equals(this.establishedModelMemory, that.establishedModelMemory) - && Objects.equals(this.updateScheduledEvents, that.updateScheduledEvents); + && Objects.equals(this.establishedModelMemory, that.establishedModelMemory); } @Override public int hashCode() { return Objects.hash(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, categorizationFilters, customSettings, - modelSnapshotId, establishedModelMemory, updateScheduledEvents); + modelSnapshotId, establishedModelMemory); } public static class DetectorUpdate implements Writeable, ToXContentObject { @@ -536,7 +516,6 @@ public class JobUpdate implements Writeable, ToXContentObject { private Map customSettings; private String modelSnapshotId; private Long establishedModelMemory; - private boolean updateScheduledEvents = false; public Builder(String jobId) { this.jobId = jobId; @@ -612,15 +591,10 @@ public class JobUpdate implements Writeable, ToXContentObject { return this; } - public Builder setUpdateScheduledEvents(boolean updateScheduledEvents) { - this.updateScheduledEvents = updateScheduledEvents; - return this; - } - public JobUpdate build() { return new JobUpdate(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, backgroundPersistInterval, renormalizationWindowDays, resultsRetentionDays, modelSnapshotRetentionDays, categorizationFilters, customSettings, - modelSnapshotId, establishedModelMemory, updateScheduledEvents); + modelSnapshotId, establishedModelMemory); } } } diff --git a/plugin/core/src/main/java/org/elasticsearch/xpack/ml/job/config/RuleCondition.java b/plugin/core/src/main/java/org/elasticsearch/xpack/ml/job/config/RuleCondition.java index a950189931f..d050373a6b7 100644 --- a/plugin/core/src/main/java/org/elasticsearch/xpack/ml/job/config/RuleCondition.java +++ b/plugin/core/src/main/java/org/elasticsearch/xpack/ml/job/config/RuleCondition.java @@ -177,8 +177,8 @@ public class RuleCondition implements ToXContentObject, Writeable { return Objects.hash(type, fieldName, fieldValue, condition, filterId); } - public static RuleCondition createCategorical(String fieldName, String valueFilter) { - return new RuleCondition(RuleConditionType.CATEGORICAL, fieldName, null, null, valueFilter); + public static RuleCondition createCategorical(String fieldName, String filterId) { + return new RuleCondition(RuleConditionType.CATEGORICAL, fieldName, null, null, filterId); } public static RuleCondition createNumerical(RuleConditionType conditionType, String fieldName, String fieldValue, diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteCalendarAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteCalendarAction.java index 3a2df99a415..be50f50e396 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteCalendarAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteCalendarAction.java @@ -7,9 +7,6 @@ package org.elasticsearch.xpack.ml.action; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.get.GetAction; -import org.elasticsearch.action.get.GetRequest; -import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; @@ -26,7 +23,8 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ml.MlMetaIndex; import org.elasticsearch.xpack.ml.calendars.Calendar; -import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.job.JobManager; +import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import static org.elasticsearch.xpack.ClientHelper.ML_ORIGIN; import static org.elasticsearch.xpack.ClientHelper.executeAsyncWithOrigin; @@ -34,15 +32,19 @@ import static org.elasticsearch.xpack.ClientHelper.executeAsyncWithOrigin; public class TransportDeleteCalendarAction extends HandledTransportAction { private final Client client; + private final JobManager jobManager; + private final JobProvider jobProvider; @Inject public TransportDeleteCalendarAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - Client client) { + Client client, JobManager jobManager, JobProvider jobProvider) { super(settings, DeleteCalendarAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, DeleteCalendarAction.Request::new); this.client = client; + this.jobManager = jobManager; + this.jobProvider = jobProvider; } @Override @@ -50,29 +52,25 @@ public class TransportDeleteCalendarAction extends HandledTransportAction() { - @Override - public void onResponse(GetResponse getResponse) { - if (getResponse.isExists() == false) { - listener.onFailure(new ResourceNotFoundException("Could not delete calendar [" + calendarId - + "] because it does not exist")); - return; - } - - // Delete calendar and events - DeleteByQueryRequest dbqRequest = buildDeleteByQuery(calendarId); - executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, dbqRequest, ActionListener.wrap( - response -> listener.onResponse(new DeleteCalendarAction.Response(true)), - listener::onFailure)); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(ExceptionsHelper.serverError("Could not delete calendar [" + calendarId + "]", e)); - } - } + ActionListener calendarListener = ActionListener.wrap( + calendar -> { + // Delete calendar and events + DeleteByQueryRequest dbqRequest = buildDeleteByQuery(calendarId); + executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, dbqRequest, ActionListener.wrap( + response -> { + if (response.getDeleted() == 0) { + listener.onFailure(new ResourceNotFoundException("No calendar with id [" + calendarId + "]")); + return; + } + jobManager.updateProcessOnCalendarChanged(calendar.getJobIds()); + listener.onResponse(new DeleteCalendarAction.Response(true)); + }, + listener::onFailure)); + }, + listener::onFailure ); + + jobProvider.calendar(calendarId, calendarListener); } private DeleteByQueryRequest buildDeleteByQuery(String calendarId) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteCalendarEventAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteCalendarEventAction.java index 3601003c980..f3f8af8b8b1 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteCalendarEventAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteCalendarEventAction.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.ml.action; -import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.delete.DeleteAction; @@ -26,6 +25,8 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ml.MlMetaIndex; import org.elasticsearch.xpack.ml.calendars.Calendar; +import org.elasticsearch.xpack.ml.job.JobManager; +import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import java.util.Map; @@ -37,15 +38,19 @@ public class TransportDeleteCalendarEventAction extends HandledTransportAction { private final Client client; + private final JobProvider jobProvider; + private final JobManager jobManager; @Inject public TransportDeleteCalendarEventAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - Client client) { + Client client, JobProvider jobProvider, JobManager jobManager) { super(settings, DeleteCalendarEventAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, DeleteCalendarEventAction.Request::new); this.client = client; + this.jobProvider = jobProvider; + this.jobManager = jobManager; } @Override @@ -57,27 +62,34 @@ public class TransportDeleteCalendarEventAction extends HandledTransportAction source = getResponse.getSourceAsMap(); String calendarId = (String) source.get(Calendar.ID.getPreferredName()); if (calendarId == null) { - listener.onFailure(new ElasticsearchStatusException("Event [" + eventId + "] does not have a valid " - + Calendar.ID.getPreferredName(), RestStatus.BAD_REQUEST)); + listener.onFailure(ExceptionsHelper.badRequestException("Event [" + eventId + "] does not have a valid " + + Calendar.ID.getPreferredName())); return; } if (calendarId.equals(request.getCalendarId()) == false) { - listener.onFailure(new ElasticsearchStatusException( + listener.onFailure(ExceptionsHelper.badRequestException( "Event [" + eventId + "] has " + Calendar.ID.getPreferredName() + " [" + calendarId + "] which does not match the request " + Calendar.ID.getPreferredName() + - " [" + request.getCalendarId() + "]", RestStatus.BAD_REQUEST)); + " [" + request.getCalendarId() + "]")); return; } - deleteEvent(eventId, listener); + ActionListener calendarListener = ActionListener.wrap( + calendar -> { + deleteEvent(eventId, calendar, listener); + }, + listener::onFailure + ); + + jobProvider.calendar(calendarId, calendarListener); } @Override @@ -87,7 +99,7 @@ public class TransportDeleteCalendarEventAction extends HandledTransportAction listener) { + private void deleteEvent(String eventId, Calendar calendar, ActionListener listener) { DeleteRequest deleteRequest = new DeleteRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, eventId); deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); @@ -97,9 +109,9 @@ public class TransportDeleteCalendarEventAction extends HandledTransportAction listener) { List events = request.getScheduledEvents(); - ActionListener calendarExistsListener = ActionListener.wrap( - r -> { + ActionListener calendarListener = ActionListener.wrap( + calendar -> { BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); for (ScheduledEvent event: events) { @@ -78,6 +82,7 @@ public class TransportPostCalendarEventsAction extends HandledTransportAction() { @Override public void onResponse(BulkResponse response) { + jobManager.updateProcessOnCalendarChanged(calendar.getJobIds()); listener.onResponse(new PostCalendarEventsAction.Response(events)); } @@ -90,13 +95,6 @@ public class TransportPostCalendarEventsAction extends HandledTransportAction listener) { - jobProvider.calendar(calendarId, ActionListener.wrap( - c -> listener.onResponse(true), - listener::onFailure - )); + jobProvider.calendar(request.getCalendarId(), calendarListener); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutCalendarAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutCalendarAction.java index bbec8233449..69b388cfde9 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutCalendarAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutCalendarAction.java @@ -28,6 +28,7 @@ import org.elasticsearch.xpack.ml.MLMetadataField; import org.elasticsearch.xpack.ml.MlMetaIndex; import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.calendars.Calendar; +import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import java.io.IOException; @@ -42,16 +43,18 @@ public class TransportPutCalendarAction extends HandledTransportAction() { @Override public void onResponse(IndexResponse indexResponse) { + jobManager.updateProcessOnCalendarChanged(calendar.getJobIds()); listener.onResponse(new PutCalendarAction.Response(calendar)); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutFilterAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutFilterAction.java index 046a2cea951..0e3b7f39c13 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutFilterAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutFilterAction.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ml.MlMetaIndex; +import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.config.MlFilter; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; @@ -35,15 +36,17 @@ import static org.elasticsearch.xpack.ClientHelper.executeAsyncWithOrigin; public class TransportPutFilterAction extends HandledTransportAction { private final Client client; + private final JobManager jobManager; @Inject public TransportPutFilterAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - Client client) { + Client client, JobManager jobManager) { super(settings, PutFilterAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, PutFilterAction.Request::new); this.client = client; + this.jobManager = jobManager; } @Override @@ -64,6 +67,7 @@ public class TransportPutFilterAction extends HandledTransportAction() { @Override public void onResponse(BulkResponse indexResponse) { + jobManager.updateProcessOnFilterChanged(filter); listener.onResponse(new PutFilterAction.Response()); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateCalendarJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateCalendarJobAction.java index 743ff42227c..58369e5594e 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateCalendarJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateCalendarJobAction.java @@ -17,6 +17,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ml.MLMetadataField; import org.elasticsearch.xpack.ml.MlMetadata; +import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; @@ -24,16 +25,18 @@ public class TransportUpdateCalendarJobAction extends HandledTransportAction listener.onResponse(new PutCalendarAction.Response(c)), listener::onFailure); + c -> { + jobManager.updateProcessOnCalendarChanged(c.getJobIds()); + listener.onResponse(new PutCalendarAction.Response(c)); + }, listener::onFailure); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateProcessAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateProcessAction.java index 86330ff5f9d..bafc2a00d09 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateProcessAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateProcessAction.java @@ -41,10 +41,15 @@ public class TransportUpdateProcessAction extends TransportJobTaskAction listener) { + UpdateParams updateParams = UpdateParams.builder(request.getJobId()) + .modelPlotConfig(request.getModelPlotConfig()) + .detectorUpdates(request.getDetectorUpdates()) + .filter(request.getFilter()) + .updateScheduledEvents(request.isUpdateScheduledEvents()) + .build(); + try { - processManager.writeUpdateProcessMessage(task, - new UpdateParams(request.getModelPlotConfig(), - request.getDetectorUpdates(), request.isUpdateScheduledEvents()), + processManager.writeUpdateProcessMessage(task, updateParams, e -> { if (e == null) { listener.onResponse(new UpdateProcessAction.Response()); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index c22fa2621b9..c85ef674dc7 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -37,10 +37,12 @@ import org.elasticsearch.xpack.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.config.JobUpdate; +import org.elasticsearch.xpack.ml.job.config.MlFilter; import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobStorageDeletionTask; +import org.elasticsearch.xpack.ml.job.process.autodetect.UpdateParams; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.notifications.Auditor; @@ -280,18 +282,16 @@ public class JobManager extends AbstractComponent { // nothing to do return currentState; } - changeWasRequired = true; + // No change is required if the fields that the C++ uses aren't being updated + changeWasRequired = jobUpdate.isAutodetectProcessUpdate(); return updateClusterState(updatedJob, true, currentState); } @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { if (changeWasRequired) { - PersistentTasksCustomMetaData persistentTasks = - newState.metaData().custom(PersistentTasksCustomMetaData.TYPE); - JobState jobState = MlMetadata.getJobState(jobId, persistentTasks); - if (jobState == JobState.OPENED) { - updateJobProcessNotifier.submitJobUpdate(jobUpdate); + if (isJobOpen(newState, jobId)) { + updateJobProcessNotifier.submitJobUpdate(UpdateParams.fromJobUpdate(jobUpdate)); } } else { logger.debug("[{}] Ignored job update with no changes: {}", () -> jobId, () -> { @@ -308,12 +308,40 @@ public class JobManager extends AbstractComponent { }); } + private boolean isJobOpen(ClusterState clusterState, String jobId) { + PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE); + JobState jobState = MlMetadata.getJobState(jobId, persistentTasks); + return jobState == JobState.OPENED; + } + ClusterState updateClusterState(Job job, boolean overwrite, ClusterState currentState) { MlMetadata.Builder builder = createMlMetadataBuilder(currentState); builder.putJob(job, overwrite); return buildNewClusterState(currentState, builder); } + public void updateProcessOnFilterChanged(MlFilter filter) { + ClusterState clusterState = clusterService.state(); + QueryPage jobs = expandJobs("*", true, clusterService.state()); + for (Job job : jobs.results()) { + if (isJobOpen(clusterState, job.getId())) { + Set jobFilters = job.getAnalysisConfig().extractReferencedFilters(); + if (jobFilters.contains(filter.getId())) { + updateJobProcessNotifier.submitJobUpdate(UpdateParams.filterUpdate(job.getId(), filter)); + } + } + } + } + + public void updateProcessOnCalendarChanged(List calendarJobIds) { + ClusterState clusterState = clusterService.state(); + for (String jobId : calendarJobIds) { + if (isJobOpen(clusterState, jobId)) { + updateJobProcessNotifier.submitJobUpdate(UpdateParams.scheduledEventsUpdate(jobId)); + } + } + } + public void deleteJob(DeleteJobAction.Request request, JobStorageDeletionTask task, ActionListener actionListener) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java index 2e677f2f92c..5804702a3ae 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java @@ -17,7 +17,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.action.UpdateProcessAction; -import org.elasticsearch.xpack.ml.job.config.JobUpdate; +import org.elasticsearch.xpack.ml.job.process.autodetect.UpdateParams; import java.util.concurrent.LinkedBlockingQueue; @@ -30,7 +30,7 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local private final Client client; private final ThreadPool threadPool; - private final LinkedBlockingQueue orderedJobUpdates = new LinkedBlockingQueue<>(1000); + private final LinkedBlockingQueue orderedJobUpdates = new LinkedBlockingQueue<>(1000); private volatile ThreadPool.Cancellable cancellable; @@ -47,8 +47,8 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local }); } - boolean submitJobUpdate(JobUpdate jobUpdate) { - return orderedJobUpdates.offer(jobUpdate); + boolean submitJobUpdate(UpdateParams updateParams) { + return orderedJobUpdates.offer(updateParams); } @Override @@ -82,24 +82,17 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local private void processNextUpdate() { try { - JobUpdate jobUpdate = orderedJobUpdates.poll(); - if (jobUpdate != null) { - executeRemoteJobIfNecessary(jobUpdate); + UpdateParams updateParams = orderedJobUpdates.poll(); + if (updateParams != null) { + executeRemoteJob(updateParams); } } catch (Exception e) { logger.error("Unable while processing next job update", e); } } - void executeRemoteJobIfNecessary(JobUpdate update) { - // Do nothing if the fields that the C++ needs aren't being updated - if (update.isAutodetectProcessUpdate()) { - executeRemoteJob(update); - } - } - - void executeRemoteJob(JobUpdate update) { - Request request = new Request(update.getJobId(), update.getModelPlotConfig(), update.getDetectorUpdates(), + void executeRemoteJob(UpdateParams update) { + Request request = new Request(update.getJobId(), update.getModelPlotConfig(), update.getDetectorUpdates(), update.getFilter(), update.isUpdateScheduledEvents()); executeAsyncWithOrigin(client, ML_ORIGIN, UpdateProcessAction.INSTANCE, request, @@ -126,5 +119,4 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local } }); } - } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java index b5ca1672e77..446bfd436d6 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java @@ -22,7 +22,6 @@ import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.CategorizationAnalyzerConfig; import org.elasticsearch.xpack.ml.job.config.DataDescription; -import org.elasticsearch.xpack.ml.job.config.DetectionRule; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobUpdate; import org.elasticsearch.xpack.ml.job.persistence.StateStreamer; @@ -45,7 +44,6 @@ import java.io.IOException; import java.io.InputStream; import java.time.Duration; import java.time.ZonedDateTime; -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Locale; @@ -58,7 +56,6 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; -import java.util.stream.Collectors; public class AutodetectCommunicator implements Closeable { @@ -215,35 +212,23 @@ public class AutodetectCommunicator implements Closeable { autodetectProcess.writeUpdateModelPlotMessage(updateParams.getModelPlotConfig()); } - List eventsAsRules = Collections.emptyList(); - if (scheduledEvents.isEmpty() == false) { - eventsAsRules = scheduledEvents.stream() - .map(e -> e.toDetectionRule(job.getAnalysisConfig().getBucketSpan())) - .collect(Collectors.toList()); - } - - // All detection rules for a detector must be updated together as the update - // wipes any previously set rules. - // Build a single list of rules for events and detection rules. - List> rules = new ArrayList<>(job.getAnalysisConfig().getDetectors().size()); - for (int i = 0; i < job.getAnalysisConfig().getDetectors().size(); i++) { - List detectorRules = new ArrayList<>(eventsAsRules); - rules.add(detectorRules); + // Filters have to be written before detectors + if (updateParams.getFilter() != null) { + autodetectProcess.writeUpdateFiltersMessage(Collections.singletonList(updateParams.getFilter())); } // Add detector rules if (updateParams.getDetectorUpdates() != null) { for (JobUpdate.DetectorUpdate update : updateParams.getDetectorUpdates()) { if (update.getRules() != null) { - rules.get(update.getDetectorIndex()).addAll(update.getRules()); + autodetectProcess.writeUpdateDetectorRulesMessage(update.getDetectorIndex(), update.getRules()); } } } - for (int i = 0; i < job.getAnalysisConfig().getDetectors().size(); i++) { - if (!rules.get(i).isEmpty()) { - autodetectProcess.writeUpdateDetectorRulesMessage(i, rules.get(i)); - } + // Add scheduled events; null means there's no update but an empty list means we should clear any events in the process + if (scheduledEvents != null) { + autodetectProcess.writeUpdateScheduledEventsMessage(scheduledEvents, job.getAnalysisConfig().getBucketSpan()); } return null; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java index 29ff2fabea8..44ab1753e61 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java @@ -5,7 +5,10 @@ */ package org.elasticsearch.xpack.ml.job.process.autodetect; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.xpack.ml.calendars.ScheduledEvent; import org.elasticsearch.xpack.ml.job.config.DetectionRule; +import org.elasticsearch.xpack.ml.job.config.MlFilter; import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig; import org.elasticsearch.xpack.ml.job.persistence.StateStreamer; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; @@ -74,6 +77,23 @@ public interface AutodetectProcess extends Closeable { void writeUpdateDetectorRulesMessage(int detectorIndex, List rules) throws IOException; + /** + * Write message to update the filters + * + * @param filters the filters to update + * @throws IOException If the write fails + */ + void writeUpdateFiltersMessage(List filters) throws IOException; + + /** + * Write message to update the scheduled events + * + * @param events Scheduled events + * @param bucketSpan The job bucket span + * @throws IOException If the write fails + */ + void writeUpdateScheduledEventsMessage(List events, TimeValue bucketSpan) throws IOException; + /** * Flush the job pushing any stale data into autodetect. * Every flush command generates a unique flush Id which will be output diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 08bae3daecc..9e946763a18 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -268,7 +268,7 @@ public class AutodetectProcessManager extends AbstractComponent { ActionListener> eventsListener = ActionListener.wrap( events -> { - communicator.writeUpdateProcessMessage(updateParams, events.results(), (aVoid, e) -> { + communicator.writeUpdateProcessMessage(updateParams, events == null ? null : events.results(), (aVoid, e) -> { if (e == null) { handler.accept(null); } else { @@ -283,7 +283,7 @@ public class AutodetectProcessManager extends AbstractComponent { ScheduledEventsQueryBuilder query = new ScheduledEventsQueryBuilder().start(Long.toString(new Date().getTime())); jobProvider.scheduledEventsForJob(jobTask.getJobId(), job.getGroups(), query, eventsListener); } else { - eventsListener.onResponse(new QueryPage<>(Collections.emptyList(), 0, ScheduledEvent.RESULTS_FIELD)); + eventsListener.onResponse(null); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java index 9d27c8bb62b..3de56d0b169 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java @@ -5,7 +5,10 @@ */ package org.elasticsearch.xpack.ml.job.process.autodetect; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.xpack.ml.calendars.ScheduledEvent; import org.elasticsearch.xpack.ml.job.config.DetectionRule; +import org.elasticsearch.xpack.ml.job.config.MlFilter; import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig; import org.elasticsearch.xpack.ml.job.persistence.StateStreamer; import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement; @@ -71,6 +74,14 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess { public void writeUpdateDetectorRulesMessage(int detectorIndex, List rules) throws IOException { } + @Override + public void writeUpdateFiltersMessage(List filters) throws IOException { + } + + @Override + public void writeUpdateScheduledEventsMessage(List events, TimeValue bucketSpan) throws IOException { + } + /** * Accept the request do nothing with it but write the flush acknowledgement to {@link #readAutodetectResults()} * @param params Should interim results be generated diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java index 7ace14708ff..51f2f907ec1 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java @@ -8,8 +8,12 @@ package org.elasticsearch.xpack.ml.job.process.autodetect; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.xpack.ml.MachineLearningClientActionPlugin; +import org.elasticsearch.xpack.ml.calendars.ScheduledEvent; import org.elasticsearch.xpack.ml.job.config.DetectionRule; +import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.config.MlFilter; import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig; import org.elasticsearch.xpack.ml.job.persistence.StateStreamer; import org.elasticsearch.xpack.ml.job.process.NativeControllerHolder; @@ -159,6 +163,18 @@ class NativeAutodetectProcess implements AutodetectProcess { writer.writeUpdateDetectorRulesMessage(detectorIndex, rules); } + @Override + public void writeUpdateFiltersMessage(List filters) throws IOException { + ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfFields); + writer.writeUpdateFiltersMessage(filters); + } + + @Override + public void writeUpdateScheduledEventsMessage(List events, TimeValue bucketSpan) throws IOException { + ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfFields); + writer.writeUpdateScheduledEventsMessage(events, bucketSpan); + } + @Override public String flushJob(FlushJobParams params) throws IOException { ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfFields); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateParams.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateParams.java index 64947c57d32..d6f65ea4e11 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateParams.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateParams.java @@ -7,33 +7,105 @@ package org.elasticsearch.xpack.ml.job.process.autodetect; import org.elasticsearch.common.Nullable; import org.elasticsearch.xpack.ml.job.config.JobUpdate; +import org.elasticsearch.xpack.ml.job.config.MlFilter; import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig; import java.util.List; +import java.util.Objects; public final class UpdateParams { + private final String jobId; private final ModelPlotConfig modelPlotConfig; private final List detectorUpdates; + private final MlFilter filter; private final boolean updateScheduledEvents; - public UpdateParams(@Nullable ModelPlotConfig modelPlotConfig, - @Nullable List detectorUpdates, - boolean updateScheduledEvents) { + private UpdateParams(String jobId, @Nullable ModelPlotConfig modelPlotConfig, @Nullable List detectorUpdates, + @Nullable MlFilter filter, boolean updateScheduledEvents) { + this.jobId = Objects.requireNonNull(jobId); this.modelPlotConfig = modelPlotConfig; this.detectorUpdates = detectorUpdates; + this.filter = filter; this.updateScheduledEvents = updateScheduledEvents; } + public String getJobId() { + return jobId; + } + + @Nullable public ModelPlotConfig getModelPlotConfig() { return modelPlotConfig; } + @Nullable public List getDetectorUpdates() { return detectorUpdates; } + @Nullable + public MlFilter getFilter() { + return filter; + } + public boolean isUpdateScheduledEvents() { return updateScheduledEvents; } + + public static UpdateParams fromJobUpdate(JobUpdate jobUpdate) { + return new Builder(jobUpdate.getJobId()) + .modelPlotConfig(jobUpdate.getModelPlotConfig()) + .detectorUpdates(jobUpdate.getDetectorUpdates()) + .build(); + } + + public static UpdateParams filterUpdate(String jobId, MlFilter filter) { + return new Builder(jobId).filter(filter).build(); + } + + public static UpdateParams scheduledEventsUpdate(String jobId) { + return new Builder(jobId).updateScheduledEvents(true).build(); + } + + public static Builder builder(String jobId) { + return new Builder(jobId); + } + + public static class Builder { + + private String jobId; + private ModelPlotConfig modelPlotConfig; + private List detectorUpdates; + private MlFilter filter; + private boolean updateScheduledEvents; + + public Builder(String jobId) { + this.jobId = Objects.requireNonNull(jobId); + } + + public Builder modelPlotConfig(ModelPlotConfig modelPlotConfig) { + this.modelPlotConfig = modelPlotConfig; + return this; + } + + public Builder detectorUpdates(List detectorUpdates) { + this.detectorUpdates = detectorUpdates; + return this; + } + + public Builder filter(MlFilter filter) { + this.filter = filter; + return this; + } + + public Builder updateScheduledEvents(boolean updateScheduledEvents) { + this.updateScheduledEvents = updateScheduledEvents; + return this; + } + + public UpdateParams build() { + return new UpdateParams(jobId, modelPlotConfig, detectorUpdates, filter, updateScheduledEvents); + } + } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java index 6b6f82c98a4..7255de3bb17 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java @@ -5,11 +5,14 @@ */ package org.elasticsearch.xpack.ml.job.process.autodetect.writer; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.ml.calendars.ScheduledEvent; import org.elasticsearch.xpack.ml.job.config.DetectionRule; +import org.elasticsearch.xpack.ml.job.config.MlFilter; import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; @@ -211,6 +214,24 @@ public class ControlMsgToProcessWriter { writeMessage(stringBuilder.toString()); } + public void writeUpdateFiltersMessage(List filters) throws IOException { + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append(UPDATE_MESSAGE_CODE).append("[filters]\n"); + new MlFilterWriter(filters, stringBuilder).write(); + writeMessage(stringBuilder.toString()); + } + + public void writeUpdateScheduledEventsMessage(List events, TimeValue bucketSpan) throws IOException { + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append(UPDATE_MESSAGE_CODE).append("[scheduledEvents]\n"); + if (events.isEmpty()) { + stringBuilder.append("clear = true\n"); + } else { + new ScheduledEventsWriter(events, bucketSpan, stringBuilder).write(); + } + writeMessage(stringBuilder.toString()); + } + /** * Transform the supplied control message to length encoded values and * write to the OutputStream. diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/FieldConfigWriter.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/FieldConfigWriter.java index 2cacbbe539b..88b842d2ba5 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/FieldConfigWriter.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/FieldConfigWriter.java @@ -22,11 +22,9 @@ import org.elasticsearch.xpack.ml.utils.MlStrings; import java.io.IOException; import java.io.OutputStreamWriter; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Set; -import java.util.stream.Collectors; import static org.elasticsearch.xpack.ml.job.process.autodetect.writer.WriterConstants.EQUALS; @@ -37,10 +35,6 @@ public class FieldConfigWriter { private static final String INFLUENCER_PREFIX = "influencer."; private static final String CATEGORIZATION_FIELD_OPTION = " categorizationfield="; private static final String CATEGORIZATION_FILTER_PREFIX = "categorizationfilter."; - private static final String FILTER_PREFIX = "filter."; - private static final String SCHEDULED_EVENT_PREFIX = "scheduledevent."; - private static final String SCHEDULED_EVENT_DESCRIPTION_SUFFIX = ".description"; - // Note: for the Engine API summarycountfield is currently passed as a // command line option to autodetect rather than in the field config file @@ -68,8 +62,9 @@ public class FieldConfigWriter { public void write() throws IOException { StringBuilder contents = new StringBuilder(); - writeDetectors(contents); + // Filters have to be written before the detectors writeFilters(contents); + writeDetectors(contents); writeScheduledEvents(contents); if (MachineLearning.CATEGORIZATION_TOKENIZATION_IN_JAVA == false) { @@ -141,46 +136,12 @@ public class FieldConfigWriter { } private void writeFilters(StringBuilder buffer) throws IOException { - for (MlFilter filter : filters) { - - StringBuilder filterAsJson = new StringBuilder(); - filterAsJson.append('['); - boolean first = true; - for (String item : filter.getItems()) { - if (first) { - first = false; - } else { - filterAsJson.append(','); - } - filterAsJson.append('"'); - filterAsJson.append(item); - filterAsJson.append('"'); - } - filterAsJson.append(']'); - buffer.append(FILTER_PREFIX).append(filter.getId()).append(EQUALS).append(filterAsJson) - .append(NEW_LINE); - } + new MlFilterWriter(filters, buffer).write(); } - private void writeScheduledEvents(StringBuilder contents) throws IOException { - if (scheduledEvents.isEmpty()) { - return; - } - - int eventIndex = 0; - for (ScheduledEvent event: scheduledEvents) { - - contents.append(SCHEDULED_EVENT_PREFIX).append(eventIndex) - .append(SCHEDULED_EVENT_DESCRIPTION_SUFFIX).append(EQUALS) - .append(event.getDescription()) - .append(NEW_LINE); - - contents.append(SCHEDULED_EVENT_PREFIX).append(eventIndex) - .append(DETECTOR_RULES_SUFFIX).append(EQUALS); - writeDetectionRulesJson(Collections.singletonList(event.toDetectionRule(config.getBucketSpan())), contents); - contents.append(NEW_LINE); - - ++eventIndex; + private void writeScheduledEvents(StringBuilder buffer) throws IOException { + if (scheduledEvents.isEmpty() == false) { + new ScheduledEventsWriter(scheduledEvents, config.getBucketSpan(), buffer).write(); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/MlFilterWriter.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/MlFilterWriter.java new file mode 100644 index 00000000000..9e8c8452cab --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/MlFilterWriter.java @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.process.autodetect.writer; + +import org.elasticsearch.xpack.ml.job.config.MlFilter; + +import java.io.IOException; +import java.util.Collection; +import java.util.Objects; + +import static org.elasticsearch.xpack.ml.job.process.autodetect.writer.WriterConstants.EQUALS; +import static org.elasticsearch.xpack.ml.job.process.autodetect.writer.WriterConstants.NEW_LINE; + +public class MlFilterWriter { + + private static final String FILTER_PREFIX = "filter."; + + private final Collection filters; + private final StringBuilder buffer; + + public MlFilterWriter(Collection filters, StringBuilder buffer) { + this.filters = Objects.requireNonNull(filters); + this.buffer = Objects.requireNonNull(buffer); + } + + public void write() throws IOException { + for (MlFilter filter : filters) { + + StringBuilder filterAsJson = new StringBuilder(); + filterAsJson.append('['); + boolean first = true; + for (String item : filter.getItems()) { + if (first) { + first = false; + } else { + filterAsJson.append(','); + } + filterAsJson.append('"'); + filterAsJson.append(item); + filterAsJson.append('"'); + } + filterAsJson.append(']'); + buffer.append(FILTER_PREFIX).append(filter.getId()).append(EQUALS).append(filterAsJson).append(NEW_LINE); + } + } +} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ScheduledEventsWriter.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ScheduledEventsWriter.java new file mode 100644 index 00000000000..229252abd8f --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ScheduledEventsWriter.java @@ -0,0 +1,60 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.process.autodetect.writer; + +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.xpack.ml.calendars.ScheduledEvent; + +import java.io.IOException; +import java.util.Collection; +import java.util.Objects; + +import static org.elasticsearch.xpack.ml.job.process.autodetect.writer.WriterConstants.EQUALS; +import static org.elasticsearch.xpack.ml.job.process.autodetect.writer.WriterConstants.NEW_LINE; + +public class ScheduledEventsWriter { + + private static final String SCHEDULED_EVENT_PREFIX = "scheduledevent."; + private static final String SCHEDULED_EVENT_DESCRIPTION_SUFFIX = ".description"; + private static final String RULES_SUFFIX = ".rules"; + + private final Collection scheduledEvents; + private final TimeValue bucketSpan; + private final StringBuilder buffer; + + public ScheduledEventsWriter(Collection scheduledEvents, TimeValue bucketSpan, StringBuilder buffer) { + this.scheduledEvents = Objects.requireNonNull(scheduledEvents); + this.bucketSpan = Objects.requireNonNull(bucketSpan); + this.buffer = Objects.requireNonNull(buffer); + } + + public void write() throws IOException { + int eventIndex = 0; + for (ScheduledEvent event: scheduledEvents) { + + StringBuilder eventContent = new StringBuilder(); + eventContent.append(SCHEDULED_EVENT_PREFIX).append(eventIndex) + .append(SCHEDULED_EVENT_DESCRIPTION_SUFFIX).append(EQUALS) + .append(event.getDescription()) + .append(NEW_LINE); + + eventContent.append(SCHEDULED_EVENT_PREFIX).append(eventIndex).append(RULES_SUFFIX).append(EQUALS); + try (XContentBuilder contentBuilder = XContentFactory.jsonBuilder()) { + contentBuilder.startArray(); + event.toDetectionRule(bucketSpan).toXContent(contentBuilder, null); + contentBuilder.endArray(); + eventContent.append(contentBuilder.string()); + } + + eventContent.append(NEW_LINE); + buffer.append(eventContent.toString()); + + ++eventIndex; + } + } +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/UpdateProcessActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/UpdateProcessActionRequestTests.java index 308966a95ea..6f7c3e0b62e 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/UpdateProcessActionRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/UpdateProcessActionRequestTests.java @@ -7,6 +7,8 @@ package org.elasticsearch.xpack.ml.action; import org.elasticsearch.test.AbstractStreamableTestCase; import org.elasticsearch.xpack.ml.job.config.JobUpdate; +import org.elasticsearch.xpack.ml.job.config.MlFilter; +import org.elasticsearch.xpack.ml.job.config.MlFilterTests; import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig; import java.util.ArrayList; @@ -29,7 +31,11 @@ public class UpdateProcessActionRequestTests extends AbstractStreamableTestCase< updates.add(new JobUpdate.DetectorUpdate(randomInt(), randomAlphaOfLength(10), null)); } } - return new UpdateProcessAction.Request(randomAlphaOfLength(10), config, updates, randomBoolean()); + MlFilter filter = null; + if (randomBoolean()) { + filter = MlFilterTests.createTestFilter(); + } + return new UpdateProcessAction.Request(randomAlphaOfLength(10), config, updates, filter, randomBoolean()); } @Override diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java index 49b0f0a6395..860dea2b333 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java @@ -28,25 +28,37 @@ import org.elasticsearch.xpack.ml.action.util.QueryPage; import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzerTests; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.DataDescription; +import org.elasticsearch.xpack.ml.job.config.DetectionRule; import org.elasticsearch.xpack.ml.job.config.Detector; import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.config.JobState; +import org.elasticsearch.xpack.ml.job.config.MlFilter; +import org.elasticsearch.xpack.ml.job.config.RuleCondition; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; +import org.elasticsearch.xpack.ml.job.process.autodetect.UpdateParams; import org.elasticsearch.xpack.ml.notifications.Auditor; +import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.junit.Before; import org.mockito.ArgumentCaptor; import org.mockito.Matchers; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.Date; +import java.util.List; +import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.addJobTask; import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class JobManagerTests extends ESTestCase { @@ -57,6 +69,7 @@ public class JobManagerTests extends ESTestCase { private ClusterService clusterService; private JobProvider jobProvider; private Auditor auditor; + private UpdateJobProcessNotifier updateJobProcessNotifier; @Before public void setup() throws Exception { @@ -67,6 +80,7 @@ public class JobManagerTests extends ESTestCase { clusterService = mock(ClusterService.class); jobProvider = mock(JobProvider.class); auditor = mock(Auditor.class); + updateJobProcessNotifier = mock(UpdateJobProcessNotifier.class); } public void testGetJobOrThrowIfUnknown_GivenUnknownJob() { @@ -160,6 +174,98 @@ public class JobManagerTests extends ESTestCase { }); } + public void testUpdateProcessOnFilterChanged() { + Detector.Builder detectorReferencingFilter = new Detector.Builder("count", null); + detectorReferencingFilter.setByFieldName("foo"); + RuleCondition.createCategorical("foo", "foo_filter"); + DetectionRule filterRule = new DetectionRule.Builder(Collections.singletonList( + RuleCondition.createCategorical("foo", "foo_filter"))).build(); + detectorReferencingFilter.setRules(Collections.singletonList(filterRule)); + AnalysisConfig.Builder filterAnalysisConfig = new AnalysisConfig.Builder(Collections.singletonList( + detectorReferencingFilter.build())); + + Job.Builder jobReferencingFilter1 = buildJobBuilder("job-referencing-filter-1"); + jobReferencingFilter1.setAnalysisConfig(filterAnalysisConfig); + Job.Builder jobReferencingFilter2 = buildJobBuilder("job-referencing-filter-2"); + jobReferencingFilter2.setAnalysisConfig(filterAnalysisConfig); + Job.Builder jobReferencingFilter3 = buildJobBuilder("job-referencing-filter-3"); + jobReferencingFilter3.setAnalysisConfig(filterAnalysisConfig); + Job.Builder jobWithoutFilter = buildJobBuilder("job-without-filter"); + + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + mlMetadata.putJob(jobReferencingFilter1.build(), false); + mlMetadata.putJob(jobReferencingFilter2.build(), false); + mlMetadata.putJob(jobReferencingFilter3.build(), false); + mlMetadata.putJob(jobWithoutFilter.build(), false); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask(jobReferencingFilter1.getId(), "node_id", JobState.OPENED, tasksBuilder); + addJobTask(jobReferencingFilter2.getId(), "node_id", JobState.OPENED, tasksBuilder); + addJobTask(jobWithoutFilter.getId(), "node_id", JobState.OPENED, tasksBuilder); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder() + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()) + .putCustom(MLMetadataField.TYPE, mlMetadata.build())) + .build(); + when(clusterService.state()).thenReturn(clusterState); + + JobManager jobManager = createJobManager(); + + MlFilter filter = new MlFilter("foo_filter", Arrays.asList("a", "b")); + + jobManager.updateProcessOnFilterChanged(filter); + + ArgumentCaptor updateParamsCaptor = ArgumentCaptor.forClass(UpdateParams.class); + verify(updateJobProcessNotifier, times(2)).submitJobUpdate(updateParamsCaptor.capture()); + + List capturedUpdateParams = updateParamsCaptor.getAllValues(); + assertThat(capturedUpdateParams.size(), equalTo(2)); + assertThat(capturedUpdateParams.get(0).getJobId(), equalTo(jobReferencingFilter1.getId())); + assertThat(capturedUpdateParams.get(0).getFilter(), equalTo(filter)); + assertThat(capturedUpdateParams.get(1).getJobId(), equalTo(jobReferencingFilter2.getId())); + assertThat(capturedUpdateParams.get(1).getFilter(), equalTo(filter)); + } + + public void testUpdateProcessOnCalendarChanged() { + Job.Builder job1 = buildJobBuilder("job-1"); + Job.Builder job2 = buildJobBuilder("job-2"); + Job.Builder job3 = buildJobBuilder("job-3"); + Job.Builder job4 = buildJobBuilder("job-4"); + + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + mlMetadata.putJob(job1.build(), false); + mlMetadata.putJob(job2.build(), false); + mlMetadata.putJob(job3.build(), false); + mlMetadata.putJob(job4.build(), false); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask(job1.getId(), "node_id", JobState.OPENED, tasksBuilder); + addJobTask(job2.getId(), "node_id", JobState.OPENED, tasksBuilder); + addJobTask(job3.getId(), "node_id", JobState.OPENED, tasksBuilder); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder() + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()) + .putCustom(MLMetadataField.TYPE, mlMetadata.build())) + .build(); + when(clusterService.state()).thenReturn(clusterState); + + JobManager jobManager = createJobManager(); + + jobManager.updateProcessOnCalendarChanged(Arrays.asList("job-1", "job-3", "job-4")); + + ArgumentCaptor updateParamsCaptor = ArgumentCaptor.forClass(UpdateParams.class); + verify(updateJobProcessNotifier, times(2)).submitJobUpdate(updateParamsCaptor.capture()); + + List capturedUpdateParams = updateParamsCaptor.getAllValues(); + assertThat(capturedUpdateParams.size(), equalTo(2)); + assertThat(capturedUpdateParams.get(0).getJobId(), equalTo(job1.getId())); + assertThat(capturedUpdateParams.get(0).isUpdateScheduledEvents(), is(true)); + assertThat(capturedUpdateParams.get(1).getJobId(), equalTo(job3.getId())); + assertThat(capturedUpdateParams.get(1).isUpdateScheduledEvents(), is(true)); + } + private Job.Builder createJob() { Detector.Builder d1 = new Detector.Builder("info_content", "domain"); d1.setOverFieldName("client"); @@ -176,8 +282,7 @@ public class JobManagerTests extends ESTestCase { ClusterSettings clusterSettings = new ClusterSettings(environment.settings(), Collections.singleton(MachineLearningClientActionPlugin.MAX_MODEL_MEMORY_LIMIT)); when(clusterService.getClusterSettings()).thenReturn(clusterSettings); - UpdateJobProcessNotifier notifier = mock(UpdateJobProcessNotifier.class); - return new JobManager(environment, environment.settings(), jobProvider, clusterService, auditor, client, notifier); + return new JobManager(environment, environment.settings(), jobProvider, clusterService, auditor, client, updateJobProcessNotifier); } private ClusterState createClusterState() { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java index c8d8d48f55e..900a86c6c28 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java @@ -32,8 +32,6 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange; import org.junit.Before; -import org.mockito.ArgumentCaptor; -import org.mockito.InOrder; import org.mockito.Mockito; import java.io.ByteArrayInputStream; @@ -96,38 +94,20 @@ public class AutodetectCommunicatorTests extends ESTestCase { List conditions = Collections.singletonList( RuleCondition.createCategorical("foo", "bar")); + DetectionRule updatedRule = new DetectionRule.Builder(conditions).build(); List detectorUpdates = Collections.singletonList( - new JobUpdate.DetectorUpdate(0, "updated description", - Collections.singletonList(new DetectionRule.Builder(conditions).build()))); + new JobUpdate.DetectorUpdate(0, "updated description", Collections.singletonList(updatedRule))); - UpdateParams updateParams = new UpdateParams(null, detectorUpdates, true); - List events = Collections.singletonList(ScheduledEventTests.createScheduledEvent(randomAlphaOfLength(10))); + UpdateParams updateParams = UpdateParams.builder("foo").detectorUpdates(detectorUpdates).build(); + List events = Collections.singletonList( + ScheduledEventTests.createScheduledEvent(randomAlphaOfLength(10))); communicator.writeUpdateProcessMessage(updateParams, events, ((aVoid, e) -> {})); - // There are 2 detectors both will be updated with the rule for the scheduled event. - // The first has an additional update rule - ArgumentCaptor captor = ArgumentCaptor.forClass(List.class); - InOrder inOrder = Mockito.inOrder(process); - inOrder.verify(process).writeUpdateDetectorRulesMessage(eq(0), captor.capture()); - assertEquals(2, captor.getValue().size()); - inOrder.verify(process).writeUpdateDetectorRulesMessage(eq(1), captor.capture()); - assertEquals(1, captor.getValue().size()); + verify(process).writeUpdateDetectorRulesMessage(eq(0), eq(Collections.singletonList(updatedRule))); + verify(process).writeUpdateScheduledEventsMessage(events, AnalysisConfig.Builder.DEFAULT_BUCKET_SPAN); verify(process).isProcessAlive(); verifyNoMoreInteractions(process); - - - // This time there is a single detector update and no scheduled events - detectorUpdates = Collections.singletonList( - new JobUpdate.DetectorUpdate(1, "updated description", - Collections.singletonList(new DetectionRule.Builder(conditions).build()))); - updateParams = new UpdateParams(null, detectorUpdates, true); - communicator.writeUpdateProcessMessage(updateParams, Collections.emptyList(), ((aVoid, e) -> {})); - - inOrder = Mockito.inOrder(process); - inOrder.verify(process).writeUpdateDetectorRulesMessage(eq(1), captor.capture()); - assertEquals(1, captor.getValue().size()); - verify(process, times(2)).isProcessAlive(); } public void testFlushJob() throws IOException { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index 1dde72d1550..8645dbb4f36 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -475,7 +475,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { verify(manager).setJobState(any(), eq(JobState.FAILED)); } - public void testwriteUpdateProcessMessage() { + public void testWriteUpdateProcessMessage() { AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); AutodetectProcessManager manager = createManagerAndCallProcessData(communicator, "foo"); ModelPlotConfig modelConfig = mock(ModelPlotConfig.class); @@ -483,9 +483,9 @@ public class AutodetectProcessManagerTests extends ESTestCase { List detectorUpdates = Collections.singletonList(new JobUpdate.DetectorUpdate(2, null, rules)); JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); - UpdateParams updateParams = new UpdateParams(modelConfig, detectorUpdates, false); + UpdateParams updateParams = UpdateParams.builder("foo").modelPlotConfig(modelConfig).detectorUpdates(detectorUpdates).build(); manager.writeUpdateProcessMessage(jobTask, updateParams, e -> {}); - verify(communicator).writeUpdateProcessMessage(same(updateParams), eq(Collections.emptyList()), any()); + verify(communicator).writeUpdateProcessMessage(same(updateParams), eq(null), any()); } public void testJobHasActiveAutodetectProcess() { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java index 8e50c98ccd9..114b7809988 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java @@ -5,10 +5,13 @@ */ package org.elasticsearch.xpack.ml.job.process.autodetect.writer; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ml.calendars.ScheduledEvent; import org.elasticsearch.xpack.ml.job.config.Condition; import org.elasticsearch.xpack.ml.job.config.Connective; import org.elasticsearch.xpack.ml.job.config.DetectionRule; +import org.elasticsearch.xpack.ml.job.config.MlFilter; import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig; import org.elasticsearch.xpack.ml.job.config.Operator; import org.elasticsearch.xpack.ml.job.config.RuleCondition; @@ -21,6 +24,7 @@ import org.mockito.InOrder; import org.mockito.Mockito; import java.io.IOException; +import java.time.ZonedDateTime; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -205,6 +209,66 @@ public class ControlMsgToProcessWriterTests extends ESTestCase { verifyNoMoreInteractions(lengthEncodedWriter); } + public void testWriteUpdateFiltersMessage() throws IOException { + ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2); + + MlFilter filter1 = new MlFilter("filter_1", Arrays.asList("a")); + MlFilter filter2 = new MlFilter("filter_2", Arrays.asList("b", "c")); + + writer.writeUpdateFiltersMessage(Arrays.asList(filter1, filter2)); + + InOrder inOrder = inOrder(lengthEncodedWriter); + inOrder.verify(lengthEncodedWriter).writeNumFields(2); + inOrder.verify(lengthEncodedWriter, times(1)).writeField(""); + inOrder.verify(lengthEncodedWriter).writeField("u[filters]\nfilter.filter_1 = [\"a\"]\nfilter.filter_2 = [\"b\",\"c\"]\n"); + verifyNoMoreInteractions(lengthEncodedWriter); + } + + public void testWriteUpdateScheduledEventsMessage() throws IOException { + ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2); + + ScheduledEvent.Builder event1 = new ScheduledEvent.Builder(); + event1.calendarId("moon"); + event1.description("new year"); + event1.startTime(ZonedDateTime.parse("2018-01-01T00:00:00Z")); + event1.endTime(ZonedDateTime.parse("2018-01-02T00:00:00Z")); + + ScheduledEvent.Builder event2 = new ScheduledEvent.Builder(); + event2.calendarId("moon"); + event2.description("Jan maintenance day"); + event2.startTime(ZonedDateTime.parse("2018-01-06T00:00:00Z")); + event2.endTime(ZonedDateTime.parse("2018-01-07T00:00:00Z")); + + writer.writeUpdateScheduledEventsMessage(Arrays.asList(event1.build(), event2.build()), TimeValue.timeValueHours(1)); + + InOrder inOrder = inOrder(lengthEncodedWriter); + inOrder.verify(lengthEncodedWriter).writeNumFields(2); + inOrder.verify(lengthEncodedWriter, times(1)).writeField(""); + inOrder.verify(lengthEncodedWriter).writeField("u[scheduledEvents]\n" + + "scheduledevent.0.description = new year\n" + + "scheduledevent.0.rules = [{\"actions\":[\"filter_results\",\"skip_sampling\"],\"conditions_connective\":\"and\"," + + "\"conditions\":[{\"type\":\"time\",\"condition\":{\"operator\":\"gte\",\"value\":\"1514764800\"}}," + + "{\"type\":\"time\",\"condition\":{\"operator\":\"lt\",\"value\":\"1514851200\"}}]}]\n" + + "scheduledevent.1.description = Jan maintenance day\n" + + "scheduledevent.1.rules = [{\"actions\":[\"filter_results\",\"skip_sampling\"],\"conditions_connective\":\"and\"," + + "\"conditions\":[{\"type\":\"time\",\"condition\":{\"operator\":\"gte\",\"value\":\"1515196800\"}}," + + "{\"type\":\"time\",\"condition\":{\"operator\":\"lt\",\"value\":\"1515283200\"}}]}]\n" + ); + verifyNoMoreInteractions(lengthEncodedWriter); + } + + public void testWriteUpdateScheduledEventsMessage_GivenEmpty() throws IOException { + ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2); + + writer.writeUpdateScheduledEventsMessage(Collections.emptyList(), TimeValue.timeValueHours(1)); + + InOrder inOrder = inOrder(lengthEncodedWriter); + inOrder.verify(lengthEncodedWriter).writeNumFields(2); + inOrder.verify(lengthEncodedWriter, times(1)).writeField(""); + inOrder.verify(lengthEncodedWriter).writeField("u[scheduledEvents]\nclear = true\n"); + verifyNoMoreInteractions(lengthEncodedWriter); + } + private static List createRule(String value) { Condition condition = new Condition(Operator.GT, value); return Collections.singletonList(RuleCondition.createNumerical(RuleConditionType.NUMERICAL_ACTUAL, null, null, condition)); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/FieldConfigWriterTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/FieldConfigWriterTests.java index 670e7f0482a..e82bbd6d0f9 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/FieldConfigWriterTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/FieldConfigWriterTests.java @@ -228,9 +228,9 @@ public class FieldConfigWriterTests extends ESTestCase { createFieldConfigWriter().write(); - verify(writer).write("detector.0.clause = count\n" + - "filter.filter_1 = [\"a\",\"b\"]\n" + - "filter.filter_2 = [\"c\",\"d\"]\n"); + verify(writer).write("filter.filter_1 = [\"a\",\"b\"]\n" + + "filter.filter_2 = [\"c\",\"d\"]\n" + + "detector.0.clause = count\n"); verifyNoMoreInteractions(writer); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/MlFilterWriterTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/MlFilterWriterTests.java new file mode 100644 index 00000000000..a1188e6e367 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/MlFilterWriterTests.java @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.process.autodetect.writer; + +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ml.job.config.MlFilter; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class MlFilterWriterTests extends ESTestCase { + + public void testWrite_GivenEmpty() throws IOException { + StringBuilder buffer = new StringBuilder(); + new MlFilterWriter(Collections.emptyList(), buffer).write(); + + assertThat(buffer.toString().isEmpty(), is(true)); + } + + public void testWrite() throws IOException { + List filters = new ArrayList<>(); + filters.add(new MlFilter("filter_1", Arrays.asList("a", "b"))); + filters.add(new MlFilter("filter_2", Arrays.asList("c", "d"))); + + StringBuilder buffer = new StringBuilder(); + new MlFilterWriter(filters, buffer).write(); + + assertThat(buffer.toString(), equalTo("filter.filter_1 = [\"a\",\"b\"]\nfilter.filter_2 = [\"c\",\"d\"]\n")); + } +} \ No newline at end of file diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ScheduledEventsWriterTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ScheduledEventsWriterTests.java new file mode 100644 index 00000000000..6a379aa0221 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ScheduledEventsWriterTests.java @@ -0,0 +1,57 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.process.autodetect.writer; + +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ml.calendars.ScheduledEvent; + +import java.io.IOException; +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class ScheduledEventsWriterTests extends ESTestCase { + + public void testWrite_GivenEmpty() throws IOException { + StringBuilder buffer = new StringBuilder(); + new ScheduledEventsWriter(Collections.emptyList(), TimeValue.timeValueHours(1), buffer).write(); + + assertThat(buffer.toString().isEmpty(), is(true)); + } + + public void testWrite() throws IOException { + List events = new ArrayList<>(); + events.add(new ScheduledEvent.Builder().description("Black Friday") + .startTime(ZonedDateTime.ofInstant(Instant.ofEpochMilli(1511395200000L), ZoneOffset.UTC)) + .endTime(ZonedDateTime.ofInstant(Instant.ofEpochMilli(1515369600000L), ZoneOffset.UTC)) + .calendarId("calendar_id").build()); + events.add(new ScheduledEvent.Builder().description("Blue Monday") + .startTime(ZonedDateTime.ofInstant(Instant.ofEpochMilli(1519603200000L), ZoneOffset.UTC)) + .endTime(ZonedDateTime.ofInstant(Instant.ofEpochMilli(1519862400000L), ZoneOffset.UTC)) + .calendarId("calendar_id").build()); + + StringBuilder buffer = new StringBuilder(); + new ScheduledEventsWriter(events, TimeValue.timeValueHours(1), buffer).write(); + + String expectedString = "scheduledevent.0.description = Black Friday\n" + + "scheduledevent.0.rules = [{\"actions\":[\"filter_results\",\"skip_sampling\"],\"conditions_connective\":\"and\"," + + "\"conditions\":[{\"type\":\"time\",\"condition\":{\"operator\":\"gte\",\"value\":\"1511395200\"}}," + + "{\"type\":\"time\",\"condition\":{\"operator\":\"lt\",\"value\":\"1515369600\"}}]}]\n" + + "scheduledevent.1.description = Blue Monday\n" + + "scheduledevent.1.rules = [{\"actions\":[\"filter_results\",\"skip_sampling\"],\"conditions_connective\":\"and\"," + + "\"conditions\":[{\"type\":\"time\",\"condition\":{\"operator\":\"gte\",\"value\":\"1519603200\"}}," + + "{\"type\":\"time\",\"condition\":{\"operator\":\"lt\",\"value\":\"1519862400\"}}]}]" + + "\n"; + assertThat(buffer.toString(), equalTo(expectedString)); + } +} \ No newline at end of file