From aef7b8921957727abb15a29658dbcc7d7caed3e9 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 23 Mar 2020 12:58:09 +0100 Subject: [PATCH] Backport: initial data stream commit (#53959) This commits adds a data stream feature flag, initial definition of a data stream and the stubs for the data stream create, delete and get APIs. Also simple serialization tests are added and a rest test to thest the data stream API stubs. This is a large amount of code and mainly mechanical, but this commit should be straightforward to review, because there isn't any real logic. The data stream transport and rest action are behind the data stream feature flag and are only intialized if the feature flag is enabled. The feature flag is enabled if elasticsearch is build as snapshot or a release build and the 'es.datastreams_feature_flag_registered' is enabled. The integ-test-zip sets the feature flag if building a release build, otherwise rest tests would fail. Relates to #53100 --- build.gradle | 1 + .../client/RestHighLevelClientTests.java | 5 +- .../api/indices.create_data_stream.json | 31 ++++ .../api/indices.delete_data_stream.json | 26 +++ .../api/indices.get_data_streams.json | 33 ++++ .../test/indices.data_stream/10_basic.yml | 26 +++ .../elasticsearch/action/ActionModule.java | 38 ++++ .../datastream/CreateDataStreamAction.java | 127 +++++++++++++ .../datastream/DeleteDataStreamAction.java | 119 ++++++++++++ .../datastream/GetDataStreamsAction.java | 170 ++++++++++++++++++ .../client/IndicesAdminClient.java | 32 ++++ .../client/support/AbstractClient.java | 33 ++++ .../cluster/metadata/DataStream.java | 116 ++++++++++++ .../indices/RestCreateDataStreamAction.java | 59 ++++++ .../indices/RestDeleteDataStreamAction.java | 48 +++++ .../indices/RestGetDataStreamsAction.java | 52 ++++++ .../CreateDataStreamRequestTests.java | 38 ++++ .../DeleteDataStreamRequestTests.java | 36 ++++ .../GetDataStreamsRequestTests.java | 36 ++++ .../GetDataStreamsResponseTests.java | 61 +++++++ .../cluster/metadata/DataStreamTests.java | 50 ++++++ .../privilege/ClusterPrivilegeResolver.java | 7 +- 22 files changed, 1141 insertions(+), 3 deletions(-) create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/indices.create_data_stream.json create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/indices.delete_data_stream.json create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/indices.get_data_streams.json create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsAction.java create mode 100644 server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java create mode 100644 server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestCreateDataStreamAction.java create mode 100644 server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestDeleteDataStreamAction.java create mode 100644 server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetDataStreamsAction.java create mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java create mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java create mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsRequestTests.java create mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsResponseTests.java create mode 100644 server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java diff --git a/build.gradle b/build.gradle index 53749c2a21c..edc37d47b9f 100644 --- a/build.gradle +++ b/build.gradle @@ -554,6 +554,7 @@ subprojects { testClusters.all { if (org.elasticsearch.gradle.info.BuildParams.isSnapshotBuild() == false) { systemProperty 'es.itv2_feature_flag_registered', 'true' + systemProperty 'es.datastreams_feature_flag_registered', 'true' } } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java index 2dd4a9d0212..440b979d350 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java @@ -798,7 +798,10 @@ public class RestHighLevelClientTests extends ESTestCase { "scripts_painless_execute", "cluster.put_component_template", "cluster.get_component_template", - "cluster.delete_component_template" + "cluster.delete_component_template", + "indices.create_data_stream", + "indices.get_data_streams", + "indices.delete_data_stream" }; //These API are not required for high-level client feature completeness String[] notRequiredApi = new String[] { diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.create_data_stream.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.create_data_stream.json new file mode 100644 index 00000000000..ef8615a69b1 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.create_data_stream.json @@ -0,0 +1,31 @@ +{ + "indices.create_data_stream":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html", + "description":"Creates or updates a data stream" + }, + "stability":"experimental", + "url":{ + "paths":[ + { + "path":"/_data_stream/{name}", + "methods":[ + "PUT" + ], + "parts":{ + "name":{ + "type":"string", + "description":"The name of the data stream" + } + } + } + ] + }, + "params":{ + }, + "body":{ + "description":"The data stream definition", + "required":true + } + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.delete_data_stream.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.delete_data_stream.json new file mode 100644 index 00000000000..71ed5808cae --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.delete_data_stream.json @@ -0,0 +1,26 @@ +{ + "indices.delete_data_stream":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html", + "description":"Deletes a data stream." + }, + "stability":"experimental", + "url":{ + "paths":[ + { + "path":"/_data_stream/{name}", + "methods":[ + "DELETE" + ], + "parts":{ + "name":{ + "type":"string", + "description":"The name of the data stream" + } + } + } + ] + }, + "params":{} + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.get_data_streams.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.get_data_streams.json new file mode 100644 index 00000000000..42415068d4a --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.get_data_streams.json @@ -0,0 +1,33 @@ +{ + "indices.get_data_streams":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html", + "description":"Returns data streams." + }, + "stability":"experimental", + "url":{ + "paths":[ + { + "path":"/_data_streams", + "methods":[ + "GET" + ] + }, + { + "path":"/_data_streams/{name}", + "methods":[ + "GET" + ], + "parts":{ + "name":{ + "type":"list", + "description":"The comma separated names of data streams" + } + } + } + ] + }, + "params":{ + } + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml new file mode 100644 index 00000000000..035c20c1600 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml @@ -0,0 +1,26 @@ +--- +"Test stubs": + - skip: + version: " - 7.6.99" + reason: only available in 7.7+ + + - do: + indices.create_data_stream: + name: data-stream2 + body: + timestamp_field: "@timestamp" + - is_true: acknowledged + + - do: + indices.get_data_streams: {} + - match: { 0.name: my_data_stream1 } + - match: { 0.timestamp_field: '@timestamp' } + - match: { 0.indices: ['my_data_stream1-000000'] } + - match: { 1.name: my_data_stream2 } + - match: { 1.timestamp_field: '@timestamp' } + - match: { 1.indices: [] } + + - do: + indices.delete_data_stream: + name: data-stream2 + - is_true: acknowledged diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 430271a76c9..d3ba8b6eb46 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -28,6 +28,9 @@ import org.elasticsearch.action.admin.cluster.configuration.AddVotingConfigExclu import org.elasticsearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction; import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction; import org.elasticsearch.action.admin.cluster.configuration.TransportClearVotingConfigExclusionsAction; +import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction; +import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction; +import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction; import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; import org.elasticsearch.action.admin.cluster.health.TransportClusterHealthAction; import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsAction; @@ -251,9 +254,11 @@ import org.elasticsearch.rest.action.admin.cluster.RestClusterStateAction; import org.elasticsearch.rest.action.admin.cluster.RestClusterStatsAction; import org.elasticsearch.rest.action.admin.cluster.RestClusterUpdateSettingsAction; import org.elasticsearch.rest.action.admin.cluster.RestCreateSnapshotAction; +import org.elasticsearch.rest.action.admin.indices.RestDeleteDataStreamAction; import org.elasticsearch.rest.action.admin.cluster.RestDeleteRepositoryAction; import org.elasticsearch.rest.action.admin.cluster.RestDeleteSnapshotAction; import org.elasticsearch.rest.action.admin.cluster.RestDeleteStoredScriptAction; +import org.elasticsearch.rest.action.admin.indices.RestGetDataStreamsAction; import org.elasticsearch.rest.action.admin.cluster.RestGetRepositoriesAction; import org.elasticsearch.rest.action.admin.cluster.RestGetScriptContextAction; import org.elasticsearch.rest.action.admin.cluster.RestGetScriptLanguageAction; @@ -266,6 +271,7 @@ import org.elasticsearch.rest.action.admin.cluster.RestNodesInfoAction; import org.elasticsearch.rest.action.admin.cluster.RestNodesStatsAction; import org.elasticsearch.rest.action.admin.cluster.RestNodesUsageAction; import org.elasticsearch.rest.action.admin.cluster.RestPendingClusterTasksAction; +import org.elasticsearch.rest.action.admin.indices.RestCreateDataStreamAction; import org.elasticsearch.rest.action.admin.cluster.RestPutRepositoryAction; import org.elasticsearch.rest.action.admin.cluster.RestPutStoredScriptAction; import org.elasticsearch.rest.action.admin.cluster.RestReloadSecureSettingsAction; @@ -388,6 +394,24 @@ public class ActionModule extends AbstractModule { } } + private static final boolean DATASTREAMS_FEATURE_FLAG_REGISTERED; + + static { + final String property = System.getProperty("es.datastreams_feature_flag_registered"); + if (Build.CURRENT.isSnapshot() && property != null) { + throw new IllegalArgumentException("es.datastreams_feature_flag_registered is only supported in non-snapshot builds"); + } + if (Build.CURRENT.isSnapshot() || "true".equals(property)) { + DATASTREAMS_FEATURE_FLAG_REGISTERED = true; + } else if ("false".equals(property) || property == null) { + DATASTREAMS_FEATURE_FLAG_REGISTERED = false; + } else { + throw new IllegalArgumentException( + "expected es.datastreams_feature_flag_registered to be unset or [true|false] but was [" + property + "]" + ); + } + } + private final Settings settings; private final IndexNameExpressionResolver indexNameExpressionResolver; private final IndexScopedSettings indexScopedSettings; @@ -576,6 +600,13 @@ public class ActionModule extends AbstractModule { actionPlugins.stream().flatMap(p -> p.getActions().stream()).forEach(actions::register); + // Data streams: + if (DATASTREAMS_FEATURE_FLAG_REGISTERED) { + actions.register(CreateDataStreamAction.INSTANCE, CreateDataStreamAction.TransportAction.class); + actions.register(DeleteDataStreamAction.INSTANCE, DeleteDataStreamAction.TransportAction.class); + actions.register(GetDataStreamsAction.INSTANCE, GetDataStreamsAction.TransportAction.class); + } + // Persistent tasks: actions.register(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class); actions.register(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class); @@ -718,6 +749,13 @@ public class ActionModule extends AbstractModule { registerHandler.accept(new RestDeletePipelineAction()); registerHandler.accept(new RestSimulatePipelineAction()); + // Data Stream API + if (DATASTREAMS_FEATURE_FLAG_REGISTERED) { + registerHandler.accept(new RestCreateDataStreamAction()); + registerHandler.accept(new RestDeleteDataStreamAction()); + registerHandler.accept(new RestGetDataStreamsAction()); + } + // CAT API registerHandler.accept(new RestAllocationAction()); registerHandler.accept(new RestShardsAction()); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java new file mode 100644 index 00000000000..08e7978f39f --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java @@ -0,0 +1,127 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.datastream; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.Objects; + +public class CreateDataStreamAction extends ActionType { + + public static final CreateDataStreamAction INSTANCE = new CreateDataStreamAction(); + public static final String NAME = "indices:admin/data_stream/create"; + + private CreateDataStreamAction() { + super(NAME, AcknowledgedResponse::new); + } + + public static class Request extends MasterNodeRequest { + + private final String name; + private String timestampFieldName; + + public Request(String name) { + this.name = name; + } + + public void setTimestampFieldName(String timestampFieldName) { + this.timestampFieldName = timestampFieldName; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.name = in.readString(); + this.timestampFieldName = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(name); + out.writeString(timestampFieldName); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return name.equals(request.name) && + timestampFieldName.equals(request.timestampFieldName); + } + + @Override + public int hashCode() { + return Objects.hash(name, timestampFieldName); + } + } + + public static class TransportAction extends TransportMasterNodeAction { + + @Inject + public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver); + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected AcknowledgedResponse read(StreamInput in) throws IOException { + return new AcknowledgedResponse(in); + } + + @Override + protected void masterOperation(Request request, ClusterState state, + ActionListener listener) throws Exception { + listener.onResponse(new AcknowledgedResponse(true)); + } + + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + } + +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java new file mode 100644 index 00000000000..3443199381d --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java @@ -0,0 +1,119 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.datastream; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.Objects; + +public class DeleteDataStreamAction extends ActionType { + + public static final DeleteDataStreamAction INSTANCE = new DeleteDataStreamAction(); + public static final String NAME = "indices:admin/data_stream/delete"; + + private DeleteDataStreamAction() { + super(NAME, AcknowledgedResponse::new); + } + + public static class Request extends MasterNodeRequest { + + private final String name; + + public Request(String name) { + this.name = Objects.requireNonNull(name); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.name = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(name); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return name.equals(request.name); + } + + @Override + public int hashCode() { + return Objects.hash(name); + } + } + + public static class TransportAction extends TransportMasterNodeAction { + + @Inject + public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver); + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected AcknowledgedResponse read(StreamInput in) throws IOException { + return new AcknowledgedResponse(in); + } + + @Override + protected void masterOperation(Request request, ClusterState state, + ActionListener listener) throws Exception { + listener.onResponse(new AcknowledgedResponse(true)); + } + + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + } + +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsAction.java new file mode 100644 index 00000000000..8c9be4442a9 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsAction.java @@ -0,0 +1,170 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.datastream; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.MasterNodeReadRequest; +import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +public class GetDataStreamsAction extends ActionType { + + public static final GetDataStreamsAction INSTANCE = new GetDataStreamsAction(); + public static final String NAME = "indices:admin/data_stream/get"; + + private GetDataStreamsAction() { + super(NAME, Response::new); + } + + public static class Request extends MasterNodeReadRequest { + + private final String[] names; + + public Request(String[] names) { + this.names = Objects.requireNonNull(names); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.names = in.readStringArray(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(names); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return Arrays.equals(names, request.names); + } + + @Override + public int hashCode() { + return Arrays.hashCode(names); + } + } + + public static class Response extends ActionResponse implements ToXContentObject { + + private final List dataStreams; + + public Response(List dataStreams) { + this.dataStreams = dataStreams; + } + + public Response(StreamInput in) throws IOException { + this(in.readList(DataStream::new)); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeList(dataStreams); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startArray(); + for (DataStream dataStream : dataStreams) { + dataStream.toXContent(builder, params); + } + builder.endArray(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Response response = (Response) o; + return dataStreams.equals(response.dataStreams); + } + + @Override + public int hashCode() { + return Objects.hash(dataStreams); + } + } + + public static class TransportAction extends TransportMasterNodeReadAction { + + @Inject + public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver); + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected Response read(StreamInput in) throws IOException { + return new Response(in); + } + + @Override + protected void masterOperation(Request request, ClusterState state, + ActionListener listener) throws Exception { + List dataStreams = Arrays.asList( + new DataStream("my_data_stream1", "@timestamp", Collections.singletonList("my_data_stream1-000000")), + new DataStream("my_data_stream2", "@timestamp", Collections.emptyList()) + ); + listener.onResponse(new Response(dataStreams)); + } + + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + } + +} diff --git a/server/src/main/java/org/elasticsearch/client/IndicesAdminClient.java b/server/src/main/java/org/elasticsearch/client/IndicesAdminClient.java index 3324d0abcfa..c234cf29dee 100644 --- a/server/src/main/java/org/elasticsearch/client/IndicesAdminClient.java +++ b/server/src/main/java/org/elasticsearch/client/IndicesAdminClient.java @@ -21,6 +21,9 @@ package org.elasticsearch.client; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction; +import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction; +import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder; import org.elasticsearch.action.admin.indices.alias.exists.AliasesExistRequestBuilder; @@ -819,4 +822,33 @@ public interface IndicesAdminClient extends ElasticsearchClient { */ void rolloverIndex(RolloverRequest request, ActionListener listener); + /** + * Store a data stream + */ + void createDataStream(CreateDataStreamAction.Request request, ActionListener listener); + + /** + * Store a data stream + */ + ActionFuture createDataStream(CreateDataStreamAction.Request request); + + /** + * Delete a data stream + */ + void deleteDataStream(DeleteDataStreamAction.Request request, ActionListener listener); + + /** + * Delete a data stream + */ + ActionFuture deleteDataStream(DeleteDataStreamAction.Request request); + + /** + * Get data streams + */ + void getDataStreams(GetDataStreamsAction.Request request, ActionListener listener); + + /** + * Get data streams + */ + ActionFuture getDataStreams(GetDataStreamsAction.Request request); } diff --git a/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java b/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java index 82e3ace2ee7..e53895a55c2 100644 --- a/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java @@ -30,6 +30,9 @@ import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplai import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequest; import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequestBuilder; import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainResponse; +import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction; +import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction; +import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction; import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder; @@ -1742,6 +1745,36 @@ public abstract class AbstractClient implements Client { public void getSettings(GetSettingsRequest request, ActionListener listener) { execute(GetSettingsAction.INSTANCE, request, listener); } + + @Override + public void createDataStream(CreateDataStreamAction.Request request, ActionListener listener) { + execute(CreateDataStreamAction.INSTANCE, request, listener); + } + + @Override + public ActionFuture createDataStream(CreateDataStreamAction.Request request) { + return execute(CreateDataStreamAction.INSTANCE, request); + } + + @Override + public void deleteDataStream(DeleteDataStreamAction.Request request, ActionListener listener) { + execute(DeleteDataStreamAction.INSTANCE, request, listener); + } + + @Override + public ActionFuture deleteDataStream(DeleteDataStreamAction.Request request) { + return execute(DeleteDataStreamAction.INSTANCE, request); + } + + @Override + public void getDataStreams(GetDataStreamsAction.Request request, ActionListener listener) { + execute(GetDataStreamsAction.INSTANCE, request, listener); + } + + @Override + public ActionFuture getDataStreams(GetDataStreamsAction.Request request) { + return execute(GetDataStreamsAction.INSTANCE, request); + } } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java new file mode 100644 index 00000000000..f6bba191a17 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -0,0 +1,116 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.cluster.AbstractDiffable; +import org.elasticsearch.cluster.Diff; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +public final class DataStream extends AbstractDiffable implements ToXContentObject { + + private final String name; + private final String timeStampField; + private final List indices; + + public DataStream(String name, String timeStampField, List indices) { + this.name = name; + this.timeStampField = timeStampField; + this.indices = indices; + } + + public String getName() { + return name; + } + + public String getTimeStampField() { + return timeStampField; + } + + public List getIndices() { + return indices; + } + + public DataStream(StreamInput in) throws IOException { + this(in.readString(), in.readString(), in.readStringList()); + } + + public static Diff readDiffFrom(StreamInput in) throws IOException { + return readDiffFrom(DataStream::new, in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(name); + out.writeString(timeStampField); + out.writeStringCollection(indices); + } + + public static final ParseField NAME_FIELD = new ParseField("name"); + public static final ParseField TIMESTAMP_FIELD_FIELD = new ParseField("timestamp_field"); + public static final ParseField INDICES_FIELD = new ParseField("indices"); + + @SuppressWarnings("unchecked") + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("data_stream", + args -> new DataStream((String) args[0], (String) args[1], (List) args[2])); + + static { + PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), TIMESTAMP_FIELD_FIELD); + PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), INDICES_FIELD); + } + + public static DataStream fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(NAME_FIELD.getPreferredName(), name); + builder.field(TIMESTAMP_FIELD_FIELD.getPreferredName(), timeStampField); + builder.field(INDICES_FIELD.getPreferredName(), indices); + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DataStream that = (DataStream) o; + return name.equals(that.name) && + timeStampField.equals(that.timeStampField) && + indices.equals(that.indices); + } + + @Override + public int hashCode() { + return Objects.hash(name, timeStampField, indices); + } +} diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestCreateDataStreamAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestCreateDataStreamAction.java new file mode 100644 index 00000000000..7e80c227b55 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestCreateDataStreamAction.java @@ -0,0 +1,59 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.rest.action.admin.indices; + +import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class RestCreateDataStreamAction extends BaseRestHandler { + + @Override + public String getName() { + return "create_data_stream_action"; + } + + @Override + public List routes() { + return Collections.singletonList( + new Route(RestRequest.Method.PUT, "/_data_stream/{name}") + ); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + CreateDataStreamAction.Request putDataStreamRequest = new CreateDataStreamAction.Request(request.param("name")); + request.withContentOrSourceParamParserOrNull(parser -> { + Map body = parser.map(); + String timeStampFieldName = (String) body.get(DataStream.TIMESTAMP_FIELD_FIELD.getPreferredName()); + if (timeStampFieldName != null) { + putDataStreamRequest.setTimestampFieldName(timeStampFieldName); + } + }); + return channel -> client.admin().indices().createDataStream(putDataStreamRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestDeleteDataStreamAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestDeleteDataStreamAction.java new file mode 100644 index 00000000000..bbbf7319844 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestDeleteDataStreamAction.java @@ -0,0 +1,48 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.rest.action.admin.indices; + +import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +public class RestDeleteDataStreamAction extends BaseRestHandler { + + @Override + public String getName() { + return "delete_data_stream_action"; + } + + @Override + public List routes() { + return Collections.singletonList(new Route(RestRequest.Method.DELETE, "/_data_stream/{name}")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + DeleteDataStreamAction.Request deleteDataStreamRequest = new DeleteDataStreamAction.Request(request.param("name")); + return channel -> client.admin().indices().deleteDataStream(deleteDataStreamRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetDataStreamsAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetDataStreamsAction.java new file mode 100644 index 00000000000..3dd1f67060c --- /dev/null +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetDataStreamsAction.java @@ -0,0 +1,52 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.rest.action.admin.indices; + +import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.Strings; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +public class RestGetDataStreamsAction extends BaseRestHandler { + + @Override + public String getName() { + return "get_data_streams_action"; + } + + @Override + public List routes() { + return Arrays.asList( + new Route(RestRequest.Method.GET, "/_data_streams"), + new Route(RestRequest.Method.GET, "/_data_streams/{name}")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + String[] names = Strings.splitStringByCommaToArray(request.param("name")); + GetDataStreamsAction.Request getDataStreamsRequest = new GetDataStreamsAction.Request(names); + return channel -> client.admin().indices().getDataStreams(getDataStreamsRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java new file mode 100644 index 00000000000..d6a846c205f --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java @@ -0,0 +1,38 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.datastream; + +import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction.Request; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +public class CreateDataStreamRequestTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return Request::new; + } + + @Override + protected Request createTestInstance() { + Request request = new Request(randomAlphaOfLength(8)); + request.setTimestampFieldName(randomAlphaOfLength(8)); + return request; + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java new file mode 100644 index 00000000000..f4600656997 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java @@ -0,0 +1,36 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.datastream; + +import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction.Request; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +public class DeleteDataStreamRequestTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return Request::new; + } + + @Override + protected Request createTestInstance() { + return new Request(randomAlphaOfLength(8)); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsRequestTests.java new file mode 100644 index 00000000000..062bdef629c --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsRequestTests.java @@ -0,0 +1,36 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.datastream; + +import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction.Request; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +public class GetDataStreamsRequestTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return Request::new; + } + + @Override + protected Request createTestInstance() { + return new Request(generateRandomStringArray(8, 8, false)); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsResponseTests.java new file mode 100644 index 00000000000..c110def6d80 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsResponseTests.java @@ -0,0 +1,61 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.datastream; + +import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction.Response; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentParser.Token; +import org.elasticsearch.test.AbstractSerializingTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class GetDataStreamsResponseTests extends AbstractSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return Response::new; + } + + @Override + protected Response doParseInstance(XContentParser parser) throws IOException { + List dataStreams = new ArrayList<>(); + for (Token token = parser.nextToken(); token != Token.END_ARRAY; token = parser.nextToken()) { + if (token == Token.START_OBJECT) { + dataStreams.add(DataStream.fromXContent(parser)); + } + } + return new Response(dataStreams); + } + + @Override + protected Response createTestInstance() { + int numDataStreams = randomIntBetween(0, 8); + List dataStreams = new ArrayList<>(); + for (int i = 0; i < numDataStreams; i++) { + dataStreams.add(new DataStream(randomAlphaOfLength(4), randomAlphaOfLength(4), + Arrays.asList(generateRandomStringArray(8, 4, false)))); + } + return new Response(dataStreams); + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java new file mode 100644 index 00000000000..072165ab098 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -0,0 +1,50 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class DataStreamTests extends AbstractSerializingTestCase { + + @Override + protected DataStream doParseInstance(XContentParser parser) throws IOException { + return DataStream.fromXContent(parser); + } + + @Override + protected Writeable.Reader instanceReader() { + return DataStream::new; + } + + @Override + protected DataStream createTestInstance() { + int numIndices = randomIntBetween(0, 128); + List indices = new ArrayList<>(numIndices); + for (int i = 0; i < numIndices; i++) { + indices.add(randomAlphaOfLength(10)); + } + return new DataStream(randomAlphaOfLength(10), randomAlphaOfLength(10), indices); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/ClusterPrivilegeResolver.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/ClusterPrivilegeResolver.java index 2c40e784cbb..5b374146339 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/ClusterPrivilegeResolver.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/ClusterPrivilegeResolver.java @@ -56,7 +56,7 @@ public class ClusterPrivilegeResolver { private static final Set MONITOR_WATCHER_PATTERN = Collections.singleton("cluster:monitor/xpack/watcher/*"); private static final Set MONITOR_ROLLUP_PATTERN = Collections.singleton("cluster:monitor/xpack/rollup/*"); private static final Set ALL_CLUSTER_PATTERN = Collections.unmodifiableSet( - Sets.newHashSet("cluster:*", "indices:admin/template/*")); + Sets.newHashSet("cluster:*", "indices:admin/template/*", "indices:admin/data_stream/*")); private static final Set MANAGE_ML_PATTERN = Collections.unmodifiableSet( Sets.newHashSet("cluster:admin/xpack/ml/*", "cluster:monitor/xpack/ml/*")); private static final Set MANAGE_TRANSFORM_PATTERN = Collections.unmodifiableSet( @@ -205,7 +205,10 @@ public class ClusterPrivilegeResolver { } public static boolean isClusterAction(String actionName) { - return actionName.startsWith("cluster:") || actionName.startsWith("indices:admin/template/"); + return actionName.startsWith("cluster:") || + actionName.startsWith("indices:admin/template/") || + // todo: hack until we implement security of data_streams + actionName.startsWith("indices:admin/data_stream/"); } private static String actionToPattern(String text) {