[7.x] Add telemetery for data streams (#59433) (#59454)

This commit adds data stream info to the `/_xpack` and `/_xpack/usage` APIs. Currently the usage is
pretty minimal, returning only the number of data streams and the number of indices currently
abstracted by a data stream:

```
  ...
  "data_streams" : {
    "available" : true,
    "enabled" : true,
    "data_streams" : 3,
    "indices_count" : 17
  }
  ...
```
This commit is contained in:
Lee Hinman 2020-07-13 14:30:11 -06:00 committed by GitHub
parent aa260636e5
commit bf1a60130d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 370 additions and 6 deletions

View File

@ -150,6 +150,10 @@ Example response:
"watcher" : {
"available" : true,
"enabled" : true
},
"data_streams" : {
"available" : true,
"enabled" : true,
}
},
"tagline" : "You know, for X"

View File

@ -280,6 +280,12 @@ GET /_xpack/usage
"available" : true,
"enabled" : true,
...
},
"data_streams" : {
"available" : true,
"enabled" : true,
"data_streams" : 0,
"indices_count" : 0
}
}
------------------------------------------------------------

View File

@ -41,8 +41,10 @@ import org.elasticsearch.transport.Transport;
import org.elasticsearch.xpack.core.action.XPackInfoAction;
import org.elasticsearch.xpack.core.action.XPackUsageAction;
import org.elasticsearch.xpack.core.analytics.AnalyticsFeatureSetUsage;
import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.CCRFeatureSet;
import org.elasticsearch.xpack.core.datastreams.DataStreamFeatureSetUsage;
import org.elasticsearch.xpack.core.deprecation.DeprecationInfoAction;
import org.elasticsearch.xpack.core.enrich.EnrichFeatureSet;
import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
@ -138,8 +140,8 @@ import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.StopDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.UpdateCalendarJobAction;
import org.elasticsearch.xpack.core.ml.action.UpdateDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.UpdateDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.UpdateDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.UpdateFilterAction;
import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.core.ml.action.UpdateModelSnapshotAction;
@ -189,7 +191,6 @@ import org.elasticsearch.xpack.core.rollup.action.StartRollupJobAction;
import org.elasticsearch.xpack.core.rollup.action.StopRollupJobAction;
import org.elasticsearch.xpack.core.rollup.job.RollupJob;
import org.elasticsearch.xpack.core.rollup.job.RollupJobStatus;
import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction;
import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction;
import org.elasticsearch.xpack.core.searchablesnapshots.SearchableSnapshotFeatureSetUsage;
@ -635,7 +636,9 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.ENRICH, EnrichFeatureSet.Usage::new),
// Searchable snapshots
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.SEARCHABLE_SNAPSHOTS,
SearchableSnapshotFeatureSetUsage::new)
SearchableSnapshotFeatureSetUsage::new),
// Data Streams
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.DATA_STREAMS, DataStreamFeatureSetUsage::new)
).stream(),
MlEvaluationNamedXContentProvider.getNamedWriteables().stream()
).collect(toList());

View File

