* [ML] Adding data frame analytics stats to _usage API (#45820) * [ML] Adding data frame analytics stats to _usage API * making the size of analytics stats 10k * adjusting backport
This commit is contained in:
parent
af2df9f221
commit
8e3c54fff7
|
@ -13,6 +13,7 @@ import org.elasticsearch.xpack.core.XPackFeatureSet;
|
|||
import org.elasticsearch.xpack.core.XPackField;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
|
@ -27,16 +28,23 @@ public class MachineLearningFeatureSetUsage extends XPackFeatureSet.Usage {
|
|||
public static final String MODEL_SIZE = "model_size";
|
||||
public static final String CREATED_BY = "created_by";
|
||||
public static final String NODE_COUNT = "node_count";
|
||||
public static final String DATA_FRAME_ANALYTICS_JOBS_FIELD = "data_frame_analytics_jobs";
|
||||
|
||||
private final Map<String, Object> jobsUsage;
|
||||
private final Map<String, Object> datafeedsUsage;
|
||||
private final Map<String, Object> analyticsUsage;
|
||||
private final int nodeCount;
|
||||
|
||||
public MachineLearningFeatureSetUsage(boolean available, boolean enabled, Map<String, Object> jobsUsage,
|
||||
Map<String, Object> datafeedsUsage, int nodeCount) {
|
||||
public MachineLearningFeatureSetUsage(boolean available,
|
||||
boolean enabled,
|
||||
Map<String, Object> jobsUsage,
|
||||
Map<String, Object> datafeedsUsage,
|
||||
Map<String, Object> analyticsUsage,
|
||||
int nodeCount) {
|
||||
super(XPackField.MACHINE_LEARNING, available, enabled);
|
||||
this.jobsUsage = Objects.requireNonNull(jobsUsage);
|
||||
this.datafeedsUsage = Objects.requireNonNull(datafeedsUsage);
|
||||
this.analyticsUsage = Objects.requireNonNull(analyticsUsage);
|
||||
this.nodeCount = nodeCount;
|
||||
}
|
||||
|
||||
|
@ -44,32 +52,37 @@ public class MachineLearningFeatureSetUsage extends XPackFeatureSet.Usage {
|
|||
super(in);
|
||||
this.jobsUsage = in.readMap();
|
||||
this.datafeedsUsage = in.readMap();
|
||||
if (in.getVersion().onOrAfter(Version.V_7_4_0)) {
|
||||
this.analyticsUsage = in.readMap();
|
||||
} else {
|
||||
this.analyticsUsage = Collections.emptyMap();
|
||||
}
|
||||
if (in.getVersion().onOrAfter(Version.V_6_5_0)) {
|
||||
this.nodeCount = in.readInt();
|
||||
} else {
|
||||
this.nodeCount = -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeMap(jobsUsage);
|
||||
out.writeMap(datafeedsUsage);
|
||||
if (out.getVersion().onOrAfter(Version.V_7_4_0)) {
|
||||
out.writeMap(analyticsUsage);
|
||||
}
|
||||
if (out.getVersion().onOrAfter(Version.V_6_5_0)) {
|
||||
out.writeInt(nodeCount);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void innerXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
super.innerXContent(builder, params);
|
||||
if (jobsUsage != null) {
|
||||
builder.field(JOBS_FIELD, jobsUsage);
|
||||
}
|
||||
if (datafeedsUsage != null) {
|
||||
builder.field(DATAFEEDS_FIELD, datafeedsUsage);
|
||||
}
|
||||
builder.field(JOBS_FIELD, jobsUsage);
|
||||
builder.field(DATAFEEDS_FIELD, datafeedsUsage);
|
||||
builder.field(DATA_FRAME_ANALYTICS_JOBS_FIELD, analyticsUsage);
|
||||
if (nodeCount >= 0) {
|
||||
builder.field(NODE_COUNT, nodeCount);
|
||||
}
|
||||
|
|
|
@ -24,10 +24,13 @@ import org.elasticsearch.xpack.core.XPackFeatureSet;
|
|||
import org.elasticsearch.xpack.core.XPackField;
|
||||
import org.elasticsearch.xpack.core.XPackPlugin;
|
||||
import org.elasticsearch.xpack.core.XPackSettings;
|
||||
import org.elasticsearch.xpack.core.action.util.PageParams;
|
||||
import org.elasticsearch.xpack.core.ml.MachineLearningFeatureSetUsage;
|
||||
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
|
||||
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
|
||||
import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
|
||||
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
|
||||
|
@ -157,6 +160,7 @@ public class MachineLearningFeatureSet implements XPackFeatureSet {
|
|||
private final boolean enabled;
|
||||
private Map<String, Object> jobsUsage;
|
||||
private Map<String, Object> datafeedsUsage;
|
||||
private Map<String, Object> analyticsUsage;
|
||||
private int nodeCount;
|
||||
|
||||
public Retriever(Client client, JobManagerHolder jobManagerHolder, boolean available, boolean enabled, int nodeCount) {
|
||||
|
@ -166,6 +170,7 @@ public class MachineLearningFeatureSet implements XPackFeatureSet {
|
|||
this.enabled = enabled;
|
||||
this.jobsUsage = new LinkedHashMap<>();
|
||||
this.datafeedsUsage = new LinkedHashMap<>();
|
||||
this.analyticsUsage = new LinkedHashMap<>();
|
||||
this.nodeCount = nodeCount;
|
||||
}
|
||||
|
||||
|
@ -173,19 +178,39 @@ public class MachineLearningFeatureSet implements XPackFeatureSet {
|
|||
// empty holder means either ML disabled or transport client mode
|
||||
if (jobManagerHolder.isEmpty()) {
|
||||
listener.onResponse(
|
||||
new MachineLearningFeatureSetUsage(available, enabled, Collections.emptyMap(), Collections.emptyMap(), 0));
|
||||
new MachineLearningFeatureSetUsage(available,
|
||||
enabled,
|
||||
Collections.emptyMap(),
|
||||
Collections.emptyMap(),
|
||||
Collections.emptyMap(),
|
||||
0));
|
||||
return;
|
||||
}
|
||||
|
||||
// Step 3. Extract usage from data frame analytics and return usage response
|
||||
ActionListener<GetDataFrameAnalyticsStatsAction.Response> dataframeAnalyticsListener = ActionListener.wrap(
|
||||
response -> {
|
||||
addDataFrameAnalyticsUsage(response, analyticsUsage);
|
||||
listener.onResponse(new MachineLearningFeatureSetUsage(available,
|
||||
enabled,
|
||||
jobsUsage,
|
||||
datafeedsUsage,
|
||||
analyticsUsage,
|
||||
nodeCount));
|
||||
},
|
||||
listener::onFailure
|
||||
);
|
||||
|
||||
// Step 2. Extract usage from datafeeds stats and return usage response
|
||||
ActionListener<GetDatafeedsStatsAction.Response> datafeedStatsListener =
|
||||
ActionListener.wrap(response -> {
|
||||
addDatafeedsUsage(response);
|
||||
listener.onResponse(new MachineLearningFeatureSetUsage(
|
||||
available, enabled, jobsUsage, datafeedsUsage, nodeCount));
|
||||
},
|
||||
listener::onFailure
|
||||
);
|
||||
ActionListener.wrap(response -> {
|
||||
addDatafeedsUsage(response);
|
||||
GetDataFrameAnalyticsStatsAction.Request dataframeAnalyticsStatsRequest =
|
||||
new GetDataFrameAnalyticsStatsAction.Request(GetDatafeedsStatsAction.ALL);
|
||||
dataframeAnalyticsStatsRequest.setPageParams(new PageParams(0, 10_000));
|
||||
client.execute(GetDataFrameAnalyticsStatsAction.INSTANCE, dataframeAnalyticsStatsRequest, dataframeAnalyticsListener);
|
||||
},
|
||||
listener::onFailure);
|
||||
|
||||
// Step 1. Extract usage from jobs stats and then request stats for all datafeeds
|
||||
GetJobsStatsAction.Request jobStatsRequest = new GetJobsStatsAction.Request(MetaData.ALL);
|
||||
|
@ -283,17 +308,31 @@ public class MachineLearningFeatureSet implements XPackFeatureSet {
|
|||
ds -> Counter.newCounter()).addAndGet(1);
|
||||
}
|
||||
|
||||
datafeedsUsage.put(MachineLearningFeatureSetUsage.ALL, createDatafeedUsageEntry(response.getResponse().count()));
|
||||
datafeedsUsage.put(MachineLearningFeatureSetUsage.ALL, createCountUsageEntry(response.getResponse().count()));
|
||||
for (DatafeedState datafeedState : datafeedCountByState.keySet()) {
|
||||
datafeedsUsage.put(datafeedState.name().toLowerCase(Locale.ROOT),
|
||||
createDatafeedUsageEntry(datafeedCountByState.get(datafeedState).get()));
|
||||
createCountUsageEntry(datafeedCountByState.get(datafeedState).get()));
|
||||
}
|
||||
}
|
||||
|
||||
private Map<String, Object> createDatafeedUsageEntry(long count) {
|
||||
private Map<String, Object> createCountUsageEntry(long count) {
|
||||
Map<String, Object> usage = new HashMap<>();
|
||||
usage.put(MachineLearningFeatureSetUsage.COUNT, count);
|
||||
return usage;
|
||||
}
|
||||
|
||||
private void addDataFrameAnalyticsUsage(GetDataFrameAnalyticsStatsAction.Response response,
|
||||
Map<String, Object> dataframeAnalyticsUsage) {
|
||||
Map<DataFrameAnalyticsState, Counter> dataFrameAnalyticsStateCounterMap = new HashMap<>();
|
||||
|
||||
for(GetDataFrameAnalyticsStatsAction.Response.Stats stats : response.getResponse().results()) {
|
||||
dataFrameAnalyticsStateCounterMap.computeIfAbsent(stats.getState(), ds -> Counter.newCounter()).addAndGet(1);
|
||||
}
|
||||
dataframeAnalyticsUsage.put(MachineLearningFeatureSetUsage.ALL, createCountUsageEntry(response.getResponse().count()));
|
||||
for (DataFrameAnalyticsState state : dataFrameAnalyticsStateCounterMap.keySet()) {
|
||||
dataframeAnalyticsUsage.put(state.name().toLowerCase(Locale.ROOT),
|
||||
createCountUsageEntry(dataFrameAnalyticsStateCounterMap.get(state).get()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,10 +33,13 @@ import org.elasticsearch.xpack.core.XPackField;
|
|||
import org.elasticsearch.xpack.core.action.util.QueryPage;
|
||||
import org.elasticsearch.xpack.core.ml.MachineLearningFeatureSetUsage;
|
||||
import org.elasticsearch.xpack.core.ml.MachineLearningField;
|
||||
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsAction;
|
||||
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
|
||||
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
|
||||
import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
|
||||
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.Detector;
|
||||
|
@ -94,6 +97,7 @@ public class MachineLearningFeatureSetTests extends ESTestCase {
|
|||
when(clusterService.state()).thenReturn(clusterState);
|
||||
givenJobs(Collections.emptyList(), Collections.emptyList());
|
||||
givenDatafeeds(Collections.emptyList());
|
||||
givenDataFrameAnalytics(Collections.emptyList());
|
||||
}
|
||||
|
||||
public void testIsRunningOnMlPlatform() {
|
||||
|
@ -171,6 +175,11 @@ public class MachineLearningFeatureSetTests extends ESTestCase {
|
|||
buildDatafeedStats(DatafeedState.STARTED),
|
||||
buildDatafeedStats(DatafeedState.STOPPED)
|
||||
));
|
||||
givenDataFrameAnalytics(Arrays.asList(
|
||||
buildDataFrameAnalyticsStats(DataFrameAnalyticsState.STOPPED),
|
||||
buildDataFrameAnalyticsStats(DataFrameAnalyticsState.STOPPED),
|
||||
buildDataFrameAnalyticsStats(DataFrameAnalyticsState.STARTED)
|
||||
));
|
||||
|
||||
MachineLearningFeatureSet featureSet = new MachineLearningFeatureSet(TestEnvironment.newEnvironment(settings.build()),
|
||||
clusterService, client, licenseState, jobManagerHolder);
|
||||
|
@ -237,6 +246,10 @@ public class MachineLearningFeatureSetTests extends ESTestCase {
|
|||
assertThat(source.getValue("datafeeds.started.count"), equalTo(2));
|
||||
assertThat(source.getValue("datafeeds.stopped.count"), equalTo(1));
|
||||
|
||||
assertThat(source.getValue("data_frame_analytics_jobs._all.count"), equalTo(3));
|
||||
assertThat(source.getValue("data_frame_analytics_jobs.started.count"), equalTo(1));
|
||||
assertThat(source.getValue("data_frame_analytics_jobs.stopped.count"), equalTo(2));
|
||||
|
||||
assertThat(source.getValue("jobs._all.forecasts.total"), equalTo(11));
|
||||
assertThat(source.getValue("jobs._all.forecasts.forecasted_jobs"), equalTo(2));
|
||||
|
||||
|
@ -418,6 +431,19 @@ public class MachineLearningFeatureSetTests extends ESTestCase {
|
|||
}).when(client).execute(same(GetDatafeedsStatsAction.INSTANCE), any(), any());
|
||||
}
|
||||
|
||||
private void givenDataFrameAnalytics(List<GetDataFrameAnalyticsStatsAction.Response.Stats> dataFrameAnalyticsStats) {
|
||||
doAnswer(invocationOnMock -> {
|
||||
@SuppressWarnings("unchecked")
|
||||
ActionListener<GetDataFrameAnalyticsStatsAction.Response> listener =
|
||||
(ActionListener<GetDataFrameAnalyticsStatsAction.Response>) invocationOnMock.getArguments()[2];
|
||||
listener.onResponse(new GetDataFrameAnalyticsStatsAction.Response(
|
||||
new QueryPage<>(dataFrameAnalyticsStats,
|
||||
dataFrameAnalyticsStats.size(),
|
||||
GetDataFrameAnalyticsAction.Response.RESULTS_FIELD)));
|
||||
return Void.TYPE;
|
||||
}).when(client).execute(same(GetDataFrameAnalyticsStatsAction.INSTANCE), any(), any());
|
||||
}
|
||||
|
||||
private static Detector buildMinDetector(String fieldName) {
|
||||
Detector.Builder detectorBuilder = new Detector.Builder();
|
||||
detectorBuilder.setFunction("min");
|
||||
|
@ -458,6 +484,12 @@ public class MachineLearningFeatureSetTests extends ESTestCase {
|
|||
return stats;
|
||||
}
|
||||
|
||||
private static GetDataFrameAnalyticsStatsAction.Response.Stats buildDataFrameAnalyticsStats(DataFrameAnalyticsState state) {
|
||||
GetDataFrameAnalyticsStatsAction.Response.Stats stats = mock(GetDataFrameAnalyticsStatsAction.Response.Stats.class);
|
||||
when(stats.getState()).thenReturn(state);
|
||||
return stats;
|
||||
}
|
||||
|
||||
private static ForecastStats buildForecastStats(long numberOfForecasts) {
|
||||
return new ForecastStatsTests().createForecastStats(numberOfForecasts, numberOfForecasts);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue