Add aggregation list to node info (#60074) (#60256)

Adds a full list of supported aggregations to the node info API. This list
will be used in transform tests and telemetry mapping tests that will be added
as follow-up PRs.

Fixes #59774
This commit is contained in:
Igor Motov 2020-07-28 14:06:12 -04:00 committed by GitHub
parent c7bfb5de41
commit 0dd53b76bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 188 additions and 8 deletions

View File

@ -0,0 +1,18 @@
---
"node_info test aggregations":
- skip:
version: " - 7.9.99"
reason: "aggregation info only supported in 8.0.0+"
features: [arbitrary_key]
- do:
nodes.info: {}
- set:
nodes._arbitrary_key_: node_id
- do:
nodes.info:
metric: [ aggregations ]
- match : { nodes.$node_id.aggregations.filter: { "types": ["other"] } }

View File

@ -34,6 +34,7 @@ import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.monitor.os.OsInfo;
import org.elasticsearch.monitor.process.ProcessInfo;
import org.elasticsearch.node.ReportingService;
import org.elasticsearch.search.aggregations.support.AggregationInfo;
import org.elasticsearch.threadpool.ThreadPoolInfo;
import org.elasticsearch.transport.TransportInfo;
@ -82,12 +83,15 @@ public class NodeInfo extends BaseNodeResponse {
addInfoIfNonNull(HttpInfo.class, in.readOptionalWriteable(HttpInfo::new));
addInfoIfNonNull(PluginsAndModules.class, in.readOptionalWriteable(PluginsAndModules::new));
addInfoIfNonNull(IngestInfo.class, in.readOptionalWriteable(IngestInfo::new));
if (in.getVersion().onOrAfter(Version.V_7_10_0)) {
addInfoIfNonNull(AggregationInfo.class, in.readOptionalWriteable(AggregationInfo::new));
}
}
public NodeInfo(Version version, Build build, DiscoveryNode node, @Nullable Settings settings,
@Nullable OsInfo os, @Nullable ProcessInfo process, @Nullable JvmInfo jvm, @Nullable ThreadPoolInfo threadPool,
@Nullable TransportInfo transport, @Nullable HttpInfo http, @Nullable PluginsAndModules plugins,
@Nullable IngestInfo ingest, @Nullable ByteSizeValue totalIndexingBuffer) {
@Nullable IngestInfo ingest, @Nullable AggregationInfo aggsInfo, @Nullable ByteSizeValue totalIndexingBuffer) {
super(node);
this.version = version;
this.build = build;
@ -100,6 +104,7 @@ public class NodeInfo extends BaseNodeResponse {
addInfoIfNonNull(HttpInfo.class, http);
addInfoIfNonNull(PluginsAndModules.class, plugins);
addInfoIfNonNull(IngestInfo.class, ingest);
addInfoIfNonNull(AggregationInfo.class, aggsInfo);
this.totalIndexingBuffer = totalIndexingBuffer;
}
@ -187,5 +192,8 @@ public class NodeInfo extends BaseNodeResponse {
out.writeOptionalWriteable(getInfo(HttpInfo.class));
out.writeOptionalWriteable(getInfo(PluginsAndModules.class));
out.writeOptionalWriteable(getInfo(IngestInfo.class));
if (out.getVersion().onOrAfter(Version.V_7_10_0)) {
out.writeOptionalWriteable(getInfo(AggregationInfo.class));
}
}
}

View File

@ -183,6 +183,7 @@ public class NodesInfoRequest extends BaseNodesRequest<NodesInfoRequest> {
HTTP("http"),
PLUGINS("plugins"),
INGEST("ingest"),
AGGREGATIONS("aggregations"),
INDICES("indices");
private String metricName;

View File

@ -35,6 +35,7 @@ import org.elasticsearch.ingest.IngestInfo;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.monitor.os.OsInfo;
import org.elasticsearch.monitor.process.ProcessInfo;
import org.elasticsearch.search.aggregations.support.AggregationInfo;
import org.elasticsearch.threadpool.ThreadPoolInfo;
import org.elasticsearch.transport.TransportInfo;
@ -126,6 +127,9 @@ public class NodesInfoResponse extends BaseNodesResponse<NodeInfo> implements To
if (nodeInfo.getInfo(IngestInfo.class) != null) {
nodeInfo.getInfo(IngestInfo.class).toXContent(builder, params);
}
if (nodeInfo.getInfo(AggregationInfo.class) != null) {
nodeInfo.getInfo(AggregationInfo.class).toXContent(builder, params);
}
builder.endObject();
}

View File

@ -80,6 +80,7 @@ public class TransportNodesInfoAction extends TransportNodesAction<NodesInfoRequ
metrics.contains(NodesInfoRequest.Metric.HTTP.metricName()),
metrics.contains(NodesInfoRequest.Metric.PLUGINS.metricName()),
metrics.contains(NodesInfoRequest.Metric.INGEST.metricName()),
metrics.contains(NodesInfoRequest.Metric.AGGREGATIONS.metricName()),
metrics.contains(NodesInfoRequest.Metric.INDICES.metricName()));
}

