Create GET _cat/transforms API Issue (#53643) (#53726)

Adds new` _cat/transform` and `_cat/transform/{transform_id}` endpoints.
This commit is contained in:
Benjamin Trent 2020-03-18 10:45:28 -04:00 committed by GitHub
parent 580bc40c0c
commit 2ccb963f1d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 487 additions and 1 deletions

View File

@ -3,8 +3,8 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.rest.cat;
package org.elasticsearch.xpack.core.common.table;
import org.elasticsearch.common.Strings;

View File

@ -10,6 +10,7 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.Table;
import org.elasticsearch.xpack.core.common.table.TableColumnAttributeBuilder;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.action.RestActionListener;

View File

@ -9,6 +9,7 @@ import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.Table;
import org.elasticsearch.xpack.core.common.table.TableColumnAttributeBuilder;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;

View File

@ -10,6 +10,7 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.Table;
import org.elasticsearch.xpack.core.common.table.TableColumnAttributeBuilder;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestRequest;

View File

@ -12,6 +12,7 @@ import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.Table;
import org.elasticsearch.xpack.core.common.table.TableColumnAttributeBuilder;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestRequest;

View File

@ -0,0 +1,82 @@
{
"transform.cat_transform":{
"documentation":{
"url":"https://www.elastic.co/guide/en/elasticsearch/reference/current/cat-transform.html"
},
"stability":"stable",
"url":{
"paths":[
{
"path":"/_cat/transform",
"methods":[
"GET"
]
},
{
"path":"/_cat/transform/{transform_id}",
"methods":[
"GET"
],
"parts":{
"transform_id":{
"type":"string",
"description":"The id of the transform for which to get stats. '_all' or '*' implies all transforms"
}
}
}
]
},
"params":{
"from":{
"type":"int",
"required":false,
"description":"skips a number of transform configs, defaults to 0"
},
"size":{
"type":"int",
"required":false,
"description":"specifies a max number of transforms to get, defaults to 100"
},
"allow_no_match":{
"type":"boolean",
"required":false,
"description":"Whether to ignore if a wildcard expression matches no transforms. (This includes `_all` string or when no transforms have been specified)"
},
"format":{
"type":"string",
"description":"a short version of the Accept header, e.g. json, yaml"
},
"h":{
"type":"list",
"description":"Comma-separated list of column names to display"
},
"help":{
"type":"boolean",
"description":"Return help information",
"default":false
},
"s":{
"type":"list",
"description":"Comma-separated list of column names or column aliases to sort by"
},
"time":{
"type":"enum",
"description":"The unit in which to display time values",
"options":[
"d (Days)",
"h (Hours)",
"m (Minutes)",
"s (Seconds)",
"ms (Milliseconds)",
"micros (Microseconds)",
"nanos (Nanoseconds)"
]
},
"v":{
"type":"boolean",
"description":"Verbose mode. Display column headers",
"default":false
}
}
}
}

View File

@ -0,0 +1,138 @@
setup:
- skip:
features: headers
- do:
indices.create:
index: airline-data
body:
mappings:
properties:
time:
type: date
airline:
type: keyword
responsetime:
type: float
event_rate:
type: integer
- do:
indices.create:
index: airline-data-other
body:
mappings:
properties:
time:
type: date
airline:
type: keyword
responsetime:
type: float
event_rate:
type: integer
- do:
transform.put_transform:
transform_id: "airline-transform-stats"
body: >
{
"source": { "index": "airline-data" },
"dest": { "index": "airline-data-by-airline" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
}
}
---
teardown:
- do:
transform.delete_transform:
transform_id: "airline-transform-stats"
---
"Test cat transform stats hiding headers":
- do:
transform.cat_transform:
transform_id: "airline-transform-stats"
- match:
$body: |
/^ #id \s+ create_time \s+ version \s+ source_index \s+ dest_index \s+ pipeline \s+ transform_type \s+ frequency \s+ max_page_search_size \s+ state \n
(airline\-transform\-stats \s+ [^\s]+ \s+ [^\s]+ \s+ airline-data \s+ airline-data-by-airline \s+ \s+ batch \s+ 1m \s+ 500 \s+ STOPPED \n)+ $/
---
"Test cat transform stats with column selection":
- do:
transform.cat_transform:
transform_id: "airline-transform-stats"
v: true
h: id,version,source_index,dest_index,search_total,index_total,dt,cdtea,indexed_documents_exp_avg
- match:
$body: |
/^ id \s+ version \s+ source_index \s+ dest_index \s+ search_total \s+ index_total \s+ dt \s+ cdtea \s+ indexed_documents_exp_avg \n
(airline\-transform-stats \s+ [^\s]+ \s+ airline-data \s+ airline-data-by-airline \s+ 0 \s+ 0 \s+ 0 \s+ 0.0 \s+ 0.0 \n)+ $/
---
"Test cat transform stats with batch transform":
- do:
transform.put_transform:
transform_id: "airline-transform-batch"
body: >
{
"source": {
"index": ["airline-data", "airline-data-other"],
"query": {"bool":{"filter":{"term":{"airline":"foo"}}}}
},
"dest": { "index": "airline-data-by-airline-batch" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
},
"description": "description"
}
- do:
transform.cat_transform:
transform_id: "airline-transform-batch"
v: true
- match:
$body: |
/^ id \s+ create_time \s+ version \s+ source_index \s+ dest_index \s+ pipeline \s+ description \s+ transform_type \s+ frequency \s+ max_page_search_size \s+ state \n
(airline\-transform\-batch \s+ [^\s]+ \s+ [^\s]+ \s+ airline-data,airline-data-other \s+ airline-data-by-airline-batch \s+ \s+ description \s+ batch \s+ 1m \s+ 500 \s+ STOPPED \n)+ $/
- do:
transform.delete_transform:
transform_id: "airline-transform-batch"
---
"Test cat transform stats with continuous transform":
- do:
transform.put_transform:
transform_id: "airline-transform-continuous"
body: >
{
"source": {
"index": ["airline-data", "airline-data-other"],
"query": {"bool":{"filter":{"term":{"airline":"foo"}}}}
},
"dest": { "index": "airline-data-by-airline-continuous" },
"frequency": "10s",
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
},
"description": "description",
"sync": {
"time": {
"field": "time"
}
}
}
- do:
transform.cat_transform:
transform_id: "airline-transform-continuous"
v: true
- match:
$body: |
/^ id \s+ create_time \s+ version \s+ source_index \s+ dest_index \s+ pipeline \s+ description \s+ transform_type \s+ frequency \s+ max_page_search_size \s+ state \n
(airline\-transform\-continuous \s+ [^\s]+ \s+ [^\s]+ \s+ airline-data,airline-data-other \s+ airline-data-by-airline-continuous \s+ \s+ description \s+ continuous \s+ 10s \s+ 500 \s+ STOPPED \n)+ $/
- do:
transform.delete_transform:
transform_id: "airline-transform-continuous"

