Formalize usage stats for analytics (backport of #52966) (#53077)

This moves the usage statistics gathering from the `AnalyticsPlugin`
into an `AnalyicsUsage`, removing the static state. It also checks the
license level when parsing all analytics aggregations. This is how we
were checking them before but we did it in an easy to forget way. This
way is slightly simpler, I think.
This commit is contained in:
Nik Everett 2020-03-04 10:29:11 -05:00 committed by GitHub
parent 801e50203e
commit 609c61f75c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 183 additions and 73 deletions

View File

@ -7,16 +7,27 @@ package org.elasticsearch.xpack.analytics;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ContextParser;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.analytics.action.TransportAnalyticsStatsAction;
import org.elasticsearch.xpack.analytics.boxplot.BoxplotAggregationBuilder;
import org.elasticsearch.xpack.analytics.boxplot.InternalBoxplot;
@ -28,6 +39,7 @@ import org.elasticsearch.xpack.analytics.stringstats.StringStatsAggregationBuild
import org.elasticsearch.xpack.analytics.topmetrics.InternalTopMetrics;
import org.elasticsearch.xpack.analytics.topmetrics.TopMetricsAggregationBuilder;
import org.elasticsearch.xpack.analytics.topmetrics.TopMetricsAggregatorFactory;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.analytics.action.AnalyticsStatsAction;
@ -37,15 +49,11 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import static java.util.Collections.singletonList;
public class AnalyticsPlugin extends Plugin implements SearchPlugin, ActionPlugin, MapperPlugin {
// TODO this should probably become more structured
public static AtomicLong cumulativeCardUsage = new AtomicLong(0);
public static AtomicLong topMetricsUsage = new AtomicLong(0);
private final AnalyticsUsage usage = new AnalyticsUsage();
private final boolean transportClientMode;
public AnalyticsPlugin(Settings settings) {
@ -61,7 +69,8 @@ public class AnalyticsPlugin extends Plugin implements SearchPlugin, ActionPlugi
CumulativeCardinalityPipelineAggregationBuilder.NAME,
CumulativeCardinalityPipelineAggregationBuilder::new,
CumulativeCardinalityPipelineAggregator::new,
CumulativeCardinalityPipelineAggregationBuilder.PARSER)
usage.track(AnalyticsUsage.Item.CUMULATIVE_CARDINALITY,
checkLicense(CumulativeCardinalityPipelineAggregationBuilder.PARSER)))
);
}
@ -71,16 +80,17 @@ public class AnalyticsPlugin extends Plugin implements SearchPlugin, ActionPlugi
new AggregationSpec(
StringStatsAggregationBuilder.NAME,
StringStatsAggregationBuilder::new,
StringStatsAggregationBuilder.PARSER).addResultReader(InternalStringStats::new),
usage.track(AnalyticsUsage.Item.STRING_STATS, checkLicense(StringStatsAggregationBuilder.PARSER)))
.addResultReader(InternalStringStats::new),
new AggregationSpec(
BoxplotAggregationBuilder.NAME,
BoxplotAggregationBuilder::new,
BoxplotAggregationBuilder.PARSER)
usage.track(AnalyticsUsage.Item.BOXPLOT, checkLicense(BoxplotAggregationBuilder.PARSER)))
.addResultReader(InternalBoxplot::new),
new AggregationSpec(
TopMetricsAggregationBuilder.NAME,
TopMetricsAggregationBuilder::new,
track(TopMetricsAggregationBuilder.PARSER, topMetricsUsage))
usage.track(AnalyticsUsage.Item.TOP_METRICS, checkLicense(TopMetricsAggregationBuilder.PARSER)))
.addResultReader(InternalTopMetrics::new)
);
}
@ -113,15 +123,20 @@ public class AnalyticsPlugin extends Plugin implements SearchPlugin, ActionPlugi
return Collections.singletonMap(HistogramFieldMapper.CONTENT_TYPE, new HistogramFieldMapper.TypeParser());
}
/**
* Track successful parsing.
*/
private static <T> ContextParser<String, T> track(ContextParser<String, T> realParser, AtomicLong usage) {
@Override
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
ResourceWatcherService resourceWatcherService, ScriptService scriptService, NamedXContentRegistry xContentRegistry,
Environment environment, NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver indexNameExpressionResolver) {
return singletonList(new AnalyticsUsage());
}
private static <T> ContextParser<String, T> checkLicense(ContextParser<String, T> realParser) {
return (parser, name) -> {
T value = realParser.parse(parser, name);
// Intentionally doesn't count unless the parser returns cleanly.
usage.addAndGet(1);
return value;
if (getLicenseState().isAnalyticsAllowed() == false) {
throw LicenseUtils.newComplianceException(XPackField.ANALYTICS);
}
return realParser.parse(parser, name);
};
}
}

View File

@ -0,0 +1,59 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.xcontent.ContextParser;
import org.elasticsearch.xpack.core.analytics.action.AnalyticsStatsAction;
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
/**
* Tracks usage of the Analytics aggregations.
*/
public class AnalyticsUsage {
/**
* Items to track.
*/
public enum Item {
BOXPLOT,
CUMULATIVE_CARDINALITY,
STRING_STATS,
TOP_METRICS;
}
private final Map<Item, AtomicLong> trackers = new EnumMap<>(Item.class);
public AnalyticsUsage() {
for (Item item: Item.values()) {
trackers.put(item, new AtomicLong(0));
}
}
/**
* Track successful parsing.
*/
public <C, T> ContextParser<C, T> track(Item item, ContextParser<C, T> realParser) {
AtomicLong usage = trackers.get(item);
return (parser, context) -> {
T value = realParser.parse(parser, context);
// Intentionally doesn't count unless the parser returns cleanly.
usage.incrementAndGet();
return value;
};
}
public AnalyticsStatsAction.NodeResponse stats(DiscoveryNode node) {
return new AnalyticsStatsAction.NodeResponse(node,
trackers.get(Item.BOXPLOT).get(),
trackers.get(Item.CUMULATIVE_CARDINALITY).get(),
trackers.get(Item.STRING_STATS).get(),
trackers.get(Item.TOP_METRICS).get());
}
}

View File

