From 1cfd9bd9866f31fcc59cc01ed2cfc3fcf52b4cb1 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Fri, 24 Mar 2017 10:42:12 +0000 Subject: [PATCH] [ML] Add ml jobs and datafeeds in usage API (elastic/x-pack-elasticsearch#805) This adds basic info about jobs and datafeeds, sufficient for the first release for supporting monitoring and phone-home. In particular, usage now displays: - job count for _all and by state - detectors statistics (min, max, mean, total) for _all jobs and by job state - model size statistics (min, max, mean, total) for _all jobs and by job state - datafeed count for _all and by state Relates elastic/x-pack-elasticsearch#660 Original commit: elastic/x-pack-elasticsearch@6e0da6c3db357cc390ddff0645b02339a00db3e2 --- .../xpack/ml/MachineLearning.java | 5 + .../xpack/ml/MachineLearningFeatureSet.java | 195 +++++++++++++++++- .../extractor/scroll/ScrollDataExtractor.java | 1 - .../xpack/ml/utils/StatsAccumulator.java | 57 +++++ .../ml/MachineLearningFeatureSetTests.java | 181 +++++++++++++++- .../xpack/ml/utils/StatsAccumulatorTests.java | 63 ++++++ 6 files changed, 493 insertions(+), 9 deletions(-) create mode 100644 plugin/src/main/java/org/elasticsearch/xpack/ml/utils/StatsAccumulator.java create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/ml/utils/StatsAccumulatorTests.java diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 57caf3aeb4a..3425fe5abe1 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -330,6 +330,11 @@ public class MachineLearning implements ActionPlugin { public Collection nodeModules() { List modules = new ArrayList<>(); + + if (transportClientMode) { + return modules; + } + modules.add(b -> { XPackPlugin.bindFeatureSet(b, MachineLearningFeatureSet.class); }); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java index e0015964c15..c8a56dc33fb 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java @@ -5,33 +5,56 @@ */ package org.elasticsearch.xpack.ml; +import org.apache.lucene.util.Counter; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.xpack.XPackFeatureSet; import org.elasticsearch.xpack.XPackPlugin; import org.elasticsearch.xpack.XPackSettings; +import org.elasticsearch.xpack.ml.action.GetDatafeedsStatsAction; +import org.elasticsearch.xpack.ml.action.GetJobsStatsAction; +import org.elasticsearch.xpack.ml.datafeed.DatafeedState; +import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.process.NativeController; import org.elasticsearch.xpack.ml.job.process.NativeControllerHolder; +import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; +import org.elasticsearch.xpack.ml.utils.StatsAccumulator; import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.concurrent.TimeoutException; public class MachineLearningFeatureSet implements XPackFeatureSet { private final boolean enabled; private final XPackLicenseState licenseState; + private final ClusterService clusterService; + private final Client client; private final Map nativeCodeInfo; @Inject - public MachineLearningFeatureSet(Settings settings, @Nullable XPackLicenseState licenseState) { + public MachineLearningFeatureSet(Settings settings, ClusterService clusterService, Client client, + @Nullable XPackLicenseState licenseState) { this.enabled = XPackSettings.MACHINE_LEARNING_ENABLED.get(settings); + this.clusterService = Objects.requireNonNull(clusterService); + this.client = Objects.requireNonNull(client); this.licenseState = licenseState; Map nativeCodeInfo = NativeController.UNKNOWN_NATIVE_CODE_INFO; // Don't try to get the native code version in the transport client - the controller process won't be running @@ -79,17 +102,177 @@ public class MachineLearningFeatureSet implements XPackFeatureSet { @Override public void usage(ActionListener listener) { - listener.onResponse(new Usage(available(), enabled())); + ClusterState state = clusterService.state(); + MlMetadata mlMetadata = state.getMetaData().custom(MlMetadata.TYPE); + new Usage.Retriever(client, mlMetadata, available(), enabled()).execute(listener); } public static class Usage extends XPackFeatureSet.Usage { - public Usage(StreamInput input) throws IOException { - super(input); + private static final String ALL = "_all"; + private static final String JOBS_FIELD = "jobs"; + private static final String DATAFEEDS_FIELD = "datafeeds"; + private static final String COUNT = "count"; + private static final String DETECTORS = "detectors"; + private static final String MODEL_SIZE = "model_size"; + + private final Map jobsUsage; + private final Map datafeedsUsage; + + public Usage(boolean available, boolean enabled, Map jobsUsage, + Map datafeedsUsage) { + super(XPackPlugin.MACHINE_LEARNING, available, enabled); + this.jobsUsage = Objects.requireNonNull(jobsUsage); + this.datafeedsUsage = Objects.requireNonNull(datafeedsUsage); } - public Usage(boolean available, boolean enabled) { - super(XPackPlugin.MACHINE_LEARNING, available, enabled); + public Usage(StreamInput in) throws IOException { + super(in); + this.jobsUsage = in.readMap(); + this.datafeedsUsage = in.readMap(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeMap(jobsUsage); + out.writeMap(datafeedsUsage); + } + + @Override + protected void innerXContent(XContentBuilder builder, Params params) throws IOException { + super.innerXContent(builder, params); + if (jobsUsage != null) { + builder.field(JOBS_FIELD, jobsUsage); + } + if (datafeedsUsage != null) { + builder.field(DATAFEEDS_FIELD, datafeedsUsage); + } + } + + public static class Retriever { + + private final Client client; + private final MlMetadata mlMetadata; + private final boolean available; + private final boolean enabled; + private Map jobsUsage; + private Map datafeedsUsage; + + public Retriever(Client client, MlMetadata mlMetadata, boolean available, boolean enabled) { + this.client = Objects.requireNonNull(client); + this.mlMetadata = mlMetadata; + this.available = available; + this.enabled = enabled; + this.jobsUsage = new LinkedHashMap<>(); + this.datafeedsUsage = new LinkedHashMap<>(); + } + + public void execute(ActionListener listener) { + if (enabled == false) { + listener.onResponse(new Usage(available, enabled, Collections.emptyMap(), Collections.emptyMap())); + return; + } + + // Step 2. Extract usage from datafeeds stats and return usage response + ActionListener datafeedStatsListener = + ActionListener.wrap(response -> { + addDatafeedsUsage(response); + listener.onResponse(new Usage( + available, enabled, jobsUsage, datafeedsUsage)); + }, + error -> { + listener.onFailure(error); + } + ); + + // Step 1. Extract usage from jobs stats and then request stats for all datafeeds + GetJobsStatsAction.Request jobStatsRequest = new GetJobsStatsAction.Request(Job.ALL); + ActionListener jobStatsListener = ActionListener.wrap( + response -> { + addJobsUsage(response); + GetDatafeedsStatsAction.Request datafeedStatsRequest = + new GetDatafeedsStatsAction.Request(GetDatafeedsStatsAction.ALL); + client.execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest, + datafeedStatsListener); + }, + error -> { + listener.onFailure(error); + } + ); + + // Step 0. Kick off the chain of callbacks by requesting jobs stats + client.execute(GetJobsStatsAction.INSTANCE, jobStatsRequest, jobStatsListener); + } + + private void addJobsUsage(GetJobsStatsAction.Response response) { + StatsAccumulator allJobsDetectorsStats = new StatsAccumulator(); + StatsAccumulator allJobsModelSizeStats = new StatsAccumulator(); + + Map jobCountByState = new HashMap<>(); + Map detectorStatsByState = new HashMap<>(); + Map modelSizeStatsByState = new HashMap<>(); + + Map jobs = mlMetadata.getJobs(); + for (GetJobsStatsAction.Response.JobStats jobStats + : response.getResponse().results()) { + ModelSizeStats modelSizeStats = jobStats.getModelSizeStats(); + int detectorsCount = jobs.get(jobStats.getJobId()).getAnalysisConfig() + .getDetectors().size(); + double modelSize = modelSizeStats == null ? 0.0 + : jobStats.getModelSizeStats().getModelBytes(); + + allJobsDetectorsStats.add(detectorsCount); + allJobsModelSizeStats.add(modelSize); + + JobState jobState = jobStats.getState(); + jobCountByState.computeIfAbsent(jobState, js -> Counter.newCounter()).addAndGet(1); + detectorStatsByState.computeIfAbsent(jobState, + js -> new StatsAccumulator()).add(detectorsCount); + modelSizeStatsByState.computeIfAbsent(jobState, + js -> new StatsAccumulator()).add(modelSize); + } + + jobsUsage.put(ALL, createJobUsageEntry(jobs.size(), allJobsDetectorsStats, + allJobsModelSizeStats)); + for (JobState jobState : jobCountByState.keySet()) { + jobsUsage.put(jobState.name().toLowerCase(Locale.ROOT), createJobUsageEntry( + jobCountByState.get(jobState).get(), + detectorStatsByState.get(jobState), + modelSizeStatsByState.get(jobState))); + } + } + + private Map createJobUsageEntry(long count, StatsAccumulator detectorStats, + StatsAccumulator modelSizeStats) { + Map usage = new HashMap<>(); + usage.put(COUNT, count); + usage.put(DETECTORS, detectorStats.asMap()); + usage.put(MODEL_SIZE, modelSizeStats.asMap()); + return usage; + } + + private void addDatafeedsUsage(GetDatafeedsStatsAction.Response response) { + Map datafeedCountByState = new HashMap<>(); + + for (GetDatafeedsStatsAction.Response.DatafeedStats datafeedStats + : response.getResponse().results()) { + datafeedCountByState.computeIfAbsent(datafeedStats.getDatafeedState(), + ds -> Counter.newCounter()).addAndGet(1); + } + + datafeedsUsage.put(ALL, createDatafeedUsageEntry(response.getResponse().count())); + for (DatafeedState datafeedState : datafeedCountByState.keySet()) { + datafeedsUsage.put(datafeedState.name().toLowerCase(Locale.ROOT), + createDatafeedUsageEntry(datafeedCountByState.get(datafeedState).get())); + } + } + + private Map createDatafeedUsageEntry(long count) { + Map usage = new HashMap<>(); + usage.put(COUNT, count); + return usage; + } } } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java index 46bf6012253..8f08ba85c8f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java @@ -16,7 +16,6 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.script.Script; import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.fetch.StoredFieldsContext; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/StatsAccumulator.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/StatsAccumulator.java new file mode 100644 index 00000000000..1f1df147d80 --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/StatsAccumulator.java @@ -0,0 +1,57 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.utils; + +import java.util.HashMap; +import java.util.Map; + +/** + * Helper class to collect min, max, avg and total statistics for a quantity + */ +public class StatsAccumulator { + + private static final String MIN = "min"; + private static final String MAX = "max"; + private static final String AVG = "avg"; + private static final String TOTAL = "total"; + + private long count; + private double total; + private Double min; + private Double max; + + public void add(double value) { + count++; + total += value; + min = min == null ? value : (value < min ? value : min); + max = max == null ? value : (value > max ? value : max); + } + + public double getMin() { + return min == null ? 0.0 : min; + } + + public double getMax() { + return max == null ? 0.0 : max; + } + + public double getAvg() { + return count == 0.0 ? 0.0 : total/count; + } + + public double getTotal() { + return total; + } + + public Map asMap() { + Map map = new HashMap<>(); + map.put(MIN, getMin()); + map.put(MAX, getMax()); + map.put(AVG, getAvg()); + map.put(TOTAL, getTotal()); + return map; + } +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java index 686d48b0b5f..4ba3e483153 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java @@ -5,30 +5,67 @@ */ package org.elasticsearch.xpack.ml; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.XPackFeatureSet; import org.elasticsearch.xpack.XPackFeatureSet.Usage; +import org.elasticsearch.xpack.XPackPlugin; +import org.elasticsearch.xpack.ml.action.GetDatafeedsStatsAction; +import org.elasticsearch.xpack.ml.action.GetJobsStatsAction; +import org.elasticsearch.xpack.ml.action.util.QueryPage; +import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.ml.datafeed.DatafeedState; +import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.ml.job.config.Detector; +import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.config.JobState; +import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; +import org.elasticsearch.xpack.watcher.support.xcontent.XContentSource; import org.junit.Before; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.core.Is.is; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class MachineLearningFeatureSetTests extends ESTestCase { + private ClusterService clusterService; + private Client client; private XPackLicenseState licenseState; @Before public void init() throws Exception { + clusterService = mock(ClusterService.class); + client = mock(Client.class); licenseState = mock(XPackLicenseState.class); + givenJobs(Collections.emptyList(), Collections.emptyList()); + givenDatafeeds(Collections.emptyList()); } public void testAvailable() throws Exception { - MachineLearningFeatureSet featureSet = new MachineLearningFeatureSet(Settings.EMPTY, licenseState); + MachineLearningFeatureSet featureSet = new MachineLearningFeatureSet(Settings.EMPTY, clusterService, client, licenseState); boolean available = randomBoolean(); when(licenseState.isMachineLearningAllowed()).thenReturn(available); assertThat(featureSet.available(), is(available)); @@ -52,7 +89,7 @@ public class MachineLearningFeatureSetTests extends ESTestCase { settings.put("xpack.ml.enabled", enabled); } boolean expected = enabled || useDefault; - MachineLearningFeatureSet featureSet = new MachineLearningFeatureSet(settings.build(), licenseState); + MachineLearningFeatureSet featureSet = new MachineLearningFeatureSet(settings.build(), clusterService, client, licenseState); assertThat(featureSet.enabled(), is(expected)); PlainActionFuture future = new PlainActionFuture<>(); featureSet.usage(future); @@ -65,4 +102,144 @@ public class MachineLearningFeatureSetTests extends ESTestCase { assertThat(serializedUsage.enabled(), is(expected)); } + public void testUsage() throws Exception { + when(licenseState.isMachineLearningAllowed()).thenReturn(true); + Settings.Builder settings = Settings.builder(); + settings.put("xpack.ml.enabled", true); + + Job opened1 = buildJob("opened1", Arrays.asList(buildMinDetector("foo"))); + GetJobsStatsAction.Response.JobStats opened1JobStats = buildJobStats("opened1", JobState.OPENED, 100L); + Job opened2 = buildJob("opened2", Arrays.asList(buildMinDetector("foo"), buildMinDetector("bar"))); + GetJobsStatsAction.Response.JobStats opened2JobStats = buildJobStats("opened2", JobState.OPENED, 200L); + Job closed1 = buildJob("closed1", Arrays.asList(buildMinDetector("foo"), buildMinDetector("bar"), buildMinDetector("foobar"))); + GetJobsStatsAction.Response.JobStats closed1JobStats = buildJobStats("closed1", JobState.CLOSED, 300L); + givenJobs(Arrays.asList(opened1, opened2, closed1), + Arrays.asList(opened1JobStats, opened2JobStats, closed1JobStats)); + + givenDatafeeds(Arrays.asList( + buildDatafeedStats(DatafeedState.STARTED), + buildDatafeedStats(DatafeedState.STARTED), + buildDatafeedStats(DatafeedState.STOPPED) + )); + + MachineLearningFeatureSet featureSet = new MachineLearningFeatureSet(settings.build(), clusterService, client, licenseState); + PlainActionFuture future = new PlainActionFuture<>(); + featureSet.usage(future); + XPackFeatureSet.Usage mlUsage = future.get(); + + BytesStreamOutput out = new BytesStreamOutput(); + mlUsage.writeTo(out); + XPackFeatureSet.Usage serializedUsage = new MachineLearningFeatureSet.Usage(out.bytes().streamInput()); + + for (XPackFeatureSet.Usage usage : Arrays.asList(mlUsage, serializedUsage)) { + assertThat(usage, is(notNullValue())); + assertThat(usage.name(), is(XPackPlugin.MACHINE_LEARNING)); + assertThat(usage.enabled(), is(true)); + assertThat(usage.available(), is(true)); + XContentSource source; + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + usage.toXContent(builder, ToXContent.EMPTY_PARAMS); + source = new XContentSource(builder); + } + + assertThat(source.getValue("jobs._all.count"), equalTo(3)); + assertThat(source.getValue("jobs._all.detectors.min"), equalTo(1.0)); + assertThat(source.getValue("jobs._all.detectors.max"), equalTo(3.0)); + assertThat(source.getValue("jobs._all.detectors.total"), equalTo(6.0)); + assertThat(source.getValue("jobs._all.detectors.avg"), equalTo(2.0)); + assertThat(source.getValue("jobs._all.model_size.min"), equalTo(100.0)); + assertThat(source.getValue("jobs._all.model_size.max"), equalTo(300.0)); + assertThat(source.getValue("jobs._all.model_size.total"), equalTo(600.0)); + assertThat(source.getValue("jobs._all.model_size.avg"), equalTo(200.0)); + + assertThat(source.getValue("jobs.opened.count"), equalTo(2)); + assertThat(source.getValue("jobs.opened.detectors.min"), equalTo(1.0)); + assertThat(source.getValue("jobs.opened.detectors.max"), equalTo(2.0)); + assertThat(source.getValue("jobs.opened.detectors.total"), equalTo(3.0)); + assertThat(source.getValue("jobs.opened.detectors.avg"), equalTo(1.5)); + assertThat(source.getValue("jobs.opened.model_size.min"), equalTo(100.0)); + assertThat(source.getValue("jobs.opened.model_size.max"), equalTo(200.0)); + assertThat(source.getValue("jobs.opened.model_size.total"), equalTo(300.0)); + assertThat(source.getValue("jobs.opened.model_size.avg"), equalTo(150.0)); + + assertThat(source.getValue("jobs.closed.count"), equalTo(1)); + assertThat(source.getValue("jobs.closed.detectors.min"), equalTo(3.0)); + assertThat(source.getValue("jobs.closed.detectors.max"), equalTo(3.0)); + assertThat(source.getValue("jobs.closed.detectors.total"), equalTo(3.0)); + assertThat(source.getValue("jobs.closed.detectors.avg"), equalTo(3.0)); + assertThat(source.getValue("jobs.closed.model_size.min"), equalTo(300.0)); + assertThat(source.getValue("jobs.closed.model_size.max"), equalTo(300.0)); + assertThat(source.getValue("jobs.closed.model_size.total"), equalTo(300.0)); + assertThat(source.getValue("jobs.closed.model_size.avg"), equalTo(300.0)); + + assertThat(source.getValue("jobs.opening"), is(nullValue())); + assertThat(source.getValue("jobs.closing"), is(nullValue())); + assertThat(source.getValue("jobs.failed"), is(nullValue())); + + assertThat(source.getValue("datafeeds._all.count"), equalTo(3)); + assertThat(source.getValue("datafeeds.started.count"), equalTo(2)); + assertThat(source.getValue("datafeeds.stopped.count"), equalTo(1)); + } + } + + private void givenJobs(List jobs, List jobsStats) { + MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); + for (Job job : jobs) { + mlMetadataBuilder.putJob(job, false); + } + ClusterState clusterState = new ClusterState.Builder(ClusterState.EMPTY_STATE) + .metaData(new MetaData.Builder() + .putCustom(MlMetadata.TYPE, mlMetadataBuilder.build())) + .build(); + when(clusterService.state()).thenReturn(clusterState); + + doAnswer(invocationOnMock -> { + ActionListener listener = + (ActionListener) invocationOnMock.getArguments()[2]; + listener.onResponse(new GetJobsStatsAction.Response( + new QueryPage<>(jobsStats, jobsStats.size(), Job.RESULTS_FIELD))); + return Void.TYPE; + }).when(client).execute(same(GetJobsStatsAction.INSTANCE), any(), any()); + } + + private void givenDatafeeds(List datafeedStats) { + doAnswer(invocationOnMock -> { + ActionListener listener = + (ActionListener) invocationOnMock.getArguments()[2]; + listener.onResponse(new GetDatafeedsStatsAction.Response( + new QueryPage<>(datafeedStats, datafeedStats.size(), DatafeedConfig.RESULTS_FIELD))); + return Void.TYPE; + }).when(client).execute(same(GetDatafeedsStatsAction.INSTANCE), any(), any()); + } + + private static Detector buildMinDetector(String fieldName) { + Detector.Builder detectorBuilder = new Detector.Builder(); + detectorBuilder.setFunction("min"); + detectorBuilder.setFieldName(fieldName); + return detectorBuilder.build(); + } + + private static Job buildJob(String jobId, List detectors) { + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(detectors); + return new Job.Builder(jobId) + .setCreateTime(new Date(randomNonNegativeLong())) + .setAnalysisConfig(analysisConfig) + .build(); + } + + private static GetJobsStatsAction.Response.JobStats buildJobStats(String jobId, JobState state, long modelBytes) { + ModelSizeStats.Builder modelSizeStats = new ModelSizeStats.Builder(jobId); + modelSizeStats.setModelBytes(modelBytes); + GetJobsStatsAction.Response.JobStats jobStats = mock(GetJobsStatsAction.Response.JobStats.class); + when(jobStats.getJobId()).thenReturn(jobId); + when(jobStats.getModelSizeStats()).thenReturn(modelSizeStats.build()); + when(jobStats.getState()).thenReturn(state); + return jobStats; + } + + private static GetDatafeedsStatsAction.Response.DatafeedStats buildDatafeedStats(DatafeedState state) { + GetDatafeedsStatsAction.Response.DatafeedStats stats = mock(GetDatafeedsStatsAction.Response.DatafeedStats.class); + when(stats.getDatafeedState()).thenReturn(state); + return stats; + } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/utils/StatsAccumulatorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/utils/StatsAccumulatorTests.java new file mode 100644 index 00000000000..ae9b6a7360c --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/utils/StatsAccumulatorTests.java @@ -0,0 +1,63 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.utils; + +import org.elasticsearch.test.ESTestCase; + +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; + +public class StatsAccumulatorTests extends ESTestCase { + + public void testGivenNoValues() { + StatsAccumulator accumulator = new StatsAccumulator(); + assertThat(accumulator.getMin(), equalTo(0.0)); + assertThat(accumulator.getMax(), equalTo(0.0)); + assertThat(accumulator.getTotal(), equalTo(0.0)); + assertThat(accumulator.getAvg(), equalTo(0.0)); + } + + public void testGivenPositiveValues() { + StatsAccumulator accumulator = new StatsAccumulator(); + + for (int i = 1; i <= 10; i++) { + accumulator.add(i); + } + + assertThat(accumulator.getMin(), equalTo(1.0)); + assertThat(accumulator.getMax(), equalTo(10.0)); + assertThat(accumulator.getTotal(), equalTo(55.0)); + assertThat(accumulator.getAvg(), equalTo(5.5)); + } + + public void testGivenNegativeValues() { + StatsAccumulator accumulator = new StatsAccumulator(); + + for (int i = 1; i <= 10; i++) { + accumulator.add(-1 * i); + } + + assertThat(accumulator.getMin(), equalTo(-10.0)); + assertThat(accumulator.getMax(), equalTo(-1.0)); + assertThat(accumulator.getTotal(), equalTo(-55.0)); + assertThat(accumulator.getAvg(), equalTo(-5.5)); + } + + public void testAsMap() { + StatsAccumulator accumulator = new StatsAccumulator(); + accumulator.add(5.0); + accumulator.add(10.0); + + Map expectedMap = new HashMap<>(); + expectedMap.put("min", 5.0); + expectedMap.put("max", 10.0); + expectedMap.put("avg", 7.5); + expectedMap.put("total", 15.0); + assertThat(accumulator.asMap(), equalTo(expectedMap)); + } +} \ No newline at end of file