[7.x] [ML] Add _cat/ml/data_frame/analytics API (#52260) (#52312)

This commit is contained in:
Przemysław Witek 2020-02-13 16:55:47 +01:00 committed by GitHub
parent ea6f0e39bc
commit 0da3af7581
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 390 additions and 21 deletions

View File

@ -254,6 +254,7 @@ import org.elasticsearch.xpack.ml.rest.calendar.RestGetCalendarsAction;
import org.elasticsearch.xpack.ml.rest.calendar.RestPostCalendarEventAction; import org.elasticsearch.xpack.ml.rest.calendar.RestPostCalendarEventAction;
import org.elasticsearch.xpack.ml.rest.calendar.RestPutCalendarAction; import org.elasticsearch.xpack.ml.rest.calendar.RestPutCalendarAction;
import org.elasticsearch.xpack.ml.rest.calendar.RestPutCalendarJobAction; import org.elasticsearch.xpack.ml.rest.calendar.RestPutCalendarJobAction;
import org.elasticsearch.xpack.ml.rest.cat.RestCatDataFrameAnalyticsAction;
import org.elasticsearch.xpack.ml.rest.cat.RestCatDatafeedsAction; import org.elasticsearch.xpack.ml.rest.cat.RestCatDatafeedsAction;
import org.elasticsearch.xpack.ml.rest.cat.RestCatJobsAction; import org.elasticsearch.xpack.ml.rest.cat.RestCatJobsAction;
import org.elasticsearch.xpack.ml.rest.cat.RestCatTrainedModelsAction; import org.elasticsearch.xpack.ml.rest.cat.RestCatTrainedModelsAction;
@ -788,7 +789,8 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin, Analys
// CAT Handlers // CAT Handlers
new RestCatJobsAction(), new RestCatJobsAction(),
new RestCatTrainedModelsAction(), new RestCatTrainedModelsAction(),
new RestCatDatafeedsAction() new RestCatDatafeedsAction(),
new RestCatDataFrameAnalyticsAction()
); );
} }

View File