@ -13,22 +13,23 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.analytics.AnalyticsUsage;
import org.elasticsearch.xpack.core.analytics.action.AnalyticsStatsAction;
import org.elasticsearch.xpack.analytics.AnalyticsPlugin;
import java.io.IOException;
import java.util.List;
public class TransportAnalyticsStatsAction extends TransportNodesAction<AnalyticsStatsAction.Request, AnalyticsStatsAction.Response,
AnalyticsStatsAction.NodeRequest, AnalyticsStatsAction.NodeResponse> {
AnalyticsStatsAction.NodeRequest, AnalyticsStatsAction.NodeResponse> {
private final AnalyticsUsage usage;
@Inject
public TransportAnalyticsStatsAction(TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters) {
ThreadPool threadPool, ActionFilters actionFilters, AnalyticsUsage usage) {
super(AnalyticsStatsAction.NAME, threadPool, clusterService, transportService, actionFilters,
AnalyticsStatsAction.Request::new, AnalyticsStatsAction.NodeRequest::new, ThreadPool.Names.MANAGEMENT,
AnalyticsStatsAction.NodeResponse.class);
this.usage = usage;
}
@Override
@ -50,10 +51,7 @@ public class TransportAnalyticsStatsAction extends TransportNodesAction<Analytic
@Override
protected AnalyticsStatsAction.NodeResponse nodeOperation(AnalyticsStatsAction.NodeRequest request) {
AnalyticsStatsAction.NodeResponse statsResponse = new AnalyticsStatsAction.NodeResponse(clusterService.localNode());
statsResponse.setCumulativeCardinalityUsage(AnalyticsPlugin.cumulativeCardUsage.get());
statsResponse.setTopMetricsUsage(AnalyticsPlugin.topMetricsUsage.get());
return statsResponse;
return usage.stats(clusterService.localNode());
}
}

View File

@ -9,7 +9,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactory;
@ -17,8 +16,6 @@ import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.BucketMetricsParser;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.xpack.analytics.AnalyticsPlugin;
import org.elasticsearch.xpack.core.XPackField;
import java.io.IOException;
import java.util.Collection;
@ -35,14 +32,6 @@ public class CumulativeCardinalityPipelineAggregationBuilder
public static final ConstructingObjectParser<CumulativeCardinalityPipelineAggregationBuilder, String> PARSER =
new ConstructingObjectParser<>(NAME, false, (args, name) -> {
if (AnalyticsPlugin.getLicenseState().isAnalyticsAllowed() == false) {
throw LicenseUtils.newComplianceException(XPackField.ANALYTICS);
}
// Increment usage here since it is a good boundary between internal and external, and should correlate 1:1 with
// usage and not internal instantiations
AnalyticsPlugin.cumulativeCardUsage.incrementAndGet();
return new CumulativeCardinalityPipelineAggregationBuilder(name, (String) args[0]);
});
static {

View File

@ -13,6 +13,7 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.ContextParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
@ -20,58 +21,73 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.rest.yaml.ObjectPath;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.analytics.AnalyticsUsage;
import org.elasticsearch.xpack.core.analytics.action.AnalyticsStatsAction;
import org.junit.Before;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TransportAnalyticsStatsActionTests extends ESTestCase {
private TransportAnalyticsStatsAction action;
@Before
public void setupTransportAction() {
public TransportAnalyticsStatsAction action(AnalyticsUsage usage) {
TransportService transportService = mock(TransportService.class);
ThreadPool threadPool = mock(ThreadPool.class);
ClusterService clusterService = mock(ClusterService.class);
DiscoveryNode discoveryNode = new DiscoveryNode("nodeId", buildNewFakeTransportAddress(), Version.CURRENT);
when(clusterService.localNode()).thenReturn(discoveryNode);
ClusterName clusterName = new ClusterName("cluster_name");
when(clusterService.getClusterName()).thenReturn(clusterName);
ClusterState clusterState = mock(ClusterState.class);
when(clusterState.getMetaData()).thenReturn(MetaData.EMPTY_META_DATA);
when(clusterService.state()).thenReturn(clusterState);
action = new TransportAnalyticsStatsAction(transportService, clusterService, threadPool, new
ActionFilters(Collections.emptySet()));
return new TransportAnalyticsStatsAction(transportService, clusterService, threadPool,
new ActionFilters(Collections.emptySet()), usage);
}
public void testCumulativeCardStats() throws Exception {
AnalyticsStatsAction.Request request = new AnalyticsStatsAction.Request();
AnalyticsStatsAction.NodeResponse nodeResponse1 = action.nodeOperation(new AnalyticsStatsAction.NodeRequest(request));
AnalyticsStatsAction.NodeResponse nodeResponse2 = action.nodeOperation(new AnalyticsStatsAction.NodeRequest(request));
public void test() throws IOException {
AnalyticsUsage.Item item = randomFrom(AnalyticsUsage.Item.values());
AnalyticsUsage realUsage = new AnalyticsUsage();
AnalyticsUsage emptyUsage = new AnalyticsUsage();
ContextParser<Void, Void> parser = realUsage.track(item, (p, c) -> c);
ObjectPath unused = run(realUsage, emptyUsage);
assertThat(unused.evaluate("stats.0." + item.name().toLowerCase(Locale.ROOT) + "_usage"), equalTo(0));
assertThat(unused.evaluate("stats.1." + item.name().toLowerCase(Locale.ROOT) + "_usage"), equalTo(0));
int count = between(1, 10000);
for (int i = 0; i < count; i++) {
assertNull(parser.parse(null, null));
}
ObjectPath used = run(realUsage, emptyUsage);
assertThat(used.evaluate("stats.0." + item.name().toLowerCase(Locale.ROOT) + "_usage"), equalTo(count));
assertThat(used.evaluate("stats.1." + item.name().toLowerCase(Locale.ROOT) + "_usage"), equalTo(0));
}
AnalyticsStatsAction.Response response = action.newResponse(request,
Arrays.asList(nodeResponse1, nodeResponse2), Collections.emptyList());
private ObjectPath run(AnalyticsUsage... nodeUsages) throws IOException {
AnalyticsStatsAction.Request request = new AnalyticsStatsAction.Request();
List<AnalyticsStatsAction.NodeResponse> nodeResponses = Arrays.stream(nodeUsages)
.map(usage -> action(usage).nodeOperation(new AnalyticsStatsAction.NodeRequest(request)))
.collect(toList());
AnalyticsStatsAction.Response response = new AnalyticsStatsAction.Response(
new ClusterName("cluster_name"), nodeResponses, emptyList());
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject();
response.toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
ObjectPath objectPath = ObjectPath.createFromXContent(JsonXContent.jsonXContent, BytesReference.bytes(builder));
assertThat(objectPath.evaluate("stats.0.cumulative_cardinality_usage"), equalTo(0));
assertThat(objectPath.evaluate("stats.1.cumulative_cardinality_usage"), equalTo(0));
return ObjectPath.createFromXContent(JsonXContent.jsonXContent, BytesReference.bytes(builder));
}
}
}

View File

@ -6,19 +6,18 @@
package org.elasticsearch.xpack.analytics.boxplot;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.BaseAggregationBuilder;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.analytics.AnalyticsPlugin;
import org.junit.Before;
import java.io.IOException;
import java.util.Collections;
import static java.util.Collections.singletonList;
import static org.hamcrest.Matchers.hasSize;
public class BoxplotAggregationBuilderTests extends AbstractSerializingTestCase<BoxplotAggregationBuilder> {
@ -31,8 +30,10 @@ public class BoxplotAggregationBuilderTests extends AbstractSerializingTestCase<
@Override
protected NamedXContentRegistry xContentRegistry() {
SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.singletonList(new AnalyticsPlugin(Settings.EMPTY)));
return new NamedXContentRegistry(searchModule.getNamedXContents());
return new NamedXContentRegistry(singletonList(new NamedXContentRegistry.Entry(
BaseAggregationBuilder.class,
new ParseField(BoxplotAggregationBuilder.NAME),
(p, n) -> BoxplotAggregationBuilder.PARSER.apply(p, (String) n))));
}
@Override

View File

@ -110,48 +110,80 @@ public class AnalyticsStatsAction extends ActionType<AnalyticsStatsAction.Respon
}
public static class NodeResponse extends BaseNodeResponse implements ToXContentObject {
static final ParseField BOXPLOT_USAGE = new ParseField("boxplot_usage");
static final ParseField CUMULATIVE_CARDINALITY_USAGE = new ParseField("cumulative_cardinality_usage");
static final ParseField STRING_STATS_USAGE = new ParseField("string_stats_usage");
static final ParseField TOP_METRICS_USAGE = new ParseField("top_metrics_usage");
private long cumulativeCardinalityUsage;
private long topMetricsUsage;
private final long boxplotUsage;
private final long cumulativeCardinalityUsage;
private final long stringStatsUsage;
private final long topMetricsUsage;
public NodeResponse(DiscoveryNode node) {
public NodeResponse(DiscoveryNode node, long boxplotUsage, long cumulativeCardinalityUsage, long stringStatsUsage,
long topMetricsUsage) {
super(node);
this.boxplotUsage = boxplotUsage;
this.cumulativeCardinalityUsage = cumulativeCardinalityUsage;
this.stringStatsUsage = stringStatsUsage;
this.topMetricsUsage = topMetricsUsage;
}
public NodeResponse(StreamInput in) throws IOException {
super(in);
if (in.getVersion().onOrAfter(Version.V_7_7_0)) {
boxplotUsage = in.readVLong();
} else {
boxplotUsage = 0;
}
cumulativeCardinalityUsage = in.readZLong();
if (in.getVersion().onOrAfter(Version.V_7_7_0)) {
stringStatsUsage = in.readVLong();
topMetricsUsage = in.readVLong();
} else {
topMetricsUsage = 0;
stringStatsUsage = 0;
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeZLong(cumulativeCardinalityUsage);
if (out.getVersion().onOrAfter(Version.V_7_7_0)) {
out.writeVLong(boxplotUsage);
}
out.writeVLong(cumulativeCardinalityUsage);
if (out.getVersion().onOrAfter(Version.V_7_7_0)) {
out.writeVLong(stringStatsUsage);
out.writeVLong(topMetricsUsage);
}
}
public void setCumulativeCardinalityUsage(long cumulativeCardinalityUsage) {
this.cumulativeCardinalityUsage = cumulativeCardinalityUsage;
}
public void setTopMetricsUsage(long topMetricsUsage) {
this.topMetricsUsage = topMetricsUsage;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(BOXPLOT_USAGE.getPreferredName(), boxplotUsage);
builder.field(CUMULATIVE_CARDINALITY_USAGE.getPreferredName(), cumulativeCardinalityUsage);
builder.field(STRING_STATS_USAGE.getPreferredName(), stringStatsUsage);
builder.field(TOP_METRICS_USAGE.getPreferredName(), topMetricsUsage);
builder.endObject();
return builder;
}
public long getBoxplotUsage() {
return boxplotUsage;
}
public long getCumulativeCardinalityUsage() {
return cumulativeCardinalityUsage;
}
public long getStringStatsUsage() {
return stringStatsUsage;
}
public long getTopMetricsUsage() {
return topMetricsUsage;
}
}
}