View File

@ -95,7 +95,7 @@ public class TransportClusterStatsAction extends TransportNodesAction<ClusterSta
@Override
protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeRequest) {
NodeInfo nodeInfo = nodeService.info(true, true, false, true, false, true, false, true, false, false);
NodeInfo nodeInfo = nodeService.info(true, true, false, true, false, true, false, true, false, false, false);
NodeStats nodeStats = nodeService.stats(CommonStatsFlags.NONE,
true, true, true, false, true, false, false, false, false, false, true, false, false, false);
List<ShardStats> shardsStats = new ArrayList<>();

View File

@ -585,7 +585,7 @@ public class Node implements Closeable {
this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),
transportService, indicesService, pluginsService, circuitBreakerService, scriptService,
httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService,
searchTransportService, indexingLimits);
searchTransportService, indexingLimits, searchModule.getValuesSourceRegistry().getUsageService());
final SearchService searchService = newSearchService(clusterService, indicesService,
threadPool, scriptService, bigArrays, searchModule.getFetchPhase(),

View File

@ -39,6 +39,7 @@ import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.support.AggregationUsageService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -61,6 +62,7 @@ public class NodeService implements Closeable {
private final ResponseCollectorService responseCollectorService;
private final SearchTransportService searchTransportService;
private final IndexingPressure indexingPressure;
private final AggregationUsageService aggregationUsageService;
private final Discovery discovery;
@ -69,7 +71,8 @@ public class NodeService implements Closeable {
CircuitBreakerService circuitBreakerService, ScriptService scriptService,
@Nullable HttpServerTransport httpServerTransport, IngestService ingestService, ClusterService clusterService,
SettingsFilter settingsFilter, ResponseCollectorService responseCollectorService,
SearchTransportService searchTransportService, IndexingPressure indexingPressure) {
SearchTransportService searchTransportService, IndexingPressure indexingPressure,
AggregationUsageService aggregationUsageService) {
this.settings = settings;
this.threadPool = threadPool;
this.monitorService = monitorService;
@ -85,11 +88,12 @@ public class NodeService implements Closeable {
this.responseCollectorService = responseCollectorService;
this.searchTransportService = searchTransportService;
this.indexingPressure = indexingPressure;
this.aggregationUsageService = aggregationUsageService;
clusterService.addStateApplier(ingestService);
}
public NodeInfo info(boolean settings, boolean os, boolean process, boolean jvm, boolean threadPool,
boolean transport, boolean http, boolean plugin, boolean ingest, boolean indices) {
boolean transport, boolean http, boolean plugin, boolean ingest, boolean aggs, boolean indices) {
return new NodeInfo(Version.CURRENT, Build.CURRENT, transportService.getLocalNode(),
settings ? settingsFilter.filter(this.settings) : null,
os ? monitorService.osService().info() : null,
@ -100,6 +104,7 @@ public class NodeService implements Closeable {
http ? (httpServerTransport == null ? null : httpServerTransport.info()) : null,
plugin ? (pluginService == null ? null : pluginService.info()) : null,
ingest ? (ingestService == null ? null : ingestService.info()) : null,
aggs ? (aggregationUsageService == null ? null : aggregationUsageService.info()) : null,
indices ? indicesService.getTotalIndexingBufferBytes() : null
);
}

View File

@ -0,0 +1,108 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.support;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.node.ReportingService;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.atomic.LongAdder;
public class AggregationInfo implements ReportingService.Info {
private final Map<String, Set<String>> aggs;
AggregationInfo(Map<String, Map<String, LongAdder>> aggs) {
// we use a treemap/treeset here to have a test-able / predictable order
Map<String, Set<String>> aggsMap = new TreeMap<>();
aggs.forEach((s, m) -> aggsMap.put(s, Collections.unmodifiableSet(new TreeSet<>(m.keySet()))));
this.aggs = Collections.unmodifiableMap(aggsMap);
}
/**
* Read from a stream.
*/
public AggregationInfo(StreamInput in) throws IOException {
aggs = new TreeMap<>();
final int size = in.readVInt();
for (int i = 0; i < size; i++) {
String key = in.readString();
final int keys = in.readVInt();
final Set<String> types = new TreeSet<>();
for (int j = 0; j < keys; j ++) {
types.add(in.readString());
}
aggs.put(key, types);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(aggs.size());
for (Map.Entry<String, Set<String>> e : aggs.entrySet()) {
out.writeString(e.getKey());
out.writeVInt(e.getValue().size());
for (String type : e.getValue()) {
out.writeString(type);
}
}
}
public Map<String, Set<String>> getAggregations() {
return aggs;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("aggregations");
for (Map.Entry<String, Set<String>> e : aggs.entrySet()) {
builder.startObject(e.getKey());
builder.startArray("types");
for (String s : e.getValue()) {
builder.value(s);
}
builder.endArray();
builder.endObject();
}
builder.endObject();
return builder;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AggregationInfo that = (AggregationInfo) o;
return Objects.equals(aggs, that.aggs);
}
@Override
public int hashCode() {
return Objects.hash(aggs);
}
}

View File

@ -19,12 +19,15 @@
package org.elasticsearch.search.aggregations.support;
import org.elasticsearch.node.ReportingService;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;
public class AggregationUsageService {
public class AggregationUsageService implements ReportingService<AggregationInfo> {
private final Map<String, Map<String, LongAdder>> aggs;
private final AggregationInfo info;
public static final String OTHER_SUBTYPE = "other";
@ -54,6 +57,7 @@ public class AggregationUsageService {
private AggregationUsageService(Builder builder) {
this.aggs = builder.aggs;
info = new AggregationInfo(aggs);
}
public void incAggregationUsage(String aggregationName, String valuesSourceType) {
@ -85,4 +89,9 @@ public class AggregationUsageService {
});
return aggsUsageMap;
}
@Override
public AggregationInfo info() {
return info;
}
}

View File

@ -57,6 +57,7 @@ public class NodeInfoTests extends ESTestCase {
null,
null,
null,
null,
null);
// OsInfo is absent

View File

@ -128,6 +128,6 @@ public class ClusterStatsNodesTests extends ESTestCase {
}
return new NodeInfo(null, null,
new DiscoveryNode(nodeId, buildNewFakeTransportAddress(), null),
settings.build(), null, null, null, null, null, null, null, null, null);
settings.build(), null, null, null, null, null, null, null, null, null, null);
}
}

View File

@ -39,6 +39,8 @@ import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.monitor.os.OsInfo;
import org.elasticsearch.monitor.process.ProcessInfo;
import org.elasticsearch.plugins.PluginInfo;
import org.elasticsearch.search.aggregations.support.AggregationInfo;
import org.elasticsearch.search.aggregations.support.AggregationUsageService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.ThreadPool;
@ -168,12 +170,35 @@ public class NodeInfoStreamingTests extends ESTestCase {
ingestInfo = new IngestInfo(processors);
}
AggregationInfo aggregationInfo = null;
if (randomBoolean()) {
AggregationUsageService.Builder builder = new AggregationUsageService.Builder();
int numOfAggs = randomIntBetween(0, 10);
for (int i = 0; i < numOfAggs; i++) {
String aggName = randomAlphaOfLength(10);
try {
if (randomBoolean()) {
builder.registerAggregationUsage(aggName);
} else {
int numOfTypes = randomIntBetween(1, 10);
for (int j = 0; j < numOfTypes; j++) {
builder.registerAggregationUsage(aggName, randomAlphaOfLength(10));
}
}
} catch (IllegalArgumentException ex) {
// Ignore duplicate strings
}
}
aggregationInfo = builder.build().info();
}
ByteSizeValue indexingBuffer = null;
if (randomBoolean()) {
// pick a random long that sometimes exceeds an int:
indexingBuffer = new ByteSizeValue(random().nextLong() & ((1L<<40)-1));
}
return new NodeInfo(VersionUtils.randomVersion(random()), build, node, settings, osInfo, process, jvm,
threadPoolInfo, transport, httpInfo, pluginsAndModules, ingestInfo, indexingBuffer);
threadPoolInfo, transport, httpInfo, pluginsAndModules, ingestInfo, aggregationInfo, indexingBuffer);
}
}