[ML] Return statistics about forecasts as part of the job stats and usage API (#31647)

This change adds stats about forecasts to the job stats API as well as xpack/_usage. The following
information is collected:

_xpack/ml/anomaly_detectors/{jobid|_all}/_stats:

 -  total number of forecasts
 -  memory statistics (mean/min/max)
 -  runtime statistics
 -  record statistics
 -  counts by status

_xpack/usage

 -  collected by job status as well as overall (_all):
     -  total number of forecasts
     -  number of jobs that have at least 1 forecast
     -  memory, runtime, record statistics
     -  counts by status

Fixes #31395
This commit is contained in:
Hendrik Muhs 2018-07-04 08:15:45 +02:00 committed by GitHub
parent 32d67ef504
commit e9f8442bee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1071 additions and 160 deletions

View File

@ -20,6 +20,10 @@ progress of a job.
(object) An object that provides information about the size and contents of the model.
See <<ml-modelsizestats,model size stats objects>>
`forecasts_stats`::
(object) An object that provides statistical information about forecasts
of this job. See <<ml-forecastsstats, forecasts stats objects>>
`node`::
(object) For open jobs only, contains information about the node where the
job runs. See <<ml-stats-node,node object>>.
@ -177,6 +181,33 @@ NOTE: The `over` field values are counted separately for each detector and parti
`timestamp`::
(date) The timestamp of the `model_size_stats` according to the timestamp of the data.
[float]
[[ml-forecastsstats]]
==== Forecasts Stats Objects
The `forecasts_stats` object shows statistics about forecasts. It has the following properties:
`total`::
(long) The number of forecasts currently available for this model.
`forecasted_jobs`::
(long) The number of jobs that have at least one forecast.
`memory_bytes`::
(object) Statistics about the memory usage: minimum, maximum, average and total.
`records`::
(object) Statistics about the number of forecast records: minimum, maximum, average and total.
`processing_time_ms`::
(object) Statistics about the forecast runtime in milliseconds: minimum, maximum, average and total.
`status`::
(object) Counts per forecast status, for example: {"finished" : 2}.
NOTE: `memory_bytes`, `records`, `processing_time_ms` and `status` require at least 1 forecast, otherwise
these fields are omitted.
[float]
[[ml-stats-node]]
==== Node Objects

View File

@ -22,6 +22,7 @@ public class MachineLearningFeatureSetUsage extends XPackFeatureSet.Usage {
public static final String DATAFEEDS_FIELD = "datafeeds";
public static final String COUNT = "count";
public static final String DETECTORS = "detectors";
public static final String FORECASTS = "forecasts";
public static final String MODEL_SIZE = "model_size";
private final Map<String, Object> jobsUsage;

View File

@ -31,6 +31,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import java.io.IOException;
@ -46,6 +47,7 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Response> {
private static final String DATA_COUNTS = "data_counts";
private static final String MODEL_SIZE_STATS = "model_size_stats";
private static final String FORECASTS_STATS = "forecasts_stats";
private static final String STATE = "state";
private static final String NODE = "node";
@ -154,6 +156,8 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Response> {
@Nullable
private ModelSizeStats modelSizeStats;
@Nullable
private ForecastStats forecastStats;
@Nullable
private TimeValue openTime;
private JobState state;
@Nullable
@ -161,11 +165,13 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Response> {
@Nullable
private String assignmentExplanation;
public JobStats(String jobId, DataCounts dataCounts, @Nullable ModelSizeStats modelSizeStats, JobState state,
@Nullable DiscoveryNode node, @Nullable String assignmentExplanation, @Nullable TimeValue opentime) {
public JobStats(String jobId, DataCounts dataCounts, @Nullable ModelSizeStats modelSizeStats,
@Nullable ForecastStats forecastStats, JobState state, @Nullable DiscoveryNode node,
@Nullable String assignmentExplanation, @Nullable TimeValue opentime) {
this.jobId = Objects.requireNonNull(jobId);
this.dataCounts = Objects.requireNonNull(dataCounts);
this.modelSizeStats = modelSizeStats;
this.forecastStats = forecastStats;
this.state = Objects.requireNonNull(state);
this.node = node;
this.assignmentExplanation = assignmentExplanation;
@ -180,6 +186,9 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Response> {
node = in.readOptionalWriteable(DiscoveryNode::new);
assignmentExplanation = in.readOptionalString();
openTime = in.readOptionalTimeValue();
if (in.getVersion().onOrAfter(Version.V_6_4_0)) {
forecastStats = in.readOptionalWriteable(ForecastStats::new);
}
}
public String getJobId() {
@ -193,6 +202,10 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Response> {
public ModelSizeStats getModelSizeStats() {
return modelSizeStats;
}
public ForecastStats getForecastStats() {
return forecastStats;
}
public JobState getState() {
return state;
@ -226,6 +239,10 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Response> {
if (modelSizeStats != null) {
builder.field(MODEL_SIZE_STATS, modelSizeStats);
}
if (forecastStats != null) {
builder.field(FORECASTS_STATS, forecastStats);
}
builder.field(STATE, state.toString());
if (node != null) {
builder.startObject(NODE);
@ -259,11 +276,14 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Response> {
out.writeOptionalWriteable(node);
out.writeOptionalString(assignmentExplanation);
out.writeOptionalTimeValue(openTime);
if (out.getVersion().onOrAfter(Version.V_6_4_0)) {
out.writeOptionalWriteable(forecastStats);
}
}
@Override
public int hashCode() {
return Objects.hash(jobId, dataCounts, modelSizeStats, state, node, assignmentExplanation, openTime);
return Objects.hash(jobId, dataCounts, modelSizeStats, forecastStats, state, node, assignmentExplanation, openTime);
}
@Override
@ -278,6 +298,7 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Response> {
return Objects.equals(jobId, other.jobId)
&& Objects.equals(this.dataCounts, other.dataCounts)
&& Objects.equals(this.modelSizeStats, other.modelSizeStats)
&& Objects.equals(this.forecastStats, other.forecastStats)
&& Objects.equals(this.state, other.state)
&& Objects.equals(this.node, other.node)
&& Objects.equals(this.assignmentExplanation, other.assignmentExplanation)

View File

@ -0,0 +1,82 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.stats;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* An accumulator for simple counts where statistical measures
* are not of interest.
*/
public class CountAccumulator implements Writeable {
private Map<String, Long> counts;
public CountAccumulator() {
this.counts = new HashMap<String, Long>();
}
private CountAccumulator(Map<String, Long> counts) {
this.counts = counts;
}
public CountAccumulator(StreamInput in) throws IOException {
this.counts = in.readMap(StreamInput::readString, StreamInput::readLong);
}
public void merge(CountAccumulator other) {
counts = Stream.of(counts, other.counts).flatMap(m -> m.entrySet().stream())
.collect(Collectors.toMap(Entry::getKey, Entry::getValue, (x, y) -> x + y));
}
public void add(String key, Long count) {
counts.put(key, counts.getOrDefault(key, 0L) + count);
}
public Map<String, Long> asMap() {
return counts;
}
public static CountAccumulator fromTermsAggregation(StringTerms termsAggregation) {
return new CountAccumulator(termsAggregation.getBuckets().stream()
.collect(Collectors.toMap(bucket -> bucket.getKeyAsString(), bucket -> bucket.getDocCount())));
}
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(counts, StreamOutput::writeString, StreamOutput::writeLong);
}
@Override
public int hashCode() {
return Objects.hash(counts);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
CountAccumulator other = (CountAccumulator) obj;
return Objects.equals(counts, other.counts);
}
}

View File

@ -0,0 +1,152 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.stats;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
/**
* A class to hold statistics about forecasts.
*/
public class ForecastStats implements ToXContentObject, Writeable {
public static class Fields {
public static final String TOTAL = "total";
public static final String FORECASTED_JOBS = "forecasted_jobs";
public static final String MEMORY = "memory_bytes";
public static final String RUNTIME = "processing_time_ms";
public static final String RECORDS = "records";
public static final String STATUSES = "status";
}
private long total;
private long forecastedJobs;
private StatsAccumulator memoryStats;
private StatsAccumulator recordStats;
private StatsAccumulator runtimeStats;
private CountAccumulator statusCounts;
public ForecastStats() {
this.total = 0;
this.forecastedJobs = 0;
this.memoryStats = new StatsAccumulator();
this.recordStats = new StatsAccumulator();
this.runtimeStats = new StatsAccumulator();
this.statusCounts = new CountAccumulator();
}
/*
* Construct ForecastStats for 1 job. Additional statistics can be added by merging other ForecastStats into it.
*/
public ForecastStats(long total, StatsAccumulator memoryStats, StatsAccumulator recordStats, StatsAccumulator runtimeStats,
CountAccumulator statusCounts) {
this.total = total;
this.forecastedJobs = total > 0 ? 1 : 0;
this.memoryStats = Objects.requireNonNull(memoryStats);
this.recordStats = Objects.requireNonNull(recordStats);
this.runtimeStats = Objects.requireNonNull(runtimeStats);
this.statusCounts = Objects.requireNonNull(statusCounts);
}
public ForecastStats(StreamInput in) throws IOException {
this.total = in.readLong();
this.forecastedJobs = in.readLong();
this.memoryStats = new StatsAccumulator(in);
this.recordStats = new StatsAccumulator(in);
this.runtimeStats = new StatsAccumulator(in);
this.statusCounts = new CountAccumulator(in);
}
public ForecastStats merge(ForecastStats other) {
if (other == null) {
return this;
}
total += other.total;
forecastedJobs += other.forecastedJobs;
memoryStats.merge(other.memoryStats);
recordStats.merge(other.recordStats);
runtimeStats.merge(other.runtimeStats);
statusCounts.merge(other.statusCounts);
return this;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
doXContentBody(builder, params);
return builder.endObject();
}
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field(Fields.TOTAL, total);
builder.field(Fields.FORECASTED_JOBS, forecastedJobs);
if (total > 0) {
builder.field(Fields.MEMORY, memoryStats.asMap());
builder.field(Fields.RECORDS, recordStats.asMap());
builder.field(Fields.RUNTIME, runtimeStats.asMap());
builder.field(Fields.STATUSES, statusCounts.asMap());
}
return builder;
}
public Map<String, Object> asMap() {
Map<String, Object> map = new HashMap<>();
map.put(Fields.TOTAL, total);
map.put(Fields.FORECASTED_JOBS, forecastedJobs);
if (total > 0) {
map.put(Fields.MEMORY, memoryStats.asMap());
map.put(Fields.RECORDS, recordStats.asMap());
map.put(Fields.RUNTIME, runtimeStats.asMap());
map.put(Fields.STATUSES, statusCounts.asMap());
}
return map;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(total);
out.writeLong(forecastedJobs);
memoryStats.writeTo(out);
recordStats.writeTo(out);
runtimeStats.writeTo(out);
statusCounts.writeTo(out);
}
@Override
public int hashCode() {
return Objects.hash(total, forecastedJobs, memoryStats, recordStats, runtimeStats, statusCounts);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
ForecastStats other = (ForecastStats) obj;
return Objects.equals(total, other.total) && Objects.equals(forecastedJobs, other.forecastedJobs)
&& Objects.equals(memoryStats, other.memoryStats) && Objects.equals(recordStats, other.recordStats)
&& Objects.equals(runtimeStats, other.runtimeStats) && Objects.equals(statusCounts, other.statusCounts);
}
}

View File

@ -0,0 +1,126 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.stats;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.search.aggregations.metrics.stats.Stats;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
/**
* Helper class to collect min, max, avg and total statistics for a quantity
*/
public class StatsAccumulator implements Writeable {
public static class Fields {
public static final String MIN = "min";
public static final String MAX = "max";
public static final String AVG = "avg";
public static final String TOTAL = "total";
}
private long count;
private double total;
private Double min;
private Double max;
public StatsAccumulator() {
}
public StatsAccumulator(StreamInput in) throws IOException {
count = in.readLong();
total = in.readDouble();
min = in.readOptionalDouble();
max = in.readOptionalDouble();
}
private StatsAccumulator(long count, double total, double min, double max) {
this.count = count;
this.total = total;
this.min = min;
this.max = max;
}
public void add(double value) {
count++;
total += value;
min = min == null ? value : (value < min ? value : min);
max = max == null ? value : (value > max ? value : max);
}
public double getMin() {
return min == null ? 0.0 : min;
}
public double getMax() {
return max == null ? 0.0 : max;
}
public double getAvg() {
return count == 0.0 ? 0.0 : total/count;
}
public double getTotal() {
return total;
}
public void merge(StatsAccumulator other) {
count += other.count;
total += other.total;
// note: not using Math.min/max as some internal prefetch optimization causes an NPE
min = min == null ? other.min : (other.min == null ? min : other.min < min ? other.min : min);
max = max == null ? other.max : (other.max == null ? max : other.max > max ? other.max : max);
}
public Map<String, Double> asMap() {
Map<String, Double> map = new HashMap<>();
map.put(Fields.MIN, getMin());
map.put(Fields.MAX, getMax());
map.put(Fields.AVG, getAvg());
map.put(Fields.TOTAL, getTotal());
return map;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(count);
out.writeDouble(total);
out.writeOptionalDouble(min);
out.writeOptionalDouble(max);
}
public static StatsAccumulator fromStatsAggregation(Stats statsAggregation) {
return new StatsAccumulator(statsAggregation.getCount(), statsAggregation.getSum(), statsAggregation.getMin(),
statsAggregation.getMax());
}
@Override
public int hashCode() {
return Objects.hash(count, total, min, max);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
StatsAccumulator other = (StatsAccumulator) obj;
return Objects.equals(count, other.count) && Objects.equals(total, other.total) && Objects.equals(min, other.min)
&& Objects.equals(max, other.max);
}
}

View File

@ -17,6 +17,8 @@ import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCountsTests;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
import org.elasticsearch.xpack.core.ml.stats.ForecastStatsTests;
import java.net.InetAddress;
import java.util.ArrayList;
@ -42,6 +44,12 @@ public class GetJobStatsActionResponseTests extends AbstractStreamableTestCase<R
if (randomBoolean()) {
sizeStats = new ModelSizeStats.Builder("foo").build();
}
ForecastStats forecastStats = null;
if (randomBoolean()) {
forecastStats = new ForecastStatsTests().createTestInstance();
}
JobState jobState = randomFrom(EnumSet.allOf(JobState.class));
DiscoveryNode node = null;
@ -56,7 +64,8 @@ public class GetJobStatsActionResponseTests extends AbstractStreamableTestCase<R
if (randomBoolean()) {
openTime = parseTimeValue(randomPositiveTimeValue(), "open_time-Test");
}
Response.JobStats jobStats = new Response.JobStats(jobId, dataCounts, sizeStats, jobState, node, explanation, openTime);
Response.JobStats jobStats = new Response.JobStats(jobId, dataCounts, sizeStats, forecastStats, jobState, node, explanation,
openTime);
jobStatsList.add(jobStats);
}

View File

@ -0,0 +1,100 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.stats;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms.Bucket;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class CountAccumulatorTests extends AbstractWireSerializingTestCase<CountAccumulator> {
public void testEmpty() {
CountAccumulator accumulator = new CountAccumulator();
assertEquals(Collections.emptyMap(), accumulator.asMap());
}
public void testAdd() {
CountAccumulator accumulator = new CountAccumulator();
accumulator.add("a", 22L);
accumulator.add("a", 10L);
accumulator.add("a", 15L);
accumulator.add("a", -12L);
accumulator.add("a", 0L);
accumulator.add("b", 13L);
accumulator.add("b", 1L);
accumulator.add("b", 40000L);
accumulator.add("b", -2L);
accumulator.add("b", 333L);
assertEquals(35L, accumulator.asMap().get("a").longValue());
assertEquals(40345L, accumulator.asMap().get("b").longValue());
assertEquals(2, accumulator.asMap().size());
}
public void testMerge() {
CountAccumulator accumulator = new CountAccumulator();
accumulator.add("a", 13L);
accumulator.add("b", 42L);
CountAccumulator accumulator2 = new CountAccumulator();
accumulator2.add("a", 12L);
accumulator2.add("c", -1L);
accumulator.merge(accumulator2);
assertEquals(25L, accumulator.asMap().get("a").longValue());
assertEquals(42L, accumulator.asMap().get("b").longValue());
assertEquals(-1L, accumulator.asMap().get("c").longValue());
assertEquals(3, accumulator.asMap().size());
}
public void testFromTermsAggregation() {
StringTerms termsAggregation = mock(StringTerms.class);
Bucket bucket1 = mock(Bucket.class);
when(bucket1.getKeyAsString()).thenReturn("a");
when(bucket1.getDocCount()).thenReturn(10L);
Bucket bucket2 = mock(Bucket.class);
when(bucket2.getKeyAsString()).thenReturn("b");
when(bucket2.getDocCount()).thenReturn(33L);
List<Bucket> buckets = Arrays.asList(bucket1, bucket2);
when(termsAggregation.getBuckets()).thenReturn(buckets);
CountAccumulator accumulator = CountAccumulator.fromTermsAggregation(termsAggregation);
assertEquals(10L, accumulator.asMap().get("a").longValue());
assertEquals(33L, accumulator.asMap().get("b").longValue());
assertEquals(2, accumulator.asMap().size());
}
@Override
public CountAccumulator createTestInstance() {
CountAccumulator accumulator = new CountAccumulator();
for (int i = 0; i < randomInt(10); ++i) {
accumulator.add(randomAlphaOfLengthBetween(1, 20), randomLongBetween(1L, 100L));
}
return accumulator;
}
@Override
protected Reader<CountAccumulator> instanceReader() {
return CountAccumulator::new;
}
}

View File

@ -0,0 +1,254 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.stats;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
import org.elasticsearch.xpack.core.ml.stats.ForecastStats.Fields;
import java.io.IOException;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
public class ForecastStatsTests extends AbstractWireSerializingTestCase<ForecastStats> {
public void testEmpty() throws IOException {
ForecastStats forecastStats = new ForecastStats();
XContentBuilder builder = JsonXContent.contentBuilder();
forecastStats.toXContent(builder, ToXContent.EMPTY_PARAMS);
XContentParser parser = createParser(builder);
Map<String, Object> properties = parser.map();
assertTrue(properties.containsKey(Fields.TOTAL));
assertTrue(properties.containsKey(Fields.FORECASTED_JOBS));
assertFalse(properties.containsKey(Fields.MEMORY));
assertFalse(properties.containsKey(Fields.RECORDS));
assertFalse(properties.containsKey(Fields.RUNTIME));
assertFalse(properties.containsKey(Fields.STATUSES));
}
public void testMerge() {
StatsAccumulator memoryStats = new StatsAccumulator();
memoryStats.add(1000);
memoryStats.add(45000);
memoryStats.add(2300);
StatsAccumulator recordStats = new StatsAccumulator();
recordStats.add(10);
recordStats.add(0);
recordStats.add(20);
StatsAccumulator runtimeStats = new StatsAccumulator();
runtimeStats.add(0);
runtimeStats.add(0);
runtimeStats.add(10);
CountAccumulator statusStats = new CountAccumulator();
statusStats.add("finished", 2L);
statusStats.add("failed", 5L);
ForecastStats forecastStats = new ForecastStats(3, memoryStats, recordStats, runtimeStats, statusStats);
StatsAccumulator memoryStats2 = new StatsAccumulator();
memoryStats2.add(10);
memoryStats2.add(30);
StatsAccumulator recordStats2 = new StatsAccumulator();
recordStats2.add(10);
recordStats2.add(0);
StatsAccumulator runtimeStats2 = new StatsAccumulator();
runtimeStats2.add(96);
runtimeStats2.add(0);
CountAccumulator statusStats2 = new CountAccumulator();
statusStats2.add("finished", 2L);
statusStats2.add("scheduled", 1L);
ForecastStats forecastStats2 = new ForecastStats(2, memoryStats2, recordStats2, runtimeStats2, statusStats2);
forecastStats.merge(forecastStats2);
Map<String, Object> mergedStats = forecastStats.asMap();
assertEquals(2L, mergedStats.get(Fields.FORECASTED_JOBS));
assertEquals(5L, mergedStats.get(Fields.TOTAL));
@SuppressWarnings("unchecked")
Map<String, Double> mergedMemoryStats = (Map<String, Double>) mergedStats.get(Fields.MEMORY);
assertTrue(mergedMemoryStats != null);
assertThat(mergedMemoryStats.get(StatsAccumulator.Fields.AVG), equalTo(9668.0));
assertThat(mergedMemoryStats.get(StatsAccumulator.Fields.MAX), equalTo(45000.0));
assertThat(mergedMemoryStats.get(StatsAccumulator.Fields.MIN), equalTo(10.0));
@SuppressWarnings("unchecked")
Map<String, Double> mergedRecordStats = (Map<String, Double>) mergedStats.get(Fields.RECORDS);
assertTrue(mergedRecordStats != null);
assertThat(mergedRecordStats.get(StatsAccumulator.Fields.AVG), equalTo(8.0));
assertThat(mergedRecordStats.get(StatsAccumulator.Fields.MAX), equalTo(20.0));
assertThat(mergedRecordStats.get(StatsAccumulator.Fields.MIN), equalTo(0.0));
@SuppressWarnings("unchecked")
Map<String, Double> mergedRuntimeStats = (Map<String, Double>) mergedStats.get(Fields.RUNTIME);
assertTrue(mergedRuntimeStats != null);
assertThat(mergedRuntimeStats.get(StatsAccumulator.Fields.AVG), equalTo(21.2));
assertThat(mergedRuntimeStats.get(StatsAccumulator.Fields.MAX), equalTo(96.0));
assertThat(mergedRuntimeStats.get(StatsAccumulator.Fields.MIN), equalTo(0.0));
@SuppressWarnings("unchecked")
Map<String, Long> mergedCountStats = (Map<String, Long>) mergedStats.get(Fields.STATUSES);
assertTrue(mergedCountStats != null);
assertEquals(3, mergedCountStats.size());
assertEquals(4, mergedCountStats.get("finished").longValue());
assertEquals(5, mergedCountStats.get("failed").longValue());
assertEquals(1, mergedCountStats.get("scheduled").longValue());
}
public void testChainedMerge() {
StatsAccumulator memoryStats = new StatsAccumulator();
memoryStats.add(1000);
memoryStats.add(45000);
memoryStats.add(2300);
StatsAccumulator recordStats = new StatsAccumulator();
recordStats.add(10);
recordStats.add(0);
recordStats.add(20);
StatsAccumulator runtimeStats = new StatsAccumulator();
runtimeStats.add(0);
runtimeStats.add(0);
runtimeStats.add(10);
CountAccumulator statusStats = new CountAccumulator();
statusStats.add("finished", 2L);
statusStats.add("failed", 5L);
ForecastStats forecastStats = new ForecastStats(3, memoryStats, recordStats, runtimeStats, statusStats);
StatsAccumulator memoryStats2 = new StatsAccumulator();
memoryStats2.add(10);
memoryStats2.add(30);
StatsAccumulator recordStats2 = new StatsAccumulator();
recordStats2.add(10);
recordStats2.add(0);
StatsAccumulator runtimeStats2 = new StatsAccumulator();
runtimeStats2.add(96);
runtimeStats2.add(0);
CountAccumulator statusStats2 = new CountAccumulator();
statusStats2.add("finished", 2L);
statusStats2.add("scheduled", 1L);
ForecastStats forecastStats2 = new ForecastStats(2, memoryStats2, recordStats2, runtimeStats2, statusStats2);
StatsAccumulator memoryStats3 = new StatsAccumulator();
memoryStats3.add(500);
StatsAccumulator recordStats3 = new StatsAccumulator();
recordStats3.add(50);
StatsAccumulator runtimeStats3 = new StatsAccumulator();
runtimeStats3.add(32);
CountAccumulator statusStats3 = new CountAccumulator();
statusStats3.add("finished", 1L);
ForecastStats forecastStats3 = new ForecastStats(1, memoryStats3, recordStats3, runtimeStats3, statusStats3);
ForecastStats forecastStats4 = new ForecastStats();
// merge 4 into 3
forecastStats3.merge(forecastStats4);
// merge 3 into 2
forecastStats2.merge(forecastStats3);
// merger 2 into 1
forecastStats.merge(forecastStats2);
Map<String, Object> mergedStats = forecastStats.asMap();
assertEquals(3L, mergedStats.get(Fields.FORECASTED_JOBS));
assertEquals(6L, mergedStats.get(Fields.TOTAL));
@SuppressWarnings("unchecked")
Map<String, Double> mergedMemoryStats = (Map<String, Double>) mergedStats.get(Fields.MEMORY);
assertTrue(mergedMemoryStats != null);
assertThat(mergedMemoryStats.get(StatsAccumulator.Fields.AVG), equalTo(8140.0));
assertThat(mergedMemoryStats.get(StatsAccumulator.Fields.MAX), equalTo(45000.0));
assertThat(mergedMemoryStats.get(StatsAccumulator.Fields.MIN), equalTo(10.0));
@SuppressWarnings("unchecked")
Map<String, Double> mergedRecordStats = (Map<String, Double>) mergedStats.get(Fields.RECORDS);
assertTrue(mergedRecordStats != null);
assertThat(mergedRecordStats.get(StatsAccumulator.Fields.AVG), equalTo(15.0));
assertThat(mergedRecordStats.get(StatsAccumulator.Fields.MAX), equalTo(50.0));
assertThat(mergedRecordStats.get(StatsAccumulator.Fields.MIN), equalTo(0.0));
@SuppressWarnings("unchecked")
Map<String, Double> mergedRuntimeStats = (Map<String, Double>) mergedStats.get(Fields.RUNTIME);
assertTrue(mergedRuntimeStats != null);
assertThat(mergedRuntimeStats.get(StatsAccumulator.Fields.AVG), equalTo(23.0));
assertThat(mergedRuntimeStats.get(StatsAccumulator.Fields.MAX), equalTo(96.0));
assertThat(mergedRuntimeStats.get(StatsAccumulator.Fields.MIN), equalTo(0.0));
@SuppressWarnings("unchecked")
Map<String, Long> mergedCountStats = (Map<String, Long>) mergedStats.get(Fields.STATUSES);
assertTrue(mergedCountStats != null);
assertEquals(3, mergedCountStats.size());
assertEquals(5, mergedCountStats.get("finished").longValue());
assertEquals(5, mergedCountStats.get("failed").longValue());
assertEquals(1, mergedCountStats.get("scheduled").longValue());
}
public void testUniqueCountOfJobs() {
ForecastStats forecastStats = createForecastStats(5, 10);
ForecastStats forecastStats2 = createForecastStats(2, 8);
ForecastStats forecastStats3 = createForecastStats(0, 0);
ForecastStats forecastStats4 = createForecastStats(0, 0);
ForecastStats forecastStats5 = createForecastStats(1, 12);
forecastStats.merge(forecastStats2);
forecastStats.merge(forecastStats3);
forecastStats.merge(forecastStats4);
forecastStats.merge(forecastStats5);
assertEquals(3L, forecastStats.asMap().get(Fields.FORECASTED_JOBS));
}
@Override
public ForecastStats createTestInstance() {
return createForecastStats(1, 22);
}
@Override
protected Reader<ForecastStats> instanceReader() {
return ForecastStats::new;
}
public ForecastStats createForecastStats(long minTotal, long maxTotal) {
ForecastStats forecastStats = new ForecastStats(randomLongBetween(minTotal, maxTotal), createStatsAccumulator(),
createStatsAccumulator(), createStatsAccumulator(), createCountAccumulator());
return forecastStats;
}
private StatsAccumulator createStatsAccumulator() {
return new StatsAccumulatorTests().createTestInstance();
}
private CountAccumulator createCountAccumulator() {
return new CountAccumulatorTests().createTestInstance();
}
}

View File

@ -0,0 +1,160 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.stats;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.search.aggregations.metrics.stats.Stats;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
 * Tests for {@link StatsAccumulator}: the empty state, accumulation of positive and
 * negative values, the map rendering, merge semantics (including empty operands and
 * commutativity), construction from a {@link Stats} aggregation, and wire serialization.
 */
public class StatsAccumulatorTests extends AbstractWireSerializingTestCase<StatsAccumulator> {

    public void testGivenNoValues() {
        StatsAccumulator empty = new StatsAccumulator();

        // An accumulator that never saw a value reports zero for every statistic.
        assertThat(empty.getMin(), equalTo(0.0));
        assertThat(empty.getMax(), equalTo(0.0));
        assertThat(empty.getTotal(), equalTo(0.0));
        assertThat(empty.getAvg(), equalTo(0.0));
    }

    public void testGivenPositiveValues() {
        StatsAccumulator acc = new StatsAccumulator();
        int value = 1;
        while (value <= 10) {
            acc.add(value);
            value++;
        }

        assertThat(acc.getMin(), equalTo(1.0));
        assertThat(acc.getMax(), equalTo(10.0));
        assertThat(acc.getTotal(), equalTo(55.0));
        assertThat(acc.getAvg(), equalTo(5.5));
    }

    public void testGivenNegativeValues() {
        StatsAccumulator acc = new StatsAccumulator();
        int value = 1;
        while (value <= 10) {
            acc.add(-value);
            value++;
        }

        assertThat(acc.getMin(), equalTo(-10.0));
        assertThat(acc.getMax(), equalTo(-1.0));
        assertThat(acc.getTotal(), equalTo(-55.0));
        assertThat(acc.getAvg(), equalTo(-5.5));
    }

    public void testAsMap() {
        StatsAccumulator acc = new StatsAccumulator();
        acc.add(5.0);
        acc.add(10.0);

        Map<String, Double> expected = new HashMap<>();
        expected.put("min", 5.0);
        expected.put("max", 10.0);
        expected.put("avg", 7.5);
        expected.put("total", 15.0);
        assertThat(acc.asMap(), equalTo(expected));
    }

    public void testMerge() {
        StatsAccumulator first = new StatsAccumulator();
        first.add(5.0);
        first.add(10.0);
        assertThat(first.getMin(), equalTo(5.0));
        assertThat(first.getMax(), equalTo(10.0));
        assertThat(first.getTotal(), equalTo(15.0));
        assertThat(first.getAvg(), equalTo(7.5));

        StatsAccumulator second = new StatsAccumulator();
        second.add(1.0);
        second.add(3.0);
        second.add(7.0);
        assertThat(second.getMin(), equalTo(1.0));
        assertThat(second.getMax(), equalTo(7.0));
        assertThat(second.getTotal(), equalTo(11.0));
        assertThat(second.getAvg(), equalTo(11.0 / 3.0));

        first.merge(second);
        assertThat(first.getMin(), equalTo(1.0));
        assertThat(first.getMax(), equalTo(10.0));
        assertThat(first.getTotal(), equalTo(26.0));
        assertThat(first.getAvg(), equalTo(5.2));

        // a clone of the values originally fed into "first"
        StatsAccumulator firstAgain = new StatsAccumulator();
        firstAgain.add(5.0);
        firstAgain.add(10.0);

        // merging in the opposite direction must produce identical statistics
        second.merge(firstAgain);
        assertThat(second.getMin(), equalTo(1.0));
        assertThat(second.getMax(), equalTo(10.0));
        assertThat(second.getTotal(), equalTo(26.0));
        assertThat(second.getAvg(), equalTo(5.2));
    }

    public void testMergeMixedEmpty() {
        StatsAccumulator target = new StatsAccumulator();

        StatsAccumulator nonEmpty = new StatsAccumulator();
        nonEmpty.add(1.0);
        nonEmpty.add(3.0);
        // merging data into an empty accumulator adopts the incoming statistics
        target.merge(nonEmpty);
        assertThat(target.getMin(), equalTo(1.0));
        assertThat(target.getMax(), equalTo(3.0));
        assertThat(target.getTotal(), equalTo(4.0));

        StatsAccumulator empty = new StatsAccumulator();
        // merging an empty accumulator into a populated one is a no-op
        target.merge(empty);
        assertThat(target.getMin(), equalTo(1.0));
        assertThat(target.getMax(), equalTo(3.0));
        assertThat(target.getTotal(), equalTo(4.0));

        StatsAccumulator anotherEmpty = new StatsAccumulator();
        // merging two empty accumulators leaves every statistic at zero
        empty.merge(anotherEmpty);
        assertThat(empty.getMin(), equalTo(0.0));
        assertThat(empty.getMax(), equalTo(0.0));
        assertThat(empty.getTotal(), equalTo(0.0));
    }

    public void testFromStatsAggregation() {
        Stats aggregation = mock(Stats.class);
        when(aggregation.getMax()).thenReturn(25.0);
        when(aggregation.getMin()).thenReturn(2.5);
        when(aggregation.getCount()).thenReturn(4L);
        when(aggregation.getSum()).thenReturn(48.0);
        when(aggregation.getAvg()).thenReturn(12.0);

        StatsAccumulator acc = StatsAccumulator.fromStatsAggregation(aggregation);
        assertThat(acc.getMin(), equalTo(2.5));
        assertThat(acc.getMax(), equalTo(25.0));
        assertThat(acc.getTotal(), equalTo(48.0));
        assertThat(acc.getAvg(), equalTo(12.0));
    }

    /** Random instance for the wire-serialization round-trip performed by the base class. */
    @Override
    public StatsAccumulator createTestInstance() {
        StatsAccumulator instance = new StatsAccumulator();
        // NOTE: the bound is re-evaluated each iteration on purpose, matching the
        // established sampling behavior of this test instance generator.
        for (int i = 0; i < randomInt(10); ++i) {
            instance.add(randomDoubleBetween(0.0, 1000.0, true));
        }
        return instance;
    }

    @Override
    protected Reader<StatsAccumulator> instanceReader() {
        return StatsAccumulator::new;
    }
}

View File

@ -33,7 +33,8 @@ import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.process.NativeController;
import org.elasticsearch.xpack.ml.job.process.NativeControllerHolder;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.utils.StatsAccumulator;
import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
import org.elasticsearch.xpack.core.ml.stats.StatsAccumulator;
import java.io.IOException;
import java.util.Arrays;
@ -192,10 +193,12 @@ public class MachineLearningFeatureSet implements XPackFeatureSet {
private void addJobsUsage(GetJobsStatsAction.Response response) {
StatsAccumulator allJobsDetectorsStats = new StatsAccumulator();
StatsAccumulator allJobsModelSizeStats = new StatsAccumulator();
ForecastStats allJobsForecastStats = new ForecastStats();
Map<JobState, Counter> jobCountByState = new HashMap<>();
Map<JobState, StatsAccumulator> detectorStatsByState = new HashMap<>();
Map<JobState, StatsAccumulator> modelSizeStatsByState = new HashMap<>();
Map<JobState, ForecastStats> forecastStatsByState = new HashMap<>();
Map<String, Job> jobs = mlMetadata.getJobs();
List<GetJobsStatsAction.Response.JobStats> jobsStats = response.getResponse().results();
@ -206,6 +209,7 @@ public class MachineLearningFeatureSet implements XPackFeatureSet {
double modelSize = modelSizeStats == null ? 0.0
: jobStats.getModelSizeStats().getModelBytes();
allJobsForecastStats.merge(jobStats.getForecastStats());
allJobsDetectorsStats.add(detectorsCount);
allJobsModelSizeStats.add(modelSize);
@ -215,24 +219,28 @@ public class MachineLearningFeatureSet implements XPackFeatureSet {
js -> new StatsAccumulator()).add(detectorsCount);
modelSizeStatsByState.computeIfAbsent(jobState,
js -> new StatsAccumulator()).add(modelSize);
forecastStatsByState.merge(jobState, jobStats.getForecastStats(), (f1, f2) -> f1.merge(f2));
}
jobsUsage.put(MachineLearningFeatureSetUsage.ALL, createJobUsageEntry(jobs.size(), allJobsDetectorsStats,
allJobsModelSizeStats));
allJobsModelSizeStats, allJobsForecastStats));
for (JobState jobState : jobCountByState.keySet()) {
jobsUsage.put(jobState.name().toLowerCase(Locale.ROOT), createJobUsageEntry(
jobCountByState.get(jobState).get(),
detectorStatsByState.get(jobState),
modelSizeStatsByState.get(jobState)));
modelSizeStatsByState.get(jobState),
forecastStatsByState.get(jobState)));
}
}
private Map<String, Object> createJobUsageEntry(long count, StatsAccumulator detectorStats,
StatsAccumulator modelSizeStats) {
StatsAccumulator modelSizeStats,
ForecastStats forecastStats) {
Map<String, Object> usage = new HashMap<>();
usage.put(MachineLearningFeatureSetUsage.COUNT, count);
usage.put(MachineLearningFeatureSetUsage.DETECTORS, detectorStats.asMap());
usage.put(MachineLearningFeatureSetUsage.MODEL_SIZE, modelSizeStats.asMap());
usage.put(MachineLearningFeatureSetUsage.FORECASTS, forecastStats.asMap());
return usage;
}

View File

@ -29,6 +29,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
@ -106,9 +107,12 @@ public class TransportGetJobsStatsAction extends TransportTasksAction<TransportO
JobState jobState = MlMetadata.getJobState(jobId, tasks);
String assignmentExplanation = pTask.getAssignment().getExplanation();
TimeValue openTime = durationToTimeValue(processManager.jobOpenTime(task));
GetJobsStatsAction.Response.JobStats jobStats = new GetJobsStatsAction.Response.JobStats(jobId, stats.get().v1(),
stats.get().v2(), jobState, node, assignmentExplanation, openTime);
listener.onResponse(new QueryPage<>(Collections.singletonList(jobStats), 1, Job.RESULTS_FIELD));
gatherForecastStats(jobId, forecastStats -> {
GetJobsStatsAction.Response.JobStats jobStats = new GetJobsStatsAction.Response.JobStats(jobId, stats.get().v1(),
stats.get().v2(), forecastStats, jobState, node, assignmentExplanation, openTime);
listener.onResponse(new QueryPage<>(Collections.singletonList(jobStats), 1, Job.RESULTS_FIELD));
}, listener::onFailure);
} else {
listener.onResponse(new QueryPage<>(Collections.emptyList(), 0, Job.RESULTS_FIELD));
}
@ -131,25 +135,31 @@ public class TransportGetJobsStatsAction extends TransportTasksAction<TransportO
for (int i = 0; i < jobIds.size(); i++) {
int slot = i;
String jobId = jobIds.get(i);
gatherDataCountsAndModelSizeStats(jobId, (dataCounts, modelSizeStats) -> {
JobState jobState = MlMetadata.getJobState(jobId, tasks);
PersistentTasksCustomMetaData.PersistentTask<?> pTask = MlMetadata.getJobTask(jobId, tasks);
String assignmentExplanation = null;
if (pTask != null) {
assignmentExplanation = pTask.getAssignment().getExplanation();
}
jobStats.set(slot, new GetJobsStatsAction.Response.JobStats(jobId, dataCounts, modelSizeStats, jobState, null,
assignmentExplanation, null));
if (counter.decrementAndGet() == 0) {
List<GetJobsStatsAction.Response.JobStats> results = response.getResponse().results();
results.addAll(jobStats.asList());
listener.onResponse(new GetJobsStatsAction.Response(response.getTaskFailures(), response.getNodeFailures(),
new QueryPage<>(results, results.size(), Job.RESULTS_FIELD)));
}
gatherForecastStats(jobId, forecastStats -> {
gatherDataCountsAndModelSizeStats(jobId, (dataCounts, modelSizeStats) -> {
JobState jobState = MlMetadata.getJobState(jobId, tasks);
PersistentTasksCustomMetaData.PersistentTask<?> pTask = MlMetadata.getJobTask(jobId, tasks);
String assignmentExplanation = null;
if (pTask != null) {
assignmentExplanation = pTask.getAssignment().getExplanation();
}
jobStats.set(slot, new GetJobsStatsAction.Response.JobStats(jobId, dataCounts, modelSizeStats, forecastStats, jobState,
null, assignmentExplanation, null));
if (counter.decrementAndGet() == 0) {
List<GetJobsStatsAction.Response.JobStats> results = response.getResponse().results();
results.addAll(jobStats.asList());
listener.onResponse(new GetJobsStatsAction.Response(response.getTaskFailures(), response.getNodeFailures(),
new QueryPage<>(results, results.size(), Job.RESULTS_FIELD)));
}
}, listener::onFailure);
}, listener::onFailure);
}
}
void gatherForecastStats(String jobId, Consumer<ForecastStats> handler, Consumer<Exception> errorHandler) {
jobProvider.getForecastStats(jobId, handler, errorHandler);
}
void gatherDataCountsAndModelSizeStats(String jobId, BiConsumer<DataCounts, ModelSizeStats> handler,
Consumer<Exception> errorHandler) {
jobProvider.dataCounts(jobId, dataCounts -> {

View File

@ -63,6 +63,9 @@ import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.metrics.stats.Stats;
import org.elasticsearch.search.aggregations.metrics.stats.extended.ExtendedStats;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
@ -93,6 +96,9 @@ import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
import org.elasticsearch.xpack.core.ml.job.results.Influencer;
import org.elasticsearch.xpack.core.ml.job.results.ModelPlot;
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.elasticsearch.xpack.core.ml.stats.CountAccumulator;
import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
import org.elasticsearch.xpack.core.ml.stats.StatsAccumulator;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.MlIndicesUtils;
import org.elasticsearch.xpack.core.security.support.Exceptions;
@ -1112,6 +1118,53 @@ public class JobProvider {
result -> handler.accept(result.result), errorHandler, () -> null);
}
/**
 * Computes aggregate statistics over all forecasts of the given job and delivers
 * them asynchronously as a {@link ForecastStats}.
 *
 * <p>Runs a size-0 search over the job's results index, filtered to forecast
 * request stats documents of this job, with stats aggregations for memory usage,
 * processed record count and processing time, plus a terms aggregation counting
 * forecasts per status.
 *
 * @param jobId        the job whose forecasts are summarized
 * @param handler      receives the resulting stats; an empty {@link ForecastStats}
 *                     when the job has no forecasts
 * @param errorHandler receives any search failure
 */
public void getForecastStats(String jobId, Consumer<ForecastStats> handler, Consumer<Exception> errorHandler) {
    String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);

    // Match only forecast request stats result documents belonging to this job.
    QueryBuilder termQuery = new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), ForecastRequestStats.RESULT_TYPE_VALUE);
    QueryBuilder jobQuery = new TermsQueryBuilder(Job.ID.getPreferredName(), jobId);
    QueryBuilder finalQuery = new BoolQueryBuilder().filter(termQuery).filter(jobQuery);

    SearchRequest searchRequest = new SearchRequest(indexName);
    // The results index may be unavailable (e.g. not created yet); treat that as "no forecasts".
    searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(searchRequest.indicesOptions()));

    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    sourceBuilder.query(finalQuery);
    sourceBuilder.aggregation(
            AggregationBuilders.stats(ForecastStats.Fields.MEMORY).field(ForecastRequestStats.MEMORY_USAGE.getPreferredName()));
    sourceBuilder.aggregation(AggregationBuilders.stats(ForecastStats.Fields.RECORDS)
            .field(ForecastRequestStats.PROCESSED_RECORD_COUNT.getPreferredName()));
    sourceBuilder.aggregation(
            AggregationBuilders.stats(ForecastStats.Fields.RUNTIME).field(ForecastRequestStats.PROCESSING_TIME_MS.getPreferredName()));
    sourceBuilder.aggregation(
            AggregationBuilders.terms(ForecastStats.Fields.STATUSES).field(ForecastRequestStats.STATUS.getPreferredName()));
    // Only the aggregations are needed, not the matching documents themselves.
    sourceBuilder.size(0);
    searchRequest.source(sourceBuilder);

    executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest,
            ActionListener.<SearchResponse>wrap(searchResponse -> {
                long totalHits = searchResponse.getHits().getTotalHits();
                Aggregations aggregations = searchResponse.getAggregations();
                // No forecasts (or no aggregation results): report empty statistics.
                if (totalHits == 0 || aggregations == null) {
                    handler.accept(new ForecastStats());
                    return;
                }
                // Convert each aggregation into the accumulator types used by ForecastStats.
                Map<String, Aggregation> aggregationsAsMap = aggregations.asMap();
                StatsAccumulator memoryStats = StatsAccumulator
                        .fromStatsAggregation((Stats) aggregationsAsMap.get(ForecastStats.Fields.MEMORY));
                StatsAccumulator recordStats = StatsAccumulator
                        .fromStatsAggregation((Stats) aggregationsAsMap.get(ForecastStats.Fields.RECORDS));
                StatsAccumulator runtimeStats = StatsAccumulator
                        .fromStatsAggregation((Stats) aggregationsAsMap.get(ForecastStats.Fields.RUNTIME));
                CountAccumulator statusCount = CountAccumulator
                        .fromTermsAggregation((StringTerms) aggregationsAsMap.get(ForecastStats.Fields.STATUSES));

                ForecastStats forecastStats = new ForecastStats(totalHits, memoryStats, recordStats, runtimeStats, statusCount);
                handler.accept(forecastStats);
            }, errorHandler), client::search);
}
public void updateCalendar(String calendarId, Set<String> jobIdsToAdd, Set<String> jobIdsToRemove,
Consumer<Calendar> handler, Consumer<Exception> errorHandler) {

View File

@ -1,57 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.utils;
import java.util.HashMap;
import java.util.Map;
/**
* Helper class to collect min, max, avg and total statistics for a quantity
*/
/**
 * Helper class to collect min, max, avg and total statistics for a quantity.
 *
 * <p>All getters return 0.0 until at least one value has been added.
 */
public class StatsAccumulator {

    private static final String MIN = "min";
    private static final String MAX = "max";
    private static final String AVG = "avg";
    private static final String TOTAL = "total";

    // Number of values added so far; used to derive the average.
    private long count;
    // Running sum of all added values.
    private double total;
    // Smallest/largest value seen so far; null until the first add().
    private Double min;
    private Double max;

    /**
     * Adds a value, updating count, total, min and max.
     *
     * @param value the observed value to fold into the statistics
     */
    public void add(double value) {
        count++;
        total += value;
        // Math.min/Math.max instead of hand-rolled ternaries; first value seeds both bounds.
        min = min == null ? value : Math.min(value, min);
        max = max == null ? value : Math.max(value, max);
    }

    /** @return the smallest value added, or 0.0 if no values were added */
    public double getMin() {
        return min == null ? 0.0 : min;
    }

    /** @return the largest value added, or 0.0 if no values were added */
    public double getMax() {
        return max == null ? 0.0 : max;
    }

    /** @return the arithmetic mean of the added values, or 0.0 if no values were added */
    public double getAvg() {
        // count is a long; compare against 0, not the double literal 0.0
        return count == 0 ? 0.0 : total / count;
    }

    /** @return the sum of all added values (0.0 when empty) */
    public double getTotal() {
        return total;
    }

    /**
     * Renders the statistics as a map with the keys "min", "max", "avg" and "total".
     *
     * @return a new mutable map holding the four statistics
     */
    public Map<String, Double> asMap() {
        Map<String, Double> map = new HashMap<>();
        map.put(MIN, getMin());
        map.put(MAX, getMax());
        map.put(AVG, getAvg());
        map.put(TOTAL, getTotal());
        return map;
    }
}

View File

@ -39,6 +39,8 @@ import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
import org.elasticsearch.xpack.core.ml.stats.ForecastStatsTests;
import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource;
import org.junit.Before;
@ -138,11 +140,11 @@ public class MachineLearningFeatureSetTests extends ESTestCase {
settings.put("xpack.ml.enabled", true);
Job opened1 = buildJob("opened1", Arrays.asList(buildMinDetector("foo")));
GetJobsStatsAction.Response.JobStats opened1JobStats = buildJobStats("opened1", JobState.OPENED, 100L);
GetJobsStatsAction.Response.JobStats opened1JobStats = buildJobStats("opened1", JobState.OPENED, 100L, 3L);
Job opened2 = buildJob("opened2", Arrays.asList(buildMinDetector("foo"), buildMinDetector("bar")));
GetJobsStatsAction.Response.JobStats opened2JobStats = buildJobStats("opened2", JobState.OPENED, 200L);
GetJobsStatsAction.Response.JobStats opened2JobStats = buildJobStats("opened2", JobState.OPENED, 200L, 8L);
Job closed1 = buildJob("closed1", Arrays.asList(buildMinDetector("foo"), buildMinDetector("bar"), buildMinDetector("foobar")));
GetJobsStatsAction.Response.JobStats closed1JobStats = buildJobStats("closed1", JobState.CLOSED, 300L);
GetJobsStatsAction.Response.JobStats closed1JobStats = buildJobStats("closed1", JobState.CLOSED, 300L, 0);
givenJobs(Arrays.asList(opened1, opened2, closed1),
Arrays.asList(opened1JobStats, opened2JobStats, closed1JobStats));
@ -210,6 +212,15 @@ public class MachineLearningFeatureSetTests extends ESTestCase {
assertThat(source.getValue("datafeeds._all.count"), equalTo(3));
assertThat(source.getValue("datafeeds.started.count"), equalTo(2));
assertThat(source.getValue("datafeeds.stopped.count"), equalTo(1));
assertThat(source.getValue("jobs._all.forecasts.total"), equalTo(11));
assertThat(source.getValue("jobs._all.forecasts.forecasted_jobs"), equalTo(2));
assertThat(source.getValue("jobs.closed.forecasts.total"), equalTo(0));
assertThat(source.getValue("jobs.closed.forecasts.forecasted_jobs"), equalTo(0));
assertThat(source.getValue("jobs.opened.forecasts.total"), equalTo(11));
assertThat(source.getValue("jobs.opened.forecasts.forecasted_jobs"), equalTo(2));
}
}
@ -301,12 +312,16 @@ public class MachineLearningFeatureSetTests extends ESTestCase {
.build(new Date(randomNonNegativeLong()));
}
private static GetJobsStatsAction.Response.JobStats buildJobStats(String jobId, JobState state, long modelBytes) {
private static GetJobsStatsAction.Response.JobStats buildJobStats(String jobId, JobState state, long modelBytes,
long numberOfForecasts) {
ModelSizeStats.Builder modelSizeStats = new ModelSizeStats.Builder(jobId);
modelSizeStats.setModelBytes(modelBytes);
GetJobsStatsAction.Response.JobStats jobStats = mock(GetJobsStatsAction.Response.JobStats.class);
ForecastStats forecastStats = buildForecastStats(numberOfForecasts);
when(jobStats.getJobId()).thenReturn(jobId);
when(jobStats.getModelSizeStats()).thenReturn(modelSizeStats.build());
when(jobStats.getForecastStats()).thenReturn(forecastStats);
when(jobStats.getState()).thenReturn(state);
return jobStats;
}
@ -316,4 +331,8 @@ public class MachineLearningFeatureSetTests extends ESTestCase {
when(stats.getDatafeedState()).thenReturn(state);
return stats;
}
private static ForecastStats buildForecastStats(long numberOfForecasts) {
return new ForecastStatsTests().createForecastStats(numberOfForecasts, numberOfForecasts);
}
}

View File

@ -37,7 +37,7 @@ public class TransportGetJobsStatsActionTests extends ESTestCase {
result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata,
Collections.singletonList("id1"), Collections.singletonList(
new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobState.OPENED, null, null, null)));
new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, null, JobState.OPENED, null, null, null)));
assertEquals(0, result.size());
result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata,
@ -49,7 +49,7 @@ public class TransportGetJobsStatsActionTests extends ESTestCase {
result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata,
Arrays.asList("id1", "id2", "id3"),
Collections.singletonList(new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null,
Collections.singletonList(new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, null,
JobState.CLOSED, null, null, null))
);
assertEquals(2, result.size());
@ -58,17 +58,16 @@ public class TransportGetJobsStatsActionTests extends ESTestCase {
result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata,
Arrays.asList("id1", "id2", "id3"), Arrays.asList(
new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobState.OPENED, null, null, null),
new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, JobState.OPENED, null, null, null)
new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, null, JobState.OPENED, null, null, null),
new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, null, JobState.OPENED, null, null, null)
));
assertEquals(1, result.size());
assertEquals("id2", result.get(0));
result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata, Arrays.asList("id1", "id2", "id3"),
Arrays.asList(
new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobState.OPENED, null, null, null),
new GetJobsStatsAction.Response.JobStats("id2", new DataCounts("id2"), null, JobState.OPENED, null, null, null),
new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, JobState.OPENED, null, null, null)));
result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata, Arrays.asList("id1", "id2", "id3"), Arrays.asList(
new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, null, JobState.OPENED, null, null, null),
new GetJobsStatsAction.Response.JobStats("id2", new DataCounts("id2"), null, null, JobState.OPENED, null, null, null),
new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, null, JobState.OPENED, null, null, null)));
assertEquals(0, result.size());
// No jobs running, but job 4 is being deleted

View File

@ -1,63 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.utils;
import org.elasticsearch.test.ESTestCase;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
/**
 * Unit tests for {@link StatsAccumulator} covering the empty state,
 * positive and negative inputs, and the map representation.
 */
public class StatsAccumulatorTests extends ESTestCase {

    public void testGivenNoValues() {
        StatsAccumulator empty = new StatsAccumulator();

        // Without any values added, every statistic is zero.
        assertThat(empty.getMin(), equalTo(0.0));
        assertThat(empty.getMax(), equalTo(0.0));
        assertThat(empty.getTotal(), equalTo(0.0));
        assertThat(empty.getAvg(), equalTo(0.0));
    }

    public void testGivenPositiveValues() {
        StatsAccumulator acc = new StatsAccumulator();
        int value = 1;
        while (value <= 10) {
            acc.add(value);
            value++;
        }

        assertThat(acc.getMin(), equalTo(1.0));
        assertThat(acc.getMax(), equalTo(10.0));
        assertThat(acc.getTotal(), equalTo(55.0));
        assertThat(acc.getAvg(), equalTo(5.5));
    }

    public void testGivenNegativeValues() {
        StatsAccumulator acc = new StatsAccumulator();
        int value = 1;
        while (value <= 10) {
            acc.add(-value);
            value++;
        }

        assertThat(acc.getMin(), equalTo(-10.0));
        assertThat(acc.getMax(), equalTo(-1.0));
        assertThat(acc.getTotal(), equalTo(-55.0));
        assertThat(acc.getAvg(), equalTo(-5.5));
    }

    public void testAsMap() {
        StatsAccumulator acc = new StatsAccumulator();
        acc.add(5.0);
        acc.add(10.0);

        Map<String, Double> expected = new HashMap<>();
        expected.put("min", 5.0);
        expected.put("max", 10.0);
        expected.put("avg", 7.5);
        expected.put("total", 15.0);
        assertThat(acc.asMap(), equalTo(expected));
    }
}

View File

@ -16,6 +16,7 @@ import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction.Response.JobSta
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
import org.elasticsearch.xpack.core.monitoring.MonitoredSystem;
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
import org.elasticsearch.xpack.monitoring.exporter.BaseMonitoringDocTestCase;
@ -100,7 +101,9 @@ public class JobStatsMonitoringDocTests extends BaseMonitoringDocTestCase<JobSta
.build();
final DataCounts dataCounts = new DataCounts("_job_id", 0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, date3, date4, date5, date6, date7);
final JobStats jobStats = new JobStats("_job", dataCounts, modelStats, JobState.OPENED, discoveryNode, "_explanation", time);
final ForecastStats forecastStats = new ForecastStats();
final JobStats jobStats = new JobStats("_job", dataCounts, modelStats, forecastStats, JobState.OPENED, discoveryNode,
"_explanation", time);
final MonitoringDoc.Node node = new MonitoringDoc.Node("_uuid", "_host", "_addr", "_ip", "_name", 1504169190855L);
final JobStatsMonitoringDoc document = new JobStatsMonitoringDoc("_cluster", 1502266739402L, 1506593717631L, node, jobStats);
@ -152,6 +155,9 @@ public class JobStatsMonitoringDocTests extends BaseMonitoringDocTestCase<JobSta
+ "\"log_time\":1483315322002,"
+ "\"timestamp\":1483228861001"
+ "},"
+ "\"forecasts_stats\":{"
+ "\"total\":0,\"forecasted_jobs\":0"
+ "},"
+ "\"state\":\"opened\","
+ "\"node\":{"
+ "\"id\":\"_node_id\","