@ -0,0 +1,196 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.rest.cat;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.Table;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.action.RestActionListener;
import org.elasticsearch.rest.action.RestResponseListener;
import org.elasticsearch.rest.action.cat.AbstractCatAction;
import org.elasticsearch.rest.action.cat.RestTable;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction.Response.Stats;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.utils.PhaseProgress;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import static java.util.Arrays.asList;
import static java.util.Collections.unmodifiableList;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toMap;
import static org.elasticsearch.rest.RestRequest.Method.GET;
public class RestCatDataFrameAnalyticsAction extends AbstractCatAction {
@Override
public List<Route> routes() {
return unmodifiableList(asList(
new Route(GET, "_cat/ml/data_frame/analytics/{" + DataFrameAnalyticsConfig.ID.getPreferredName() + "}"),
new Route(GET, "_cat/ml/data_frame/analytics")));
}
@Override
public String getName() {
return "cat_ml_get_data_frame_analytics_action";
}
@Override
protected RestChannelConsumer doCatRequest(RestRequest restRequest, NodeClient client) {
String dataFrameAnalyticsId = restRequest.param(DataFrameAnalyticsConfig.ID.getPreferredName());
if (Strings.isNullOrEmpty(dataFrameAnalyticsId)) {
dataFrameAnalyticsId = MetaData.ALL;
}
GetDataFrameAnalyticsAction.Request getRequest = new GetDataFrameAnalyticsAction.Request(dataFrameAnalyticsId);
getRequest.setAllowNoResources(
restRequest.paramAsBoolean(
GetDataFrameAnalyticsAction.Request.ALLOW_NO_MATCH.getPreferredName(), getRequest.isAllowNoResources()));
GetDataFrameAnalyticsStatsAction.Request getStatsRequest = new GetDataFrameAnalyticsStatsAction.Request(dataFrameAnalyticsId);
getStatsRequest.setAllowNoMatch(true);
return channel -> client.execute(
GetDataFrameAnalyticsAction.INSTANCE, getRequest, new RestActionListener<GetDataFrameAnalyticsAction.Response>(channel) {
@Override
public void processResponse(GetDataFrameAnalyticsAction.Response getResponse) {
client.execute(
GetDataFrameAnalyticsStatsAction.INSTANCE,
getStatsRequest,
new RestResponseListener<GetDataFrameAnalyticsStatsAction.Response>(channel) {
@Override
public RestResponse buildResponse(GetDataFrameAnalyticsStatsAction.Response getStatsResponse) throws Exception {
return RestTable.buildResponse(buildTable(getResponse, getStatsResponse), channel);
}
});
}
});
}
@Override
protected void documentation(StringBuilder sb) {
sb.append("/_cat/ml/data_frame/analytics\n");
sb.append("/_cat/ml/data_frame/analytics/{").append(DataFrameAnalyticsConfig.ID.getPreferredName()).append("}\n");
}
@Override
protected Table getTableWithHeader(RestRequest unused) {
return getTableWithHeader();
}
private static Table getTableWithHeader() {
return new Table()
.startHeaders()
// DFA config info
.addCell("id", TableColumnAttributeBuilder.builder("the id").build())
.addCell("type",
TableColumnAttributeBuilder.builder("analysis type")
.setAliases("t")
.build())
.addCell("create_time",
TableColumnAttributeBuilder.builder("job creation time")
.setAliases("ct", "createTime")
.build())
.addCell("version",
TableColumnAttributeBuilder.builder("the version of Elasticsearch when the analytics was created", false)
.setAliases("v")
.build())
.addCell("source_index",
TableColumnAttributeBuilder.builder("source index", false)
.setAliases("si", "sourceIndex")
.build())
.addCell("dest_index",
TableColumnAttributeBuilder.builder("destination index", false)
.setAliases("di", "destIndex")
.build())
.addCell("description",
TableColumnAttributeBuilder.builder("description", false)
.setAliases("d")
.build())
.addCell("model_memory_limit",
TableColumnAttributeBuilder.builder("model memory limit", false)
.setAliases("mml", "modelMemoryLimit")
.build())
// DFA stats info
.addCell("state",
TableColumnAttributeBuilder.builder("job state")
.setAliases("s")
.setTextAlignment(TableColumnAttributeBuilder.TextAlign.RIGHT)
.build())
.addCell("failure_reason",
TableColumnAttributeBuilder.builder("failure reason", false)
.setAliases("fr", "failureReason")
.build())
.addCell("progress",
TableColumnAttributeBuilder.builder("progress", false)
.setAliases("p")
.build())
.addCell("assignment_explanation",
TableColumnAttributeBuilder.builder("why the job is or is not assigned to a node", false)
.setAliases("ae", "assignmentExplanation")
.build())
// Node info
.addCell("node.id",
TableColumnAttributeBuilder.builder("id of the assigned node", false)
.setAliases("ni", "nodeId")
.build())
.addCell("node.name",
TableColumnAttributeBuilder.builder("name of the assigned node", false)
.setAliases("nn", "nodeName")
.build())
.addCell("node.ephemeral_id",
TableColumnAttributeBuilder.builder("ephemeral id of the assigned node", false)
.setAliases("ne", "nodeEphemeralId")
.build())
.addCell("node.address",
TableColumnAttributeBuilder.builder("network address of the assigned node", false)
.setAliases("na", "nodeAddress")
.build())
.endHeaders();
}
private static Table buildTable(GetDataFrameAnalyticsAction.Response getResponse,
GetDataFrameAnalyticsStatsAction.Response getStatsResponse) {
Map<String, Stats> statsById = getStatsResponse.getResponse().results().stream().collect(toMap(Stats::getId, Function.identity()));
Table table = getTableWithHeader();
for (DataFrameAnalyticsConfig config : getResponse.getResources().results()) {
Stats stats = statsById.get(config.getId());
DiscoveryNode node = stats == null ? null : stats.getNode();
table
.startRow()
.addCell(config.getId())
.addCell(config.getAnalysis().getWriteableName())
.addCell(config.getCreateTime())
.addCell(config.getVersion())
.addCell(String.join(",", config.getSource().getIndex()))
.addCell(config.getDest().getIndex())
.addCell(config.getDescription())
.addCell(config.getModelMemoryLimit())
.addCell(stats == null ? null : stats.getState())
.addCell(stats == null ? null : stats.getFailureReason())
.addCell(stats == null ? null : progressToString(stats.getProgress()))
.addCell(stats == null ? null : stats.getAssignmentExplanation())
.addCell(node == null ? null : node.getId())
.addCell(node == null ? null : node.getName())
.addCell(node == null ? null : node.getEphemeralId())
.addCell(node == null ? null : node.getAddress().toString())
.endRow();
}
return table;
}
private static String progressToString(List<PhaseProgress> phases) {
return phases.stream().map(p -> p.getPhase() + ":" + p.getProgressPercent()).collect(joining(","));
}
}