@ -59,6 +59,8 @@ public final class XPackField {
public static final String CONSTANT_KEYWORD = "constant_keyword";
/** Name constant for the searchable snapshots feature. */
public static final String SEARCHABLE_SNAPSHOTS = "searchable_snapshots";
/** Name constant for the data streams feature. */
public static final String DATA_STREAMS = "data_streams";
private XPackField() {}

View File

@ -0,0 +1,110 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.datastreams;
import org.elasticsearch.Version;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.XPackFeatureSet;
import org.elasticsearch.xpack.core.XPackField;
import java.io.IOException;
import java.util.Objects;
public class DataStreamFeatureSetUsage extends XPackFeatureSet.Usage {
private final DataStreamStats streamStats;
public DataStreamFeatureSetUsage(StreamInput input) throws IOException {
super(input);
this.streamStats = new DataStreamStats(input);
}
public DataStreamFeatureSetUsage(DataStreamStats stats) {
super(XPackField.DATA_STREAMS, true, true);
this.streamStats = stats;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
streamStats.writeTo(out);
}
@Override
public Version getMinimalSupportedVersion() {
return Version.V_7_9_0;
}
@Override
protected void innerXContent(XContentBuilder builder, Params params) throws IOException {
super.innerXContent(builder, params);
builder.field("data_streams", streamStats.totalDataStreamCount);
builder.field("indices_count", streamStats.indicesBehindDataStream);
}
@Override
public String toString() {
return Strings.toString(this);
}
@Override
public int hashCode() {
return streamStats.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj.getClass() != getClass()) {
return false;
}
DataStreamFeatureSetUsage other = (DataStreamFeatureSetUsage) obj;
return Objects.equals(streamStats, other.streamStats);
}
public static class DataStreamStats implements Writeable {
private final long totalDataStreamCount;
private final long indicesBehindDataStream;
public DataStreamStats(long totalDataStreamCount, long indicesBehindDataStream) {
this.totalDataStreamCount = totalDataStreamCount;
this.indicesBehindDataStream = indicesBehindDataStream;
}
public DataStreamStats(StreamInput in) throws IOException {
this.totalDataStreamCount = in.readVLong();
this.indicesBehindDataStream = in.readVLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(this.totalDataStreamCount);
out.writeVLong(this.indicesBehindDataStream);
}
@Override
public int hashCode() {
return Objects.hash(totalDataStreamCount, indicesBehindDataStream);
}
@Override
public boolean equals(Object obj) {
if (obj.getClass() != getClass()) {
return false;
}
DataStreamStats other = (DataStreamStats) obj;
return totalDataStreamCount == other.totalDataStreamCount &&
indicesBehindDataStream == other.indicesBehindDataStream;
}
}
}

View File

@ -0,0 +1,33 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.searchablesnapshots;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.datastreams.DataStreamFeatureSetUsage;
import java.io.IOException;
public class DataStreamFeatureSetUsageTests extends AbstractWireSerializingTestCase<DataStreamFeatureSetUsage> {
@Override
protected DataStreamFeatureSetUsage createTestInstance() {
return new DataStreamFeatureSetUsage(new DataStreamFeatureSetUsage.DataStreamStats(randomNonNegativeLong(),
randomNonNegativeLong()));
}
@Override
protected DataStreamFeatureSetUsage mutateInstance(DataStreamFeatureSetUsage instance) throws IOException {
return randomValueOtherThan(instance, this::createTestInstance);
}
@Override
protected Writeable.Reader<DataStreamFeatureSetUsage> instanceReader() {
return DataStreamFeatureSetUsage::new;
}
}

View File

@ -0,0 +1,30 @@
import org.elasticsearch.gradle.info.BuildParams
apply plugin: 'elasticsearch.testclusters'
apply plugin: 'elasticsearch.standalone-rest-test'
apply plugin: 'elasticsearch.rest-test'
dependencies {
testImplementation project(path: xpackProject('plugin').path, configuration: 'testArtifacts')
}
File repoDir = file("$buildDir/testclusters/repo")
integTest.runner {
/* To support taking index snapshots, we have to set path.repo setting */
systemProperty 'tests.path.repo', repoDir
}
testClusters.integTest {
testDistribution = 'DEFAULT'
if (BuildParams.isSnapshotBuild() == false) {
systemProperty 'es.searchable_snapshots_feature_enabled', 'true'
}
numberOfNodes = 4
setting 'path.repo', repoDir.absolutePath
setting 'xpack.security.enabled', 'false'
setting 'xpack.watcher.enabled', 'false'
setting 'xpack.ml.enabled', 'false'
setting 'xpack.license.self_generated.type', 'trial'
}

View File

@ -0,0 +1,90 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.xpack.datastreams;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.rest.ESRestTestCase;
import java.util.Map;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
public class DataStreamRestIT extends ESRestTestCase {
@SuppressWarnings("unchecked")
public void testDSXpackInfo() {
Map<String, Object> features = (Map<String, Object>) getLocation("/_xpack").get("features");
assertNotNull(features);
Map<String, Object> dataStreams = (Map<String, Object>) features.get("data_streams");
assertNotNull(dataStreams);
assertTrue((boolean) dataStreams.get("available"));
assertTrue((boolean) dataStreams.get("enabled"));
}
@SuppressWarnings("unchecked")
public void testDSXpackUsage() throws Exception {
Map<String, Object> dataStreams = (Map<String, Object>) getLocation("/_xpack/usage").get("data_streams");
assertNotNull(dataStreams);
assertTrue((boolean) dataStreams.get("available"));
assertTrue((boolean) dataStreams.get("enabled"));
assertThat(dataStreams.get("data_streams"), anyOf(equalTo(null), equalTo(0)));
// Create a data stream
Request indexRequest = new Request("POST", "/logs-mysql-default/_doc");
indexRequest.setJsonEntity("{\"@timestamp\": \"2020-01-01\"}");
client().performRequest(indexRequest);
// Roll over the data stream
Request rollover = new Request("POST", "/logs-mysql-default/_rollover");
client().performRequest(rollover);
dataStreams = (Map<String, Object>) getLocation("/_xpack/usage").get("data_streams");
assertNotNull(dataStreams);
assertTrue((boolean) dataStreams.get("available"));
assertTrue((boolean) dataStreams.get("enabled"));
assertThat("got: " + dataStreams, dataStreams.get("data_streams"), equalTo(1));
assertThat("got: " + dataStreams, dataStreams.get("indices_count"), equalTo(2));
}
public Map<String, Object> getLocation(String path) {
try {
Response executeRepsonse = client().performRequest(new Request("GET", path));
try (
XContentParser parser = JsonXContent.jsonXContent.createParser(
NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
EntityUtils.toByteArray(executeRepsonse.getEntity())
)
) {
return parser.map();
}
} catch (Exception e) {
fail("failed to execute GET request to " + path + " - got: " + e);
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,61 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.datastreams;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.xpack.core.XPackFeatureSet;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.datastreams.DataStreamFeatureSetUsage;
import java.util.Map;
public class DataStreamFeatureSet implements XPackFeatureSet {
private ClusterService clusterService;
@Inject
public DataStreamFeatureSet(ClusterService clusterService) {
this.clusterService = clusterService;
}
@Override
public String name() {
return XPackField.DATA_STREAMS;
}
@Override
public boolean available() {
return true;
}
@Override
public boolean enabled() {
return true;
}
@Override
public Map<String, Object> nativeCodeInfo() {
return null;
}
@Override
public void usage(ActionListener<Usage> listener) {
final ClusterState state = clusterService.state();
final Map<String, DataStream> dataStreams = state.metadata().dataStreams();
final DataStreamFeatureSetUsage.DataStreamStats stats = new DataStreamFeatureSetUsage.DataStreamStats(
dataStreams.size(),
dataStreams.values().stream().map(ds -> ds.getIndices().size()).reduce(Integer::sum).orElse(0)
);
final DataStreamFeatureSetUsage usage = new DataStreamFeatureSetUsage(stats);
listener.onResponse(usage);
}
}

View File

@ -6,17 +6,30 @@
package org.elasticsearch.xpack.datastreams;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.mapper.MetadataFieldMapper;
import org.elasticsearch.xpack.datastreams.mapper.DataStreamTimestampFieldMapper;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.datastreams.mapper.DataStreamTimestampFieldMapper;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.action.ActionModule.DATASTREAMS_FEATURE_ENABLED;
public class DataStreamsPlugin extends Plugin implements MapperPlugin {
public class DataStreamsPlugin extends Plugin implements ActionPlugin, MapperPlugin {
private final boolean transportClientMode;
public DataStreamsPlugin(Settings settings) {
this.transportClientMode = XPackPlugin.transportClientMode(settings);
}
@Override
public Map<String, MetadataFieldMapper.TypeParser> getMetadataMappers() {
@ -26,4 +39,16 @@ public class DataStreamsPlugin extends Plugin implements MapperPlugin {
return Collections.emptyMap();
}
}
public Collection<Module> createGuiceModules() {
List<Module> modules = new ArrayList<>();
if (transportClientMode) {
return modules;
}
modules.add(b -> XPackPlugin.bindFeatureSet(b, DataStreamFeatureSet.class));
return modules;
}
}

View File

@ -72,7 +72,7 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase {
)
.putMapping("_doc", mapping)
.build();
IndicesModule indicesModule = new IndicesModule(List.of(new DataStreamsPlugin()));
IndicesModule indicesModule = new IndicesModule(List.of(new DataStreamsPlugin(Settings.EMPTY)));
MapperService mapperService = MapperTestUtils.newMapperService(
xContentRegistry(),
createTempDir(),