Add Cumulative Cardinality agg (and Data Science plugin) (#45990)
This adds a pipeline aggregation that calculates the cumulative cardinality of a field. It does this by iteratively merging in the HLL sketch from consecutive buckets and emitting the cardinality up to that point. This is useful for things like finding the total number of "new" users that have visited a website (as opposed to "repeat" visitors). This is a Basic+ aggregation and adds a new Data Science plugin to house it and future advanced analytics/data science aggregations.
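The mechanics are easiest to see in isolation. Below is a minimal sketch of that merge-and-emit loop, using the `HyperLogLogPlusPlus` calls from the aggregator in this commit (`precision()`, `merge()`, `cardinality()`, `close()`); the `bucketSketches` list, `bigArrays` handle, and `cumulativeCounts` output are stand-ins for the real reduce plumbing:

    // Sketch only: fold each bucket's HLL sketch into a running union and
    // record the union's cardinality after every bucket, in bucket order.
    HyperLogLogPlusPlus union = null;
    List<Long> cumulativeCounts = new ArrayList<>();
    try {
        for (HyperLogLogPlusPlus bucketSketch : bucketSketches) {
            if (union == null && bucketSketch != null) {
                // Merge into a fresh sketch so the per-bucket cardinality
                // values are left untouched.
                union = new HyperLogLogPlusPlus(bucketSketch.precision(), bigArrays, 1);
            }
            if (bucketSketch != null) {
                union.merge(0, bucketSketch, 0);
            }
            cumulativeCounts.add(union == null ? 0L : union.cardinality(0));
        }
    } finally {
        if (union != null) {
            union.close(); // the sketch holds BigArrays-backed memory
        }
    }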
parent a3a4ae0ac2
commit 943a016bb2

@@ -425,6 +425,7 @@ task run(type: RunTask) {
    setting 'xpack.monitoring.enabled', 'true'
    setting 'xpack.sql.enabled', 'true'
    setting 'xpack.rollup.enabled', 'true'
    setting 'xpack.data-science.enabled', 'true'
    keystoreSetting 'bootstrap.password', 'password'
  }
}

@@ -218,6 +218,42 @@ buildRestTests.setups['sales'] = '''
{"index":{}}
|
||||
{"date": "2015/03/01 00:00:00", "price": 175, "promoted": false, "rating": 2, "type": "t-shirt"}'''
|
||||
|
||||
// Used by cumulative cardinality aggregation docs
|
||||
buildRestTests.setups['user_hits'] = '''
|
||||
- do:
|
||||
indices.create:
|
||||
index: user_hits
|
||||
body:
|
||||
settings:
|
||||
number_of_shards: 1
|
||||
number_of_replicas: 0
|
||||
mappings:
|
||||
properties:
|
||||
user_id:
|
||||
type: keyword
|
||||
timestamp:
|
||||
type: date
|
||||
- do:
|
||||
bulk:
|
||||
index: user_hits
|
||||
refresh: true
|
||||
body: |
|
||||
{"index":{}}
|
||||
{"timestamp": "2019-01-01T13:00:00", "user_id": "1"}
|
||||
{"index":{}}
|
||||
{"timestamp": "2019-01-01T13:00:00", "user_id": "2"}
|
||||
{"index":{}}
|
||||
{"timestamp": "2019-01-02T13:00:00", "user_id": "1"}
|
||||
{"index":{}}
|
||||
{"timestamp": "2019-01-02T13:00:00", "user_id": "3"}
|
||||
{"index":{}}
|
||||
{"timestamp": "2019-01-03T13:00:00", "user_id": "1"}
|
||||
{"index":{}}
|
||||
{"timestamp": "2019-01-03T13:00:00", "user_id": "2"}
|
||||
{"index":{}}
|
||||
{"timestamp": "2019-01-03T13:00:00", "user_id": "4"}'''
|
||||
|
||||
|
||||
// Dummy bank account data used by getting-started.asciidoc
|
||||
buildRestTests.setups['bank'] = '''
|
||||
- do:
|
||||
|
|
|
@ -0,0 +1,235 @@
|
|||
[role="xpack"]
|
||||
[testenv="basic"]
|
||||
[[search-aggregations-pipeline-cumulative-cardinality-aggregation]]
|
||||
=== Cumulative Cardinality Aggregation
|
||||
|
||||
A parent pipeline aggregation which calculates the Cumulative Cardinality in a parent histogram (or date_histogram)
|
||||
aggregation. The specified metric must be a cardinality aggregation and the enclosing histogram
|
||||
must have `min_doc_count` set to `0` (default for `histogram` aggregations).
|
||||
|
||||
The `cumulative_cardinality` agg is useful for finding "total new items", like the number of new visitors to your
|
||||
website each day. A regular cardinality aggregation will tell you how many unique visitors came each day, but doesn't
|
||||
differentiate between "new" or "repeat" visitors. The Cumulative Cardinality aggregation can be used to determine
|
||||
how many of each day's unique visitors are "new".
|
||||
|
||||
==== Syntax
|
||||
|
||||
A `cumulative_cardinality` aggregation looks like this in isolation:
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"cumulative_cardinality": {
|
||||
"buckets_path": "my_cardinality_agg"
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
// NOTCONSOLE
|
||||
|
||||
[[cumulative-cardinality-params]]
|
||||
.`cumulative_cardinality` Parameters
|
||||
[options="header"]
|
||||
|===
|
||||
|Parameter Name |Description |Required |Default Value
|
||||
|`buckets_path` |The path to the cardinality aggregation we wish to find the cumulative cardinality for (see <<buckets-path-syntax>> for more
|
||||
details) |Required |
|
||||
|`format` |format to apply to the output value of this aggregation |Optional |`null`
|
||||
|===
|
||||
|
||||
The following snippet calculates the cumulative cardinality of the total daily `users`:
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
GET /user_hits/_search
|
||||
{
|
||||
"size": 0,
|
||||
"aggs" : {
|
||||
"users_per_day" : {
|
||||
"date_histogram" : {
|
||||
"field" : "timestamp",
|
||||
"calendar_interval" : "day"
|
||||
},
|
||||
"aggs": {
|
||||
"distinct_users": {
|
||||
"cardinality": {
|
||||
"field": "user_id"
|
||||
}
|
||||
},
|
||||
"total_new_users": {
|
||||
"cumulative_cardinality": {
|
||||
"buckets_path": "distinct_users" <1>
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
// TEST[setup:user_hits]
|
||||
|
||||
<1> `buckets_path` instructs this aggregation to use the output of the `distinct_users` aggregation for the cumulative cardinality
|
||||
|
||||
And the following may be the response:
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"took": 11,
|
||||
"timed_out": false,
|
||||
"_shards": ...,
|
||||
"hits": ...,
|
||||
"aggregations": {
|
||||
"users_per_day": {
|
||||
"buckets": [
|
||||
{
|
||||
"key_as_string": "2019-01-01T00:00:00.000Z",
|
||||
"key": 1546300800000,
|
||||
"doc_count": 2,
|
||||
"distinct_users": {
|
||||
"value": 2
|
||||
},
|
||||
"total_new_users": {
|
||||
"value": 2
|
||||
}
|
||||
},
|
||||
{
|
||||
"key_as_string": "2019-01-02T00:00:00.000Z",
|
||||
"key": 1546387200000,
|
||||
"doc_count": 2,
|
||||
"distinct_users": {
|
||||
"value": 2
|
||||
},
|
||||
"total_new_users": {
|
||||
"value": 3
|
||||
}
|
||||
},
|
||||
{
|
||||
"key_as_string": "2019-01-03T00:00:00.000Z",
|
||||
"key": 1546473600000,
|
||||
"doc_count": 3,
|
||||
"distinct_users": {
|
||||
"value": 3
|
||||
},
|
||||
"total_new_users": {
|
||||
"value": 4
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
// TESTRESPONSE[s/"took": 11/"took": $body.took/]
|
||||
// TESTRESPONSE[s/"_shards": \.\.\./"_shards": $body._shards/]
|
||||
// TESTRESPONSE[s/"hits": \.\.\./"hits": $body.hits/]
|
||||
|
||||
|
||||
Note how the second day, `2019-01-02`, has two distinct users but the `total_new_users` metric generated by the
|
||||
cumulative pipeline agg only increments to three. This means that only one of the two users that day were
|
||||
new, the other had already been seen in the previous day. This happens again on the third day, where only
|
||||
one of three users is completely new.
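Tracing the example dataset makes the bookkeeping concrete. The tally below is derived by hand from the
`user_hits` documents above, not produced by the API:

--------------------------------------------------
Day 1: distinct users {1, 2}     cumulative union {1, 2}         total_new_users = 2
Day 2: distinct users {1, 3}     cumulative union {1, 2, 3}      total_new_users = 3
Day 3: distinct users {1, 2, 4}  cumulative union {1, 2, 3, 4}   total_new_users = 4
--------------------------------------------------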

==== Incremental cumulative cardinality

The `cumulative_cardinality` agg will show you the total, distinct count since the beginning of the time period
being queried. Sometimes, however, it is useful to see the "incremental" count: how many new users
are added each day, rather than the running cumulative total.

This can be accomplished by adding a `derivative` aggregation to our query:

[source,js]
--------------------------------------------------
GET /user_hits/_search
{
    "size": 0,
    "aggs" : {
        "users_per_day" : {
            "date_histogram" : {
                "field" : "timestamp",
                "calendar_interval" : "day"
            },
            "aggs": {
                "distinct_users": {
                    "cardinality": {
                        "field": "user_id"
                    }
                },
                "total_new_users": {
                    "cumulative_cardinality": {
                        "buckets_path": "distinct_users"
                    }
                },
                "incremental_new_users": {
                    "derivative": {
                        "buckets_path": "total_new_users"
                    }
                }
            }
        }
    }
}
--------------------------------------------------
// CONSOLE
// TEST[setup:user_hits]


And the following may be the response:

[source,js]
--------------------------------------------------
{
   "took": 11,
   "timed_out": false,
   "_shards": ...,
   "hits": ...,
   "aggregations": {
      "users_per_day": {
         "buckets": [
            {
               "key_as_string": "2019-01-01T00:00:00.000Z",
               "key": 1546300800000,
               "doc_count": 2,
               "distinct_users": {
                  "value": 2
               },
               "total_new_users": {
                  "value": 2
               }
            },
            {
               "key_as_string": "2019-01-02T00:00:00.000Z",
               "key": 1546387200000,
               "doc_count": 2,
               "distinct_users": {
                  "value": 2
               },
               "total_new_users": {
                  "value": 3
               },
               "incremental_new_users": {
                  "value": 1.0
               }
            },
            {
               "key_as_string": "2019-01-03T00:00:00.000Z",
               "key": 1546473600000,
               "doc_count": 3,
               "distinct_users": {
                  "value": 3
               },
               "total_new_users": {
                  "value": 4
               },
               "incremental_new_users": {
                  "value": 1.0
               }
            }
         ]
      }
   }
}
--------------------------------------------------
// TESTRESPONSE[s/"took": 11/"took": $body.took/]
// TESTRESPONSE[s/"_shards": \.\.\./"_shards": $body._shards/]
// TESTRESPONSE[s/"hits": \.\.\./"hits": $body.hits/]

@@ -71,6 +71,10 @@ Example response:
"available" : true,
|
||||
"enabled" : true
|
||||
},
|
||||
"data_science" : {
|
||||
"available" : true,
|
||||
"enabled" : true
|
||||
},
|
||||
"flattened" : {
|
||||
"available" : true,
|
||||
"enabled" : true
|
||||
|
|
|
@ -80,7 +80,7 @@ public final class InternalCardinality extends InternalNumericMetricsAggregation
|
|||
        return counts == null ? 0 : counts.cardinality(0);
    }

-   HyperLogLogPlusPlus getCounts() {
+   public HyperLogLogPlusPlus getCounts() {
        return counts;
    }

@@ -70,6 +70,9 @@ public class XPackLicenseState {
"Creating and Starting rollup jobs will no longer be allowed.",
|
||||
"Stopping/Deleting existing jobs, RollupCaps API and RollupSearch continue to function."
|
||||
});
|
||||
messages.put(XPackField.DATA_SCIENCE, new String[] {
|
||||
"Aggregations provided by Data Science plugin are no longer usable."
|
||||
});
|
||||
EXPIRATION_MESSAGES = Collections.unmodifiableMap(messages);
|
||||
}
|
||||
|
||||
|
@ -744,6 +747,15 @@ public class XPackLicenseState {
|
|||
        return localStatus.active;
    }

    /**
     * Data science is always available as long as there is a valid license
     *
     * @return true if the license is active
     */
    public synchronized boolean isDataScienceAllowed() {
        return status.active;
    }

    public synchronized boolean isTrialLicense() {
        return status.mode == OperationMode.TRIAL;
    }

@@ -55,6 +55,7 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.SyncConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.TimeSyncConfig;
import org.elasticsearch.xpack.core.datascience.DataScienceFeatureSetUsage;
import org.elasticsearch.xpack.core.deprecation.DeprecationInfoAction;
import org.elasticsearch.xpack.core.flattened.FlattenedFeatureSetUsage;
import org.elasticsearch.xpack.core.frozen.FrozenIndicesFeatureSetUsage;

@@ -544,7 +545,9 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
            // Frozen indices
            new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.FROZEN_INDICES, FrozenIndicesFeatureSetUsage::new),
            // Spatial
-           new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.SPATIAL, SpatialFeatureSetUsage::new)
+           new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.SPATIAL, SpatialFeatureSetUsage::new),
+           // data science
+           new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.DATA_SCIENCE, DataScienceFeatureSetUsage::new)
        );
    }

@@ -47,6 +47,8 @@ public final class XPackField {
    public static final String FROZEN_INDICES = "frozen_indices";
    /** Name constant for spatial features. */
    public static final String SPATIAL = "spatial";
    /** Name constant for the data science plugin. */
    public static final String DATA_SCIENCE = "data_science";

    private XPackField() {}

@@ -126,6 +126,10 @@ public class XPackSettings {
    /** Setting for enabling or disabling vectors. Defaults to true. */
    public static final Setting<Boolean> VECTORS_ENABLED = Setting.boolSetting("xpack.vectors.enabled", true, Setting.Property.NodeScope);

    /** Setting for enabling or disabling the data science plugin. Defaults to true. */
    public static final Setting<Boolean> DATA_SCIENCE_ENABLED = Setting.boolSetting("xpack.data-science.enabled",
        true, Setting.Property.NodeScope);

    public static final List<String> DEFAULT_SUPPORTED_PROTOCOLS;

    static {

@@ -258,6 +262,7 @@ public class XPackSettings {
        settings.add(DATA_FRAME_ENABLED);
        settings.add(FLATTENED_ENABLED);
        settings.add(VECTORS_ENABLED);
        settings.add(DATA_SCIENCE_ENABLED);
        return Collections.unmodifiableList(settings);
    }

@@ -0,0 +1,65 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */

package org.elasticsearch.xpack.core.datascience;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.XPackFeatureSet;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.datascience.action.DataScienceStatsAction;

import java.io.IOException;
import java.util.Objects;

public class DataScienceFeatureSetUsage extends XPackFeatureSet.Usage {

    private final DataScienceStatsAction.Response response;

    public DataScienceFeatureSetUsage(boolean available, boolean enabled, DataScienceStatsAction.Response response) {
        super(XPackField.DATA_SCIENCE, available, enabled);
        this.response = response;
    }

    public DataScienceFeatureSetUsage(StreamInput input) throws IOException {
        super(input);
        this.response = new DataScienceStatsAction.Response(input);
    }

    @Override
    public int hashCode() {
        return Objects.hash(available, enabled, response);
    }

    @Override
    protected void innerXContent(XContentBuilder builder, Params params) throws IOException {
        super.innerXContent(builder, params);
        if (response != null) {
            response.toXContent(builder, params);
        }
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        response.writeTo(out);
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == null) {
            return false;
        }
        if (getClass() != obj.getClass()) {
            return false;
        }
        DataScienceFeatureSetUsage other = (DataScienceFeatureSetUsage) obj;
        return Objects.equals(available, other.available)
            && Objects.equals(enabled, other.enabled)
            && Objects.equals(response, other.response);
    }
}

@@ -0,0 +1,142 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.core.datascience.action;

import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.List;
import java.util.Objects;

public class DataScienceStatsAction extends ActionType<DataScienceStatsAction.Response> {
    public static final DataScienceStatsAction INSTANCE = new DataScienceStatsAction();
    public static final String NAME = "cluster:monitor/xpack/datascience/stats";

    private DataScienceStatsAction() {
        super(NAME, Response::new);
    }

    public static class Request extends BaseNodesRequest<Request> implements ToXContentObject {

        public Request() {
            super((String[]) null);
        }

        public Request(StreamInput in) throws IOException {
            super(in);
        }

        @Override
        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
            builder.startObject();
            builder.endObject();
            return builder;
        }

        @Override
        public int hashCode() {
            // Nothing to hash atm, so just use the action name
            return Objects.hashCode(NAME);
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == null) {
                return false;
            }
            if (getClass() != obj.getClass()) {
                return false;
            }
            return true;
        }
    }

    public static class NodeRequest extends BaseNodeRequest {
        public NodeRequest(StreamInput in) throws IOException {
            super(in);
        }

        public NodeRequest(Request request) {

        }
    }

    public static class Response extends BaseNodesResponse<NodeResponse> implements Writeable, ToXContentObject {
        public Response(StreamInput in) throws IOException {
            super(in);
        }

        public Response(ClusterName clusterName, List<NodeResponse> nodes, List<FailedNodeException> failures) {
            super(clusterName, nodes, failures);
        }

        @Override
        protected List<NodeResponse> readNodesFrom(StreamInput in) throws IOException {
            return in.readList(NodeResponse::new);
        }

        @Override
        protected void writeNodesTo(StreamOutput out, List<NodeResponse> nodes) throws IOException {
            out.writeList(nodes);
        }

        @Override
        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
            builder.startArray("stats");
            for (NodeResponse node : getNodes()) {
                node.toXContent(builder, params);
            }
            builder.endArray();

            return builder;
        }
    }

    public static class NodeResponse extends BaseNodeResponse implements ToXContentObject {
        static ParseField CUMULATIVE_CARDINALITY_USAGE = new ParseField("cumulative_cardinality_usage");
        private long cumulativeCardinalityUsage;

        public NodeResponse(StreamInput in) throws IOException {
            super(in);
            cumulativeCardinalityUsage = in.readZLong();
        }

        public NodeResponse(DiscoveryNode node) {
            super(node);
        }

        public void setCumulativeCardinalityUsage(long cumulativeCardinalityUsage) {
            this.cumulativeCardinalityUsage = cumulativeCardinalityUsage;
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            out.writeZLong(cumulativeCardinalityUsage);
        }

        @Override
        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
            builder.startObject();
            builder.field(CUMULATIVE_CARDINALITY_USAGE.getPreferredName(), cumulativeCardinalityUsage);
            builder.endObject();
            return builder;
        }
    }
}

@@ -0,0 +1,26 @@
evaluationDependsOn(xpackModule('core'))

apply plugin: 'elasticsearch.esplugin'
esplugin {
    name 'x-pack-data-science'
    description 'Elasticsearch Expanded Pack Plugin - Data Science'
    classname 'org.elasticsearch.xpack.datascience.DataSciencePlugin'
    extendedPlugins = ['x-pack-core']
}
archivesBaseName = 'x-pack-data-science'

compileJava.options.compilerArgs << "-Xlint:-rawtypes"
compileTestJava.options.compilerArgs << "-Xlint:-rawtypes"


dependencies {
    compileOnly project(":server")

    compileOnly project(path: xpackModule('core'), configuration: 'default')
    testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')
    if (isEclipse) {
        testCompile project(path: xpackModule('core-tests'), configuration: 'testArtifacts')
    }
}

integTest.enabled = false

@@ -0,0 +1,15 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.datascience;

import org.elasticsearch.xpack.datascience.cumulativecardinality.CumulativeCardinalityPipelineAggregationBuilder;

public class DataScienceAggregationBuilders {

    public static CumulativeCardinalityPipelineAggregationBuilder cumulativeCardinality(String name, String bucketsPath) {
        return new CumulativeCardinalityPipelineAggregationBuilder(name, bucketsPath);
    }
}
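A quick sketch of how this helper might be used when building a search request in Java. The date histogram and cardinality builders are the same ones exercised by the tests later in this commit; the surrounding client plumbing is assumed:

    // Hypothetical client-side usage: daily buckets, per-day distinct users,
    // and the cumulative distinct-user count layered on top.
    DateHistogramAggregationBuilder usersPerDay = new DateHistogramAggregationBuilder("users_per_day");
    usersPerDay.calendarInterval(DateHistogramInterval.DAY).field("timestamp");
    usersPerDay.subAggregation(new CardinalityAggregationBuilder("distinct_users", ValueType.NUMERIC).field("user_id"));
    usersPerDay.subAggregation(DataScienceAggregationBuilders.cumulativeCardinality("total_new_users", "distinct_users"));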

@@ -0,0 +1,67 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.datascience;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.core.XPackFeatureSet;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.datascience.DataScienceFeatureSetUsage;
import org.elasticsearch.xpack.core.datascience.action.DataScienceStatsAction;

import java.util.Map;

public class DataScienceFeatureSet implements XPackFeatureSet {

    private final boolean enabled;
    private final XPackLicenseState licenseState;
    private Client client;

    @Inject
    public DataScienceFeatureSet(Settings settings, @Nullable XPackLicenseState licenseState, Client client) {
        this.enabled = XPackSettings.DATA_SCIENCE_ENABLED.get(settings);
        this.licenseState = licenseState;
        this.client = client;
    }

    @Override
    public String name() {
        return XPackField.DATA_SCIENCE;
    }

    @Override
    public boolean available() {
        return licenseState != null && licenseState.isDataScienceAllowed();
    }

    @Override
    public boolean enabled() {
        return enabled;
    }

    @Override
    public Map<String, Object> nativeCodeInfo() {
        return null;
    }

    @Override
    public void usage(ActionListener<XPackFeatureSet.Usage> listener) {
        if (enabled) {
            DataScienceStatsAction.Request request = new DataScienceStatsAction.Request();
            client.execute(DataScienceStatsAction.INSTANCE, request,
                ActionListener.wrap(r -> listener.onResponse(new DataScienceFeatureSetUsage(available(), enabled(), r)),
                    listener::onFailure));

        } else {
            listener.onResponse(new DataScienceFeatureSetUsage(available(), enabled(), null));
        }
    }
}

@@ -0,0 +1,67 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.datascience;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.datascience.action.DataScienceStatsAction;
import org.elasticsearch.xpack.datascience.action.TransportDataScienceStatsAction;
import org.elasticsearch.xpack.datascience.cumulativecardinality.CumulativeCardinalityPipelineAggregationBuilder;
import org.elasticsearch.xpack.datascience.cumulativecardinality.CumulativeCardinalityPipelineAggregator;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import static java.util.Collections.singletonList;

public class DataSciencePlugin extends Plugin implements SearchPlugin, ActionPlugin {

    // TODO this should probably become more structured once DataScience plugin has more than just one agg
    public static AtomicLong cumulativeCardUsage = new AtomicLong(0);
    private final boolean transportClientMode;

    public DataSciencePlugin(Settings settings) {
        this.transportClientMode = XPackPlugin.transportClientMode(settings);
    }

    public static XPackLicenseState getLicenseState() { return XPackPlugin.getSharedLicenseState(); }

    @Override
    public List<PipelineAggregationSpec> getPipelineAggregations() {
        return singletonList(new PipelineAggregationSpec(
            CumulativeCardinalityPipelineAggregationBuilder.NAME,
            CumulativeCardinalityPipelineAggregationBuilder::new,
            CumulativeCardinalityPipelineAggregator::new,
            CumulativeCardinalityPipelineAggregationBuilder::parse));
    }

    @Override
    public List<ActionPlugin.ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
        return singletonList(
            new ActionHandler<>(DataScienceStatsAction.INSTANCE, TransportDataScienceStatsAction.class));
    }

    @Override
    public Collection<Module> createGuiceModules() {
        List<Module> modules = new ArrayList<>();

        if (transportClientMode) {
            return modules;
        }

        modules.add(b -> XPackPlugin.bindFeatureSet(b, DataScienceFeatureSet.class));
        return modules;
    }
}

@@ -0,0 +1,58 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.datascience.action;

import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.datascience.action.DataScienceStatsAction;
import org.elasticsearch.xpack.datascience.DataSciencePlugin;

import java.io.IOException;
import java.util.List;

public class TransportDataScienceStatsAction extends TransportNodesAction<DataScienceStatsAction.Request, DataScienceStatsAction.Response,
    DataScienceStatsAction.NodeRequest, DataScienceStatsAction.NodeResponse> {


    @Inject
    public TransportDataScienceStatsAction(TransportService transportService, ClusterService clusterService,
                                           ThreadPool threadPool, ActionFilters actionFilters) {
        super(DataScienceStatsAction.NAME, threadPool, clusterService, transportService, actionFilters,
            DataScienceStatsAction.Request::new, DataScienceStatsAction.NodeRequest::new, ThreadPool.Names.MANAGEMENT,
            DataScienceStatsAction.NodeResponse.class);
    }

    @Override
    protected DataScienceStatsAction.Response newResponse(DataScienceStatsAction.Request request,
                                                          List<DataScienceStatsAction.NodeResponse> nodes,
                                                          List<FailedNodeException> failures) {
        return new DataScienceStatsAction.Response(clusterService.getClusterName(), nodes, failures);
    }

    @Override
    protected DataScienceStatsAction.NodeRequest newNodeRequest(DataScienceStatsAction.Request request) {
        return new DataScienceStatsAction.NodeRequest(request);
    }

    @Override
    protected DataScienceStatsAction.NodeResponse newNodeResponse(StreamInput in) throws IOException {
        return new DataScienceStatsAction.NodeResponse(in);
    }

    @Override
    protected DataScienceStatsAction.NodeResponse nodeOperation(DataScienceStatsAction.NodeRequest request) {
        DataScienceStatsAction.NodeResponse statsResponse = new DataScienceStatsAction.NodeResponse(clusterService.localNode());
        statsResponse.setCumulativeCardinalityUsage(DataSciencePlugin.cumulativeCardUsage.get());
        return statsResponse;
    }

}

@@ -0,0 +1,147 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.datascience.cumulativecardinality;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.BucketMetricsParser;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.datascience.DataSciencePlugin;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.BUCKETS_PATH;
import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.FORMAT;

public class CumulativeCardinalityPipelineAggregationBuilder
        extends AbstractPipelineAggregationBuilder<CumulativeCardinalityPipelineAggregationBuilder> {
    public static final String NAME = "cumulative_cardinality";

    private String format;

    private static final Function<String, ConstructingObjectParser<CumulativeCardinalityPipelineAggregationBuilder, Void>> PARSER
        = name -> {
        ConstructingObjectParser<CumulativeCardinalityPipelineAggregationBuilder, Void> parser = new ConstructingObjectParser<>(
            CumulativeCardinalityPipelineAggregationBuilder.NAME,
            false,
            o -> new CumulativeCardinalityPipelineAggregationBuilder(name, (String) o[0]));

        parser.declareString(ConstructingObjectParser.constructorArg(), BUCKETS_PATH_FIELD);
        parser.declareString(CumulativeCardinalityPipelineAggregationBuilder::format, FORMAT);
        return parser;
    };

    public CumulativeCardinalityPipelineAggregationBuilder(String name, String bucketsPath) {
        super(name, NAME, new String[] { bucketsPath });
    }

    /**
     * Read from a stream.
     */
    public CumulativeCardinalityPipelineAggregationBuilder(StreamInput in) throws IOException {
        super(in, NAME);
        format = in.readOptionalString();
    }

    @Override
    protected final void doWriteTo(StreamOutput out) throws IOException {
        out.writeOptionalString(format);
    }

    /**
     * Sets the format to use on the output of this aggregation.
     */
    public CumulativeCardinalityPipelineAggregationBuilder format(String format) {
        if (format == null) {
            throw new IllegalArgumentException("[format] must not be null: [" + name + "]");
        }
        this.format = format;
        return this;
    }

    /**
     * Gets the format to use on the output of this aggregation.
     */
    public String format() {
        return format;
    }

    protected DocValueFormat formatter() {
        if (format != null) {
            return new DocValueFormat.Decimal(format);
        } else {
            return DocValueFormat.RAW;
        }
    }

    @Override
    protected PipelineAggregator createInternal(Map<String, Object> metaData) {
        return new CumulativeCardinalityPipelineAggregator(name, bucketsPaths, formatter(), metaData);
    }

    @Override
    public void doValidate(AggregatorFactory parent, Collection<AggregationBuilder> aggFactories,
                           Collection<PipelineAggregationBuilder> pipelineAggregatorFactories) {
        if (bucketsPaths.length != 1) {
            throw new IllegalStateException(BUCKETS_PATH.getPreferredName()
                + " must contain a single entry for aggregation [" + name + "]");
        }

        validateSequentiallyOrderedParentAggs(parent, NAME, name);
    }

    @Override
    protected final XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
        if (format != null) {
            builder.field(BucketMetricsParser.FORMAT.getPreferredName(), format);
        }
        return builder;
    }

    public static CumulativeCardinalityPipelineAggregationBuilder parse(String aggName, XContentParser parser) {
        if (DataSciencePlugin.getLicenseState().isDataScienceAllowed() == false) {
            throw LicenseUtils.newComplianceException(XPackField.DATA_SCIENCE);
        }

        // Increment usage here since it is a good boundary between internal and external, and should correlate 1:1 with
        // usage and not internal instantiations
        DataSciencePlugin.cumulativeCardUsage.incrementAndGet();
        return PARSER.apply(aggName).apply(parser, null);
    }

    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), format);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (obj == null || getClass() != obj.getClass()) return false;
        if (super.equals(obj) == false) return false;
        CumulativeCardinalityPipelineAggregationBuilder other = (CumulativeCardinalityPipelineAggregationBuilder) obj;
        return Objects.equals(format, other.format);
    }

    @Override
    public String getWriteableName() {
        return NAME;
    }
}

@@ -0,0 +1,123 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.datascience.cumulativecardinality;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramFactory;
import org.elasticsearch.search.aggregations.metrics.HyperLogLogPlusPlus;
import org.elasticsearch.search.aggregations.metrics.InternalCardinality;
import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.AggregationPath;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class CumulativeCardinalityPipelineAggregator extends PipelineAggregator {
    private final DocValueFormat formatter;

    CumulativeCardinalityPipelineAggregator(String name, String[] bucketsPaths, DocValueFormat formatter, Map<String, Object> metadata) {
        super(name, bucketsPaths, metadata);
        this.formatter = formatter;
    }

    /**
     * Read from a stream.
     */
    public CumulativeCardinalityPipelineAggregator(StreamInput in) throws IOException {
        super(in);
        formatter = in.readNamedWriteable(DocValueFormat.class);
    }

    @Override
    public void doWriteTo(StreamOutput out) throws IOException {
        out.writeNamedWriteable(formatter);
    }

    @Override
    public String getWriteableName() {
        return CumulativeCardinalityPipelineAggregationBuilder.NAME;
    }

    @Override
    public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
        InternalMultiBucketAggregation<? extends InternalMultiBucketAggregation, ? extends InternalMultiBucketAggregation.InternalBucket>
            histo = (InternalMultiBucketAggregation<? extends InternalMultiBucketAggregation, ? extends
            InternalMultiBucketAggregation.InternalBucket>) aggregation;
        List<? extends InternalMultiBucketAggregation.InternalBucket> buckets = histo.getBuckets();
        HistogramFactory factory = (HistogramFactory) histo;
        List<Bucket> newBuckets = new ArrayList<>(buckets.size());
        HyperLogLogPlusPlus hll = null;

        try {
            long cardinality = 0;
            for (InternalMultiBucketAggregation.InternalBucket bucket : buckets) {
                HyperLogLogPlusPlus bucketHll = resolveBucketValue(histo, bucket, bucketsPaths()[0]);
                if (hll == null && bucketHll != null) {
                    // We have to create a new HLL because otherwise it will alter the
                    // existing cardinality sketch and bucket value
                    hll = new HyperLogLogPlusPlus(bucketHll.precision(), reduceContext.bigArrays(), 1);
                }
                if (bucketHll != null) {
                    hll.merge(0, bucketHll, 0);
                    cardinality = hll.cardinality(0);
                }

                List<InternalAggregation> aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false)
                    .map((p) -> (InternalAggregation) p)
                    .collect(Collectors.toList());
                aggs.add(new InternalSimpleLongValue(name(), cardinality, formatter, new ArrayList<>(), metaData()));
                Bucket newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), new InternalAggregations(aggs));
                newBuckets.add(newBucket);
            }
            return factory.createAggregation(newBuckets);
        } finally {
            if (hll != null) {
                hll.close();
            }
        }
    }

    private HyperLogLogPlusPlus resolveBucketValue(MultiBucketsAggregation agg,
                                                   InternalMultiBucketAggregation.InternalBucket bucket,
                                                   String aggPath) {
        List<String> aggPathsList = AggregationPath.parse(aggPath).getPathElementsAsStringList();
        Object propertyValue = bucket.getProperty(agg.getName(), aggPathsList);
        if (propertyValue == null) {
            throw new AggregationExecutionException(AbstractPipelineAggregationBuilder.BUCKETS_PATH_FIELD.getPreferredName()
                + " must reference a cardinality aggregation");
        }

        if (propertyValue instanceof InternalCardinality) {
            return ((InternalCardinality) propertyValue).getCounts();
        }

        String currentAggName;
        if (aggPathsList.isEmpty()) {
            currentAggName = agg.getName();
        } else {
            currentAggName = aggPathsList.get(0);
        }

        throw new AggregationExecutionException(AbstractPipelineAggregationBuilder.BUCKETS_PATH_FIELD.getPreferredName()
            + " must reference a cardinality aggregation, got: ["
            + propertyValue.getClass().getSimpleName() + "] at aggregation [" + currentAggName + "]");
    }

}

@@ -0,0 +1,94 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.datascience.cumulativecardinality;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.SimpleValue;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class InternalSimpleLongValue extends InternalNumericMetricsAggregation.SingleValue implements SimpleValue {
    public static final String NAME = "simple_long_value";
    protected final long value;

    public InternalSimpleLongValue(String name, long value, DocValueFormat formatter, List<PipelineAggregator> pipelineAggregators,
                                   Map<String, Object> metaData) {
        super(name, pipelineAggregators, metaData);
        this.format = formatter;
        this.value = value;
    }

    /**
     * Read from a stream.
     */
    public InternalSimpleLongValue(StreamInput in) throws IOException {
        super(in);
        format = in.readNamedWriteable(DocValueFormat.class);
        value = in.readZLong();
    }

    @Override
    protected void doWriteTo(StreamOutput out) throws IOException {
        out.writeNamedWriteable(format);
        out.writeZLong(value);
    }

    @Override
    public String getWriteableName() {
        return NAME;
    }

    @Override
    public double value() {
        return value;
    }

    public long getValue() {
        return value;
    }

    DocValueFormat formatter() {
        return format;
    }

    @Override
    public InternalSimpleLongValue doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
        throw new UnsupportedOperationException("Not supported");
    }

    @Override
    public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
        boolean hasValue = !(Double.isInfinite(value) || Double.isNaN(value));
        builder.field(CommonFields.VALUE.getPreferredName(), hasValue ? value : null);
        if (hasValue && format != DocValueFormat.RAW) {
            builder.field(CommonFields.VALUE_AS_STRING.getPreferredName(), format.format(value).toString());
        }
        return builder;
    }

    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), value);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (obj == null || getClass() != obj.getClass()) return false;
        if (super.equals(obj) == false) return false;
        InternalSimpleLongValue other = (InternalSimpleLongValue) obj;
        return Objects.equals(value, other.value);
    }
}

@@ -0,0 +1,52 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.datascience;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
 * Test implementation for AggregatorFactory.
 */
public class StubAggregatorFactory extends AggregatorFactory {

    private final Aggregator aggregator;

    private StubAggregatorFactory(SearchContext context, Aggregator aggregator) throws IOException {
        super("_name", context, null, new AggregatorFactories.Builder(), Collections.emptyMap());
        this.aggregator = aggregator;
    }

    @Override
    protected Aggregator createInternal(Aggregator parent, boolean collectsFromSingleBucket, List list, Map metaData) throws IOException {
        return aggregator;
    }

    public static StubAggregatorFactory createInstance() throws IOException {
        BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
        SearchContext searchContext = mock(SearchContext.class);
        when(searchContext.bigArrays()).thenReturn(bigArrays);

        Aggregator aggregator = mock(Aggregator.class);

        return new StubAggregatorFactory(searchContext, aggregator);
    }
}

@@ -0,0 +1,77 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.datascience.action;

import org.elasticsearch.Version;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.rest.yaml.ObjectPath;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.datascience.action.DataScienceStatsAction;
import org.junit.Before;

import java.util.Arrays;
import java.util.Collections;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class TransportDataScienceStatsActionTests extends ESTestCase {

    private TransportDataScienceStatsAction action;

    @Before
    public void setupTransportAction() {
        TransportService transportService = mock(TransportService.class);
        ThreadPool threadPool = mock(ThreadPool.class);

        ClusterService clusterService = mock(ClusterService.class);
        DiscoveryNode discoveryNode = new DiscoveryNode("nodeId", buildNewFakeTransportAddress(), Version.CURRENT);
        when(clusterService.localNode()).thenReturn(discoveryNode);

        ClusterName clusterName = new ClusterName("cluster_name");
        when(clusterService.getClusterName()).thenReturn(clusterName);

        ClusterState clusterState = mock(ClusterState.class);
        when(clusterState.getMetaData()).thenReturn(MetaData.EMPTY_META_DATA);
        when(clusterService.state()).thenReturn(clusterState);


        action = new TransportDataScienceStatsAction(transportService, clusterService, threadPool, new
            ActionFilters(Collections.emptySet()));
    }

    public void testCumulativeCardStats() throws Exception {
        DataScienceStatsAction.Request request = new DataScienceStatsAction.Request();
        DataScienceStatsAction.NodeResponse nodeResponse1 = action.nodeOperation(new DataScienceStatsAction.NodeRequest(request));
        DataScienceStatsAction.NodeResponse nodeResponse2 = action.nodeOperation(new DataScienceStatsAction.NodeRequest(request));

        DataScienceStatsAction.Response response = action.newResponse(request,
            Arrays.asList(nodeResponse1, nodeResponse2), Collections.emptyList());

        try (XContentBuilder builder = jsonBuilder()) {
            builder.startObject();
            response.toXContent(builder, ToXContent.EMPTY_PARAMS);
            builder.endObject();

            ObjectPath objectPath = ObjectPath.createFromXContent(JsonXContent.jsonXContent, BytesReference.bytes(builder));
            assertThat(objectPath.evaluate("stats.0.cumulative_cardinality_usage"), equalTo(0));
            assertThat(objectPath.evaluate("stats.1.cumulative_cardinality_usage"), equalTo(0));
        }
    }
}

@@ -0,0 +1,255 @@
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.datascience.cumulativecardinality;
|
||||
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.document.NumericDocValuesField;
|
||||
import org.apache.lucene.document.SortedNumericDocValuesField;
|
||||
import org.apache.lucene.index.DirectoryReader;
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.index.RandomIndexWriter;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.MatchAllDocsQuery;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.elasticsearch.common.CheckedConsumer;
|
||||
import org.elasticsearch.common.Rounding;
|
||||
import org.elasticsearch.common.time.DateFormatters;
|
||||
import org.elasticsearch.index.mapper.DateFieldMapper;
|
||||
import org.elasticsearch.index.mapper.MappedFieldType;
|
||||
import org.elasticsearch.index.mapper.NumberFieldMapper;
|
||||
import org.elasticsearch.search.aggregations.AggregationBuilder;
|
||||
import org.elasticsearch.search.aggregations.AggregationExecutionException;
|
||||
import org.elasticsearch.search.aggregations.AggregatorFactories;
|
||||
import org.elasticsearch.search.aggregations.AggregatorFactory;
|
||||
import org.elasticsearch.search.aggregations.AggregatorTestCase;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregation;
|
||||
import org.elasticsearch.search.aggregations.InternalOrder;
|
||||
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregatorFactory;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregatorFactory;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.ExtendedBounds;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregatorFactory;
|
||||
import org.elasticsearch.search.aggregations.metrics.CardinalityAggregationBuilder;
|
||||
import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
|
||||
import org.elasticsearch.search.aggregations.support.ValueType;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSource;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
|
||||
import org.elasticsearch.search.internal.SearchContext;
|
||||
import org.elasticsearch.xpack.datascience.StubAggregatorFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class CumulativeCardinalityAggregatorTests extends AggregatorTestCase {

    private static final String HISTO_FIELD = "histo";
    private static final String VALUE_FIELD = "value_field";

    // Each timestamp is indexed together with the value noted in its trailing comment
    // (the same values, in order, as datasetValues below).
    private static final List<String> datasetTimes = Arrays.asList(
        "2017-01-01T01:07:45", // 1
        "2017-01-01T03:43:34", // 1
        "2017-01-03T04:11:00", // 3
        "2017-01-03T05:11:31", // 1
        "2017-01-05T08:24:05", // 5
        "2017-01-05T13:09:32", // 1
        "2017-01-07T13:47:43", // 7
        "2017-01-08T16:14:34", // 1
        "2017-01-09T17:09:50", // 9
        "2017-01-09T22:55:46"); // 10

    private static final List<Integer> datasetValues = Arrays.asList(1, 1, 3, 1, 5, 1, 7, 1, 9, 10);

    // Expected cumulative distinct count for each daily bucket from 2017-01-01 through
    // 2017-01-09, including the empty days, which carry the previous count forward.
    private static final List<Double> cumulativeCardinality = Arrays.asList(1.0, 1.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0, 6.0);

    public void testSimple() throws IOException {
        Query query = new MatchAllDocsQuery();

        DateHistogramAggregationBuilder aggBuilder = new DateHistogramAggregationBuilder("histo");
        aggBuilder.calendarInterval(DateHistogramInterval.DAY).field(HISTO_FIELD);
        aggBuilder.subAggregation(new CardinalityAggregationBuilder("the_cardinality", ValueType.NUMERIC).field(VALUE_FIELD));
        aggBuilder.subAggregation(new CumulativeCardinalityPipelineAggregationBuilder("cumulative_card", "the_cardinality"));

        executeTestCase(query, aggBuilder, histogram -> {
            assertEquals(9, ((Histogram) histogram).getBuckets().size());
            List<? extends Histogram.Bucket> buckets = ((Histogram) histogram).getBuckets();
            int counter = 0;
            for (Histogram.Bucket bucket : buckets) {
                assertThat(((InternalSimpleLongValue) (bucket.getAggregations().get("cumulative_card"))).value(),
                    equalTo(cumulativeCardinality.get(counter)));
                counter += 1;
            }
        });
    }

    public void testAllNull() throws IOException {
        Query query = new MatchAllDocsQuery();

        DateHistogramAggregationBuilder aggBuilder = new DateHistogramAggregationBuilder("histo");
        aggBuilder.calendarInterval(DateHistogramInterval.DAY).field(HISTO_FIELD);
        // "foo" is never indexed, so every bucket's cardinality is empty and the cumulative count stays at zero
        aggBuilder.subAggregation(new CardinalityAggregationBuilder("the_cardinality", ValueType.NUMERIC).field("foo"));
        aggBuilder.subAggregation(new CumulativeCardinalityPipelineAggregationBuilder("cumulative_card", "the_cardinality"));

        executeTestCase(query, aggBuilder, histogram -> {
            assertEquals(9, ((Histogram) histogram).getBuckets().size());
            List<? extends Histogram.Bucket> buckets = ((Histogram) histogram).getBuckets();
            for (Histogram.Bucket bucket : buckets) {
                assertThat(((InternalSimpleLongValue) (bucket.getAggregations().get("cumulative_card"))).value(), equalTo(0.0));
            }
        });
    }

    public void testParentValidations() throws IOException {
        ValuesSourceConfig<ValuesSource.Numeric> numericVS = new ValuesSourceConfig<>(ValuesSourceType.NUMERIC);

        // Histogram
        Set<PipelineAggregationBuilder> aggBuilders = new HashSet<>();
        aggBuilders.add(new CumulativeCardinalityPipelineAggregationBuilder("cumulative_card", "sum"));
        AggregatorFactory parent = new HistogramAggregatorFactory("name", numericVS, 0.0d, 0.0d,
            mock(InternalOrder.class), false, 0L, 0.0d, 1.0d, mock(SearchContext.class), null,
            new AggregatorFactories.Builder(), Collections.emptyMap());
        CumulativeCardinalityPipelineAggregationBuilder builder
            = new CumulativeCardinalityPipelineAggregationBuilder("name", "valid");
        builder.validate(parent, Collections.emptySet(), aggBuilders);

        // Date Histogram
        aggBuilders.clear();
        aggBuilders.add(new CumulativeCardinalityPipelineAggregationBuilder("cumulative_card", "sum"));
        parent = new DateHistogramAggregatorFactory("name", numericVS, 0L,
            mock(InternalOrder.class), false, 0L, mock(Rounding.class), mock(Rounding.class),
            mock(ExtendedBounds.class), mock(SearchContext.class), mock(AggregatorFactory.class),
            new AggregatorFactories.Builder(), Collections.emptyMap());
        builder = new CumulativeCardinalityPipelineAggregationBuilder("name", "valid");
        builder.validate(parent, Collections.emptySet(), aggBuilders);

        // Auto Date Histogram
        aggBuilders.clear();
        aggBuilders.add(new CumulativeCardinalityPipelineAggregationBuilder("cumulative_card", "sum"));
        AutoDateHistogramAggregationBuilder.RoundingInfo[] roundings = new AutoDateHistogramAggregationBuilder.RoundingInfo[1];
        parent = new AutoDateHistogramAggregatorFactory("name", numericVS,
            1, roundings,
            mock(SearchContext.class), null, new AggregatorFactories.Builder(), Collections.emptyMap());
        builder = new CumulativeCardinalityPipelineAggregationBuilder("name", "valid");
        builder.validate(parent, Collections.emptySet(), aggBuilders);

        // Mocked "test" agg, should fail validation
        aggBuilders.clear();
        aggBuilders.add(new CumulativeCardinalityPipelineAggregationBuilder("cumulative_card", "sum"));
        StubAggregatorFactory parentFactory = StubAggregatorFactory.createInstance();

        CumulativeCardinalityPipelineAggregationBuilder failBuilder
            = new CumulativeCardinalityPipelineAggregationBuilder("name", "invalid_agg>metric");
        IllegalStateException ex = expectThrows(IllegalStateException.class,
            () -> failBuilder.validate(parentFactory, Collections.emptySet(), aggBuilders));
        assertEquals("cumulative_cardinality aggregation [name] must have a histogram, date_histogram or auto_date_histogram as parent",
            ex.getMessage());
    }

    public void testNonCardinalityAgg() {
        Query query = new MatchAllDocsQuery();

        DateHistogramAggregationBuilder aggBuilder = new DateHistogramAggregationBuilder("histo");
        aggBuilder.calendarInterval(DateHistogramInterval.DAY).field(HISTO_FIELD);
        aggBuilder.subAggregation(new SumAggregationBuilder("the_sum").field("foo"));
        aggBuilder.subAggregation(new CumulativeCardinalityPipelineAggregationBuilder("cumulative_card", "the_sum"));

        AggregationExecutionException e = expectThrows(AggregationExecutionException.class,
            () -> executeTestCase(query, aggBuilder, histogram -> fail("Test should not have executed")));
        assertThat(e.getMessage(), equalTo("buckets_path must reference a cardinality aggregation, " +
            "got: [InternalSum] at aggregation [the_sum]"));
    }

    private void executeTestCase(Query query, AggregationBuilder aggBuilder, Consumer<InternalAggregation> verify) throws IOException {
        executeTestCase(query, aggBuilder, verify, indexWriter -> {
            Document document = new Document();
            int counter = 0;
            for (String date : datasetTimes) {
                if (frequently()) {
                    indexWriter.commit();
                }

                long instant = asLong(date);
                document.add(new SortedNumericDocValuesField(HISTO_FIELD, instant));
                document.add(new NumericDocValuesField(VALUE_FIELD, datasetValues.get(counter)));
                indexWriter.addDocument(document);
                document.clear();
                counter += 1;
            }
        });
    }

    private void executeTestCase(Query query, AggregationBuilder aggBuilder, Consumer<InternalAggregation> verify,
                                 CheckedConsumer<RandomIndexWriter, IOException> setup) throws IOException {

        try (Directory directory = newDirectory()) {
            try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
                setup.accept(indexWriter);
            }

            try (IndexReader indexReader = DirectoryReader.open(directory)) {
                IndexSearcher indexSearcher = newSearcher(indexReader, true, true);

                DateFieldMapper.Builder builder = new DateFieldMapper.Builder("_name");
                DateFieldMapper.DateFieldType fieldType = builder.fieldType();
                fieldType.setHasDocValues(true);
                fieldType.setName(HISTO_FIELD);

                MappedFieldType valueFieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
                valueFieldType.setHasDocValues(true);
                valueFieldType.setName(VALUE_FIELD);

                InternalAggregation histogram;
                histogram = searchAndReduce(indexSearcher, query, aggBuilder, fieldType, valueFieldType);
                verify.accept(histogram);
            }
        }
    }

    private static long asLong(String dateTime) {
        return DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(dateTime)).toInstant().toEpochMilli();
    }

    // Builds one of the three histogram-type factories that are valid parents for this pipeline agg,
    // chosen at random.
    private static AggregatorFactory getRandomSequentiallyOrderedParentAgg() throws IOException {
        AggregatorFactory factory;
        ValuesSourceConfig<ValuesSource.Numeric> numericVS = new ValuesSourceConfig<>(ValuesSourceType.NUMERIC);
        switch (randomIntBetween(0, 2)) {
            case 0:
                factory = new HistogramAggregatorFactory("name", numericVS, 0.0d, 0.0d,
                    mock(InternalOrder.class), false, 0L, 0.0d, 1.0d, mock(SearchContext.class), null,
                    new AggregatorFactories.Builder(), Collections.emptyMap());
                break;
            case 1:
                factory = new DateHistogramAggregatorFactory("name", numericVS, 0L,
                    mock(InternalOrder.class), false, 0L, mock(Rounding.class), mock(Rounding.class),
                    mock(ExtendedBounds.class), mock(SearchContext.class), mock(AggregatorFactory.class),
                    new AggregatorFactories.Builder(), Collections.emptyMap());
                break;
            case 2:
            default:
                AutoDateHistogramAggregationBuilder.RoundingInfo[] roundings = new AutoDateHistogramAggregationBuilder.RoundingInfo[1];
                factory = new AutoDateHistogramAggregatorFactory("name", numericVS,
                    1, roundings,
                    mock(SearchContext.class), null, new AggregatorFactories.Builder(), Collections.emptyMap());
        }

        return factory;
    }
}
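
For orientation, here is a minimal stand-alone sketch (not the plugin's implementation) of the semantics the fixtures above encode: the real aggregation merges HyperLogLog sketches bucket by bucket, but on this small dataset a plain `Set` union reproduces the exact `cumulativeCardinality` expectations, including the empty days that carry the prior count forward. The class name and the use of `java.util.Set` are illustrative assumptions.

[source,java]
----
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CumulativeCardinalitySketch {
    public static void main(String[] args) {
        // Distinct values per daily bucket, 2017-01-01 through 2017-01-09 (empty days included)
        List<Set<Integer>> dailyBuckets = Arrays.asList(
            new HashSet<>(Arrays.asList(1, 1)),   // 01-01
            new HashSet<>(),                      // 01-02 (empty)
            new HashSet<>(Arrays.asList(3, 1)),   // 01-03
            new HashSet<>(),                      // 01-04 (empty)
            new HashSet<>(Arrays.asList(5, 1)),   // 01-05
            new HashSet<>(),                      // 01-06 (empty)
            new HashSet<>(Arrays.asList(7)),      // 01-07
            new HashSet<>(Arrays.asList(1)),      // 01-08
            new HashSet<>(Arrays.asList(9, 10))); // 01-09

        Set<Integer> merged = new HashSet<>(); // stands in for the merged HLL sketch
        for (Set<Integer> bucket : dailyBuckets) {
            merged.addAll(bucket);
            System.out.println(merged.size()); // prints 1 1 2 2 3 3 4 4 6
        }
    }
}
----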

@@ -0,0 +1,86 @@
setup:
  - skip:
      features: headers
  - do:
      indices.create:
        index: foo
        body:
          mappings:
            properties:
              timestamp:
                type: date
              user:
                type: keyword

  - do:
      headers:
        Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
      bulk:
        refresh: true
        body:
          - index:
              _index: "foo"
          - timestamp: "2017-01-01T05:00:00Z"
            user: "a"

          - index:
              _index: "foo"
          - timestamp: "2017-01-01T05:00:00Z"
            user: "b"

          - index:
              _index: "foo"
          - timestamp: "2017-01-01T05:00:00Z"
            user: "c"

          - index:
              _index: "foo"
          - timestamp: "2017-01-02T05:00:00Z"
            user: "a"

          - index:
              _index: "foo"
          - timestamp: "2017-01-02T05:00:00Z"
            user: "b"

          - index:
              _index: "foo"
          - timestamp: "2017-01-03T05:00:00Z"
            user: "d"

---
"Basic Search":

  - do:
      search:
        index: "foo"
        body:
          size: 0
          aggs:
            histo:
              date_histogram:
                field: "timestamp"
                calendar_interval: "day"
              aggs:
                distinct_users:
                  cardinality:
                    field: "user"
                total_users:
                  cumulative_cardinality:
                    buckets_path: "distinct_users"

  - length: { aggregations.histo.buckets: 3 }
  - match: { aggregations.histo.buckets.0.key_as_string: "2017-01-01T00:00:00.000Z" }
  - match: { aggregations.histo.buckets.0.doc_count: 3 }
  - match: { aggregations.histo.buckets.0.distinct_users.value: 3 }
  - match: { aggregations.histo.buckets.0.total_users.value: 3 }
  - match: { aggregations.histo.buckets.1.key_as_string: "2017-01-02T00:00:00.000Z" }
  - match: { aggregations.histo.buckets.1.doc_count: 2 }
  - match: { aggregations.histo.buckets.1.distinct_users.value: 2 }
  - match: { aggregations.histo.buckets.1.total_users.value: 3 }
  - match: { aggregations.histo.buckets.2.key_as_string: "2017-01-03T00:00:00.000Z" }
  - match: { aggregations.histo.buckets.2.doc_count: 1 }
  - match: { aggregations.histo.buckets.2.distinct_users.value: 1 }
  - match: { aggregations.histo.buckets.2.total_users.value: 4 }
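
For comparison, the same aggregation tree could plausibly be assembled with the Java builders exercised in the unit test above; `ValueType.STRING` for the keyword `user` field is an assumption, since the REST test expresses the request in YAML.

[source,java]
----
// A sketch reusing the builder classes from the unit test; names mirror the REST test.
// ValueType.STRING for the keyword field is an assumption.
DateHistogramAggregationBuilder histo = new DateHistogramAggregationBuilder("histo");
histo.calendarInterval(DateHistogramInterval.DAY).field("timestamp");
histo.subAggregation(new CardinalityAggregationBuilder("distinct_users", ValueType.STRING).field("user"));
histo.subAggregation(new CumulativeCardinalityPipelineAggregationBuilder("total_users", "distinct_users"));
----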

@@ -25,3 +25,4 @@
- contains: { nodes.$master.modules: { name: x-pack-security } }
- contains: { nodes.$master.modules: { name: x-pack-sql } }
- contains: { nodes.$master.modules: { name: x-pack-watcher } }
- contains: { nodes.$master.modules: { name: x-pack-data-science } }

@@ -28,6 +28,8 @@
- is_true: features.monitoring
- is_true: features.monitoring.enabled
# - is_false: features.monitoring.available TODO fix once licensing is fixed
- is_true: features.data_science
- is_true: features.data_science.enabled

- do:
    license.post:

@@ -77,6 +79,8 @@
- is_true: features.monitoring
- is_true: features.monitoring.enabled
- is_true: features.monitoring.available
- is_true: features.data_science.enabled
- is_true: features.data_science.available
- is_true: tagline

- do:

@@ -89,6 +93,7 @@
- is_true: graph.available
- is_true: monitoring.enabled
- is_true: monitoring.available
- is_true: data_science.available

- do:
    xpack.info: