[ML] Use TimeValue in job and datafeed configs (elastic/x-pack-elasticsearch#732)

relates elastic/x-pack-elasticsearch#679

Original commit: elastic/x-pack-elasticsearch@891edd5bfe
This commit is contained in:
Dimitris Athanasiou 2017-03-16 14:07:47 +00:00 committed by GitHub
parent 6c9727c2db
commit 16efd4e474
55 changed files with 506 additions and 495 deletions

View File

@ -11,6 +11,7 @@ import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -18,23 +19,21 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.utils.DomainSplitFunction;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.utils.MlStrings;
import org.elasticsearch.xpack.ml.utils.time.TimeUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* Datafeed configuration options. Describes where to proactively pull input
@ -75,8 +74,10 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
PARSER.declareString(Builder::setJobId, Job.ID);
PARSER.declareStringArray(Builder::setIndexes, INDEXES);
PARSER.declareStringArray(Builder::setTypes, TYPES);
PARSER.declareLong(Builder::setQueryDelay, QUERY_DELAY);
PARSER.declareLong(Builder::setFrequency, FREQUENCY);
PARSER.declareString((builder, val) ->
builder.setQueryDelay(TimeValue.parseTimeValue(val, QUERY_DELAY.getPreferredName())), QUERY_DELAY);
PARSER.declareString((builder, val) ->
builder.setFrequency(TimeValue.parseTimeValue(val, FREQUENCY.getPreferredName())), FREQUENCY);
PARSER.declareObject(Builder::setQuery,
(p, c) -> new QueryParseContext(p).parseInnerQueryBuilder(), QUERY);
PARSER.declareObject(Builder::setAggregations, (p, c) -> AggregatorFactories.parseAggregators(new QueryParseContext(p)),
@ -99,14 +100,14 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
private final String jobId;
/**
* The delay in seconds before starting to query a period of time
* The delay before starting to query a period of time
*/
private final Long queryDelay;
private final TimeValue queryDelay;
/**
* The frequency in seconds with which queries are executed
* The frequency with which queries are executed
*/
private final Long frequency;
private final TimeValue frequency;
private final List<String> indexes;
private final List<String> types;
@ -117,7 +118,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
private final boolean source;
private final ChunkingConfig chunkingConfig;
private DatafeedConfig(String id, String jobId, Long queryDelay, Long frequency, List<String> indexes, List<String> types,
private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indexes, List<String> types,
QueryBuilder query, AggregatorFactories.Builder aggregations, List<SearchSourceBuilder.ScriptField> scriptFields,
Integer scrollSize, boolean source, ChunkingConfig chunkingConfig) {
this.id = id;
@ -137,8 +138,8 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
public DatafeedConfig(StreamInput in) throws IOException {
this.id = in.readString();
this.jobId = in.readString();
this.queryDelay = in.readOptionalLong();
this.frequency = in.readOptionalLong();
this.queryDelay = in.readOptionalWriteable(TimeValue::new);
this.frequency = in.readOptionalWriteable(TimeValue::new);
if (in.readBoolean()) {
this.indexes = in.readList(StreamInput::readString);
} else {
@ -169,11 +170,11 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
return jobId;
}
public Long getQueryDelay() {
public TimeValue getQueryDelay() {
return queryDelay;
}
public Long getFrequency() {
public TimeValue getFrequency() {
return frequency;
}
@ -220,8 +221,8 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
public void writeTo(StreamOutput out) throws IOException {
out.writeString(id);
out.writeString(jobId);
out.writeOptionalLong(queryDelay);
out.writeOptionalLong(frequency);
out.writeOptionalWriteable(queryDelay);
out.writeOptionalWriteable(frequency);
if (indexes != null) {
out.writeBoolean(true);
out.writeStringList(indexes);
@ -258,9 +259,9 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field(ID.getPreferredName(), id);
builder.field(Job.ID.getPreferredName(), jobId);
builder.field(QUERY_DELAY.getPreferredName(), queryDelay);
builder.field(QUERY_DELAY.getPreferredName(), queryDelay.getStringRep());
if (frequency != null) {
builder.field(FREQUENCY.getPreferredName(), frequency);
builder.field(FREQUENCY.getPreferredName(), frequency.getStringRep());
}
builder.field(INDEXES.getPreferredName(), indexes);
builder.field(TYPES.getPreferredName(), types);
@ -330,12 +331,12 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
public static class Builder {
private static final int DEFAULT_SCROLL_SIZE = 1000;
private static final long DEFAULT_ELASTICSEARCH_QUERY_DELAY = 60L;
private static final TimeValue DEFAULT_QUERY_DELAY = TimeValue.timeValueMinutes(1);
private String id;
private String jobId;
private Long queryDelay = DEFAULT_ELASTICSEARCH_QUERY_DELAY;
private Long frequency;
private TimeValue queryDelay = DEFAULT_QUERY_DELAY;
private TimeValue frequency;
private List<String> indexes = Collections.emptyList();
private List<String> types = Collections.emptyList();
private QueryBuilder query = QueryBuilders.matchAllQuery();
@ -385,21 +386,13 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
this.types = ExceptionsHelper.requireNonNull(types, TYPES.getPreferredName());
}
public void setQueryDelay(long queryDelay) {
if (queryDelay < 0) {
String msg = Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE,
DatafeedConfig.QUERY_DELAY.getPreferredName(), queryDelay);
throw new IllegalArgumentException(msg);
}
public void setQueryDelay(TimeValue queryDelay) {
TimeUtils.checkNonNegativeMultiple(queryDelay, TimeUnit.MILLISECONDS, QUERY_DELAY);
this.queryDelay = queryDelay;
}
public void setFrequency(long frequency) {
if (frequency <= 0) {
String msg = Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE,
DatafeedConfig.FREQUENCY.getPreferredName(), frequency);
throw new IllegalArgumentException(msg);
}
public void setFrequency(TimeValue frequency) {
TimeUtils.checkPositiveMultiple(frequency, TimeUnit.SECONDS, FREQUENCY);
this.frequency = frequency;
}

View File

@ -25,7 +25,6 @@ import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.DefaultFrequency;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder;
@ -90,9 +89,9 @@ public class DatafeedJobRunner extends AbstractComponent {
Job job = mlMetadata.getJobs().get(datafeed.getJobId());
gatherInformation(job.getId(), (buckets, dataCounts) -> {
long latestFinalBucketEndMs = -1L;
Duration bucketSpan = Duration.ofSeconds(job.getAnalysisConfig().getBucketSpan());
TimeValue bucketSpan = job.getAnalysisConfig().getBucketSpan();
if (buckets.results().size() == 1) {
latestFinalBucketEndMs = buckets.results().get(0).getTimestamp().getTime() + bucketSpan.toMillis() - 1;
latestFinalBucketEndMs = buckets.results().get(0).getTimestamp().getTime() + bucketSpan.millis() - 1;
}
long latestRecordTimeMs = -1L;
if (dataCounts.getLatestRecordTimeStamp() != null) {
@ -200,7 +199,7 @@ public class DatafeedJobRunner extends AbstractComponent {
Holder createJobDatafeed(DatafeedConfig datafeed, Job job, long finalBucketEndMs, long latestRecordTimeMs,
Consumer<Exception> handler, StartDatafeedAction.DatafeedTask task) {
Duration frequency = getFrequencyOrDefault(datafeed, job);
Duration queryDelay = Duration.ofSeconds(datafeed.getQueryDelay());
Duration queryDelay = Duration.ofMillis(datafeed.getQueryDelay().millis());
DataExtractorFactory dataExtractorFactory = createDataExtractorFactory(datafeed, job);
DatafeedJob datafeedJob = new DatafeedJob(job.getId(), buildDataDescription(job), frequency.toMillis(), queryDelay.toMillis(),
dataExtractorFactory, client, auditor, currentTimeSupplier, finalBucketEndMs, latestRecordTimeMs);
@ -249,9 +248,9 @@ public class DatafeedJobRunner extends AbstractComponent {
}
private static Duration getFrequencyOrDefault(DatafeedConfig datafeed, Job job) {
Long frequency = datafeed.getFrequency();
Long bucketSpan = job.getAnalysisConfig().getBucketSpan();
return frequency == null ? DefaultFrequency.ofBucketSpan(bucketSpan) : Duration.ofSeconds(frequency);
TimeValue frequency = datafeed.getFrequency();
TimeValue bucketSpan = job.getAnalysisConfig().getBucketSpan();
return frequency == null ? DefaultFrequency.ofBucketSpan(bucketSpan.seconds()) : Duration.ofSeconds(frequency.seconds());
}
private TimeValue computeNextDelay(long next) {

View File

@ -21,7 +21,7 @@ public final class DatafeedJobValidator {
*/
public static void validate(DatafeedConfig datafeedConfig, Job job) {
AnalysisConfig analysisConfig = job.getAnalysisConfig();
if (analysisConfig.getLatency() != null && analysisConfig.getLatency() > 0) {
if (analysisConfig.getLatency() != null && analysisConfig.getLatency().seconds() > 0) {
throw new IllegalArgumentException(Messages.getMessage(Messages.DATAFEED_DOES_NOT_SUPPORT_JOB_WITH_LATENCY));
}
if (datafeedConfig.hasAggregations() && Strings.isNullOrEmpty(analysisConfig.getSummaryCountFieldName())) {

View File

@ -10,6 +10,7 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -41,8 +42,10 @@ public class DatafeedUpdate implements Writeable, ToXContent {
PARSER.declareString(Builder::setJobId, Job.ID);
PARSER.declareStringArray(Builder::setIndexes, DatafeedConfig.INDEXES);
PARSER.declareStringArray(Builder::setTypes, DatafeedConfig.TYPES);
PARSER.declareLong(Builder::setQueryDelay, DatafeedConfig.QUERY_DELAY);
PARSER.declareLong(Builder::setFrequency, DatafeedConfig.FREQUENCY);
PARSER.declareString((builder, val) -> builder.setQueryDelay(
TimeValue.parseTimeValue(val, DatafeedConfig.QUERY_DELAY.getPreferredName())), DatafeedConfig.QUERY_DELAY);
PARSER.declareString((builder, val) -> builder.setFrequency(
TimeValue.parseTimeValue(val, DatafeedConfig.FREQUENCY.getPreferredName())), DatafeedConfig.FREQUENCY);
PARSER.declareObject(Builder::setQuery,
(p, c) -> new QueryParseContext(p).parseInnerQueryBuilder(), DatafeedConfig.QUERY);
PARSER.declareObject(Builder::setAggregations, (p, c) -> AggregatorFactories.parseAggregators(new QueryParseContext(p)),
@ -64,8 +67,8 @@ public class DatafeedUpdate implements Writeable, ToXContent {
private final String id;
private final String jobId;
private final Long queryDelay;
private final Long frequency;
private final TimeValue queryDelay;
private final TimeValue frequency;
private final List<String> indexes;
private final List<String> types;
private final QueryBuilder query;
@ -75,7 +78,7 @@ public class DatafeedUpdate implements Writeable, ToXContent {
private final Boolean source;
private final ChunkingConfig chunkingConfig;
private DatafeedUpdate(String id, String jobId, Long queryDelay, Long frequency, List<String> indexes, List<String> types,
private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indexes, List<String> types,
QueryBuilder query, AggregatorFactories.Builder aggregations, List<SearchSourceBuilder.ScriptField> scriptFields,
Integer scrollSize, Boolean source, ChunkingConfig chunkingConfig) {
this.id = id;
@ -95,8 +98,8 @@ public class DatafeedUpdate implements Writeable, ToXContent {
public DatafeedUpdate(StreamInput in) throws IOException {
this.id = in.readString();
this.jobId = in.readOptionalString();
this.queryDelay = in.readOptionalLong();
this.frequency = in.readOptionalLong();
this.queryDelay = in.readOptionalWriteable(TimeValue::new);
this.frequency = in.readOptionalWriteable(TimeValue::new);
if (in.readBoolean()) {
this.indexes = in.readList(StreamInput::readString);
} else {
@ -130,8 +133,8 @@ public class DatafeedUpdate implements Writeable, ToXContent {
public void writeTo(StreamOutput out) throws IOException {
out.writeString(id);
out.writeOptionalString(jobId);
out.writeOptionalLong(queryDelay);
out.writeOptionalLong(frequency);
out.writeOptionalWriteable(queryDelay);
out.writeOptionalWriteable(frequency);
if (indexes != null) {
out.writeBoolean(true);
out.writeStringList(indexes);
@ -162,8 +165,12 @@ public class DatafeedUpdate implements Writeable, ToXContent {
builder.startObject();
builder.field(DatafeedConfig.ID.getPreferredName(), id);
addOptionalField(builder, Job.ID, jobId);
addOptionalField(builder, DatafeedConfig.QUERY_DELAY, queryDelay);
addOptionalField(builder, DatafeedConfig.FREQUENCY, frequency);
if (queryDelay != null) {
builder.field(DatafeedConfig.QUERY_DELAY.getPreferredName(), queryDelay.getStringRep());
}
if (frequency != null) {
builder.field(DatafeedConfig.FREQUENCY.getPreferredName(), frequency.getStringRep());
}
addOptionalField(builder, DatafeedConfig.INDEXES, indexes);
addOptionalField(builder, DatafeedConfig.TYPES, types);
addOptionalField(builder, DatafeedConfig.QUERY, query);
@ -280,8 +287,8 @@ public class DatafeedUpdate implements Writeable, ToXContent {
private String id;
private String jobId;
private Long queryDelay;
private Long frequency;
private TimeValue queryDelay;
private TimeValue frequency;
private List<String> indexes;
private List<String> types;
private QueryBuilder query;
@ -329,11 +336,11 @@ public class DatafeedUpdate implements Writeable, ToXContent {
this.types = types;
}
public void setQueryDelay(long queryDelay) {
public void setQueryDelay(TimeValue queryDelay) {
this.queryDelay = queryDelay;
}
public void setFrequency(long frequency) {
public void setFrequency(TimeValue frequency) {
this.frequency = frequency;
}

View File

@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.config;
package org.elasticsearch.xpack.ml.datafeed;
import java.time.Duration;

View File

@ -12,9 +12,11 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.utils.time.TimeUtils;
import java.io.IOException;
import java.util.ArrayList;
@ -24,6 +26,7 @@ import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
@ -74,29 +77,34 @@ public class AnalysisConfig extends ToXContentToBytes implements Writeable {
static {
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), (p, c) -> Detector.PARSER.apply(p, c).build(), DETECTORS);
PARSER.declareLong(Builder::setBucketSpan, BUCKET_SPAN);
PARSER.declareLong(Builder::setBatchSpan, BATCH_SPAN);
PARSER.declareString((builder, val) ->
builder.setBucketSpan(TimeValue.parseTimeValue(val, BUCKET_SPAN.getPreferredName())), BUCKET_SPAN);
PARSER.declareString((builder, val) ->
builder.setBatchSpan(TimeValue.parseTimeValue(val, BATCH_SPAN.getPreferredName())), BATCH_SPAN);
PARSER.declareString(Builder::setCategorizationFieldName, CATEGORIZATION_FIELD_NAME);
PARSER.declareStringArray(Builder::setCategorizationFilters, CATEGORIZATION_FILTERS);
PARSER.declareLong(Builder::setLatency, LATENCY);
PARSER.declareString((builder, val) ->
builder.setLatency(TimeValue.parseTimeValue(val, LATENCY.getPreferredName())), LATENCY);
PARSER.declareLong(Builder::setPeriod, PERIOD);
PARSER.declareString(Builder::setSummaryCountFieldName, SUMMARY_COUNT_FIELD_NAME);
PARSER.declareStringArray(Builder::setInfluencers, INFLUENCERS);
PARSER.declareBoolean(Builder::setOverlappingBuckets, OVERLAPPING_BUCKETS);
PARSER.declareLong(Builder::setResultFinalizationWindow, RESULT_FINALIZATION_WINDOW);
PARSER.declareBoolean(Builder::setMultivariateByFields, MULTIVARIATE_BY_FIELDS);
PARSER.declareLongArray(Builder::setMultipleBucketSpans, MULTIPLE_BUCKET_SPANS);
PARSER.declareStringArray((builder, values) -> builder.setMultipleBucketSpans(
values.stream().map(v -> TimeValue.parseTimeValue(v, MULTIPLE_BUCKET_SPANS.getPreferredName()))
.collect(Collectors.toList())), MULTIPLE_BUCKET_SPANS);
PARSER.declareBoolean(Builder::setUsePerPartitionNormalization, USER_PER_PARTITION_NORMALIZATION);
}
/**
* These values apply to all detectors
*/
private final long bucketSpan;
private final Long batchSpan;
private final TimeValue bucketSpan;
private final TimeValue batchSpan;
private final String categorizationFieldName;
private final List<String> categorizationFilters;
private final long latency;
private final TimeValue latency;
private final Long period;
private final String summaryCountFieldName;
private final List<Detector> detectors;
@ -104,13 +112,13 @@ public class AnalysisConfig extends ToXContentToBytes implements Writeable {
private final Boolean overlappingBuckets;
private final Long resultFinalizationWindow;
private final Boolean multivariateByFields;
private final List<Long> multipleBucketSpans;
private final List<TimeValue> multipleBucketSpans;
private final boolean usePerPartitionNormalization;
private AnalysisConfig(Long bucketSpan, Long batchSpan, String categorizationFieldName, List<String> categorizationFilters,
long latency, Long period, String summaryCountFieldName, List<Detector> detectors, List<String> influencers,
Boolean overlappingBuckets, Long resultFinalizationWindow, Boolean multivariateByFields,
List<Long> multipleBucketSpans, boolean usePerPartitionNormalization) {
private AnalysisConfig(TimeValue bucketSpan, TimeValue batchSpan, String categorizationFieldName, List<String> categorizationFilters,
TimeValue latency, Long period, String summaryCountFieldName, List<Detector> detectors,
List<String> influencers, Boolean overlappingBuckets, Long resultFinalizationWindow,
Boolean multivariateByFields, List<TimeValue> multipleBucketSpans, boolean usePerPartitionNormalization) {
this.detectors = detectors;
this.bucketSpan = bucketSpan;
this.batchSpan = batchSpan;
@ -128,11 +136,11 @@ public class AnalysisConfig extends ToXContentToBytes implements Writeable {
}
public AnalysisConfig(StreamInput in) throws IOException {
bucketSpan = in.readLong();
batchSpan = in.readOptionalLong();
bucketSpan = new TimeValue(in);
batchSpan = in.readOptionalWriteable(TimeValue::new);
categorizationFieldName = in.readOptionalString();
categorizationFilters = in.readBoolean() ? in.readList(StreamInput::readString) : null;
latency = in.readLong();
latency = in.readOptionalWriteable(TimeValue::new);
period = in.readOptionalLong();
summaryCountFieldName = in.readOptionalString();
detectors = in.readList(Detector::new);
@ -140,14 +148,14 @@ public class AnalysisConfig extends ToXContentToBytes implements Writeable {
overlappingBuckets = in.readOptionalBoolean();
resultFinalizationWindow = in.readOptionalLong();
multivariateByFields = in.readOptionalBoolean();
multipleBucketSpans = in.readBoolean() ? in.readList(StreamInput::readLong) : null;
multipleBucketSpans = in.readBoolean() ? in.readList(TimeValue::new) : null;
usePerPartitionNormalization = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(bucketSpan);
out.writeOptionalLong(batchSpan);
bucketSpan.writeTo(out);
out.writeOptionalWriteable(batchSpan);
out.writeOptionalString(categorizationFieldName);
if (categorizationFilters != null) {
out.writeBoolean(true);
@ -155,7 +163,7 @@ public class AnalysisConfig extends ToXContentToBytes implements Writeable {
} else {
out.writeBoolean(false);
}
out.writeLong(latency);
out.writeOptionalWriteable(latency);
out.writeOptionalLong(period);
out.writeOptionalString(summaryCountFieldName);
out.writeList(detectors);
@ -165,10 +173,7 @@ public class AnalysisConfig extends ToXContentToBytes implements Writeable {
out.writeOptionalBoolean(multivariateByFields);
if (multipleBucketSpans != null) {
out.writeBoolean(true);
out.writeVInt(multipleBucketSpans.size());
for (Long bucketSpan : multipleBucketSpans) {
out.writeLong(bucketSpan);
}
out.writeList(multipleBucketSpans);
} else {
out.writeBoolean(false);
}
@ -176,25 +181,20 @@ public class AnalysisConfig extends ToXContentToBytes implements Writeable {
}
/**
* The size of the interval the analysis is aggregated into measured in
* seconds
* The analysis bucket span
*
* @return The bucketspan or <code>null</code> if not set
*/
public Long getBucketSpan() {
return bucketSpan;
}
public long getBucketSpanOrDefault() {
public TimeValue getBucketSpan() {
return bucketSpan;
}
/**
* Interval into which to batch seasonal data measured in seconds
* Interval into which to batch seasonal data
*
* @return The batchspan or <code>null</code> if not set
*/
public Long getBatchSpan() {
public TimeValue getBatchSpan() {
return batchSpan;
}
@ -207,11 +207,11 @@ public class AnalysisConfig extends ToXContentToBytes implements Writeable {
}
/**
* The latency interval (seconds) during which out-of-order records should be handled.
* The latency interval during which out-of-order records should be handled.
*
* @return The latency interval (seconds) or <code>null</code> if not set
* @return The latency interval or <code>null</code> if not set
*/
public Long getLatency() {
public TimeValue getLatency() {
return latency;
}
@ -296,7 +296,7 @@ public class AnalysisConfig extends ToXContentToBytes implements Writeable {
return multivariateByFields;
}
public List<Long> getMultipleBucketSpans() {
public List<TimeValue> getMultipleBucketSpans() {
return multipleBucketSpans;
}
@ -370,9 +370,9 @@ public class AnalysisConfig extends ToXContentToBytes implements Writeable {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(BUCKET_SPAN.getPreferredName(), bucketSpan);
builder.field(BUCKET_SPAN.getPreferredName(), bucketSpan.getStringRep());
if (batchSpan != null) {
builder.field(BATCH_SPAN.getPreferredName(), batchSpan);
builder.field(BATCH_SPAN.getPreferredName(), batchSpan.getStringRep());
}
if (categorizationFieldName != null) {
builder.field(CATEGORIZATION_FIELD_NAME.getPreferredName(), categorizationFieldName);
@ -380,7 +380,7 @@ public class AnalysisConfig extends ToXContentToBytes implements Writeable {
if (categorizationFilters != null) {
builder.field(CATEGORIZATION_FILTERS.getPreferredName(), categorizationFilters);
}
builder.field(LATENCY.getPreferredName(), latency);
builder.field(LATENCY.getPreferredName(), latency.getStringRep());
if (period != null) {
builder.field(PERIOD.getPreferredName(), period);
}
@ -399,7 +399,8 @@ public class AnalysisConfig extends ToXContentToBytes implements Writeable {
builder.field(MULTIVARIATE_BY_FIELDS.getPreferredName(), multivariateByFields);
}
if (multipleBucketSpans != null) {
builder.field(MULTIPLE_BUCKET_SPANS.getPreferredName(), multipleBucketSpans);
builder.field(MULTIPLE_BUCKET_SPANS.getPreferredName(),
multipleBucketSpans.stream().map(s -> s.getStringRep()).collect(Collectors.toList()));
}
builder.field(USER_PER_PARTITION_NORMALIZATION.getPreferredName(), usePerPartitionNormalization);
builder.endObject();
@ -411,7 +412,7 @@ public class AnalysisConfig extends ToXContentToBytes implements Writeable {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AnalysisConfig that = (AnalysisConfig) o;
return latency == that.latency &&
return Objects.equals(latency, that.latency) &&
usePerPartitionNormalization == that.usePerPartitionNormalization &&
Objects.equals(bucketSpan, that.bucketSpan) &&
Objects.equals(batchSpan, that.batchSpan) &&
@ -438,12 +439,12 @@ public class AnalysisConfig extends ToXContentToBytes implements Writeable {
public static class Builder {
public static final long DEFAULT_BUCKET_SPAN = 300L;
public static final TimeValue DEFAULT_BUCKET_SPAN = TimeValue.timeValueMinutes(5);
private List<Detector> detectors;
private long bucketSpan = DEFAULT_BUCKET_SPAN;
private Long batchSpan;
private long latency = 0L;
private TimeValue bucketSpan = DEFAULT_BUCKET_SPAN;
private TimeValue batchSpan;
private TimeValue latency = TimeValue.ZERO;
private Long period;
private String categorizationFieldName;
private List<String> categorizationFilters;
@ -452,7 +453,7 @@ public class AnalysisConfig extends ToXContentToBytes implements Writeable {
private Boolean overlappingBuckets;
private Long resultFinalizationWindow;
private Boolean multivariateByFields;
private List<Long> multipleBucketSpans;
private List<TimeValue> multipleBucketSpans;
private boolean usePerPartitionNormalization = false;
public Builder(List<Detector> detectors) {
@ -480,15 +481,15 @@ public class AnalysisConfig extends ToXContentToBytes implements Writeable {
this.detectors = detectors;
}
public void setBucketSpan(long bucketSpan) {
public void setBucketSpan(TimeValue bucketSpan) {
this.bucketSpan = bucketSpan;
}
public void setBatchSpan(long batchSpan) {
public void setBatchSpan(TimeValue batchSpan) {
this.batchSpan = batchSpan;
}
public void setLatency(long latency) {
public void setLatency(TimeValue latency) {
this.latency = latency;
}
@ -524,7 +525,7 @@ public class AnalysisConfig extends ToXContentToBytes implements Writeable {
this.multivariateByFields = multivariateByFields;
}
public void setMultipleBucketSpans(List<Long> multipleBucketSpans) {
public void setMultipleBucketSpans(List<TimeValue> multipleBucketSpans) {
this.multipleBucketSpans = multipleBucketSpans;
}
@ -547,9 +548,11 @@ public class AnalysisConfig extends ToXContentToBytes implements Writeable {
* </ol>
*/
public AnalysisConfig build() {
checkFieldIsNotNegativeIfSpecified(BUCKET_SPAN.getPreferredName(), bucketSpan);
checkFieldIsNotNegativeIfSpecified(BATCH_SPAN.getPreferredName(), batchSpan);
checkFieldIsNotNegativeIfSpecified(LATENCY.getPreferredName(), latency);
TimeUtils.checkPositiveMultiple(bucketSpan, TimeUnit.SECONDS, BUCKET_SPAN);
if (batchSpan != null) {
TimeUtils.checkPositiveMultiple(batchSpan, TimeUnit.SECONDS, BATCH_SPAN);
}
TimeUtils.checkNonNegativeMultiple(latency, TimeUnit.SECONDS, LATENCY);
checkFieldIsNotNegativeIfSpecified(PERIOD.getPreferredName(), period);
verifyDetectorAreDefined(detectors);
@ -557,7 +560,8 @@ public class AnalysisConfig extends ToXContentToBytes implements Writeable {
verifyFieldName(categorizationFieldName);
verifyCategorizationFilters(categorizationFilters, categorizationFieldName);
verifyMultipleBucketSpans(multipleBucketSpans, bucketSpan);
checkFieldIsNotNegativeIfSpecified(RESULT_FINALIZATION_WINDOW.getPreferredName(), resultFinalizationWindow);
verifyMultipleBucketSpans();
overlappingBuckets = verifyOverlappingBucketsConfig(overlappingBuckets, detectors);
@ -623,16 +627,13 @@ public class AnalysisConfig extends ToXContentToBytes implements Writeable {
}
}
private static void verifyMultipleBucketSpans(List<Long> multipleBucketSpans, Long bucketSpan) {
private void verifyMultipleBucketSpans() {
if (multipleBucketSpans == null) {
return;
}
if (bucketSpan == null) {
throw new IllegalArgumentException(Messages.getMessage(Messages.JOB_CONFIG_MULTIPLE_BUCKETSPANS_REQUIRE_BUCKETSPAN));
}
for (Long span : multipleBucketSpans) {
if ((span % bucketSpan != 0L) || (span <= bucketSpan)) {
for (TimeValue span : multipleBucketSpans) {
if ((span.getSeconds() % bucketSpan.getSeconds() != 0L) || (span.compareTo(bucketSpan) <= 0)) {
throw new IllegalArgumentException(
Messages.getMessage(Messages.JOB_CONFIG_MULTIPLE_BUCKETSPANS_MUST_BE_MULTIPLE, span, bucketSpan));
}

View File

@ -11,14 +11,15 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser.Token;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.utils.MlStrings;
import org.elasticsearch.xpack.ml.utils.time.TimeUtils;
@ -30,6 +31,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
/**
* This class represents a configured and created Job. The creation time is set
@ -72,7 +74,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
public static final ObjectParser<Builder, Void> PARSER = new ObjectParser<>("job_details", Builder::new);
public static final int MAX_JOB_ID_LENGTH = 64;
public static final long MIN_BACKGROUND_PERSIST_INTERVAL = 3600;
public static final TimeValue MIN_BACKGROUND_PERSIST_INTERVAL = TimeValue.timeValueHours(1);
static {
PARSER.declareString(Builder::setId, ID);
@ -108,7 +110,8 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
PARSER.declareObject(Builder::setDataDescription, DataDescription.PARSER, DATA_DESCRIPTION);
PARSER.declareObject(Builder::setModelDebugConfig, ModelDebugConfig.PARSER, MODEL_DEBUG_CONFIG);
PARSER.declareLong(Builder::setRenormalizationWindowDays, RENORMALIZATION_WINDOW_DAYS);
PARSER.declareLong(Builder::setBackgroundPersistInterval, BACKGROUND_PERSIST_INTERVAL);
PARSER.declareString((builder, val) -> builder.setBackgroundPersistInterval(
TimeValue.parseTimeValue(val, BACKGROUND_PERSIST_INTERVAL.getPreferredName())), BACKGROUND_PERSIST_INTERVAL);
PARSER.declareLong(Builder::setResultsRetentionDays, RESULTS_RETENTION_DAYS);
PARSER.declareLong(Builder::setModelSnapshotRetentionDays, MODEL_SNAPSHOT_RETENTION_DAYS);
PARSER.declareField(Builder::setCustomSettings, (p, c) -> p.map(), CUSTOM_SETTINGS, ValueType.OBJECT);
@ -128,7 +131,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
private final DataDescription dataDescription;
private final ModelDebugConfig modelDebugConfig;
private final Long renormalizationWindowDays;
private final Long backgroundPersistInterval;
private final TimeValue backgroundPersistInterval;
private final Long modelSnapshotRetentionDays;
private final Long resultsRetentionDays;
private final Map<String, Object> customSettings;
@ -138,7 +141,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
private Job(String jobId, String description, Date createTime, Date finishedTime, Date lastDataTime,
AnalysisConfig analysisConfig, AnalysisLimits analysisLimits, DataDescription dataDescription,
ModelDebugConfig modelDebugConfig, Long renormalizationWindowDays, Long backgroundPersistInterval,
ModelDebugConfig modelDebugConfig, Long renormalizationWindowDays, TimeValue backgroundPersistInterval,
Long modelSnapshotRetentionDays, Long resultsRetentionDays, Map<String, Object> customSettings,
String modelSnapshotId, String resultsIndexName, boolean deleted) {
@ -172,7 +175,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
dataDescription = in.readOptionalWriteable(DataDescription::new);
modelDebugConfig = in.readOptionalWriteable(ModelDebugConfig::new);
renormalizationWindowDays = in.readOptionalLong();
backgroundPersistInterval = in.readOptionalLong();
backgroundPersistInterval = in.readOptionalWriteable(TimeValue::new);
modelSnapshotRetentionDays = in.readOptionalLong();
resultsRetentionDays = in.readOptionalLong();
customSettings = in.readMap();
@ -299,11 +302,11 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
}
/**
* The background persistence interval in seconds
* The background persistence interval
*
* @return background persistence interval in seconds
* @return background persistence interval
*/
public Long getBackgroundPersistInterval() {
public TimeValue getBackgroundPersistInterval() {
return backgroundPersistInterval;
}
@ -377,7 +380,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
out.writeOptionalWriteable(dataDescription);
out.writeOptionalWriteable(modelDebugConfig);
out.writeOptionalLong(renormalizationWindowDays);
out.writeOptionalLong(backgroundPersistInterval);
out.writeOptionalWriteable(backgroundPersistInterval);
out.writeOptionalLong(modelSnapshotRetentionDays);
out.writeOptionalLong(resultsRetentionDays);
out.writeMap(customSettings);
@ -424,7 +427,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
builder.field(RENORMALIZATION_WINDOW_DAYS.getPreferredName(), renormalizationWindowDays);
}
if (backgroundPersistInterval != null) {
builder.field(BACKGROUND_PERSIST_INTERVAL.getPreferredName(), backgroundPersistInterval);
builder.field(BACKGROUND_PERSIST_INTERVAL.getPreferredName(), backgroundPersistInterval.getStringRep());
}
if (modelSnapshotRetentionDays != null) {
builder.field(MODEL_SNAPSHOT_RETENTION_DAYS.getPreferredName(), modelSnapshotRetentionDays);
@ -506,7 +509,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
private Date lastDataTime;
private ModelDebugConfig modelDebugConfig;
private Long renormalizationWindowDays;
private Long backgroundPersistInterval;
private TimeValue backgroundPersistInterval;
private Long modelSnapshotRetentionDays;
private Long resultsRetentionDays;
private Map<String, Object> customSettings;
@ -612,7 +615,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
return this;
}
public Builder setBackgroundPersistInterval(Long backgroundPersistInterval) {
public Builder setBackgroundPersistInterval(TimeValue backgroundPersistInterval) {
this.backgroundPersistInterval = backgroundPersistInterval;
return this;
}
@ -663,8 +666,8 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
throw new IllegalArgumentException(Messages.getMessage(Messages.JOB_CONFIG_MISSING_ANALYSISCONFIG));
}
checkValidBackgroundPersistInterval();
checkValueNotLessThan(0, "renormalizationWindowDays", renormalizationWindowDays);
checkValueNotLessThan(MIN_BACKGROUND_PERSIST_INTERVAL, "backgroundPersistInterval", backgroundPersistInterval);
checkValueNotLessThan(0, "modelSnapshotRetentionDays", modelSnapshotRetentionDays);
checkValueNotLessThan(0, "resultsRetentionDays", resultsRetentionDays);
@ -694,5 +697,13 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
modelSnapshotRetentionDays, resultsRetentionDays, customSettings, modelSnapshotId,
resultsIndexName, deleted);
}
private void checkValidBackgroundPersistInterval() {
if (backgroundPersistInterval != null) {
TimeUtils.checkMultiple(backgroundPersistInterval, TimeUnit.SECONDS, BACKGROUND_PERSIST_INTERVAL);
checkValueNotLessThan(MIN_BACKGROUND_PERSIST_INTERVAL.getSeconds(), BACKGROUND_PERSIST_INTERVAL.getPreferredName(),
backgroundPersistInterval.getSeconds());
}
}
}
}

View File

@ -5,11 +5,12 @@
*/
package org.elasticsearch.xpack.ml.job.config;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
@ -30,7 +31,8 @@ public class JobUpdate implements Writeable, ToXContent {
PARSER.declareObjectArray(Builder::setDetectorUpdates, DetectorUpdate.PARSER, DETECTORS);
PARSER.declareObject(Builder::setModelDebugConfig, ModelDebugConfig.PARSER, Job.MODEL_DEBUG_CONFIG);
PARSER.declareObject(Builder::setAnalysisLimits, AnalysisLimits.PARSER, Job.ANALYSIS_LIMITS);
PARSER.declareLong(Builder::setBackgroundPersistInterval, Job.BACKGROUND_PERSIST_INTERVAL);
PARSER.declareString((builder, val) -> builder.setBackgroundPersistInterval(
TimeValue.parseTimeValue(val, Job.BACKGROUND_PERSIST_INTERVAL.getPreferredName())), Job.BACKGROUND_PERSIST_INTERVAL);
PARSER.declareLong(Builder::setRenormalizationWindowDays, Job.RENORMALIZATION_WINDOW_DAYS);
PARSER.declareLong(Builder::setResultsRetentionDays, Job.RESULTS_RETENTION_DAYS);
PARSER.declareLong(Builder::setModelSnapshotRetentionDays, Job.MODEL_SNAPSHOT_RETENTION_DAYS);
@ -44,7 +46,7 @@ public class JobUpdate implements Writeable, ToXContent {
private final ModelDebugConfig modelDebugConfig;
private final AnalysisLimits analysisLimits;
private final Long renormalizationWindowDays;
private final Long backgroundPersistInterval;
private final TimeValue backgroundPersistInterval;
private final Long modelSnapshotRetentionDays;
private final Long resultsRetentionDays;
private final List<String> categorizationFilters;
@ -53,7 +55,7 @@ public class JobUpdate implements Writeable, ToXContent {
private JobUpdate(@Nullable String description, @Nullable List<DetectorUpdate> detectorUpdates,
@Nullable ModelDebugConfig modelDebugConfig, @Nullable AnalysisLimits analysisLimits,
@Nullable Long backgroundPersistInterval, @Nullable Long renormalizationWindowDays,
@Nullable TimeValue backgroundPersistInterval, @Nullable Long renormalizationWindowDays,
@Nullable Long resultsRetentionDays, @Nullable Long modelSnapshotRetentionDays,
@Nullable List<String> categorisationFilters, @Nullable Map<String, Object> customSettings,
@Nullable String modelSnapshotId) {
@ -80,7 +82,7 @@ public class JobUpdate implements Writeable, ToXContent {
modelDebugConfig = in.readOptionalWriteable(ModelDebugConfig::new);
analysisLimits = in.readOptionalWriteable(AnalysisLimits::new);
renormalizationWindowDays = in.readOptionalLong();
backgroundPersistInterval = in.readOptionalLong();
backgroundPersistInterval = in.readOptionalWriteable(TimeValue::new);
modelSnapshotRetentionDays = in.readOptionalLong();
resultsRetentionDays = in.readOptionalLong();
if (in.readBoolean()) {
@ -101,7 +103,7 @@ public class JobUpdate implements Writeable, ToXContent {
out.writeOptionalWriteable(modelDebugConfig);
out.writeOptionalWriteable(analysisLimits);
out.writeOptionalLong(renormalizationWindowDays);
out.writeOptionalLong(backgroundPersistInterval);
out.writeOptionalWriteable(backgroundPersistInterval);
out.writeOptionalLong(modelSnapshotRetentionDays);
out.writeOptionalLong(resultsRetentionDays);
out.writeBoolean(categorizationFilters != null);
@ -132,7 +134,7 @@ public class JobUpdate implements Writeable, ToXContent {
return renormalizationWindowDays;
}
public Long getBackgroundPersistInterval() {
public TimeValue getBackgroundPersistInterval() {
return backgroundPersistInterval;
}
@ -396,7 +398,7 @@ public class JobUpdate implements Writeable, ToXContent {
private ModelDebugConfig modelDebugConfig;
private AnalysisLimits analysisLimits;
private Long renormalizationWindowDays;
private Long backgroundPersistInterval;
private TimeValue backgroundPersistInterval;
private Long modelSnapshotRetentionDays;
private Long resultsRetentionDays;
private List<String> categorizationFilters;
@ -430,7 +432,7 @@ public class JobUpdate implements Writeable, ToXContent {
return this;
}
public Builder setBackgroundPersistInterval(Long backgroundPersistInterval) {
public Builder setBackgroundPersistInterval(TimeValue backgroundPersistInterval) {
this.backgroundPersistInterval = backgroundPersistInterval;
return this;
}

View File

@ -97,8 +97,6 @@ public final class Messages {
public static final String JOB_CONFIG_MISSING_ANALYSISCONFIG = "An analysis_config must be set";
public static final String JOB_CONFIG_MULTIPLE_BUCKETSPANS_MUST_BE_MULTIPLE =
"Multiple bucket_span ''{0}'' must be a multiple of the main bucket_span ''{1}''";
public static final String JOB_CONFIG_MULTIPLE_BUCKETSPANS_REQUIRE_BUCKETSPAN =
"Multiple bucket_spans require a bucket_span to be specified";
public static final String JOB_CONFIG_NO_ANALYSIS_FIELD_NOT_COUNT =
"Unless the function is 'count' one of field_name, by_field_name or over_field_name must be set";
public static final String JOB_CONFIG_NO_DETECTORS = "No detectors configured";

View File

@ -53,8 +53,8 @@ public class DataStreamDiagnostics {
public DataStreamDiagnostics(DataCountsReporter dataCountsReporter, AnalysisConfig analysisConfig, Logger logger) {
this.dataCountsReporter = dataCountsReporter;
this.logger = logger;
bucketSpan = analysisConfig.getBucketSpanOrDefault();
latency = analysisConfig.getLatency();
bucketSpan = analysisConfig.getBucketSpan().seconds();
latency = analysisConfig.getLatency().seconds();
}
/**

View File

@ -11,6 +11,7 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
@ -25,6 +26,7 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
/**
@ -208,7 +210,7 @@ public class ProcessCtrl {
// Persist model state every few hours even if the job isn't closed
long persistInterval = (job.getBackgroundPersistInterval() == null) ?
(DEFAULT_BASE_PERSIST_INTERVAL + intervalStagger) :
job.getBackgroundPersistInterval();
job.getBackgroundPersistInterval().getSeconds();
command.add(PERSIST_INTERVAL_ARG + persistInterval);
}
@ -234,6 +236,16 @@ public class ProcessCtrl {
return useDefault ? DataDescription.DEFAULT_TIME_FIELD : dataDescription.getTimeField();
}
private static void addIfNotNull(TimeValue timeValue, String argKey, List<String> command) {
addIfNotNull(timeValue == null ? null : timeValue.getSeconds(), argKey, command);
}
private static void addIfNotNull(List<TimeValue> timeValues, String argKey, List<String> command) {
if (timeValues != null) {
addIfNotNull(timeValues.stream().map(v -> v.getSeconds()).collect(Collectors.toList()), argKey, command);
}
}
private static <T> void addIfNotNull(T object, String argKey, List<String> command) {
if (object != null) {
String param = argKey + object;

View File

@ -8,9 +8,9 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.writer;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.DataStreamDiagnostics;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import java.io.IOException;
import java.util.ArrayList;
@ -42,6 +42,7 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter
private final Logger logger;
private final DateTransformer dateTransformer;
private final DataStreamDiagnostics diagnostics;
private long latencySeconds;
protected Map<String, Integer> inFieldIndexes;
protected List<InputOutputMap> inputOutputMap;
@ -60,6 +61,7 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter
this.dataCountsReporter = Objects.requireNonNull(dataCountsReporter);
this.logger = Objects.requireNonNull(logger);
this.diagnostics = new DataStreamDiagnostics(this.dataCountsReporter, this.analysisConfig, this.logger);
this.latencySeconds = analysisConfig.getLatency().seconds();
Date date = dataCountsReporter.getLatestRecordTime();
latestEpochMsThisUpload = 0;
@ -141,7 +143,7 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter
record[TIME_FIELD_OUT_INDEX] = Long.toString(epochMs / MS_IN_SECOND);
// Records have epoch seconds timestamp so compare for out of order in seconds
if (epochMs / MS_IN_SECOND < latestEpochMs / MS_IN_SECOND - analysisConfig.getLatency()) {
if (epochMs / MS_IN_SECOND < latestEpochMs / MS_IN_SECOND - latencySeconds) {
// out of order
dataCountsReporter.reportOutOfOrderRecord(numberOfFieldsRead);

View File

@ -59,20 +59,11 @@ public class ScoresUpdater {
this.jobProvider = Objects.requireNonNull(jobProvider);
updatesPersister = Objects.requireNonNull(jobRenormalizedResultsPersister);
this.normalizerFactory = Objects.requireNonNull(normalizerFactory);
bucketSpan = getBucketSpanOrDefault(job.getAnalysisConfig());
bucketSpan = ((Long) job.getAnalysisConfig().getBucketSpan().seconds()).intValue();
normalizationWindow = getNormalizationWindowOrDefault(job);
perPartitionNormalization = getPerPartitionNormalizationOrDefault(job.getAnalysisConfig());
}
private static int getBucketSpanOrDefault(AnalysisConfig analysisConfig) {
if (analysisConfig != null && analysisConfig.getBucketSpan() != null) {
return analysisConfig.getBucketSpan().intValue();
}
// A bucketSpan value of 0 will result to the default
// bucketSpan value being used in the back-end.
return 0;
}
private long getNormalizationWindowOrDefault(Job job) {
if (job.getRenormalizationWindowDays() != null) {
return job.getRenormalizationWindowDays() * SECONDS_IN_DAY * MILLISECONDS_IN_SECOND;

View File

@ -5,7 +5,13 @@
*/
package org.elasticsearch.xpack.ml.utils.time;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
public final class TimeUtils {
private TimeUtils() {
@ -43,4 +49,58 @@ public final class TimeUtils {
// Could not do the conversion
return -1;
}
/**
* Checks that the given {@code timeValue} is a non-negative multiple value of the {@code baseUnit}.
*
* <ul>
* <li>400ms is valid for base unit of seconds</li>
* <li>450ms is invalid for base unit of seconds but valid for base unit of milliseconds</li>
* </ul>
*/
public static void checkNonNegativeMultiple(TimeValue timeValue, TimeUnit baseUnit, ParseField field) {
checkNonNegative(timeValue, field);
checkMultiple(timeValue, baseUnit, field);
}
/**
* Checks that the given {@code timeValue} is a positive multiple value of the {@code baseUnit}.
*
* <ul>
* <li>400ms is valid for base unit of seconds</li>
* <li>450ms is invalid for base unit of seconds but valid for base unit of milliseconds</li>
* </ul>
*/
public static void checkPositiveMultiple(TimeValue timeValue, TimeUnit baseUnit, ParseField field) {
checkPositive(timeValue, field);
checkMultiple(timeValue, baseUnit, field);
}
private static void checkNonNegative(TimeValue timeValue, ParseField field) {
long nanos = timeValue.getNanos();
if (nanos < 0) {
throw new IllegalArgumentException(field.getPreferredName() + " cannot be less than 0. Value = " + timeValue.toString());
}
}
private static void checkPositive(TimeValue timeValue, ParseField field) {
long nanos = timeValue.getNanos();
if (nanos <= 0) {
throw new IllegalArgumentException(field.getPreferredName() + " cannot be less or equal than 0. Value = "
+ timeValue.toString());
}
}
/**
* Check the given {@code timeValue} is a multiple of the {@code baseUnit}
*/
public static void checkMultiple(TimeValue timeValue, TimeUnit baseUnit, ParseField field) {
long nanos = timeValue.getNanos();
TimeValue base = new TimeValue(1, baseUnit);
long baseNanos = base.getNanos();
if (nanos % baseNanos != 0) {
throw new IllegalArgumentException(field.getPreferredName() + " has to be a multiple of " + base.toString() + "; actual was '"
+ timeValue.toString() + "'");
}
}
}

View File

@ -9,6 +9,7 @@ import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
@ -49,14 +50,13 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
for (int i = 0; i < numJobs; i++) {
Job job = JobTests.createRandomizedJob();
if (randomBoolean()) {
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(job.getAnalysisConfig());
analysisConfig.setLatency(TimeValue.ZERO);
DatafeedConfig datafeedConfig = DatafeedConfigTests.createRandomizedDatafeedConfig(job.getId());
if (datafeedConfig.hasAggregations()) {
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(job.getAnalysisConfig().getDetectors());
analysisConfig.setSummaryCountFieldName("doc_count");
Job.Builder jobBuilder = new Job.Builder(job);
jobBuilder.setAnalysisConfig(analysisConfig);
job = jobBuilder.build();
}
job = new Job.Builder(job).setAnalysisConfig(analysisConfig).build();
builder.putJob(job, false);
builder.putDatafeed(datafeedConfig);
} else {
@ -226,7 +226,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
public void testPutDatafeed_failBecauseJobIsNotCompatibleForDatafeed() {
Job.Builder job1 = createDatafeedJob();
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(job1.build().getAnalysisConfig());
analysisConfig.setLatency(3600L);
analysisConfig.setLatency(TimeValue.timeValueHours(1));
job1.setAnalysisConfig(analysisConfig);
DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build();
MlMetadata.Builder builder = new MlMetadata.Builder();

View File

@ -5,11 +5,6 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.ml.action.GetDatafeedsAction.Response;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
@ -17,57 +12,22 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigTests;
import org.elasticsearch.xpack.ml.support.AbstractStreamableTestCase;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class GetDatafeedsActionResponseTests extends AbstractStreamableTestCase<Response> {
@Override
protected Response createTestInstance() {
final Response result;
int listSize = randomInt(10);
List<DatafeedConfig> datafeedList = new ArrayList<>(listSize);
for (int j = 0; j < listSize; j++) {
String datafeedId = DatafeedConfigTests.randomValidDatafeedId();
String jobId = randomAsciiOfLength(10);
DatafeedConfig.Builder datafeedConfig = new DatafeedConfig.Builder(datafeedId, jobId);
datafeedConfig.setIndexes(randomSubsetOf(2, Arrays.asList("index-1", "index-2", "index-3")));
datafeedConfig.setTypes(randomSubsetOf(2, Arrays.asList("type-1", "type-2", "type-3")));
datafeedConfig.setFrequency(randomNonNegativeLong());
datafeedConfig.setQueryDelay(randomNonNegativeLong());
if (randomBoolean()) {
datafeedConfig.setQuery(QueryBuilders.termQuery(randomAsciiOfLength(10), randomAsciiOfLength(10)));
}
int scriptsSize = randomInt(3);
if (randomBoolean()) {
List<SearchSourceBuilder.ScriptField> scriptFields = new ArrayList<>(scriptsSize);
for (int scriptIndex = 0; scriptIndex < scriptsSize; scriptIndex++) {
scriptFields.add(new SearchSourceBuilder.ScriptField(randomAsciiOfLength(10), new Script(randomAsciiOfLength(10)),
randomBoolean()));
}
datafeedConfig.setScriptFields(scriptFields);
}
if (randomBoolean()) {
datafeedConfig.setScrollSize(randomIntBetween(0, Integer.MAX_VALUE));
}
if (randomBoolean() && scriptsSize == 0) {
AggregatorFactories.Builder aggsBuilder = new AggregatorFactories.Builder();
aggsBuilder.addAggregator(AggregationBuilders.avg(randomAsciiOfLength(10)));
datafeedConfig.setAggregations(aggsBuilder);
}
datafeedList.add(datafeedConfig.build());
datafeedList.add(DatafeedConfigTests.createRandomizedDatafeedConfig(randomAsciiOfLength(10)));
}
result = new Response(new QueryPage<>(datafeedList, datafeedList.size(), DatafeedConfig.RESULTS_FIELD));
return result;
return new Response(new QueryPage<>(datafeedList, datafeedList.size(), DatafeedConfig.RESULTS_FIELD));
}
@Override
protected Response createBlankInstance() {
return new Response();
}
}

View File

@ -7,76 +7,14 @@ package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.ml.action.ValidateJobConfigAction.Request;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.AnalysisLimits;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobTests;
import org.elasticsearch.xpack.ml.support.AbstractStreamableXContentTestCase;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Locale;
public class ValidateJobConfigActionRequestTests extends AbstractStreamableXContentTestCase<ValidateJobConfigAction.Request> {
@Override
protected Request createTestInstance() {
List<Detector> detectors = new ArrayList<>();
detectors.add(new Detector.Builder(randomFrom(Detector.FIELD_NAME_FUNCTIONS), randomAsciiOfLengthBetween(1, 20)).build());
detectors.add(new Detector.Builder(randomFrom(Detector.COUNT_WITHOUT_FIELD_FUNCTIONS), null).build());
AnalysisConfig.Builder analysisConfigBuilder = new AnalysisConfig.Builder(detectors);
analysisConfigBuilder.setBucketSpan(randomIntBetween(60, 86400));
if (randomBoolean()) {
analysisConfigBuilder.setLatency(randomIntBetween(0, 12));
}
if (randomBoolean()) {
analysisConfigBuilder.setCategorizationFieldName(randomAsciiOfLengthBetween(1, 20));
}
if (randomBoolean()) {
analysisConfigBuilder.setSummaryCountFieldName(randomAsciiOfLengthBetween(1, 20));
}
if (randomBoolean()) {
List<String> influencers = new ArrayList<>();
for (int i = randomIntBetween(1, 5); i > 0; --i) {
influencers.add(randomAsciiOfLengthBetween(1, 20));
}
analysisConfigBuilder.setInfluencers(influencers);
}
if (randomBoolean()) {
analysisConfigBuilder.setOverlappingBuckets(randomBoolean());
}
if (randomBoolean()) {
analysisConfigBuilder.setMultivariateByFields(randomBoolean());
}
Job.Builder job = new Job.Builder("ok");
job.setAnalysisConfig(analysisConfigBuilder);
if (randomBoolean()) {
DataDescription.Builder dataDescription = new DataDescription.Builder();
if (randomBoolean()) {
dataDescription.setFormat(DataDescription.DataFormat.DELIMITED);
if (randomBoolean()) {
dataDescription.setFieldDelimiter(new Character(';'));
}
if (randomBoolean()) {
dataDescription.setQuoteCharacter(new Character('\''));
}
} else {
dataDescription.setFormat(DataDescription.DataFormat.JSON);
}
dataDescription.setTimeField(randomAsciiOfLengthBetween(1, 20));
if (randomBoolean()) {
dataDescription.setTimeFormat("yyyy-MM-dd HH:mm:ssX");
}
job.setDataDescription(dataDescription);
}
if (randomBoolean()) {
job.setAnalysisLimits(new AnalysisLimits(randomNonNegativeLong(), randomNonNegativeLong()));
}
job.setCreateTime(new Date());
job.setId(randomAsciiOfLength(5).replace(".", "-").replace("_", "-").toLowerCase(Locale.ENGLISH));
return new Request(job.build());
return new Request(JobTests.createRandomizedJob());
}
@Override
@ -88,5 +26,4 @@ public class ValidateJobConfigActionRequestTests extends AbstractStreamableXCont
protected Request parseInstance(XContentParser parser) {
return Request.parseRequest(parser);
}
}

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.datafeed;
import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.script.Script;
@ -62,10 +63,10 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
builder.setScrollSize(randomIntBetween(0, Integer.MAX_VALUE));
}
if (randomBoolean()) {
builder.setFrequency(randomNonNegativeLong());
builder.setFrequency(TimeValue.timeValueSeconds(randomIntBetween(1, 1_000_000)));
}
if (randomBoolean()) {
builder.setQueryDelay(randomNonNegativeLong());
builder.setQueryDelay(TimeValue.timeValueMillis(randomIntBetween(1, 1_000_000)));
}
if (randomBoolean()) {
builder.setSource(randomBoolean());
@ -107,7 +108,7 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
DatafeedConfig.Builder expectedDatafeedConfig = new DatafeedConfig.Builder("datafeed1", "job1");
expectedDatafeedConfig.setIndexes(Arrays.asList("index"));
expectedDatafeedConfig.setTypes(Arrays.asList("type"));
expectedDatafeedConfig.setQueryDelay(60L);
expectedDatafeedConfig.setQueryDelay(TimeValue.timeValueMinutes(1));
expectedDatafeedConfig.setScrollSize(1000);
DatafeedConfig.Builder defaultedDatafeedConfig = new DatafeedConfig.Builder("datafeed1", "job1");
defaultedDatafeedConfig.setIndexes(Arrays.asList("index"));
@ -119,7 +120,7 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
public void testEquals_GivenDifferentQueryDelay() {
DatafeedConfig.Builder b1 = createFullyPopulated();
DatafeedConfig.Builder b2 = createFullyPopulated();
b2.setQueryDelay(120L);
b2.setQueryDelay(TimeValue.timeValueMinutes(2));
DatafeedConfig sc1 = b1.build();
DatafeedConfig sc2 = b2.build();
@ -141,7 +142,7 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
public void testEquals_GivenDifferentFrequency() {
DatafeedConfig.Builder b1 = createFullyPopulated();
DatafeedConfig.Builder b2 = createFullyPopulated();
b2.setFrequency(120L);
b2.setFrequency(TimeValue.timeValueSeconds(90));
DatafeedConfig sc1 = b1.build();
DatafeedConfig sc2 = b2.build();
@ -191,11 +192,11 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
DatafeedConfig.Builder sc = new DatafeedConfig.Builder("datafeed1", "job1");
sc.setIndexes(Arrays.asList("myIndex"));
sc.setTypes(Arrays.asList("myType1", "myType2"));
sc.setFrequency(60L);
sc.setFrequency(TimeValue.timeValueSeconds(60));
sc.setScrollSize(5000);
sc.setQuery(QueryBuilders.matchAllQuery());
sc.setAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.avg("foo")));
sc.setQueryDelay(90L);
sc.setQueryDelay(TimeValue.timeValueMillis(900));
return sc;
}
@ -233,20 +234,22 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
public void testCheckValid_GivenNegativeQueryDelay() throws IOException {
DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1");
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, () -> conf.setQueryDelay(-10L));
assertEquals(Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE, "query_delay", -10L), e.getMessage());
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class,
() -> conf.setQueryDelay(TimeValue.timeValueMillis(-10)));
assertEquals("query_delay cannot be less than 0. Value = -10", e.getMessage());
}
public void testCheckValid_GivenZeroFrequency() throws IOException {
DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1");
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, () -> conf.setFrequency(0L));
assertEquals(Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE, "frequency", 0L), e.getMessage());
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, () -> conf.setFrequency(TimeValue.ZERO));
assertEquals("frequency cannot be less or equal than 0. Value = 0s", e.getMessage());
}
public void testCheckValid_GivenNegativeFrequency() throws IOException {
DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1");
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, () -> conf.setFrequency(-600L));
assertEquals(Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE, "frequency", -600L), e.getMessage());
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class,
() -> conf.setFrequency(TimeValue.timeValueMinutes(-1)));
assertEquals("frequency cannot be less or equal than 0. Value = -1", e.getMessage());
}
public void testCheckValid_GivenNegativeScrollSize() throws IOException {

View File

@ -294,7 +294,7 @@ public class DatafeedJobRunnerTests extends ESTestCase {
public static Job.Builder createDatafeedJob() {
AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "field").build()));
acBuilder.setBucketSpan(3600L);
acBuilder.setBucketSpan(TimeValue.timeValueHours(1));
acBuilder.setDetectors(Arrays.asList(new Detector.Builder("metric", "field").build()));
Job.Builder builder = new Job.Builder("job_id");

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.datafeed;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.test.ESTestCase;
@ -24,8 +25,8 @@ public class DatafeedJobValidatorTests extends ESTestCase {
String errorMessage = Messages.getMessage(Messages.DATAFEED_DOES_NOT_SUPPORT_JOB_WITH_LATENCY);
Job.Builder builder = buildJobBuilder("foo");
AnalysisConfig.Builder ac = createAnalysisConfig();
ac.setBucketSpan(1800L);
ac.setLatency(3600L);
ac.setBucketSpan(TimeValue.timeValueSeconds(1800));
ac.setLatency(TimeValue.timeValueSeconds(3600));
builder.setAnalysisConfig(ac);
Job job = builder.build();
DatafeedConfig datafeedConfig = createValidDatafeedConfig().build();
@ -39,8 +40,8 @@ public class DatafeedJobValidatorTests extends ESTestCase {
public void testVerify_GivenZeroLatency() {
Job.Builder builder = buildJobBuilder("foo");
AnalysisConfig.Builder ac = createAnalysisConfig();
ac.setBucketSpan(1800L);
ac.setLatency(0L);
ac.setBucketSpan(TimeValue.timeValueSeconds(1800));
ac.setLatency(TimeValue.ZERO);
builder.setAnalysisConfig(ac);
Job job = builder.build();
DatafeedConfig datafeedConfig = createValidDatafeedConfig().build();
@ -51,8 +52,8 @@ public class DatafeedJobValidatorTests extends ESTestCase {
public void testVerify_GivenNoLatency() {
Job.Builder builder = buildJobBuilder("foo");
AnalysisConfig.Builder ac = createAnalysisConfig();
ac.setBatchSpan(1800L);
ac.setBucketSpan(100L);
ac.setBatchSpan(TimeValue.timeValueSeconds(1800));
ac.setBucketSpan(TimeValue.timeValueSeconds(100));
builder.setAnalysisConfig(ac);
Job job = builder.build();
DatafeedConfig datafeedConfig = createValidDatafeedConfig().build();
@ -66,7 +67,7 @@ public class DatafeedJobValidatorTests extends ESTestCase {
Job.Builder builder = buildJobBuilder("foo");
AnalysisConfig.Builder ac = createAnalysisConfig();
ac.setSummaryCountFieldName(null);
ac.setBucketSpan(1800L);
ac.setBucketSpan(TimeValue.timeValueSeconds(1800));
builder.setAnalysisConfig(ac);
Job job = builder.build();
DatafeedConfig datafeedConfig = createValidDatafeedConfigWithAggs().build();
@ -83,7 +84,7 @@ public class DatafeedJobValidatorTests extends ESTestCase {
Job.Builder builder = buildJobBuilder("foo");
AnalysisConfig.Builder ac = createAnalysisConfig();
ac.setSummaryCountFieldName("");
ac.setBucketSpan(1800L);
ac.setBucketSpan(TimeValue.timeValueSeconds(1800));
builder.setAnalysisConfig(ac);
Job job = builder.build();
DatafeedConfig datafeedConfig = createValidDatafeedConfigWithAggs().build();
@ -97,8 +98,8 @@ public class DatafeedJobValidatorTests extends ESTestCase {
public void testVerify_GivenAggsAndSummaryCountField() throws IOException {
Job.Builder builder = buildJobBuilder("foo");
AnalysisConfig.Builder ac = createAnalysisConfig();
ac.setBucketSpan(1800L);
ac.setSummaryCountFieldName("some_count");
ac.setBucketSpan(TimeValue.timeValueSeconds(1800));
builder.setAnalysisConfig(ac);
Job job = builder.build();
DatafeedConfig datafeedConfig = createValidDatafeedConfigWithAggs().build();

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.datafeed;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.script.Script;
@ -34,10 +35,10 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
builder.setJobId(randomAsciiOfLength(10));
}
if (randomBoolean()) {
builder.setQueryDelay(randomNonNegativeLong());
builder.setQueryDelay(TimeValue.timeValueMillis(randomIntBetween(1, Integer.MAX_VALUE)));
}
if (randomBoolean()) {
builder.setFrequency(randomNonNegativeLong());
builder.setFrequency(TimeValue.timeValueSeconds(randomIntBetween(1, Integer.MAX_VALUE)));
}
if (randomBoolean()) {
builder.setIndexes(DatafeedConfigTests.randomStringList(1, 10));
@ -122,8 +123,8 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
update.setJobId("bar");
update.setIndexes(Arrays.asList("i_2"));
update.setTypes(Arrays.asList("t_2"));
update.setQueryDelay(42L);
update.setFrequency(142L);
update.setQueryDelay(TimeValue.timeValueSeconds(42));
update.setFrequency(TimeValue.timeValueSeconds(142));
update.setQuery(QueryBuilders.termQuery("a", "b"));
update.setScriptFields(Arrays.asList(new SearchSourceBuilder.ScriptField("a", new Script("b"), false)));
update.setScrollSize(8000);
@ -135,8 +136,8 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
assertThat(updatedDatafeed.getJobId(), equalTo("bar"));
assertThat(updatedDatafeed.getIndexes(), equalTo(Arrays.asList("i_2")));
assertThat(updatedDatafeed.getTypes(), equalTo(Arrays.asList("t_2")));
assertThat(updatedDatafeed.getQueryDelay(), equalTo(42L));
assertThat(updatedDatafeed.getFrequency(), equalTo(142L));
assertThat(updatedDatafeed.getQueryDelay(), equalTo(TimeValue.timeValueSeconds(42)));
assertThat(updatedDatafeed.getFrequency(), equalTo(TimeValue.timeValueSeconds(142)));
assertThat(updatedDatafeed.getQuery(), equalTo(QueryBuilders.termQuery("a", "b")));
assertThat(updatedDatafeed.hasAggregations(), is(false));
assertThat(updatedDatafeed.getScriptFields(),

View File

@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.config;
package org.elasticsearch.xpack.ml.datafeed;
import org.elasticsearch.test.ESTestCase;

View File

@ -10,6 +10,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.CloseJobAction;
@ -86,7 +87,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
assertTrue(putJobResponse.isAcknowledged());
DatafeedConfig.Builder configBuilder = createDatafeedBuilder("data_feed_id", job.getId(), Collections.singletonList("*"));
configBuilder.setFrequency(120);
configBuilder.setFrequency(TimeValue.timeValueMinutes(2));
DatafeedConfig config = configBuilder.build();
PutDatafeedAction.Request putDatafeedRequest = new PutDatafeedAction.Request(config);
PutDatafeedAction.Response putDatadeedResponse = client().execute(PutDatafeedAction.INSTANCE, putDatafeedRequest).actionGet();

View File

@ -231,7 +231,8 @@ public class DatafeedJobIT extends ESRestTestCase {
public void testLookbackOnlyGivenAggregationsWithHistogram() throws Exception {
String jobId = "aggs-histogram-job";
String job = "{\"description\":\"Aggs job\",\"analysis_config\" :{\"bucket_span\":3600,\"summary_count_field_name\":\"doc_count\","
String job = "{\"description\":\"Aggs job\",\"analysis_config\" :{\"bucket_span\":\"1h\","
+ "\"summary_count_field_name\":\"doc_count\","
+ "\"detectors\":[{\"function\":\"mean\",\"field_name\":\"responsetime\",\"by_field_name\":\"airline\"}]},"
+ "\"data_description\" : {\"time_field\":\"time stamp\"}"
+ "}";
@ -256,7 +257,8 @@ public class DatafeedJobIT extends ESRestTestCase {
public void testLookbackOnlyGivenAggregationsWithDateHistogram() throws Exception {
String jobId = "aggs-date-histogram-job";
String job = "{\"description\":\"Aggs job\",\"analysis_config\" :{\"bucket_span\":3600,\"summary_count_field_name\":\"doc_count\","
String job = "{\"description\":\"Aggs job\",\"analysis_config\" :{\"bucket_span\":\"3600s\","
+ "\"summary_count_field_name\":\"doc_count\","
+ "\"detectors\":[{\"function\":\"mean\",\"field_name\":\"responsetime\",\"by_field_name\":\"airline\"}]},"
+ "\"data_description\" : {\"time_field\":\"time stamp\"}"
+ "}";
@ -419,7 +421,7 @@ public class DatafeedJobIT extends ESRestTestCase {
private Response createJob(String id) throws Exception {
String job = "{\n" + " \"description\":\"Analysis of response time by airline\",\n"
+ " \"analysis_config\" : {\n" + " \"bucket_span\":3600,\n"
+ " \"analysis_config\" : {\n" + " \"bucket_span\":\"1h\",\n"
+ " \"detectors\" :[{\"function\":\"mean\",\"field_name\":\"responsetime\",\"by_field_name\":\"airline\"}]\n"
+ " },\n" + " \"data_description\" : {\n" + " \"format\":\"JSON\",\n"
+ " \"time_field\":\"time stamp\",\n" + " \"time_format\":\"yyyy-MM-dd'T'HH:mm:ssX\"\n" + " }\n"
@ -440,7 +442,7 @@ public class DatafeedJobIT extends ESRestTestCase {
}
private void executeTestLookbackOnlyWithNestedFields(String jobId, boolean source) throws Exception {
String job = "{\"description\":\"Nested job\", \"analysis_config\" : {\"bucket_span\":3600,\"detectors\" :"
String job = "{\"description\":\"Nested job\", \"analysis_config\" : {\"bucket_span\":\"1h\",\"detectors\" :"
+ "[{\"function\":\"mean\",\"field_name\":\"responsetime.millis\"}]}, \"data_description\" : {\"time_field\":\"time\"}"
+ "}";
client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId, Collections.emptyMap(),

View File

@ -219,7 +219,7 @@ public class DeleteExpiredDataIT extends SecurityIntegTestCase {
Detector.Builder detector = new Detector.Builder();
detector.setFunction("count");
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector.build()));
analysisConfig.setBucketSpan(3600L);
analysisConfig.setBucketSpan(TimeValue.timeValueHours(1));
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
Job.Builder jobBuilder = new Job.Builder(id);

View File

@ -138,7 +138,7 @@ public class MlJobIT extends ESRestTestCase {
private Response createFarequoteJob(String jobId) throws Exception {
String job = "{\n" + " \"description\":\"Analysis of response time by airline\",\n"
+ " \"analysis_config\" : {\n" + " \"bucket_span\":3600,\n"
+ " \"analysis_config\" : {\n" + " \"bucket_span\": \"3600s\",\n"
+ " \"detectors\" :[{\"function\":\"metric\",\"field_name\":\"responsetime\",\"by_field_name\":\"airline\"}]\n"
+ " },\n" + " \"data_description\" : {\n" + " \"field_delimiter\":\",\",\n" + " " +
"\"time_field\":\"time\",\n"

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.job.config;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.messages.Messages;
@ -19,11 +20,14 @@ import java.util.List;
import java.util.Set;
import java.util.TreeSet;
public class AnalysisConfigTests extends AbstractSerializingTestCase<AnalysisConfig> {
@Override
protected AnalysisConfig createTestInstance() {
return createRandomized().build();
}
public static AnalysisConfig.Builder createRandomized() {
List<Detector> detectors = new ArrayList<>();
int numDetectors = randomIntBetween(1, 10);
for (int i = 0; i < numDetectors; i++) {
@ -31,13 +35,12 @@ public class AnalysisConfigTests extends AbstractSerializingTestCase<AnalysisCon
}
AnalysisConfig.Builder builder = new AnalysisConfig.Builder(detectors);
if (randomBoolean()) {
builder.setBatchSpan(randomNonNegativeLong());
builder.setBatchSpan(TimeValue.timeValueSeconds(randomIntBetween(1, 1_000_000)));
}
long bucketSpan = AnalysisConfig.Builder.DEFAULT_BUCKET_SPAN;
TimeValue bucketSpan = AnalysisConfig.Builder.DEFAULT_BUCKET_SPAN;
if (randomBoolean()) {
bucketSpan = randomIntBetween(1, 1_000_000);
bucketSpan = TimeValue.timeValueSeconds(randomIntBetween(1, 1_000_000));
builder.setBucketSpan(bucketSpan);
}
if (randomBoolean()) {
@ -48,13 +51,13 @@ public class AnalysisConfigTests extends AbstractSerializingTestCase<AnalysisCon
builder.setInfluencers(Arrays.asList(generateRandomStringArray(10, 10, false)));
}
if (randomBoolean()) {
builder.setLatency(randomNonNegativeLong());
builder.setLatency(TimeValue.timeValueSeconds(randomIntBetween(1, 1_000_000)));
}
if (randomBoolean()) {
int numBucketSpans = randomIntBetween(0, 10);
List<Long> multipleBucketSpans = new ArrayList<>();
List<TimeValue> multipleBucketSpans = new ArrayList<>();
for (int i = 2; i <= numBucketSpans; i++) {
multipleBucketSpans.add(bucketSpan * i);
multipleBucketSpans.add(TimeValue.timeValueSeconds(bucketSpan.getSeconds() * i));
}
builder.setMultipleBucketSpans(multipleBucketSpans);
}
@ -69,7 +72,7 @@ public class AnalysisConfigTests extends AbstractSerializingTestCase<AnalysisCon
}
builder.setUsePerPartitionNormalization(false);
return builder.build();
return builder;
}
@Override
@ -219,12 +222,13 @@ public class AnalysisConfigTests extends AbstractSerializingTestCase<AnalysisCon
assertEquals("summaryCount", ac.getSummaryCountFieldName());
builder = createConfigBuilder();
builder.setBucketSpan(1000L);
builder.setMultipleBucketSpans(Arrays.asList(5000L, 10000L, 24000L));
builder.setBucketSpan(TimeValue.timeValueSeconds(1000));
builder.setMultipleBucketSpans(Arrays.asList(
TimeValue.timeValueSeconds(5000), TimeValue.timeValueSeconds(10000), TimeValue.timeValueSeconds(24000)));
ac = builder.build();
assertTrue(ac.getMultipleBucketSpans().contains(5000L));
assertTrue(ac.getMultipleBucketSpans().contains(10000L));
assertTrue(ac.getMultipleBucketSpans().contains(24000L));
assertTrue(ac.getMultipleBucketSpans().contains(TimeValue.timeValueSeconds(5000)));
assertTrue(ac.getMultipleBucketSpans().contains(TimeValue.timeValueSeconds(10000)));
assertTrue(ac.getMultipleBucketSpans().contains(TimeValue.timeValueSeconds(24000)));
}
@ -256,11 +260,11 @@ public class AnalysisConfigTests extends AbstractSerializingTestCase<AnalysisCon
public void testEquals_GivenDifferentBatchSpan() {
AnalysisConfig.Builder builder = createConfigBuilder();
builder.setBatchSpan(86400L);
builder.setBatchSpan(TimeValue.timeValueHours(3));
AnalysisConfig config1 = builder.build();
builder = createConfigBuilder();
builder.setBatchSpan(0L);
builder.setBatchSpan(TimeValue.timeValueHours(4));
AnalysisConfig config2 = builder.build();
assertFalse(config1.equals(config2));
@ -270,11 +274,11 @@ public class AnalysisConfigTests extends AbstractSerializingTestCase<AnalysisCon
public void testEquals_GivenDifferentBucketSpan() {
AnalysisConfig.Builder builder = createConfigBuilder();
builder.setBucketSpan(1800L);
builder.setBucketSpan(TimeValue.timeValueSeconds(1800));
AnalysisConfig config1 = builder.build();
builder = createConfigBuilder();
builder.setBucketSpan(3600L);
builder.setBucketSpan(TimeValue.timeValueHours(1));
AnalysisConfig config2 = builder.build();
assertFalse(config1.equals(config2));
@ -322,11 +326,11 @@ public class AnalysisConfigTests extends AbstractSerializingTestCase<AnalysisCon
public void testEquals_GivenDifferentLatency() {
AnalysisConfig.Builder builder = createConfigBuilder();
builder.setLatency(1800L);
builder.setLatency(TimeValue.timeValueSeconds(1800));
AnalysisConfig config1 = builder.build();
builder = createConfigBuilder();
builder.setLatency(3600L);
builder.setLatency(TimeValue.timeValueSeconds(1801));
AnalysisConfig config2 = builder.build();
assertFalse(config1.equals(config2));
@ -387,16 +391,6 @@ public class AnalysisConfigTests extends AbstractSerializingTestCase<AnalysisCon
assertFalse(config2.equals(config1));
}
public void testBucketSpanOrDefault() {
AnalysisConfig config1 = new AnalysisConfig.Builder(
Collections.singletonList(new Detector.Builder("min", "count").build())).build();
assertEquals(AnalysisConfig.Builder.DEFAULT_BUCKET_SPAN, config1.getBucketSpanOrDefault());
AnalysisConfig.Builder builder = createConfigBuilder();
builder.setBucketSpan(100L);
config1 = builder.build();
assertEquals(100L, config1.getBucketSpanOrDefault());
}
public void testExtractReferencedLists() {
DetectionRule rule1 =
new DetectionRule(null, null, Connective.OR, Arrays.asList(RuleCondition.createCategorical("foo", "filter1")));
@ -417,12 +411,12 @@ public class AnalysisConfigTests extends AbstractSerializingTestCase<AnalysisCon
private static AnalysisConfig createFullyPopulatedConfig() {
AnalysisConfig.Builder builder = new AnalysisConfig.Builder(
Collections.singletonList(new Detector.Builder("min", "count").build()));
builder.setBatchSpan(86400L);
builder.setBucketSpan(3600L);
builder.setBucketSpan(TimeValue.timeValueHours(1));
builder.setBatchSpan(TimeValue.timeValueHours(24));
builder.setCategorizationFieldName("cat");
builder.setCategorizationFilters(Arrays.asList("foo"));
builder.setInfluencers(Arrays.asList("myInfluencer"));
builder.setLatency(3600L);
builder.setLatency(TimeValue.timeValueSeconds(3600));
builder.setPeriod(100L);
builder.setSummaryCountFieldName("sumCount");
return builder.build();
@ -471,71 +465,34 @@ public class AnalysisConfigTests extends AbstractSerializingTestCase<AnalysisCon
} catch (IllegalArgumentException e) {
assertEquals("Unknown function 'made_up_function'", e.getMessage());
}
builder = new Detector.Builder("distinct_count", "somefield");
AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(Collections.singletonList(builder.build()));
acBuilder.setBatchSpan(-1L);
try {
acBuilder.build();
assertTrue(false); // shouldn't get here
} catch (IllegalArgumentException e) {
assertEquals("batch_span cannot be less than 0. Value = -1", e.getMessage());
}
acBuilder.setBatchSpan(10L);
acBuilder.setBucketSpan(-1L);
try {
acBuilder.build();
assertTrue(false); // shouldn't get here
} catch (IllegalArgumentException e) {
assertEquals("bucket_span cannot be less than 0. Value = -1", e.getMessage());
}
acBuilder.setBucketSpan(3600L);
acBuilder.setPeriod(-1L);
try {
acBuilder.build();
assertTrue(false); // shouldn't get here
} catch (IllegalArgumentException e) {
assertEquals("period cannot be less than 0. Value = -1", e.getMessage());
}
acBuilder.setPeriod(1L);
acBuilder.setLatency(-1L);
try {
acBuilder.build();
assertTrue(false); // shouldn't get here
} catch (IllegalArgumentException e) {
assertEquals("latency cannot be less than 0. Value = -1", e.getMessage());
}
}
public void testVerify_GivenNegativeBucketSpan() {
AnalysisConfig.Builder config = createValidConfig();
config.setBucketSpan(-1L);
config.setBucketSpan(TimeValue.timeValueSeconds(-1));
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, () -> config.build());
assertEquals(Messages.getMessage(Messages.JOB_CONFIG_FIELD_VALUE_TOO_LOW, "bucket_span", 0, -1), e.getMessage());
assertEquals("bucket_span cannot be less or equal than 0. Value = -1", e.getMessage());
}
public void testVerify_GivenNegativeBatchSpan() {
AnalysisConfig.Builder analysisConfig = createValidConfig();
analysisConfig.setBatchSpan(-1L);
analysisConfig.setBatchSpan(TimeValue.timeValueSeconds(-1));
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, () -> analysisConfig.build());
assertEquals(Messages.getMessage(Messages.JOB_CONFIG_FIELD_VALUE_TOO_LOW, "batch_span", 0, -1), e.getMessage());
assertEquals("batch_span cannot be less or equal than 0. Value = -1", e.getMessage());
}
public void testVerify_GivenNegativeLatency() {
AnalysisConfig.Builder analysisConfig = createValidConfig();
analysisConfig.setLatency(-1L);
analysisConfig.setLatency(TimeValue.timeValueSeconds(-1));
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, () -> analysisConfig.build());
assertEquals(Messages.getMessage(Messages.JOB_CONFIG_FIELD_VALUE_TOO_LOW, "latency", 0, -1), e.getMessage());
assertEquals("latency cannot be less than 0. Value = -1", e.getMessage());
}
@ -584,8 +541,8 @@ public class AnalysisConfigTests extends AbstractSerializingTestCase<AnalysisCon
if (onByDefault) {
// Test overlappingBuckets unset
AnalysisConfig.Builder analysisConfig = createValidConfig();
analysisConfig.setBucketSpan(5000L);
analysisConfig.setBatchSpan(0L);
analysisConfig.setBucketSpan(TimeValue.timeValueSeconds(5000L));
analysisConfig.setBatchSpan(TimeValue.ZERO);
detectors = new ArrayList<>();
detector = new Detector.Builder("count", null).build();
detectors.add(detector);
@ -597,8 +554,8 @@ public class AnalysisConfigTests extends AbstractSerializingTestCase<AnalysisCon
// Test overlappingBuckets unset
analysisConfig = createValidConfig();
analysisConfig.setBucketSpan(5000L);
analysisConfig.setBatchSpan(0L);
analysisConfig.setBucketSpan(TimeValue.timeValueSeconds(5000L));
analysisConfig.setBatchSpan(TimeValue.ZERO);
detectors = new ArrayList<>();
detector = new Detector.Builder("count", null).build();
detectors.add(detector);
@ -610,8 +567,8 @@ public class AnalysisConfigTests extends AbstractSerializingTestCase<AnalysisCon
// Test overlappingBuckets unset
analysisConfig = createValidConfig();
analysisConfig.setBucketSpan(5000L);
analysisConfig.setBatchSpan(0L);
analysisConfig.setBucketSpan(TimeValue.timeValueSeconds(5000L));
analysisConfig.setBatchSpan(TimeValue.ZERO);
detectors = new ArrayList<>();
detector = new Detector.Builder("count", null).build();
detectors.add(detector);
@ -626,8 +583,7 @@ public class AnalysisConfigTests extends AbstractSerializingTestCase<AnalysisCon
// Test overlappingBuckets set
AnalysisConfig.Builder analysisConfig = createValidConfig();
analysisConfig.setBucketSpan(5000L);
analysisConfig.setBatchSpan(0L);
analysisConfig.setBucketSpan(TimeValue.timeValueSeconds(5000L));
detectors = new ArrayList<>();
detector = new Detector.Builder("count", null).build();
detectors.add(detector);
@ -640,8 +596,7 @@ public class AnalysisConfigTests extends AbstractSerializingTestCase<AnalysisCon
// Test overlappingBuckets set
analysisConfig = createValidConfig();
analysisConfig.setBucketSpan(5000L);
analysisConfig.setBatchSpan(0L);
analysisConfig.setBucketSpan(TimeValue.timeValueSeconds(5000L));
analysisConfig.setOverlappingBuckets(true);
detectors = new ArrayList<>();
detector = new Detector.Builder("count", null).build();
@ -655,8 +610,7 @@ public class AnalysisConfigTests extends AbstractSerializingTestCase<AnalysisCon
// Test overlappingBuckets set
analysisConfig = createValidConfig();
analysisConfig.setBucketSpan(5000L);
analysisConfig.setBatchSpan(0L);
analysisConfig.setBucketSpan(TimeValue.timeValueSeconds(5000L));
analysisConfig.setOverlappingBuckets(false);
detectors = new ArrayList<>();
detector = new Detector.Builder("count", null).build();
@ -671,37 +625,54 @@ public class AnalysisConfigTests extends AbstractSerializingTestCase<AnalysisCon
public void testMultipleBucketsConfig() {
AnalysisConfig.Builder ac = createValidConfig();
ac.setMultipleBucketSpans(Arrays.asList(10L, 15L, 20L, 25L, 30L, 35L));
ac.setMultipleBucketSpans(Arrays.asList(
TimeValue.timeValueSeconds(10L),
TimeValue.timeValueSeconds(15L),
TimeValue.timeValueSeconds(20L),
TimeValue.timeValueSeconds(25L),
TimeValue.timeValueSeconds(30L),
TimeValue.timeValueSeconds(35L)));
List<Detector> detectors = new ArrayList<>();
Detector detector = new Detector.Builder("count", null).build();
detectors.add(detector);
ac.setDetectors(detectors);
ac.setBucketSpan(4L);
ac.setBucketSpan(TimeValue.timeValueSeconds(4L));
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, ac::build);
assertEquals(Messages.getMessage(Messages.JOB_CONFIG_MULTIPLE_BUCKETSPANS_MUST_BE_MULTIPLE, 10, 4), e.getMessage());
assertEquals(Messages.getMessage(Messages.JOB_CONFIG_MULTIPLE_BUCKETSPANS_MUST_BE_MULTIPLE, "10s", "4s"), e.getMessage());
ac.setBucketSpan(5L);
ac.setBucketSpan(TimeValue.timeValueSeconds(5L));
ac.build();
AnalysisConfig.Builder ac2 = createValidConfig();
ac2.setBucketSpan(5L);
ac2.setBucketSpan(TimeValue.timeValueSeconds(5L));
ac2.setDetectors(detectors);
ac2.setMultipleBucketSpans(Arrays.asList(10L, 15L, 20L, 25L, 30L));
ac2.setMultipleBucketSpans(Arrays.asList(
TimeValue.timeValueSeconds(10L),
TimeValue.timeValueSeconds(15L),
TimeValue.timeValueSeconds(20L),
TimeValue.timeValueSeconds(25L),
TimeValue.timeValueSeconds(30L)));
assertFalse(ac.equals(ac2));
ac2.setMultipleBucketSpans(Arrays.asList(10L, 15L, 20L, 25L, 30L, 35L));
ac2.setMultipleBucketSpans(Arrays.asList(
TimeValue.timeValueSeconds(10L),
TimeValue.timeValueSeconds(15L),
TimeValue.timeValueSeconds(20L),
TimeValue.timeValueSeconds(25L),
TimeValue.timeValueSeconds(30L),
TimeValue.timeValueSeconds(35L)));
ac.setBucketSpan(222L);
ac.setBucketSpan(TimeValue.timeValueSeconds(222L));
ac.setMultipleBucketSpans(Arrays.asList());
ac.build();
ac.setMultipleBucketSpans(Arrays.asList(222L));
ac.setMultipleBucketSpans(Arrays.asList(TimeValue.timeValueSeconds(222L)));
e = ESTestCase.expectThrows(IllegalArgumentException.class, () -> ac.build());
assertEquals(Messages.getMessage(Messages.JOB_CONFIG_MULTIPLE_BUCKETSPANS_MUST_BE_MULTIPLE, 222, 222), e.getMessage());
assertEquals(Messages.getMessage(Messages.JOB_CONFIG_MULTIPLE_BUCKETSPANS_MUST_BE_MULTIPLE, "3.7m", "3.7m"), e.getMessage());
ac.setMultipleBucketSpans(Arrays.asList(-444L, -888L));
ac.setMultipleBucketSpans(Arrays.asList(TimeValue.timeValueSeconds(-444L), TimeValue.timeValueSeconds(-888L)));
e = ESTestCase.expectThrows(IllegalArgumentException.class, () -> ac.build());
assertEquals(Messages.getMessage(Messages.JOB_CONFIG_MULTIPLE_BUCKETSPANS_MUST_BE_MULTIPLE, -444, 222), e.getMessage());
assertEquals(Messages.getMessage(Messages.JOB_CONFIG_MULTIPLE_BUCKETSPANS_MUST_BE_MULTIPLE, -444, "3.7m"), e.getMessage());
}
@ -793,9 +764,9 @@ public class AnalysisConfigTests extends AbstractSerializingTestCase<AnalysisCon
Detector detector = new Detector.Builder("count", null).build();
detectors.add(detector);
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(detectors);
analysisConfig.setBucketSpan(3600L);
analysisConfig.setBatchSpan(0L);
analysisConfig.setLatency(0L);
analysisConfig.setBucketSpan(TimeValue.timeValueHours(1));
analysisConfig.setBatchSpan(TimeValue.timeValueHours(2));
analysisConfig.setLatency(TimeValue.ZERO);
analysisConfig.setPeriod(0L);
return analysisConfig;
}

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.job.config;
import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.messages.Messages;
@ -94,10 +95,10 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
Date date = new Date();
Job.Builder jobDetails1 = new Job.Builder("foo");
jobDetails1.setAnalysisConfig(createAnalysisConfig());
jobDetails1.setBackgroundPersistInterval(10000L);
jobDetails1.setBackgroundPersistInterval(TimeValue.timeValueSeconds(10000L));
jobDetails1.setCreateTime(date);
Job.Builder jobDetails2 = new Job.Builder("foo");
jobDetails2.setBackgroundPersistInterval(8000L);
jobDetails2.setBackgroundPersistInterval(TimeValue.timeValueSeconds(8000L));
jobDetails2.setAnalysisConfig(createAnalysisConfig());
jobDetails2.setCreateTime(date);
assertFalse(jobDetails1.build().equals(jobDetails2.build()));
@ -305,9 +306,9 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
}
public void testVerify_GivenLowBackgroundPersistInterval() {
String errorMessage = Messages.getMessage(Messages.JOB_CONFIG_FIELD_VALUE_TOO_LOW, "backgroundPersistInterval", 3600, 3599);
String errorMessage = Messages.getMessage(Messages.JOB_CONFIG_FIELD_VALUE_TOO_LOW, "background_persist_interval", 3600, 3599);
Job.Builder builder = buildJobBuilder("foo");
builder.setBackgroundPersistInterval(3599L);
builder.setBackgroundPersistInterval(TimeValue.timeValueSeconds(3599L));
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, builder::build);
assertEquals(errorMessage, e.getMessage());
}
@ -381,22 +382,13 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
if (randomBoolean()) {
builder.setLastDataTime(new Date(randomNonNegativeLong()));
}
AnalysisConfig.Builder analysisConfig = createAnalysisConfig();
analysisConfig.setBucketSpan(100L);
builder.setAnalysisConfig(analysisConfig);
builder.setAnalysisConfig(AnalysisConfigTests.createRandomized());
builder.setAnalysisLimits(new AnalysisLimits(randomNonNegativeLong(), randomNonNegativeLong()));
if (randomBoolean()) {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(randomFrom(DataDescription.DataFormat.values()));
builder.setDataDescription(dataDescription);
}
String[] outputs;
AnalysisConfig ac = analysisConfig.build();
if (randomBoolean()) {
outputs = new String[] {ac.getDetectors().get(0).getFieldName(), ac.getDetectors().get(0).getOverFieldName()};
} else {
outputs = new String[] {ac.getDetectors().get(0).getFieldName()};
}
if (randomBoolean()) {
builder.setModelDebugConfig(new ModelDebugConfig(randomBoolean(), randomAsciiOfLength(10)));
}
@ -404,7 +396,7 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
builder.setRenormalizationWindowDays(randomNonNegativeLong());
}
if (randomBoolean()) {
builder.setBackgroundPersistInterval(randomNonNegativeLong());
builder.setBackgroundPersistInterval(TimeValue.timeValueHours(randomIntBetween(1, 24)));
}
if (randomBoolean()) {
builder.setModelSnapshotRetentionDays(randomNonNegativeLong());

View File

@ -6,10 +6,10 @@
package org.elasticsearch.xpack.ml.job.config;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -57,7 +57,7 @@ public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
update.setRenormalizationWindowDays(randomNonNegativeLong());
}
if (randomBoolean()) {
update.setBackgroundPersistInterval(randomNonNegativeLong());
update.setBackgroundPersistInterval(TimeValue.timeValueHours(randomIntBetween(1, 24)));
}
if (randomBoolean()) {
update.setModelSnapshotRetentionDays(randomNonNegativeLong());
@ -109,7 +109,7 @@ public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
updateBuilder.setDetectorUpdates(detectorUpdates);
updateBuilder.setModelDebugConfig(modelDebugConfig);
updateBuilder.setAnalysisLimits(analysisLimits);
updateBuilder.setBackgroundPersistInterval(randomNonNegativeLong());
updateBuilder.setBackgroundPersistInterval(TimeValue.timeValueHours(randomIntBetween(1, 24)));
updateBuilder.setResultsRetentionDays(randomNonNegativeLong());
updateBuilder.setModelSnapshotRetentionDays(randomNonNegativeLong());
updateBuilder.setRenormalizationWindowDays(randomNonNegativeLong());

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.job.process;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.Detector;
@ -26,7 +27,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
logger = Loggers.getLogger(DataStreamDiagnosticsTests.class);
AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "field").build()));
acBuilder.setBucketSpan(60L);
acBuilder.setBucketSpan(TimeValue.timeValueSeconds(60));
analysisConfig = acBuilder.build();
}

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.job.process;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
@ -35,9 +36,9 @@ public class ProcessCtrlTests extends ESTestCase {
Detector.Builder detectorBuilder = new Detector.Builder("metric", "value");
detectorBuilder.setPartitionFieldName("foo");
AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(Collections.singletonList(detectorBuilder.build()));
acBuilder.setBatchSpan(100L);
acBuilder.setBucketSpan(120L);
acBuilder.setLatency(360L);
acBuilder.setBatchSpan(TimeValue.timeValueSeconds(100));
acBuilder.setBucketSpan(TimeValue.timeValueSeconds(120));
acBuilder.setLatency(TimeValue.timeValueSeconds(360));
acBuilder.setPeriod(20L);
acBuilder.setSummaryCountFieldName("summaryField");
acBuilder.setOverlappingBuckets(true);

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.job.process.autodetect.writer;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
@ -140,7 +141,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
public void testWrite_GivenTimeFormatIsEpochAndSomeTimestampsWithinLatencySomeOutOfOrder() throws IOException {
AnalysisConfig.Builder builder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "value").build()));
builder.setLatency(2L);
builder.setLatency(TimeValue.timeValueSeconds(2));
analysisConfig = builder.build();
StringBuilder input = new StringBuilder();
@ -173,7 +174,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
public void testWrite_NullByte() throws IOException {
AnalysisConfig.Builder builder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "value").build()));
builder.setLatency(0L);
builder.setLatency(TimeValue.ZERO);
analysisConfig = builder.build();
StringBuilder input = new StringBuilder();
@ -210,7 +211,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
public void testWrite_EmptyInput() throws IOException {
AnalysisConfig.Builder builder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "value").build()));
builder.setLatency(0L);
builder.setLatency(TimeValue.ZERO);
analysisConfig = builder.build();
when(dataCountsReporter.incrementalStats()).thenReturn(new DataCounts("foo"));

View File

@ -6,11 +6,11 @@
package org.elasticsearch.xpack.ml.job.process.autodetect.writer;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfigTests;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.DataDescription.DataFormat;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import static org.mockito.Mockito.mock;
@ -31,6 +31,6 @@ public class DataToProcessWriterFactoryTests extends ESTestCase {
private static DataToProcessWriter createWriter(DataDescription dataDescription) {
return DataToProcessWriterFactory.create(true, mock(AutodetectProcess.class), dataDescription,
mock(AnalysisConfig.class), mock(DataCountsReporter.class));
AnalysisConfigTests.createRandomized().build(), mock(DataCountsReporter.class));
}
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.job.process.autodetect.writer;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
@ -114,7 +115,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
public void testWrite_GivenTimeFormatIsEpochAndSomeTimestampsWithinLatencySomeOutOfOrder()
throws Exception {
AnalysisConfig.Builder builder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "value").build()));
builder.setLatency(2L);
builder.setLatency(TimeValue.timeValueSeconds(2));
analysisConfig = builder.build();
StringBuilder input = new StringBuilder();
@ -145,7 +146,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
public void testWrite_GivenMalformedJsonWithoutNestedLevels()
throws Exception {
AnalysisConfig.Builder builder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "value").build()));
builder.setLatency(2L);
builder.setLatency(TimeValue.timeValueSeconds(2));
analysisConfig = builder.build();
StringBuilder input = new StringBuilder();
@ -174,7 +175,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
throws Exception {
Detector detector = new Detector.Builder("metric", "nested.value").build();
AnalysisConfig.Builder builder = new AnalysisConfig.Builder(Arrays.asList(detector));
builder.setLatency(2L);
builder.setLatency(TimeValue.timeValueSeconds(2));
analysisConfig = builder.build();
StringBuilder input = new StringBuilder();
@ -201,7 +202,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
public void testWrite_GivenMalformedJsonThatNeverRecovers()
throws Exception {
AnalysisConfig.Builder builder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("count", null).build()));
builder.setLatency(2L);
builder.setLatency(TimeValue.timeValueSeconds(2));
analysisConfig = builder.build();
StringBuilder input = new StringBuilder();
@ -217,7 +218,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
public void testWrite_GivenJsonWithArrayField()
throws Exception {
AnalysisConfig.Builder builder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "value").build()));
builder.setLatency(2L);
builder.setLatency(TimeValue.timeValueSeconds(2));
analysisConfig = builder.build();
StringBuilder input = new StringBuilder();
@ -242,7 +243,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
public void testWrite_GivenJsonWithMissingFields()
throws Exception {
AnalysisConfig.Builder builder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "value").build()));
builder.setLatency(2L);
builder.setLatency(TimeValue.timeValueSeconds(2));
analysisConfig = builder.build();
StringBuilder input = new StringBuilder();

View File

@ -13,6 +13,7 @@ import java.util.Date;
import java.util.Deque;
import java.util.List;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.Detector;
@ -73,7 +74,7 @@ public class ScoresUpdaterTests extends ESTestCase {
List<Detector> detectors = new ArrayList<>();
detectors.add(mock(Detector.class));
AnalysisConfig.Builder configBuilder = new AnalysisConfig.Builder(detectors);
configBuilder.setBucketSpan(DEFAULT_BUCKET_SPAN);
configBuilder.setBucketSpan(TimeValue.timeValueSeconds(DEFAULT_BUCKET_SPAN));
jobBuilder.setAnalysisConfig(configBuilder);
jobBuilder.setCreateTime(new Date());
@ -386,7 +387,7 @@ public class ScoresUpdaterTests extends ESTestCase {
}
private void verifyNormalizerWasInvoked(int times) throws IOException {
int bucketSpan = job.getAnalysisConfig() == null ? 0 : job.getAnalysisConfig().getBucketSpan().intValue();
int bucketSpan = job.getAnalysisConfig() == null ? 0 : ((Long) job.getAnalysisConfig().getBucketSpan().seconds()).intValue();
verify(normalizer, times(times)).normalize(
eq(bucketSpan), eq(false), anyListOf(Normalizable.class),
eq(QUANTILES_STATE));

View File

@ -142,7 +142,7 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase {
Detector.Builder d = new Detector.Builder("metric", "responsetime");
d.setByFieldName("by_field_name");
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build()));
analysisConfig.setBucketSpan(3600L);
analysisConfig.setBucketSpan(TimeValue.timeValueHours(1));
Job.Builder builder = new Job.Builder();
builder.setId(id);
@ -176,8 +176,8 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase {
public static DatafeedConfig.Builder createDatafeedBuilder(String datafeedId, String jobId, List<String> indexes) {
DatafeedConfig.Builder builder = new DatafeedConfig.Builder(datafeedId, jobId);
builder.setQueryDelay(1);
builder.setFrequency(2);
builder.setQueryDelay(TimeValue.timeValueSeconds(1));
builder.setFrequency(TimeValue.timeValueSeconds(2));
builder.setIndexes(indexes);
builder.setTypes(Collections.singletonList("type"));
return builder;

View File

@ -5,9 +5,14 @@
*/
package org.elasticsearch.xpack.ml.utils.time;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import java.util.concurrent.TimeUnit;
public class TimeUtilsTests extends ESTestCase {
public void testdateStringToEpoch() {
assertEquals(1462096800000L, TimeUtils.dateStringToEpoch("2016-05-01T10:00:00Z"));
assertEquals(1462096800333L, TimeUtils.dateStringToEpoch("2016-05-01T10:00:00.333Z"));
@ -22,4 +27,55 @@ public class TimeUtilsTests extends ESTestCase {
assertEquals(1477058573000L, TimeUtils.dateStringToEpoch("1477058573"));
assertEquals(1477058573500L, TimeUtils.dateStringToEpoch("1477058573500"));
}
public void testCheckMultiple_GivenMultiples() {
TimeUtils.checkMultiple(TimeValue.timeValueHours(1), TimeUnit.SECONDS, new ParseField("foo"));
TimeUtils.checkMultiple(TimeValue.timeValueHours(1), TimeUnit.MINUTES, new ParseField("foo"));
TimeUtils.checkMultiple(TimeValue.timeValueHours(1), TimeUnit.HOURS, new ParseField("foo"));
TimeUtils.checkMultiple(TimeValue.timeValueHours(2), TimeUnit.HOURS, new ParseField("foo"));
TimeUtils.checkMultiple(TimeValue.timeValueSeconds(60), TimeUnit.SECONDS, new ParseField("foo"));
TimeUtils.checkMultiple(TimeValue.timeValueSeconds(60), TimeUnit.MILLISECONDS, new ParseField("foo"));
}
public void testCheckMultiple_GivenNonMultiple() {
expectThrows(IllegalArgumentException.class, () ->
TimeUtils.checkMultiple(TimeValue.timeValueMillis(500), TimeUnit.SECONDS, new ParseField("foo")));
}
public void testCheckPositiveMultiple_GivenNegative() {
expectThrows(IllegalArgumentException.class, () ->
TimeUtils.checkPositiveMultiple(TimeValue.timeValueMillis(-1), TimeUnit.MILLISECONDS, new ParseField("foo")));
}
public void testCheckPositiveMultiple_GivenZero() {
expectThrows(IllegalArgumentException.class, () ->
TimeUtils.checkPositiveMultiple(TimeValue.ZERO, TimeUnit.SECONDS, new ParseField("foo")));
}
public void testCheckPositiveMultiple_GivenPositiveNonMultiple() {
expectThrows(IllegalArgumentException.class, () ->
TimeUtils.checkPositiveMultiple(TimeValue.timeValueMillis(500), TimeUnit.SECONDS, new ParseField("foo")));
}
public void testCheckPositiveMultiple_GivenPositiveMultiple() {
TimeUtils.checkPositiveMultiple(TimeValue.timeValueMillis(1), TimeUnit.MILLISECONDS, new ParseField("foo"));
}
public void testCheckNonNegativeMultiple_GivenNegative() {
expectThrows(IllegalArgumentException.class, () ->
TimeUtils.checkNonNegativeMultiple(TimeValue.timeValueMillis(-1), TimeUnit.MILLISECONDS, new ParseField("foo")));
}
public void testCheckNonNegativeMultiple_GivenZero() {
TimeUtils.checkNonNegativeMultiple(TimeValue.ZERO, TimeUnit.SECONDS, new ParseField("foo"));
}
public void testCheckNonNegativeMultiple_GivenPositiveNonMultiple() {
expectThrows(IllegalArgumentException.class, () ->
TimeUtils.checkNonNegativeMultiple(TimeValue.timeValueMillis(500), TimeUnit.SECONDS, new ParseField("foo")));
}
public void testCheckNonNegativeMultiple_GivenPositiveMultiple() {
TimeUtils.checkNonNegativeMultiple(TimeValue.timeValueMillis(1), TimeUnit.MILLISECONDS, new ParseField("foo"));
}
}

View File

@ -32,7 +32,7 @@ setup:
"result_type": "record",
"timestamp": "2016-06-01T00:00:00Z",
"anomaly_score": 60.0,
"bucket_span": 1,
"bucket_span": "1s",
"by_field_value": "A by field",
"partition_field_value": "A partition field",
"over_field_value": "An over field",
@ -55,7 +55,7 @@ setup:
"job_id": "custom-all-test-2",
"result_type": "record",
"timestamp": "2016-06-01T00:00:00Z",
"bucket_span": 1,
"bucket_span": "1s",
"by_field_value": "A by field"
}

View File

@ -6,7 +6,7 @@ setup:
{
"job_id":"job-1",
"analysis_config" : {
"bucket_span":3600,
"bucket_span": "1h",
"detectors" :[{"function":"count"}]
},
"data_description" : {
@ -22,7 +22,7 @@ setup:
body: >
{
"analysis_config" : {
"bucket_span":3600,
"bucket_span": "1h",
"detectors" :[{"function":"count"}]
},
"data_description" : {
@ -143,7 +143,9 @@ setup:
"job_id":"job-1",
"indexes":["index-foo"],
"types":["type-bar"],
"scroll_size": 2000
"scroll_size": 2000,
"frequency": "1m",
"query_delay": "30s"
}
- do:
@ -152,13 +154,17 @@ setup:
body: >
{
"indexes":["index-*"],
"scroll_size": 10000
"scroll_size": 10000,
"frequency": "2m",
"query_delay": "0s"
}
- match: { datafeed_id: "test-datafeed-1" }
- match: { job_id: "job-1" }
- match: { indexes: ["index-*"] }
- match: { types: ["type-bar"] }
- match: { scroll_size: 10000 }
- match: { frequency: "2m" }
- match: { query_delay: "0s" }
---
"Test update datafeed to point to different job":

View File

@ -10,7 +10,7 @@ setup:
"job_id": "foo",
"description":"Analysis of response time by airline",
"analysis_config" : {
"bucket_span" : 3600,
"bucket_span" : "1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {

View File

@ -127,7 +127,7 @@ setup:
"job_id":"farequote2",
"description":"Analysis of response time by airline",
"analysis_config" : {
"bucket_span":3600,
"bucket_span": "3600s",
"detectors" :[{"function":"mean","field_name":"airline",
"detector_rules": [
{

View File

@ -6,7 +6,7 @@ setup:
{
"job_id":"job-1",
"analysis_config" : {
"bucket_span":3600,
"bucket_span": "1h",
"detectors" :[{"function":"count"}]
},
"data_description" : {
@ -23,7 +23,7 @@ setup:
{
"job_id":"job-2",
"analysis_config" : {
"bucket_span":3600,
"bucket_span": "1h",
"detectors" :[{"function":"count"}]
},
"data_description" : {

View File

@ -6,7 +6,7 @@ setup:
{
"job_id":"job-1",
"analysis_config" : {
"bucket_span":3600,
"bucket_span": "1h",
"detectors" :[{"function":"count"}]
},
"data_description" : {
@ -23,7 +23,7 @@ setup:
{
"job_id":"job-2",
"analysis_config" : {
"bucket_span":3600,
"bucket_span": "1h",
"detectors" :[{"function":"count"}]
},
"data_description" : {

View File

@ -13,7 +13,7 @@ setup:
"job_id":"farequote",
"description":"Analysis of response time by airline",
"analysis_config" : {
"bucket_span":3600,
"bucket_span": "1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
@ -33,7 +33,7 @@ setup:
"job_id":"farequote2",
"description":"Analysis of response time by airline",
"analysis_config" : {
"bucket_span":3600,
"bucket_span": "1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
@ -260,7 +260,7 @@ setup:
"job_id":"farequote",
"description":"Analysis of response time by airline",
"analysis_config" : {
"bucket_span":3600,
"bucket_span": "1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {

View File

@ -23,7 +23,7 @@
{
"description":"Analysis of response time by airline",
"analysis_config" : {
"bucket_span":3600,
"bucket_span": "1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
@ -68,7 +68,7 @@
"job_id":"a_different_id",
"description":"Analysis of response time by airline",
"analysis_config" : {
"bucket_span":3600,
"bucket_span": "1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
@ -87,7 +87,7 @@
"job_id":"a_different_id",
"description":"Analysis of response time by airline",
"analysis_config" : {
"bucket_span":3600,
"bucket_span": "3600s",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
@ -107,7 +107,7 @@
"job_id":"farequote",
"description":"Analysis of response time by airline",
"analysis_config" : {
"bucket_span":3600,
"bucket_span": "3600000ms",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
@ -127,7 +127,7 @@
"job_id":"farequote",
"description":"Analysis of response time by airline",
"analysis_config" : {
"bucket_span":3600,
"bucket_span": "1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
@ -145,7 +145,7 @@
"job_id":"farequote",
"description":"Analysis of response time by airline",
"analysis_config" : {
"bucket_span":3600,
"bucket_span": "1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
@ -163,7 +163,7 @@
"job_id":"farequote",
"description":"Analysis of response time by airline",
"analysis_config" : {
"bucket_span":3600,
"bucket_span": "1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
@ -197,7 +197,7 @@
"model_memory_limit": 10
},
"renormalization_window_days": 1,
"background_persist_interval": 7200,
"background_persist_interval": "2h",
"model_snapshot_retention_days": 3,
"results_retention_days": 4,
"custom_settings": {
@ -229,7 +229,7 @@
"model_memory_limit": 20
},
"renormalization_window_days": 10,
"background_persist_interval": 10800,
"background_persist_interval": "3h",
"model_snapshot_retention_days": 30,
"results_retention_days": 40,
"categorization_filters" : ["cat3.*"],
@ -246,7 +246,7 @@
- match: { analysis_config.detectors.0.detector_rules.0.target_field_name: "airline" }
- match: { analysis_config.detectors.1.detector_description: "updated description" }
- match: { renormalization_window_days: 10 }
- match: { background_persist_interval: 10800 }
- match: { background_persist_interval: "3h" }
- match: { model_snapshot_retention_days: 30 }
- match: { results_retention_days: 40 }
@ -269,7 +269,7 @@
"job_id":"datafeed-job",
"description":"Analysis of response time by airline",
"analysis_config" : {
"bucket_span":3600,
"bucket_span": "1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {

View File

@ -7,7 +7,7 @@ setup:
"job_id":"job-1",
"description":"Job 1",
"analysis_config" : {
"bucket_span":300,
"bucket_span": "300s",
"detectors" :[{"function":"count"}]
},
"data_description" : {
@ -25,7 +25,7 @@ setup:
"job_id":"job-2",
"description":"Job 2",
"analysis_config" : {
"bucket_span":600,
"bucket_span": "600s",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {

View File

@ -7,7 +7,7 @@ setup:
"job_id":"job-stats-test",
"description":"Analysis of response time by airline",
"analysis_config" : {
"bucket_span":3600,
"bucket_span": "1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
@ -29,7 +29,7 @@ setup:
"job_id":"datafeed-job",
"description":"A job with a datafeed",
"analysis_config" : {
"bucket_span":3600,
"bucket_span": "1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {

View File

@ -7,7 +7,7 @@ setup:
"job_id":"farequote",
"description":"Analysis of response time by airline",
"analysis_config" : {
"bucket_span":3600,
"bucket_span": "1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {

View File

@ -80,7 +80,7 @@ setup:
body: >
{
"analysis_config" : {
"bucket_span":3600,
"bucket_span": "1h",
"detectors" :[{"function":"sum","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
@ -124,7 +124,7 @@ setup:
body: >
{
"analysis_config" : {
"bucket_span":3600,
"bucket_span": "1h",
"summary_count_field_name": "doc_count",
"detectors" :[{"function":"sum","field_name":"responsetime","by_field_name":"airline"}]
},
@ -192,7 +192,7 @@ setup:
body: >
{
"analysis_config" : {
"bucket_span":3600,
"bucket_span": "3600s",
"summary_count_field_name": "dc_airline",
"detectors" :[{"function":"count"}]
},
@ -246,7 +246,7 @@ setup:
body: >
{
"analysis_config" : {
"bucket_span":3600,
"bucket_span": "3600s",
"summary_count_field_name": "event_rate",
"detectors" :[{"function":"mean","field_name":"responsetime","by_field_name":"airline"}]
},
@ -329,7 +329,7 @@ setup:
body: >
{
"analysis_config" : {
"bucket_span":3600,
"bucket_span": "1h",
"detectors" :[{"function":"sum","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
@ -361,7 +361,7 @@ setup:
body: >
{
"analysis_config" : {
"bucket_span":3600,
"bucket_span": "1h",
"detectors" :[{"function":"sum","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {

View File

@ -7,7 +7,7 @@ setup:
"job_id":"foo",
"description":"Analysis of response time by airline",
"analysis_config" : {
"bucket_span":3600,
"bucket_span": "1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {

View File

@ -17,7 +17,7 @@ setup:
"job_id":"datafeed-job",
"description":"Analysis of response time by airline",
"analysis_config" : {
"bucket_span":3600,
"bucket_span": "1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {

View File

@ -5,7 +5,7 @@
body: >
{
"analysis_config": {
"bucket_span": 3600,
"bucket_span": "1h",
"detectors": [{"function": "metric", "field_name": "responsetime", "by_field_name": "airline"}]
},
"data_description": {
@ -25,7 +25,7 @@
body: >
{
"analysis_config": {
"bucket_span": 3600,
"bucket_span": "1h",
"detectors": [{"function": "metric", "field_name": "responsetime", "by_field_name": "airline"}]
},
"data_description": {
@ -44,7 +44,7 @@
{
"job_id": "farequote",
"analysis_config": {
"bucket_span": 3600,
"bucket_span": "1h",
"detectors": [{"function": "metric", "field_name": "responsetime", "by_field_name": "airline"}]
},
"data_description": {
@ -65,7 +65,7 @@
{
"job_id": "_",
"analysis_config": {
"bucket_span": 3600,
"bucket_span": "1h",
"detectors": [{"function": "metric", "field_name": "responsetime", "by_field_name": "airline"}]
},
"data_description": {

View File

@ -177,7 +177,7 @@ public class MlBasicMultiNodeIT extends ESRestTestCase {
xContentBuilder.field("description", "Analysis of response time by airline");
xContentBuilder.startObject("analysis_config");
xContentBuilder.field("bucket_span", 3600);
xContentBuilder.field("bucket_span", "3600s");
xContentBuilder.startArray("detectors");
xContentBuilder.startObject();
xContentBuilder.field("function", "metric");

View File

@ -30,7 +30,7 @@ public class MlPluginDisabledIT extends ESRestTestCase {
xContentBuilder.field("description", "Analysis of response time by airline");
xContentBuilder.startObject("analysis_config");
xContentBuilder.field("bucket_span", 3600);
xContentBuilder.field("bucket_span", "3600s");
xContentBuilder.startArray("detectors");
xContentBuilder.startObject();
xContentBuilder.field("function", "metric");

View File

@ -263,7 +263,7 @@ public class PainlessDomainSplitIT extends ESRestTestCase {
String job = "{\n" +
" \"description\":\"Domain splitting\",\n" +
" \"analysis_config\" : {\n" +
" \"bucket_span\":3600,\n" +
" \"bucket_span\":\"3600s\",\n" +
" \"detectors\" :[{\"function\":\"count\", \"by_field_name\" : \"domain_split\"}]\n" +
" },\n" +
" \"data_description\" : {\n" +