diff --git a/docs/reference/rest-api/info.asciidoc b/docs/reference/rest-api/info.asciidoc index fa01fd4067d..415bf1ec3c6 100644 --- a/docs/reference/rest-api/info.asciidoc +++ b/docs/reference/rest-api/info.asciidoc @@ -150,6 +150,10 @@ Example response: "watcher" : { "available" : true, "enabled" : true + }, + "data_streams" : { + "available" : true, + "enabled" : true, } }, "tagline" : "You know, for X" diff --git a/docs/reference/rest-api/usage.asciidoc b/docs/reference/rest-api/usage.asciidoc index 41f81402d4c..fc224097a52 100644 --- a/docs/reference/rest-api/usage.asciidoc +++ b/docs/reference/rest-api/usage.asciidoc @@ -280,6 +280,12 @@ GET /_xpack/usage "available" : true, "enabled" : true, ... + }, + "data_streams" : { + "available" : true, + "enabled" : true, + "data_streams" : 0, + "indices_count" : 0 } } ------------------------------------------------------------ diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java index 702bfcc178b..e6bfd1d5b1e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java @@ -41,8 +41,10 @@ import org.elasticsearch.transport.Transport; import org.elasticsearch.xpack.core.action.XPackInfoAction; import org.elasticsearch.xpack.core.action.XPackUsageAction; import org.elasticsearch.xpack.core.analytics.AnalyticsFeatureSetUsage; +import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.CCRFeatureSet; +import org.elasticsearch.xpack.core.datastreams.DataStreamFeatureSetUsage; import org.elasticsearch.xpack.core.deprecation.DeprecationInfoAction; import org.elasticsearch.xpack.core.enrich.EnrichFeatureSet; import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction; @@ -138,8 +140,8 @@ import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.action.StopDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; import org.elasticsearch.xpack.core.ml.action.UpdateCalendarJobAction; -import org.elasticsearch.xpack.core.ml.action.UpdateDatafeedAction; import org.elasticsearch.xpack.core.ml.action.UpdateDataFrameAnalyticsAction; +import org.elasticsearch.xpack.core.ml.action.UpdateDatafeedAction; import org.elasticsearch.xpack.core.ml.action.UpdateFilterAction; import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; import org.elasticsearch.xpack.core.ml.action.UpdateModelSnapshotAction; @@ -189,7 +191,6 @@ import org.elasticsearch.xpack.core.rollup.action.StartRollupJobAction; import org.elasticsearch.xpack.core.rollup.action.StopRollupJobAction; import org.elasticsearch.xpack.core.rollup.job.RollupJob; import org.elasticsearch.xpack.core.rollup.job.RollupJobStatus; -import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction; import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction; import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction; import org.elasticsearch.xpack.core.searchablesnapshots.SearchableSnapshotFeatureSetUsage; @@ -635,7 +636,9 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.ENRICH, EnrichFeatureSet.Usage::new), // Searchable snapshots new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.SEARCHABLE_SNAPSHOTS, - SearchableSnapshotFeatureSetUsage::new) + SearchableSnapshotFeatureSetUsage::new), + // Data Streams + new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.DATA_STREAMS, DataStreamFeatureSetUsage::new) ).stream(), MlEvaluationNamedXContentProvider.getNamedWriteables().stream() ).collect(toList()); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackField.java index 88f85a44517..ab5a9f4d16b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackField.java @@ -59,6 +59,8 @@ public final class XPackField { public static final String CONSTANT_KEYWORD = "constant_keyword"; /** Name constant for the searchable snapshots feature. */ public static final String SEARCHABLE_SNAPSHOTS = "searchable_snapshots"; + /** Name constant for the data streams feature. */ + public static final String DATA_STREAMS = "data_streams"; private XPackField() {} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/datastreams/DataStreamFeatureSetUsage.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/datastreams/DataStreamFeatureSetUsage.java new file mode 100644 index 00000000000..540a8e3a216 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/datastreams/DataStreamFeatureSetUsage.java @@ -0,0 +1,110 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.datastreams; + +import org.elasticsearch.Version; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.xpack.core.XPackFeatureSet; +import org.elasticsearch.xpack.core.XPackField; + +import java.io.IOException; +import java.util.Objects; + +public class DataStreamFeatureSetUsage extends XPackFeatureSet.Usage { + private final DataStreamStats streamStats; + + public DataStreamFeatureSetUsage(StreamInput input) throws IOException { + super(input); + this.streamStats = new DataStreamStats(input); + } + + public DataStreamFeatureSetUsage(DataStreamStats stats) { + super(XPackField.DATA_STREAMS, true, true); + this.streamStats = stats; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + streamStats.writeTo(out); + } + + @Override + public Version getMinimalSupportedVersion() { + return Version.V_7_9_0; + } + + @Override + protected void innerXContent(XContentBuilder builder, Params params) throws IOException { + super.innerXContent(builder, params); + builder.field("data_streams", streamStats.totalDataStreamCount); + builder.field("indices_count", streamStats.indicesBehindDataStream); + } + + @Override + public String toString() { + return Strings.toString(this); + } + + @Override + public int hashCode() { + return streamStats.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (obj.getClass() != getClass()) { + return false; + } + DataStreamFeatureSetUsage other = (DataStreamFeatureSetUsage) obj; + return Objects.equals(streamStats, other.streamStats); + } + + public static class DataStreamStats implements Writeable { + + private final long totalDataStreamCount; + private final long indicesBehindDataStream; + + public DataStreamStats(long totalDataStreamCount, long indicesBehindDataStream) { + this.totalDataStreamCount = totalDataStreamCount; + this.indicesBehindDataStream = indicesBehindDataStream; + } + + public DataStreamStats(StreamInput in) throws IOException { + this.totalDataStreamCount = in.readVLong(); + this.indicesBehindDataStream = in.readVLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(this.totalDataStreamCount); + out.writeVLong(this.indicesBehindDataStream); + } + + @Override + public int hashCode() { + return Objects.hash(totalDataStreamCount, indicesBehindDataStream); + } + + @Override + public boolean equals(Object obj) { + if (obj.getClass() != getClass()) { + return false; + } + DataStreamStats other = (DataStreamStats) obj; + return totalDataStreamCount == other.totalDataStreamCount && + indicesBehindDataStream == other.indicesBehindDataStream; + } + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/searchablesnapshots/DataStreamFeatureSetUsageTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/searchablesnapshots/DataStreamFeatureSetUsageTests.java new file mode 100644 index 00000000000..d21b8dd0c7a --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/searchablesnapshots/DataStreamFeatureSetUsageTests.java @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.searchablesnapshots; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xpack.core.datastreams.DataStreamFeatureSetUsage; + +import java.io.IOException; + +public class DataStreamFeatureSetUsageTests extends AbstractWireSerializingTestCase { + + @Override + protected DataStreamFeatureSetUsage createTestInstance() { + return new DataStreamFeatureSetUsage(new DataStreamFeatureSetUsage.DataStreamStats(randomNonNegativeLong(), + randomNonNegativeLong())); + } + + @Override + protected DataStreamFeatureSetUsage mutateInstance(DataStreamFeatureSetUsage instance) throws IOException { + return randomValueOtherThan(instance, this::createTestInstance); + } + + @Override + protected Writeable.Reader instanceReader() { + return DataStreamFeatureSetUsage::new; + } + +} diff --git a/x-pack/plugin/data-streams/qa/multi-node/build.gradle b/x-pack/plugin/data-streams/qa/multi-node/build.gradle new file mode 100644 index 00000000000..bf386e76c30 --- /dev/null +++ b/x-pack/plugin/data-streams/qa/multi-node/build.gradle @@ -0,0 +1,30 @@ +import org.elasticsearch.gradle.info.BuildParams + +apply plugin: 'elasticsearch.testclusters' +apply plugin: 'elasticsearch.standalone-rest-test' +apply plugin: 'elasticsearch.rest-test' + +dependencies { + testImplementation project(path: xpackProject('plugin').path, configuration: 'testArtifacts') +} + +File repoDir = file("$buildDir/testclusters/repo") + +integTest.runner { + /* To support taking index snapshots, we have to set path.repo setting */ + systemProperty 'tests.path.repo', repoDir +} + +testClusters.integTest { + testDistribution = 'DEFAULT' + if (BuildParams.isSnapshotBuild() == false) { + systemProperty 'es.searchable_snapshots_feature_enabled', 'true' + } + numberOfNodes = 4 + + setting 'path.repo', repoDir.absolutePath + setting 'xpack.security.enabled', 'false' + setting 'xpack.watcher.enabled', 'false' + setting 'xpack.ml.enabled', 'false' + setting 'xpack.license.self_generated.type', 'trial' +} diff --git a/x-pack/plugin/data-streams/qa/multi-node/src/test/java/org/elasticsearch/xpack/datastreams/DataStreamRestIT.java b/x-pack/plugin/data-streams/qa/multi-node/src/test/java/org/elasticsearch/xpack/datastreams/DataStreamRestIT.java new file mode 100644 index 00000000000..81214bb13b3 --- /dev/null +++ b/x-pack/plugin/data-streams/qa/multi-node/src/test/java/org/elasticsearch/xpack/datastreams/DataStreamRestIT.java @@ -0,0 +1,90 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.xpack.datastreams; + +import org.apache.http.util.EntityUtils; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.common.xcontent.DeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.test.rest.ESRestTestCase; + +import java.util.Map; + +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.equalTo; + +public class DataStreamRestIT extends ESRestTestCase { + + @SuppressWarnings("unchecked") + public void testDSXpackInfo() { + Map features = (Map) getLocation("/_xpack").get("features"); + assertNotNull(features); + Map dataStreams = (Map) features.get("data_streams"); + assertNotNull(dataStreams); + assertTrue((boolean) dataStreams.get("available")); + assertTrue((boolean) dataStreams.get("enabled")); + } + + @SuppressWarnings("unchecked") + public void testDSXpackUsage() throws Exception { + Map dataStreams = (Map) getLocation("/_xpack/usage").get("data_streams"); + assertNotNull(dataStreams); + assertTrue((boolean) dataStreams.get("available")); + assertTrue((boolean) dataStreams.get("enabled")); + assertThat(dataStreams.get("data_streams"), anyOf(equalTo(null), equalTo(0))); + + // Create a data stream + Request indexRequest = new Request("POST", "/logs-mysql-default/_doc"); + indexRequest.setJsonEntity("{\"@timestamp\": \"2020-01-01\"}"); + client().performRequest(indexRequest); + + // Roll over the data stream + Request rollover = new Request("POST", "/logs-mysql-default/_rollover"); + client().performRequest(rollover); + + dataStreams = (Map) getLocation("/_xpack/usage").get("data_streams"); + assertNotNull(dataStreams); + assertTrue((boolean) dataStreams.get("available")); + assertTrue((boolean) dataStreams.get("enabled")); + assertThat("got: " + dataStreams, dataStreams.get("data_streams"), equalTo(1)); + assertThat("got: " + dataStreams, dataStreams.get("indices_count"), equalTo(2)); + } + + public Map getLocation(String path) { + try { + Response executeRepsonse = client().performRequest(new Request("GET", path)); + try ( + XContentParser parser = JsonXContent.jsonXContent.createParser( + NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + EntityUtils.toByteArray(executeRepsonse.getEntity()) + ) + ) { + return parser.map(); + } + } catch (Exception e) { + fail("failed to execute GET request to " + path + " - got: " + e); + throw new RuntimeException(e); + } + } +} diff --git a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamFeatureSet.java b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamFeatureSet.java new file mode 100644 index 00000000000..fcbcc934aff --- /dev/null +++ b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamFeatureSet.java @@ -0,0 +1,61 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.datastreams; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.xpack.core.XPackFeatureSet; +import org.elasticsearch.xpack.core.XPackField; +import org.elasticsearch.xpack.core.datastreams.DataStreamFeatureSetUsage; + +import java.util.Map; + +public class DataStreamFeatureSet implements XPackFeatureSet { + + private ClusterService clusterService; + + @Inject + public DataStreamFeatureSet(ClusterService clusterService) { + this.clusterService = clusterService; + } + + @Override + public String name() { + return XPackField.DATA_STREAMS; + } + + @Override + public boolean available() { + return true; + } + + @Override + public boolean enabled() { + return true; + } + + @Override + public Map nativeCodeInfo() { + return null; + } + + @Override + public void usage(ActionListener listener) { + final ClusterState state = clusterService.state(); + final Map dataStreams = state.metadata().dataStreams(); + final DataStreamFeatureSetUsage.DataStreamStats stats = new DataStreamFeatureSetUsage.DataStreamStats( + dataStreams.size(), + dataStreams.values().stream().map(ds -> ds.getIndices().size()).reduce(Integer::sum).orElse(0) + ); + final DataStreamFeatureSetUsage usage = new DataStreamFeatureSetUsage(stats); + listener.onResponse(usage); + } + +} diff --git a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamsPlugin.java b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamsPlugin.java index 3e7008e4edb..f505a2ddfd3 100644 --- a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamsPlugin.java +++ b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamsPlugin.java @@ -6,17 +6,30 @@ package org.elasticsearch.xpack.datastreams; +import org.elasticsearch.common.inject.Module; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.mapper.MetadataFieldMapper; -import org.elasticsearch.xpack.datastreams.mapper.DataStreamTimestampFieldMapper; +import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.MapperPlugin; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.xpack.core.XPackPlugin; +import org.elasticsearch.xpack.datastreams.mapper.DataStreamTimestampFieldMapper; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import static org.elasticsearch.action.ActionModule.DATASTREAMS_FEATURE_ENABLED; -public class DataStreamsPlugin extends Plugin implements MapperPlugin { +public class DataStreamsPlugin extends Plugin implements ActionPlugin, MapperPlugin { + + private final boolean transportClientMode; + + public DataStreamsPlugin(Settings settings) { + this.transportClientMode = XPackPlugin.transportClientMode(settings); + } @Override public Map getMetadataMappers() { @@ -26,4 +39,16 @@ public class DataStreamsPlugin extends Plugin implements MapperPlugin { return Collections.emptyMap(); } } + + public Collection createGuiceModules() { + List modules = new ArrayList<>(); + + if (transportClientMode) { + return modules; + } + + modules.add(b -> XPackPlugin.bindFeatureSet(b, DataStreamFeatureSet.class)); + + return modules; + } } diff --git a/x-pack/plugin/data-streams/src/test/java/org/elasticsearch/xpack/datastreams/mapper/MetadataCreateDataStreamServiceTests.java b/x-pack/plugin/data-streams/src/test/java/org/elasticsearch/xpack/datastreams/mapper/MetadataCreateDataStreamServiceTests.java index abe5986f072..3c3a7fbfc17 100644 --- a/x-pack/plugin/data-streams/src/test/java/org/elasticsearch/xpack/datastreams/mapper/MetadataCreateDataStreamServiceTests.java +++ b/x-pack/plugin/data-streams/src/test/java/org/elasticsearch/xpack/datastreams/mapper/MetadataCreateDataStreamServiceTests.java @@ -72,7 +72,7 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase { ) .putMapping("_doc", mapping) .build(); - IndicesModule indicesModule = new IndicesModule(List.of(new DataStreamsPlugin())); + IndicesModule indicesModule = new IndicesModule(List.of(new DataStreamsPlugin(Settings.EMPTY))); MapperService mapperService = MapperTestUtils.newMapperService( xContentRegistry(), createTempDir(),