[7.x] Report timing stats as part of the Job stats response (#42709) (#43193)

This commit is contained in:
Przemysław Witek 2019-06-14 09:03:14 +02:00 committed by GitHub
parent d27c0fd50d
commit 65a584b6fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 1054 additions and 102 deletions

View File

@ -0,0 +1,138 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.ml.job.process;
import org.elasticsearch.client.ml.job.config.Job;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
/**
* Stats that give more insight into timing of various operations performed as part of anomaly detection job.
*/
public class TimingStats implements ToXContentObject {
public static final ParseField BUCKET_COUNT = new ParseField("bucket_count");
public static final ParseField MIN_BUCKET_PROCESSING_TIME_MS = new ParseField("minimum_bucket_processing_time_ms");
public static final ParseField MAX_BUCKET_PROCESSING_TIME_MS = new ParseField("maximum_bucket_processing_time_ms");
public static final ParseField AVG_BUCKET_PROCESSING_TIME_MS = new ParseField("average_bucket_processing_time_ms");
public static final ConstructingObjectParser<TimingStats, Void> PARSER =
new ConstructingObjectParser<>(
"timing_stats",
true,
args -> new TimingStats((String) args[0], (long) args[1], (Double) args[2], (Double) args[3], (Double) args[4]));
static {
PARSER.declareString(constructorArg(), Job.ID);
PARSER.declareLong(constructorArg(), BUCKET_COUNT);
PARSER.declareDouble(optionalConstructorArg(), MIN_BUCKET_PROCESSING_TIME_MS);
PARSER.declareDouble(optionalConstructorArg(), MAX_BUCKET_PROCESSING_TIME_MS);
PARSER.declareDouble(optionalConstructorArg(), AVG_BUCKET_PROCESSING_TIME_MS);
}
private final String jobId;
private long bucketCount;
private Double minBucketProcessingTimeMs;
private Double maxBucketProcessingTimeMs;
private Double avgBucketProcessingTimeMs;
public TimingStats(
String jobId,
long bucketCount,
@Nullable Double minBucketProcessingTimeMs,
@Nullable Double maxBucketProcessingTimeMs,
@Nullable Double avgBucketProcessingTimeMs) {
this.jobId = jobId;
this.bucketCount = bucketCount;
this.minBucketProcessingTimeMs = minBucketProcessingTimeMs;
this.maxBucketProcessingTimeMs = maxBucketProcessingTimeMs;
this.avgBucketProcessingTimeMs = avgBucketProcessingTimeMs;
}
public String getJobId() {
return jobId;
}
public long getBucketCount() {
return bucketCount;
}
public Double getMinBucketProcessingTimeMs() {
return minBucketProcessingTimeMs;
}
public Double getMaxBucketProcessingTimeMs() {
return maxBucketProcessingTimeMs;
}
public Double getAvgBucketProcessingTimeMs() {
return avgBucketProcessingTimeMs;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
builder.field(Job.ID.getPreferredName(), jobId);
builder.field(BUCKET_COUNT.getPreferredName(), bucketCount);
if (minBucketProcessingTimeMs != null) {
builder.field(MIN_BUCKET_PROCESSING_TIME_MS.getPreferredName(), minBucketProcessingTimeMs);
}
if (maxBucketProcessingTimeMs != null) {
builder.field(MAX_BUCKET_PROCESSING_TIME_MS.getPreferredName(), maxBucketProcessingTimeMs);
}
if (avgBucketProcessingTimeMs != null) {
builder.field(AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), avgBucketProcessingTimeMs);
}
builder.endObject();
return builder;
}
@Override
public boolean equals(Object o) {
if (o == this) return true;
if (o == null || getClass() != o.getClass()) return false;
TimingStats that = (TimingStats) o;
return Objects.equals(this.jobId, that.jobId)
&& this.bucketCount == that.bucketCount
&& Objects.equals(this.minBucketProcessingTimeMs, that.minBucketProcessingTimeMs)
&& Objects.equals(this.maxBucketProcessingTimeMs, that.maxBucketProcessingTimeMs)
&& Objects.equals(this.avgBucketProcessingTimeMs, that.avgBucketProcessingTimeMs);
}
@Override
public int hashCode() {
return Objects.hash(jobId, bucketCount, minBucketProcessingTimeMs, maxBucketProcessingTimeMs, avgBucketProcessingTimeMs);
}
@Override
public String toString() {
return Strings.toString(this);
}
}

View File

