[ML] Add ml jobs and datafeeds in usage API (elastic/x-pack-elasticsearch#805)
This adds basic info about jobs and datafeeds, sufficient for the first release for supporting monitoring and phone-home. In particular, usage now displays: - job count for _all and by state - detectors statistics (min, max, mean, total) for _all jobs and by job state - model size statistics (min, max, mean, total) for _all jobs and by job state - datafeed count for _all and by state Relates elastic/x-pack-elasticsearch#660 Original commit: elastic/x-pack-elasticsearch@6e0da6c3db
This commit is contained in:
parent
ea67089cef
commit
1cfd9bd986
|
@ -330,6 +330,11 @@ public class MachineLearning implements ActionPlugin {
|
||||||
|
|
||||||
public Collection<Module> nodeModules() {
|
public Collection<Module> nodeModules() {
|
||||||
List<Module> modules = new ArrayList<>();
|
List<Module> modules = new ArrayList<>();
|
||||||
|
|
||||||
|
if (transportClientMode) {
|
||||||
|
return modules;
|
||||||
|
}
|
||||||
|
|
||||||
modules.add(b -> {
|
modules.add(b -> {
|
||||||
XPackPlugin.bindFeatureSet(b, MachineLearningFeatureSet.class);
|
XPackPlugin.bindFeatureSet(b, MachineLearningFeatureSet.class);
|
||||||
});
|
});
|
||||||
|
|
|
@ -5,33 +5,56 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.xpack.ml;
|
package org.elasticsearch.xpack.ml;
|
||||||
|
|
||||||
|
import org.apache.lucene.util.Counter;
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
|
import org.elasticsearch.client.Client;
|
||||||
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.Nullable;
|
import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.logging.Loggers;
|
import org.elasticsearch.common.logging.Loggers;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.license.XPackLicenseState;
|
import org.elasticsearch.license.XPackLicenseState;
|
||||||
import org.elasticsearch.xpack.XPackFeatureSet;
|
import org.elasticsearch.xpack.XPackFeatureSet;
|
||||||
import org.elasticsearch.xpack.XPackPlugin;
|
import org.elasticsearch.xpack.XPackPlugin;
|
||||||
import org.elasticsearch.xpack.XPackSettings;
|
import org.elasticsearch.xpack.XPackSettings;
|
||||||
|
import org.elasticsearch.xpack.ml.action.GetDatafeedsStatsAction;
|
||||||
|
import org.elasticsearch.xpack.ml.action.GetJobsStatsAction;
|
||||||
|
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
|
||||||
|
import org.elasticsearch.xpack.ml.job.config.Job;
|
||||||
|
import org.elasticsearch.xpack.ml.job.config.JobState;
|
||||||
import org.elasticsearch.xpack.ml.job.process.NativeController;
|
import org.elasticsearch.xpack.ml.job.process.NativeController;
|
||||||
import org.elasticsearch.xpack.ml.job.process.NativeControllerHolder;
|
import org.elasticsearch.xpack.ml.job.process.NativeControllerHolder;
|
||||||
|
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
|
||||||
|
import org.elasticsearch.xpack.ml.utils.StatsAccumulator;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
public class MachineLearningFeatureSet implements XPackFeatureSet {
|
public class MachineLearningFeatureSet implements XPackFeatureSet {
|
||||||
|
|
||||||
private final boolean enabled;
|
private final boolean enabled;
|
||||||
private final XPackLicenseState licenseState;
|
private final XPackLicenseState licenseState;
|
||||||
|
private final ClusterService clusterService;
|
||||||
|
private final Client client;
|
||||||
private final Map<String, Object> nativeCodeInfo;
|
private final Map<String, Object> nativeCodeInfo;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public MachineLearningFeatureSet(Settings settings, @Nullable XPackLicenseState licenseState) {
|
public MachineLearningFeatureSet(Settings settings, ClusterService clusterService, Client client,
|
||||||
|
@Nullable XPackLicenseState licenseState) {
|
||||||
this.enabled = XPackSettings.MACHINE_LEARNING_ENABLED.get(settings);
|
this.enabled = XPackSettings.MACHINE_LEARNING_ENABLED.get(settings);
|
||||||
|
this.clusterService = Objects.requireNonNull(clusterService);
|
||||||
|
this.client = Objects.requireNonNull(client);
|
||||||
this.licenseState = licenseState;
|
this.licenseState = licenseState;
|
||||||
Map<String, Object> nativeCodeInfo = NativeController.UNKNOWN_NATIVE_CODE_INFO;
|
Map<String, Object> nativeCodeInfo = NativeController.UNKNOWN_NATIVE_CODE_INFO;
|
||||||
// Don't try to get the native code version in the transport client - the controller process won't be running
|
// Don't try to get the native code version in the transport client - the controller process won't be running
|
||||||
|
@ -79,17 +102,177 @@ public class MachineLearningFeatureSet implements XPackFeatureSet {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void usage(ActionListener<XPackFeatureSet.Usage> listener) {
|
public void usage(ActionListener<XPackFeatureSet.Usage> listener) {
|
||||||
listener.onResponse(new Usage(available(), enabled()));
|
ClusterState state = clusterService.state();
|
||||||
|
MlMetadata mlMetadata = state.getMetaData().custom(MlMetadata.TYPE);
|
||||||
|
new Usage.Retriever(client, mlMetadata, available(), enabled()).execute(listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class Usage extends XPackFeatureSet.Usage {
|
public static class Usage extends XPackFeatureSet.Usage {
|
||||||
|
|
||||||
public Usage(StreamInput input) throws IOException {
|
private static final String ALL = "_all";
|
||||||
super(input);
|
private static final String JOBS_FIELD = "jobs";
|
||||||
|
private static final String DATAFEEDS_FIELD = "datafeeds";
|
||||||
|
private static final String COUNT = "count";
|
||||||
|
private static final String DETECTORS = "detectors";
|
||||||
|
private static final String MODEL_SIZE = "model_size";
|
||||||
|
|
||||||
|
private final Map<String, Object> jobsUsage;
|
||||||
|
private final Map<String, Object> datafeedsUsage;
|
||||||
|
|
||||||
|
public Usage(boolean available, boolean enabled, Map<String, Object> jobsUsage,
|
||||||
|
Map<String, Object> datafeedsUsage) {
|
||||||
|
super(XPackPlugin.MACHINE_LEARNING, available, enabled);
|
||||||
|
this.jobsUsage = Objects.requireNonNull(jobsUsage);
|
||||||
|
this.datafeedsUsage = Objects.requireNonNull(datafeedsUsage);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Usage(boolean available, boolean enabled) {
|
public Usage(StreamInput in) throws IOException {
|
||||||
super(XPackPlugin.MACHINE_LEARNING, available, enabled);
|
super(in);
|
||||||
|
this.jobsUsage = in.readMap();
|
||||||
|
this.datafeedsUsage = in.readMap();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void writeTo(StreamOutput out) throws IOException {
|
||||||
|
super.writeTo(out);
|
||||||
|
out.writeMap(jobsUsage);
|
||||||
|
out.writeMap(datafeedsUsage);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void innerXContent(XContentBuilder builder, Params params) throws IOException {
|
||||||
|
super.innerXContent(builder, params);
|
||||||
|
if (jobsUsage != null) {
|
||||||
|
builder.field(JOBS_FIELD, jobsUsage);
|
||||||
|
}
|
||||||
|
if (datafeedsUsage != null) {
|
||||||
|
builder.field(DATAFEEDS_FIELD, datafeedsUsage);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Retriever {
|
||||||
|
|
||||||
|
private final Client client;
|
||||||
|
private final MlMetadata mlMetadata;
|
||||||
|
private final boolean available;
|
||||||
|
private final boolean enabled;
|
||||||
|
private Map<String, Object> jobsUsage;
|
||||||
|
private Map<String, Object> datafeedsUsage;
|
||||||
|
|
||||||
|
public Retriever(Client client, MlMetadata mlMetadata, boolean available, boolean enabled) {
|
||||||
|
this.client = Objects.requireNonNull(client);
|
||||||
|
this.mlMetadata = mlMetadata;
|
||||||
|
this.available = available;
|
||||||
|
this.enabled = enabled;
|
||||||
|
this.jobsUsage = new LinkedHashMap<>();
|
||||||
|
this.datafeedsUsage = new LinkedHashMap<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void execute(ActionListener<XPackFeatureSet.Usage> listener) {
|
||||||
|
if (enabled == false) {
|
||||||
|
listener.onResponse(new Usage(available, enabled, Collections.emptyMap(), Collections.emptyMap()));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step 2. Extract usage from datafeeds stats and return usage response
|
||||||
|
ActionListener<GetDatafeedsStatsAction.Response> datafeedStatsListener =
|
||||||
|
ActionListener.wrap(response -> {
|
||||||
|
addDatafeedsUsage(response);
|
||||||
|
listener.onResponse(new Usage(
|
||||||
|
available, enabled, jobsUsage, datafeedsUsage));
|
||||||
|
},
|
||||||
|
error -> {
|
||||||
|
listener.onFailure(error);
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
// Step 1. Extract usage from jobs stats and then request stats for all datafeeds
|
||||||
|
GetJobsStatsAction.Request jobStatsRequest = new GetJobsStatsAction.Request(Job.ALL);
|
||||||
|
ActionListener<GetJobsStatsAction.Response> jobStatsListener = ActionListener.wrap(
|
||||||
|
response -> {
|
||||||
|
addJobsUsage(response);
|
||||||
|
GetDatafeedsStatsAction.Request datafeedStatsRequest =
|
||||||
|
new GetDatafeedsStatsAction.Request(GetDatafeedsStatsAction.ALL);
|
||||||
|
client.execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest,
|
||||||
|
datafeedStatsListener);
|
||||||
|
},
|
||||||
|
error -> {
|
||||||
|
listener.onFailure(error);
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
// Step 0. Kick off the chain of callbacks by requesting jobs stats
|
||||||
|
client.execute(GetJobsStatsAction.INSTANCE, jobStatsRequest, jobStatsListener);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addJobsUsage(GetJobsStatsAction.Response response) {
|
||||||
|
StatsAccumulator allJobsDetectorsStats = new StatsAccumulator();
|
||||||
|
StatsAccumulator allJobsModelSizeStats = new StatsAccumulator();
|
||||||
|
|
||||||
|
Map<JobState, Counter> jobCountByState = new HashMap<>();
|
||||||
|
Map<JobState, StatsAccumulator> detectorStatsByState = new HashMap<>();
|
||||||
|
Map<JobState, StatsAccumulator> modelSizeStatsByState = new HashMap<>();
|
||||||
|
|
||||||
|
Map<String, Job> jobs = mlMetadata.getJobs();
|
||||||
|
for (GetJobsStatsAction.Response.JobStats jobStats
|
||||||
|
: response.getResponse().results()) {
|
||||||
|
ModelSizeStats modelSizeStats = jobStats.getModelSizeStats();
|
||||||
|
int detectorsCount = jobs.get(jobStats.getJobId()).getAnalysisConfig()
|
||||||
|
.getDetectors().size();
|
||||||
|
double modelSize = modelSizeStats == null ? 0.0
|
||||||
|
: jobStats.getModelSizeStats().getModelBytes();
|
||||||
|
|
||||||
|
allJobsDetectorsStats.add(detectorsCount);
|
||||||
|
allJobsModelSizeStats.add(modelSize);
|
||||||
|
|
||||||
|
JobState jobState = jobStats.getState();
|
||||||
|
jobCountByState.computeIfAbsent(jobState, js -> Counter.newCounter()).addAndGet(1);
|
||||||
|
detectorStatsByState.computeIfAbsent(jobState,
|
||||||
|
js -> new StatsAccumulator()).add(detectorsCount);
|
||||||
|
modelSizeStatsByState.computeIfAbsent(jobState,
|
||||||
|
js -> new StatsAccumulator()).add(modelSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
jobsUsage.put(ALL, createJobUsageEntry(jobs.size(), allJobsDetectorsStats,
|
||||||
|
allJobsModelSizeStats));
|
||||||
|
for (JobState jobState : jobCountByState.keySet()) {
|
||||||
|
jobsUsage.put(jobState.name().toLowerCase(Locale.ROOT), createJobUsageEntry(
|
||||||
|
jobCountByState.get(jobState).get(),
|
||||||
|
detectorStatsByState.get(jobState),
|
||||||
|
modelSizeStatsByState.get(jobState)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, Object> createJobUsageEntry(long count, StatsAccumulator detectorStats,
|
||||||
|
StatsAccumulator modelSizeStats) {
|
||||||
|
Map<String, Object> usage = new HashMap<>();
|
||||||
|
usage.put(COUNT, count);
|
||||||
|
usage.put(DETECTORS, detectorStats.asMap());
|
||||||
|
usage.put(MODEL_SIZE, modelSizeStats.asMap());
|
||||||
|
return usage;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addDatafeedsUsage(GetDatafeedsStatsAction.Response response) {
|
||||||
|
Map<DatafeedState, Counter> datafeedCountByState = new HashMap<>();
|
||||||
|
|
||||||
|
for (GetDatafeedsStatsAction.Response.DatafeedStats datafeedStats
|
||||||
|
: response.getResponse().results()) {
|
||||||
|
datafeedCountByState.computeIfAbsent(datafeedStats.getDatafeedState(),
|
||||||
|
ds -> Counter.newCounter()).addAndGet(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
datafeedsUsage.put(ALL, createDatafeedUsageEntry(response.getResponse().count()));
|
||||||
|
for (DatafeedState datafeedState : datafeedCountByState.keySet()) {
|
||||||
|
datafeedsUsage.put(datafeedState.name().toLowerCase(Locale.ROOT),
|
||||||
|
createDatafeedUsageEntry(datafeedCountByState.get(datafeedState).get()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, Object> createDatafeedUsageEntry(long count) {
|
||||||
|
Map<String, Object> usage = new HashMap<>();
|
||||||
|
usage.put(COUNT, count);
|
||||||
|
return usage;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,6 @@ import org.elasticsearch.common.logging.Loggers;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.script.Script;
|
import org.elasticsearch.script.Script;
|
||||||
import org.elasticsearch.search.SearchHit;
|
import org.elasticsearch.search.SearchHit;
|
||||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
|
||||||
import org.elasticsearch.search.fetch.StoredFieldsContext;
|
import org.elasticsearch.search.fetch.StoredFieldsContext;
|
||||||
import org.elasticsearch.search.sort.SortOrder;
|
import org.elasticsearch.search.sort.SortOrder;
|
||||||
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
|
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
|
||||||
|
|
|
@ -0,0 +1,57 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License;
|
||||||
|
* you may not use this file except in compliance with the Elastic License.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.xpack.ml.utils;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper class to collect min, max, avg and total statistics for a quantity
|
||||||
|
*/
|
||||||
|
public class StatsAccumulator {
|
||||||
|
|
||||||
|
private static final String MIN = "min";
|
||||||
|
private static final String MAX = "max";
|
||||||
|
private static final String AVG = "avg";
|
||||||
|
private static final String TOTAL = "total";
|
||||||
|
|
||||||
|
private long count;
|
||||||
|
private double total;
|
||||||
|
private Double min;
|
||||||
|
private Double max;
|
||||||
|
|
||||||
|
public void add(double value) {
|
||||||
|
count++;
|
||||||
|
total += value;
|
||||||
|
min = min == null ? value : (value < min ? value : min);
|
||||||
|
max = max == null ? value : (value > max ? value : max);
|
||||||
|
}
|
||||||
|
|
||||||
|
public double getMin() {
|
||||||
|
return min == null ? 0.0 : min;
|
||||||
|
}
|
||||||
|
|
||||||
|
public double getMax() {
|
||||||
|
return max == null ? 0.0 : max;
|
||||||
|
}
|
||||||
|
|
||||||
|
public double getAvg() {
|
||||||
|
return count == 0.0 ? 0.0 : total/count;
|
||||||
|
}
|
||||||
|
|
||||||
|
public double getTotal() {
|
||||||
|
return total;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, Double> asMap() {
|
||||||
|
Map<String, Double> map = new HashMap<>();
|
||||||
|
map.put(MIN, getMin());
|
||||||
|
map.put(MAX, getMax());
|
||||||
|
map.put(AVG, getAvg());
|
||||||
|
map.put(TOTAL, getTotal());
|
||||||
|
return map;
|
||||||
|
}
|
||||||
|
}
|
|
@ -5,30 +5,67 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.xpack.ml;
|
package org.elasticsearch.xpack.ml;
|
||||||
|
|
||||||
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.support.PlainActionFuture;
|
import org.elasticsearch.action.support.PlainActionFuture;
|
||||||
|
import org.elasticsearch.client.Client;
|
||||||
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.common.xcontent.ToXContent;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||||
import org.elasticsearch.license.XPackLicenseState;
|
import org.elasticsearch.license.XPackLicenseState;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.elasticsearch.xpack.XPackFeatureSet;
|
import org.elasticsearch.xpack.XPackFeatureSet;
|
||||||
import org.elasticsearch.xpack.XPackFeatureSet.Usage;
|
import org.elasticsearch.xpack.XPackFeatureSet.Usage;
|
||||||
|
import org.elasticsearch.xpack.XPackPlugin;
|
||||||
|
import org.elasticsearch.xpack.ml.action.GetDatafeedsStatsAction;
|
||||||
|
import org.elasticsearch.xpack.ml.action.GetJobsStatsAction;
|
||||||
|
import org.elasticsearch.xpack.ml.action.util.QueryPage;
|
||||||
|
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
|
||||||
|
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
|
||||||
|
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
|
||||||
|
import org.elasticsearch.xpack.ml.job.config.Detector;
|
||||||
|
import org.elasticsearch.xpack.ml.job.config.Job;
|
||||||
|
import org.elasticsearch.xpack.ml.job.config.JobState;
|
||||||
|
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
|
||||||
|
import org.elasticsearch.xpack.watcher.support.xcontent.XContentSource;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.hamcrest.Matchers.notNullValue;
|
||||||
|
import static org.hamcrest.Matchers.nullValue;
|
||||||
import static org.hamcrest.core.Is.is;
|
import static org.hamcrest.core.Is.is;
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Matchers.same;
|
||||||
|
import static org.mockito.Mockito.doAnswer;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
public class MachineLearningFeatureSetTests extends ESTestCase {
|
public class MachineLearningFeatureSetTests extends ESTestCase {
|
||||||
|
|
||||||
|
private ClusterService clusterService;
|
||||||
|
private Client client;
|
||||||
private XPackLicenseState licenseState;
|
private XPackLicenseState licenseState;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void init() throws Exception {
|
public void init() throws Exception {
|
||||||
|
clusterService = mock(ClusterService.class);
|
||||||
|
client = mock(Client.class);
|
||||||
licenseState = mock(XPackLicenseState.class);
|
licenseState = mock(XPackLicenseState.class);
|
||||||
|
givenJobs(Collections.emptyList(), Collections.emptyList());
|
||||||
|
givenDatafeeds(Collections.emptyList());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testAvailable() throws Exception {
|
public void testAvailable() throws Exception {
|
||||||
MachineLearningFeatureSet featureSet = new MachineLearningFeatureSet(Settings.EMPTY, licenseState);
|
MachineLearningFeatureSet featureSet = new MachineLearningFeatureSet(Settings.EMPTY, clusterService, client, licenseState);
|
||||||
boolean available = randomBoolean();
|
boolean available = randomBoolean();
|
||||||
when(licenseState.isMachineLearningAllowed()).thenReturn(available);
|
when(licenseState.isMachineLearningAllowed()).thenReturn(available);
|
||||||
assertThat(featureSet.available(), is(available));
|
assertThat(featureSet.available(), is(available));
|
||||||
|
@ -52,7 +89,7 @@ public class MachineLearningFeatureSetTests extends ESTestCase {
|
||||||
settings.put("xpack.ml.enabled", enabled);
|
settings.put("xpack.ml.enabled", enabled);
|
||||||
}
|
}
|
||||||
boolean expected = enabled || useDefault;
|
boolean expected = enabled || useDefault;
|
||||||
MachineLearningFeatureSet featureSet = new MachineLearningFeatureSet(settings.build(), licenseState);
|
MachineLearningFeatureSet featureSet = new MachineLearningFeatureSet(settings.build(), clusterService, client, licenseState);
|
||||||
assertThat(featureSet.enabled(), is(expected));
|
assertThat(featureSet.enabled(), is(expected));
|
||||||
PlainActionFuture<Usage> future = new PlainActionFuture<>();
|
PlainActionFuture<Usage> future = new PlainActionFuture<>();
|
||||||
featureSet.usage(future);
|
featureSet.usage(future);
|
||||||
|
@ -65,4 +102,144 @@ public class MachineLearningFeatureSetTests extends ESTestCase {
|
||||||
assertThat(serializedUsage.enabled(), is(expected));
|
assertThat(serializedUsage.enabled(), is(expected));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testUsage() throws Exception {
|
||||||
|
when(licenseState.isMachineLearningAllowed()).thenReturn(true);
|
||||||
|
Settings.Builder settings = Settings.builder();
|
||||||
|
settings.put("xpack.ml.enabled", true);
|
||||||
|
|
||||||
|
Job opened1 = buildJob("opened1", Arrays.asList(buildMinDetector("foo")));
|
||||||
|
GetJobsStatsAction.Response.JobStats opened1JobStats = buildJobStats("opened1", JobState.OPENED, 100L);
|
||||||
|
Job opened2 = buildJob("opened2", Arrays.asList(buildMinDetector("foo"), buildMinDetector("bar")));
|
||||||
|
GetJobsStatsAction.Response.JobStats opened2JobStats = buildJobStats("opened2", JobState.OPENED, 200L);
|
||||||
|
Job closed1 = buildJob("closed1", Arrays.asList(buildMinDetector("foo"), buildMinDetector("bar"), buildMinDetector("foobar")));
|
||||||
|
GetJobsStatsAction.Response.JobStats closed1JobStats = buildJobStats("closed1", JobState.CLOSED, 300L);
|
||||||
|
givenJobs(Arrays.asList(opened1, opened2, closed1),
|
||||||
|
Arrays.asList(opened1JobStats, opened2JobStats, closed1JobStats));
|
||||||
|
|
||||||
|
givenDatafeeds(Arrays.asList(
|
||||||
|
buildDatafeedStats(DatafeedState.STARTED),
|
||||||
|
buildDatafeedStats(DatafeedState.STARTED),
|
||||||
|
buildDatafeedStats(DatafeedState.STOPPED)
|
||||||
|
));
|
||||||
|
|
||||||
|
MachineLearningFeatureSet featureSet = new MachineLearningFeatureSet(settings.build(), clusterService, client, licenseState);
|
||||||
|
PlainActionFuture<Usage> future = new PlainActionFuture<>();
|
||||||
|
featureSet.usage(future);
|
||||||
|
XPackFeatureSet.Usage mlUsage = future.get();
|
||||||
|
|
||||||
|
BytesStreamOutput out = new BytesStreamOutput();
|
||||||
|
mlUsage.writeTo(out);
|
||||||
|
XPackFeatureSet.Usage serializedUsage = new MachineLearningFeatureSet.Usage(out.bytes().streamInput());
|
||||||
|
|
||||||
|
for (XPackFeatureSet.Usage usage : Arrays.asList(mlUsage, serializedUsage)) {
|
||||||
|
assertThat(usage, is(notNullValue()));
|
||||||
|
assertThat(usage.name(), is(XPackPlugin.MACHINE_LEARNING));
|
||||||
|
assertThat(usage.enabled(), is(true));
|
||||||
|
assertThat(usage.available(), is(true));
|
||||||
|
XContentSource source;
|
||||||
|
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
|
||||||
|
usage.toXContent(builder, ToXContent.EMPTY_PARAMS);
|
||||||
|
source = new XContentSource(builder);
|
||||||
|
}
|
||||||
|
|
||||||
|
assertThat(source.getValue("jobs._all.count"), equalTo(3));
|
||||||
|
assertThat(source.getValue("jobs._all.detectors.min"), equalTo(1.0));
|
||||||
|
assertThat(source.getValue("jobs._all.detectors.max"), equalTo(3.0));
|
||||||
|
assertThat(source.getValue("jobs._all.detectors.total"), equalTo(6.0));
|
||||||
|
assertThat(source.getValue("jobs._all.detectors.avg"), equalTo(2.0));
|
||||||
|
assertThat(source.getValue("jobs._all.model_size.min"), equalTo(100.0));
|
||||||
|
assertThat(source.getValue("jobs._all.model_size.max"), equalTo(300.0));
|
||||||
|
assertThat(source.getValue("jobs._all.model_size.total"), equalTo(600.0));
|
||||||
|
assertThat(source.getValue("jobs._all.model_size.avg"), equalTo(200.0));
|
||||||
|
|
||||||
|
assertThat(source.getValue("jobs.opened.count"), equalTo(2));
|
||||||
|
assertThat(source.getValue("jobs.opened.detectors.min"), equalTo(1.0));
|
||||||
|
assertThat(source.getValue("jobs.opened.detectors.max"), equalTo(2.0));
|
||||||
|
assertThat(source.getValue("jobs.opened.detectors.total"), equalTo(3.0));
|
||||||
|
assertThat(source.getValue("jobs.opened.detectors.avg"), equalTo(1.5));
|
||||||
|
assertThat(source.getValue("jobs.opened.model_size.min"), equalTo(100.0));
|
||||||
|
assertThat(source.getValue("jobs.opened.model_size.max"), equalTo(200.0));
|
||||||
|
assertThat(source.getValue("jobs.opened.model_size.total"), equalTo(300.0));
|
||||||
|
assertThat(source.getValue("jobs.opened.model_size.avg"), equalTo(150.0));
|
||||||
|
|
||||||
|
assertThat(source.getValue("jobs.closed.count"), equalTo(1));
|
||||||
|
assertThat(source.getValue("jobs.closed.detectors.min"), equalTo(3.0));
|
||||||
|
assertThat(source.getValue("jobs.closed.detectors.max"), equalTo(3.0));
|
||||||
|
assertThat(source.getValue("jobs.closed.detectors.total"), equalTo(3.0));
|
||||||
|
assertThat(source.getValue("jobs.closed.detectors.avg"), equalTo(3.0));
|
||||||
|
assertThat(source.getValue("jobs.closed.model_size.min"), equalTo(300.0));
|
||||||
|
assertThat(source.getValue("jobs.closed.model_size.max"), equalTo(300.0));
|
||||||
|
assertThat(source.getValue("jobs.closed.model_size.total"), equalTo(300.0));
|
||||||
|
assertThat(source.getValue("jobs.closed.model_size.avg"), equalTo(300.0));
|
||||||
|
|
||||||
|
assertThat(source.getValue("jobs.opening"), is(nullValue()));
|
||||||
|
assertThat(source.getValue("jobs.closing"), is(nullValue()));
|
||||||
|
assertThat(source.getValue("jobs.failed"), is(nullValue()));
|
||||||
|
|
||||||
|
assertThat(source.getValue("datafeeds._all.count"), equalTo(3));
|
||||||
|
assertThat(source.getValue("datafeeds.started.count"), equalTo(2));
|
||||||
|
assertThat(source.getValue("datafeeds.stopped.count"), equalTo(1));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void givenJobs(List<Job> jobs, List<GetJobsStatsAction.Response.JobStats> jobsStats) {
|
||||||
|
MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder();
|
||||||
|
for (Job job : jobs) {
|
||||||
|
mlMetadataBuilder.putJob(job, false);
|
||||||
|
}
|
||||||
|
ClusterState clusterState = new ClusterState.Builder(ClusterState.EMPTY_STATE)
|
||||||
|
.metaData(new MetaData.Builder()
|
||||||
|
.putCustom(MlMetadata.TYPE, mlMetadataBuilder.build()))
|
||||||
|
.build();
|
||||||
|
when(clusterService.state()).thenReturn(clusterState);
|
||||||
|
|
||||||
|
doAnswer(invocationOnMock -> {
|
||||||
|
ActionListener<GetJobsStatsAction.Response> listener =
|
||||||
|
(ActionListener<GetJobsStatsAction.Response>) invocationOnMock.getArguments()[2];
|
||||||
|
listener.onResponse(new GetJobsStatsAction.Response(
|
||||||
|
new QueryPage<>(jobsStats, jobsStats.size(), Job.RESULTS_FIELD)));
|
||||||
|
return Void.TYPE;
|
||||||
|
}).when(client).execute(same(GetJobsStatsAction.INSTANCE), any(), any());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void givenDatafeeds(List<GetDatafeedsStatsAction.Response.DatafeedStats> datafeedStats) {
|
||||||
|
doAnswer(invocationOnMock -> {
|
||||||
|
ActionListener<GetDatafeedsStatsAction.Response> listener =
|
||||||
|
(ActionListener<GetDatafeedsStatsAction.Response>) invocationOnMock.getArguments()[2];
|
||||||
|
listener.onResponse(new GetDatafeedsStatsAction.Response(
|
||||||
|
new QueryPage<>(datafeedStats, datafeedStats.size(), DatafeedConfig.RESULTS_FIELD)));
|
||||||
|
return Void.TYPE;
|
||||||
|
}).when(client).execute(same(GetDatafeedsStatsAction.INSTANCE), any(), any());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Detector buildMinDetector(String fieldName) {
|
||||||
|
Detector.Builder detectorBuilder = new Detector.Builder();
|
||||||
|
detectorBuilder.setFunction("min");
|
||||||
|
detectorBuilder.setFieldName(fieldName);
|
||||||
|
return detectorBuilder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Job buildJob(String jobId, List<Detector> detectors) {
|
||||||
|
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(detectors);
|
||||||
|
return new Job.Builder(jobId)
|
||||||
|
.setCreateTime(new Date(randomNonNegativeLong()))
|
||||||
|
.setAnalysisConfig(analysisConfig)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static GetJobsStatsAction.Response.JobStats buildJobStats(String jobId, JobState state, long modelBytes) {
|
||||||
|
ModelSizeStats.Builder modelSizeStats = new ModelSizeStats.Builder(jobId);
|
||||||
|
modelSizeStats.setModelBytes(modelBytes);
|
||||||
|
GetJobsStatsAction.Response.JobStats jobStats = mock(GetJobsStatsAction.Response.JobStats.class);
|
||||||
|
when(jobStats.getJobId()).thenReturn(jobId);
|
||||||
|
when(jobStats.getModelSizeStats()).thenReturn(modelSizeStats.build());
|
||||||
|
when(jobStats.getState()).thenReturn(state);
|
||||||
|
return jobStats;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static GetDatafeedsStatsAction.Response.DatafeedStats buildDatafeedStats(DatafeedState state) {
|
||||||
|
GetDatafeedsStatsAction.Response.DatafeedStats stats = mock(GetDatafeedsStatsAction.Response.DatafeedStats.class);
|
||||||
|
when(stats.getDatafeedState()).thenReturn(state);
|
||||||
|
return stats;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,63 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License;
|
||||||
|
* you may not use this file except in compliance with the Elastic License.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.xpack.ml.utils;
|
||||||
|
|
||||||
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
|
||||||
|
public class StatsAccumulatorTests extends ESTestCase {
|
||||||
|
|
||||||
|
public void testGivenNoValues() {
|
||||||
|
StatsAccumulator accumulator = new StatsAccumulator();
|
||||||
|
assertThat(accumulator.getMin(), equalTo(0.0));
|
||||||
|
assertThat(accumulator.getMax(), equalTo(0.0));
|
||||||
|
assertThat(accumulator.getTotal(), equalTo(0.0));
|
||||||
|
assertThat(accumulator.getAvg(), equalTo(0.0));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testGivenPositiveValues() {
|
||||||
|
StatsAccumulator accumulator = new StatsAccumulator();
|
||||||
|
|
||||||
|
for (int i = 1; i <= 10; i++) {
|
||||||
|
accumulator.add(i);
|
||||||
|
}
|
||||||
|
|
||||||
|
assertThat(accumulator.getMin(), equalTo(1.0));
|
||||||
|
assertThat(accumulator.getMax(), equalTo(10.0));
|
||||||
|
assertThat(accumulator.getTotal(), equalTo(55.0));
|
||||||
|
assertThat(accumulator.getAvg(), equalTo(5.5));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testGivenNegativeValues() {
|
||||||
|
StatsAccumulator accumulator = new StatsAccumulator();
|
||||||
|
|
||||||
|
for (int i = 1; i <= 10; i++) {
|
||||||
|
accumulator.add(-1 * i);
|
||||||
|
}
|
||||||
|
|
||||||
|
assertThat(accumulator.getMin(), equalTo(-10.0));
|
||||||
|
assertThat(accumulator.getMax(), equalTo(-1.0));
|
||||||
|
assertThat(accumulator.getTotal(), equalTo(-55.0));
|
||||||
|
assertThat(accumulator.getAvg(), equalTo(-5.5));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testAsMap() {
|
||||||
|
StatsAccumulator accumulator = new StatsAccumulator();
|
||||||
|
accumulator.add(5.0);
|
||||||
|
accumulator.add(10.0);
|
||||||
|
|
||||||
|
Map<String, Double> expectedMap = new HashMap<>();
|
||||||
|
expectedMap.put("min", 5.0);
|
||||||
|
expectedMap.put("max", 10.0);
|
||||||
|
expectedMap.put("avg", 7.5);
|
||||||
|
expectedMap.put("total", 15.0);
|
||||||
|
assertThat(accumulator.asMap(), equalTo(expectedMap));
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue