[7.x] Report exponential_avg_bucket_processing_time which gives more weight to recent buckets (#43189) (#43263)

Przemysław Witek 2019-06-17 08:58:26 +02:00 committed by GitHub
parent a191ebabba
commit b2613a123d
15 changed files with 299 additions and 104 deletions
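For context: the new statistic is an exponentially weighted moving average of bucket processing times with a fixed smoothing factor (ALPHA = 0.01 in the server-side TimingStats below), so recent buckets influence the value far more than old ones. A minimal standalone sketch of the update rule, assuming nothing beyond what the diff shows (the real logic lives in TimingStats#updateStats further down):

public class ExponentialAverageSketch {

    // Same smoothing factor as the commit: each new bucket time gets weight 0.01,
    // the previous average keeps weight 0.99.
    private static final double ALPHA = 0.01;

    private Double exponentialAvgMs; // null until the first bucket is seen

    void update(double bucketProcessingTimeMs) {
        if (exponentialAvgMs == null) {
            exponentialAvgMs = bucketProcessingTimeMs; // the first observation seeds the average
        } else {
            exponentialAvgMs = (1 - ALPHA) * exponentialAvgMs + ALPHA * bucketProcessingTimeMs;
        }
    }

    public static void main(String[] args) {
        ExponentialAverageSketch avg = new ExponentialAverageSketch();
        // Same sequence as TimingStatsTests#testUpdateStats below; prints values close to
        // 3.0, 2.99, 3.0001, 2.980099 and 3.00029801.
        for (double timeMs : new double[] {3, 2, 4, 1, 5}) {
            avg.update(timeMs);
            System.out.println(avg.exponentialAvgMs);
        }
    }
}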

View File

@ -42,12 +42,15 @@ public class TimingStats implements ToXContentObject {
public static final ParseField MIN_BUCKET_PROCESSING_TIME_MS = new ParseField("minimum_bucket_processing_time_ms");
public static final ParseField MAX_BUCKET_PROCESSING_TIME_MS = new ParseField("maximum_bucket_processing_time_ms");
public static final ParseField AVG_BUCKET_PROCESSING_TIME_MS = new ParseField("average_bucket_processing_time_ms");
public static final ParseField EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS =
new ParseField("exponential_average_bucket_processing_time_ms");
public static final ConstructingObjectParser<TimingStats, Void> PARSER =
new ConstructingObjectParser<>(
"timing_stats",
true,
args -> new TimingStats((String) args[0], (long) args[1], (Double) args[2], (Double) args[3], (Double) args[4]));
args ->
new TimingStats((String) args[0], (long) args[1], (Double) args[2], (Double) args[3], (Double) args[4], (Double) args[5]));
static {
PARSER.declareString(constructorArg(), Job.ID);
@ -55,6 +58,7 @@ public class TimingStats implements ToXContentObject {
PARSER.declareDouble(optionalConstructorArg(), MIN_BUCKET_PROCESSING_TIME_MS);
PARSER.declareDouble(optionalConstructorArg(), MAX_BUCKET_PROCESSING_TIME_MS);
PARSER.declareDouble(optionalConstructorArg(), AVG_BUCKET_PROCESSING_TIME_MS);
PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS);
}
private final String jobId;
@ -62,18 +66,21 @@ public class TimingStats implements ToXContentObject {
private Double minBucketProcessingTimeMs;
private Double maxBucketProcessingTimeMs;
private Double avgBucketProcessingTimeMs;
private Double exponentialAvgBucketProcessingTimeMs;
public TimingStats(
String jobId,
long bucketCount,
@Nullable Double minBucketProcessingTimeMs,
@Nullable Double maxBucketProcessingTimeMs,
@Nullable Double avgBucketProcessingTimeMs) {
@Nullable Double avgBucketProcessingTimeMs,
@Nullable Double exponentialAvgBucketProcessingTimeMs) {
this.jobId = jobId;
this.bucketCount = bucketCount;
this.minBucketProcessingTimeMs = minBucketProcessingTimeMs;
this.maxBucketProcessingTimeMs = maxBucketProcessingTimeMs;
this.avgBucketProcessingTimeMs = avgBucketProcessingTimeMs;
this.exponentialAvgBucketProcessingTimeMs = exponentialAvgBucketProcessingTimeMs;
}
public String getJobId() {
@ -96,6 +103,10 @@ public class TimingStats implements ToXContentObject {
return avgBucketProcessingTimeMs;
}
public Double getExponentialAvgBucketProcessingTimeMs() {
return exponentialAvgBucketProcessingTimeMs;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
@ -110,6 +121,9 @@ public class TimingStats implements ToXContentObject {
if (avgBucketProcessingTimeMs != null) {
builder.field(AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), avgBucketProcessingTimeMs);
}
if (exponentialAvgBucketProcessingTimeMs != null) {
builder.field(EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), exponentialAvgBucketProcessingTimeMs);
}
builder.endObject();
return builder;
}
@ -123,12 +137,19 @@ public class TimingStats implements ToXContentObject {
&& this.bucketCount == that.bucketCount
&& Objects.equals(this.minBucketProcessingTimeMs, that.minBucketProcessingTimeMs)
&& Objects.equals(this.maxBucketProcessingTimeMs, that.maxBucketProcessingTimeMs)
&& Objects.equals(this.avgBucketProcessingTimeMs, that.avgBucketProcessingTimeMs);
&& Objects.equals(this.avgBucketProcessingTimeMs, that.avgBucketProcessingTimeMs)
&& Objects.equals(this.exponentialAvgBucketProcessingTimeMs, that.exponentialAvgBucketProcessingTimeMs);
}
@Override
public int hashCode() {
return Objects.hash(jobId, bucketCount, minBucketProcessingTimeMs, maxBucketProcessingTimeMs, avgBucketProcessingTimeMs);
return Objects.hash(
jobId,
bucketCount,
minBucketProcessingTimeMs,
maxBucketProcessingTimeMs,
avgBucketProcessingTimeMs,
exponentialAvgBucketProcessingTimeMs);
}
@Override
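A rough sketch of how the extended PARSER above handles the new field when parsing a timing_stats fragment. This is illustrative only: the XContent helpers are the org.elasticsearch.common.xcontent classes used in 7.x, the TimingStats import is assumed to be the HLRC class shown above (the exact package may differ), and the bucket_count value is made up; the remaining values come from the docs example later in this commit.

import org.elasticsearch.client.ml.job.process.TimingStats; // assumed package of the HLRC class above
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;

import java.io.IOException;

public class TimingStatsParsingSketch {
    public static void main(String[] args) throws IOException {
        String json = "{\"job_id\":\"farequote\","
            + "\"bucket_count\":86," // illustrative value
            + "\"minimum_bucket_processing_time_ms\":0.0,"
            + "\"maximum_bucket_processing_time_ms\":15.0,"
            + "\"average_bucket_processing_time_ms\":8.75,"
            + "\"exponential_average_bucket_processing_time_ms\":6.1435899}";
        try (XContentParser parser = XContentType.JSON.xContent()
                .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json)) {
            TimingStats stats = TimingStats.PARSER.parse(parser, null);
            System.out.println(stats.getExponentialAvgBucketProcessingTimeMs()); // 6.1435899
        }
    }
}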

View File

@ -22,6 +22,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
public class TimingStatsTests extends AbstractXContentTestCase<TimingStats> {
@ -33,6 +34,7 @@ public class TimingStatsTests extends AbstractXContentTestCase<TimingStats> {
randomLong(),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomDouble());
}
@ -52,29 +54,31 @@ public class TimingStatsTests extends AbstractXContentTestCase<TimingStats> {
}
public void testConstructor() {
TimingStats stats = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
TimingStats stats = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
assertThat(stats.getJobId(), equalTo(JOB_ID));
assertThat(stats.getBucketCount(), equalTo(7L));
assertThat(stats.getMinBucketProcessingTimeMs(), equalTo(1.0));
assertThat(stats.getMaxBucketProcessingTimeMs(), equalTo(2.0));
assertThat(stats.getAvgBucketProcessingTimeMs(), equalTo(1.23));
assertThat(stats.getExponentialAvgBucketProcessingTimeMs(), equalTo(7.89));
}
public void testConstructor_NullValues() {
TimingStats stats = new TimingStats(JOB_ID, 7, null, null, null);
TimingStats stats = new TimingStats(JOB_ID, 7, null, null, null, null);
assertThat(stats.getJobId(), equalTo(JOB_ID));
assertThat(stats.getBucketCount(), equalTo(7L));
assertNull(stats.getMinBucketProcessingTimeMs());
assertNull(stats.getMaxBucketProcessingTimeMs());
assertNull(stats.getAvgBucketProcessingTimeMs());
assertThat(stats.getMinBucketProcessingTimeMs(), nullValue());
assertThat(stats.getMaxBucketProcessingTimeMs(), nullValue());
assertThat(stats.getAvgBucketProcessingTimeMs(), nullValue());
assertThat(stats.getExponentialAvgBucketProcessingTimeMs(), nullValue());
}
public void testEquals() {
TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23);
TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23, 7.89);
assertTrue(stats1.equals(stats1));
assertTrue(stats1.equals(stats2));
@ -82,9 +86,9 @@ public class TimingStatsTests extends AbstractXContentTestCase<TimingStats> {
}
public void testHashCode() {
TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23);
TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23, 7.89);
assertEquals(stats1.hashCode(), stats1.hashCode());
assertEquals(stats1.hashCode(), stats2.hashCode());

View File

@ -105,7 +105,14 @@ The API returns the following results:
"log_time": 1491948163000,
"timestamp": 1455234600000
},
"state": "closed"
"state": "closed",
"timing_stats": {
"job_id": "farequote",
"minimum_bucket_processing_time_ms": 0.0,
"maximum_bucket_processing_time_ms": 15.0,
"average_bucket_processing_time_ms": 8.75,
"exponential_average_bucket_processing_time_ms": 6.1435899
}
}
]
}

View File

@ -19,11 +19,15 @@ progress of a job.
`model_size_stats`::
(object) An object that provides information about the size and contents of the model.
See <<ml-modelsizestats,model size stats objects>>
See <<ml-modelsizestats,model size stats objects>>.
`forecasts_stats`::
(object) An object that provides statistical information about forecasts
of this job. See <<ml-forecastsstats, forecasts stats objects>>
of this job. See <<ml-forecastsstats, forecasts stats objects>>.
`timing_stats`::
(object) An object that provides statistical information about timing aspects
of this job. See <<ml-timingstats, timing stats objects>>.
`node`::
(object) For open jobs only, contains information about the node where the
@ -209,6 +213,31 @@ The `forecasts_stats` object shows statistics about forecasts. It has the follow
NOTE: `memory_bytes`, `records`, `processing_time_ms` and `status` require at least 1 forecast, otherwise
these fields are omitted.
[float]
[[ml-timingstats]]
==== Timing Stats Objects
The `timing_stats` object shows timing-related statistics about the job's progress. It has the following properties:
`job_id`::
(string) A numerical character string that uniquely identifies the job.
`bucket_count`::
(long) The number of buckets processed.
`minimum_bucket_processing_time_ms`::
(double) Minimum among all bucket processing times in milliseconds.
`maximum_bucket_processing_time_ms`::
(double) Maximum among all bucket processing times in milliseconds.
`average_bucket_processing_time_ms`::
(double) Average of all bucket processing times in milliseconds.
`exponential_average_bucket_processing_time_ms`::
(double) Exponential moving average of all bucket processing times in milliseconds.
[float]
[[ml-stats-node]]
==== Node Objects

View File

@ -863,6 +863,9 @@ public class ElasticsearchMappings {
.endObject()
.startObject(TimingStats.AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.startObject(TimingStats.EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS.getPreferredName())
.field(TYPE, DOUBLE)
.endObject();
}

View File

@ -31,6 +31,8 @@ public class TimingStats implements ToXContentObject, Writeable {
public static final ParseField MIN_BUCKET_PROCESSING_TIME_MS = new ParseField("minimum_bucket_processing_time_ms");
public static final ParseField MAX_BUCKET_PROCESSING_TIME_MS = new ParseField("maximum_bucket_processing_time_ms");
public static final ParseField AVG_BUCKET_PROCESSING_TIME_MS = new ParseField("average_bucket_processing_time_ms");
public static final ParseField EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS =
new ParseField("exponential_average_bucket_processing_time_ms");
public static final ParseField TYPE = new ParseField("timing_stats");
@ -38,7 +40,8 @@ public class TimingStats implements ToXContentObject, Writeable {
new ConstructingObjectParser<>(
TYPE.getPreferredName(),
true,
args -> new TimingStats((String) args[0], (long) args[1], (Double) args[2], (Double) args[3], (Double) args[4]));
args ->
new TimingStats((String) args[0], (long) args[1], (Double) args[2], (Double) args[3], (Double) args[4], (Double) args[5]));
static {
PARSER.declareString(constructorArg(), Job.ID);
@ -46,6 +49,7 @@ public class TimingStats implements ToXContentObject, Writeable {
PARSER.declareDouble(optionalConstructorArg(), MIN_BUCKET_PROCESSING_TIME_MS);
PARSER.declareDouble(optionalConstructorArg(), MAX_BUCKET_PROCESSING_TIME_MS);
PARSER.declareDouble(optionalConstructorArg(), AVG_BUCKET_PROCESSING_TIME_MS);
PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS);
}
public static String documentId(String jobId) {
@ -57,26 +61,35 @@ public class TimingStats implements ToXContentObject, Writeable {
private Double minBucketProcessingTimeMs;
private Double maxBucketProcessingTimeMs;
private Double avgBucketProcessingTimeMs;
private Double exponentialAvgBucketProcessingTimeMs;
public TimingStats(
String jobId,
long bucketCount,
@Nullable Double minBucketProcessingTimeMs,
@Nullable Double maxBucketProcessingTimeMs,
@Nullable Double avgBucketProcessingTimeMs) {
@Nullable Double avgBucketProcessingTimeMs,
@Nullable Double exponentialAvgBucketProcessingTimeMs) {
this.jobId = jobId;
this.bucketCount = bucketCount;
this.minBucketProcessingTimeMs = minBucketProcessingTimeMs;
this.maxBucketProcessingTimeMs = maxBucketProcessingTimeMs;
this.avgBucketProcessingTimeMs = avgBucketProcessingTimeMs;
this.exponentialAvgBucketProcessingTimeMs = exponentialAvgBucketProcessingTimeMs;
}
public TimingStats(String jobId) {
this(jobId, 0, null, null, null);
this(jobId, 0, null, null, null, null);
}
public TimingStats(TimingStats lhs) {
this(lhs.jobId, lhs.bucketCount, lhs.minBucketProcessingTimeMs, lhs.maxBucketProcessingTimeMs, lhs.avgBucketProcessingTimeMs);
this(
lhs.jobId,
lhs.bucketCount,
lhs.minBucketProcessingTimeMs,
lhs.maxBucketProcessingTimeMs,
lhs.avgBucketProcessingTimeMs,
lhs.exponentialAvgBucketProcessingTimeMs);
}
public TimingStats(StreamInput in) throws IOException {
@ -85,6 +98,7 @@ public class TimingStats implements ToXContentObject, Writeable {
this.minBucketProcessingTimeMs = in.readOptionalDouble();
this.maxBucketProcessingTimeMs = in.readOptionalDouble();
this.avgBucketProcessingTimeMs = in.readOptionalDouble();
this.exponentialAvgBucketProcessingTimeMs = in.readOptionalDouble();
}
public String getJobId() {
@ -107,12 +121,16 @@ public class TimingStats implements ToXContentObject, Writeable {
return avgBucketProcessingTimeMs;
}
public Double getExponentialAvgBucketProcessingTimeMs() {
return exponentialAvgBucketProcessingTimeMs;
}
/**
* Updates the statistics (min, max, avg, exponential avg) for the given data point (bucket processing time).
*/
public void updateStats(double bucketProcessingTimeMs) {
if (bucketProcessingTimeMs < 0.0) {
throw new IllegalArgumentException("bucketProcessingTimeMs must be positive, was: " + bucketProcessingTimeMs);
throw new IllegalArgumentException("bucketProcessingTimeMs must be non-negative, was: " + bucketProcessingTimeMs);
}
if (minBucketProcessingTimeMs == null || bucketProcessingTimeMs < minBucketProcessingTimeMs) {
minBucketProcessingTimeMs = bucketProcessingTimeMs;
@ -127,9 +145,21 @@ public class TimingStats implements ToXContentObject, Writeable {
// bucket processing times.
avgBucketProcessingTimeMs = (bucketCount * avgBucketProcessingTimeMs + bucketProcessingTimeMs) / (bucketCount + 1);
}
if (exponentialAvgBucketProcessingTimeMs == null) {
exponentialAvgBucketProcessingTimeMs = bucketProcessingTimeMs;
} else {
// Calculate the exponential moving average (see https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average) of
// bucket processing times.
exponentialAvgBucketProcessingTimeMs = (1 - ALPHA) * exponentialAvgBucketProcessingTimeMs + ALPHA * bucketProcessingTimeMs;
}
bucketCount++;
}
/**
* Constant smoothing factor used for calculating the exponential moving average. Represents the degree of weighting decrease.
*/
private static final double ALPHA = 0.01;
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(jobId);
@ -137,6 +167,7 @@ public class TimingStats implements ToXContentObject, Writeable {
out.writeOptionalDouble(minBucketProcessingTimeMs);
out.writeOptionalDouble(maxBucketProcessingTimeMs);
out.writeOptionalDouble(avgBucketProcessingTimeMs);
out.writeOptionalDouble(exponentialAvgBucketProcessingTimeMs);
}
@Override
@ -153,6 +184,9 @@ public class TimingStats implements ToXContentObject, Writeable {
if (avgBucketProcessingTimeMs != null) {
builder.field(AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), avgBucketProcessingTimeMs);
}
if (exponentialAvgBucketProcessingTimeMs != null) {
builder.field(EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS.getPreferredName(), exponentialAvgBucketProcessingTimeMs);
}
builder.endObject();
return builder;
}
@ -166,12 +200,19 @@ public class TimingStats implements ToXContentObject, Writeable {
&& this.bucketCount == that.bucketCount
&& Objects.equals(this.minBucketProcessingTimeMs, that.minBucketProcessingTimeMs)
&& Objects.equals(this.maxBucketProcessingTimeMs, that.maxBucketProcessingTimeMs)
&& Objects.equals(this.avgBucketProcessingTimeMs, that.avgBucketProcessingTimeMs);
&& Objects.equals(this.avgBucketProcessingTimeMs, that.avgBucketProcessingTimeMs)
&& Objects.equals(this.exponentialAvgBucketProcessingTimeMs, that.exponentialAvgBucketProcessingTimeMs);
}
@Override
public int hashCode() {
return Objects.hash(jobId, bucketCount, minBucketProcessingTimeMs, maxBucketProcessingTimeMs, avgBucketProcessingTimeMs);
return Objects.hash(
jobId,
bucketCount,
minBucketProcessingTimeMs,
maxBucketProcessingTimeMs,
avgBucketProcessingTimeMs,
exponentialAvgBucketProcessingTimeMs);
}
@Override
@ -185,7 +226,8 @@ public class TimingStats implements ToXContentObject, Writeable {
public static boolean differSignificantly(TimingStats stats1, TimingStats stats2) {
return differSignificantly(stats1.minBucketProcessingTimeMs, stats2.minBucketProcessingTimeMs)
|| differSignificantly(stats1.maxBucketProcessingTimeMs, stats2.maxBucketProcessingTimeMs)
|| differSignificantly(stats1.avgBucketProcessingTimeMs, stats2.avgBucketProcessingTimeMs);
|| differSignificantly(stats1.avgBucketProcessingTimeMs, stats2.avgBucketProcessingTimeMs)
|| differSignificantly(stats1.exponentialAvgBucketProcessingTimeMs, stats2.exponentialAvgBucketProcessingTimeMs);
}
/**

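One way to read the update rule added above: unrolling the recurrence newAvg = (1 - ALPHA) * oldAvg + ALPHA * newValue, the bucket processed k updates ago contributes with weight ALPHA * (1 - ALPHA)^k (the very first bucket acts as the seed and keeps the residual weight). With ALPHA = 0.01 the influence of a bucket therefore decays by about 1% per subsequent bucket, which is what makes the statistic favour recent buckets. An illustrative sketch, not part of the commit:

public class EmaWeightSketch {

    private static final double ALPHA = 0.01; // same value as TimingStats.ALPHA above

    public static void main(String[] args) {
        for (int k : new int[] {0, 1, 10, 69, 100, 500}) {
            double weight = ALPHA * Math.pow(1 - ALPHA, k);
            System.out.printf("bucket %3d updates ago -> weight %.6f%n", k, weight);
        }
        // At k = 69 the weight has roughly halved (0.99^69 ~ 0.5), i.e. a half-life of about
        // 69 buckets, whereas the plain average_bucket_processing_time_ms weights all buckets equally.
    }
}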
View File

@ -179,6 +179,7 @@ public final class ReservedFieldNames {
TimingStats.MIN_BUCKET_PROCESSING_TIME_MS.getPreferredName(),
TimingStats.MAX_BUCKET_PROCESSING_TIME_MS.getPreferredName(),
TimingStats.AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(),
TimingStats.EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS.getPreferredName(),
GetResult._ID,
GetResult._INDEX,

View File

@ -77,7 +77,6 @@ public class ElasticsearchMappingsTests extends ESTestCase {
// These are not reserved because they're data types, not field names
overridden.add(Result.TYPE.getPreferredName());
overridden.add(DataCounts.TYPE.getPreferredName());
overridden.add(TimingStats.TYPE.getPreferredName());
overridden.add(CategoryDefinition.TYPE.getPreferredName());
overridden.add(ModelSizeStats.RESULT_TYPE_FIELD.getPreferredName());
overridden.add(ModelSnapshot.TYPE.getPreferredName());

View File

@ -8,7 +8,10 @@ package org.elasticsearch.xpack.core.ml.job.process.autodetect.state;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.hamcrest.CustomTypeSafeMatcher;
import org.hamcrest.Matcher;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
@ -23,6 +26,7 @@ public class TimingStatsTests extends AbstractSerializingTestCase<TimingStats> {
randomLong(),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomDouble());
}
@ -42,9 +46,9 @@ public class TimingStatsTests extends AbstractSerializingTestCase<TimingStats> {
}
public void testEquals() {
TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23);
TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23, 7.89);
assertTrue(stats1.equals(stats1));
assertTrue(stats1.equals(stats2));
@ -52,9 +56,9 @@ public class TimingStatsTests extends AbstractSerializingTestCase<TimingStats> {
}
public void testHashCode() {
TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23);
TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23, 7.89);
assertEquals(stats1.hashCode(), stats1.hashCode());
assertEquals(stats1.hashCode(), stats2.hashCode());
@ -69,20 +73,22 @@ public class TimingStatsTests extends AbstractSerializingTestCase<TimingStats> {
assertThat(stats.getMinBucketProcessingTimeMs(), nullValue());
assertThat(stats.getMaxBucketProcessingTimeMs(), nullValue());
assertThat(stats.getAvgBucketProcessingTimeMs(), nullValue());
assertThat(stats.getExponentialAvgBucketProcessingTimeMs(), nullValue());
}
public void testConstructor() {
TimingStats stats = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
TimingStats stats = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
assertThat(stats.getJobId(), equalTo(JOB_ID));
assertThat(stats.getBucketCount(), equalTo(7L));
assertThat(stats.getMinBucketProcessingTimeMs(), equalTo(1.0));
assertThat(stats.getMaxBucketProcessingTimeMs(), equalTo(2.0));
assertThat(stats.getAvgBucketProcessingTimeMs(), equalTo(1.23));
assertThat(stats.getExponentialAvgBucketProcessingTimeMs(), equalTo(7.89));
}
public void testCopyConstructor() {
TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
TimingStats stats2 = new TimingStats(stats1);
assertThat(stats2.getJobId(), equalTo(JOB_ID));
@ -90,6 +96,7 @@ public class TimingStatsTests extends AbstractSerializingTestCase<TimingStats> {
assertThat(stats2.getMinBucketProcessingTimeMs(), equalTo(1.0));
assertThat(stats2.getMaxBucketProcessingTimeMs(), equalTo(2.0));
assertThat(stats2.getAvgBucketProcessingTimeMs(), equalTo(1.23));
assertThat(stats2.getExponentialAvgBucketProcessingTimeMs(), equalTo(7.89));
assertEquals(stats1, stats2);
assertEquals(stats1.hashCode(), stats2.hashCode());
}
@ -98,19 +105,19 @@ public class TimingStatsTests extends AbstractSerializingTestCase<TimingStats> {
TimingStats stats = new TimingStats(JOB_ID);
stats.updateStats(3);
assertThat(stats, equalTo(new TimingStats(JOB_ID, 1, 3.0, 3.0, 3.0)));
assertThat(stats, areCloseTo(new TimingStats(JOB_ID, 1, 3.0, 3.0, 3.0, 3.0), 1e-9));
stats.updateStats(2);
assertThat(stats, equalTo(new TimingStats(JOB_ID, 2, 2.0, 3.0, 2.5)));
assertThat(stats, areCloseTo(new TimingStats(JOB_ID, 2, 2.0, 3.0, 2.5, 2.99), 1e-9));
stats.updateStats(4);
assertThat(stats, equalTo(new TimingStats(JOB_ID, 3, 2.0, 4.0, 3.0)));
assertThat(stats, areCloseTo(new TimingStats(JOB_ID, 3, 2.0, 4.0, 3.0, 3.0001), 1e-9));
stats.updateStats(1);
assertThat(stats, equalTo(new TimingStats(JOB_ID, 4, 1.0, 4.0, 2.5)));
assertThat(stats, areCloseTo(new TimingStats(JOB_ID, 4, 1.0, 4.0, 2.5, 2.980099), 1e-9));
stats.updateStats(5);
assertThat(stats, equalTo(new TimingStats(JOB_ID, 5, 1.0, 5.0, 3.0)));
assertThat(stats, areCloseTo(new TimingStats(JOB_ID, 5, 1.0, 5.0, 3.0, 3.00029801), 1e-9));
}
public void testDocumentId() {
@ -120,15 +127,15 @@ public class TimingStatsTests extends AbstractSerializingTestCase<TimingStats> {
public void testTimingStatsDifferSignificantly() {
assertThat(
TimingStats.differSignificantly(
new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0), new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0)),
new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0), new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0)),
is(false));
assertThat(
TimingStats.differSignificantly(
new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0), new TimingStats(JOB_ID, 10, 10.0, 11.0, 1.0)),
new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0), new TimingStats(JOB_ID, 10, 10.0, 11.0, 1.0, 10.0)),
is(false));
assertThat(
TimingStats.differSignificantly(
new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0), new TimingStats(JOB_ID, 10, 10.0, 12.0, 1.0)),
new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0), new TimingStats(JOB_ID, 10, 10.0, 12.0, 1.0, 10.0)),
is(true));
}
@ -143,4 +150,28 @@ public class TimingStatsTests extends AbstractSerializingTestCase<TimingStats> {
assertThat(TimingStats.differSignificantly(0.0, 1.0), is(true));
assertThat(TimingStats.differSignificantly(1.0, 0.0), is(true));
}
/**
* Creates a matcher of {@link TimingStats}s that matches when the examined stats are equal
* to the specified <code>operand</code>, within a range of +/- <code>error</code>.
*
* @param operand
* the expected value of matching stats
* @param error
* the delta (+/-) within which matches will be allowed
*/
private static Matcher<TimingStats> areCloseTo(TimingStats operand, double error) {
return new CustomTypeSafeMatcher<TimingStats>("TimingStats close to " + operand) {
@Override
protected boolean matchesSafely(TimingStats item) {
return equalTo(operand.getJobId()).matches(item.getJobId())
&& equalTo(operand.getBucketCount()).matches(item.getBucketCount())
&& closeTo(operand.getMinBucketProcessingTimeMs(), error).matches(item.getMinBucketProcessingTimeMs())
&& closeTo(operand.getMaxBucketProcessingTimeMs(), error).matches(item.getMaxBucketProcessingTimeMs())
&& closeTo(operand.getAvgBucketProcessingTimeMs(), error).matches(item.getAvgBucketProcessingTimeMs())
&& closeTo(operand.getExponentialAvgBucketProcessingTimeMs(), error)
.matches(item.getExponentialAvgBucketProcessingTimeMs());
}
};
}
}

View File

@ -63,6 +63,8 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@ -164,6 +166,31 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
assertEquals(quantiles, persistedQuantiles.get());
}
public void testProcessResults_TimingStats() throws Exception {
ResultsBuilder resultBuilder = new ResultsBuilder()
.addBucket(createBucket(true, 100))
.addBucket(createBucket(true, 1000))
.addBucket(createBucket(true, 100))
.addBucket(createBucket(true, 1000))
.addBucket(createBucket(true, 100))
.addBucket(createBucket(true, 1000))
.addBucket(createBucket(true, 100))
.addBucket(createBucket(true, 1000))
.addBucket(createBucket(true, 100))
.addBucket(createBucket(true, 1000));
resultProcessor.process(resultBuilder.buildTestProcess());
resultProcessor.awaitCompletion();
TimingStats timingStats = resultProcessor.timingStats();
assertThat(timingStats.getJobId(), equalTo(JOB_ID));
assertThat(timingStats.getBucketCount(), equalTo(10L));
assertThat(timingStats.getMinBucketProcessingTimeMs(), equalTo(100.0));
assertThat(timingStats.getMaxBucketProcessingTimeMs(), equalTo(1000.0));
assertThat(timingStats.getAvgBucketProcessingTimeMs(), equalTo(550.0));
assertThat(timingStats.getExponentialAvgBucketProcessingTimeMs(), closeTo(143.244, 1e-3));
}
public void testParseQuantiles_GivenRenormalizationIsEnabled() throws Exception {
when(renormalizer.isEnabled()).thenReturn(true);
@ -284,18 +311,24 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
client().execute(PutJobAction.INSTANCE, request).actionGet();
}
private Bucket createBucket(boolean isInterim) {
private static Bucket createBucket(boolean isInterim) {
Bucket bucket = new BucketTests().createTestInstance(JOB_ID);
bucket.setInterim(isInterim);
return bucket;
}
private Date randomDate() {
private static Bucket createBucket(boolean isInterim, long processingTimeMs) {
Bucket bucket = createBucket(isInterim);
bucket.setProcessingTimeMs(processingTimeMs);
return bucket;
}
private static Date randomDate() {
// between 1970 and 2065
return new Date(randomLongBetween(0, 3000000000000L));
}
private List<AnomalyRecord> createRecords(boolean isInterim) {
private static List<AnomalyRecord> createRecords(boolean isInterim) {
List<AnomalyRecord> records = new ArrayList<>();
int count = randomIntBetween(0, 100);
@ -310,7 +343,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
return records;
}
private List<Influencer> createInfluencers(boolean isInterim) {
private static List<Influencer> createInfluencers(boolean isInterim) {
List<Influencer> influencers = new ArrayList<>();
int count = randomIntBetween(0, 100);
@ -323,15 +356,15 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
return influencers;
}
private CategoryDefinition createCategoryDefinition() {
private static CategoryDefinition createCategoryDefinition() {
return new CategoryDefinitionTests().createTestInstance(JOB_ID);
}
private ModelPlot createModelPlot() {
private static ModelPlot createModelPlot() {
return new ModelPlotTests().createTestInstance(JOB_ID);
}
private ModelSizeStats createModelSizeStats() {
private static ModelSizeStats createModelSizeStats() {
ModelSizeStats.Builder builder = new ModelSizeStats.Builder(JOB_ID);
builder.setTimestamp(randomDate());
builder.setLogTime(randomDate());
@ -344,15 +377,15 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
return builder.build();
}
private ModelSnapshot createModelSnapshot() {
private static ModelSnapshot createModelSnapshot() {
return new ModelSnapshot.Builder(JOB_ID).setSnapshotId(randomAlphaOfLength(12)).build();
}
private Quantiles createQuantiles() {
private static Quantiles createQuantiles() {
return new Quantiles(JOB_ID, randomDate(), randomAlphaOfLength(100));
}
private FlushAcknowledgement createFlushAcknowledgement() {
private static FlushAcknowledgement createFlushAcknowledgement() {
return new FlushAcknowledgement(randomAlphaOfLength(5), randomDate());
}
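The expected exponential average of about 143.244 asserted in testProcessResults_TimingStats above can be reproduced from the recurrence in TimingStats#updateStats, assuming the ten buckets are processed in the order they are added. A quick sketch:

public class ExpectedExponentialAverage {
    public static void main(String[] args) {
        double[] bucketTimesMs = {100, 1000, 100, 1000, 100, 1000, 100, 1000, 100, 1000};
        double avg = bucketTimesMs[0]; // the first bucket seeds the average
        for (int i = 1; i < bucketTimesMs.length; i++) {
            avg = 0.99 * avg + 0.01 * bucketTimesMs[i]; // (1 - ALPHA) * old + ALPHA * new
        }
        System.out.println(avg); // approximately 143.2443
    }
}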

View File

@ -204,7 +204,7 @@ public class JobResultsPersisterTests extends ESTestCase {
Client client = mockClient(bulkRequestCaptor);
JobResultsPersister persister = new JobResultsPersister(client);
TimingStats timingStats = new TimingStats("foo", 7, 1.0, 2.0, 1.23);
TimingStats timingStats = new TimingStats("foo", 7, 1.0, 2.0, 1.23, 7.89);
persister.bulkPersisterBuilder(JOB_ID).persistTimingStats(timingStats).executeRequest();
verify(client, times(1)).bulk(bulkRequestCaptor.capture());
@ -219,6 +219,7 @@ public class JobResultsPersisterTests extends ESTestCase {
expectedSourceAsMap.put("minimum_bucket_processing_time_ms", 1.0);
expectedSourceAsMap.put("maximum_bucket_processing_time_ms", 2.0);
expectedSourceAsMap.put("average_bucket_processing_time_ms", 1.23);
expectedSourceAsMap.put("exponential_average_bucket_processing_time_ms", 7.89);
assertThat(indexRequest.sourceAsMap(), equalTo(expectedSourceAsMap));
verify(client, times(1)).threadPool();

View File

@ -838,6 +838,7 @@ public class JobResultsProviderTests extends ESTestCase {
timingStatsMap.put(TimingStats.MIN_BUCKET_PROCESSING_TIME_MS.getPreferredName(), 1.0);
timingStatsMap.put(TimingStats.MAX_BUCKET_PROCESSING_TIME_MS.getPreferredName(), 1000.0);
timingStatsMap.put(TimingStats.AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), 666.0);
timingStatsMap.put(TimingStats.EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS.getPreferredName(), 777.0);
List<Map<String, Object>> source = Arrays.asList(timingStatsMap);
SearchResponse response = createSearchResponse(source);
@ -849,7 +850,7 @@ public class JobResultsProviderTests extends ESTestCase {
JobResultsProvider provider = createProvider(client);
provider.timingStats(
"foo",
stats -> assertThat(stats, equalTo(new TimingStats("foo", 7, 1.0, 1000.0, 666.0))),
stats -> assertThat(stats, equalTo(new TimingStats("foo", 7, 1.0, 1000.0, 666.0, 777.0))),
e -> { throw new AssertionError(); });
verify(client).prepareSearch(indexName);

View File

@ -15,13 +15,13 @@ public class AutodetectParamsTests extends ESTestCase {
private static final String JOB_ID = "my-job";
public void testBuilder_WithTimingStats() {
TimingStats timingStats = new TimingStats(JOB_ID, 7, 1.0, 1000.0, 666.0);
TimingStats timingStats = new TimingStats(JOB_ID, 7, 1.0, 1000.0, 666.0, 1000.0);
AutodetectParams params = new AutodetectParams.Builder(JOB_ID).setTimingStats(timingStats).build();
assertThat(params.timingStats(), equalTo(timingStats));
timingStats.updateStats(2000.0);
assertThat(timingStats, equalTo(new TimingStats(JOB_ID, 8, 1.0, 2000.0, 832.75)));
assertThat(params.timingStats(), equalTo(new TimingStats(JOB_ID, 7, 1.0, 1000.0, 666.0)));
assertThat(timingStats, equalTo(new TimingStats(JOB_ID, 8, 1.0, 2000.0, 832.75, 1010.0)));
assertThat(params.timingStats(), equalTo(new TimingStats(JOB_ID, 7, 1.0, 1000.0, 666.0, 1000.0)));
}
public void testBuilder_WithoutTimingStats() {

View File

@ -103,7 +103,7 @@ public class JobStatsMonitoringDocTests extends BaseMonitoringDocTestCase<JobSta
final DataCounts dataCounts = new DataCounts("_job_id", 0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, date3, date4, date5, date6, date7);
final ForecastStats forecastStats = new ForecastStats();
final TimingStats timingStats = new TimingStats("_job_id", 100, 10.0, 30.0, 20.0);
final TimingStats timingStats = new TimingStats("_job_id", 100, 10.0, 30.0, 20.0, 25.0);
final JobStats jobStats = new JobStats(
"_job", dataCounts, modelStats, forecastStats, JobState.OPENED, discoveryNode, "_explanation", time, timingStats);
final MonitoringDoc.Node node = new MonitoringDoc.Node("_uuid", "_host", "_addr", "_ip", "_name", 1504169190855L);
@ -177,7 +177,8 @@ public class JobStatsMonitoringDocTests extends BaseMonitoringDocTestCase<JobSta
+ "\"bucket_count\":100,"
+ "\"minimum_bucket_processing_time_ms\":10.0,"
+ "\"maximum_bucket_processing_time_ms\":30.0,"
+ "\"average_bucket_processing_time_ms\":20.0"
+ "\"average_bucket_processing_time_ms\":20.0,"
+ "\"exponential_average_bucket_processing_time_ms\":25.0"
+ "}"
+ "}"
+ "}", xContent.utf8ToString());

View File

@ -78,6 +78,7 @@ setup:
body: >
{"airline":"AAL","responsetime":"132.2046","time":"1403481600"}
{"airline":"JZA","responsetime":"990.4628","time":"1403481600"}
{"airline":"JZA","responsetime":"244.1276","time":"1403485200"}
- do:
ml.flush_job:
@ -88,18 +89,22 @@ setup:
- do:
ml.get_job_stats:
job_id: job-stats-test
- match: { jobs.0.job_id : job-stats-test }
- match: { jobs.0.data_counts.processed_record_count: 2 }
- match: { jobs.0.data_counts.processed_field_count: 4 }
- match: { jobs.0.data_counts.input_field_count: 4 }
- match: { jobs.0.model_size_stats.model_bytes: 0 }
- match: { jobs.0.state: opened }
- is_true: jobs.0.node.name
- is_true: jobs.0.node.transport_address
- match: { jobs.0.node.attributes.ml\.max_open_jobs: "20"}
- is_true: jobs.0.open_time
- match: { jobs.0.timing_stats.job_id: job-stats-test }
- gte: { jobs.0.timing_stats.bucket_count: 0 }
- match: { jobs.0.job_id : job-stats-test }
- match: { jobs.0.data_counts.processed_record_count: 3 }
- match: { jobs.0.data_counts.processed_field_count: 6 }
- match: { jobs.0.data_counts.input_field_count: 6 }
- gte: { jobs.0.model_size_stats.model_bytes: 0 }
- match: { jobs.0.state: opened }
- is_true: jobs.0.node.name
- is_true: jobs.0.node.transport_address
- match: { jobs.0.node.attributes.ml\.max_open_jobs: "20"}
- is_true: jobs.0.open_time
- match: { jobs.0.timing_stats.job_id: job-stats-test }
- match: { jobs.0.timing_stats.bucket_count: 1 } # Records are 1h apart and bucket span is 1h so 1 bucket is produced
- gte: { jobs.0.timing_stats.minimum_bucket_processing_time_ms: 0.0 }
- gte: { jobs.0.timing_stats.maximum_bucket_processing_time_ms: 0.0 }
- gte: { jobs.0.timing_stats.average_bucket_processing_time_ms: 0.0 }
- gte: { jobs.0.timing_stats.exponential_average_bucket_processing_time_ms: 0.0 }
---
"Test get job stats for closed job":
@ -110,6 +115,7 @@ setup:
body: >
{"airline":"AAL","responsetime":"132.2046","time":"1403481600"}
{"airline":"JZA","responsetime":"990.4628","time":"1403481600"}
{"airline":"JZA","responsetime":"244.1276","time":"1403485200"}
- do:
ml.flush_job:
@ -124,16 +130,20 @@ setup:
- do:
ml.get_job_stats:
job_id: job-stats-test
- match: { jobs.0.job_id : job-stats-test }
- match: { jobs.0.data_counts.processed_record_count: 2 }
- match: { jobs.0.data_counts.processed_field_count: 4}
- match: { jobs.0.data_counts.input_field_count: 4 }
- gt: { jobs.0.model_size_stats.model_bytes: 0 }
- match: { jobs.0.state: closed }
- match: { jobs.0.job_id : job-stats-test }
- match: { jobs.0.data_counts.processed_record_count: 3 }
- match: { jobs.0.data_counts.processed_field_count: 6 }
- match: { jobs.0.data_counts.input_field_count: 6 }
- gt: { jobs.0.model_size_stats.model_bytes: 0 }
- match: { jobs.0.state: closed }
- is_false: jobs.0.node
- is_false: jobs.0.open_time
- match: { jobs.0.timing_stats.job_id: job-stats-test }
- gte: { jobs.0.timing_stats.bucket_count: 0 }
- match: { jobs.0.timing_stats.job_id: job-stats-test }
- match: { jobs.0.timing_stats.bucket_count: 1 } # Records are 1h apart and bucket span is 1h so 1 bucket is produced
- gte: { jobs.0.timing_stats.minimum_bucket_processing_time_ms: 0.0 }
- gte: { jobs.0.timing_stats.maximum_bucket_processing_time_ms: 0.0 }
- gte: { jobs.0.timing_stats.average_bucket_processing_time_ms: 0.0 }
- gte: { jobs.0.timing_stats.exponential_average_bucket_processing_time_ms: 0.0 }
---
"Test get job stats of datafeed job that has not received any data":
@ -141,13 +151,17 @@ setup:
- do:
ml.get_job_stats:
job_id: jobs-get-stats-datafeed-job
- match: { jobs.0.job_id : jobs-get-stats-datafeed-job }
- match: { jobs.0.data_counts.processed_record_count: 0 }
- match: { jobs.0.model_size_stats.model_bytes : 0 }
- match: { jobs.0.state: opened }
- is_true: jobs.0.open_time
- match: { jobs.0.timing_stats.job_id: jobs-get-stats-datafeed-job }
- match: { jobs.0.timing_stats.bucket_count: 0 }
- match: { jobs.0.job_id : jobs-get-stats-datafeed-job }
- match: { jobs.0.data_counts.processed_record_count: 0 }
- match: { jobs.0.model_size_stats.model_bytes : 0 }
- match: { jobs.0.state: opened }
- is_true: jobs.0.open_time
- match: { jobs.0.timing_stats.job_id: jobs-get-stats-datafeed-job }
- match: { jobs.0.timing_stats.bucket_count: 0 }
- is_false: jobs.0.timing_stats.minimum_bucket_processing_time_ms
- is_false: jobs.0.timing_stats.maximum_bucket_processing_time_ms
- is_false: jobs.0.timing_stats.average_bucket_processing_time_ms
- is_false: jobs.0.timing_stats.exponential_average_bucket_processing_time_ms
---
"Test get all job stats with _all":
@ -317,24 +331,32 @@ setup:
- do:
ml.get_job_stats: {}
- match: { count: 2 }
- match: { jobs.0.job_id : job-stats-test }
- match: { jobs.0.data_counts.processed_record_count: 0 }
- match: { jobs.0.data_counts.processed_field_count: 0 }
- match: { jobs.0.data_counts.input_field_count: 0 }
- match: { jobs.0.model_size_stats.model_bytes: 0 }
- match: { jobs.0.state: closed }
- match: { count: 2 }
- match: { jobs.0.job_id : job-stats-test }
- match: { jobs.0.data_counts.processed_record_count: 0 }
- match: { jobs.0.data_counts.processed_field_count: 0 }
- match: { jobs.0.data_counts.input_field_count: 0 }
- match: { jobs.0.model_size_stats.model_bytes: 0 }
- match: { jobs.0.state: closed }
- is_false: jobs.0.node
- is_false: jobs.0.open_time
- match: { jobs.0.timing_stats.job_id: job-stats-test }
- gte: { jobs.0.timing_stats.bucket_count: 0 }
- match: { jobs.1.job_id : jobs-get-stats-datafeed-job }
- match: { jobs.1.data_counts.processed_record_count: 0 }
- match: { jobs.1.data_counts.processed_field_count: 0 }
- match: { jobs.1.data_counts.input_field_count: 0 }
- match: { jobs.1.model_size_stats.model_bytes: 0 }
- match: { jobs.1.state: closed }
- match: { jobs.0.timing_stats.job_id: job-stats-test }
- match: { jobs.0.timing_stats.bucket_count: 0 }
- is_false: jobs.0.timing_stats.minimum_bucket_processing_time_ms
- is_false: jobs.0.timing_stats.maximum_bucket_processing_time_ms
- is_false: jobs.0.timing_stats.average_bucket_processing_time_ms
- is_false: jobs.0.timing_stats.exponential_average_bucket_processing_time_ms
- match: { jobs.1.job_id : jobs-get-stats-datafeed-job }
- match: { jobs.1.data_counts.processed_record_count: 0 }
- match: { jobs.1.data_counts.processed_field_count: 0 }
- match: { jobs.1.data_counts.input_field_count: 0 }
- match: { jobs.1.model_size_stats.model_bytes: 0 }
- match: { jobs.1.state: closed }
- is_false: jobs.1.node
- is_false: jobs.1.open_time
- match: { jobs.1.timing_stats.job_id: jobs-get-stats-datafeed-job }
- gte: { jobs.1.timing_stats.bucket_count: 0 }
- match: { jobs.1.timing_stats.job_id: jobs-get-stats-datafeed-job }
- match: { jobs.1.timing_stats.bucket_count: 0 }
- is_false: jobs.1.timing_stats.minimum_bucket_processing_time_ms
- is_false: jobs.1.timing_stats.maximum_bucket_processing_time_ms
- is_false: jobs.1.timing_stats.average_bucket_processing_time_ms
- is_false: jobs.1.timing_stats.exponential_average_bucket_processing_time_ms