@ -22,6 +22,7 @@ import org.elasticsearch.client.ml.job.config.Job;
import org.elasticsearch.client.ml.job.config.JobState;
import org.elasticsearch.client.ml.job.process.DataCounts;
import org.elasticsearch.client.ml.job.process.ModelSizeStats;
import org.elasticsearch.client.ml.job.process.TimingStats;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.unit.TimeValue;
@ -42,6 +43,7 @@ public class JobStats implements ToXContentObject {
private static final ParseField DATA_COUNTS = new ParseField("data_counts");
private static final ParseField MODEL_SIZE_STATS = new ParseField("model_size_stats");
private static final ParseField TIMING_STATS = new ParseField("timing_stats");
private static final ParseField FORECASTS_STATS = new ParseField("forecasts_stats");
private static final ParseField STATE = new ParseField("state");
private static final ParseField NODE = new ParseField("node");
@ -58,6 +60,7 @@ public class JobStats implements ToXContentObject {
JobState jobState = (JobState) a[i++];
ModelSizeStats.Builder modelSizeStatsBuilder = (ModelSizeStats.Builder) a[i++];
ModelSizeStats modelSizeStats = modelSizeStatsBuilder == null ? null : modelSizeStatsBuilder.build();
TimingStats timingStats = (TimingStats) a[i++];
ForecastStats forecastStats = (ForecastStats) a[i++];
NodeAttributes node = (NodeAttributes) a[i++];
String assignmentExplanation = (String) a[i++];
@ -66,6 +69,7 @@ public class JobStats implements ToXContentObject {
dataCounts,
jobState,
modelSizeStats,
timingStats,
forecastStats,
node,
assignmentExplanation,
@ -80,6 +84,7 @@ public class JobStats implements ToXContentObject {
STATE,
ObjectParser.ValueType.VALUE);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), ModelSizeStats.PARSER, MODEL_SIZE_STATS);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), TimingStats.PARSER, TIMING_STATS);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), ForecastStats.PARSER, FORECASTS_STATS);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), NodeAttributes.PARSER, NODE);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), ASSIGNMENT_EXPLANATION);
@ -94,22 +99,24 @@ public class JobStats implements ToXContentObject {
private final DataCounts dataCounts;
private final JobState state;
private final ModelSizeStats modelSizeStats;
private final TimingStats timingStats;
private final ForecastStats forecastStats;
private final NodeAttributes node;
private final String assignmentExplanation;
private final TimeValue openTime;
JobStats(String jobId, DataCounts dataCounts, JobState state, @Nullable ModelSizeStats modelSizeStats,
@Nullable ForecastStats forecastStats, @Nullable NodeAttributes node,
@Nullable String assignmentExplanation, @Nullable TimeValue opentime) {
@Nullable TimingStats timingStats, @Nullable ForecastStats forecastStats, @Nullable NodeAttributes node,
@Nullable String assignmentExplanation, @Nullable TimeValue openTime) {
this.jobId = Objects.requireNonNull(jobId);
this.dataCounts = Objects.requireNonNull(dataCounts);
this.state = Objects.requireNonNull(state);
this.modelSizeStats = modelSizeStats;
this.timingStats = timingStats;
this.forecastStats = forecastStats;
this.node = node;
this.assignmentExplanation = assignmentExplanation;
this.openTime = opentime;
this.openTime = openTime;
}
/**
@ -135,6 +142,10 @@ public class JobStats implements ToXContentObject {
return modelSizeStats;
}
public TimingStats getTimingStats() {
return timingStats;
}
/**
* An object that provides statistical information about forecasts of this job.
* See {@link ForecastStats}
@ -182,6 +193,9 @@ public class JobStats implements ToXContentObject {
if (modelSizeStats != null) {
builder.field(MODEL_SIZE_STATS.getPreferredName(), modelSizeStats);
}
if (timingStats != null) {
builder.field(TIMING_STATS.getPreferredName(), timingStats);
}
if (forecastStats != null) {
builder.field(FORECASTS_STATS.getPreferredName(), forecastStats);
}
@ -199,7 +213,7 @@ public class JobStats implements ToXContentObject {
@Override
public int hashCode() {
return Objects.hash(jobId, dataCounts, modelSizeStats, forecastStats, state, node, assignmentExplanation, openTime);
return Objects.hash(jobId, dataCounts, modelSizeStats, timingStats, forecastStats, state, node, assignmentExplanation, openTime);
}
@Override
@ -216,6 +230,7 @@ public class JobStats implements ToXContentObject {
return Objects.equals(jobId, other.jobId) &&
Objects.equals(this.dataCounts, other.dataCounts) &&
Objects.equals(this.modelSizeStats, other.modelSizeStats) &&
Objects.equals(this.timingStats, other.timingStats) &&
Objects.equals(this.forecastStats, other.forecastStats) &&
Objects.equals(this.state, other.state) &&
Objects.equals(this.node, other.node) &&

View File

@ -0,0 +1,93 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.ml.job.process;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase;
import static org.hamcrest.Matchers.equalTo;
public class TimingStatsTests extends AbstractXContentTestCase<TimingStats> {
private static final String JOB_ID = "my-job-id";
public static TimingStats createTestInstance(String jobId) {
return new TimingStats(
jobId,
randomLong(),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomDouble());
}
@Override
public TimingStats createTestInstance() {
return createTestInstance(randomAlphaOfLength(10));
}
@Override
protected TimingStats doParseInstance(XContentParser parser) {
return TimingStats.PARSER.apply(parser, null);
}
@Override
protected boolean supportsUnknownFields() {
return true;
}
public void testConstructor() {
TimingStats stats = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
assertThat(stats.getJobId(), equalTo(JOB_ID));
assertThat(stats.getBucketCount(), equalTo(7L));
assertThat(stats.getMinBucketProcessingTimeMs(), equalTo(1.0));
assertThat(stats.getMaxBucketProcessingTimeMs(), equalTo(2.0));
assertThat(stats.getAvgBucketProcessingTimeMs(), equalTo(1.23));
}
public void testConstructor_NullValues() {
TimingStats stats = new TimingStats(JOB_ID, 7, null, null, null);
assertThat(stats.getJobId(), equalTo(JOB_ID));
assertThat(stats.getBucketCount(), equalTo(7L));
assertNull(stats.getMinBucketProcessingTimeMs());
assertNull(stats.getMaxBucketProcessingTimeMs());
assertNull(stats.getAvgBucketProcessingTimeMs());
}
public void testEquals() {
TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23);
assertTrue(stats1.equals(stats1));
assertTrue(stats1.equals(stats2));
assertFalse(stats2.equals(stats3));
}
public void testHashCode() {
TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23);
assertEquals(stats1.hashCode(), stats1.hashCode());
assertEquals(stats1.hashCode(), stats2.hashCode());
assertNotEquals(stats2.hashCode(), stats3.hashCode());
}
}

View File

@ -24,6 +24,8 @@ import org.elasticsearch.client.ml.job.process.DataCounts;
import org.elasticsearch.client.ml.job.process.DataCountsTests;
import org.elasticsearch.client.ml.job.process.ModelSizeStats;
import org.elasticsearch.client.ml.job.process.ModelSizeStatsTests;
import org.elasticsearch.client.ml.job.process.TimingStats;
import org.elasticsearch.client.ml.job.process.TimingStatsTests;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.client.ml.job.config.JobState;
@ -42,12 +44,14 @@ public class JobStatsTests extends AbstractXContentTestCase<JobStats> {
DataCounts dataCounts = DataCountsTests.createTestInstance(jobId);
ModelSizeStats modelSizeStats = randomBoolean() ? ModelSizeStatsTests.createRandomized() : null;
TimingStats timingStats = randomBoolean() ? TimingStatsTests.createTestInstance(jobId) : null;
ForecastStats forecastStats = randomBoolean() ? ForecastStatsTests.createRandom(1, 22) : null;
NodeAttributes nodeAttributes = randomBoolean() ? NodeAttributesTests.createRandom() : null;
String assigmentExplanation = randomBoolean() ? randomAlphaOfLength(10) : null;
TimeValue openTime = randomBoolean() ? TimeValue.timeValueMillis(randomIntBetween(1, 10000)) : null;
return new JobStats(jobId, dataCounts, state, modelSizeStats, forecastStats, nodeAttributes, assigmentExplanation, openTime);
return new JobStats(
jobId, dataCounts, state, modelSizeStats, timingStats, forecastStats, nodeAttributes, assigmentExplanation, openTime);
}
@Override

View File

@ -0,0 +1,40 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.elasticsearch.common;
/**
* Represents an operation that accepts three arguments and returns no result.
*
* @param <S> the type of the first argument
* @param <T> the type of the second argument
* @param <U> the type of the third argument
*/
@FunctionalInterface
public interface TriConsumer<S, T, U> {
/**
* Applies this function to the given arguments.
*
* @param s the first function argument
* @param t the second function argument
* @param u the third function argument
*/
void apply(S s, T t, U u);
}

View File

@ -30,6 +30,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats;
import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
@ -39,6 +40,8 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import static org.elasticsearch.Version.V_7_3_0;
public class GetJobsStatsAction extends Action<GetJobsStatsAction.Response> {
public static final GetJobsStatsAction INSTANCE = new GetJobsStatsAction();
@ -49,6 +52,7 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Response> {
private static final String FORECASTS_STATS = "forecasts_stats";
private static final String STATE = "state";
private static final String NODE = "node";
private static final String TIMING_STATS = "timing_stats";
private GetJobsStatsAction() {
super(NAME);
@ -155,22 +159,24 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Response> {
public static class JobStats implements ToXContentObject, Writeable {
private final String jobId;
private DataCounts dataCounts;
private final DataCounts dataCounts;
@Nullable
private ModelSizeStats modelSizeStats;
private final ModelSizeStats modelSizeStats;
@Nullable
private ForecastStats forecastStats;
private final ForecastStats forecastStats;
@Nullable
private TimeValue openTime;
private JobState state;
private final TimeValue openTime;
private final JobState state;
@Nullable
private DiscoveryNode node;
private final DiscoveryNode node;
@Nullable
private String assignmentExplanation;
private final String assignmentExplanation;
@Nullable
private final TimingStats timingStats;
public JobStats(String jobId, DataCounts dataCounts, @Nullable ModelSizeStats modelSizeStats,
@Nullable ForecastStats forecastStats, JobState state, @Nullable DiscoveryNode node,
@Nullable String assignmentExplanation, @Nullable TimeValue opentime) {
@Nullable ForecastStats forecastStats, JobState state, @Nullable DiscoveryNode node,
@Nullable String assignmentExplanation, @Nullable TimeValue openTime, @Nullable TimingStats timingStats) {
this.jobId = Objects.requireNonNull(jobId);
this.dataCounts = Objects.requireNonNull(dataCounts);
this.modelSizeStats = modelSizeStats;
@ -178,7 +184,8 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Response> {
this.state = Objects.requireNonNull(state);
this.node = node;
this.assignmentExplanation = assignmentExplanation;
this.openTime = opentime;
this.openTime = openTime;
this.timingStats = timingStats;
}
public JobStats(StreamInput in) throws IOException {
@ -191,6 +198,13 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Response> {
openTime = in.readOptionalTimeValue();
if (in.getVersion().onOrAfter(Version.V_6_4_0)) {
forecastStats = in.readOptionalWriteable(ForecastStats::new);
} else {
forecastStats = null;
}
if (in.getVersion().onOrAfter(V_7_3_0)) {
timingStats = in.readOptionalWriteable(TimingStats::new);
} else {
timingStats = null;
}
}
@ -226,6 +240,10 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Response> {
return openTime;
}
public TimingStats getTimingStats() {
return timingStats;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
// TODO: Have callers wrap the content with an object as they choose rather than forcing it upon them
@ -267,6 +285,9 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Response> {
if (openTime != null) {
builder.field("open_time", openTime.getStringRep());
}
if (timingStats != null) {
builder.field(TIMING_STATS, timingStats);
}
return builder;
}
@ -282,11 +303,15 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Response> {
if (out.getVersion().onOrAfter(Version.V_6_4_0)) {
out.writeOptionalWriteable(forecastStats);
}
if (out.getVersion().onOrAfter(V_7_3_0)) {
out.writeOptionalWriteable(timingStats);
}
}
@Override
public int hashCode() {
return Objects.hash(jobId, dataCounts, modelSizeStats, forecastStats, state, node, assignmentExplanation, openTime);
return Objects.hash(
jobId, dataCounts, modelSizeStats, forecastStats, state, node, assignmentExplanation, openTime, timingStats);
}
@Override
@ -298,14 +323,15 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Response> {
return false;
}
JobStats other = (JobStats) obj;
return Objects.equals(jobId, other.jobId)
&& Objects.equals(this.dataCounts, other.dataCounts)
&& Objects.equals(this.modelSizeStats, other.modelSizeStats)
&& Objects.equals(this.forecastStats, other.forecastStats)
&& Objects.equals(this.state, other.state)
&& Objects.equals(this.node, other.node)
&& Objects.equals(this.assignmentExplanation, other.assignmentExplanation)
&& Objects.equals(this.openTime, other.openTime);
return Objects.equals(this.jobId, other.jobId)
&& Objects.equals(this.dataCounts, other.dataCounts)
&& Objects.equals(this.modelSizeStats, other.modelSizeStats)
&& Objects.equals(this.forecastStats, other.forecastStats)
&& Objects.equals(this.state, other.state)
&& Objects.equals(this.node, other.node)
&& Objects.equals(this.assignmentExplanation, other.assignmentExplanation)
&& Objects.equals(this.openTime, other.openTime)
&& Objects.equals(this.timingStats, other.timingStats);
}
}

View File

@ -39,6 +39,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats;
import org.elasticsearch.xpack.core.ml.job.results.AnomalyCause;
import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
@ -449,6 +450,7 @@ public class ElasticsearchMappings {
addResultsMapping(builder);
addCategoryDefinitionMapping(builder);
addDataCountsMapping(builder);
addTimingStatsExceptBucketCountMapping(builder);
addModelSnapshotMapping(builder);
addTermFields(builder, extraTermFields);
@ -790,8 +792,6 @@ public class ElasticsearchMappings {
/**
* {@link DataCounts} mapping.
* The type is disabled so {@link DataCounts} aren't searchable and
* the '_all' field is disabled
*
* @throws IOException On builder write error
*/
@ -846,6 +846,26 @@ public class ElasticsearchMappings {
.endObject();
}
/**
* {@link TimingStats} mapping.
* Does not include mapping for BUCKET_COUNT as this mapping is added by {@link #addDataCountsMapping} method.
*
* @throws IOException On builder write error
*/
private static void addTimingStatsExceptBucketCountMapping(XContentBuilder builder) throws IOException {
builder
// re-used: BUCKET_COUNT
.startObject(TimingStats.MIN_BUCKET_PROCESSING_TIME_MS.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.startObject(TimingStats.MAX_BUCKET_PROCESSING_TIME_MS.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.startObject(TimingStats.AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName())
.field(TYPE, DOUBLE)
.endObject();
}
/**
* Create the Elasticsearch mapping for {@linkplain CategoryDefinition}.
* The '_all' field is disabled as the document isn't meant to be searched.

View File

@ -0,0 +1,210 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.job.process.autodetect.state;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
/**
* Stats that give more insight into timing of various operations performed as part of anomaly detection job.
*/
public class TimingStats implements ToXContentObject, Writeable {
public static final ParseField BUCKET_COUNT = new ParseField("bucket_count");
public static final ParseField MIN_BUCKET_PROCESSING_TIME_MS = new ParseField("minimum_bucket_processing_time_ms");
public static final ParseField MAX_BUCKET_PROCESSING_TIME_MS = new ParseField("maximum_bucket_processing_time_ms");
public static final ParseField AVG_BUCKET_PROCESSING_TIME_MS = new ParseField("average_bucket_processing_time_ms");
public static final ParseField TYPE = new ParseField("timing_stats");
public static final ConstructingObjectParser<TimingStats, Void> PARSER =
new ConstructingObjectParser<>(
TYPE.getPreferredName(),
true,
args -> new TimingStats((String) args[0], (long) args[1], (Double) args[2], (Double) args[3], (Double) args[4]));
static {
PARSER.declareString(constructorArg(), Job.ID);
PARSER.declareLong(constructorArg(), BUCKET_COUNT);
PARSER.declareDouble(optionalConstructorArg(), MIN_BUCKET_PROCESSING_TIME_MS);
PARSER.declareDouble(optionalConstructorArg(), MAX_BUCKET_PROCESSING_TIME_MS);
PARSER.declareDouble(optionalConstructorArg(), AVG_BUCKET_PROCESSING_TIME_MS);
}
public static String documentId(String jobId) {
return jobId + "_timing_stats";
}
private final String jobId;
private long bucketCount;
private Double minBucketProcessingTimeMs;
private Double maxBucketProcessingTimeMs;
private Double avgBucketProcessingTimeMs;
public TimingStats(
String jobId,
long bucketCount,
@Nullable Double minBucketProcessingTimeMs,
@Nullable Double maxBucketProcessingTimeMs,
@Nullable Double avgBucketProcessingTimeMs) {
this.jobId = jobId;
this.bucketCount = bucketCount;
this.minBucketProcessingTimeMs = minBucketProcessingTimeMs;
this.maxBucketProcessingTimeMs = maxBucketProcessingTimeMs;
this.avgBucketProcessingTimeMs = avgBucketProcessingTimeMs;
}
public TimingStats(String jobId) {
this(jobId, 0, null, null, null);
}
public TimingStats(TimingStats lhs) {
this(lhs.jobId, lhs.bucketCount, lhs.minBucketProcessingTimeMs, lhs.maxBucketProcessingTimeMs, lhs.avgBucketProcessingTimeMs);
}
public TimingStats(StreamInput in) throws IOException {
this.jobId = in.readString();
this.bucketCount = in.readLong();
this.minBucketProcessingTimeMs = in.readOptionalDouble();
this.maxBucketProcessingTimeMs = in.readOptionalDouble();
this.avgBucketProcessingTimeMs = in.readOptionalDouble();
}
public String getJobId() {
return jobId;
}
public long getBucketCount() {
return bucketCount;
}
public Double getMinBucketProcessingTimeMs() {
return minBucketProcessingTimeMs;
}
public Double getMaxBucketProcessingTimeMs() {
return maxBucketProcessingTimeMs;
}
public Double getAvgBucketProcessingTimeMs() {
return avgBucketProcessingTimeMs;
}
/**
* Updates the statistics (min, max, avg) for the given data point (bucket processing time).
*/
public void updateStats(double bucketProcessingTimeMs) {
if (bucketProcessingTimeMs < 0.0) {
throw new IllegalArgumentException("bucketProcessingTimeMs must be positive, was: " + bucketProcessingTimeMs);
}
if (minBucketProcessingTimeMs == null || bucketProcessingTimeMs < minBucketProcessingTimeMs) {
minBucketProcessingTimeMs = bucketProcessingTimeMs;
}
if (maxBucketProcessingTimeMs == null || bucketProcessingTimeMs > maxBucketProcessingTimeMs) {
maxBucketProcessingTimeMs = bucketProcessingTimeMs;
}
if (avgBucketProcessingTimeMs == null) {
avgBucketProcessingTimeMs = bucketProcessingTimeMs;
} else {
// Calculate the cumulative moving average (see https://en.wikipedia.org/wiki/Moving_average#Cumulative_moving_average) of
// bucket processing times.
avgBucketProcessingTimeMs = (bucketCount * avgBucketProcessingTimeMs + bucketProcessingTimeMs) / (bucketCount + 1);
}
bucketCount++;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(jobId);
out.writeLong(bucketCount);
out.writeOptionalDouble(minBucketProcessingTimeMs);
out.writeOptionalDouble(maxBucketProcessingTimeMs);
out.writeOptionalDouble(avgBucketProcessingTimeMs);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Job.ID.getPreferredName(), jobId);
builder.field(BUCKET_COUNT.getPreferredName(), bucketCount);
if (minBucketProcessingTimeMs != null) {
builder.field(MIN_BUCKET_PROCESSING_TIME_MS.getPreferredName(), minBucketProcessingTimeMs);
}
if (maxBucketProcessingTimeMs != null) {
builder.field(MAX_BUCKET_PROCESSING_TIME_MS.getPreferredName(), maxBucketProcessingTimeMs);
}
if (avgBucketProcessingTimeMs != null) {
builder.field(AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), avgBucketProcessingTimeMs);
}
builder.endObject();
return builder;
}
@Override
public boolean equals(Object o) {
if (o == this) return true;
if (o == null || getClass() != o.getClass()) return false;
TimingStats that = (TimingStats) o;
return Objects.equals(this.jobId, that.jobId)
&& this.bucketCount == that.bucketCount
&& Objects.equals(this.minBucketProcessingTimeMs, that.minBucketProcessingTimeMs)
&& Objects.equals(this.maxBucketProcessingTimeMs, that.maxBucketProcessingTimeMs)
&& Objects.equals(this.avgBucketProcessingTimeMs, that.avgBucketProcessingTimeMs);
}
@Override
public int hashCode() {
return Objects.hash(jobId, bucketCount, minBucketProcessingTimeMs, maxBucketProcessingTimeMs, avgBucketProcessingTimeMs);
}
@Override
public String toString() {
return Strings.toString(this);
}
/**
* Returns true if given stats objects differ from each other by more than 10% for at least one of the statistics.
*/
public static boolean differSignificantly(TimingStats stats1, TimingStats stats2) {
return differSignificantly(stats1.minBucketProcessingTimeMs, stats2.minBucketProcessingTimeMs)
|| differSignificantly(stats1.maxBucketProcessingTimeMs, stats2.maxBucketProcessingTimeMs)
|| differSignificantly(stats1.avgBucketProcessingTimeMs, stats2.avgBucketProcessingTimeMs);
}
/**
* Returns {@code true} if one of the ratios { value1 / value2, value2 / value1 } is smaller than MIN_VALID_RATIO.
* This can be interpreted as values { value1, value2 } differing significantly from each other.
* This method also returns:
* - {@code true} in case one value is {@code null} while the other is not.
* - {@code false} in case both values are {@code null}.
*/
static boolean differSignificantly(Double value1, Double value2) {
if (value1 != null && value2 != null) {
return (value2 / value1 < MIN_VALID_RATIO) || (value1 / value2 < MIN_VALID_RATIO);
}
return (value1 != null) || (value2 != null);
}
/**
* Minimum ratio of values that is interpreted as values being similar.
* If the values ratio is less than MIN_VALID_RATIO, the values are interpreted as significantly different.
*/
private static final double MIN_VALID_RATIO = 0.9;
}

View File

@ -23,6 +23,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats;
import java.util.Arrays;
import java.util.HashSet;
@ -174,6 +175,11 @@ public final class ReservedFieldNames {
Result.TIMESTAMP.getPreferredName(),
Result.IS_INTERIM.getPreferredName(),
TimingStats.BUCKET_COUNT.getPreferredName(),
TimingStats.MIN_BUCKET_PROCESSING_TIME_MS.getPreferredName(),
TimingStats.MAX_BUCKET_PROCESSING_TIME_MS.getPreferredName(),
TimingStats.AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(),
GetResult._ID,
GetResult._INDEX,
GetResult._TYPE

View File

@ -18,6 +18,8 @@ import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCountsTests;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStatsTests;
import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
import org.elasticsearch.xpack.core.ml.stats.ForecastStatsTests;
@ -38,35 +40,19 @@ public class GetJobStatsActionResponseTests extends AbstractWireSerializingTestC
List<Response.JobStats> jobStatsList = new ArrayList<>(listSize);
for (int j = 0; j < listSize; j++) {
String jobId = randomAlphaOfLength(10);
DataCounts dataCounts = new DataCountsTests().createTestInstance();
ModelSizeStats sizeStats = null;
if (randomBoolean()) {
sizeStats = new ModelSizeStats.Builder("foo").build();
}
ForecastStats forecastStats = null;
if (randomBoolean()) {
forecastStats = new ForecastStatsTests().createTestInstance();
}
ModelSizeStats sizeStats = randomBoolean() ? null : new ModelSizeStats.Builder("foo").build();
ForecastStats forecastStats = randomBoolean() ? null : new ForecastStatsTests().createTestInstance();
JobState jobState = randomFrom(EnumSet.allOf(JobState.class));
DiscoveryNode node = null;
if (randomBoolean()) {
node = new DiscoveryNode("_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT);
}
String explanation = null;
if (randomBoolean()) {
explanation = randomAlphaOfLength(3);
}
TimeValue openTime = null;
if (randomBoolean()) {
openTime = parseTimeValue(randomPositiveTimeValue(), "open_time-Test");
}
Response.JobStats jobStats = new Response.JobStats(jobId, dataCounts, sizeStats, forecastStats, jobState, node, explanation,
openTime);
DiscoveryNode node =
randomBoolean()
? null
: new DiscoveryNode("_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT);
String explanation = randomBoolean() ? null : randomAlphaOfLength(3);
TimeValue openTime = randomBoolean() ? null : parseTimeValue(randomPositiveTimeValue(), "open_time-Test");
TimingStats timingStats = randomBoolean() ? null : TimingStatsTests.createTestInstance("foo");
Response.JobStats jobStats =
new Response.JobStats(jobId, dataCounts, sizeStats, forecastStats, jobState, node, explanation, openTime, timingStats);
jobStatsList.add(jobStats);
}

View File

@ -29,6 +29,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats;
import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition;
import org.elasticsearch.xpack.core.ml.job.results.ReservedFieldNames;
@ -76,10 +77,12 @@ public class ElasticsearchMappingsTests extends ESTestCase {
// These are not reserved because they're data types, not field names
overridden.add(Result.TYPE.getPreferredName());
overridden.add(DataCounts.TYPE.getPreferredName());
overridden.add(TimingStats.TYPE.getPreferredName());
overridden.add(CategoryDefinition.TYPE.getPreferredName());
overridden.add(ModelSizeStats.RESULT_TYPE_FIELD.getPreferredName());
overridden.add(ModelSnapshot.TYPE.getPreferredName());
overridden.add(Quantiles.TYPE.getPreferredName());
overridden.add(TimingStats.TYPE.getPreferredName());
Set<String> expected = collectResultsDocFieldNames();
expected.removeAll(overridden);

View File

@ -0,0 +1,146 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.job.process.autodetect.state;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
public class TimingStatsTests extends AbstractSerializingTestCase<TimingStats> {
private static final String JOB_ID = "my-job-id";
public static TimingStats createTestInstance(String jobId) {
return new TimingStats(
jobId,
randomLong(),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomDouble());
}
@Override
public TimingStats createTestInstance() {
return createTestInstance(randomAlphaOfLength(10));
}
@Override
protected Writeable.Reader<TimingStats> instanceReader() {
return TimingStats::new;
}
@Override
protected TimingStats doParseInstance(XContentParser parser) {
return TimingStats.PARSER.apply(parser, null);
}
public void testEquals() {
TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23);
assertTrue(stats1.equals(stats1));
assertTrue(stats1.equals(stats2));
assertFalse(stats2.equals(stats3));
}
public void testHashCode() {
TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23);
assertEquals(stats1.hashCode(), stats1.hashCode());
assertEquals(stats1.hashCode(), stats2.hashCode());
assertNotEquals(stats2.hashCode(), stats3.hashCode());
}
public void testDefaultConstructor() {
TimingStats stats = new TimingStats(JOB_ID);
assertThat(stats.getJobId(), equalTo(JOB_ID));
assertThat(stats.getBucketCount(), equalTo(0L));
assertThat(stats.getMinBucketProcessingTimeMs(), nullValue());
assertThat(stats.getMaxBucketProcessingTimeMs(), nullValue());
assertThat(stats.getAvgBucketProcessingTimeMs(), nullValue());
}
public void testConstructor() {
TimingStats stats = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
assertThat(stats.getJobId(), equalTo(JOB_ID));
assertThat(stats.getBucketCount(), equalTo(7L));
assertThat(stats.getMinBucketProcessingTimeMs(), equalTo(1.0));
assertThat(stats.getMaxBucketProcessingTimeMs(), equalTo(2.0));
assertThat(stats.getAvgBucketProcessingTimeMs(), equalTo(1.23));
}
public void testCopyConstructor() {
TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
TimingStats stats2 = new TimingStats(stats1);
assertThat(stats2.getJobId(), equalTo(JOB_ID));
assertThat(stats2.getBucketCount(), equalTo(7L));
assertThat(stats2.getMinBucketProcessingTimeMs(), equalTo(1.0));
assertThat(stats2.getMaxBucketProcessingTimeMs(), equalTo(2.0));
assertThat(stats2.getAvgBucketProcessingTimeMs(), equalTo(1.23));
assertEquals(stats1, stats2);
assertEquals(stats1.hashCode(), stats2.hashCode());
}
public void testUpdateStats() {
TimingStats stats = new TimingStats(JOB_ID);
stats.updateStats(3);
assertThat(stats, equalTo(new TimingStats(JOB_ID, 1, 3.0, 3.0, 3.0)));
stats.updateStats(2);
assertThat(stats, equalTo(new TimingStats(JOB_ID, 2, 2.0, 3.0, 2.5)));
stats.updateStats(4);
assertThat(stats, equalTo(new TimingStats(JOB_ID, 3, 2.0, 4.0, 3.0)));
stats.updateStats(1);
assertThat(stats, equalTo(new TimingStats(JOB_ID, 4, 1.0, 4.0, 2.5)));
stats.updateStats(5);
assertThat(stats, equalTo(new TimingStats(JOB_ID, 5, 1.0, 5.0, 3.0)));
}
public void testDocumentId() {
assertThat(TimingStats.documentId("my-job-id"), equalTo("my-job-id_timing_stats"));
}
public void testTimingStatsDifferSignificantly() {
assertThat(
TimingStats.differSignificantly(
new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0), new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0)),
is(false));
assertThat(
TimingStats.differSignificantly(
new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0), new TimingStats(JOB_ID, 10, 10.0, 11.0, 1.0)),
is(false));
assertThat(
TimingStats.differSignificantly(
new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0), new TimingStats(JOB_ID, 10, 10.0, 12.0, 1.0)),
is(true));
}
public void testValuesDifferSignificantly() {
assertThat(TimingStats.differSignificantly((Double) null, (Double) null), is(false));
assertThat(TimingStats.differSignificantly(1.0, null), is(true));
assertThat(TimingStats.differSignificantly(null, 1.0), is(true));
assertThat(TimingStats.differSignificantly(0.9, 1.0), is(false));
assertThat(TimingStats.differSignificantly(1.0, 0.9), is(false));
assertThat(TimingStats.differSignificantly(0.9, 1.000001), is(true));
assertThat(TimingStats.differSignificantly(1.0, 0.899999), is(true));
assertThat(TimingStats.differSignificantly(0.0, 1.0), is(true));
assertThat(TimingStats.differSignificantly(1.0, 0.0), is(true));
}
}

View File

@ -13,6 +13,7 @@ import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.TriConsumer;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.TimeValue;
@ -29,6 +30,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats;
import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
@ -42,7 +44,6 @@ import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@ -103,16 +104,19 @@ public class TransportGetJobsStatsAction extends TransportTasksAction<TransportO
String jobId = task.getJobId();
ClusterState state = clusterService.state();
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
Optional<Tuple<DataCounts, ModelSizeStats>> stats = processManager.getStatistics(task);
Optional<Tuple<DataCounts, Tuple<ModelSizeStats, TimingStats>>> stats = processManager.getStatistics(task);
if (stats.isPresent()) {
DataCounts dataCounts = stats.get().v1();
ModelSizeStats modelSizeStats = stats.get().v2().v1();
TimingStats timingStats = stats.get().v2().v2();
PersistentTasksCustomMetaData.PersistentTask<?> pTask = MlTasks.getJobTask(jobId, tasks);
DiscoveryNode node = state.nodes().get(pTask.getExecutorNode());
JobState jobState = MlTasks.getJobState(jobId, tasks);
String assignmentExplanation = pTask.getAssignment().getExplanation();
TimeValue openTime = durationToTimeValue(processManager.jobOpenTime(task));
gatherForecastStats(jobId, forecastStats -> {
JobStats jobStats = new JobStats(jobId, stats.get().v1(),
stats.get().v2(), forecastStats, jobState, node, assignmentExplanation, openTime);
JobStats jobStats = new JobStats(
jobId, dataCounts, modelSizeStats, forecastStats, jobState, node, assignmentExplanation, openTime, timingStats);
listener.onResponse(new QueryPage<>(Collections.singletonList(jobStats), 1, Job.RESULTS_FIELD));
}, listener::onFailure);
@ -138,7 +142,7 @@ public class TransportGetJobsStatsAction extends TransportTasksAction<TransportO
int slot = i;
String jobId = closedJobIds.get(i);
gatherForecastStats(jobId, forecastStats -> {
gatherDataCountsAndModelSizeStats(jobId, (dataCounts, modelSizeStats) -> {
gatherDataCountsModelSizeStatsAndTimingStats(jobId, (dataCounts, modelSizeStats, timingStats) -> {
JobState jobState = MlTasks.getJobState(jobId, tasks);
PersistentTasksCustomMetaData.PersistentTask<?> pTask = MlTasks.getJobTask(jobId, tasks);
String assignmentExplanation = null;
@ -146,7 +150,7 @@ public class TransportGetJobsStatsAction extends TransportTasksAction<TransportO
assignmentExplanation = pTask.getAssignment().getExplanation();
}
jobStats.set(slot, new JobStats(jobId, dataCounts, modelSizeStats, forecastStats, jobState,
null, assignmentExplanation, null));
null, assignmentExplanation, null, timingStats));
if (counter.decrementAndGet() == 0) {
List<JobStats> results = response.getResponse().results();
results.addAll(jobStats.asList());
@ -163,11 +167,13 @@ public class TransportGetJobsStatsAction extends TransportTasksAction<TransportO
jobResultsProvider.getForecastStats(jobId, handler, errorHandler);
}
void gatherDataCountsAndModelSizeStats(String jobId, BiConsumer<DataCounts, ModelSizeStats> handler,
Consumer<Exception> errorHandler) {
void gatherDataCountsModelSizeStatsAndTimingStats(
String jobId, TriConsumer<DataCounts, ModelSizeStats, TimingStats> handler, Consumer<Exception> errorHandler) {
jobResultsProvider.dataCounts(jobId, dataCounts -> {
jobResultsProvider.modelSizeStats(jobId, modelSizeStats -> {
handler.accept(dataCounts, modelSizeStats);
jobResultsProvider.timingStats(jobId, timingStats -> {
handler.apply(dataCounts, modelSizeStats, timingStats);
}, errorHandler);
}, errorHandler);
}, errorHandler);
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats;
import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.job.results.BucketInfluencer;
@ -121,6 +122,17 @@ public class JobResultsPersister {
}
}
/**
* Persist timing stats
*
* @param timingStats timing stats to persist
* @return this
*/
public Builder persistTimingStats(TimingStats timingStats) {
indexResult(TimingStats.documentId(timingStats.getJobId()), timingStats, TimingStats.TYPE.getPreferredName());
return this;
}
/**
* Persist a list of anomaly records
*

View File

@ -91,6 +91,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeSta
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats;
import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition;
@ -417,6 +418,30 @@ public class JobResultsProvider {
.addSort(SortBuilders.fieldSort(DataCounts.LATEST_RECORD_TIME.getPreferredName()).order(SortOrder.DESC));
}
/**
* Get the job's timing stats
*
* @param jobId The job id
*/
public void timingStats(String jobId, Consumer<TimingStats> handler, Consumer<Exception> errorHandler) {
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
searchSingleResult(
jobId,
TimingStats.TYPE.getPreferredName(),
createTimingStatsSearch(indexName, jobId),
TimingStats.PARSER,
result -> handler.accept(result.result),
errorHandler,
() -> new TimingStats(jobId));
}
private SearchRequestBuilder createTimingStatsSearch(String indexName, String jobId) {
return client.prepareSearch(indexName)
.setSize(1)
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(QueryBuilders.idsQuery().addIds(TimingStats.documentId(jobId)));
}
public void getAutodetectParams(Job job, Consumer<AutodetectParams> consumer, Consumer<Exception> errorHandler) {
String jobId = job.getId();
@ -443,6 +468,7 @@ public class JobResultsProvider {
MultiSearchRequestBuilder msearch = client.prepareMultiSearch()
.add(createLatestDataCountsSearch(resultsIndex, jobId))
.add(createLatestModelSizeStatsSearch(resultsIndex))
.add(createTimingStatsSearch(resultsIndex, jobId))
// These next two document IDs never need to be the legacy ones due to the rule
// that you cannot open a 5.4 job in a subsequent version of the product
.add(createDocIdSearch(resultsIndex, ModelSnapshot.documentId(jobId, job.getModelSnapshotId())))
@ -504,6 +530,8 @@ public class JobResultsProvider {
String hitId = hit.getId();
if (DataCounts.documentId(jobId).equals(hitId)) {
paramsBuilder.setDataCounts(parseSearchHit(hit, DataCounts.PARSER, errorHandler));
} else if (TimingStats.documentId(jobId).equals(hitId)) {
paramsBuilder.setTimingStats(parseSearchHit(hit, TimingStats.PARSER, errorHandler));
} else if (hitId.startsWith(ModelSizeStats.documentIdPrefix(jobId))) {
ModelSizeStats.Builder modelSizeStats = parseSearchHit(hit, ModelSizeStats.LENIENT_PARSER, errorHandler);
paramsBuilder.setModelSizeStats(modelSizeStats == null ? null : modelSizeStats.build());

View File

@ -26,6 +26,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknow
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer;
@ -334,6 +335,10 @@ public class AutodetectCommunicator implements Closeable {
return autoDetectResultProcessor.modelSizeStats();
}
public TimingStats getTimingStats() {
return autoDetectResultProcessor.timingStats();
}
public DataCounts getDataCounts() {
return dataCountsReporter.runningTotalStats();
}

View File

@ -45,6 +45,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknow
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.TransportOpenJobAction.JobTask;
@ -509,8 +510,15 @@ public class AutodetectProcessManager implements ClusterStateListener {
AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams, autoDetectExecutorService,
onProcessCrash(jobTask));
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(
client, auditor, jobId, renormalizer, jobResultsPersister, autodetectParams.modelSizeStats());
AutoDetectResultProcessor processor =
new AutoDetectResultProcessor(
client,
auditor,
jobId,
renormalizer,
jobResultsPersister,
autodetectParams.modelSizeStats(),
autodetectParams.timingStats());
ExecutorService autodetectWorkerExecutor;
try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) {
autodetectWorkerExecutor = createAutodetectExecutorService(autoDetectExecutorService);
@ -716,12 +724,13 @@ public class AutodetectProcessManager implements ClusterStateListener {
});
}
public Optional<Tuple<DataCounts, ModelSizeStats>> getStatistics(JobTask jobTask) {
public Optional<Tuple<DataCounts, Tuple<ModelSizeStats, TimingStats>>> getStatistics(JobTask jobTask) {
AutodetectCommunicator communicator = getAutodetectCommunicator(jobTask);
if (communicator == null) {
return Optional.empty();
}
return Optional.of(new Tuple<>(communicator.getDataCounts(), communicator.getModelSizeStats()));
return Optional.of(
new Tuple<>(communicator.getDataCounts(), new Tuple<>(communicator.getModelSizeStats(), communicator.getTimingStats())));
}
ExecutorService createAutodetectExecutorService(ExecutorService executorService) {

View File

@ -25,6 +25,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknow
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats;
import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition;
@ -89,13 +90,26 @@ public class AutoDetectResultProcessor {
*/
private volatile ModelSizeStats latestModelSizeStats;
/**
* Current timing stats
*/
private volatile TimingStats timingStats;
/**
* Persisted timing stats. May be stale
*/
private TimingStats persistedTimingStats; // only used from the process() thread, so doesn't need to be volatile
public AutoDetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer,
JobResultsPersister persister, ModelSizeStats latestModelSizeStats) {
this(client, auditor, jobId, renormalizer, persister, latestModelSizeStats, new FlushListener());
JobResultsPersister persister,
ModelSizeStats latestModelSizeStats,
TimingStats timingStats) {
this(client, auditor, jobId, renormalizer, persister, latestModelSizeStats, timingStats, new FlushListener());
}
AutoDetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer,
JobResultsPersister persister, ModelSizeStats latestModelSizeStats, FlushListener flushListener) {
JobResultsPersister persister, ModelSizeStats latestModelSizeStats, TimingStats timingStats,
FlushListener flushListener) {
this.client = Objects.requireNonNull(client);
this.auditor = Objects.requireNonNull(auditor);
this.jobId = Objects.requireNonNull(jobId);
@ -103,6 +117,8 @@ public class AutoDetectResultProcessor {
this.persister = Objects.requireNonNull(persister);
this.flushListener = Objects.requireNonNull(flushListener);
this.latestModelSizeStats = Objects.requireNonNull(latestModelSizeStats);
this.persistedTimingStats = Objects.requireNonNull(timingStats);
this.timingStats = new TimingStats(persistedTimingStats);
}
public void process(AutodetectProcess process) {
@ -116,7 +132,9 @@ public class AutoDetectResultProcessor {
try {
if (processKilled == false) {
context.bulkResultsPersister.executeRequest();
context.bulkResultsPersister
.persistTimingStats(timingStats)
.executeRequest();
}
} catch (Exception e) {
LOGGER.warn(new ParameterizedMessage("[{}] Error persisting autodetect results", jobId), e);
@ -194,7 +212,9 @@ public class AutoDetectResultProcessor {
// persist after deleting interim results in case the new
// results are also interim
processTimingStats(context, bucket.getProcessingTimeMs());
context.bulkResultsPersister.persistBucket(bucket).executeRequest();
++bucketCount;
}
List<AnomalyRecord> records = result.getRecords();
@ -277,6 +297,15 @@ public class AutoDetectResultProcessor {
}
}
private void processTimingStats(Context context, long bucketProcessingTimeMs) {
timingStats.updateStats(bucketProcessingTimeMs);
if (TimingStats.differSignificantly(timingStats, persistedTimingStats)) {
context.bulkResultsPersister.persistTimingStats(timingStats);
persistedTimingStats = timingStats;
timingStats = new TimingStats(persistedTimingStats);
}
}
private void processModelSizeStats(Context context, ModelSizeStats modelSizeStats) {
LOGGER.trace("[{}] Parsed ModelSizeStats: {} / {} / {} / {} / {} / {}",
context.jobId, modelSizeStats.getModelBytes(), modelSizeStats.getTotalByFieldCount(),
@ -407,5 +436,8 @@ public class AutoDetectResultProcessor {
public ModelSizeStats modelSizeStats() {
return latestModelSizeStats;
}
}
public TimingStats timingStats() {
return timingStats;
}
}

View File

@ -12,6 +12,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats;
import java.util.ArrayList;
import java.util.HashSet;
@ -24,6 +25,8 @@ public class AutodetectParams {
private final DataCounts dataCounts;
private final ModelSizeStats modelSizeStats;
@Nullable
private final TimingStats timingStats;
@Nullable
private final ModelSnapshot modelSnapshot;
@Nullable
private final Quantiles quantiles;
@ -31,12 +34,13 @@ public class AutodetectParams {
private final List<ScheduledEvent> scheduledEvents;
private AutodetectParams(DataCounts dataCounts, ModelSizeStats modelSizeStats,
private AutodetectParams(DataCounts dataCounts, ModelSizeStats modelSizeStats, TimingStats timingStats,
@Nullable ModelSnapshot modelSnapshot,
@Nullable Quantiles quantiles, Set<MlFilter> filters,
List<ScheduledEvent> scheduledEvents) {
this.dataCounts = Objects.requireNonNull(dataCounts);
this.modelSizeStats = Objects.requireNonNull(modelSizeStats);
this.timingStats = timingStats;
this.modelSnapshot = modelSnapshot;
this.quantiles = quantiles;
this.filters = filters;
@ -51,6 +55,10 @@ public class AutodetectParams {
return modelSizeStats;
}
public TimingStats timingStats() {
return timingStats;
}
@Nullable
public ModelSnapshot modelSnapshot() {
return modelSnapshot;
@ -83,6 +91,7 @@ public class AutodetectParams {
return Objects.equals(this.dataCounts, that.dataCounts)
&& Objects.equals(this.modelSizeStats, that.modelSizeStats)
&& Objects.equals(this.timingStats, that.timingStats)
&& Objects.equals(this.modelSnapshot, that.modelSnapshot)
&& Objects.equals(this.quantiles, that.quantiles)
&& Objects.equals(this.filters, that.filters)
@ -91,13 +100,14 @@ public class AutodetectParams {
@Override
public int hashCode() {
return Objects.hash(dataCounts, modelSizeStats, modelSnapshot, quantiles, filters, scheduledEvents);
return Objects.hash(dataCounts, modelSizeStats, timingStats, modelSnapshot, quantiles, filters, scheduledEvents);
}
public static class Builder {
private DataCounts dataCounts;
private ModelSizeStats modelSizeStats;
private TimingStats timingStats;
private ModelSnapshot modelSnapshot;
private Quantiles quantiles;
private Set<MlFilter> filters;
@ -106,6 +116,7 @@ public class AutodetectParams {
public Builder(String jobId) {
dataCounts = new DataCounts(jobId);
modelSizeStats = new ModelSizeStats.Builder(jobId).build();
timingStats = new TimingStats(jobId);
filters = new HashSet<>();
scheduledEvents = new ArrayList<>();
}
@ -124,6 +135,11 @@ public class AutodetectParams {
return this;
}
public Builder setTimingStats(TimingStats timingStats) {
this.timingStats = new TimingStats(timingStats);
return this;
}
public Builder setModelSnapshot(ModelSnapshot modelSnapshot) {
this.modelSnapshot = modelSnapshot;
return this;
@ -150,7 +166,7 @@ public class AutodetectParams {
}
public AutodetectParams build() {
return new AutodetectParams(dataCounts, modelSizeStats, modelSnapshot, quantiles,
return new AutodetectParams(dataCounts, modelSizeStats, timingStats, modelSnapshot, quantiles,
filters, scheduledEvents);
}
}

View File

@ -10,6 +10,7 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats;
import java.time.Duration;
import java.util.Arrays;
@ -27,8 +28,11 @@ public class TransportGetJobsStatsActionTests extends ESTestCase {
assertEquals(1, result.size());
assertEquals("id1", result.get(0));
result = determineJobIdsWithoutLiveStats(Collections.singletonList("id1"), Collections.singletonList(
new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, null, JobState.OPENED, null, null, null)));
result = determineJobIdsWithoutLiveStats(
Collections.singletonList("id1"),
Collections.singletonList(
new GetJobsStatsAction.Response.JobStats(
"id1", new DataCounts("id1"), null, null, JobState.OPENED, null, null, null, new TimingStats("id1"))));
assertEquals(0, result.size());
result = determineJobIdsWithoutLiveStats(Arrays.asList("id1", "id2", "id3"), Collections.emptyList());
@ -39,23 +43,28 @@ public class TransportGetJobsStatsActionTests extends ESTestCase {
result = determineJobIdsWithoutLiveStats(Arrays.asList("id1", "id2", "id3"),
Collections.singletonList(new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, null,
JobState.OPENED, null, null, null))
JobState.OPENED, null, null, null, new TimingStats("id1")))
);
assertEquals(2, result.size());
assertEquals("id2", result.get(0));
assertEquals("id3", result.get(1));
result = determineJobIdsWithoutLiveStats(Arrays.asList("id1", "id2", "id3"), Arrays.asList(
new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, null, JobState.OPENED, null, null, null),
new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, null, JobState.OPENED, null, null, null)
new GetJobsStatsAction.Response.JobStats(
"id1", new DataCounts("id1"), null, null, JobState.OPENED, null, null, null, new TimingStats("id1")),
new GetJobsStatsAction.Response.JobStats(
"id3", new DataCounts("id3"), null, null, JobState.OPENED, null, null, null, new TimingStats("id3"))
));
assertEquals(1, result.size());
assertEquals("id2", result.get(0));
result = determineJobIdsWithoutLiveStats(Arrays.asList("id1", "id2", "id3"), Arrays.asList(
new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, null, JobState.OPENED, null, null, null),
new GetJobsStatsAction.Response.JobStats("id2", new DataCounts("id2"), null, null, JobState.OPENED, null, null, null),
new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, null, JobState.OPENED, null, null, null)));
new GetJobsStatsAction.Response.JobStats(
"id1", new DataCounts("id1"), null, null, JobState.OPENED, null, null, null, new TimingStats("id1")),
new GetJobsStatsAction.Response.JobStats(
"id2", new DataCounts("id2"), null, null, JobState.OPENED, null, null, null, new TimingStats("id2")),
new GetJobsStatsAction.Response.JobStats(
"id3", new DataCounts("id3"), null, null, JobState.OPENED, null, null, null, new TimingStats("id3"))));
assertEquals(0, result.size());
}

View File

@ -25,6 +25,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknow
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats;
import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition;
@ -89,7 +90,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
renormalizer = mock(Renormalizer.class);
capturedUpdateModelSnapshotOnJobRequests = new ArrayList<>();
resultProcessor = new AutoDetectResultProcessor(client(), auditor, JOB_ID, renormalizer,
new JobResultsPersister(client()), new ModelSizeStats.Builder(JOB_ID).build()) {
new JobResultsPersister(client()), new ModelSizeStats.Builder(JOB_ID).build(), new TimingStats(JOB_ID)) {
@Override
protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) {
capturedUpdateModelSnapshotOnJobRequests.add(modelSnapshot);
@ -116,8 +117,8 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
builder.addInfluencers(influencers);
CategoryDefinition categoryDefinition = createCategoryDefinition();
builder.addCategoryDefinition(categoryDefinition);
ModelPlot modelPlot = createmodelPlot();
builder.addmodelPlot(modelPlot);
ModelPlot modelPlot = createModelPlot();
builder.addModelPlot(modelPlot);
ModelSizeStats modelSizeStats = createModelSizeStats();
builder.addModelSizeStats(modelSizeStats);
ModelSnapshot modelSnapshot = createModelSnapshot();
@ -326,7 +327,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
return new CategoryDefinitionTests().createTestInstance(JOB_ID);
}
private ModelPlot createmodelPlot() {
private ModelPlot createModelPlot() {
return new ModelPlotTests().createTestInstance(JOB_ID);
}
@ -379,7 +380,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
return this;
}
ResultsBuilder addmodelPlot(ModelPlot modelPlot) {
ResultsBuilder addModelPlot(ModelPlot modelPlot) {
results.add(new AutodetectResult(null, null, null, null, null, null, modelPlot, null, null, null, null));
return this;
}

View File

@ -15,6 +15,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats;
import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.job.results.BucketInfluencer;
@ -26,8 +27,11 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@ -195,6 +199,32 @@ public class JobResultsPersisterTests extends ESTestCase {
verifyNoMoreInteractions(client);
}
public void testPersistTimingStats() {
ArgumentCaptor<BulkRequest> bulkRequestCaptor = ArgumentCaptor.forClass(BulkRequest.class);
Client client = mockClient(bulkRequestCaptor);
JobResultsPersister persister = new JobResultsPersister(client);
TimingStats timingStats = new TimingStats("foo", 7, 1.0, 2.0, 1.23);
persister.bulkPersisterBuilder(JOB_ID).persistTimingStats(timingStats).executeRequest();
verify(client, times(1)).bulk(bulkRequestCaptor.capture());
BulkRequest bulkRequest = bulkRequestCaptor.getValue();
assertThat(bulkRequest.requests().size(), equalTo(1));
IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(0);
assertThat(indexRequest.index(), equalTo(".ml-anomalies-.write-foo"));
assertThat(indexRequest.id(), equalTo("foo_timing_stats"));
Map<String, Object> expectedSourceAsMap = new HashMap<>();
expectedSourceAsMap.put("job_id", "foo");
expectedSourceAsMap.put("bucket_count", 7);
expectedSourceAsMap.put("minimum_bucket_processing_time_ms", 1.0);
expectedSourceAsMap.put("maximum_bucket_processing_time_ms", 2.0);
expectedSourceAsMap.put("average_bucket_processing_time_ms", 1.23);
assertThat(indexRequest.sourceAsMap(), equalTo(expectedSourceAsMap));
verify(client, times(1)).threadPool();
verifyNoMoreInteractions(client);
}
@SuppressWarnings({"unchecked"})
private Client mockClient(ArgumentCaptor<BulkRequest> captor) {
Client client = mock(Client.class);

View File

@ -13,7 +13,9 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
@ -44,6 +46,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats;
import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition;
@ -54,6 +57,7 @@ import org.mockito.ArgumentCaptor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
@ -70,6 +74,7 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public class JobResultsProviderTests extends ESTestCase {
@ -824,6 +829,56 @@ public class JobResultsProviderTests extends ESTestCase {
assertEquals(7, JobResultsProvider.countFields(Collections.singletonMap("properties", mapping)));
}
public void testTimingStats_Ok() throws IOException {
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName("foo");
Map<String, Object> timingStatsMap = new HashMap<>();
timingStatsMap.put(Job.ID.getPreferredName(), "foo");
timingStatsMap.put(TimingStats.BUCKET_COUNT.getPreferredName(), 7);
timingStatsMap.put(TimingStats.MIN_BUCKET_PROCESSING_TIME_MS.getPreferredName(), 1.0);
timingStatsMap.put(TimingStats.MAX_BUCKET_PROCESSING_TIME_MS.getPreferredName(), 1000.0);
timingStatsMap.put(TimingStats.AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), 666.0);
List<Map<String, Object>> source = Arrays.asList(timingStatsMap);
SearchResponse response = createSearchResponse(source);
Client client = getMockedClient(
queryBuilder -> assertThat(queryBuilder.getName(), equalTo("ids")),
response);
when(client.prepareSearch(indexName)).thenReturn(new SearchRequestBuilder(client, SearchAction.INSTANCE).setIndices(indexName));
JobResultsProvider provider = createProvider(client);
provider.timingStats(
"foo",
stats -> assertThat(stats, equalTo(new TimingStats("foo", 7, 1.0, 1000.0, 666.0))),
e -> { throw new AssertionError(); });
verify(client).prepareSearch(indexName);
verify(client).threadPool();
verify(client).search(any(SearchRequest.class), any(ActionListener.class));
verifyNoMoreInteractions(client);
}
public void testTimingStats_NotFound() throws IOException {
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName("foo");
List<Map<String, Object>> source = new ArrayList<>();
SearchResponse response = createSearchResponse(source);
Client client = getMockedClient(
queryBuilder -> assertThat(queryBuilder.getName(), equalTo("ids")),
response);
when(client.prepareSearch(indexName)).thenReturn(new SearchRequestBuilder(client, SearchAction.INSTANCE).setIndices(indexName));
JobResultsProvider provider = createProvider(client);
provider.timingStats(
"foo",
stats -> assertThat(stats, equalTo(new TimingStats("foo"))),
e -> { throw new AssertionError(); });
verify(client).prepareSearch(indexName);
verify(client).threadPool();
verify(client).search(any(SearchRequest.class), any(ActionListener.class));
verifyNoMoreInteractions(client);
}
private Bucket createBucketAtEpochTime(long epoch) {
return new Bucket("foo", new Date(epoch), 123);
}

View File

@ -27,6 +27,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknow
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats;
import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition;
@ -93,8 +94,15 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
when(persister.persistModelSnapshot(any(), any()))
.thenReturn(new IndexResponse(new ShardId("ml", "uid", 0), "doc", "1", 0L, 0L, 0L, true));
flushListener = mock(FlushListener.class);
processorUnderTest = new AutoDetectResultProcessor(client, auditor, JOB_ID, renormalizer, persister,
new ModelSizeStats.Builder(JOB_ID).setTimestamp(new Date(BUCKET_SPAN_MS)).build(), flushListener);
processorUnderTest = new AutoDetectResultProcessor(
client,
auditor,
JOB_ID,
renormalizer,
persister,
new ModelSizeStats.Builder(JOB_ID).setTimestamp(new Date(BUCKET_SPAN_MS)).build(),
new TimingStats(JOB_ID),
flushListener);
}
@After
@ -121,6 +129,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
public void testProcessResult_bucket() {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder);
when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder);
@ -130,6 +139,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
when(result.getBucket()).thenReturn(bucket);
processorUnderTest.processResult(context, result);
verify(bulkBuilder, times(1)).persistTimingStats(any(TimingStats.class));
verify(bulkBuilder, times(1)).persistBucket(bucket);
verify(bulkBuilder, times(1)).executeRequest();
verify(persister, never()).deleteInterimResults(JOB_ID);
@ -139,6 +149,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
public void testProcessResult_bucket_deleteInterimRequired() {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder);
when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder);
@ -148,6 +159,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
when(result.getBucket()).thenReturn(bucket);
processorUnderTest.processResult(context, result);
verify(bulkBuilder, times(1)).persistTimingStats(any(TimingStats.class));
verify(bulkBuilder, times(1)).persistBucket(bucket);
verify(bulkBuilder, times(1)).executeRequest();
verify(persister, times(1)).deleteInterimResults(JOB_ID);

View File

@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.process.autodetect.params;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats;
import static org.hamcrest.Matchers.equalTo;
public class AutodetectParamsTests extends ESTestCase {
private static final String JOB_ID = "my-job";
public void testBuilder_WithTimingStats() {
TimingStats timingStats = new TimingStats(JOB_ID, 7, 1.0, 1000.0, 666.0);
AutodetectParams params = new AutodetectParams.Builder(JOB_ID).setTimingStats(timingStats).build();
assertThat(params.timingStats(), equalTo(timingStats));
timingStats.updateStats(2000.0);
assertThat(timingStats, equalTo(new TimingStats(JOB_ID, 8, 1.0, 2000.0, 832.75)));
assertThat(params.timingStats(), equalTo(new TimingStats(JOB_ID, 7, 1.0, 1000.0, 666.0)));
}
public void testBuilder_WithoutTimingStats() {
AutodetectParams params = new AutodetectParams.Builder(JOB_ID).build();
assertThat(params.timingStats(), equalTo(new TimingStats(JOB_ID)));
}
}

View File

@ -61,7 +61,7 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
bucket.setInterim(randomBoolean());
}
if (randomBoolean()) {
bucket.setProcessingTimeMs(randomLong());
bucket.setProcessingTimeMs(randomNonNegativeLong());
}
if (randomBoolean()) {
int size = randomInt(10);

View File

@ -17,6 +17,7 @@ import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction.Response.JobSta
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats;
import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
import org.elasticsearch.xpack.core.monitoring.MonitoredSystem;
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
@ -102,8 +103,9 @@ public class JobStatsMonitoringDocTests extends BaseMonitoringDocTestCase<JobSta
final DataCounts dataCounts = new DataCounts("_job_id", 0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, date3, date4, date5, date6, date7);
final ForecastStats forecastStats = new ForecastStats();
final JobStats jobStats = new JobStats("_job", dataCounts, modelStats, forecastStats, JobState.OPENED, discoveryNode,
"_explanation", time);
final TimingStats timingStats = new TimingStats("_job_id", 100, 10.0, 30.0, 20.0);
final JobStats jobStats = new JobStats(
"_job", dataCounts, modelStats, forecastStats, JobState.OPENED, discoveryNode, "_explanation", time, timingStats);
final MonitoringDoc.Node node = new MonitoringDoc.Node("_uuid", "_host", "_addr", "_ip", "_name", 1504169190855L);
final JobStatsMonitoringDoc document = new JobStatsMonitoringDoc("_cluster", 1502266739402L, 1506593717631L, node, jobStats);
@ -169,8 +171,15 @@ public class JobStatsMonitoringDocTests extends BaseMonitoringDocTestCase<JobSta
+ "}"
+ "},"
+ "\"assignment_explanation\":\"_explanation\","
+ "\"open_time\":\"13h\""
+ "}"
+ "\"open_time\":\"13h\","
+ "\"timing_stats\":{"
+ "\"job_id\":\"_job_id\","
+ "\"bucket_count\":100,"
+ "\"minimum_bucket_processing_time_ms\":10.0,"
+ "\"maximum_bucket_processing_time_ms\":30.0,"
+ "\"average_bucket_processing_time_ms\":20.0"
+ "}"
+ "}"
+ "}", xContent.utf8ToString());
}
}

View File

@ -124,7 +124,7 @@ setup:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
count:
index: .ml-anomalies-shared
- match: {count: 6}
- match: {count: 8}
- do:
headers:
@ -138,7 +138,7 @@ setup:
term:
job_id: index-layout-job
- match: {count: 3}
- match: {count: 4}
- do:
headers:
@ -152,7 +152,7 @@ setup:
term:
job_id: index-layout-job
- match: {count: 3}
- match: {count: 4}
- do:
headers:
@ -166,7 +166,7 @@ setup:
term:
job_id: index-layout-job2
- match: {count: 3}
- match: {count: 4}
- do:
headers:
@ -179,7 +179,7 @@ setup:
filter:
term:
job_id: index-layout-job2
- match: {count: 3}
- match: {count: 4}
- do:
headers:
@ -236,7 +236,7 @@ setup:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
count:
index: .ml-anomalies-shared
- match: {count: 3}
- match: {count: 4}
- do:
@ -251,7 +251,7 @@ setup:
term:
job_id: index-layout-job2
- match: {count: 3}
- match: {count: 4}
- do:
headers:
@ -265,7 +265,7 @@ setup:
term:
job_id: index-layout-job2
- match: {count: 3}
- match: {count: 4}
- do:
ml.delete_job:

View File

@ -98,6 +98,8 @@ setup:
- is_true: jobs.0.node.transport_address
- match: { jobs.0.node.attributes.ml\.max_open_jobs: "20"}
- is_true: jobs.0.open_time
- match: { jobs.0.timing_stats.job_id: job-stats-test }
- gte: { jobs.0.timing_stats.bucket_count: 0 }
---
"Test get job stats for closed job":
@ -130,6 +132,8 @@ setup:
- match: { jobs.0.state: closed }
- is_false: jobs.0.node
- is_false: jobs.0.open_time
- match: { jobs.0.timing_stats.job_id: job-stats-test }
- gte: { jobs.0.timing_stats.bucket_count: 0 }
---
"Test get job stats of datafeed job that has not received any data":
@ -142,6 +146,8 @@ setup:
- match: { jobs.0.model_size_stats.model_bytes : 0 }
- match: { jobs.0.state: opened }
- is_true: jobs.0.open_time
- match: { jobs.0.timing_stats.job_id: jobs-get-stats-datafeed-job }
- match: { jobs.0.timing_stats.bucket_count: 0 }
---
"Test get all job stats with _all":
@ -320,6 +326,8 @@ setup:
- match: { jobs.0.state: closed }
- is_false: jobs.0.node
- is_false: jobs.0.open_time
- match: { jobs.0.timing_stats.job_id: job-stats-test }
- gte: { jobs.0.timing_stats.bucket_count: 0 }
- match: { jobs.1.job_id : jobs-get-stats-datafeed-job }
- match: { jobs.1.data_counts.processed_record_count: 0 }
- match: { jobs.1.data_counts.processed_field_count: 0 }
@ -328,3 +336,5 @@ setup:
- match: { jobs.1.state: closed }
- is_false: jobs.1.node
- is_false: jobs.1.open_time
- match: { jobs.1.timing_stats.job_id: jobs-get-stats-datafeed-job }
- gte: { jobs.1.timing_stats.bucket_count: 0 }