* Add Special Event

* Add special events to update process

* Add time condition and skip rule actions.

* Update special events

* Address review comments

Original commit: elastic/x-pack-elasticsearch@80500ded76
This commit is contained in:
David Kyle 2017-12-07 11:44:12 +00:00 committed by GitHub
parent 628dfaa843
commit e9d9199205
36 changed files with 1120 additions and 165 deletions

View File

@ -358,7 +358,7 @@ public class MachineLearning implements ActionPlugin {
throw new ElasticsearchException("Failed to create native process factories for Machine Learning", e);
}
} else {
autodetectProcessFactory = (job, modelSnapshot, quantiles, filters, executorService, onProcessCrash) ->
autodetectProcessFactory = (job, autodetectParams, executorService, onProcessCrash) ->
new BlackHoleAutodetectProcess(job.getId());
// factor of 1.0 makes renormalization a no-op
normalizerProcessFactory = (jobId, quantilesState, bucketSpan, perPartitionNormalization,

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.ml.calendars.SpecialEvent;
import org.elasticsearch.xpack.ml.job.persistence.ElasticsearchMappings;
import java.io.IOException;
@ -26,10 +27,18 @@ public final class MlMetaIndex {
public static XContentBuilder docMapping() throws IOException {
XContentBuilder builder = jsonBuilder();
builder.startObject();
builder.startObject(TYPE);
ElasticsearchMappings.addDefaultMapping(builder);
builder.endObject();
builder.endObject();
builder.startObject(TYPE);
ElasticsearchMappings.addDefaultMapping(builder);
builder.startObject(ElasticsearchMappings.PROPERTIES)
.startObject(SpecialEvent.START_TIME.getPreferredName())
.field(ElasticsearchMappings.TYPE, ElasticsearchMappings.DATE)
.endObject()
.startObject(SpecialEvent.END_TIME.getPreferredName())
.field(ElasticsearchMappings.TYPE, ElasticsearchMappings.DATE)
.endObject()
.endObject()
.endObject()
.endObject();
return builder;
}
}

View File

@ -24,7 +24,6 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent.Params;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestBuilder;
@ -23,9 +24,11 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.calendars.SpecialEvent;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.job.process.autodetect.UpdateParams;
import java.io.IOException;
import java.util.List;
@ -119,14 +122,17 @@ public class UpdateProcessAction extends
private ModelPlotConfig modelPlotConfig;
private List<JobUpdate.DetectorUpdate> detectorUpdates;
private boolean updateSpecialEvents = false;
Request() {
}
public Request(String jobId, ModelPlotConfig modelPlotConfig, List<JobUpdate.DetectorUpdate> detectorUpdates) {
public Request(String jobId, ModelPlotConfig modelPlotConfig, List<JobUpdate.DetectorUpdate> detectorUpdates,
boolean updateSpecialEvents) {
super(jobId);
this.modelPlotConfig = modelPlotConfig;
this.detectorUpdates = detectorUpdates;
this.updateSpecialEvents = updateSpecialEvents;
}
public ModelPlotConfig getModelPlotConfig() {
@ -137,6 +143,10 @@ public class UpdateProcessAction extends
return detectorUpdates;
}
public boolean isUpdateSpecialEvents() {
return updateSpecialEvents;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
@ -144,6 +154,9 @@ public class UpdateProcessAction extends
if (in.readBoolean()) {
detectorUpdates = in.readList(JobUpdate.DetectorUpdate::new);
}
if (in.getVersion().onOrAfter(Version.V_6_2_0)) {
updateSpecialEvents = in.readBoolean();
}
}
@Override
@ -155,11 +168,14 @@ public class UpdateProcessAction extends
if (hasDetectorUpdates) {
out.writeList(detectorUpdates);
}
if (out.getVersion().onOrAfter(Version.V_6_2_0)) {
out.writeBoolean(updateSpecialEvents);
}
}
@Override
public int hashCode() {
return Objects.hash(getJobId(), modelPlotConfig, detectorUpdates);
return Objects.hash(getJobId(), modelPlotConfig, detectorUpdates, updateSpecialEvents);
}
@Override
@ -174,7 +190,8 @@ public class UpdateProcessAction extends
return Objects.equals(getJobId(), other.getJobId()) &&
Objects.equals(modelPlotConfig, other.modelPlotConfig) &&
Objects.equals(detectorUpdates, other.detectorUpdates);
Objects.equals(detectorUpdates, other.detectorUpdates) &&
Objects.equals(updateSpecialEvents, other.updateSpecialEvents);
}
}
@ -199,8 +216,10 @@ public class UpdateProcessAction extends
@Override
protected void taskOperation(Request request, OpenJobAction.JobTask task, ActionListener<Response> listener) {
try {
processManager.writeUpdateProcessMessage(task, request.getDetectorUpdates(),
request.getModelPlotConfig(), e -> {
processManager.writeUpdateProcessMessage(task,
new UpdateParams(request.getModelPlotConfig(),
request.getDetectorUpdates(), request.isUpdateSpecialEvents()),
e -> {
if (e == null) {
listener.onResponse(new Response());
} else {

View File

@ -0,0 +1,179 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.calendars;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.ml.job.config.Connective;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.Operator;
import org.elasticsearch.xpack.ml.job.config.RuleAction;
import org.elasticsearch.xpack.ml.job.config.RuleCondition;
import org.elasticsearch.xpack.ml.utils.time.TimeUtils;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
/**
 * A special event is a named, time-bounded calendar event that applies to a set
 * of jobs. An event is persisted as a document of type {@link #SPECIAL_EVENT_TYPE}
 * whose id is prefixed with {@link #DOCUMENT_ID_PREFIX}, and can be converted into
 * a {@link DetectionRule} that skips sampling and filters results for the event's
 * time range.
 *
 * Instances are immutable. Start and end times are held as UTC
 * {@link ZonedDateTime}s and serialized over the wire as epoch milliseconds.
 */
public class SpecialEvent implements ToXContentObject, Writeable {
    public static final ParseField ID = new ParseField("id");
    public static final ParseField DESCRIPTION = new ParseField("description");
    public static final ParseField START_TIME = new ParseField("start_time");
    public static final ParseField END_TIME = new ParseField("end_time");
    public static final ParseField TYPE = new ParseField("type");
    public static final ParseField JOB_IDS = new ParseField("job_ids");

    public static final String SPECIAL_EVENT_TYPE = "special_event";
    public static final String DOCUMENT_ID_PREFIX = "event_";

    // The cast of a[4] to List<String> is safe: that slot is populated only by
    // declareStringArray(..., JOB_IDS) below.
    @SuppressWarnings("unchecked")
    public static final ConstructingObjectParser<SpecialEvent, Void> PARSER =
            new ConstructingObjectParser<>("special_event", a -> new SpecialEvent((String) a[0], (String) a[1], (ZonedDateTime) a[2],
                    (ZonedDateTime) a[3], (List<String>) a[4]));

    static {
        PARSER.declareString(ConstructingObjectParser.constructorArg(), ID);
        PARSER.declareString(ConstructingObjectParser.constructorArg(), DESCRIPTION);
        PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> parseDateTime(p, START_TIME),
                START_TIME, ObjectParser.ValueType.VALUE);
        PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> parseDateTime(p, END_TIME),
                END_TIME, ObjectParser.ValueType.VALUE);
        PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), JOB_IDS);
        // The type field identifies the document kind in the shared meta index;
        // it carries no object state, so it is accepted and discarded on parse.
        PARSER.declareString((builder, s) -> {}, TYPE);
    }

    /**
     * Parses a time field that may be either an epoch-millis number or a date
     * string, returning the instant as a UTC {@link ZonedDateTime}.
     *
     * @param p     the parser positioned on the field's value token
     * @param field the field being parsed; used only for the error message
     * @throws IllegalArgumentException if the current token is neither a number
     *                                  nor a string
     */
    private static ZonedDateTime parseDateTime(XContentParser p, ParseField field) throws IOException {
        if (p.currentToken() == XContentParser.Token.VALUE_NUMBER) {
            return ZonedDateTime.ofInstant(Instant.ofEpochMilli(p.longValue()), ZoneOffset.UTC);
        }
        if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
            return ZonedDateTime.ofInstant(Instant.ofEpochMilli(TimeUtils.dateStringToEpoch(p.text())), ZoneOffset.UTC);
        }
        throw new IllegalArgumentException(
                "unexpected token [" + p.currentToken() + "] for [" + field.getPreferredName() + "]");
    }

    /**
     * Builds the id of the index document for the event with the given id.
     */
    public static String documentId(String eventId) {
        return DOCUMENT_ID_PREFIX + eventId;
    }

    private final String id;
    private final String description;
    private final ZonedDateTime startTime;
    private final ZonedDateTime endTime;
    private final Set<String> jobIds;

    /**
     * @param id          unique event id; must not be null
     * @param description human readable description; must not be null
     * @param startTime   inclusive start of the event; must not be null
     * @param endTime     exclusive end of the event; must not be null
     * @param jobIds      ids of the jobs the event applies to; duplicates are collapsed
     */
    public SpecialEvent(String id, String description, ZonedDateTime startTime, ZonedDateTime endTime, List<String> jobIds) {
        this.id = Objects.requireNonNull(id);
        this.description = Objects.requireNonNull(description);
        this.startTime = Objects.requireNonNull(startTime);
        this.endTime = Objects.requireNonNull(endTime);
        this.jobIds = new HashSet<>(jobIds);
    }

    public SpecialEvent(StreamInput in) throws IOException {
        id = in.readString();
        description = in.readString();
        // Times travel as epoch millis; reconstruct in UTC.
        startTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(in.readVLong()), ZoneOffset.UTC);
        endTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(in.readVLong()), ZoneOffset.UTC);
        jobIds = new HashSet<>(Arrays.asList(in.readStringArray()));
    }

    public String getId() {
        return id;
    }

    public String getDescription() {
        return description;
    }

    public ZonedDateTime getStartTime() {
        return startTime;
    }

    public ZonedDateTime getEndTime() {
        return endTime;
    }

    public Set<String> getJobIds() {
        return jobIds;
    }

    /**
     * The id of this event's index document.
     */
    public String documentId() {
        return documentId(id);
    }

    /**
     * Converts this event into a detection rule that skips sampling and filters
     * results for the event's time range: start time inclusive (GTE), end time
     * exclusive (LT), both conditions required (AND). Times are compared in
     * epoch seconds.
     */
    public DetectionRule toDetectionRule() {
        List<RuleCondition> conditions = new ArrayList<>();
        conditions.add(RuleCondition.createTime(Operator.GTE, this.getStartTime().toEpochSecond()));
        conditions.add(RuleCondition.createTime(Operator.LT, this.getEndTime().toEpochSecond()));

        DetectionRule.Builder builder = new DetectionRule.Builder(conditions);
        builder.setRuleAction(RuleAction.SKIP_SAMPLING_AND_FILTER_RESULTS);
        builder.setConditionsConnective(Connective.AND);
        return builder.build();
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeString(id);
        out.writeString(description);
        out.writeVLong(startTime.toInstant().toEpochMilli());
        out.writeVLong(endTime.toInstant().toEpochMilli());
        out.writeStringArray(jobIds.toArray(new String[0]));
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject();
        builder.field(ID.getPreferredName(), id);
        builder.field(DESCRIPTION.getPreferredName(), description);
        // dateField emits both the millis value and a human readable "_string" variant.
        builder.dateField(START_TIME.getPreferredName(), START_TIME.getPreferredName() + "_string", startTime.toInstant().toEpochMilli());
        builder.dateField(END_TIME.getPreferredName(), END_TIME.getPreferredName() + "_string", endTime.toInstant().toEpochMilli());
        builder.field(JOB_IDS.getPreferredName(), jobIds);
        builder.field(TYPE.getPreferredName(), SPECIAL_EVENT_TYPE);
        builder.endObject();
        return builder;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }

        if (!(obj instanceof SpecialEvent)) {
            return false;
        }

        SpecialEvent other = (SpecialEvent) obj;
        return id.equals(other.id) && description.equals(other.description) && startTime.equals(other.startTime)
                && endTime.equals(other.endTime) && jobIds.equals(other.jobIds);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, description, startTime, endTime, jobIds);
    }
}