View File

@ -87,6 +87,7 @@ import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.IndexBasedTransformConfigManager;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.elasticsearch.xpack.transform.persistence.TransformInternalIndex;
import org.elasticsearch.xpack.transform.rest.action.RestCatTransformAction;
import org.elasticsearch.xpack.transform.rest.action.RestDeleteTransformAction;
import org.elasticsearch.xpack.transform.rest.action.RestGetTransformAction;
import org.elasticsearch.xpack.transform.rest.action.RestGetTransformStatsAction;
@ -215,6 +216,7 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
new RestGetTransformStatsAction(),
new RestPreviewTransformAction(),
new RestUpdateTransformAction(),
new RestCatTransformAction(),
// deprecated endpoints, to be removed for 8.0.0
new RestPutTransformActionDeprecated(),

View File

@ -0,0 +1,260 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.transform.rest.action;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.Table;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.common.table.TableColumnAttributeBuilder;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.action.RestActionListener;
import org.elasticsearch.rest.action.RestResponseListener;
import org.elasticsearch.rest.action.cat.AbstractCatAction;
import org.elasticsearch.rest.action.cat.RestTable;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.action.GetTransformAction;
import org.elasticsearch.xpack.core.transform.action.GetTransformStatsAction;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
import org.elasticsearch.xpack.core.transform.transforms.TransformStats;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import static java.util.Arrays.asList;
import static java.util.Collections.unmodifiableList;
import static org.elasticsearch.rest.RestRequest.Method.GET;
import static org.elasticsearch.xpack.core.transform.TransformField.ALLOW_NO_MATCH;
public class RestCatTransformAction extends AbstractCatAction {
private static final Integer DEFAULT_MAX_PAGE_SEARCH_SIZE = Integer.valueOf(500);
private static final TimeValue DEFAULT_TRANSFORM_FREQUENCY = TimeValue.timeValueMillis(60000);
@Override
public List<Route> routes() {
return unmodifiableList(asList(
new Route(GET, "_cat/transform"),
new Route(GET, "_cat/transform/{" + TransformField.TRANSFORM_ID + "}")));
}
@Override
public String getName() {
return "cat_transform_action";
}
@Override
protected RestChannelConsumer doCatRequest(RestRequest restRequest, NodeClient client) {
String id = restRequest.param(TransformField.TRANSFORM_ID);
if (Strings.isNullOrEmpty(id)) {
id = MetaData.ALL;
}
GetTransformAction.Request request = new GetTransformAction.Request(id);
request.setAllowNoResources(restRequest.paramAsBoolean(ALLOW_NO_MATCH.getPreferredName(), true));
GetTransformStatsAction.Request statsRequest = new GetTransformStatsAction.Request(id);
statsRequest.setAllowNoMatch(restRequest.paramAsBoolean(ALLOW_NO_MATCH.getPreferredName(), true));
if (restRequest.hasParam(PageParams.FROM.getPreferredName()) || restRequest.hasParam(PageParams.SIZE.getPreferredName())) {
PageParams pageParams = new PageParams(restRequest.paramAsInt(PageParams.FROM.getPreferredName(), PageParams.DEFAULT_FROM),
restRequest.paramAsInt(PageParams.SIZE.getPreferredName(), PageParams.DEFAULT_SIZE));
request.setPageParams(pageParams);
statsRequest.setPageParams(pageParams);
}
return channel -> client.execute(GetTransformAction.INSTANCE,
request,
new RestActionListener<GetTransformAction.Response>(channel) {
@Override
public void processResponse(GetTransformAction.Response response) {
client.execute(GetTransformStatsAction.INSTANCE,
statsRequest,
new RestResponseListener<GetTransformStatsAction.Response>(channel) {
@Override
public RestResponse buildResponse(GetTransformStatsAction.Response statsResponse) throws Exception {
return RestTable.buildResponse(buildTable(response, statsResponse), channel);
}
});
}
});
}
@Override
protected void documentation(StringBuilder sb) {
sb.append("/_cat/transform\n");
sb.append("/_cat/transform/{" + TransformField.TRANSFORM_ID + "}\n");
}
@Override
protected Table getTableWithHeader(RestRequest unused) {
return getTableWithHeader();
}
private static Table getTableWithHeader() {
return new Table()
.startHeaders()
// Transform config info
.addCell("id", TableColumnAttributeBuilder.builder("the id").build())
.addCell("create_time",
TableColumnAttributeBuilder.builder("transform creation time")
.setAliases("ct", "createTime")
.build())
.addCell("version",
TableColumnAttributeBuilder.builder("the version of Elasticsearch when the transform was created")
.setAliases("v")
.build())
.addCell("source_index",
TableColumnAttributeBuilder.builder("source index")
.setAliases("si", "sourceIndex")
.build())
.addCell("dest_index",
TableColumnAttributeBuilder.builder("destination index")
.setAliases("di", "destIndex")
.build())
.addCell("pipeline",
TableColumnAttributeBuilder.builder("transform pipeline")
.setAliases("p")
.build())
.addCell("description",
TableColumnAttributeBuilder.builder("description")
.setAliases("d")
.build())
.addCell("transform_type",
TableColumnAttributeBuilder.builder("batch or continuous transform")
.setAliases("tt")
.build())
.addCell("frequency",
TableColumnAttributeBuilder.builder("frequency of transform")
.setAliases("f")
.build())
.addCell("max_page_search_size",
TableColumnAttributeBuilder.builder("max page search size ")
.setAliases("mpsz")
.build())
// Transform stats info
.addCell("state",
TableColumnAttributeBuilder.builder("transform state")
.setAliases("s")
.setTextAlignment(TableColumnAttributeBuilder.TextAlign.RIGHT)
.build())
.addCell("reason",
TableColumnAttributeBuilder.builder("reason", false)
.setAliases("r", "reason")
.build())
.addCell("changes_last_detection_time",
TableColumnAttributeBuilder.builder("changes last detected time", false)
.setAliases("cldt")
.build())
.addCell("search_total",
TableColumnAttributeBuilder.builder("total number of searches", false)
.setAliases("st")
.build())
.addCell("search_failure",
TableColumnAttributeBuilder.builder("total number of search failures", false)
.setAliases("sf")
.build())
.addCell("search_time",
TableColumnAttributeBuilder.builder("search time", false)
.setAliases("stime")
.build())
.addCell("index_total",
TableColumnAttributeBuilder.builder("total number of indices", false)
.setAliases("it")
.build())
.addCell("index_failure",
TableColumnAttributeBuilder.builder("total number of index failures", false)
.setAliases("if")
.build())
.addCell("index_time",
TableColumnAttributeBuilder.builder("index time", false)
.setAliases("itime")
.build())
.addCell("document_total",
TableColumnAttributeBuilder.builder("total number of documents", false)
.setAliases("dt")
.build())
.addCell("invocation_total",
TableColumnAttributeBuilder.builder("total number of invocations", false)
.setAliases("itotal")
.build())
.addCell("page_total",
TableColumnAttributeBuilder.builder("total number of pages", false)
.setAliases("pt")
.build())
.addCell("checkpoint_duration_time_exp_avg",
TableColumnAttributeBuilder.builder("exponential average checkpoint processing time (milliseconds)", false)
.setAliases("cdtea", "checkpointTimeExpAvg")
.build())
.addCell("indexed_documents_exp_avg",
TableColumnAttributeBuilder.builder("exponential average number of documents indexed", false)
.setAliases("idea")
.build())
.addCell("processed_documents_exp_avg",
TableColumnAttributeBuilder.builder("exponential average number of documents processed", false)
.setAliases("pdea")
.build())
.endHeaders();
}
private Table buildTable(GetTransformAction.Response response, GetTransformStatsAction.Response statsResponse) {
Table table = getTableWithHeader();
Map<String, TransformStats> statsById = statsResponse.getTransformsStats().stream()
.collect(Collectors.toMap(TransformStats::getId, Function.identity()));
response.getTransformConfigurations().forEach(config -> {
TransformStats stats = statsById.get(config.getId());
TransformCheckpointingInfo checkpointingInfo = null;
TransformIndexerStats transformIndexerStats = null;
if(stats != null) {
checkpointingInfo = stats.getCheckpointingInfo();
transformIndexerStats = stats.getIndexerStats();
}
table
.startRow()
.addCell(config.getId())
.addCell(config.getCreateTime())
.addCell(config.getVersion())
.addCell(String.join(",", config.getSource().getIndex()))
.addCell(config.getDestination().getIndex())
.addCell(config.getDestination().getPipeline())
.addCell(config.getDescription())
.addCell(config.getSyncConfig() == null ? "batch" : "continuous")
.addCell(config.getFrequency() == null ? DEFAULT_TRANSFORM_FREQUENCY : config.getFrequency())
.addCell(config.getPivotConfig() == null || config.getPivotConfig().getMaxPageSearchSize() == null ?
DEFAULT_MAX_PAGE_SEARCH_SIZE : config.getPivotConfig().getMaxPageSearchSize())
.addCell(stats == null ? null : stats.getState())
.addCell(stats == null ? null : stats.getReason())
.addCell(checkpointingInfo == null ? null : checkpointingInfo.getChangesLastDetectedAt())
.addCell(transformIndexerStats == null ? null : transformIndexerStats.getSearchTotal())
.addCell(transformIndexerStats == null ? null : transformIndexerStats.getSearchFailures())
.addCell(transformIndexerStats == null ? null : TimeValue.timeValueMillis(transformIndexerStats.getSearchTime()))
.addCell(transformIndexerStats == null ? null : transformIndexerStats.getIndexTotal())
.addCell(transformIndexerStats == null ? null : transformIndexerStats.getIndexFailures())
.addCell(transformIndexerStats == null ? null : TimeValue.timeValueMillis(transformIndexerStats.getIndexTime()))
.addCell(transformIndexerStats == null ? null : transformIndexerStats.getNumDocuments())
.addCell(transformIndexerStats == null ? null : transformIndexerStats.getNumInvocations())
.addCell(transformIndexerStats == null ? null : transformIndexerStats.getNumPages())
.addCell(transformIndexerStats == null ? null : transformIndexerStats.getExpAvgCheckpointDurationMs())
.addCell(transformIndexerStats == null ? null : transformIndexerStats.getExpAvgDocumentsIndexed())
.addCell(transformIndexerStats == null ? null : transformIndexerStats.getExpAvgDocumentsProcessed())
.endRow();
});
return table;
}
}