View File

@ -70,12 +70,12 @@ public class RestCatDatafeedsAction extends AbstractCatAction {
table.startHeaders(); table.startHeaders();
// Datafeed Info // Datafeed Info
table.addCell("id", TableColumnAttributeBuilder.builder().setDescription("the datafeed_id").build()); table.addCell("id", TableColumnAttributeBuilder.builder("the datafeed_id").build());
table.addCell("state", TableColumnAttributeBuilder.builder() table.addCell("state",
.setDescription("the datafeed state") TableColumnAttributeBuilder.builder("the datafeed state")
.setAliases("s") .setAliases("s")
.setTextAlignment(TableColumnAttributeBuilder.TextAlign.RIGHT) .setTextAlignment(TableColumnAttributeBuilder.TextAlign.RIGHT)
.build()); .build());
table.addCell("assignment_explanation", table.addCell("assignment_explanation",
TableColumnAttributeBuilder.builder("why the datafeed is or is not assigned to a node", false) TableColumnAttributeBuilder.builder("why the datafeed is or is not assigned to a node", false)
.setAliases("ae") .setAliases("ae")

View File

@ -75,17 +75,15 @@ public class RestCatJobsAction extends AbstractCatAction {
table.startHeaders(); table.startHeaders();
// Job Info // Job Info
table.addCell("id", TableColumnAttributeBuilder.builder().setDescription("the job_id").build()); table.addCell("id", TableColumnAttributeBuilder.builder("the job_id").build());
table.addCell("state", TableColumnAttributeBuilder.builder() table.addCell("state",
.setDescription("the job state") TableColumnAttributeBuilder.builder("the job state")
.setAliases("s") .setAliases("s")
.setTextAlignment(TableColumnAttributeBuilder.TextAlign.RIGHT) .setTextAlignment(TableColumnAttributeBuilder.TextAlign.RIGHT)
.build()); .build());
table.addCell("opened_time", table.addCell("opened_time",
TableColumnAttributeBuilder.builder() TableColumnAttributeBuilder.builder("the amount of time the job has been opened", false)
.setDescription("the amount of time the job has been opened")
.setAliases("ot") .setAliases("ot")
.setDisplayByDefault(false)
.build()); .build());
table.addCell("assignment_explanation", table.addCell("assignment_explanation",
TableColumnAttributeBuilder.builder("why the job is or is not assigned to a node", false) TableColumnAttributeBuilder.builder("why the job is or is not assigned to a node", false)

View File

@ -130,17 +130,15 @@ public class RestCatTrainedModelsAction extends AbstractCatAction {
table.startHeaders(); table.startHeaders();
// Trained Model Info // Trained Model Info
table.addCell("id", TableColumnAttributeBuilder.builder().setDescription("the trained model id").build()); table.addCell("id", TableColumnAttributeBuilder.builder("the trained model id").build());
table.addCell("created_by", TableColumnAttributeBuilder.builder("who created the model", false) table.addCell("created_by", TableColumnAttributeBuilder.builder("who created the model", false)
.setAliases("c", "createdBy") .setAliases("c", "createdBy")
.setTextAlignment(TableColumnAttributeBuilder.TextAlign.RIGHT) .setTextAlignment(TableColumnAttributeBuilder.TextAlign.RIGHT)
.build()); .build());
table.addCell("heap_size", TableColumnAttributeBuilder.builder() table.addCell("heap_size", TableColumnAttributeBuilder.builder("the estimated heap size to keep the model in memory")
.setDescription("the estimated heap size to keep the model in memory")
.setAliases("hs","modelHeapSize") .setAliases("hs","modelHeapSize")
.build()); .build());
table.addCell("operations", TableColumnAttributeBuilder.builder() table.addCell("operations", TableColumnAttributeBuilder.builder("the estimated number of operations to use the model")
.setDescription("the estimated number of operations to use the model")
.setAliases("o", "modelOperations") .setAliases("o", "modelOperations")
.build()); .build());
table.addCell("license", TableColumnAttributeBuilder.builder("The license level of the model", false) table.addCell("license", TableColumnAttributeBuilder.builder("The license level of the model", false)

View File

@ -0,0 +1,89 @@
{
"cat.ml_data_frame_analytics":{
"documentation":{
"url":"http://www.elastic.co/guide/en/elasticsearch/reference/current/get-dfanalytics-stats.html"
},
"stability":"stable",
"url":{
"paths":[
{
"path":"/_cat/ml/data_frame/analytics",
"methods":[
"GET"
]
},
{
"path":"/_cat/ml/data_frame/analytics/{id}",
"methods":[
"GET"
],
"parts":{
"id":{
"type":"string",
"description":"The ID of the data frame analytics to fetch"
}
}
}
]
},
"params":{
"allow_no_match":{
"type":"boolean",
"required":false,
"description":"Whether to ignore if a wildcard expression matches no configs. (This includes `_all` string or when no configs have been specified)"
},
"bytes":{
"type":"enum",
"description":"The unit in which to display byte values",
"options":[
"b",
"k",
"kb",
"m",
"mb",
"g",
"gb",
"t",
"tb",
"p",
"pb"
]
},
"format":{
"type":"string",
"description":"a short version of the Accept header, e.g. json, yaml"
},
"h":{
"type":"list",
"description":"Comma-separated list of column names to display"
},
"help":{
"type":"boolean",
"description":"Return help information",
"default":false
},
"s":{
"type":"list",
"description":"Comma-separated list of column names or column aliases to sort by"
},
"time":{
"type":"enum",
"description":"The unit in which to display time values",
"options":[
"d (Days)",
"h (Hours)",
"m (Minutes)",
"s (Seconds)",
"ms (Milliseconds)",
"micros (Microseconds)",
"nanos (Nanoseconds)"
]
},
"v":{
"type":"boolean",
"description":"Verbose mode. Display column headers",
"default":false
}
}
}
}

View File

@ -0,0 +1,86 @@
setup:
- skip:
features: headers
- do:
indices.create:
index: index-source
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
ml.put_data_frame_analytics:
id: dfa-outlier-detection-job
body: >
{
"source": { "index": "index-source" },
"dest": { "index": "index-dest-od" },
"analysis": {"outlier_detection": {}}
}
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
ml.put_data_frame_analytics:
id: dfa-regression-job
body: >
{
"source": { "index": "index-source" },
"dest": { "index": "index-dest-r" },
"analysis": { "regression": { "dependent_variable": "dep_var" } }
}
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
ml.put_data_frame_analytics:
id: dfa-classification-job
body: >
{
"source": { "index": "index-source" },
"dest": { "index": "index-dest-c" },
"analysis": { "classification": { "dependent_variable": "dep_var" } }
}
---
"Test cat data frame analytics single job":
- do:
cat.ml_data_frame_analytics:
id: dfa-outlier-detection-job
- match:
$body: |
/ #id type create_time state
^ (dfa\-outlier\-detection\-job \s+ outlier_detection \s+ [^\s]+ \s+ stopped \n)+ $/
---
"Test cat data frame analytics single job with header":
- do:
cat.ml_data_frame_analytics:
id: dfa-outlier-detection-job
v: true
- match:
$body: |
/^ id \s+ type \s+ create_time \s+ state \n
(dfa\-outlier\-detection\-job \s+ outlier_detection \s+ [^\s]+ \s+ stopped \n)+ $/
---
"Test cat data frame analytics all jobs with header":
- do:
cat.ml_data_frame_analytics:
v: true
- match:
$body: |
/^ id \s+ type \s+ create_time \s+ state \n
(dfa\-classification\-job \s+ classification \s+ [^\s]+ \s+ stopped \n)+
(dfa\-outlier\-detection\-job \s+ outlier_detection \s+ [^\s]+ \s+ stopped \n)+
(dfa\-regression\-job \s+ regression \s+ [^\s]+ \s+ stopped \n)+ $/
---
"Test cat data frame analytics all jobs with header and column selection":
- do:
cat.ml_data_frame_analytics:
v: true
h: id,t,s,p,source_index,dest_index
- match:
$body: |
/^ id \s+ t \s+ s \s+ p \s+ source_index \s+ dest_index \n
(dfa\-classification\-job \s+ classification \s+ stopped \s+ reindexing:0,loading_data:0,analyzing:0,writing_results:0 \s+ index-source \s+ index-dest-c \n)+
(dfa\-outlier\-detection\-job \s+ outlier_detection \s+ stopped \s+ reindexing:0,loading_data:0,analyzing:0,writing_results:0 \s+ index-source \s+ index-dest-od \n)+
(dfa\-regression\-job \s+ regression \s+ stopped \s+ reindexing:0,loading_data:0,analyzing:0,writing_results:0 \s+ index-source \s+ index-dest-r \n)+ $/