View File

@ -99,7 +99,9 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local
}
void executeRemoteJob(JobUpdate update) {
Request request = new Request(update.getJobId(), update.getModelPlotConfig(), update.getDetectorUpdates());
Request request = new Request(update.getJobId(), update.getModelPlotConfig(), update.getDetectorUpdates(),
update.isUpdateSpecialEvents());
executeAsyncWithOrigin(client, ML_ORIGIN, UpdateProcessAction.INSTANCE, request,
new ActionListener<Response>() {
@Override

View File

@ -29,6 +29,7 @@ import java.util.Objects;
public class JobUpdate implements Writeable, ToXContentObject {
public static final ParseField DETECTORS = new ParseField("detectors");
public static final ParseField UPDATE_SPECIAL_EVENTS = new ParseField("update_special_events");
public static final ConstructingObjectParser<Builder, Void> PARSER = new ConstructingObjectParser<>(
"job_update", args -> new Builder((String) args[0]));
@ -49,6 +50,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
PARSER.declareField(Builder::setCustomSettings, (p, c) -> p.map(), Job.CUSTOM_SETTINGS, ObjectParser.ValueType.OBJECT);
PARSER.declareString(Builder::setModelSnapshotId, Job.MODEL_SNAPSHOT_ID);
PARSER.declareLong(Builder::setEstablishedModelMemory, Job.ESTABLISHED_MODEL_MEMORY);
PARSER.declareBoolean(Builder::setUpdateSpecialEvents, UPDATE_SPECIAL_EVENTS);
}
/**
@ -73,6 +75,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
private final Map<String, Object> customSettings;
private final String modelSnapshotId;
private final Long establishedModelMemory;
private final boolean updateSpecialEvents;
private JobUpdate(String jobId, @Nullable List<String> groups, @Nullable String description,
@Nullable List<DetectorUpdate> detectorUpdates, @Nullable ModelPlotConfig modelPlotConfig,
@ -80,7 +83,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
@Nullable Long renormalizationWindowDays, @Nullable Long resultsRetentionDays,
@Nullable Long modelSnapshotRetentionDays, @Nullable List<String> categorisationFilters,
@Nullable Map<String, Object> customSettings, @Nullable String modelSnapshotId,
@Nullable Long establishedModelMemory) {
@Nullable Long establishedModelMemory, boolean updateSpecialEvents) {
this.jobId = jobId;
this.groups = groups;
this.description = description;
@ -95,6 +98,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
this.customSettings = customSettings;
this.modelSnapshotId = modelSnapshotId;
this.establishedModelMemory = establishedModelMemory;
this.updateSpecialEvents = updateSpecialEvents;
}
public JobUpdate(StreamInput in) throws IOException {
@ -129,6 +133,12 @@ public class JobUpdate implements Writeable, ToXContentObject {
} else {
establishedModelMemory = null;
}
if (in.getVersion().onOrAfter(Version.V_6_2_0)) {
updateSpecialEvents = in.readBoolean();
} else {
updateSpecialEvents = false;
}
}
@Override
@ -158,6 +168,10 @@ public class JobUpdate implements Writeable, ToXContentObject {
if (out.getVersion().onOrAfter(Version.V_6_1_0)) {
out.writeOptionalLong(establishedModelMemory);
}
if (out.getVersion().onOrAfter(Version.V_6_2_0)) {
out.writeBoolean(updateSpecialEvents);
}
}
public String getJobId() {
@ -220,6 +234,10 @@ public class JobUpdate implements Writeable, ToXContentObject {
return modelPlotConfig != null || detectorUpdates != null;
}
public boolean isUpdateSpecialEvents() {
return updateSpecialEvents;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@ -263,6 +281,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
if (establishedModelMemory != null) {
builder.field(Job.ESTABLISHED_MODEL_MEMORY.getPreferredName(), establishedModelMemory);
}
builder.field(UPDATE_SPECIAL_EVENTS.getPreferredName(), updateSpecialEvents);
builder.endObject();
return builder;
}
@ -399,14 +418,15 @@ public class JobUpdate implements Writeable, ToXContentObject {
&& Objects.equals(this.categorizationFilters, that.categorizationFilters)
&& Objects.equals(this.customSettings, that.customSettings)
&& Objects.equals(this.modelSnapshotId, that.modelSnapshotId)
&& Objects.equals(this.establishedModelMemory, that.establishedModelMemory);
&& Objects.equals(this.establishedModelMemory, that.establishedModelMemory)
&& Objects.equals(this.updateSpecialEvents, that.updateSpecialEvents);
}
@Override
public int hashCode() {
return Objects.hash(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, categorizationFilters, customSettings,
modelSnapshotId, establishedModelMemory);
modelSnapshotId, establishedModelMemory, updateSpecialEvents);
}
public static class DetectorUpdate implements Writeable, ToXContentObject {
@ -518,6 +538,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
private Map<String, Object> customSettings;
private String modelSnapshotId;
private Long establishedModelMemory;
private boolean updateSpecialEvents = false;
public Builder(String jobId) {
this.jobId = jobId;
@ -593,10 +614,15 @@ public class JobUpdate implements Writeable, ToXContentObject {
return this;
}
public Builder setUpdateSpecialEvents(boolean updateSpecialEvents) {
this.updateSpecialEvents = updateSpecialEvents;
return this;
}
public JobUpdate build() {
return new JobUpdate(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, backgroundPersistInterval,
renormalizationWindowDays, resultsRetentionDays, modelSnapshotRetentionDays, categorizationFilters, customSettings,
modelSnapshotId, establishedModelMemory);
modelSnapshotId, establishedModelMemory, updateSpecialEvents);
}
}
}

View File

@ -13,7 +13,9 @@ import java.io.IOException;
import java.util.Locale;
public enum RuleAction implements Writeable {
FILTER_RESULTS;
FILTER_RESULTS,
SKIP_SAMPLING,
SKIP_SAMPLING_AND_FILTER_RESULTS;
/**
* Case-insensitive from string method.

View File

@ -84,7 +84,7 @@ public class RuleCondition implements ToXContentObject, Writeable {
out.writeOptionalString(valueFilter);
}
public RuleCondition(RuleConditionType conditionType, String fieldName, String fieldValue, Condition condition, String valueFilter) {
RuleCondition(RuleConditionType conditionType, String fieldName, String fieldValue, Condition condition, String valueFilter) {
this.conditionType = conditionType;
this.fieldName = fieldName;
this.fieldValue = fieldValue;
@ -182,6 +182,18 @@ public class RuleCondition implements ToXContentObject, Writeable {
return new RuleCondition(RuleConditionType.CATEGORICAL, fieldName, null, null, valueFilter);
}
public static RuleCondition createNumerical(RuleConditionType conditionType, String fieldName, String fieldValue,
Condition condition ) {
if (conditionType.isNumerical() == false) {
throw new IllegalStateException("Rule condition type [" + conditionType + "] not valid for a numerical condition");
}
return new RuleCondition(conditionType, fieldName, fieldValue, condition, null);
}
public static RuleCondition createTime(Operator operator, long epochSeconds) {
return new RuleCondition(RuleConditionType.TIME, null, null, new Condition(operator, Long.toString(epochSeconds)), null);
}
private static void verifyFieldsBoundToType(RuleCondition ruleCondition) throws ElasticsearchParseException {
switch (ruleCondition.getConditionType()) {
case CATEGORICAL:
@ -192,6 +204,9 @@ public class RuleCondition implements ToXContentObject, Writeable {
case NUMERICAL_DIFF_ABS:
verifyNumerical(ruleCondition);
break;
case TIME:
verifyTimeRule(ruleCondition);
break;
default:
throw new IllegalStateException();
}
@ -258,4 +273,8 @@ public class RuleCondition implements ToXContentObject, Writeable {
throw ExceptionsHelper.badRequestException(msg);
}
}
private static void verifyTimeRule(RuleCondition ruleCondition) {
checkNumericalConditionOparatorsAreValid(ruleCondition);
}
}

View File

@ -14,10 +14,21 @@ import java.io.IOException;
import java.util.Locale;
public enum RuleConditionType implements Writeable {
CATEGORICAL,
NUMERICAL_ACTUAL,
NUMERICAL_TYPICAL,
NUMERICAL_DIFF_ABS;
CATEGORICAL(false),
NUMERICAL_ACTUAL(true),
NUMERICAL_TYPICAL(true),
NUMERICAL_DIFF_ABS(true),
TIME(false);
private final boolean isNumerical;
RuleConditionType(boolean isNumerical) {
this.isNumerical = isNumerical;
}
public boolean isNumerical() {
return isNumerical;
}
/**
* Case-insensitive from string method.

View File

@ -58,15 +58,15 @@ public class ElasticsearchMappings {
/**
* String constants used in mappings
*/
static final String ENABLED = "enabled";
static final String ANALYZER = "analyzer";
static final String WHITESPACE = "whitespace";
static final String NESTED = "nested";
static final String COPY_TO = "copy_to";
static final String PROPERTIES = "properties";
static final String TYPE = "type";
static final String DYNAMIC = "dynamic";
static final String FIELDS = "fields";
public static final String ENABLED = "enabled";
public static final String ANALYZER = "analyzer";
public static final String WHITESPACE = "whitespace";
public static final String NESTED = "nested";
public static final String COPY_TO = "copy_to";
public static final String PROPERTIES = "properties";
public static final String TYPE = "type";
public static final String DYNAMIC = "dynamic";
public static final String FIELDS = "fields";
/**
* Name of the custom 'all' field for results
@ -81,13 +81,13 @@ public class ElasticsearchMappings {
/**
* Elasticsearch data types
*/
static final String BOOLEAN = "boolean";
static final String DATE = "date";
static final String DOUBLE = "double";
static final String INTEGER = "integer";
static final String KEYWORD = "keyword";
static final String LONG = "long";
static final String TEXT = "text";
public static final String BOOLEAN = "boolean";
public static final String DATE = "date";
public static final String DOUBLE = "double";
public static final String INTEGER = "integer";
public static final String KEYWORD = "keyword";
public static final String LONG = "long";
public static final String TEXT = "text";
static final String RAW = "raw";

View File

@ -62,6 +62,7 @@ import org.elasticsearch.xpack.ml.action.GetCategoriesAction;
import org.elasticsearch.xpack.ml.action.GetInfluencersAction;
import org.elasticsearch.xpack.ml.action.GetRecordsAction;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.calendars.SpecialEvent;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder.InfluencersQuery;
@ -360,46 +361,47 @@ public class JobProvider {
.add(createDocIdSearch(stateIndex, Quantiles.documentId(jobId)));
for (String filterId : job.getAnalysisConfig().extractReferencedFilters()) {
msearch.add(createDocIdSearch(MlMetaIndex.INDEX_NAME, filterId));
msearch.add(createDocIdSearch(MlMetaIndex.INDEX_NAME, MlFilter.documentId(filterId)));
}
msearch.add(createSpecialEventSearch(jobId));
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, msearch.request(),
ActionListener.<MultiSearchResponse>wrap(
response -> {
for (int i = 0; i < response.getResponses().length; i++) {
MultiSearchResponse.Item itemResponse = response.getResponses()[i];
if (itemResponse.isFailure()) {
errorHandler.accept(itemResponse.getFailure());
ActionListener.<MultiSearchResponse>wrap(
response -> {
for (int i = 0; i < response.getResponses().length; i++) {
MultiSearchResponse.Item itemResponse = response.getResponses()[i];
if (itemResponse.isFailure()) {
errorHandler.accept(itemResponse.getFailure());
} else {
SearchResponse searchResponse = itemResponse.getResponse();
ShardSearchFailure[] shardFailures = searchResponse.getShardFailures();
int unavailableShards = searchResponse.getTotalShards() - searchResponse.getSuccessfulShards();
if (shardFailures != null && shardFailures.length > 0) {
LOGGER.error("[{}] Search request returned shard failures: {}", jobId,
Arrays.toString(shardFailures));
errorHandler.accept(new ElasticsearchException(
ExceptionsHelper.shardFailuresToErrorMsg(jobId, shardFailures)));
} else if (unavailableShards > 0) {
errorHandler.accept(new ElasticsearchException("[" + jobId
+ "] Search request encountered [" + unavailableShards + "] unavailable shards"));
} else {
SearchHits hits = searchResponse.getHits();
long hitsCount = hits.getHits().length;
if (hitsCount == 0) {
SearchRequest searchRequest = msearch.request().requests().get(i);
LOGGER.debug("Found 0 hits for [{}]", new Object[]{searchRequest.indices()});
} else {
SearchResponse searchResponse = itemResponse.getResponse();
ShardSearchFailure[] shardFailures = searchResponse.getShardFailures();
int unavailableShards = searchResponse.getTotalShards() - searchResponse.getSuccessfulShards();
if (shardFailures != null && shardFailures.length > 0) {
LOGGER.error("[{}] Search request returned shard failures: {}", jobId,
Arrays.toString(shardFailures));
errorHandler.accept(new ElasticsearchException(
ExceptionsHelper.shardFailuresToErrorMsg(jobId, shardFailures)));
} else if (unavailableShards > 0) {
errorHandler.accept(new ElasticsearchException("[" + jobId
+ "] Search request encountered [" + unavailableShards + "] unavailable shards"));
} else {
SearchHits hits = searchResponse.getHits();
long hitsCount = hits.getHits().length;
if (hitsCount == 0) {
SearchRequest searchRequest = msearch.request().requests().get(i);
LOGGER.debug("Found 0 hits for [{}/{}]", searchRequest.indices(), searchRequest.types());
} else if (hitsCount == 1) {
parseAutodetectParamSearchHit(jobId, paramsBuilder, hits.getAt(0), errorHandler);
} else {
errorHandler.accept(new IllegalStateException("Expected hits count to be 0 or 1, but got ["
+ hitsCount + "]"));
}
for (SearchHit hit : hits) {
parseAutodetectParamSearchHit(jobId, paramsBuilder, hit, errorHandler);
}
}
}
consumer.accept(paramsBuilder.build());
},
errorHandler
}
}
consumer.accept(paramsBuilder.build());
},
errorHandler
), client::multiSearch);
}
@ -410,6 +412,17 @@ public class JobProvider {
.setRouting(id);
}
private SearchRequestBuilder createSpecialEventSearch(String jobId) {
QueryBuilder qb = new BoolQueryBuilder()
.filter(new TermsQueryBuilder(SpecialEvent.TYPE.getPreferredName(), SpecialEvent.SPECIAL_EVENT_TYPE))
.filter(new TermsQueryBuilder(SpecialEvent.JOB_IDS.getPreferredName(), jobId));
return client.prepareSearch(MlMetaIndex.INDEX_NAME)
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(qb);
}
private void parseAutodetectParamSearchHit(String jobId, AutodetectParams.Builder paramsBuilder, SearchHit hit,
Consumer<Exception> errorHandler) {
String hitId = hit.getId();
@ -425,6 +438,8 @@ public class JobProvider {
paramsBuilder.setQuantiles(parseSearchHit(hit, Quantiles.PARSER, errorHandler));
} else if (hitId.startsWith(MlFilter.DOCUMENT_ID_PREFIX)) {
paramsBuilder.addFilter(parseSearchHit(hit, MlFilter.PARSER, errorHandler).build());
} else if (hitId.startsWith(SpecialEvent.DOCUMENT_ID_PREFIX)) {
paramsBuilder.addSpecialEvent(parseSearchHit(hit, SpecialEvent.PARSER, errorHandler));
} else {
errorHandler.accept(new IllegalStateException("Unexpected type [" + hit.getType() + "]"));
}
@ -966,6 +981,23 @@ public class JobProvider {
});
}
public void specialEvents(String jobId, Consumer<List<SpecialEvent>> handler, Consumer<Exception> errorHandler) {
SearchRequestBuilder request = createSpecialEventSearch(jobId);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, request.request(),
ActionListener.<SearchResponse>wrap(
response -> {
List<SpecialEvent> specialEvents = new ArrayList<>();
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
specialEvents.add(parseSearchHit(hit, SpecialEvent.PARSER, errorHandler));
}
handler.accept(specialEvents);
},
errorHandler)
, client::search);
}
private void handleLatestModelSizeStats(String jobId, ModelSizeStats latestModelSizeStats, Consumer<Long> handler,
Consumer<Exception> errorHandler) {
if (latestModelSizeStats != null) {

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.xpack.ml.calendars.SpecialEvent;
import org.elasticsearch.xpack.ml.job.config.AnalysisLimits;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.MlFilter;
@ -25,6 +26,7 @@ import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
@ -44,6 +46,7 @@ public class AutodetectBuilder {
private List<Path> filesToDelete;
private Logger logger;
private Set<MlFilter> referencedFilters;
private List<SpecialEvent> specialEvents;
private Quantiles quantiles;
private Environment env;
private Settings settings;
@ -68,6 +71,7 @@ public class AutodetectBuilder {
this.filesToDelete = Objects.requireNonNull(filesToDelete);
this.logger = Objects.requireNonNull(logger);
referencedFilters = new HashSet<>();
specialEvents = Collections.emptyList();
}
public AutodetectBuilder referencedFilters(Set<MlFilter> filters) {
@ -85,6 +89,11 @@ public class AutodetectBuilder {
return this;
}
public AutodetectBuilder specialEvents(List<SpecialEvent> specialEvents) {
this.specialEvents = specialEvents;
return this;
}
/**
* Requests that the controller daemon start an autodetect process.
*/
@ -161,7 +170,7 @@ public class AutodetectBuilder {
try (OutputStreamWriter osw = new OutputStreamWriter(
Files.newOutputStream(fieldConfigFile),
StandardCharsets.UTF_8)) {
new FieldConfigWriter(job.getAnalysisConfig(), referencedFilters, osw, logger).write();
new FieldConfigWriter(job.getAnalysisConfig(), referencedFilters, specialEvents, osw, logger).write();
}
String fieldConfig = FIELD_CONFIG_ARG + fieldConfigFile.toString();

View File

@ -14,6 +14,7 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.ml.calendars.SpecialEvent;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
@ -184,19 +185,24 @@ public class AutodetectCommunicator implements Closeable {
}
}
public void writeUpdateProcessMessage(ModelPlotConfig config, List<JobUpdate.DetectorUpdate> updates,
public void writeUpdateProcessMessage(UpdateParams updateParams, List<SpecialEvent> specialEvents,
BiConsumer<Void, Exception> handler) {
submitOperation(() -> {
if (config != null) {
autodetectProcess.writeUpdateModelPlotMessage(config);
if (updateParams.getModelPlotConfig() != null) {
autodetectProcess.writeUpdateModelPlotMessage(updateParams.getModelPlotConfig());
}
if (updates != null) {
for (JobUpdate.DetectorUpdate update : updates) {
if (updateParams.getDetectorUpdates() != null) {
for (JobUpdate.DetectorUpdate update : updateParams.getDetectorUpdates()) {
if (update.getRules() != null) {
autodetectProcess.writeUpdateDetectorRulesMessage(update.getDetectorIndex(), update.getRules());
}
}
}
if (updateParams.isUpdateSpecialEvents()) {
autodetectProcess.writeUpdateSpecialEventsMessage(job.getAnalysisConfig().getDetectors().size(), specialEvents);
}
return null;
}, handler);
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.elasticsearch.xpack.ml.calendars.SpecialEvent;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
@ -71,7 +72,19 @@ public interface AutodetectProcess extends Closeable {
* @param rules Detector rules
* @throws IOException If the write fails
*/
void writeUpdateDetectorRulesMessage(int detectorIndex, List<DetectionRule> rules) throws IOException;
void writeUpdateDetectorRulesMessage(int detectorIndex, List<DetectionRule> rules)
throws IOException;
/**
* Write the updated special events overwriting any previous events.
* Writing an empty list of special events removes any previously set events.
*
* @param numberOfDetectors The number of detectors in the job. All will be
* updated with the special events
* @param specialEvents List of events to update
* @throws IOException If the write fails
*/
void writeUpdateSpecialEventsMessage(int numberOfDetectors, List<SpecialEvent> specialEvents) throws IOException;
/**
* Flush the job pushing any stale data into autodetect.

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
@ -21,15 +22,15 @@ public interface AutodetectProcessFactory {
/**
* Create an implementation of {@link AutodetectProcess}
*
* @param job Job configuration for the analysis process
* @param modelSnapshot The model snapshot to restore from
* @param quantiles The quantiles to push to the native process
* @param filters The filters to push to the native process
* @param executorService Executor service used to start the async tasks a job needs to operate the analytical process
* @param onProcessCrash Callback to execute if the process stops unexpectedly
* @param job Job configuration for the analysis process
* @param autodetectParams Autodetect parameters including The model snapshot to restore from
* and the quantiles to push to the native process
* @param executorService Executor service used to start the async tasks a job needs to operate the analytical process
* @param onProcessCrash Callback to execute if the process stops unexpectedly
* @return The process
*/
AutodetectProcess createAutodetectProcess(Job job, ModelSnapshot modelSnapshot, Quantiles quantiles, Set<MlFilter> filters,
AutodetectProcess createAutodetectProcess(Job job,
AutodetectParams autodetectParams,
ExecutorService executorService,
Runnable onProcessCrash);
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.OpenJobAction.JobTask;
import org.elasticsearch.xpack.ml.calendars.SpecialEvent;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
@ -57,6 +58,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
@ -249,7 +251,7 @@ public class AutodetectProcessManager extends AbstractComponent {
});
}
public void writeUpdateProcessMessage(JobTask jobTask, List<JobUpdate.DetectorUpdate> updates, ModelPlotConfig config,
public void writeUpdateProcessMessage(JobTask jobTask, UpdateParams updateParams,
Consumer<Exception> handler) {
AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask);
if (communicator == null) {
@ -258,13 +260,25 @@ public class AutodetectProcessManager extends AbstractComponent {
handler.accept(ExceptionsHelper.conflictStatusException(message));
return;
}
communicator.writeUpdateProcessMessage(config, updates, (aVoid, e) -> {
if (e == null) {
handler.accept(null);
} else {
handler.accept(e);
}
});
Consumer<List<SpecialEvent>> eventConsumer = specialEvents -> {
communicator.writeUpdateProcessMessage(updateParams, specialEvents, (aVoid, e) -> {
if (e == null) {
handler.accept(null);
} else {
handler.accept(e);
}
});
};
if (updateParams.isUpdateSpecialEvents()) {
jobProvider.specialEvents(jobTask.getJobId(), eventConsumer, handler::accept);
} else {
eventConsumer.accept(Collections.emptyList());
}
}
public void openJob(JobTask jobTask, Consumer<Exception> handler) {
@ -377,8 +391,8 @@ public class AutodetectProcessManager extends AbstractComponent {
Renormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdater,
renormalizerExecutorService, job.getAnalysisConfig().getUsePerPartitionNormalization());
AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams.modelSnapshot(),
autodetectParams.quantiles(), autodetectParams.filters(), autoDetectExecutorService, onProcessCrash(jobTask));
AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams, autoDetectExecutorService,
onProcessCrash(jobTask));
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(
client, jobId, renormalizer, jobResultsPersister, jobProvider, autodetectParams.modelSizeStats(),
autodetectParams.modelSnapshot() != null);

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.elasticsearch.xpack.ml.calendars.SpecialEvent;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
@ -71,6 +72,10 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess {
public void writeUpdateDetectorRulesMessage(int detectorIndex, List<DetectionRule> rules) throws IOException {
}
@Override
public void writeUpdateSpecialEventsMessage(int numberOfDetectors, List<SpecialEvent> specialEvents) throws IOException {
// Intentionally a no-op: BlackHoleAutodetectProcess discards every message,
// so special event updates are accepted and silently dropped.
}
/**
* Accept the request do nothing with it but write the flush acknowledgement to {@link #readAutodetectResults()}
* @param params Should interim results be generated

View File

@ -9,6 +9,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.calendars.SpecialEvent;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
@ -41,6 +42,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
/**
* Autodetect process using native code.
@ -159,6 +161,16 @@ class NativeAutodetectProcess implements AutodetectProcess {
writer.writeUpdateDetectorRulesMessage(detectorIndex, rules);
}
@Override
public void writeUpdateSpecialEventsMessage(int numberOfDetectors, List<SpecialEvent> specialEvents) throws IOException {
    // Renamed parameter from 'numberOfEvents' to 'numberOfDetectors' to match the
    // AutodetectProcess interface contract: the loop below iterates once per
    // detector, not once per event, pushing the full event list to each detector.
    ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfAnalysisFields);
    // Special events are transported to the native process as detection rules.
    // An empty specialEvents list therefore clears any previously set events.
    List<DetectionRule> eventsAsRules = specialEvents.stream().map(SpecialEvent::toDetectionRule).collect(Collectors.toList());
    for (int i = 0; i < numberOfDetectors; i++) {
        writer.writeUpdateDetectorRulesMessage(i, eventsAsRules);
    }
}
@Override
public String flushJob(FlushJobParams params) throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfAnalysisFields);

View File

@ -19,6 +19,7 @@ import org.elasticsearch.xpack.ml.job.process.ProcessCtrl;
import org.elasticsearch.xpack.ml.job.process.ProcessPipes;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
@ -53,14 +54,14 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
}
@Override
public AutodetectProcess createAutodetectProcess(Job job, ModelSnapshot modelSnapshot,
Quantiles quantiles, Set<MlFilter> filters,
public AutodetectProcess createAutodetectProcess(Job job,
AutodetectParams params,
ExecutorService executorService,
Runnable onProcessCrash) {
List<Path> filesToDelete = new ArrayList<>();
ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, ProcessCtrl.AUTODETECT, job.getId(),
true, false, true, true, modelSnapshot != null, !ProcessCtrl.DONT_PERSIST_MODEL_STATE_SETTING.get(settings));
createNativeProcess(job, quantiles, filters, processPipes, filesToDelete);
true, false, true, true, params.modelSnapshot() != null, !ProcessCtrl.DONT_PERSIST_MODEL_STATE_SETTING.get(settings));
createNativeProcess(job, params, processPipes, filesToDelete);
int numberOfAnalysisFields = job.getAnalysisConfig().analysisFields().size();
StateProcessor stateProcessor = new StateProcessor(settings, client);
@ -82,17 +83,18 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
}
}
private void createNativeProcess(Job job, Quantiles quantiles, Set<MlFilter> filters, ProcessPipes processPipes,
private void createNativeProcess(Job job, AutodetectParams autodetectParams, ProcessPipes processPipes,
List<Path> filesToDelete) {
try {
AutodetectBuilder autodetectBuilder = new AutodetectBuilder(job, filesToDelete, LOGGER, env,
settings, nativeController, processPipes)
.referencedFilters(filters);
.referencedFilters(autodetectParams.filters())
.specialEvents(autodetectParams.specialEvents());
// if state is null or empty it will be ignored
// else it is used to restore the quantiles
if (quantiles != null) {
autodetectBuilder.quantiles(quantiles);
if (autodetectParams.quantiles() != null) {
autodetectBuilder.quantiles(autodetectParams.quantiles());
}
autodetectBuilder.build();

View File

@ -0,0 +1,39 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
import java.util.List;
/**
 * Immutable holder for the parameters of an autodetect process update:
 * an optional model plot config change, optional per-detector rule updates,
 * and a flag saying whether the job's special events should be re-read
 * and pushed to the process.
 */
public final class UpdateParams {
// May be null when the update does not touch model plot settings.
private final ModelPlotConfig modelPlotConfig;
// May be null when the update carries no per-detector changes.
private final List<JobUpdate.DetectorUpdate> detectorUpdates;
private final boolean updateSpecialEvents;
/**
 * @param modelPlotConfig     new model plot config, or {@code null} for no change
 * @param detectorUpdates     per-detector updates, or {@code null} for none
 * @param updateSpecialEvents whether special events should be refreshed in the process
 */
public UpdateParams(@Nullable ModelPlotConfig modelPlotConfig,
@Nullable List<JobUpdate.DetectorUpdate> detectorUpdates,
boolean updateSpecialEvents) {
this.modelPlotConfig = modelPlotConfig;
this.detectorUpdates = detectorUpdates;
this.updateSpecialEvents = updateSpecialEvents;
}
public ModelPlotConfig getModelPlotConfig() {
return modelPlotConfig;
}
public List<JobUpdate.DetectorUpdate> getDetectorUpdates() {
return detectorUpdates;
}
public boolean isUpdateSpecialEvents() {
return updateSpecialEvents;
}
}

View File

@ -5,14 +5,19 @@
*/
package org.elasticsearch.xpack.ml.job.process.autodetect.params;
import org.elasticsearch.Build;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.xpack.ml.calendars.SpecialEvent;
import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
@ -20,23 +25,24 @@ public class AutodetectParams {
private final DataCounts dataCounts;
private final ModelSizeStats modelSizeStats;
@Nullable
private final ModelSnapshot modelSnapshot;
@Nullable
private final Quantiles quantiles;
private final Set<MlFilter> filters;
private final List<SpecialEvent> specialEvents;
private AutodetectParams(DataCounts dataCounts, ModelSizeStats modelSizeStats,
@Nullable ModelSnapshot modelSnapshot,
@Nullable Quantiles quantiles, Set<MlFilter> filters) {
@Nullable Quantiles quantiles, Set<MlFilter> filters,
List<SpecialEvent> specialEvents) {
this.dataCounts = Objects.requireNonNull(dataCounts);
this.modelSizeStats = Objects.requireNonNull(modelSizeStats);
this.modelSnapshot = modelSnapshot;
this.quantiles = quantiles;
this.filters = filters;
this.specialEvents = specialEvents;
}
public DataCounts dataCounts() {
@ -61,6 +67,10 @@ public class AutodetectParams {
return filters;
}
public List<SpecialEvent> specialEvents() {
return specialEvents;
}
@Override
public boolean equals(Object other) {
if (this == other) {
@ -77,12 +87,13 @@ public class AutodetectParams {
&& Objects.equals(this.modelSizeStats, that.modelSizeStats)
&& Objects.equals(this.modelSnapshot, that.modelSnapshot)
&& Objects.equals(this.quantiles, that.quantiles)
&& Objects.equals(this.filters, that.filters);
&& Objects.equals(this.filters, that.filters)
&& Objects.equals(this.specialEvents, that.specialEvents);
}
@Override
public int hashCode() {
return Objects.hash(dataCounts, modelSizeStats, modelSnapshot, quantiles, filters);
return Objects.hash(dataCounts, modelSizeStats, modelSnapshot, quantiles, filters, specialEvents);
}
public static class Builder {
@ -92,11 +103,13 @@ public class AutodetectParams {
private ModelSnapshot modelSnapshot;
private Quantiles quantiles;
private Set<MlFilter> filters;
private List<SpecialEvent> specialEvents;
public Builder(String jobId) {
dataCounts = new DataCounts(jobId);
modelSizeStats = new ModelSizeStats.Builder(jobId).build();
filters = new HashSet<>();
specialEvents = new ArrayList<>();
}
public Builder setDataCounts(DataCounts dataCounts) {
@ -119,6 +132,15 @@ public class AutodetectParams {
return this;
}
// Appends a single special event to the builder's list.
// NOTE(review): assumes the current specialEvents list is mutable — this throws
// if setSpecialEvents was previously given an immutable list; confirm callers.
public Builder addSpecialEvent(SpecialEvent specialEvent) {
specialEvents.add(specialEvent);
return this;
}
// Replaces the builder's special events with the given list.
// The list is stored by reference (no defensive copy, no null check),
// matching how the other collection setters in this builder behave.
public Builder setSpecialEvents(List<SpecialEvent> specialEvents) {
this.specialEvents = specialEvents;
return this;
}
public Builder addFilter(MlFilter filter) {
filters.add(filter);
return this;
@ -131,7 +153,7 @@ public class AutodetectParams {
public AutodetectParams build() {
return new AutodetectParams(dataCounts, modelSizeStats, modelSnapshot, quantiles,
filters);
filters, specialEvents);
}
}
}

View File

@ -9,6 +9,7 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.ml.calendars.SpecialEvent;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
@ -199,21 +200,22 @@ public class ControlMsgToProcessWriter {
}
public void writeUpdateDetectorRulesMessage(int detectorIndex, List<DetectionRule> rules) throws IOException {
StringWriter configWriter = new StringWriter();
configWriter.append(UPDATE_MESSAGE_CODE).append("[detectorRules]\n");
configWriter.append("detectorIndex=").append(Integer.toString(detectorIndex)).append("\n");
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(UPDATE_MESSAGE_CODE).append("[detectorRules]\n");
stringBuilder.append("detectorIndex=").append(Integer.toString(detectorIndex)).append("\n");
configWriter.append("rulesJson=");
stringBuilder.append("rulesJson=");
XContentBuilder builder = JsonXContent.contentBuilder();
builder.startArray();
for (DetectionRule rule : rules) {
rule.toXContent(builder, ToXContent.EMPTY_PARAMS);
try (XContentBuilder builder = JsonXContent.contentBuilder()) {
builder.startArray();
for (DetectionRule rule : rules) {
rule.toXContent(builder, ToXContent.EMPTY_PARAMS);
}
builder.endArray();
stringBuilder.append(builder.string());
}
builder.endArray();
configWriter.append(builder.string());
writeMessage(configWriter.toString());
writeMessage(stringBuilder.toString());
}
/**

View File

@ -10,6 +10,7 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.xpack.ml.calendars.SpecialEvent;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DefaultDetectorDescription;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
@ -19,9 +20,11 @@ import org.elasticsearch.xpack.ml.utils.MlStrings;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import static org.elasticsearch.xpack.ml.job.process.autodetect.writer.WriterConstants.EQUALS;
@ -33,6 +36,7 @@ public class FieldConfigWriter {
private static final String CATEGORIZATION_FIELD_OPTION = " categorizationfield=";
private static final String CATEGORIZATION_FILTER_PREFIX = "categorizationfilter.";
private static final String FILTER_PREFIX = "filter.";
private static final String SPECIAL_EVENT_PREFIX = "specialevent.";
// Note: for the Engine API summarycountfield is currently passed as a
// command line option to autodetect rather than in the field config file
@ -41,13 +45,15 @@ public class FieldConfigWriter {
private final AnalysisConfig config;
private final Set<MlFilter> filters;
private final List<SpecialEvent> specialEvents;
private final OutputStreamWriter writer;
private final Logger logger;
public FieldConfigWriter(AnalysisConfig config, Set<MlFilter> filters,
public FieldConfigWriter(AnalysisConfig config, Set<MlFilter> filters, List<SpecialEvent> specialEvents,
OutputStreamWriter writer, Logger logger) {
this.config = Objects.requireNonNull(config);
this.filters = Objects.requireNonNull(filters);
this.specialEvents = Objects.requireNonNull(specialEvents);
this.writer = Objects.requireNonNull(writer);
this.logger = Objects.requireNonNull(logger);
}
@ -68,16 +74,18 @@ public class FieldConfigWriter {
writeAsEnumeratedSettings(INFLUENCER_PREFIX, config.getInfluencers(), contents, false);
logger.debug("FieldConfig:\n" + contents.toString());
writer.write(contents.toString());
}
private void writeDetectors(StringBuilder contents) throws IOException {
int counter = 0;
List<DetectionRule> events = specialEvents.stream().map(SpecialEvent::toDetectionRule).collect(Collectors.toList());
for (Detector detector : config.getDetectors()) {
int detectorId = counter++;
writeDetectorClause(detectorId, detector, contents);
writeDetectorRules(detectorId, detector, contents);
writeDetectorRules(detectorId, detector, events, contents);
}
}
@ -95,18 +103,24 @@ public class FieldConfigWriter {
contents.append(NEW_LINE);
}
private void writeDetectorRules(int detectorId, Detector detector, StringBuilder contents) throws IOException {
List<DetectionRule> rules = detector.getDetectorRules();
if (rules == null || rules.isEmpty()) {
private void writeDetectorRules(int detectorId, Detector detector, List<DetectionRule> specialEvents,
StringBuilder contents) throws IOException {
List<DetectionRule> rules = new ArrayList<>();
if (detector.getDetectorRules() != null) {
rules.addAll(detector.getDetectorRules());
}
rules.addAll(specialEvents);
if (rules.isEmpty()) {
return;
}
contents.append(DETECTOR_PREFIX).append(detectorId)
.append(DETECTOR_RULES_SUFFIX).append(EQUALS);
contents.append(DETECTOR_PREFIX).append(detectorId).append(DETECTOR_RULES_SUFFIX).append(EQUALS);
contents.append('[');
boolean first = true;
for (DetectionRule rule : detector.getDetectorRules()) {
for (DetectionRule rule : rules) {
if (first) {
first = false;
} else {
@ -117,7 +131,6 @@ public class FieldConfigWriter {
}
}
contents.append(']');
contents.append(NEW_LINE);
}

View File

@ -6,10 +6,16 @@
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.ml.calendars.SpecialEvent;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
import org.joda.time.DateTime;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class UpdateProcessActionRequestTests extends AbstractStreamableTestCase<UpdateProcessAction.Request> {
@ -29,7 +35,7 @@ public class UpdateProcessActionRequestTests extends AbstractStreamableTestCase<
updates.add(new JobUpdate.DetectorUpdate(randomInt(), randomAlphaOfLength(10), null));
}
}
return new UpdateProcessAction.Request(randomAlphaOfLength(10), config, updates);
return new UpdateProcessAction.Request(randomAlphaOfLength(10), config, updates, randomBoolean());
}
@Override

View File

@ -0,0 +1,70 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.calendars;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.ml.job.config.Connective;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.Operator;
import org.elasticsearch.xpack.ml.job.config.RuleAction;
import org.elasticsearch.xpack.ml.job.config.RuleCondition;
import org.elasticsearch.xpack.ml.job.config.RuleConditionType;
import org.joda.time.DateTime;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
/**
 * Serialization round-trip and conversion tests for {@link SpecialEvent}.
 */
public class SpecialEventTests extends AbstractSerializingTestCase<SpecialEvent> {
// Builds a random SpecialEvent with 0-10 random job ids and random
// UTC start/end timestamps (no ordering constraint between them).
@Override
protected SpecialEvent createTestInstance() {
int size = randomInt(10);
List<String> jobIds = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
jobIds.add(randomAlphaOfLengthBetween(1, 20));
}
return new SpecialEvent(randomAlphaOfLength(10), randomAlphaOfLength(10),
ZonedDateTime.ofInstant(Instant.ofEpochMilli(new DateTime(randomDateTimeZone()).getMillis()), ZoneOffset.UTC),
ZonedDateTime.ofInstant(Instant.ofEpochMilli(new DateTime(randomDateTimeZone()).getMillis()), ZoneOffset.UTC),
jobIds);
}
@Override
protected Writeable.Reader<SpecialEvent> instanceReader() {
return SpecialEvent::new;
}
@Override
protected SpecialEvent doParseInstance(XContentParser parser) throws IOException {
return SpecialEvent.PARSER.apply(parser, null);
}
// Verifies the event-to-detection-rule mapping: an AND of two TIME
// conditions (start inclusive via GTE, end exclusive via LT, both in
// epoch seconds) with the skip-sampling-and-filter-results action.
public void testToDetectionRule() {
SpecialEvent event = createTestInstance();
DetectionRule rule = event.toDetectionRule();
assertEquals(Connective.AND, rule.getConditionsConnective());
assertEquals(RuleAction.SKIP_SAMPLING_AND_FILTER_RESULTS, rule.getRuleAction());
assertNull(rule.getTargetFieldName());
assertNull(rule.getTargetFieldValue());
List<RuleCondition> conditions = rule.getRuleConditions();
assertEquals(2, conditions.size());
assertEquals(RuleConditionType.TIME, conditions.get(0).getConditionType());
assertEquals(Operator.GTE, conditions.get(0).getCondition().getOperator());
assertEquals(event.getStartTime().toEpochSecond(), Long.parseLong(conditions.get(0).getCondition().getValue()));
assertEquals(RuleConditionType.TIME, conditions.get(1).getConditionType());
assertEquals(Operator.LT, conditions.get(1).getCondition().getOperator());
assertEquals(event.getEndTime().toEpochSecond(), Long.parseLong(conditions.get(1).getCondition().getValue()));
}
}

View File

@ -43,24 +43,21 @@ public class DetectionRulesIT extends MlNativeAutodetectIntegTestCase {
}
public void test() throws Exception {
RuleCondition condition1 = new RuleCondition(
RuleCondition condition1 = RuleCondition.createNumerical(
RuleConditionType.NUMERICAL_ACTUAL,
"by_field",
"by_field_value_1",
new Condition(Operator.LT, "1000"),
null);
RuleCondition condition2 = new RuleCondition(
new Condition(Operator.LT, "1000"));
RuleCondition condition2 = RuleCondition.createNumerical(
RuleConditionType.NUMERICAL_ACTUAL,
"by_field",
"by_field_value_2",
new Condition(Operator.LT, "500"),
null);
RuleCondition condition3 = new RuleCondition(
new Condition(Operator.LT, "500"));
RuleCondition condition3 = RuleCondition.createNumerical(
RuleConditionType.NUMERICAL_ACTUAL,
"by_field",
"by_field_value_3",
new Condition(Operator.LT, "100"),
null);
new Condition(Operator.LT, "100"));
DetectionRule rule = new DetectionRule.Builder(Arrays.asList(condition1, condition2, condition3)).build();
Detector.Builder detector = new Detector.Builder("max", "value");
@ -112,24 +109,21 @@ public class DetectionRulesIT extends MlNativeAutodetectIntegTestCase {
{
// Update rules so that the anomalies suppression is inverted
RuleCondition newCondition1 = new RuleCondition(
RuleCondition newCondition1 = RuleCondition.createNumerical(
RuleConditionType.NUMERICAL_ACTUAL,
"by_field",
"by_field_value_1",
new Condition(Operator.GT, "1000"),
null);
RuleCondition newCondition2 = new RuleCondition(
new Condition(Operator.GT, "1000"));
RuleCondition newCondition2 = RuleCondition.createNumerical(
RuleConditionType.NUMERICAL_ACTUAL,
"by_field",
"by_field_value_2",
new Condition(Operator.GT, "500"),
null);
RuleCondition newCondition3 = new RuleCondition(
new Condition(Operator.GT, "500"));
RuleCondition newCondition3 = RuleCondition.createNumerical(
RuleConditionType.NUMERICAL_ACTUAL,
"by_field",
"by_field_value_3",
new Condition(Operator.GT, "0"),
null);
new Condition(Operator.GT, "0"));
DetectionRule newRule = new DetectionRule.Builder(Arrays.asList(newCondition1, newCondition2, newCondition3)).build();
JobUpdate.Builder update = new JobUpdate.Builder(job.getId());
update.setDetectorUpdates(Arrays.asList(new JobUpdate.DetectorUpdate(0, null, Arrays.asList(newRule))));

View File

@ -0,0 +1,349 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.XPackSingleNodeTestCase;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlMetaIndex;
import org.elasticsearch.xpack.ml.action.PutJobAction;
import org.elasticsearch.xpack.ml.calendars.SpecialEvent;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.Connective;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.config.RuleAction;
import org.elasticsearch.xpack.ml.job.config.RuleCondition;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCountsTests;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import org.junit.Before;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
public class JobProviderIT extends XPackSingleNodeTestCase {
private JobProvider jobProvider;
// Disables security, monitoring and watcher so the single test node only
// runs what the ML job provider needs.
@Override
protected Settings nodeSettings() {
Settings.Builder newSettings = Settings.builder();
newSettings.put(super.nodeSettings());
newSettings.put(XPackSettings.SECURITY_ENABLED.getKey(), false);
newSettings.put(XPackSettings.MONITORING_ENABLED.getKey(), false);
newSettings.put(XPackSettings.WATCHER_ENABLED.getKey(), false);
return newSettings.build();
}
// Loads the full X-Pack plugin so the ML templates and indices exist.
@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return pluginList(XPackPlugin.class);
}
// Builds the JobProvider under test with a short delayed-allocation timeout
// and blocks until the ML index templates are installed.
@Before
public void createComponents() throws Exception {
Settings.Builder builder = Settings.builder()
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(1));
jobProvider = new JobProvider(client(), builder.build());
waitForMlTemplates();
}
// Polls cluster state until MachineLearning reports all its index
// templates installed; fails the test on assertBusy timeout.
private void waitForMlTemplates() throws Exception {
// block until the templates are installed
assertBusy(() -> {
ClusterState state = client().admin().cluster().prepareState().get().getState();
assertTrue("Timed out waiting for the ML templates to be installed",
MachineLearning.allTemplatesInstalled(state));
});
}
// Indexes two special events — one targeting job_a and job_b, one targeting
// only job_a — then checks that querying by job id returns exactly the
// events listing that job (and none for an unrelated job).
public void testSpecialEvents() throws Exception {
List<SpecialEvent> events = new ArrayList<>();
events.add(new SpecialEvent("A_and_B_downtime", "downtime", createZonedDateTime(1000L), createZonedDateTime(2000L),
Arrays.asList("job_a", "job_b")));
events.add(new SpecialEvent("A_downtime", "downtime", createZonedDateTime(5000L), createZonedDateTime(10000L),
Collections.singletonList("job_a")));
indexSpecialEvents(events);
// job_b appears in only the first event
Job.Builder job = createJob("job_b");
List<SpecialEvent> returnedEvents = getSpecialEvents(job.getId());
assertEquals(1, returnedEvents.size());
assertEquals(events.get(0), returnedEvents.get(0));
// job_a appears in both events
job = createJob("job_a");
returnedEvents = getSpecialEvents(job.getId());
assertEquals(2, returnedEvents.size());
assertEquals(events.get(0), returnedEvents.get(0));
assertEquals(events.get(1), returnedEvents.get(1));
// job_c is referenced by no event
job = createJob("job_c");
returnedEvents = getSpecialEvents(job.getId());
assertEquals(0, returnedEvents.size());
}
/**
 * Indexes one document of every type that feeds into the autodetect process
 * parameters (special events, filters, data counts, model size stats, model
 * snapshot and quantiles) and verifies that getAutodetectParams returns the
 * expected instance of each — in particular the <em>latest</em> data counts
 * and model size stats when two generations exist, and the exact snapshot the
 * job's model snapshot id points at.
 */
public void testGetAutodetectParams() throws Exception {
    String jobId = "test_get_autodetect_params";
    Job.Builder job = createJob(jobId, Arrays.asList("fruit", "tea"));

    // index the param docs
    List<SpecialEvent> events = new ArrayList<>();
    events.add(new SpecialEvent("A_downtime", "downtime", createZonedDateTime(5000L), createZonedDateTime(10000L),
            Collections.singletonList(jobId)));
    events.add(new SpecialEvent("A_downtime2", "downtime", createZonedDateTime(20000L), createZonedDateTime(21000L),
            Collections.singletonList(jobId)));
    indexSpecialEvents(events);

    // Filters referenced by the job's detection rules
    List<MlFilter> filters = new ArrayList<>();
    filters.add(new MlFilter("fruit", Arrays.asList("apple", "pear")));
    filters.add(new MlFilter("tea", Arrays.asList("green", "builders")));
    indexFilters(filters);

    // Two generations of data counts: only the one with the later record time should win
    DataCounts earliestCounts = DataCountsTests.createTestInstance(jobId);
    earliestCounts.setLatestRecordTimeStamp(new Date(1500000000000L));
    indexDataCounts(earliestCounts, jobId);
    DataCounts latestCounts = DataCountsTests.createTestInstance(jobId);
    latestCounts.setLatestRecordTimeStamp(new Date(1510000000000L));
    indexDataCounts(latestCounts, jobId);

    // Two generations of model size stats: only the one with the later log time should win
    ModelSizeStats earliestSizeStats = new ModelSizeStats.Builder(jobId).setLogTime(new Date(1500000000000L)).build();
    ModelSizeStats latestSizeStats = new ModelSizeStats.Builder(jobId).setLogTime(new Date(1510000000000L)).build();
    indexModelSizeStats(earliestSizeStats);
    indexModelSizeStats(latestSizeStats);

    // The job references snapshot "snap_1", so exactly that snapshot must be loaded
    job.setModelSnapshotId("snap_1");
    ModelSnapshot snapshot = new ModelSnapshot.Builder(jobId).setSnapshotId("snap_1").build();
    indexModelSnapshot(snapshot);

    Quantiles quantiles = new Quantiles(jobId, new Date(), "quantile-state");
    indexQuantiles(quantiles);

    // Refresh so everything indexed above is visible to the searches
    client().admin().indices().prepareRefresh(MlMetaIndex.INDEX_NAME, AnomalyDetectorsIndex.jobStateIndexName(),
            AnomalyDetectorsIndex.jobResultsAliasedName(jobId)).get();

    AutodetectParams params = getAutodetectParams(job.build(new Date()));

    // special events
    assertNotNull(params.specialEvents());
    assertEquals(2, params.specialEvents().size());
    assertEquals(events.get(0), params.specialEvents().get(0));
    assertEquals(events.get(1), params.specialEvents().get(1));

    // filters
    assertNotNull(params.filters());
    assertEquals(2, params.filters().size());
    assertTrue(params.filters().contains(filters.get(0)));
    assertTrue(params.filters().contains(filters.get(1)));

    // datacounts
    assertNotNull(params.dataCounts());
    assertEquals(latestCounts, params.dataCounts());

    // model size stats
    assertNotNull(params.modelSizeStats());
    assertEquals(latestSizeStats, params.modelSizeStats());

    // model snapshot
    assertNotNull(params.modelSnapshot());
    assertEquals(snapshot, params.modelSnapshot());

    // quantiles
    assertNotNull(params.quantiles());
    assertEquals(quantiles, params.quantiles());
}
/**
 * Synchronously fetches the autodetect params for {@code job} by bridging the
 * async provider callback with a latch; rethrows any error the provider reports.
 */
private AutodetectParams getAutodetectParams(Job job) throws Exception {
    AtomicReference<AutodetectParams> result = new AtomicReference<>();
    AtomicReference<Exception> error = new AtomicReference<>();
    CountDownLatch done = new CountDownLatch(1);
    jobProvider.getAutodetectParams(job,
            params -> {
                result.set(params);
                done.countDown();
            },
            e -> {
                error.set(e);
                done.countDown();
            });
    done.await();
    Exception failure = error.get();
    if (failure != null) {
        throw failure;
    }
    return result.get();
}
/**
 * Synchronously fetches the special events for {@code jobId} by bridging the
 * async provider callback with a latch; rethrows any error the provider reports.
 */
private List<SpecialEvent> getSpecialEvents(String jobId) throws Exception {
    AtomicReference<List<SpecialEvent>> result = new AtomicReference<>();
    AtomicReference<Exception> error = new AtomicReference<>();
    CountDownLatch done = new CountDownLatch(1);
    jobProvider.specialEvents(jobId,
            events -> {
                result.set(events);
                done.countDown();
            },
            e -> {
                error.set(e);
                done.countDown();
            });
    done.await();
    Exception failure = error.get();
    if (failure != null) {
        throw failure;
    }
    return result.get();
}
/** Creates and registers a job with no filter-based detection rules. */
private Job.Builder createJob(String jobId) {
    List<String> noFilters = Collections.emptyList();
    return createJob(jobId, noFilters);
}
/**
 * Builds a job whose detector rules reference {@code filterIds} and registers
 * it on the cluster via {@link PutJobAction}, asserting the put is acknowledged.
 */
private Job.Builder createJob(String jobId, List<String> filterIds) {
    Job.Builder jobBuilder = new Job.Builder(jobId);
    jobBuilder.setAnalysisConfig(createAnalysisConfig(filterIds));
    jobBuilder.setDataDescription(new DataDescription.Builder());

    PutJobAction.Request putRequest = new PutJobAction.Request(jobBuilder);
    PutJobAction.Response putResponse = client().execute(PutJobAction.INSTANCE, putRequest).actionGet();
    assertTrue(putResponse.isAcknowledged());
    return jobBuilder;
}
/**
 * Builds an analysis config with a single mean(field) by by_field detector.
 * When {@code filterIds} is non-empty the detector gets one FILTER_RESULTS
 * rule with an OR-connected categorical condition per filter id.
 */
private AnalysisConfig.Builder createAnalysisConfig(List<String> filterIds) {
    Detector.Builder detector = new Detector.Builder("mean", "field");
    detector.setByFieldName("by_field");
    if (filterIds.isEmpty() == false) {
        List<RuleCondition> categoricalConditions = new ArrayList<>();
        for (String filterId : filterIds) {
            categoricalConditions.add(RuleCondition.createCategorical("by_field", filterId));
        }
        DetectionRule filterRule = new DetectionRule.Builder(categoricalConditions)
                .setRuleAction(RuleAction.FILTER_RESULTS)
                .setConditionsConnective(Connective.OR)
                .build();
        detector.setDetectorRules(Collections.singletonList(filterRule));
    }
    return new AnalysisConfig.Builder(Collections.singletonList(detector.build()));
}
/**
 * Bulk-indexes {@code events} into the ML meta index, refreshing immediately
 * so they are searchable, and fails fast if any item in the bulk failed.
 */
private void indexSpecialEvents(List<SpecialEvent> events) throws IOException {
    BulkRequestBuilder bulk = client().prepareBulk();
    bulk.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
    for (SpecialEvent event : events) {
        try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) {
            IndexRequest request = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, event.documentId());
            request.source(event.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS));
            bulk.add(request);
        }
    }
    BulkResponse bulkResponse = bulk.execute().actionGet();
    if (bulkResponse.hasFailures()) {
        throw new IllegalStateException(Strings.toString(bulkResponse));
    }
}
/**
 * Persists {@code counts} for {@code jobId} through the production persister
 * and blocks until the async write completes, rethrowing any failure.
 */
private void indexDataCounts(DataCounts counts, String jobId) throws Exception {
    JobDataCountsPersister persister = new JobDataCountsPersister(nodeSettings(), client());
    CountDownLatch done = new CountDownLatch(1);
    AtomicReference<Exception> failure = new AtomicReference<>();
    persister.persistDataCounts(jobId, counts, new ActionListener<Boolean>() {
        @Override
        public void onResponse(Boolean acknowledged) {
            assertTrue(acknowledged);
            done.countDown();
        }

        @Override
        public void onFailure(Exception e) {
            failure.set(e);
            done.countDown();
        }
    });
    done.await();
    if (failure.get() != null) {
        throw failure.get();
    }
}
/**
 * Bulk-indexes {@code filters} into the ML meta index, refreshing immediately
 * so they are searchable.
 *
 * @throws IllegalStateException if any item in the bulk request failed; without
 *                               this check indexing errors were silently ignored
 *                               and only surfaced later as confusing assertion
 *                               failures (matches {@code indexSpecialEvents})
 */
private void indexFilters(List<MlFilter> filters) throws IOException {
    BulkRequestBuilder bulkRequest = client().prepareBulk();
    bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
    for (MlFilter filter : filters) {
        IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, filter.documentId());
        try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
            indexRequest.source(filter.toXContent(builder, ToXContent.EMPTY_PARAMS));
            bulkRequest.add(indexRequest);
        }
    }
    // Fail fast on per-item bulk failures instead of discarding the response
    BulkResponse response = bulkRequest.execute().actionGet();
    if (response.hasFailures()) {
        throw new IllegalStateException(Strings.toString(response));
    }
}
/** Persists {@code modelSizeStats} through the production results persister. */
private void indexModelSizeStats(ModelSizeStats modelSizeStats) {
    new JobResultsPersister(nodeSettings(), client()).persistModelSizeStats(modelSizeStats);
}
/** Persists {@code snapshot} with an immediate refresh so it is searchable at once. */
private void indexModelSnapshot(ModelSnapshot snapshot) {
    new JobResultsPersister(nodeSettings(), client())
            .persistModelSnapshot(snapshot, WriteRequest.RefreshPolicy.IMMEDIATE);
}
/** Persists {@code quantiles} through the production results persister. */
private void indexQuantiles(Quantiles quantiles) {
    new JobResultsPersister(nodeSettings(), client()).persistQuantiles(quantiles);
}
/** Converts an epoch-milliseconds timestamp to a {@link ZonedDateTime} in UTC. */
private ZonedDateTime createZonedDateTime(long epochMs) {
    Instant instant = Instant.ofEpochMilli(epochMs);
    return instant.atZone(ZoneOffset.UTC);
}
}

View File

@ -17,6 +17,10 @@ import static org.hamcrest.Matchers.equalTo;
public class MlFilterTests extends AbstractSerializingTestCase<MlFilter> {
public static MlFilter createTestFilter() {
return new MlFilterTests().createTestInstance();
}
@Override
protected MlFilter createTestInstance() {
int size = randomInt(10);

View File

@ -11,12 +11,14 @@ import org.elasticsearch.test.ESTestCase;
import static org.hamcrest.Matchers.equalTo;
public class RuleActionTests extends ESTestCase {
public class
RuleActionTests extends ESTestCase {
public void testForString() {
assertEquals(RuleAction.FILTER_RESULTS, RuleAction.fromString("filter_results"));
assertEquals(RuleAction.FILTER_RESULTS, RuleAction.fromString("FILTER_RESULTS"));
assertEquals(RuleAction.FILTER_RESULTS, RuleAction.fromString("fiLTer_Results"));
assertEquals(RuleAction.SKIP_SAMPLING, RuleAction.fromString("SKip_sampLing"));
assertEquals(RuleAction.SKIP_SAMPLING_AND_FILTER_RESULTS, RuleAction.fromString("skip_sampling_and_filter_results"));
}
public void testToString() {
@ -30,5 +32,12 @@ public class RuleActionTests extends ESTestCase {
assertThat(RuleAction.readFromStream(in), equalTo(RuleAction.FILTER_RESULTS));
}
}
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.writeVInt(2);
try (StreamInput in = out.bytes().streamInput()) {
assertThat(RuleAction.readFromStream(in), equalTo(RuleAction.SKIP_SAMPLING_AND_FILTER_RESULTS));
}
}
}
}

View File

@ -30,7 +30,7 @@ public class RuleConditionTests extends AbstractSerializingTestCase<RuleConditio
default:
// no need to randomize, it is properly randomily tested in
// ConditionTest
condition = new Condition(Operator.LT, Double.toString(randomDouble()));
condition = new Condition(Operator.LT, Long.toString(randomLong()));
if (randomBoolean()) {
fieldName = randomAlphaOfLengthBetween(1, 20);
fieldValue = randomAlphaOfLengthBetween(1, 20);
@ -218,4 +218,30 @@ public class RuleConditionTests extends AbstractSerializingTestCase<RuleConditio
new RuleCondition(RuleConditionType.NUMERICAL_DIFF_ABS, "metric", "cpu", new Condition(Operator.LT, "5"), null);
}
public void testCreateTimeBased() {
RuleCondition timeBased = RuleCondition.createTime(Operator.GTE, 100L);
assertEquals(RuleConditionType.TIME, timeBased.getConditionType());
assertEquals(Operator.GTE, timeBased.getCondition().getOperator());
assertEquals("100", timeBased.getCondition().getValue());
assertNull(timeBased.getFieldName());
assertNull(timeBased.getFieldValue());
assertNull(timeBased.getValueFilter());
}
public void testCreateTimeBased_GivenOperatorMatch() {
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> RuleCondition.createTime(Operator.MATCH, 100L));
assertEquals("Invalid detector rule: operator 'match' is not allowed", e.getMessage());
}
public void testCreateNumerical() {
RuleCondition ruleCondition = RuleCondition.createNumerical(RuleConditionType.NUMERICAL_ACTUAL, "foo", "bar",
new Condition(Operator.GTE, "100"));
assertEquals(RuleConditionType.NUMERICAL_ACTUAL, ruleCondition.getConditionType());
assertEquals(Operator.GTE, ruleCondition.getCondition().getOperator());
assertEquals("100", ruleCondition.getCondition().getValue());
assertEquals("foo", ruleCondition.getFieldName());
assertEquals("bar", ruleCondition.getFieldValue());
assertNull(ruleCondition.getValueFilter());
}
}

View File

@ -10,6 +10,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.EnumSet;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@ -110,4 +111,17 @@ public class RuleConditionTypeTests extends ESTestCase {
}
}
public void testIsNumerical() {
for (RuleConditionType type : EnumSet.allOf(RuleConditionType.class)) {
boolean isNumerical = type.isNumerical();
if (type == RuleConditionType.NUMERICAL_ACTUAL ||
type == RuleConditionType.NUMERICAL_DIFF_ABS ||
type == RuleConditionType.NUMERICAL_TYPICAL) {
assertTrue(isNumerical);
} else {
assertFalse(isNumerical);
}
}
}
}

View File

@ -211,7 +211,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
when(autodetectProcess.isProcessAlive()).thenReturn(true);
when(autodetectProcess.readAutodetectResults()).thenReturn(Collections.emptyIterator());
AutodetectProcessFactory autodetectProcessFactory =
(j, modelSnapshot, quantiles, filters, e, onProcessCrash) -> autodetectProcess;
(j, autodetectParams, e, onProcessCrash) -> autodetectProcess;
Settings.Builder settings = Settings.builder();
settings.put(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), 3);
AutodetectProcessManager manager = spy(new AutodetectProcessManager(settings.build(), client, threadPool, jobManager, jobProvider,
@ -473,8 +473,9 @@ public class AutodetectProcessManagerTests extends ESTestCase {
List<JobUpdate.DetectorUpdate> detectorUpdates = Collections.singletonList(new JobUpdate.DetectorUpdate(2, null, rules));
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
manager.writeUpdateProcessMessage(jobTask, detectorUpdates, modelConfig, e -> {});
verify(communicator).writeUpdateProcessMessage(same(modelConfig), same(detectorUpdates), any());
UpdateParams updateParams = new UpdateParams(modelConfig, detectorUpdates, false);
manager.writeUpdateProcessMessage(jobTask, updateParams, e -> {});
verify(communicator).writeUpdateProcessMessage(same(updateParams), eq(Collections.emptyList()), any());
}
public void testJobHasActiveAutodetectProcess() throws IOException {
@ -545,7 +546,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
when(jobManager.getJobOrThrowIfUnknown("my_id")).thenReturn(createJobDetails("my_id"));
AutodetectProcess autodetectProcess = mock(AutodetectProcess.class);
AutodetectProcessFactory autodetectProcessFactory =
(j, modelSnapshot, quantiles, filters, e, onProcessCrash) -> autodetectProcess;
(j, autodetectParams, e, onProcessCrash) -> autodetectProcess;
AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor);
@ -618,7 +619,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
when(jobManager.getJobOrThrowIfUnknown(jobId)).thenReturn(createJobDetails(jobId));
AutodetectProcess autodetectProcess = mock(AutodetectProcess.class);
AutodetectProcessFactory autodetectProcessFactory =
(j, modelSnapshot, quantiles, filters, e, onProcessCrash) -> autodetectProcess;
(j, autodetectParams, e, onProcessCrash) -> autodetectProcess;
return new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor);

View File

@ -16,9 +16,8 @@ import static org.hamcrest.Matchers.greaterThan;
public class DataCountsTests extends AbstractSerializingTestCase<DataCounts> {
@Override
public DataCounts createTestInstance() {
return new DataCounts(randomAlphaOfLength(10), randomIntBetween(1, 1_000_000),
public static DataCounts createTestInstance(String jobId) {
return new DataCounts(jobId, randomIntBetween(1, 1_000_000),
randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000),
randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000),
randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000),
@ -27,6 +26,11 @@ public class DataCountsTests extends AbstractSerializingTestCase<DataCounts> {
new DateTime(randomDateTimeZone()).toDate());
}
@Override
public DataCounts createTestInstance() {
return createTestInstance(randomAlphaOfLength(10));
}
@Override
protected Writeable.Reader<DataCounts> instanceReader() {
return DataCounts::new;

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.job.process.autodetect.writer;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.calendars.SpecialEvent;
import org.elasticsearch.xpack.ml.job.config.Condition;
import org.elasticsearch.xpack.ml.job.config.Connective;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
@ -21,8 +22,13 @@ import org.mockito.InOrder;
import org.mockito.Mockito;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.stream.IntStream;
@ -207,6 +213,6 @@ public class ControlMsgToProcessWriterTests extends ESTestCase {
private static List<RuleCondition> createRule(String value) {
Condition condition = new Condition(Operator.GT, value);
return Collections.singletonList(new RuleCondition(RuleConditionType.NUMERICAL_ACTUAL, null, null, condition, null));
return Collections.singletonList(RuleCondition.createNumerical(RuleConditionType.NUMERICAL_ACTUAL, null, null, condition));
}
}

View File

@ -9,6 +9,7 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.calendars.SpecialEvent;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.Condition;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
@ -25,12 +26,17 @@ import org.mockito.ArgumentCaptor;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
@ -43,12 +49,14 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
public class FieldConfigWriterTests extends ESTestCase {
private AnalysisConfig analysisConfig;
private Set<MlFilter> filters;
private List<SpecialEvent> specialEvents;
private OutputStreamWriter writer;
@Before
public void setUpDeps() {
analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(new Detector.Builder("count", null).build())).build();
filters = new LinkedHashSet<>();
specialEvents = new ArrayList<>();
}
public void testMultipleDetectorsToConfFile()
@ -184,8 +192,8 @@ public class FieldConfigWriterTests extends ESTestCase {
Detector.Builder detector = new Detector.Builder("mean", "metricValue");
detector.setByFieldName("metricName");
detector.setPartitionFieldName("instance");
RuleCondition ruleCondition =
new RuleCondition(RuleConditionType.NUMERICAL_ACTUAL, "metricName", "metricValue", new Condition(Operator.LT, "5"), null);
RuleCondition ruleCondition = RuleCondition.createNumerical
(RuleConditionType.NUMERICAL_ACTUAL, "metricName", "metricValue", new Condition(Operator.LT, "5"));
DetectionRule rule = new DetectionRule.Builder(Arrays.asList(ruleCondition)).setTargetFieldName("instance").build();
detector.setDetectorRules(Arrays.asList(rule));
@ -226,7 +234,35 @@ public class FieldConfigWriterTests extends ESTestCase {
verifyNoMoreInteractions(writer);
}
public void testWrite_GivenSpecialEvents() throws IOException {
Detector d = new Detector.Builder("count", null).build();
AnalysisConfig.Builder builder = new AnalysisConfig.Builder(Arrays.asList(d));
analysisConfig = builder.build();
specialEvents.add(
new SpecialEvent("1", "The Ashes", ZonedDateTime.ofInstant(Instant.ofEpochMilli(1511395200000L), ZoneOffset.UTC),
ZonedDateTime.ofInstant(Instant.ofEpochMilli(1515369600000L), ZoneOffset.UTC), Collections.emptyList()));
specialEvents.add(
new SpecialEvent("2", "elasticon", ZonedDateTime.ofInstant(Instant.ofEpochMilli(1519603200000L), ZoneOffset.UTC),
ZonedDateTime.ofInstant(Instant.ofEpochMilli(1519862400000L), ZoneOffset.UTC), Collections.emptyList()));
writer = mock(OutputStreamWriter.class);
createFieldConfigWriter().write();
verify(writer).write("detector.0.clause = count\n" +
"detector.0.rules = [{\"rule_action\":\"skip_sampling_and_filter_results\",\"conditions_connective\":\"and\"," +
"\"rule_conditions\":[{\"condition_type\":\"time\",\"condition\":{\"operator\":\"gte\",\"value\":\"1511395200\"}}," +
"{\"condition_type\":\"time\",\"condition\":{\"operator\":\"lt\",\"value\":\"1515369600\"}}]}," +
"{\"rule_action\":\"skip_sampling_and_filter_results\",\"conditions_connective\":\"and\"," +
"\"rule_conditions\":[{\"condition_type\":\"time\",\"condition\":{\"operator\":\"gte\",\"value\":\"1519603200\"}}," +
"{\"condition_type\":\"time\",\"condition\":{\"operator\":\"lt\",\"value\":\"1519862400\"}}]}]" +
"\n");
verifyNoMoreInteractions(writer);
}
private FieldConfigWriter createFieldConfigWriter() {
return new FieldConfigWriter(analysisConfig, filters, writer, mock(Logger.class));
return new FieldConfigWriter(analysisConfig, filters, specialEvents, writer, mock(Logger.class));
}
}