[ML] Update process when filters or calendars are updated (elastic/x-pack-elasticsearch#3385)

Relates elastic/x-pack-elasticsearch#3325

Original commit: elastic/x-pack-elasticsearch@9da4973cda
This commit is contained in:
Dimitris Athanasiou 2018-01-12 17:48:07 +00:00 committed by GitHub
parent 86e9f63b19
commit 73f8559fca
33 changed files with 698 additions and 229 deletions

View File

@ -20,12 +20,9 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.ml.calendars.Calendar;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.watcher.support.Exceptions;
import java.io.IOException;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import static org.elasticsearch.action.ValidateActions.addValidationError;

View File

@ -139,6 +139,4 @@ public class PutFilterAction extends Action<PutFilterAction.Request, PutFilterAc
}
}
}

View File

@ -22,7 +22,6 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MLMetadataField;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.calendars.Calendar;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
@ -126,6 +125,5 @@ public class UpdateCalendarJobAction extends Action<UpdateCalendarJobAction.Requ
super(client, INSTANCE, new Request());
}
}
}

View File

@ -17,6 +17,7 @@ import org.elasticsearch.common.xcontent.StatusToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
import java.io.IOException;
@ -111,16 +112,18 @@ public class UpdateProcessAction extends
private ModelPlotConfig modelPlotConfig;
private List<JobUpdate.DetectorUpdate> detectorUpdates;
private MlFilter filter;
private boolean updateScheduledEvents = false;
Request() {
}
public Request(String jobId, ModelPlotConfig modelPlotConfig, List<JobUpdate.DetectorUpdate> detectorUpdates,
public Request(String jobId, ModelPlotConfig modelPlotConfig, List<JobUpdate.DetectorUpdate> detectorUpdates, MlFilter filter,
boolean updateScheduledEvents) {
super(jobId);
this.modelPlotConfig = modelPlotConfig;
this.detectorUpdates = detectorUpdates;
this.filter = filter;
this.updateScheduledEvents = updateScheduledEvents;
}
@ -132,6 +135,10 @@ public class UpdateProcessAction extends
return detectorUpdates;
}
public MlFilter getFilter() {
return filter;
}
public boolean isUpdateScheduledEvents() {
return updateScheduledEvents;
}
@ -144,6 +151,7 @@ public class UpdateProcessAction extends
detectorUpdates = in.readList(JobUpdate.DetectorUpdate::new);
}
if (in.getVersion().onOrAfter(Version.V_6_2_0)) {
filter = in.readOptionalWriteable(MlFilter::new);
updateScheduledEvents = in.readBoolean();
}
}
@ -158,13 +166,14 @@ public class UpdateProcessAction extends
out.writeList(detectorUpdates);
}
if (out.getVersion().onOrAfter(Version.V_6_2_0)) {
out.writeOptionalWriteable(filter);
out.writeBoolean(updateScheduledEvents);
}
}
@Override
public int hashCode() {
return Objects.hash(getJobId(), modelPlotConfig, detectorUpdates, updateScheduledEvents);
return Objects.hash(getJobId(), modelPlotConfig, detectorUpdates, filter, updateScheduledEvents);
}
@Override
@ -180,8 +189,8 @@ public class UpdateProcessAction extends
return Objects.equals(getJobId(), other.getJobId()) &&
Objects.equals(modelPlotConfig, other.modelPlotConfig) &&
Objects.equals(detectorUpdates, other.detectorUpdates) &&
Objects.equals(filter, other.filter) &&
Objects.equals(updateScheduledEvents, other.updateScheduledEvents);
}
}
}

View File

@ -29,7 +29,6 @@ import java.util.Objects;
public class JobUpdate implements Writeable, ToXContentObject {
public static final ParseField DETECTORS = new ParseField("detectors");
public static final ParseField UPDATE_SCHEDULED_EVENTS = new ParseField("update_scheduled_events");
public static final ConstructingObjectParser<Builder, Void> PARSER = new ConstructingObjectParser<>(
"job_update", args -> new Builder((String) args[0]));
@ -50,7 +49,6 @@ public class JobUpdate implements Writeable, ToXContentObject {
PARSER.declareField(Builder::setCustomSettings, (p, c) -> p.map(), Job.CUSTOM_SETTINGS, ObjectParser.ValueType.OBJECT);
PARSER.declareString(Builder::setModelSnapshotId, Job.MODEL_SNAPSHOT_ID);
PARSER.declareLong(Builder::setEstablishedModelMemory, Job.ESTABLISHED_MODEL_MEMORY);
PARSER.declareBoolean(Builder::setUpdateScheduledEvents, UPDATE_SCHEDULED_EVENTS);
}
/**
@ -75,7 +73,6 @@ public class JobUpdate implements Writeable, ToXContentObject {
private final Map<String, Object> customSettings;
private final String modelSnapshotId;
private final Long establishedModelMemory;
private final boolean updateScheduledEvents;
private JobUpdate(String jobId, @Nullable List<String> groups, @Nullable String description,
@Nullable List<DetectorUpdate> detectorUpdates, @Nullable ModelPlotConfig modelPlotConfig,
@ -83,7 +80,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
@Nullable Long renormalizationWindowDays, @Nullable Long resultsRetentionDays,
@Nullable Long modelSnapshotRetentionDays, @Nullable List<String> categorisationFilters,
@Nullable Map<String, Object> customSettings, @Nullable String modelSnapshotId,
@Nullable Long establishedModelMemory, boolean updateScheduledEvents) {
@Nullable Long establishedModelMemory) {
this.jobId = jobId;
this.groups = groups;
this.description = description;
@ -98,7 +95,6 @@ public class JobUpdate implements Writeable, ToXContentObject {
this.customSettings = customSettings;
this.modelSnapshotId = modelSnapshotId;
this.establishedModelMemory = establishedModelMemory;
this.updateScheduledEvents = updateScheduledEvents;
}
public JobUpdate(StreamInput in) throws IOException {
@ -133,12 +129,6 @@ public class JobUpdate implements Writeable, ToXContentObject {
} else {
establishedModelMemory = null;
}
if (in.getVersion().onOrAfter(Version.V_6_2_0)) {
updateScheduledEvents = in.readBoolean();
} else {
updateScheduledEvents = false;
}
}
@Override
@ -168,10 +158,6 @@ public class JobUpdate implements Writeable, ToXContentObject {
if (out.getVersion().onOrAfter(Version.V_6_1_0)) {
out.writeOptionalLong(establishedModelMemory);
}
if (out.getVersion().onOrAfter(Version.V_6_2_0)) {
out.writeBoolean(updateScheduledEvents);
}
}
public String getJobId() {
@ -234,10 +220,6 @@ public class JobUpdate implements Writeable, ToXContentObject {
return modelPlotConfig != null || detectorUpdates != null;
}
public boolean isUpdateScheduledEvents() {
return updateScheduledEvents;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@ -281,7 +263,6 @@ public class JobUpdate implements Writeable, ToXContentObject {
if (establishedModelMemory != null) {
builder.field(Job.ESTABLISHED_MODEL_MEMORY.getPreferredName(), establishedModelMemory);
}
builder.field(UPDATE_SCHEDULED_EVENTS.getPreferredName(), updateScheduledEvents);
builder.endObject();
return builder;
}
@ -418,15 +399,14 @@ public class JobUpdate implements Writeable, ToXContentObject {
&& Objects.equals(this.categorizationFilters, that.categorizationFilters)
&& Objects.equals(this.customSettings, that.customSettings)
&& Objects.equals(this.modelSnapshotId, that.modelSnapshotId)
&& Objects.equals(this.establishedModelMemory, that.establishedModelMemory)
&& Objects.equals(this.updateScheduledEvents, that.updateScheduledEvents);
&& Objects.equals(this.establishedModelMemory, that.establishedModelMemory);
}
@Override
public int hashCode() {
return Objects.hash(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, categorizationFilters, customSettings,
modelSnapshotId, establishedModelMemory, updateScheduledEvents);
modelSnapshotId, establishedModelMemory);
}
public static class DetectorUpdate implements Writeable, ToXContentObject {
@ -536,7 +516,6 @@ public class JobUpdate implements Writeable, ToXContentObject {
private Map<String, Object> customSettings;
private String modelSnapshotId;
private Long establishedModelMemory;
private boolean updateScheduledEvents = false;
public Builder(String jobId) {
this.jobId = jobId;
@ -612,15 +591,10 @@ public class JobUpdate implements Writeable, ToXContentObject {
return this;
}
public Builder setUpdateScheduledEvents(boolean updateScheduledEvents) {
this.updateScheduledEvents = updateScheduledEvents;
return this;
}
public JobUpdate build() {
return new JobUpdate(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, backgroundPersistInterval,
renormalizationWindowDays, resultsRetentionDays, modelSnapshotRetentionDays, categorizationFilters, customSettings,
modelSnapshotId, establishedModelMemory, updateScheduledEvents);
modelSnapshotId, establishedModelMemory);
}
}
}

View File

@ -177,8 +177,8 @@ public class RuleCondition implements ToXContentObject, Writeable {
return Objects.hash(type, fieldName, fieldValue, condition, filterId);
}
public static RuleCondition createCategorical(String fieldName, String valueFilter) {
return new RuleCondition(RuleConditionType.CATEGORICAL, fieldName, null, null, valueFilter);
public static RuleCondition createCategorical(String fieldName, String filterId) {
return new RuleCondition(RuleConditionType.CATEGORICAL, fieldName, null, null, filterId);
}
public static RuleCondition createNumerical(RuleConditionType conditionType, String fieldName, String fieldValue,

View File

@ -7,9 +7,6 @@ package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetAction;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
@ -26,7 +23,8 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MlMetaIndex;
import org.elasticsearch.xpack.ml.calendars.Calendar;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import static org.elasticsearch.xpack.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.ClientHelper.executeAsyncWithOrigin;
@ -34,15 +32,19 @@ import static org.elasticsearch.xpack.ClientHelper.executeAsyncWithOrigin;
public class TransportDeleteCalendarAction extends HandledTransportAction<DeleteCalendarAction.Request, DeleteCalendarAction.Response> {
private final Client client;
private final JobManager jobManager;
private final JobProvider jobProvider;
@Inject
public TransportDeleteCalendarAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
Client client) {
Client client, JobManager jobManager, JobProvider jobProvider) {
super(settings, DeleteCalendarAction.NAME, threadPool, transportService, actionFilters,
indexNameExpressionResolver, DeleteCalendarAction.Request::new);
this.client = client;
this.jobManager = jobManager;
this.jobProvider = jobProvider;
}
@Override
@ -50,29 +52,25 @@ public class TransportDeleteCalendarAction extends HandledTransportAction<Delete
final String calendarId = request.getCalendarId();
GetRequest getRequest = new GetRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, Calendar.documentId(calendarId));
executeAsyncWithOrigin(client, ML_ORIGIN, GetAction.INSTANCE, getRequest, new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse getResponse) {
if (getResponse.isExists() == false) {
listener.onFailure(new ResourceNotFoundException("Could not delete calendar [" + calendarId
+ "] because it does not exist"));
return;
}
// Delete calendar and events
DeleteByQueryRequest dbqRequest = buildDeleteByQuery(calendarId);
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, dbqRequest, ActionListener.wrap(
response -> listener.onResponse(new DeleteCalendarAction.Response(true)),
listener::onFailure));
}
@Override
public void onFailure(Exception e) {
listener.onFailure(ExceptionsHelper.serverError("Could not delete calendar [" + calendarId + "]", e));
}
}
ActionListener<Calendar> calendarListener = ActionListener.wrap(
calendar -> {
// Delete calendar and events
DeleteByQueryRequest dbqRequest = buildDeleteByQuery(calendarId);
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, dbqRequest, ActionListener.wrap(
response -> {
if (response.getDeleted() == 0) {
listener.onFailure(new ResourceNotFoundException("No calendar with id [" + calendarId + "]"));
return;
}
jobManager.updateProcessOnCalendarChanged(calendar.getJobIds());
listener.onResponse(new DeleteCalendarAction.Response(true));
},
listener::onFailure));
},
listener::onFailure
);
jobProvider.calendar(calendarId, calendarListener);
}
private DeleteByQueryRequest buildDeleteByQuery(String calendarId) {

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.delete.DeleteAction;
@ -26,6 +25,8 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MlMetaIndex;
import org.elasticsearch.xpack.ml.calendars.Calendar;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.util.Map;
@ -37,15 +38,19 @@ public class TransportDeleteCalendarEventAction extends HandledTransportAction<D
DeleteCalendarEventAction.Response> {
private final Client client;
private final JobProvider jobProvider;
private final JobManager jobManager;
@Inject
public TransportDeleteCalendarEventAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
Client client) {
Client client, JobProvider jobProvider, JobManager jobManager) {
super(settings, DeleteCalendarEventAction.NAME, threadPool, transportService, actionFilters,
indexNameExpressionResolver, DeleteCalendarEventAction.Request::new);
this.client = client;
this.jobProvider = jobProvider;
this.jobManager = jobManager;
}
@Override
@ -57,27 +62,34 @@ public class TransportDeleteCalendarEventAction extends HandledTransportAction<D
@Override
public void onResponse(GetResponse getResponse) {
if (getResponse.isExists() == false) {
listener.onFailure(new ResourceNotFoundException("Missing event [" + eventId + "]"));
listener.onFailure(new ResourceNotFoundException("No event with id [" + eventId + "]"));
return;
}
Map<String, Object> source = getResponse.getSourceAsMap();
String calendarId = (String) source.get(Calendar.ID.getPreferredName());
if (calendarId == null) {
listener.onFailure(new ElasticsearchStatusException("Event [" + eventId + "] does not have a valid "
+ Calendar.ID.getPreferredName(), RestStatus.BAD_REQUEST));
listener.onFailure(ExceptionsHelper.badRequestException("Event [" + eventId + "] does not have a valid "
+ Calendar.ID.getPreferredName()));
return;
}
if (calendarId.equals(request.getCalendarId()) == false) {
listener.onFailure(new ElasticsearchStatusException(
listener.onFailure(ExceptionsHelper.badRequestException(
"Event [" + eventId + "] has " + Calendar.ID.getPreferredName() +
" [" + calendarId + "] which does not match the request " + Calendar.ID.getPreferredName() +
" [" + request.getCalendarId() + "]", RestStatus.BAD_REQUEST));
" [" + request.getCalendarId() + "]"));
return;
}
deleteEvent(eventId, listener);
ActionListener<Calendar> calendarListener = ActionListener.wrap(
calendar -> {
deleteEvent(eventId, calendar, listener);
},
listener::onFailure
);
jobProvider.calendar(calendarId, calendarListener);
}
@Override
@ -87,7 +99,7 @@ public class TransportDeleteCalendarEventAction extends HandledTransportAction<D
});
}
private void deleteEvent(String eventId, ActionListener<DeleteCalendarEventAction.Response> listener) {
private void deleteEvent(String eventId, Calendar calendar, ActionListener<DeleteCalendarEventAction.Response> listener) {
DeleteRequest deleteRequest = new DeleteRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, eventId);
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
@ -97,9 +109,9 @@ public class TransportDeleteCalendarEventAction extends HandledTransportAction<D
public void onResponse(DeleteResponse response) {
if (response.status() == RestStatus.NOT_FOUND) {
listener.onFailure(new ResourceNotFoundException("Could not delete event [" + eventId
+ "] because it does not exist"));
listener.onFailure(new ResourceNotFoundException("No event with id [" + eventId + "]"));
} else {
jobManager.updateProcessOnCalendarChanged(calendar.getJobIds());
listener.onResponse(new DeleteCalendarEventAction.Response(true));
}
}

View File

@ -23,7 +23,9 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MlMetaIndex;
import org.elasticsearch.xpack.ml.calendars.Calendar;
import org.elasticsearch.xpack.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
@ -39,16 +41,18 @@ public class TransportPostCalendarEventsAction extends HandledTransportAction<Po
private final Client client;
private final JobProvider jobProvider;
private final JobManager jobManager;
@Inject
public TransportPostCalendarEventsAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
Client client, JobProvider jobProvider) {
Client client, JobProvider jobProvider, JobManager jobManager) {
super(settings, PostCalendarEventsAction.NAME, threadPool, transportService, actionFilters,
indexNameExpressionResolver, PostCalendarEventsAction.Request::new);
this.client = client;
this.jobProvider = jobProvider;
this.jobManager = jobManager;
}
@Override
@ -56,8 +60,8 @@ public class TransportPostCalendarEventsAction extends HandledTransportAction<Po
ActionListener<PostCalendarEventsAction.Response> listener) {
List<ScheduledEvent> events = request.getScheduledEvents();
ActionListener<Boolean> calendarExistsListener = ActionListener.wrap(
r -> {
ActionListener<Calendar> calendarListener = ActionListener.wrap(
calendar -> {
BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
for (ScheduledEvent event: events) {
@ -78,6 +82,7 @@ public class TransportPostCalendarEventsAction extends HandledTransportAction<Po
new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse response) {
jobManager.updateProcessOnCalendarChanged(calendar.getJobIds());
listener.onResponse(new PostCalendarEventsAction.Response(events));
}
@ -90,13 +95,6 @@ public class TransportPostCalendarEventsAction extends HandledTransportAction<Po
},
listener::onFailure);
checkCalendarExists(request.getCalendarId(), calendarExistsListener);
}
private void checkCalendarExists(String calendarId, ActionListener<Boolean> listener) {
jobProvider.calendar(calendarId, ActionListener.wrap(
c -> listener.onResponse(true),
listener::onFailure
));
jobProvider.calendar(request.getCalendarId(), calendarListener);
}
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.xpack.ml.MLMetadataField;
import org.elasticsearch.xpack.ml.MlMetaIndex;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.calendars.Calendar;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.IOException;
@ -42,16 +43,18 @@ public class TransportPutCalendarAction extends HandledTransportAction<PutCalend
private final Client client;
private final ClusterService clusterService;
private final JobManager jobManager;
@Inject
public TransportPutCalendarAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
Client client, ClusterService clusterService) {
Client client, ClusterService clusterService, JobManager jobManager) {
super(settings, PutCalendarAction.NAME, threadPool, transportService, actionFilters,
indexNameExpressionResolver, PutCalendarAction.Request::new);
this.client = client;
this.clusterService = clusterService;
this.jobManager = jobManager;
}
@Override
@ -78,6 +81,7 @@ public class TransportPutCalendarAction extends HandledTransportAction<PutCalend
new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
jobManager.updateProcessOnCalendarChanged(calendar.getJobIds());
listener.onResponse(new PutCalendarAction.Response(calendar));
}

View File

@ -23,6 +23,7 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MlMetaIndex;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
@ -35,15 +36,17 @@ import static org.elasticsearch.xpack.ClientHelper.executeAsyncWithOrigin;
public class TransportPutFilterAction extends HandledTransportAction<PutFilterAction.Request, PutFilterAction.Response> {
private final Client client;
private final JobManager jobManager;
@Inject
public TransportPutFilterAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
Client client) {
Client client, JobManager jobManager) {
super(settings, PutFilterAction.NAME, threadPool, transportService, actionFilters,
indexNameExpressionResolver, PutFilterAction.Request::new);
this.client = client;
this.jobManager = jobManager;
}
@Override
@ -64,6 +67,7 @@ public class TransportPutFilterAction extends HandledTransportAction<PutFilterAc
new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse indexResponse) {
jobManager.updateProcessOnFilterChanged(filter);
listener.onResponse(new PutFilterAction.Response());
}

View File

@ -17,6 +17,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MLMetadataField;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
@ -24,16 +25,18 @@ public class TransportUpdateCalendarJobAction extends HandledTransportAction<Upd
private final ClusterService clusterService;
private final JobProvider jobProvider;
private final JobManager jobManager;
@Inject
public TransportUpdateCalendarJobAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService, JobProvider jobProvider) {
ClusterService clusterService, JobProvider jobProvider, JobManager jobManager) {
super(settings, UpdateCalendarJobAction.NAME, threadPool, transportService, actionFilters,
indexNameExpressionResolver, UpdateCalendarJobAction.Request::new);
this.clusterService = clusterService;
this.jobProvider = jobProvider;
this.jobManager = jobManager;
}
@Override
@ -55,6 +58,9 @@ public class TransportUpdateCalendarJobAction extends HandledTransportAction<Upd
}
jobProvider.updateCalendar(request.getCalendarId(), request.getJobIdsToAdd(), request.getJobIdsToRemove(),
c -> listener.onResponse(new PutCalendarAction.Response(c)), listener::onFailure);
c -> {
jobManager.updateProcessOnCalendarChanged(c.getJobIds());
listener.onResponse(new PutCalendarAction.Response(c));
}, listener::onFailure);
}
}

View File

@ -41,10 +41,15 @@ public class TransportUpdateProcessAction extends TransportJobTaskAction<UpdateP
@Override
protected void taskOperation(UpdateProcessAction.Request request, TransportOpenJobAction.JobTask task,
ActionListener<UpdateProcessAction.Response> listener) {
UpdateParams updateParams = UpdateParams.builder(request.getJobId())
.modelPlotConfig(request.getModelPlotConfig())
.detectorUpdates(request.getDetectorUpdates())
.filter(request.getFilter())
.updateScheduledEvents(request.isUpdateScheduledEvents())
.build();
try {
processManager.writeUpdateProcessMessage(task,
new UpdateParams(request.getModelPlotConfig(),
request.getDetectorUpdates(), request.isUpdateScheduledEvents()),
processManager.writeUpdateProcessMessage(task, updateParams,
e -> {
if (e == null) {
listener.onResponse(new UpdateProcessAction.Response());

View File

@ -37,10 +37,12 @@ import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobStorageDeletionTask;
import org.elasticsearch.xpack.ml.job.process.autodetect.UpdateParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.notifications.Auditor;
@ -280,18 +282,16 @@ public class JobManager extends AbstractComponent {
// nothing to do
return currentState;
}
changeWasRequired = true;
// No change is required if the fields that the C++ uses aren't being updated
changeWasRequired = jobUpdate.isAutodetectProcessUpdate();
return updateClusterState(updatedJob, true, currentState);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (changeWasRequired) {
PersistentTasksCustomMetaData persistentTasks =
newState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
JobState jobState = MlMetadata.getJobState(jobId, persistentTasks);
if (jobState == JobState.OPENED) {
updateJobProcessNotifier.submitJobUpdate(jobUpdate);
if (isJobOpen(newState, jobId)) {
updateJobProcessNotifier.submitJobUpdate(UpdateParams.fromJobUpdate(jobUpdate));
}
} else {
logger.debug("[{}] Ignored job update with no changes: {}", () -> jobId, () -> {
@ -308,12 +308,40 @@ public class JobManager extends AbstractComponent {
});
}
private boolean isJobOpen(ClusterState clusterState, String jobId) {
PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
JobState jobState = MlMetadata.getJobState(jobId, persistentTasks);
return jobState == JobState.OPENED;
}
ClusterState updateClusterState(Job job, boolean overwrite, ClusterState currentState) {
MlMetadata.Builder builder = createMlMetadataBuilder(currentState);
builder.putJob(job, overwrite);
return buildNewClusterState(currentState, builder);
}
public void updateProcessOnFilterChanged(MlFilter filter) {
ClusterState clusterState = clusterService.state();
QueryPage<Job> jobs = expandJobs("*", true, clusterService.state());
for (Job job : jobs.results()) {
if (isJobOpen(clusterState, job.getId())) {
Set<String> jobFilters = job.getAnalysisConfig().extractReferencedFilters();
if (jobFilters.contains(filter.getId())) {
updateJobProcessNotifier.submitJobUpdate(UpdateParams.filterUpdate(job.getId(), filter));
}
}
}
}
public void updateProcessOnCalendarChanged(List<String> calendarJobIds) {
ClusterState clusterState = clusterService.state();
for (String jobId : calendarJobIds) {
if (isJobOpen(clusterState, jobId)) {
updateJobProcessNotifier.submitJobUpdate(UpdateParams.scheduledEventsUpdate(jobId));
}
}
}
public void deleteJob(DeleteJobAction.Request request, JobStorageDeletionTask task,
ActionListener<DeleteJobAction.Response> actionListener) {

View File

@ -17,7 +17,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.action.UpdateProcessAction;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.process.autodetect.UpdateParams;
import java.util.concurrent.LinkedBlockingQueue;
@ -30,7 +30,7 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local
private final Client client;
private final ThreadPool threadPool;
private final LinkedBlockingQueue<JobUpdate> orderedJobUpdates = new LinkedBlockingQueue<>(1000);
private final LinkedBlockingQueue<UpdateParams> orderedJobUpdates = new LinkedBlockingQueue<>(1000);
private volatile ThreadPool.Cancellable cancellable;
@ -47,8 +47,8 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local
});
}
boolean submitJobUpdate(JobUpdate jobUpdate) {
return orderedJobUpdates.offer(jobUpdate);
boolean submitJobUpdate(UpdateParams updateParams) {
return orderedJobUpdates.offer(updateParams);
}
@Override
@ -82,24 +82,17 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local
private void processNextUpdate() {
try {
JobUpdate jobUpdate = orderedJobUpdates.poll();
if (jobUpdate != null) {
executeRemoteJobIfNecessary(jobUpdate);
UpdateParams updateParams = orderedJobUpdates.poll();
if (updateParams != null) {
executeRemoteJob(updateParams);
}
} catch (Exception e) {
logger.error("Unable while processing next job update", e);
}
}
void executeRemoteJobIfNecessary(JobUpdate update) {
// Do nothing if the fields that the C++ needs aren't being updated
if (update.isAutodetectProcessUpdate()) {
executeRemoteJob(update);
}
}
void executeRemoteJob(JobUpdate update) {
Request request = new Request(update.getJobId(), update.getModelPlotConfig(), update.getDetectorUpdates(),
void executeRemoteJob(UpdateParams update) {
Request request = new Request(update.getJobId(), update.getModelPlotConfig(), update.getDetectorUpdates(), update.getFilter(),
update.isUpdateScheduledEvents());
executeAsyncWithOrigin(client, ML_ORIGIN, UpdateProcessAction.INSTANCE, request,
@ -126,5 +119,4 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local
}
});
}
}

View File

@ -22,7 +22,6 @@ import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.CategorizationAnalyzerConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
@ -45,7 +44,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
@ -58,7 +56,6 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
public class AutodetectCommunicator implements Closeable {
@ -215,35 +212,23 @@ public class AutodetectCommunicator implements Closeable {
autodetectProcess.writeUpdateModelPlotMessage(updateParams.getModelPlotConfig());
}
List<DetectionRule> eventsAsRules = Collections.emptyList();
if (scheduledEvents.isEmpty() == false) {
eventsAsRules = scheduledEvents.stream()
.map(e -> e.toDetectionRule(job.getAnalysisConfig().getBucketSpan()))
.collect(Collectors.toList());
}
// All detection rules for a detector must be updated together as the update
// wipes any previously set rules.
// Build a single list of rules for events and detection rules.
List<List<DetectionRule>> rules = new ArrayList<>(job.getAnalysisConfig().getDetectors().size());
for (int i = 0; i < job.getAnalysisConfig().getDetectors().size(); i++) {
List<DetectionRule> detectorRules = new ArrayList<>(eventsAsRules);
rules.add(detectorRules);
// Filters have to be written before detectors
if (updateParams.getFilter() != null) {
autodetectProcess.writeUpdateFiltersMessage(Collections.singletonList(updateParams.getFilter()));
}
// Add detector rules
if (updateParams.getDetectorUpdates() != null) {
for (JobUpdate.DetectorUpdate update : updateParams.getDetectorUpdates()) {
if (update.getRules() != null) {
rules.get(update.getDetectorIndex()).addAll(update.getRules());
autodetectProcess.writeUpdateDetectorRulesMessage(update.getDetectorIndex(), update.getRules());
}
}
}
for (int i = 0; i < job.getAnalysisConfig().getDetectors().size(); i++) {
if (!rules.get(i).isEmpty()) {
autodetectProcess.writeUpdateDetectorRulesMessage(i, rules.get(i));
}
// Add scheduled events; null means there's no update but an empty list means we should clear any events in the process
if (scheduledEvents != null) {
autodetectProcess.writeUpdateScheduledEventsMessage(scheduledEvents, job.getAnalysisConfig().getBucketSpan());
}
return null;

View File

@ -5,7 +5,10 @@
*/
package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
@ -74,6 +77,23 @@ public interface AutodetectProcess extends Closeable {
void writeUpdateDetectorRulesMessage(int detectorIndex, List<DetectionRule> rules)
throws IOException;
/**
* Write message to update the filters
*
* @param filters the filters to update
* @throws IOException If the write fails
*/
void writeUpdateFiltersMessage(List<MlFilter> filters) throws IOException;
/**
* Write message to update the scheduled events
*
* @param events Scheduled events
* @param bucketSpan The job bucket span
* @throws IOException If the write fails
*/
void writeUpdateScheduledEventsMessage(List<ScheduledEvent> events, TimeValue bucketSpan) throws IOException;
/**
* Flush the job pushing any stale data into autodetect.
* Every flush command generates a unique flush Id which will be output

View File

@ -268,7 +268,7 @@ public class AutodetectProcessManager extends AbstractComponent {
ActionListener<QueryPage<ScheduledEvent>> eventsListener = ActionListener.wrap(
events -> {
communicator.writeUpdateProcessMessage(updateParams, events.results(), (aVoid, e) -> {
communicator.writeUpdateProcessMessage(updateParams, events == null ? null : events.results(), (aVoid, e) -> {
if (e == null) {
handler.accept(null);
} else {
@ -283,7 +283,7 @@ public class AutodetectProcessManager extends AbstractComponent {
ScheduledEventsQueryBuilder query = new ScheduledEventsQueryBuilder().start(Long.toString(new Date().getTime()));
jobProvider.scheduledEventsForJob(jobTask.getJobId(), job.getGroups(), query, eventsListener);
} else {
eventsListener.onResponse(new QueryPage<>(Collections.emptyList(), 0, ScheduledEvent.RESULTS_FIELD));
eventsListener.onResponse(null);
}
}

View File

@ -5,7 +5,10 @@
*/
package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement;
@ -71,6 +74,14 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess {
public void writeUpdateDetectorRulesMessage(int detectorIndex, List<DetectionRule> rules) throws IOException {
}
@Override
public void writeUpdateFiltersMessage(List<MlFilter> filters) throws IOException {
}
@Override
public void writeUpdateScheduledEventsMessage(List<ScheduledEvent> events, TimeValue bucketSpan) throws IOException {
}
/**
* Accept the request do nothing with it but write the flush acknowledgement to {@link #readAutodetectResults()}
* @param params Should interim results be generated

View File

@ -8,8 +8,12 @@ package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.ml.MachineLearningClientActionPlugin;
import org.elasticsearch.xpack.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.NativeControllerHolder;
@ -159,6 +163,18 @@ class NativeAutodetectProcess implements AutodetectProcess {
writer.writeUpdateDetectorRulesMessage(detectorIndex, rules);
}
@Override
public void writeUpdateFiltersMessage(List<MlFilter> filters) throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfFields);
writer.writeUpdateFiltersMessage(filters);
}
@Override
public void writeUpdateScheduledEventsMessage(List<ScheduledEvent> events, TimeValue bucketSpan) throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfFields);
writer.writeUpdateScheduledEventsMessage(events, bucketSpan);
}
@Override
public String flushJob(FlushJobParams params) throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfFields);

View File

@ -7,33 +7,105 @@ package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
import java.util.List;
import java.util.Objects;
public final class UpdateParams {
private final String jobId;
private final ModelPlotConfig modelPlotConfig;
private final List<JobUpdate.DetectorUpdate> detectorUpdates;
private final MlFilter filter;
private final boolean updateScheduledEvents;
public UpdateParams(@Nullable ModelPlotConfig modelPlotConfig,
@Nullable List<JobUpdate.DetectorUpdate> detectorUpdates,
boolean updateScheduledEvents) {
private UpdateParams(String jobId, @Nullable ModelPlotConfig modelPlotConfig, @Nullable List<JobUpdate.DetectorUpdate> detectorUpdates,
@Nullable MlFilter filter, boolean updateScheduledEvents) {
this.jobId = Objects.requireNonNull(jobId);
this.modelPlotConfig = modelPlotConfig;
this.detectorUpdates = detectorUpdates;
this.filter = filter;
this.updateScheduledEvents = updateScheduledEvents;
}
public String getJobId() {
return jobId;
}
@Nullable
public ModelPlotConfig getModelPlotConfig() {
return modelPlotConfig;
}
@Nullable
public List<JobUpdate.DetectorUpdate> getDetectorUpdates() {
return detectorUpdates;
}
@Nullable
public MlFilter getFilter() {
return filter;
}
public boolean isUpdateScheduledEvents() {
return updateScheduledEvents;
}
public static UpdateParams fromJobUpdate(JobUpdate jobUpdate) {
return new Builder(jobUpdate.getJobId())
.modelPlotConfig(jobUpdate.getModelPlotConfig())
.detectorUpdates(jobUpdate.getDetectorUpdates())
.build();
}
public static UpdateParams filterUpdate(String jobId, MlFilter filter) {
return new Builder(jobId).filter(filter).build();
}
public static UpdateParams scheduledEventsUpdate(String jobId) {
return new Builder(jobId).updateScheduledEvents(true).build();
}
public static Builder builder(String jobId) {
return new Builder(jobId);
}
public static class Builder {
private String jobId;
private ModelPlotConfig modelPlotConfig;
private List<JobUpdate.DetectorUpdate> detectorUpdates;
private MlFilter filter;
private boolean updateScheduledEvents;
public Builder(String jobId) {
this.jobId = Objects.requireNonNull(jobId);
}
public Builder modelPlotConfig(ModelPlotConfig modelPlotConfig) {
this.modelPlotConfig = modelPlotConfig;
return this;
}
public Builder detectorUpdates(List<JobUpdate.DetectorUpdate> detectorUpdates) {
this.detectorUpdates = detectorUpdates;
return this;
}
public Builder filter(MlFilter filter) {
this.filter = filter;
return this;
}
public Builder updateScheduledEvents(boolean updateScheduledEvents) {
this.updateScheduledEvents = updateScheduledEvents;
return this;
}
public UpdateParams build() {
return new UpdateParams(jobId, modelPlotConfig, detectorUpdates, filter, updateScheduledEvents);
}
}
}

View File

@ -5,11 +5,14 @@
*/
package org.elasticsearch.xpack.ml.job.process.autodetect.writer;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
@ -211,6 +214,24 @@ public class ControlMsgToProcessWriter {
writeMessage(stringBuilder.toString());
}
public void writeUpdateFiltersMessage(List<MlFilter> filters) throws IOException {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(UPDATE_MESSAGE_CODE).append("[filters]\n");
new MlFilterWriter(filters, stringBuilder).write();
writeMessage(stringBuilder.toString());
}
public void writeUpdateScheduledEventsMessage(List<ScheduledEvent> events, TimeValue bucketSpan) throws IOException {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(UPDATE_MESSAGE_CODE).append("[scheduledEvents]\n");
if (events.isEmpty()) {
stringBuilder.append("clear = true\n");
} else {
new ScheduledEventsWriter(events, bucketSpan, stringBuilder).write();
}
writeMessage(stringBuilder.toString());
}
/**
* Transform the supplied control message to length encoded values and
* write to the OutputStream.

View File

@ -22,11 +22,9 @@ import org.elasticsearch.xpack.ml.utils.MlStrings;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import static org.elasticsearch.xpack.ml.job.process.autodetect.writer.WriterConstants.EQUALS;
@ -37,10 +35,6 @@ public class FieldConfigWriter {
private static final String INFLUENCER_PREFIX = "influencer.";
private static final String CATEGORIZATION_FIELD_OPTION = " categorizationfield=";
private static final String CATEGORIZATION_FILTER_PREFIX = "categorizationfilter.";
private static final String FILTER_PREFIX = "filter.";
private static final String SCHEDULED_EVENT_PREFIX = "scheduledevent.";
private static final String SCHEDULED_EVENT_DESCRIPTION_SUFFIX = ".description";
// Note: for the Engine API summarycountfield is currently passed as a
// command line option to autodetect rather than in the field config file
@ -68,8 +62,9 @@ public class FieldConfigWriter {
public void write() throws IOException {
StringBuilder contents = new StringBuilder();
writeDetectors(contents);
// Filters have to be written before the detectors
writeFilters(contents);
writeDetectors(contents);
writeScheduledEvents(contents);
if (MachineLearning.CATEGORIZATION_TOKENIZATION_IN_JAVA == false) {
@ -141,46 +136,12 @@ public class FieldConfigWriter {
}
private void writeFilters(StringBuilder buffer) throws IOException {
for (MlFilter filter : filters) {
StringBuilder filterAsJson = new StringBuilder();
filterAsJson.append('[');
boolean first = true;
for (String item : filter.getItems()) {
if (first) {
first = false;
} else {
filterAsJson.append(',');
}
filterAsJson.append('"');
filterAsJson.append(item);
filterAsJson.append('"');
}
filterAsJson.append(']');
buffer.append(FILTER_PREFIX).append(filter.getId()).append(EQUALS).append(filterAsJson)
.append(NEW_LINE);
}
new MlFilterWriter(filters, buffer).write();
}
private void writeScheduledEvents(StringBuilder contents) throws IOException {
if (scheduledEvents.isEmpty()) {
return;
}
int eventIndex = 0;
for (ScheduledEvent event: scheduledEvents) {
contents.append(SCHEDULED_EVENT_PREFIX).append(eventIndex)
.append(SCHEDULED_EVENT_DESCRIPTION_SUFFIX).append(EQUALS)
.append(event.getDescription())
.append(NEW_LINE);
contents.append(SCHEDULED_EVENT_PREFIX).append(eventIndex)
.append(DETECTOR_RULES_SUFFIX).append(EQUALS);
writeDetectionRulesJson(Collections.singletonList(event.toDetectionRule(config.getBucketSpan())), contents);
contents.append(NEW_LINE);
++eventIndex;
private void writeScheduledEvents(StringBuilder buffer) throws IOException {
if (scheduledEvents.isEmpty() == false) {
new ScheduledEventsWriter(scheduledEvents, config.getBucketSpan(), buffer).write();
}
}

View File

@ -0,0 +1,49 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.process.autodetect.writer;
import org.elasticsearch.xpack.ml.job.config.MlFilter;
import java.io.IOException;
import java.util.Collection;
import java.util.Objects;
import static org.elasticsearch.xpack.ml.job.process.autodetect.writer.WriterConstants.EQUALS;
import static org.elasticsearch.xpack.ml.job.process.autodetect.writer.WriterConstants.NEW_LINE;
public class MlFilterWriter {
private static final String FILTER_PREFIX = "filter.";
private final Collection<MlFilter> filters;
private final StringBuilder buffer;
public MlFilterWriter(Collection<MlFilter> filters, StringBuilder buffer) {
this.filters = Objects.requireNonNull(filters);
this.buffer = Objects.requireNonNull(buffer);
}
public void write() throws IOException {
for (MlFilter filter : filters) {
StringBuilder filterAsJson = new StringBuilder();
filterAsJson.append('[');
boolean first = true;
for (String item : filter.getItems()) {
if (first) {
first = false;
} else {
filterAsJson.append(',');
}
filterAsJson.append('"');
filterAsJson.append(item);
filterAsJson.append('"');
}
filterAsJson.append(']');
buffer.append(FILTER_PREFIX).append(filter.getId()).append(EQUALS).append(filterAsJson).append(NEW_LINE);
}
}
}

View File

@ -0,0 +1,60 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.process.autodetect.writer;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.xpack.ml.calendars.ScheduledEvent;
import java.io.IOException;
import java.util.Collection;
import java.util.Objects;
import static org.elasticsearch.xpack.ml.job.process.autodetect.writer.WriterConstants.EQUALS;
import static org.elasticsearch.xpack.ml.job.process.autodetect.writer.WriterConstants.NEW_LINE;
public class ScheduledEventsWriter {
private static final String SCHEDULED_EVENT_PREFIX = "scheduledevent.";
private static final String SCHEDULED_EVENT_DESCRIPTION_SUFFIX = ".description";
private static final String RULES_SUFFIX = ".rules";
private final Collection<ScheduledEvent> scheduledEvents;
private final TimeValue bucketSpan;
private final StringBuilder buffer;
public ScheduledEventsWriter(Collection<ScheduledEvent> scheduledEvents, TimeValue bucketSpan, StringBuilder buffer) {
this.scheduledEvents = Objects.requireNonNull(scheduledEvents);
this.bucketSpan = Objects.requireNonNull(bucketSpan);
this.buffer = Objects.requireNonNull(buffer);
}
public void write() throws IOException {
int eventIndex = 0;
for (ScheduledEvent event: scheduledEvents) {
StringBuilder eventContent = new StringBuilder();
eventContent.append(SCHEDULED_EVENT_PREFIX).append(eventIndex)
.append(SCHEDULED_EVENT_DESCRIPTION_SUFFIX).append(EQUALS)
.append(event.getDescription())
.append(NEW_LINE);
eventContent.append(SCHEDULED_EVENT_PREFIX).append(eventIndex).append(RULES_SUFFIX).append(EQUALS);
try (XContentBuilder contentBuilder = XContentFactory.jsonBuilder()) {
contentBuilder.startArray();
event.toDetectionRule(bucketSpan).toXContent(contentBuilder, null);
contentBuilder.endArray();
eventContent.append(contentBuilder.string());
}
eventContent.append(NEW_LINE);
buffer.append(eventContent.toString());
++eventIndex;
}
}
}

View File

@ -7,6 +7,8 @@ package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.config.MlFilterTests;
import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
import java.util.ArrayList;
@ -29,7 +31,11 @@ public class UpdateProcessActionRequestTests extends AbstractStreamableTestCase<
updates.add(new JobUpdate.DetectorUpdate(randomInt(), randomAlphaOfLength(10), null));
}
}
return new UpdateProcessAction.Request(randomAlphaOfLength(10), config, updates, randomBoolean());
MlFilter filter = null;
if (randomBoolean()) {
filter = MlFilterTests.createTestFilter();
}
return new UpdateProcessAction.Request(randomAlphaOfLength(10), config, updates, filter, randomBoolean());
}
@Override

View File

@ -28,25 +28,37 @@ import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzerTests;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.config.RuleCondition;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.UpdateParams;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.addJobTask;
import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class JobManagerTests extends ESTestCase {
@ -57,6 +69,7 @@ public class JobManagerTests extends ESTestCase {
private ClusterService clusterService;
private JobProvider jobProvider;
private Auditor auditor;
private UpdateJobProcessNotifier updateJobProcessNotifier;
@Before
public void setup() throws Exception {
@ -67,6 +80,7 @@ public class JobManagerTests extends ESTestCase {
clusterService = mock(ClusterService.class);
jobProvider = mock(JobProvider.class);
auditor = mock(Auditor.class);
updateJobProcessNotifier = mock(UpdateJobProcessNotifier.class);
}
public void testGetJobOrThrowIfUnknown_GivenUnknownJob() {
@ -160,6 +174,98 @@ public class JobManagerTests extends ESTestCase {
});
}
public void testUpdateProcessOnFilterChanged() {
Detector.Builder detectorReferencingFilter = new Detector.Builder("count", null);
detectorReferencingFilter.setByFieldName("foo");
RuleCondition.createCategorical("foo", "foo_filter");
DetectionRule filterRule = new DetectionRule.Builder(Collections.singletonList(
RuleCondition.createCategorical("foo", "foo_filter"))).build();
detectorReferencingFilter.setRules(Collections.singletonList(filterRule));
AnalysisConfig.Builder filterAnalysisConfig = new AnalysisConfig.Builder(Collections.singletonList(
detectorReferencingFilter.build()));
Job.Builder jobReferencingFilter1 = buildJobBuilder("job-referencing-filter-1");
jobReferencingFilter1.setAnalysisConfig(filterAnalysisConfig);
Job.Builder jobReferencingFilter2 = buildJobBuilder("job-referencing-filter-2");
jobReferencingFilter2.setAnalysisConfig(filterAnalysisConfig);
Job.Builder jobReferencingFilter3 = buildJobBuilder("job-referencing-filter-3");
jobReferencingFilter3.setAnalysisConfig(filterAnalysisConfig);
Job.Builder jobWithoutFilter = buildJobBuilder("job-without-filter");
MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
mlMetadata.putJob(jobReferencingFilter1.build(), false);
mlMetadata.putJob(jobReferencingFilter2.build(), false);
mlMetadata.putJob(jobReferencingFilter3.build(), false);
mlMetadata.putJob(jobWithoutFilter.build(), false);
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask(jobReferencingFilter1.getId(), "node_id", JobState.OPENED, tasksBuilder);
addJobTask(jobReferencingFilter2.getId(), "node_id", JobState.OPENED, tasksBuilder);
addJobTask(jobWithoutFilter.getId(), "node_id", JobState.OPENED, tasksBuilder);
ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder()
.putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())
.putCustom(MLMetadataField.TYPE, mlMetadata.build()))
.build();
when(clusterService.state()).thenReturn(clusterState);
JobManager jobManager = createJobManager();
MlFilter filter = new MlFilter("foo_filter", Arrays.asList("a", "b"));
jobManager.updateProcessOnFilterChanged(filter);
ArgumentCaptor<UpdateParams> updateParamsCaptor = ArgumentCaptor.forClass(UpdateParams.class);
verify(updateJobProcessNotifier, times(2)).submitJobUpdate(updateParamsCaptor.capture());
List<UpdateParams> capturedUpdateParams = updateParamsCaptor.getAllValues();
assertThat(capturedUpdateParams.size(), equalTo(2));
assertThat(capturedUpdateParams.get(0).getJobId(), equalTo(jobReferencingFilter1.getId()));
assertThat(capturedUpdateParams.get(0).getFilter(), equalTo(filter));
assertThat(capturedUpdateParams.get(1).getJobId(), equalTo(jobReferencingFilter2.getId()));
assertThat(capturedUpdateParams.get(1).getFilter(), equalTo(filter));
}
public void testUpdateProcessOnCalendarChanged() {
Job.Builder job1 = buildJobBuilder("job-1");
Job.Builder job2 = buildJobBuilder("job-2");
Job.Builder job3 = buildJobBuilder("job-3");
Job.Builder job4 = buildJobBuilder("job-4");
MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
mlMetadata.putJob(job1.build(), false);
mlMetadata.putJob(job2.build(), false);
mlMetadata.putJob(job3.build(), false);
mlMetadata.putJob(job4.build(), false);
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask(job1.getId(), "node_id", JobState.OPENED, tasksBuilder);
addJobTask(job2.getId(), "node_id", JobState.OPENED, tasksBuilder);
addJobTask(job3.getId(), "node_id", JobState.OPENED, tasksBuilder);
ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder()
.putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())
.putCustom(MLMetadataField.TYPE, mlMetadata.build()))
.build();
when(clusterService.state()).thenReturn(clusterState);
JobManager jobManager = createJobManager();
jobManager.updateProcessOnCalendarChanged(Arrays.asList("job-1", "job-3", "job-4"));
ArgumentCaptor<UpdateParams> updateParamsCaptor = ArgumentCaptor.forClass(UpdateParams.class);
verify(updateJobProcessNotifier, times(2)).submitJobUpdate(updateParamsCaptor.capture());
List<UpdateParams> capturedUpdateParams = updateParamsCaptor.getAllValues();
assertThat(capturedUpdateParams.size(), equalTo(2));
assertThat(capturedUpdateParams.get(0).getJobId(), equalTo(job1.getId()));
assertThat(capturedUpdateParams.get(0).isUpdateScheduledEvents(), is(true));
assertThat(capturedUpdateParams.get(1).getJobId(), equalTo(job3.getId()));
assertThat(capturedUpdateParams.get(1).isUpdateScheduledEvents(), is(true));
}
private Job.Builder createJob() {
Detector.Builder d1 = new Detector.Builder("info_content", "domain");
d1.setOverFieldName("client");
@ -176,8 +282,7 @@ public class JobManagerTests extends ESTestCase {
ClusterSettings clusterSettings = new ClusterSettings(environment.settings(),
Collections.singleton(MachineLearningClientActionPlugin.MAX_MODEL_MEMORY_LIMIT));
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
UpdateJobProcessNotifier notifier = mock(UpdateJobProcessNotifier.class);
return new JobManager(environment, environment.settings(), jobProvider, clusterService, auditor, client, notifier);
return new JobManager(environment, environment.settings(), jobProvider, clusterService, auditor, client, updateJobProcessNotifier);
}
private ClusterState createClusterState() {

View File

@ -32,8 +32,6 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import org.mockito.Mockito;
import java.io.ByteArrayInputStream;
@ -96,38 +94,20 @@ public class AutodetectCommunicatorTests extends ESTestCase {
List<RuleCondition> conditions = Collections.singletonList(
RuleCondition.createCategorical("foo", "bar"));
DetectionRule updatedRule = new DetectionRule.Builder(conditions).build();
List<JobUpdate.DetectorUpdate> detectorUpdates = Collections.singletonList(
new JobUpdate.DetectorUpdate(0, "updated description",
Collections.singletonList(new DetectionRule.Builder(conditions).build())));
new JobUpdate.DetectorUpdate(0, "updated description", Collections.singletonList(updatedRule)));
UpdateParams updateParams = new UpdateParams(null, detectorUpdates, true);
List<ScheduledEvent> events = Collections.singletonList(ScheduledEventTests.createScheduledEvent(randomAlphaOfLength(10)));
UpdateParams updateParams = UpdateParams.builder("foo").detectorUpdates(detectorUpdates).build();
List<ScheduledEvent> events = Collections.singletonList(
ScheduledEventTests.createScheduledEvent(randomAlphaOfLength(10)));
communicator.writeUpdateProcessMessage(updateParams, events, ((aVoid, e) -> {}));
// There are 2 detectors both will be updated with the rule for the scheduled event.
// The first has an additional update rule
ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);
InOrder inOrder = Mockito.inOrder(process);
inOrder.verify(process).writeUpdateDetectorRulesMessage(eq(0), captor.capture());
assertEquals(2, captor.getValue().size());
inOrder.verify(process).writeUpdateDetectorRulesMessage(eq(1), captor.capture());
assertEquals(1, captor.getValue().size());
verify(process).writeUpdateDetectorRulesMessage(eq(0), eq(Collections.singletonList(updatedRule)));
verify(process).writeUpdateScheduledEventsMessage(events, AnalysisConfig.Builder.DEFAULT_BUCKET_SPAN);
verify(process).isProcessAlive();
verifyNoMoreInteractions(process);
// This time there is a single detector update and no scheduled events
detectorUpdates = Collections.singletonList(
new JobUpdate.DetectorUpdate(1, "updated description",
Collections.singletonList(new DetectionRule.Builder(conditions).build())));
updateParams = new UpdateParams(null, detectorUpdates, true);
communicator.writeUpdateProcessMessage(updateParams, Collections.emptyList(), ((aVoid, e) -> {}));
inOrder = Mockito.inOrder(process);
inOrder.verify(process).writeUpdateDetectorRulesMessage(eq(1), captor.capture());
assertEquals(1, captor.getValue().size());
verify(process, times(2)).isProcessAlive();
}
public void testFlushJob() throws IOException {

View File

@ -475,7 +475,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
verify(manager).setJobState(any(), eq(JobState.FAILED));
}
public void testwriteUpdateProcessMessage() {
public void testWriteUpdateProcessMessage() {
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
AutodetectProcessManager manager = createManagerAndCallProcessData(communicator, "foo");
ModelPlotConfig modelConfig = mock(ModelPlotConfig.class);
@ -483,9 +483,9 @@ public class AutodetectProcessManagerTests extends ESTestCase {
List<JobUpdate.DetectorUpdate> detectorUpdates = Collections.singletonList(new JobUpdate.DetectorUpdate(2, null, rules));
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
UpdateParams updateParams = new UpdateParams(modelConfig, detectorUpdates, false);
UpdateParams updateParams = UpdateParams.builder("foo").modelPlotConfig(modelConfig).detectorUpdates(detectorUpdates).build();
manager.writeUpdateProcessMessage(jobTask, updateParams, e -> {});
verify(communicator).writeUpdateProcessMessage(same(updateParams), eq(Collections.emptyList()), any());
verify(communicator).writeUpdateProcessMessage(same(updateParams), eq(null), any());
}
public void testJobHasActiveAutodetectProcess() {

View File

@ -5,10 +5,13 @@
*/
package org.elasticsearch.xpack.ml.job.process.autodetect.writer;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.ml.job.config.Condition;
import org.elasticsearch.xpack.ml.job.config.Connective;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.ml.job.config.Operator;
import org.elasticsearch.xpack.ml.job.config.RuleCondition;
@ -21,6 +24,7 @@ import org.mockito.InOrder;
import org.mockito.Mockito;
import java.io.IOException;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -205,6 +209,66 @@ public class ControlMsgToProcessWriterTests extends ESTestCase {
verifyNoMoreInteractions(lengthEncodedWriter);
}
public void testWriteUpdateFiltersMessage() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2);
MlFilter filter1 = new MlFilter("filter_1", Arrays.asList("a"));
MlFilter filter2 = new MlFilter("filter_2", Arrays.asList("b", "c"));
writer.writeUpdateFiltersMessage(Arrays.asList(filter1, filter2));
InOrder inOrder = inOrder(lengthEncodedWriter);
inOrder.verify(lengthEncodedWriter).writeNumFields(2);
inOrder.verify(lengthEncodedWriter, times(1)).writeField("");
inOrder.verify(lengthEncodedWriter).writeField("u[filters]\nfilter.filter_1 = [\"a\"]\nfilter.filter_2 = [\"b\",\"c\"]\n");
verifyNoMoreInteractions(lengthEncodedWriter);
}
public void testWriteUpdateScheduledEventsMessage() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2);
ScheduledEvent.Builder event1 = new ScheduledEvent.Builder();
event1.calendarId("moon");
event1.description("new year");
event1.startTime(ZonedDateTime.parse("2018-01-01T00:00:00Z"));
event1.endTime(ZonedDateTime.parse("2018-01-02T00:00:00Z"));
ScheduledEvent.Builder event2 = new ScheduledEvent.Builder();
event2.calendarId("moon");
event2.description("Jan maintenance day");
event2.startTime(ZonedDateTime.parse("2018-01-06T00:00:00Z"));
event2.endTime(ZonedDateTime.parse("2018-01-07T00:00:00Z"));
writer.writeUpdateScheduledEventsMessage(Arrays.asList(event1.build(), event2.build()), TimeValue.timeValueHours(1));
InOrder inOrder = inOrder(lengthEncodedWriter);
inOrder.verify(lengthEncodedWriter).writeNumFields(2);
inOrder.verify(lengthEncodedWriter, times(1)).writeField("");
inOrder.verify(lengthEncodedWriter).writeField("u[scheduledEvents]\n"
+ "scheduledevent.0.description = new year\n"
+ "scheduledevent.0.rules = [{\"actions\":[\"filter_results\",\"skip_sampling\"],\"conditions_connective\":\"and\","
+ "\"conditions\":[{\"type\":\"time\",\"condition\":{\"operator\":\"gte\",\"value\":\"1514764800\"}},"
+ "{\"type\":\"time\",\"condition\":{\"operator\":\"lt\",\"value\":\"1514851200\"}}]}]\n"
+ "scheduledevent.1.description = Jan maintenance day\n"
+ "scheduledevent.1.rules = [{\"actions\":[\"filter_results\",\"skip_sampling\"],\"conditions_connective\":\"and\","
+ "\"conditions\":[{\"type\":\"time\",\"condition\":{\"operator\":\"gte\",\"value\":\"1515196800\"}},"
+ "{\"type\":\"time\",\"condition\":{\"operator\":\"lt\",\"value\":\"1515283200\"}}]}]\n"
);
verifyNoMoreInteractions(lengthEncodedWriter);
}
public void testWriteUpdateScheduledEventsMessage_GivenEmpty() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2);
writer.writeUpdateScheduledEventsMessage(Collections.emptyList(), TimeValue.timeValueHours(1));
InOrder inOrder = inOrder(lengthEncodedWriter);
inOrder.verify(lengthEncodedWriter).writeNumFields(2);
inOrder.verify(lengthEncodedWriter, times(1)).writeField("");
inOrder.verify(lengthEncodedWriter).writeField("u[scheduledEvents]\nclear = true\n");
verifyNoMoreInteractions(lengthEncodedWriter);
}
private static List<RuleCondition> createRule(String value) {
Condition condition = new Condition(Operator.GT, value);
return Collections.singletonList(RuleCondition.createNumerical(RuleConditionType.NUMERICAL_ACTUAL, null, null, condition));

View File

@ -228,9 +228,9 @@ public class FieldConfigWriterTests extends ESTestCase {
createFieldConfigWriter().write();
verify(writer).write("detector.0.clause = count\n" +
"filter.filter_1 = [\"a\",\"b\"]\n" +
"filter.filter_2 = [\"c\",\"d\"]\n");
verify(writer).write("filter.filter_1 = [\"a\",\"b\"]\n" +
"filter.filter_2 = [\"c\",\"d\"]\n" +
"detector.0.clause = count\n");
verifyNoMoreInteractions(writer);
}

View File

@ -0,0 +1,39 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.process.autodetect.writer;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.config.MlFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
public class MlFilterWriterTests extends ESTestCase {
public void testWrite_GivenEmpty() throws IOException {
StringBuilder buffer = new StringBuilder();
new MlFilterWriter(Collections.emptyList(), buffer).write();
assertThat(buffer.toString().isEmpty(), is(true));
}
public void testWrite() throws IOException {
List<MlFilter> filters = new ArrayList<>();
filters.add(new MlFilter("filter_1", Arrays.asList("a", "b")));
filters.add(new MlFilter("filter_2", Arrays.asList("c", "d")));
StringBuilder buffer = new StringBuilder();
new MlFilterWriter(filters, buffer).write();
assertThat(buffer.toString(), equalTo("filter.filter_1 = [\"a\",\"b\"]\nfilter.filter_2 = [\"c\",\"d\"]\n"));
}
}

View File

@ -0,0 +1,57 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.process.autodetect.writer;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.calendars.ScheduledEvent;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
public class ScheduledEventsWriterTests extends ESTestCase {
public void testWrite_GivenEmpty() throws IOException {
StringBuilder buffer = new StringBuilder();
new ScheduledEventsWriter(Collections.emptyList(), TimeValue.timeValueHours(1), buffer).write();
assertThat(buffer.toString().isEmpty(), is(true));
}
public void testWrite() throws IOException {
List<ScheduledEvent> events = new ArrayList<>();
events.add(new ScheduledEvent.Builder().description("Black Friday")
.startTime(ZonedDateTime.ofInstant(Instant.ofEpochMilli(1511395200000L), ZoneOffset.UTC))
.endTime(ZonedDateTime.ofInstant(Instant.ofEpochMilli(1515369600000L), ZoneOffset.UTC))
.calendarId("calendar_id").build());
events.add(new ScheduledEvent.Builder().description("Blue Monday")
.startTime(ZonedDateTime.ofInstant(Instant.ofEpochMilli(1519603200000L), ZoneOffset.UTC))
.endTime(ZonedDateTime.ofInstant(Instant.ofEpochMilli(1519862400000L), ZoneOffset.UTC))
.calendarId("calendar_id").build());
StringBuilder buffer = new StringBuilder();
new ScheduledEventsWriter(events, TimeValue.timeValueHours(1), buffer).write();
String expectedString = "scheduledevent.0.description = Black Friday\n" +
"scheduledevent.0.rules = [{\"actions\":[\"filter_results\",\"skip_sampling\"],\"conditions_connective\":\"and\"," +
"\"conditions\":[{\"type\":\"time\",\"condition\":{\"operator\":\"gte\",\"value\":\"1511395200\"}}," +
"{\"type\":\"time\",\"condition\":{\"operator\":\"lt\",\"value\":\"1515369600\"}}]}]\n" +
"scheduledevent.1.description = Blue Monday\n" +
"scheduledevent.1.rules = [{\"actions\":[\"filter_results\",\"skip_sampling\"],\"conditions_connective\":\"and\"," +
"\"conditions\":[{\"type\":\"time\",\"condition\":{\"operator\":\"gte\",\"value\":\"1519603200\"}}," +
"{\"type\":\"time\",\"condition\":{\"operator\":\"lt\",\"value\":\"1519862400\"}}]}]" +
"\n";
assertThat(buffer.toString(), equalTo(expectedString));
}
}