diff --git a/distribution/build.gradle b/distribution/build.gradle index d71038f36c7..3055ec82670 100644 --- a/distribution/build.gradle +++ b/distribution/build.gradle @@ -425,6 +425,7 @@ task run(type: RunTask) { setting 'xpack.monitoring.enabled', 'true' setting 'xpack.sql.enabled', 'true' setting 'xpack.rollup.enabled', 'true' + setting 'xpack.data-science.enabled', 'true' keystoreSetting 'bootstrap.password', 'password' } } diff --git a/docs/build.gradle b/docs/build.gradle index a2d13cd0d09..b7e2f81e3d7 100644 --- a/docs/build.gradle +++ b/docs/build.gradle @@ -218,6 +218,42 @@ buildRestTests.setups['sales'] = ''' {"index":{}} {"date": "2015/03/01 00:00:00", "price": 175, "promoted": false, "rating": 2, "type": "t-shirt"}''' +// Used by cumulative cardinality aggregation docs +buildRestTests.setups['user_hits'] = ''' + - do: + indices.create: + index: user_hits + body: + settings: + number_of_shards: 1 + number_of_replicas: 0 + mappings: + properties: + user_id: + type: keyword + timestamp: + type: date + - do: + bulk: + index: user_hits + refresh: true + body: | + {"index":{}} + {"timestamp": "2019-01-01T13:00:00", "user_id": "1"} + {"index":{}} + {"timestamp": "2019-01-01T13:00:00", "user_id": "2"} + {"index":{}} + {"timestamp": "2019-01-02T13:00:00", "user_id": "1"} + {"index":{}} + {"timestamp": "2019-01-02T13:00:00", "user_id": "3"} + {"index":{}} + {"timestamp": "2019-01-03T13:00:00", "user_id": "1"} + {"index":{}} + {"timestamp": "2019-01-03T13:00:00", "user_id": "2"} + {"index":{}} + {"timestamp": "2019-01-03T13:00:00", "user_id": "4"}''' + + // Dummy bank account data used by getting-started.asciidoc buildRestTests.setups['bank'] = ''' - do: diff --git a/docs/reference/aggregations/pipeline/cumulative-cardinality-aggregation.asciidoc b/docs/reference/aggregations/pipeline/cumulative-cardinality-aggregation.asciidoc new file mode 100644 index 00000000000..2e316164d64 --- /dev/null +++ b/docs/reference/aggregations/pipeline/cumulative-cardinality-aggregation.asciidoc @@ -0,0 +1,235 @@ +[role="xpack"] +[testenv="basic"] +[[search-aggregations-pipeline-cumulative-cardinality-aggregation]] +=== Cumulative Cardinality Aggregation + +A parent pipeline aggregation which calculates the Cumulative Cardinality in a parent histogram (or date_histogram) +aggregation. The specified metric must be a cardinality aggregation and the enclosing histogram +must have `min_doc_count` set to `0` (default for `histogram` aggregations). + +The `cumulative_cardinality` agg is useful for finding "total new items", like the number of new visitors to your +website each day. A regular cardinality aggregation will tell you how many unique visitors came each day, but doesn't +differentiate between "new" or "repeat" visitors. The Cumulative Cardinality aggregation can be used to determine +how many of each day's unique visitors are "new". 
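+
+For example, if day one is visited by users A and B, and day two is visited by users B and C, the daily
+cardinality is `2` on both days, but the cumulative cardinality is `2` after day one and `3` after day two,
+since only user C is new on the second day.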
+
+==== Syntax
+
+A `cumulative_cardinality` aggregation looks like this in isolation:
+
+[source,js]
+--------------------------------------------------
+{
+    "cumulative_cardinality": {
+        "buckets_path": "my_cardinality_agg"
+    }
+}
+--------------------------------------------------
+// NOTCONSOLE
+
+[[cumulative-cardinality-params]]
+.`cumulative_cardinality` Parameters
+[options="header"]
+|===
+|Parameter Name |Description |Required |Default Value
+|`buckets_path` |The path to the cardinality aggregation we wish to find the cumulative cardinality for (see <<buckets-path-syntax>> for more
+ details) |Required |
+|`format` |Format to apply to the output value of this aggregation |Optional |`null`
+|===
+
+The following snippet calculates the cumulative cardinality of the total daily `users`:
+
+[source,js]
+--------------------------------------------------
+GET /user_hits/_search
+{
+    "size": 0,
+    "aggs" : {
+        "users_per_day" : {
+            "date_histogram" : {
+                "field" : "timestamp",
+                "calendar_interval" : "day"
+            },
+            "aggs": {
+                "distinct_users": {
+                    "cardinality": {
+                        "field": "user_id"
+                    }
+                },
+                "total_new_users": {
+                    "cumulative_cardinality": {
+                        "buckets_path": "distinct_users" <1>
+                    }
+                }
+            }
+        }
+    }
+}
+--------------------------------------------------
+// CONSOLE
+// TEST[setup:user_hits]
+
+<1> `buckets_path` instructs this aggregation to use the output of the `distinct_users` aggregation for the cumulative cardinality
+
+And the following may be the response:
+
+[source,js]
+--------------------------------------------------
+{
+   "took": 11,
+   "timed_out": false,
+   "_shards": ...,
+   "hits": ...,
+   "aggregations": {
+      "users_per_day": {
+         "buckets": [
+            {
+               "key_as_string": "2019-01-01T00:00:00.000Z",
+               "key": 1546300800000,
+               "doc_count": 2,
+               "distinct_users": {
+                  "value": 2
+               },
+               "total_new_users": {
+                  "value": 2
+               }
+            },
+            {
+               "key_as_string": "2019-01-02T00:00:00.000Z",
+               "key": 1546387200000,
+               "doc_count": 2,
+               "distinct_users": {
+                  "value": 2
+               },
+               "total_new_users": {
+                  "value": 3
+               }
+            },
+            {
+               "key_as_string": "2019-01-03T00:00:00.000Z",
+               "key": 1546473600000,
+               "doc_count": 3,
+               "distinct_users": {
+                  "value": 3
+               },
+               "total_new_users": {
+                  "value": 4
+               }
+            }
+         ]
+      }
+   }
+}
+--------------------------------------------------
+// TESTRESPONSE[s/"took": 11/"took": $body.took/]
+// TESTRESPONSE[s/"_shards": \.\.\./"_shards": $body._shards/]
+// TESTRESPONSE[s/"hits": \.\.\./"hits": $body.hits/]
+
+
+Note how the second day, `2019-01-02`, has two distinct users but the `total_new_users` metric generated by the
+cumulative pipeline agg only increments to three. This means that only one of the two users that day was
+new; the other had already been seen on the previous day. This happens again on the third day, where only
+one of three users is completely new.
+
+==== Incremental cumulative cardinality
+
+The `cumulative_cardinality` agg will show you the total, distinct count since the beginning of the time period
+being queried. Sometimes, however, it is useful to see the "incremental" count: how many new users are added
+each day, rather than the total cumulative count. Because the cumulative count only ever grows, its day-over-day
+difference is exactly the number of users seen for the first time on each day.
+ +This can be accomplished by adding a `derivative` aggregation to our query: + +[source,js] +-------------------------------------------------- +GET /user_hits/_search +{ + "size": 0, + "aggs" : { + "users_per_day" : { + "date_histogram" : { + "field" : "timestamp", + "calendar_interval" : "day" + }, + "aggs": { + "distinct_users": { + "cardinality": { + "field": "user_id" + } + }, + "total_new_users": { + "cumulative_cardinality": { + "buckets_path": "distinct_users" + } + }, + "incremental_new_users": { + "derivative": { + "buckets_path": "total_new_users" + } + } + } + } + } +} +-------------------------------------------------- +// CONSOLE +// TEST[setup:user_hits] + + +And the following may be the response: + +[source,js] +-------------------------------------------------- +{ + "took": 11, + "timed_out": false, + "_shards": ..., + "hits": ..., + "aggregations": { + "users_per_day": { + "buckets": [ + { + "key_as_string": "2019-01-01T00:00:00.000Z", + "key": 1546300800000, + "doc_count": 2, + "distinct_users": { + "value": 2 + }, + "total_new_users": { + "value": 2 + } + }, + { + "key_as_string": "2019-01-02T00:00:00.000Z", + "key": 1546387200000, + "doc_count": 2, + "distinct_users": { + "value": 2 + }, + "total_new_users": { + "value": 3 + }, + "incremental_new_users": { + "value": 1.0 + } + }, + { + "key_as_string": "2019-01-03T00:00:00.000Z", + "key": 1546473600000, + "doc_count": 3, + "distinct_users": { + "value": 3 + }, + "total_new_users": { + "value": 4 + }, + "incremental_new_users": { + "value": 1.0 + } + } + ] + } + } +} +-------------------------------------------------- +// TESTRESPONSE[s/"took": 11/"took": $body.took/] +// TESTRESPONSE[s/"_shards": \.\.\./"_shards": $body._shards/] +// TESTRESPONSE[s/"hits": \.\.\./"hits": $body.hits/] diff --git a/docs/reference/rest-api/info.asciidoc b/docs/reference/rest-api/info.asciidoc index 3fd860aab99..f48337ffd35 100644 --- a/docs/reference/rest-api/info.asciidoc +++ b/docs/reference/rest-api/info.asciidoc @@ -71,6 +71,10 @@ Example response: "available" : true, "enabled" : true }, + "data_science" : { + "available" : true, + "enabled" : true + }, "flattened" : { "available" : true, "enabled" : true diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalCardinality.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalCardinality.java index bfe82c6bef6..c3132a29904 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalCardinality.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalCardinality.java @@ -80,7 +80,7 @@ public final class InternalCardinality extends InternalNumericMetricsAggregation return counts == null ? 0 : counts.cardinality(0); } - HyperLogLogPlusPlus getCounts() { + public HyperLogLogPlusPlus getCounts() { return counts; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/XPackLicenseState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/XPackLicenseState.java index a123d3620e4..ac58bdf4556 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/XPackLicenseState.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/XPackLicenseState.java @@ -70,6 +70,9 @@ public class XPackLicenseState { "Creating and Starting rollup jobs will no longer be allowed.", "Stopping/Deleting existing jobs, RollupCaps API and RollupSearch continue to function." 
});
+        messages.put(XPackField.DATA_SCIENCE, new String[] {
+            "Aggregations provided by the Data Science plugin are no longer usable."
+        });
         EXPIRATION_MESSAGES = Collections.unmodifiableMap(messages);
     }

@@ -744,6 +747,15 @@ public class XPackLicenseState {
         return localStatus.active;
     }

+    /**
+     * Data Science is always available as long as there is an active license
+     *
+     * @return true if the license is active
+     */
+    public synchronized boolean isDataScienceAllowed() {
+        return status.active;
+    }
+
     public synchronized boolean isTrialLicense() {
         return status.mode == OperationMode.TRIAL;
     }
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java
index d51c00036da..3a47135db9b 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java
@@ -55,6 +55,7 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
 import org.elasticsearch.xpack.core.dataframe.transforms.SyncConfig;
 import org.elasticsearch.xpack.core.dataframe.transforms.TimeSyncConfig;
+import org.elasticsearch.xpack.core.datascience.DataScienceFeatureSetUsage;
 import org.elasticsearch.xpack.core.deprecation.DeprecationInfoAction;
 import org.elasticsearch.xpack.core.flattened.FlattenedFeatureSetUsage;
 import org.elasticsearch.xpack.core.frozen.FrozenIndicesFeatureSetUsage;
@@ -544,7 +545,9 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
             // Frozen indices
             new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.FROZEN_INDICES, FrozenIndicesFeatureSetUsage::new),
             // Spatial
-            new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.SPATIAL, SpatialFeatureSetUsage::new)
+            new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.SPATIAL, SpatialFeatureSetUsage::new),
+            // data science
+            new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.DATA_SCIENCE, DataScienceFeatureSetUsage::new)
         );
     }

diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackField.java
index 904db89bb54..4f61ff03ebf 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackField.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackField.java
@@ -47,6 +47,8 @@ public final class XPackField {
     public static final String FROZEN_INDICES = "frozen_indices";
     /** Name constant for spatial features. */
     public static final String SPATIAL = "spatial";
+    /** Name constant for the data science plugin. */
+    public static final String DATA_SCIENCE = "data_science";

     private XPackField() {}

diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackSettings.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackSettings.java
index 625ecb98c1b..5ad38596491 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackSettings.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackSettings.java
@@ -126,6 +126,10 @@ public class XPackSettings {

     /** Setting for enabling or disabling vectors. Defaults to true.
*/ public static final Setting VECTORS_ENABLED = Setting.boolSetting("xpack.vectors.enabled", true, Setting.Property.NodeScope); + /** Setting for enabling or disabling data science plugin. Defaults to true. */ + public static final Setting DATA_SCIENCE_ENABLED = Setting.boolSetting("xpack.data-science.enabled", + true, Setting.Property.NodeScope); + public static final List DEFAULT_SUPPORTED_PROTOCOLS; static { @@ -258,6 +262,7 @@ public class XPackSettings { settings.add(DATA_FRAME_ENABLED); settings.add(FLATTENED_ENABLED); settings.add(VECTORS_ENABLED); + settings.add(DATA_SCIENCE_ENABLED); return Collections.unmodifiableList(settings); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/datascience/DataScienceFeatureSetUsage.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/datascience/DataScienceFeatureSetUsage.java new file mode 100644 index 00000000000..f4ed1b12c9b --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/datascience/DataScienceFeatureSetUsage.java @@ -0,0 +1,65 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.datascience; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.xpack.core.XPackFeatureSet; +import org.elasticsearch.xpack.core.XPackField; +import org.elasticsearch.xpack.core.datascience.action.DataScienceStatsAction; + +import java.io.IOException; +import java.util.Objects; + +public class DataScienceFeatureSetUsage extends XPackFeatureSet.Usage { + + private final DataScienceStatsAction.Response response; + + public DataScienceFeatureSetUsage(boolean available, boolean enabled, DataScienceStatsAction.Response response) { + super(XPackField.DATA_SCIENCE, available, enabled); + this.response = response; + } + + public DataScienceFeatureSetUsage(StreamInput input) throws IOException { + super(input); + this.response = new DataScienceStatsAction.Response(input); + } + + @Override + public int hashCode() { + return Objects.hash(available, enabled, response); + } + + @Override + protected void innerXContent(XContentBuilder builder, Params params) throws IOException { + super.innerXContent(builder, params); + if (response != null) { + response.toXContent(builder, params); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + response.writeTo(out); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + DataScienceFeatureSetUsage other = (DataScienceFeatureSetUsage) obj; + return Objects.equals(available, other.available) + && Objects.equals(enabled, other.enabled) + && Objects.equals(response, other.response); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/datascience/action/DataScienceStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/datascience/action/DataScienceStatsAction.java new file mode 100644 index 00000000000..6a06dc39227 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/datascience/action/DataScienceStatsAction.java @@ -0,0 +1,142 @@ +/* + * Copyright 
Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.datascience.action; + +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.nodes.BaseNodeRequest; +import org.elasticsearch.action.support.nodes.BaseNodeResponse; +import org.elasticsearch.action.support.nodes.BaseNodesRequest; +import org.elasticsearch.action.support.nodes.BaseNodesResponse; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +public class DataScienceStatsAction extends ActionType { + public static final DataScienceStatsAction INSTANCE = new DataScienceStatsAction(); + public static final String NAME = "cluster:monitor/xpack/datascience/stats"; + + private DataScienceStatsAction() { + super(NAME, Response::new); + } + + public static class Request extends BaseNodesRequest implements ToXContentObject { + + public Request() { + super((String[]) null); + } + + public Request(StreamInput in) throws IOException { + super(in); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.endObject(); + return builder; + } + + @Override + public int hashCode() { + // Nothing to hash atm, so just use the action name + return Objects.hashCode(NAME); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + return true; + } + } + + public static class NodeRequest extends BaseNodeRequest { + public NodeRequest(StreamInput in) throws IOException { + super(in); + } + + public NodeRequest(Request request) { + + } + } + + public static class Response extends BaseNodesResponse implements Writeable, ToXContentObject { + public Response(StreamInput in) throws IOException { + super(in); + } + + public Response(ClusterName clusterName, List nodes, List failures) { + super(clusterName, nodes, failures); + } + + @Override + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readList(NodeResponse::new); + } + + @Override + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeList(nodes); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startArray("stats"); + for (NodeResponse node : getNodes()) { + node.toXContent(builder, params); + } + builder.endArray(); + + return builder; + } + } + + public static class NodeResponse extends BaseNodeResponse implements ToXContentObject { + static ParseField CUMULATIVE_CARDINALITY_USAGE = new ParseField("cumulative_cardinality_usage"); + private long cumulativeCardinalityUsage; + + public NodeResponse(StreamInput in) throws IOException { + super(in); + cumulativeCardinalityUsage = in.readZLong(); + } + + public 
NodeResponse(DiscoveryNode node) {
+            super(node);
+        }
+
+        public void setCumulativeCardinalityUsage(long cumulativeCardinalityUsage) {
+            this.cumulativeCardinalityUsage = cumulativeCardinalityUsage;
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeZLong(cumulativeCardinalityUsage);
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.startObject();
+            builder.field(CUMULATIVE_CARDINALITY_USAGE.getPreferredName(), cumulativeCardinalityUsage);
+            builder.endObject();
+            return builder;
+        }
+    }
+}
diff --git a/x-pack/plugin/data-science/build.gradle b/x-pack/plugin/data-science/build.gradle
new file mode 100644
index 00000000000..815491451fd
--- /dev/null
+++ b/x-pack/plugin/data-science/build.gradle
@@ -0,0 +1,26 @@
+evaluationDependsOn(xpackModule('core'))
+
+apply plugin: 'elasticsearch.esplugin'
+esplugin {
+    name 'x-pack-data-science'
+    description 'Elasticsearch Expanded Pack Plugin - Data Science'
+    classname 'org.elasticsearch.xpack.datascience.DataSciencePlugin'
+    extendedPlugins = ['x-pack-core']
+}
+archivesBaseName = 'x-pack-data-science'
+
+compileJava.options.compilerArgs << "-Xlint:-rawtypes"
+compileTestJava.options.compilerArgs << "-Xlint:-rawtypes"
+
+
+dependencies {
+    compileOnly project(":server")
+
+    compileOnly project(path: xpackModule('core'), configuration: 'default')
+    testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')
+    if (isEclipse) {
+        testCompile project(path: xpackModule('core-tests'), configuration: 'testArtifacts')
+    }
+}
+
+integTest.enabled = false
diff --git a/x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/DataScienceAggregationBuilders.java b/x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/DataScienceAggregationBuilders.java
new file mode 100644
index 00000000000..fa2365db1b8
--- /dev/null
+++ b/x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/DataScienceAggregationBuilders.java
@@ -0,0 +1,15 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.datascience;
+
+import org.elasticsearch.xpack.datascience.cumulativecardinality.CumulativeCardinalityPipelineAggregationBuilder;
+
+public class DataScienceAggregationBuilders {
+
+    public static CumulativeCardinalityPipelineAggregationBuilder cumulativeCardinality(String name, String bucketsPath) {
+        return new CumulativeCardinalityPipelineAggregationBuilder(name, bucketsPath);
+    }
+}
diff --git a/x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/DataScienceFeatureSet.java b/x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/DataScienceFeatureSet.java
new file mode 100644
index 00000000000..7da8d6d888e
--- /dev/null
+++ b/x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/DataScienceFeatureSet.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */ +package org.elasticsearch.xpack.datascience; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.xpack.core.XPackFeatureSet; +import org.elasticsearch.xpack.core.XPackField; +import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.datascience.DataScienceFeatureSetUsage; +import org.elasticsearch.xpack.core.datascience.action.DataScienceStatsAction; + +import java.util.Map; + +public class DataScienceFeatureSet implements XPackFeatureSet { + + private final boolean enabled; + private final XPackLicenseState licenseState; + private Client client; + + @Inject + public DataScienceFeatureSet(Settings settings, @Nullable XPackLicenseState licenseState, Client client) { + this.enabled = XPackSettings.DATA_SCIENCE_ENABLED.get(settings); + this.licenseState = licenseState; + this.client = client; + } + + @Override + public String name() { + return XPackField.DATA_SCIENCE; + } + + @Override + public boolean available() { + return licenseState != null && licenseState.isDataScienceAllowed(); + } + + @Override + public boolean enabled() { + return enabled; + } + + @Override + public Map nativeCodeInfo() { + return null; + } + + @Override + public void usage(ActionListener listener) { + if (enabled) { + DataScienceStatsAction.Request request = new DataScienceStatsAction.Request(); + client.execute(DataScienceStatsAction.INSTANCE, request, + ActionListener.wrap(r -> listener.onResponse(new DataScienceFeatureSetUsage(available(), enabled(), r)), + listener::onFailure)); + + } else { + listener.onResponse(new DataScienceFeatureSetUsage(available(), enabled(), null)); + } + } +} diff --git a/x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/DataSciencePlugin.java b/x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/DataSciencePlugin.java new file mode 100644 index 00000000000..2676d42a6a8 --- /dev/null +++ b/x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/DataSciencePlugin.java @@ -0,0 +1,67 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.datascience; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.inject.Module; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.plugins.ActionPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.SearchPlugin; +import org.elasticsearch.xpack.core.XPackPlugin; +import org.elasticsearch.xpack.core.datascience.action.DataScienceStatsAction; +import org.elasticsearch.xpack.datascience.action.TransportDataScienceStatsAction; +import org.elasticsearch.xpack.datascience.cumulativecardinality.CumulativeCardinalityPipelineAggregationBuilder; +import org.elasticsearch.xpack.datascience.cumulativecardinality.CumulativeCardinalityPipelineAggregator; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import static java.util.Collections.singletonList; + +public class DataSciencePlugin extends Plugin implements SearchPlugin, ActionPlugin { + + // TODO this should probably become more structured once DataScience plugin has more than just one agg + public static AtomicLong cumulativeCardUsage = new AtomicLong(0); + private final boolean transportClientMode; + + public DataSciencePlugin(Settings settings) { + this.transportClientMode = XPackPlugin.transportClientMode(settings); + } + + public static XPackLicenseState getLicenseState() { return XPackPlugin.getSharedLicenseState(); } + + @Override + public List getPipelineAggregations() { + return singletonList(new PipelineAggregationSpec( + CumulativeCardinalityPipelineAggregationBuilder.NAME, + CumulativeCardinalityPipelineAggregationBuilder::new, + CumulativeCardinalityPipelineAggregator::new, + CumulativeCardinalityPipelineAggregationBuilder::parse)); + } + + @Override + public List> getActions() { + return singletonList( + new ActionHandler<>(DataScienceStatsAction.INSTANCE, TransportDataScienceStatsAction.class)); + } + + @Override + public Collection createGuiceModules() { + List modules = new ArrayList<>(); + + if (transportClientMode) { + return modules; + } + + modules.add(b -> XPackPlugin.bindFeatureSet(b, DataScienceFeatureSet.class)); + return modules; + } +} diff --git a/x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/action/TransportDataScienceStatsAction.java b/x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/action/TransportDataScienceStatsAction.java new file mode 100644 index 00000000000..c14ee518f60 --- /dev/null +++ b/x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/action/TransportDataScienceStatsAction.java @@ -0,0 +1,58 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.datascience.action; + +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.nodes.TransportNodesAction; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.datascience.action.DataScienceStatsAction; +import org.elasticsearch.xpack.datascience.DataSciencePlugin; + +import java.io.IOException; +import java.util.List; + +public class TransportDataScienceStatsAction extends TransportNodesAction { + + + @Inject + public TransportDataScienceStatsAction(TransportService transportService, ClusterService clusterService, + ThreadPool threadPool, ActionFilters actionFilters) { + super(DataScienceStatsAction.NAME, threadPool, clusterService, transportService, actionFilters, + DataScienceStatsAction.Request::new, DataScienceStatsAction.NodeRequest::new, ThreadPool.Names.MANAGEMENT, + DataScienceStatsAction.NodeResponse.class); + } + + @Override + protected DataScienceStatsAction.Response newResponse(DataScienceStatsAction.Request request, + List nodes, + List failures) { + return new DataScienceStatsAction.Response(clusterService.getClusterName(), nodes, failures); + } + + @Override + protected DataScienceStatsAction.NodeRequest newNodeRequest(DataScienceStatsAction.Request request) { + return new DataScienceStatsAction.NodeRequest(request); + } + + @Override + protected DataScienceStatsAction.NodeResponse newNodeResponse(StreamInput in) throws IOException { + return new DataScienceStatsAction.NodeResponse(in); + } + + @Override + protected DataScienceStatsAction.NodeResponse nodeOperation(DataScienceStatsAction.NodeRequest request) { + DataScienceStatsAction.NodeResponse statsResponse = new DataScienceStatsAction.NodeResponse(clusterService.localNode()); + statsResponse.setCumulativeCardinalityUsage(DataSciencePlugin.cumulativeCardUsage.get()); + return statsResponse; + } + +} diff --git a/x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/cumulativecardinality/CumulativeCardinalityPipelineAggregationBuilder.java b/x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/cumulativecardinality/CumulativeCardinalityPipelineAggregationBuilder.java new file mode 100644 index 00000000000..f24f09ccba4 --- /dev/null +++ b/x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/cumulativecardinality/CumulativeCardinalityPipelineAggregationBuilder.java @@ -0,0 +1,147 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.datascience.cumulativecardinality; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.license.LicenseUtils; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.PipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.BucketMetricsParser; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.xpack.core.XPackField; +import org.elasticsearch.xpack.datascience.DataSciencePlugin; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; + +import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.BUCKETS_PATH; +import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.FORMAT; + +public class CumulativeCardinalityPipelineAggregationBuilder + extends AbstractPipelineAggregationBuilder { + public static final String NAME = "cumulative_cardinality"; + + private String format; + + private static final Function> PARSER + = name -> { + ConstructingObjectParser parser = new ConstructingObjectParser<>( + CumulativeCardinalityPipelineAggregationBuilder.NAME, + false, + o -> new CumulativeCardinalityPipelineAggregationBuilder(name, (String) o[0])); + + parser.declareString(ConstructingObjectParser.constructorArg(), BUCKETS_PATH_FIELD); + parser.declareString(CumulativeCardinalityPipelineAggregationBuilder::format, FORMAT); + return parser; + }; + + public CumulativeCardinalityPipelineAggregationBuilder(String name, String bucketsPath) { + super(name, NAME, new String[] { bucketsPath }); + } + + /** + * Read from a stream. + */ + public CumulativeCardinalityPipelineAggregationBuilder(StreamInput in) throws IOException { + super(in, NAME); + format = in.readOptionalString(); + } + + @Override + protected final void doWriteTo(StreamOutput out) throws IOException { + out.writeOptionalString(format); + } + + /** + * Sets the format to use on the output of this aggregation. + */ + public CumulativeCardinalityPipelineAggregationBuilder format(String format) { + if (format == null) { + throw new IllegalArgumentException("[format] must not be null: [" + name + "]"); + } + this.format = format; + return this; + } + + /** + * Gets the format to use on the output of this aggregation. 
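+     * Returns {@code null} if no format was explicitly set.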
+ */ + public String format() { + return format; + } + + protected DocValueFormat formatter() { + if (format != null) { + return new DocValueFormat.Decimal(format); + } else { + return DocValueFormat.RAW; + } + } + + @Override + protected PipelineAggregator createInternal(Map metaData) { + return new CumulativeCardinalityPipelineAggregator(name, bucketsPaths, formatter(), metaData); + } + + @Override + public void doValidate(AggregatorFactory parent, Collection aggFactories, + Collection pipelineAggregatorFactories) { + if (bucketsPaths.length != 1) { + throw new IllegalStateException(BUCKETS_PATH.getPreferredName() + + " must contain a single entry for aggregation [" + name + "]"); + } + + validateSequentiallyOrderedParentAggs(parent, NAME, name); + } + + @Override + protected final XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException { + if (format != null) { + builder.field(BucketMetricsParser.FORMAT.getPreferredName(), format); + } + return builder; + } + + public static CumulativeCardinalityPipelineAggregationBuilder parse(String aggName, XContentParser parser) { + if (DataSciencePlugin.getLicenseState().isDataScienceAllowed() == false) { + throw LicenseUtils.newComplianceException(XPackField.DATA_SCIENCE); + } + + // Increment usage here since it is a good boundary between internal and external, and should correlate 1:1 with + // usage and not internal instantiations + DataSciencePlugin.cumulativeCardUsage.incrementAndGet(); + return PARSER.apply(aggName).apply(parser, null); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), format); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + if (super.equals(obj) == false) return false; + CumulativeCardinalityPipelineAggregationBuilder other = (CumulativeCardinalityPipelineAggregationBuilder) obj; + return Objects.equals(format, other.format); + } + + @Override + public String getWriteableName() { + return NAME; + } +} diff --git a/x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/cumulativecardinality/CumulativeCardinalityPipelineAggregator.java b/x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/cumulativecardinality/CumulativeCardinalityPipelineAggregator.java new file mode 100644 index 00000000000..5e79c1f3095 --- /dev/null +++ b/x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/cumulativecardinality/CumulativeCardinalityPipelineAggregator.java @@ -0,0 +1,123 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.datascience.cumulativecardinality; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.AggregationExecutionException; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; +import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; +import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket; +import org.elasticsearch.search.aggregations.bucket.histogram.HistogramFactory; +import org.elasticsearch.search.aggregations.metrics.HyperLogLogPlusPlus; +import org.elasticsearch.search.aggregations.metrics.InternalCardinality; +import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.support.AggregationPath; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +public class CumulativeCardinalityPipelineAggregator extends PipelineAggregator { + private final DocValueFormat formatter; + + CumulativeCardinalityPipelineAggregator(String name, String[] bucketsPaths, DocValueFormat formatter, Map metadata) { + super(name, bucketsPaths, metadata); + this.formatter = formatter; + } + + /** + * Read from a stream. + */ + public CumulativeCardinalityPipelineAggregator(StreamInput in) throws IOException { + super(in); + formatter = in.readNamedWriteable(DocValueFormat.class); + } + + @Override + public void doWriteTo(StreamOutput out) throws IOException { + out.writeNamedWriteable(formatter); + } + + @Override + public String getWriteableName() { + return CumulativeCardinalityPipelineAggregationBuilder.NAME; + } + + @Override + public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) { + InternalMultiBucketAggregation + histo = (InternalMultiBucketAggregation) aggregation; + List buckets = histo.getBuckets(); + HistogramFactory factory = (HistogramFactory) histo; + List newBuckets = new ArrayList<>(buckets.size()); + HyperLogLogPlusPlus hll = null; + + try { + long cardinality = 0; + for (InternalMultiBucketAggregation.InternalBucket bucket : buckets) { + HyperLogLogPlusPlus bucketHll = resolveBucketValue(histo, bucket, bucketsPaths()[0]); + if (hll == null && bucketHll != null) { + // We have to create a new HLL because otherwise it will alter the + // existing cardinality sketch and bucket value + hll = new HyperLogLogPlusPlus(bucketHll.precision(), reduceContext.bigArrays(), 1); + } + if (bucketHll != null) { + hll.merge(0, bucketHll, 0); + cardinality = hll.cardinality(0); + } + + List aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false) + .map((p) -> (InternalAggregation) p) + .collect(Collectors.toList()); + aggs.add(new InternalSimpleLongValue(name(), cardinality, formatter, new ArrayList<>(), metaData())); + Bucket newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), new InternalAggregations(aggs)); + newBuckets.add(newBucket); + } + return 
factory.createAggregation(newBuckets); + } finally { + if (hll != null) { + hll.close(); + } + } + } + + private HyperLogLogPlusPlus resolveBucketValue(MultiBucketsAggregation agg, + InternalMultiBucketAggregation.InternalBucket bucket, + String aggPath) { + List aggPathsList = AggregationPath.parse(aggPath).getPathElementsAsStringList(); + Object propertyValue = bucket.getProperty(agg.getName(), aggPathsList); + if (propertyValue == null) { + throw new AggregationExecutionException(AbstractPipelineAggregationBuilder.BUCKETS_PATH_FIELD.getPreferredName() + + " must reference a cardinality aggregation"); + } + + if (propertyValue instanceof InternalCardinality) { + return ((InternalCardinality) propertyValue).getCounts(); + } + + String currentAggName; + if (aggPathsList.isEmpty()) { + currentAggName = agg.getName(); + } else { + currentAggName = aggPathsList.get(0); + } + + throw new AggregationExecutionException(AbstractPipelineAggregationBuilder.BUCKETS_PATH_FIELD.getPreferredName() + + " must reference a cardinality aggregation, got: [" + + propertyValue.getClass().getSimpleName() + "] at aggregation [" + currentAggName + "]"); + } + +} diff --git a/x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/cumulativecardinality/InternalSimpleLongValue.java b/x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/cumulativecardinality/InternalSimpleLongValue.java new file mode 100644 index 00000000000..bd9c7903f4a --- /dev/null +++ b/x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/cumulativecardinality/InternalSimpleLongValue.java @@ -0,0 +1,94 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.datascience.cumulativecardinality; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.SimpleValue; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public class InternalSimpleLongValue extends InternalNumericMetricsAggregation.SingleValue implements SimpleValue { + public static final String NAME = "simple_long_value"; + protected final long value; + + public InternalSimpleLongValue(String name, long value, DocValueFormat formatter, List pipelineAggregators, + Map metaData) { + super(name, pipelineAggregators, metaData); + this.format = formatter; + this.value = value; + } + + /** + * Read from a stream. 
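+     * The stream carries the {@link DocValueFormat} followed by the zlong-encoded value, matching {@code doWriteTo}.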
+ */ + public InternalSimpleLongValue(StreamInput in) throws IOException { + super(in); + format = in.readNamedWriteable(DocValueFormat.class); + value = in.readZLong(); + } + + @Override + protected void doWriteTo(StreamOutput out) throws IOException { + out.writeNamedWriteable(format); + out.writeZLong(value); + } + + @Override + public String getWriteableName() { + return NAME; + } + + @Override + public double value() { + return value; + } + + public long getValue() { + return value; + } + + DocValueFormat formatter() { + return format; + } + + @Override + public InternalSimpleLongValue doReduce(List aggregations, ReduceContext reduceContext) { + throw new UnsupportedOperationException("Not supported"); + } + + @Override + public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { + boolean hasValue = !(Double.isInfinite(value) || Double.isNaN(value)); + builder.field(CommonFields.VALUE.getPreferredName(), hasValue ? value : null); + if (hasValue && format != DocValueFormat.RAW) { + builder.field(CommonFields.VALUE_AS_STRING.getPreferredName(), format.format(value).toString()); + } + return builder; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), value); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + if (super.equals(obj) == false) return false; + InternalSimpleLongValue other = (InternalSimpleLongValue) obj; + return Objects.equals(value, other.value); + } +} diff --git a/x-pack/plugin/data-science/src/main/plugin-metadata/plugin-security.policy b/x-pack/plugin/data-science/src/main/plugin-metadata/plugin-security.policy new file mode 100644 index 00000000000..e69de29bb2d diff --git a/x-pack/plugin/data-science/src/test/java/org/elasticsearch/xpack/datascience/StubAggregatorFactory.java b/x-pack/plugin/data-science/src/test/java/org/elasticsearch/xpack/datascience/StubAggregatorFactory.java new file mode 100644 index 00000000000..fd45a15c765 --- /dev/null +++ b/x-pack/plugin/data-science/src/test/java/org/elasticsearch/xpack/datascience/StubAggregatorFactory.java @@ -0,0 +1,52 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.datascience; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.MockPageCacheRecycler; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Test implementation for AggregatorFactory. 
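+ * Always returns the mock aggregator it was constructed with, which makes it a convenient stand-in for a
+ * parent that is not a sequentially ordered histogram (used to exercise pipeline validation failures).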
+ */ +public class StubAggregatorFactory extends AggregatorFactory { + + private final Aggregator aggregator; + + private StubAggregatorFactory(SearchContext context, Aggregator aggregator) throws IOException { + super("_name", context, null, new AggregatorFactories.Builder(), Collections.emptyMap()); + this.aggregator = aggregator; + } + + @Override + protected Aggregator createInternal(Aggregator parent, boolean collectsFromSingleBucket, List list, Map metaData) throws IOException { + return aggregator; + } + + public static StubAggregatorFactory createInstance() throws IOException { + BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); + SearchContext searchContext = mock(SearchContext.class); + when(searchContext.bigArrays()).thenReturn(bigArrays); + + Aggregator aggregator = mock(Aggregator.class); + + return new StubAggregatorFactory(searchContext, aggregator); + } +} diff --git a/x-pack/plugin/data-science/src/test/java/org/elasticsearch/xpack/datascience/action/TransportDataScienceStatsActionTests.java b/x-pack/plugin/data-science/src/test/java/org/elasticsearch/xpack/datascience/action/TransportDataScienceStatsActionTests.java new file mode 100644 index 00000000000..8ce58bbf83e --- /dev/null +++ b/x-pack/plugin/data-science/src/test/java/org/elasticsearch/xpack/datascience/action/TransportDataScienceStatsActionTests.java @@ -0,0 +1,77 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.datascience.action; + +import org.elasticsearch.Version; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.rest.yaml.ObjectPath; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.datascience.action.DataScienceStatsAction; +import org.junit.Before; + +import java.util.Arrays; +import java.util.Collections; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TransportDataScienceStatsActionTests extends ESTestCase { + + private TransportDataScienceStatsAction action; + + @Before + public void setupTransportAction() { + TransportService transportService = mock(TransportService.class); + ThreadPool threadPool = mock(ThreadPool.class); + + ClusterService clusterService = mock(ClusterService.class); + DiscoveryNode discoveryNode = new DiscoveryNode("nodeId", buildNewFakeTransportAddress(), Version.CURRENT); + when(clusterService.localNode()).thenReturn(discoveryNode); + + ClusterName clusterName = new ClusterName("cluster_name"); + when(clusterService.getClusterName()).thenReturn(clusterName); + + 
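+        // Back clusterService.state() with an empty-metadata cluster state for the transport action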
ClusterState clusterState = mock(ClusterState.class); + when(clusterState.getMetaData()).thenReturn(MetaData.EMPTY_META_DATA); + when(clusterService.state()).thenReturn(clusterState); + + + action = new TransportDataScienceStatsAction(transportService, clusterService, threadPool, new + ActionFilters(Collections.emptySet())); + } + + public void testCumulativeCardStats() throws Exception { + DataScienceStatsAction.Request request = new DataScienceStatsAction.Request(); + DataScienceStatsAction.NodeResponse nodeResponse1 = action.nodeOperation(new DataScienceStatsAction.NodeRequest(request)); + DataScienceStatsAction.NodeResponse nodeResponse2 = action.nodeOperation(new DataScienceStatsAction.NodeRequest(request)); + + DataScienceStatsAction.Response response = action.newResponse(request, + Arrays.asList(nodeResponse1, nodeResponse2), Collections.emptyList()); + + try (XContentBuilder builder = jsonBuilder()) { + builder.startObject(); + response.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + + ObjectPath objectPath = ObjectPath.createFromXContent(JsonXContent.jsonXContent, BytesReference.bytes(builder)); + assertThat(objectPath.evaluate("stats.0.cumulative_cardinality_usage"), equalTo(0)); + assertThat(objectPath.evaluate("stats.1.cumulative_cardinality_usage"), equalTo(0)); + } + } +} diff --git a/x-pack/plugin/data-science/src/test/java/org/elasticsearch/xpack/datascience/cumulativecardinality/CumulativeCardinalityAggregatorTests.java b/x-pack/plugin/data-science/src/test/java/org/elasticsearch/xpack/datascience/cumulativecardinality/CumulativeCardinalityAggregatorTests.java new file mode 100644 index 00000000000..9cf2c9846b2 --- /dev/null +++ b/x-pack/plugin/data-science/src/test/java/org/elasticsearch/xpack/datascience/cumulativecardinality/CumulativeCardinalityAggregatorTests.java @@ -0,0 +1,255 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.datascience.cumulativecardinality; + +import org.apache.lucene.document.Document; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.SortedNumericDocValuesField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.RandomIndexWriter; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.store.Directory; +import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.common.Rounding; +import org.elasticsearch.common.time.DateFormatters; +import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.NumberFieldMapper; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.AggregationExecutionException; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.AggregatorTestCase; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalOrder; +import org.elasticsearch.search.aggregations.PipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregatorFactory; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregatorFactory; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.search.aggregations.bucket.histogram.ExtendedBounds; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; +import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregatorFactory; +import org.elasticsearch.search.aggregations.metrics.CardinalityAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder; +import org.elasticsearch.search.aggregations.support.ValueType; +import org.elasticsearch.search.aggregations.support.ValuesSource; +import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; +import org.elasticsearch.search.aggregations.support.ValuesSourceType; +import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.xpack.datascience.StubAggregatorFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.function.Consumer; + +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; + +public class CumulativeCardinalityAggregatorTests extends AggregatorTestCase { + + private static final String HISTO_FIELD = "histo"; + private static final String VALUE_FIELD = "value_field"; + + private static final List datasetTimes = Arrays.asList( + "2017-01-01T01:07:45", //1 + "2017-01-01T03:43:34", //1 + "2017-01-03T04:11:00", //3 + "2017-01-03T05:11:31", //1 + "2017-01-05T08:24:05", //5 + "2017-01-05T13:09:32", //1 + "2017-01-07T13:47:43", //7 + "2017-01-08T16:14:34", //1 + "2017-01-09T17:09:50", //9 + "2017-01-09T22:55:46");//10 + + 
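+    // Paired with datasetTimes above; cumulativeCardinality holds the expected running distinct count
+    // for each of the nine daily buckets (days with no documents carry the previous value forward)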
private static final List<Integer> datasetValues = Arrays.asList(1,1,3,1,5,1,7,1,9,10);
+    private static final List<Double> cumulativeCardinality = Arrays.asList(1.0,1.0,2.0,2.0,3.0,3.0,4.0,4.0,6.0);
+
+    public void testSimple() throws IOException {
+
+        Query query = new MatchAllDocsQuery();
+
+        DateHistogramAggregationBuilder aggBuilder = new DateHistogramAggregationBuilder("histo");
+        aggBuilder.calendarInterval(DateHistogramInterval.DAY).field(HISTO_FIELD);
+        aggBuilder.subAggregation(new CardinalityAggregationBuilder("the_cardinality", ValueType.NUMERIC).field(VALUE_FIELD));
+        aggBuilder.subAggregation(new CumulativeCardinalityPipelineAggregationBuilder("cumulative_card", "the_cardinality"));
+
+        executeTestCase(query, aggBuilder, histogram -> {
+            assertEquals(9, ((Histogram)histogram).getBuckets().size());
+            List<? extends Histogram.Bucket> buckets = ((Histogram)histogram).getBuckets();
+            int counter = 0;
+            for (Histogram.Bucket bucket : buckets) {
+                assertThat(((InternalSimpleLongValue) (bucket.getAggregations().get("cumulative_card"))).value(),
+                    equalTo(cumulativeCardinality.get(counter)));
+                counter += 1;
+            }
+        });
+    }
+
+    public void testAllNull() throws IOException {
+        Query query = new MatchAllDocsQuery();
+
+        DateHistogramAggregationBuilder aggBuilder = new DateHistogramAggregationBuilder("histo");
+        aggBuilder.calendarInterval(DateHistogramInterval.DAY).field(HISTO_FIELD);
+        aggBuilder.subAggregation(new CardinalityAggregationBuilder("the_cardinality", ValueType.NUMERIC).field("foo"));
+        aggBuilder.subAggregation(new CumulativeCardinalityPipelineAggregationBuilder("cumulative_card", "the_cardinality"));
+
+        executeTestCase(query, aggBuilder, histogram -> {
+            assertEquals(9, ((Histogram)histogram).getBuckets().size());
+            List<? extends Histogram.Bucket> buckets = ((Histogram)histogram).getBuckets();
+            for (Histogram.Bucket bucket : buckets) {
+                assertThat(((InternalSimpleLongValue) (bucket.getAggregations().get("cumulative_card"))).value(), equalTo(0.0));
+            }
+        });
+    }
+
+    public void testParentValidations() throws IOException {
+        ValuesSourceConfig<ValuesSource.Numeric> numericVS = new ValuesSourceConfig<>(ValuesSourceType.NUMERIC);
+
+        // Histogram
+        Set<PipelineAggregationBuilder> aggBuilders = new HashSet<>();
+        aggBuilders.add(new CumulativeCardinalityPipelineAggregationBuilder("cumulative_card", "sum"));
+        AggregatorFactory parent = new HistogramAggregatorFactory("name", numericVS, 0.0d, 0.0d,
+            mock(InternalOrder.class), false, 0L, 0.0d, 1.0d, mock(SearchContext.class), null,
+            new AggregatorFactories.Builder(), Collections.emptyMap());
+        CumulativeCardinalityPipelineAggregationBuilder builder
+            = new CumulativeCardinalityPipelineAggregationBuilder("name", "valid");
+        builder.validate(parent, Collections.emptySet(), aggBuilders);
+
+        // Date Histogram
+        aggBuilders.clear();
+        aggBuilders.add(new CumulativeCardinalityPipelineAggregationBuilder("cumulative_card", "sum"));
+        parent = new DateHistogramAggregatorFactory("name", numericVS, 0L,
+            mock(InternalOrder.class), false, 0L, mock(Rounding.class), mock(Rounding.class),
+            mock(ExtendedBounds.class), mock(SearchContext.class), mock(AggregatorFactory.class),
+            new AggregatorFactories.Builder(), Collections.emptyMap());
+        builder = new CumulativeCardinalityPipelineAggregationBuilder("name", "valid");
+        builder.validate(parent, Collections.emptySet(), aggBuilders);
+
+        // Auto Date Histogram
+        aggBuilders.clear();
+        aggBuilders.add(new CumulativeCardinalityPipelineAggregationBuilder("cumulative_card", "sum"));
+        AutoDateHistogramAggregationBuilder.RoundingInfo[] roundings = new AutoDateHistogramAggregationBuilder.RoundingInfo[1];
+        parent = new
+    public void testNonCardinalityAgg() {
+        Query query = new MatchAllDocsQuery();
+
+        DateHistogramAggregationBuilder aggBuilder = new DateHistogramAggregationBuilder("histo");
+        aggBuilder.calendarInterval(DateHistogramInterval.DAY).field(HISTO_FIELD);
+        aggBuilder.subAggregation(new SumAggregationBuilder("the_sum").field("foo"));
+        aggBuilder.subAggregation(new CumulativeCardinalityPipelineAggregationBuilder("cumulative_card", "the_sum"));
+
+        AggregationExecutionException e = expectThrows(AggregationExecutionException.class,
+            () -> executeTestCase(query, aggBuilder, histogram -> fail("Test should not have executed")));
+        assertThat(e.getMessage(), equalTo("buckets_path must reference a cardinality aggregation, "
+            + "got: [InternalSum] at aggregation [the_sum]"));
+    }
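testNonCardinalityAgg pins down the execution-time half of the contract: buckets_path must resolve to a cardinality result, since the pipeline works by merging the per-bucket cardinality sketches, which a plain metric such as sum cannot provide. A rough sketch of that check, using a hypothetical resolveBucketsPath helper in place of the real resolution code:

    // Sketch only: reject buckets_path targets that are not a cardinality agg.
    InternalAggregation target = resolveBucketsPath(bucket, bucketsPath); // hypothetical helper
    if (target instanceof InternalCardinality == false) {
        throw new AggregationExecutionException("buckets_path must reference a cardinality aggregation, got: ["
            + target.getClass().getSimpleName() + "] at aggregation [" + target.getName() + "]");
    }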
+    private void executeTestCase(Query query, AggregationBuilder aggBuilder,
+                                 Consumer<InternalAggregation> verify) throws IOException {
+        executeTestCase(query, aggBuilder, verify, indexWriter -> {
+            Document document = new Document();
+            int counter = 0;
+            for (String date : datasetTimes) {
+                if (frequently()) {
+                    indexWriter.commit();
+                }
+
+                long instant = asLong(date);
+                document.add(new SortedNumericDocValuesField(HISTO_FIELD, instant));
+                document.add(new NumericDocValuesField(VALUE_FIELD, datasetValues.get(counter)));
+                indexWriter.addDocument(document);
+                document.clear();
+                counter += 1;
+            }
+        });
+    }
+
+    private void executeTestCase(Query query, AggregationBuilder aggBuilder, Consumer<InternalAggregation> verify,
+                                 CheckedConsumer<RandomIndexWriter, IOException> setup) throws IOException {
+        try (Directory directory = newDirectory()) {
+            try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
+                setup.accept(indexWriter);
+            }
+
+            try (IndexReader indexReader = DirectoryReader.open(directory)) {
+                IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
+
+                DateFieldMapper.Builder builder = new DateFieldMapper.Builder("_name");
+                DateFieldMapper.DateFieldType fieldType = builder.fieldType();
+                fieldType.setHasDocValues(true);
+                fieldType.setName(HISTO_FIELD);
+
+                MappedFieldType valueFieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
+                valueFieldType.setHasDocValues(true);
+                valueFieldType.setName("value_field");
+
+                InternalAggregation histogram = searchAndReduce(indexSearcher, query, aggBuilder, fieldType, valueFieldType);
+                verify.accept(histogram);
+            }
+        }
+    }
+
+    private static long asLong(String dateTime) {
+        return DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(dateTime)).toInstant().toEpochMilli();
+    }
+
+    private static AggregatorFactory getRandomSequentiallyOrderedParentAgg() throws IOException {
+        AggregatorFactory factory;
+        ValuesSourceConfig<ValuesSource.Numeric> numericVS = new ValuesSourceConfig<>(ValuesSourceType.NUMERIC);
+        switch (randomIntBetween(0, 2)) {
+            case 0:
+                factory = new HistogramAggregatorFactory("name", numericVS, 0.0d, 0.0d,
+                    mock(InternalOrder.class), false, 0L, 0.0d, 1.0d, mock(SearchContext.class), null,
+                    new AggregatorFactories.Builder(), Collections.emptyMap());
+                break;
+            case 1:
+                factory = new DateHistogramAggregatorFactory("name", numericVS, 0L,
+                    mock(InternalOrder.class), false, 0L, mock(Rounding.class), mock(Rounding.class),
+                    mock(ExtendedBounds.class), mock(SearchContext.class), mock(AggregatorFactory.class),
+                    new AggregatorFactories.Builder(), Collections.emptyMap());
+                break;
+            case 2:
+            default:
+                AutoDateHistogramAggregationBuilder.RoundingInfo[] roundings = new AutoDateHistogramAggregationBuilder.RoundingInfo[1];
+                factory = new AutoDateHistogramAggregatorFactory("name", numericVS,
+                    1, roundings,
+                    mock(SearchContext.class), null, new AggregatorFactories.Builder(), Collections.emptyMap());
+        }
+
+        return factory;
+    }
+}
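The expected values in both test fixtures are just running distinct counts. In the Java test, datasetValues grouped by day as (1), (1), (3), (1), (5), (1), (7), (1), (9, 10) yields the cumulativeCardinality fixture 1, 1, 2, 2, 3, 3, 4, 4, 6; in the YAML test below, users a/b/c, then a/b, then d yield 3, 3, 4. A self-contained sketch of that arithmetic (the per-day grouping is inferred from the assertions, since datasetTimes is not shown here):

    import java.util.*;

    // Running distinct count: add each day's values to one set and record its size.
    public class CumulativeCardinalityMath {
        public static void main(String[] args) {
            List<List<Integer>> days = Arrays.asList(
                Arrays.asList(1), Arrays.asList(1), Arrays.asList(3), Arrays.asList(1),
                Arrays.asList(5), Arrays.asList(1), Arrays.asList(7), Arrays.asList(1),
                Arrays.asList(9, 10));
            Set<Integer> seen = new HashSet<>();
            for (List<Integer> day : days) {
                seen.addAll(day);
                System.out.print(seen.size() + " "); // prints: 1 1 2 2 3 3 4 4 6
            }
        }
    }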
diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_science/cumulative_cardinality.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_science/cumulative_cardinality.yml
new file mode 100644
index 00000000000..b59912e86f2
--- /dev/null
+++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_science/cumulative_cardinality.yml
@@ -0,0 +1,86 @@
+setup:
+  - skip:
+      features: headers
+  - do:
+      indices.create:
+        index: foo
+        body:
+          mappings:
+            properties:
+              timestamp:
+                type: date
+              user:
+                type: keyword
+
+  - do:
+      headers:
+        Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
+      bulk:
+        refresh: true
+        body:
+          - index:
+              _index: "foo"
+          - timestamp: "2017-01-01T05:00:00Z"
+            user: "a"
+
+          - index:
+              _index: "foo"
+          - timestamp: "2017-01-01T05:00:00Z"
+            user: "b"
+
+          - index:
+              _index: "foo"
+          - timestamp: "2017-01-01T05:00:00Z"
+            user: "c"
+
+          - index:
+              _index: "foo"
+          - timestamp: "2017-01-02T05:00:00Z"
+            user: "a"
+
+          - index:
+              _index: "foo"
+          - timestamp: "2017-01-02T05:00:00Z"
+            user: "b"
+
+          - index:
+              _index: "foo"
+          - timestamp: "2017-01-03T05:00:00Z"
+            user: "d"
+
+---
+"Basic Search":
+
+  - do:
+      search:
+        index: "foo"
+        body:
+          size: 0
+          aggs:
+            histo:
+              date_histogram:
+                field: "timestamp"
+                calendar_interval: "day"
+              aggs:
+                distinct_users:
+                  cardinality:
+                    field: "user"
+                total_users:
+                  cumulative_cardinality:
+                    buckets_path: "distinct_users"
+
+  - length: { aggregations.histo.buckets: 3 }
+  - match: { aggregations.histo.buckets.0.key_as_string: "2017-01-01T00:00:00.000Z" }
+  - match: { aggregations.histo.buckets.0.doc_count: 3 }
+  - match: { aggregations.histo.buckets.0.distinct_users.value: 3 }
+  - match: { aggregations.histo.buckets.0.total_users.value: 3 }
+  - match: { aggregations.histo.buckets.1.key_as_string: "2017-01-02T00:00:00.000Z" }
+  - match: { aggregations.histo.buckets.1.doc_count: 2 }
+  - match: { aggregations.histo.buckets.1.distinct_users.value: 2 }
+  - match: { aggregations.histo.buckets.1.total_users.value: 3 }
+  - match: { aggregations.histo.buckets.2.key_as_string: "2017-01-03T00:00:00.000Z" }
+  - match: { aggregations.histo.buckets.2.doc_count: 1 }
+  - match: { aggregations.histo.buckets.2.distinct_users.value: 1 }
+  - match: { aggregations.histo.buckets.2.total_users.value: 4 }
+
diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/xpack/10_basic.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/xpack/10_basic.yml
index 514ba618240..0b593a87d7e 100644
--- a/x-pack/plugin/src/test/resources/rest-api-spec/test/xpack/10_basic.yml
+++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/xpack/10_basic.yml
@@ -25,3 +25,4 @@
   - contains: { nodes.$master.modules: { name: x-pack-security } }
   - contains: { nodes.$master.modules: { name: x-pack-sql } }
   - contains: { nodes.$master.modules: { name: x-pack-watcher } }
+  - contains: { nodes.$master.modules: { name: x-pack-data-science } }
diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/xpack/15_basic.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/xpack/15_basic.yml
index 1f2e5ce9625..d0e1a9b773e 100644
--- a/x-pack/plugin/src/test/resources/rest-api-spec/test/xpack/15_basic.yml
+++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/xpack/15_basic.yml
@@ -28,6 +28,8 @@
   - is_true: features.monitoring
   - is_true: features.monitoring.enabled
 # - is_false: features.monitoring.available TODO fix once licensing is fixed
+  - is_true: features.data_science
+  - is_true: features.data_science.enabled

   - do:
       license.post:
@@ -77,6 +79,8 @@
   - is_true: features.monitoring
   - is_true: features.monitoring.enabled
   - is_true: features.monitoring.available
+  - is_true: features.data_science.enabled
+  - is_true: features.data_science.available
   - is_true: tagline

   - do:
@@ -89,6 +93,7 @@
   - is_true: graph.available
   - is_true: monitoring.enabled
   - is_true: monitoring.available
+  - is_true: data_science.available

   - do:
       xpack.info: