Backport: initial data stream commit (#53959)

This commit adds a data stream feature flag, an initial definition of a data stream, and
stubs for the data stream create, delete and get APIs. Simple serialization tests are also
added, along with a REST test that exercises the data stream API stubs.

This is a large amount of mostly mechanical code, but the commit should be
straightforward to review because there is no real logic yet.

The data stream transport and REST actions are behind the data stream feature flag and
are only initialized if the feature flag is enabled. The flag is enabled automatically in
snapshot builds, and in release builds only when the
'es.datastreams_feature_flag_registered' system property is set to true.

The integ-test-zip project sets the feature flag when building a release build, otherwise
the REST tests would fail.

Relates to #53100
Martijn van Groningen 2020-03-23 12:58:09 +01:00 committed by GitHub
parent 1d1e177a62
commit aef7b89219
22 changed files with 1141 additions and 3 deletions

View File

@ -554,6 +554,7 @@ subprojects {
testClusters.all {
if (org.elasticsearch.gradle.info.BuildParams.isSnapshotBuild() == false) {
systemProperty 'es.itv2_feature_flag_registered', 'true'
systemProperty 'es.datastreams_feature_flag_registered', 'true'
}
}
}

View File

@ -798,7 +798,10 @@ public class RestHighLevelClientTests extends ESTestCase {
"scripts_painless_execute",
"cluster.put_component_template",
"cluster.get_component_template",
"cluster.delete_component_template"
"cluster.delete_component_template",
"indices.create_data_stream",
"indices.get_data_streams",
"indices.delete_data_stream"
};
//These API are not required for high-level client feature completeness
String[] notRequiredApi = new String[] {

View File

@ -0,0 +1,31 @@
{
"indices.create_data_stream":{
"documentation":{
"url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html",
"description":"Creates or updates a data stream"
},
"stability":"experimental",
"url":{
"paths":[
{
"path":"/_data_stream/{name}",
"methods":[
"PUT"
],
"parts":{
"name":{
"type":"string",
"description":"The name of the data stream"
}
}
}
]
},
"params":{
},
"body":{
"description":"The data stream definition",
"required":true
}
}
}

View File

@ -0,0 +1,26 @@
{
"indices.delete_data_stream":{
"documentation":{
"url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html",
"description":"Deletes a data stream."
},
"stability":"experimental",
"url":{
"paths":[
{
"path":"/_data_stream/{name}",
"methods":[
"DELETE"
],
"parts":{
"name":{
"type":"string",
"description":"The name of the data stream"
}
}
}
]
},
"params":{}
}
}

View File

@ -0,0 +1,33 @@
{
"indices.get_data_streams":{
"documentation":{
"url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html",
"description":"Returns data streams."
},
"stability":"experimental",
"url":{
"paths":[
{
"path":"/_data_streams",
"methods":[
"GET"
]
},
{
"path":"/_data_streams/{name}",
"methods":[
"GET"
],
"parts":{
"name":{
"type":"list",
"description":"The comma separated names of data streams"
}
}
}
]
},
"params":{
}
}
}
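
Taken together, the three specs above describe PUT /_data_stream/{name}, DELETE /_data_stream/{name} and GET /_data_streams[/{name}]. As a rough sketch of how these stub endpoints could be exercised end to end, the following uses the low-level Java REST client; the host/port and the data stream name "logs-foo" are illustrative assumptions, and with this commit the endpoints only return stubbed responses.

import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class DataStreamStubExample {
    public static void main(String[] args) throws Exception {
        try (RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            // Create: PUT /_data_stream/{name} with the data stream definition as the body
            Request create = new Request("PUT", "/_data_stream/logs-foo");
            create.setJsonEntity("{\"timestamp_field\": \"@timestamp\"}");
            Response createResponse = restClient.performRequest(create);

            // Get: GET /_data_streams returns all data streams (hard-coded stubs in this commit)
            Response getResponse = restClient.performRequest(new Request("GET", "/_data_streams"));

            // Delete: DELETE /_data_stream/{name}
            Response deleteResponse = restClient.performRequest(new Request("DELETE", "/_data_stream/logs-foo"));
        }
    }
}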

View File

@ -0,0 +1,26 @@
---
"Test stubs":
- skip:
version: " - 7.6.99"
reason: only available in 7.7+
- do:
indices.create_data_stream:
name: data-stream2
body:
timestamp_field: "@timestamp"
- is_true: acknowledged
- do:
indices.get_data_streams: {}
- match: { 0.name: my_data_stream1 }
- match: { 0.timestamp_field: '@timestamp' }
- match: { 0.indices: ['my_data_stream1-000000'] }
- match: { 1.name: my_data_stream2 }
- match: { 1.timestamp_field: '@timestamp' }
- match: { 1.indices: [] }
- do:
indices.delete_data_stream:
name: data-stream2
- is_true: acknowledged

View File

@ -28,6 +28,9 @@ import org.elasticsearch.action.admin.cluster.configuration.AddVotingConfigExclu
import org.elasticsearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.cluster.configuration.TransportClearVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction;
import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction;
import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.health.TransportClusterHealthAction;
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsAction;
@ -251,9 +254,11 @@ import org.elasticsearch.rest.action.admin.cluster.RestClusterStateAction;
import org.elasticsearch.rest.action.admin.cluster.RestClusterStatsAction;
import org.elasticsearch.rest.action.admin.cluster.RestClusterUpdateSettingsAction;
import org.elasticsearch.rest.action.admin.cluster.RestCreateSnapshotAction;
import org.elasticsearch.rest.action.admin.indices.RestDeleteDataStreamAction;
import org.elasticsearch.rest.action.admin.cluster.RestDeleteRepositoryAction;
import org.elasticsearch.rest.action.admin.cluster.RestDeleteSnapshotAction;
import org.elasticsearch.rest.action.admin.cluster.RestDeleteStoredScriptAction;
import org.elasticsearch.rest.action.admin.indices.RestGetDataStreamsAction;
import org.elasticsearch.rest.action.admin.cluster.RestGetRepositoriesAction;
import org.elasticsearch.rest.action.admin.cluster.RestGetScriptContextAction;
import org.elasticsearch.rest.action.admin.cluster.RestGetScriptLanguageAction;
@ -266,6 +271,7 @@ import org.elasticsearch.rest.action.admin.cluster.RestNodesInfoAction;
import org.elasticsearch.rest.action.admin.cluster.RestNodesStatsAction;
import org.elasticsearch.rest.action.admin.cluster.RestNodesUsageAction;
import org.elasticsearch.rest.action.admin.cluster.RestPendingClusterTasksAction;
import org.elasticsearch.rest.action.admin.indices.RestCreateDataStreamAction;
import org.elasticsearch.rest.action.admin.cluster.RestPutRepositoryAction;
import org.elasticsearch.rest.action.admin.cluster.RestPutStoredScriptAction;
import org.elasticsearch.rest.action.admin.cluster.RestReloadSecureSettingsAction;
@ -388,6 +394,24 @@ public class ActionModule extends AbstractModule {
}
}
private static final boolean DATASTREAMS_FEATURE_FLAG_REGISTERED;
static {
final String property = System.getProperty("es.datastreams_feature_flag_registered");
if (Build.CURRENT.isSnapshot() && property != null) {
throw new IllegalArgumentException("es.datastreams_feature_flag_registered is only supported in non-snapshot builds");
}
if (Build.CURRENT.isSnapshot() || "true".equals(property)) {
DATASTREAMS_FEATURE_FLAG_REGISTERED = true;
} else if ("false".equals(property) || property == null) {
DATASTREAMS_FEATURE_FLAG_REGISTERED = false;
} else {
throw new IllegalArgumentException(
"expected es.datastreams_feature_flag_registered to be unset or [true|false] but was [" + property + "]"
);
}
}
private final Settings settings;
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final IndexScopedSettings indexScopedSettings;
@ -576,6 +600,13 @@ public class ActionModule extends AbstractModule {
actionPlugins.stream().flatMap(p -> p.getActions().stream()).forEach(actions::register);
// Data streams:
if (DATASTREAMS_FEATURE_FLAG_REGISTERED) {
actions.register(CreateDataStreamAction.INSTANCE, CreateDataStreamAction.TransportAction.class);
actions.register(DeleteDataStreamAction.INSTANCE, DeleteDataStreamAction.TransportAction.class);
actions.register(GetDataStreamsAction.INSTANCE, GetDataStreamsAction.TransportAction.class);
}
// Persistent tasks:
actions.register(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class);
actions.register(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class);
@ -718,6 +749,13 @@ public class ActionModule extends AbstractModule {
registerHandler.accept(new RestDeletePipelineAction());
registerHandler.accept(new RestSimulatePipelineAction());
// Data Stream API
if (DATASTREAMS_FEATURE_FLAG_REGISTERED) {
registerHandler.accept(new RestCreateDataStreamAction());
registerHandler.accept(new RestDeleteDataStreamAction());
registerHandler.accept(new RestGetDataStreamsAction());
}
// CAT API
registerHandler.accept(new RestAllocationAction());
registerHandler.accept(new RestShardsAction());

View File

@ -0,0 +1,127 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.datastream;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Objects;
public class CreateDataStreamAction extends ActionType<AcknowledgedResponse> {
public static final CreateDataStreamAction INSTANCE = new CreateDataStreamAction();
public static final String NAME = "indices:admin/data_stream/create";
private CreateDataStreamAction() {
super(NAME, AcknowledgedResponse::new);
}
public static class Request extends MasterNodeRequest<Request> {
private final String name;
private String timestampFieldName;
public Request(String name) {
this.name = name;
}
public void setTimestampFieldName(String timestampFieldName) {
this.timestampFieldName = timestampFieldName;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
public Request(StreamInput in) throws IOException {
super(in);
this.name = in.readString();
this.timestampFieldName = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(name);
out.writeString(timestampFieldName);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return name.equals(request.name) &&
timestampFieldName.equals(request.timestampFieldName);
}
@Override
public int hashCode() {
return Objects.hash(name, timestampFieldName);
}
}
public static class TransportAction extends TransportMasterNodeAction<Request, AcknowledgedResponse> {
@Inject
public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver);
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected AcknowledgedResponse read(StreamInput in) throws IOException {
return new AcknowledgedResponse(in);
}
@Override
protected void masterOperation(Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) throws Exception {
listener.onResponse(new AcknowledgedResponse(true));
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
}
}
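
The action above is wired into IndicesAdminClient later in this commit, so transport-level callers can submit the request directly. A minimal sketch, assuming a node Client is already available (for example inside an integration test); "logs-foo" is an illustrative name:

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;

public class CreateDataStreamExample {
    // the supplied Client and the data stream name are illustrative assumptions
    static void createDataStream(Client client) {
        CreateDataStreamAction.Request request = new CreateDataStreamAction.Request("logs-foo");
        request.setTimestampFieldName("@timestamp");
        client.admin().indices().createDataStream(request, ActionListener.wrap(
            (AcknowledgedResponse response) -> System.out.println("acknowledged: " + response.isAcknowledged()),
            Throwable::printStackTrace));
    }
}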

View File

@ -0,0 +1,119 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.datastream;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Objects;
public class DeleteDataStreamAction extends ActionType<AcknowledgedResponse> {
public static final DeleteDataStreamAction INSTANCE = new DeleteDataStreamAction();
public static final String NAME = "indices:admin/data_stream/delete";
private DeleteDataStreamAction() {
super(NAME, AcknowledgedResponse::new);
}
public static class Request extends MasterNodeRequest<Request> {
private final String name;
public Request(String name) {
this.name = Objects.requireNonNull(name);
}
@Override
public ActionRequestValidationException validate() {
return null;
}
public Request(StreamInput in) throws IOException {
super(in);
this.name = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(name);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return name.equals(request.name);
}
@Override
public int hashCode() {
return Objects.hash(name);
}
}
public static class TransportAction extends TransportMasterNodeAction<Request, AcknowledgedResponse> {
@Inject
public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver);
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected AcknowledgedResponse read(StreamInput in) throws IOException {
return new AcknowledgedResponse(in);
}
@Override
protected void masterOperation(Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) throws Exception {
listener.onResponse(new AcknowledgedResponse(true));
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
}
}
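
As with the create action, a delete request can also be submitted synchronously through the ActionFuture variant that this commit adds to IndicesAdminClient. A small sketch, again assuming an available Client and an illustrative name:

import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;

public class DeleteDataStreamExample {
    static boolean deleteDataStream(Client client) {
        // blocks until the (currently stubbed) master operation responds
        AcknowledgedResponse response = client.admin().indices()
            .deleteDataStream(new DeleteDataStreamAction.Request("logs-foo"))
            .actionGet();
        return response.isAcknowledged();
    }
}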

View File

@ -0,0 +1,170 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.datastream;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
public class GetDataStreamsAction extends ActionType<GetDataStreamsAction.Response> {
public static final GetDataStreamsAction INSTANCE = new GetDataStreamsAction();
public static final String NAME = "indices:admin/data_stream/get";
private GetDataStreamsAction() {
super(NAME, Response::new);
}
public static class Request extends MasterNodeReadRequest<Request> {
private final String[] names;
public Request(String[] names) {
this.names = Objects.requireNonNull(names);
}
@Override
public ActionRequestValidationException validate() {
return null;
}
public Request(StreamInput in) throws IOException {
super(in);
this.names = in.readStringArray();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(names);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return Arrays.equals(names, request.names);
}
@Override
public int hashCode() {
return Arrays.hashCode(names);
}
}
public static class Response extends ActionResponse implements ToXContentObject {
private final List<DataStream> dataStreams;
public Response(List<DataStream> dataStreams) {
this.dataStreams = dataStreams;
}
public Response(StreamInput in) throws IOException {
this(in.readList(DataStream::new));
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeList(dataStreams);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startArray();
for (DataStream dataStream : dataStreams) {
dataStream.toXContent(builder, params);
}
builder.endArray();
return builder;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Response response = (Response) o;
return dataStreams.equals(response.dataStreams);
}
@Override
public int hashCode() {
return Objects.hash(dataStreams);
}
}
public static class TransportAction extends TransportMasterNodeReadAction<Request, Response> {
@Inject
public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver);
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected Response read(StreamInput in) throws IOException {
return new Response(in);
}
@Override
protected void masterOperation(Request request, ClusterState state,
ActionListener<Response> listener) throws Exception {
List<DataStream> dataStreams = Arrays.asList(
new DataStream("my_data_stream1", "@timestamp", Collections.singletonList("my_data_stream1-000000")),
new DataStream("my_data_stream2", "@timestamp", Collections.emptyList())
);
listener.onResponse(new Response(dataStreams));
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
}
}
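
Note that the stubbed masterOperation ignores the requested names and always returns the two hard-coded data streams, which is exactly what the YAML REST test earlier in this commit asserts against. Since the Response implements ToXContentObject, a caller can render it as JSON for inspection; a sketch, assuming an available Client:

import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;

public class GetDataStreamsExample {
    static String getDataStreamsAsJson(Client client) {
        GetDataStreamsAction.Request request = new GetDataStreamsAction.Request(new String[]{"my_data_stream1"});
        GetDataStreamsAction.Response response = client.admin().indices().getDataStreams(request).actionGet();
        // with the current stub this contains my_data_stream1 and my_data_stream2
        return Strings.toString(response);
    }
}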

View File

@ -21,6 +21,9 @@ package org.elasticsearch.client;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction;
import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction;
import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
import org.elasticsearch.action.admin.indices.alias.exists.AliasesExistRequestBuilder;
@ -819,4 +822,33 @@ public interface IndicesAdminClient extends ElasticsearchClient {
*/
void rolloverIndex(RolloverRequest request, ActionListener<RolloverResponse> listener);
/**
* Store a data stream
*/
void createDataStream(CreateDataStreamAction.Request request, ActionListener<AcknowledgedResponse> listener);
/**
* Store a data stream
*/
ActionFuture<AcknowledgedResponse> createDataStream(CreateDataStreamAction.Request request);
/**
* Delete a data stream
*/
void deleteDataStream(DeleteDataStreamAction.Request request, ActionListener<AcknowledgedResponse> listener);
/**
* Delete a data stream
*/
ActionFuture<AcknowledgedResponse> deleteDataStream(DeleteDataStreamAction.Request request);
/**
* Get data streams
*/
void getDataStreams(GetDataStreamsAction.Request request, ActionListener<GetDataStreamsAction.Response> listener);
/**
* Get data streams
*/
ActionFuture<GetDataStreamsAction.Response> getDataStreams(GetDataStreamsAction.Request request);
}

View File

@ -30,6 +30,9 @@ import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplai
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequest;
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequestBuilder;
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainResponse;
import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction;
import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction;
import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder;
@ -1742,6 +1745,36 @@ public abstract class AbstractClient implements Client {
public void getSettings(GetSettingsRequest request, ActionListener<GetSettingsResponse> listener) {
execute(GetSettingsAction.INSTANCE, request, listener);
}
@Override
public void createDataStream(CreateDataStreamAction.Request request, ActionListener<AcknowledgedResponse> listener) {
execute(CreateDataStreamAction.INSTANCE, request, listener);
}
@Override
public ActionFuture<AcknowledgedResponse> createDataStream(CreateDataStreamAction.Request request) {
return execute(CreateDataStreamAction.INSTANCE, request);
}
@Override
public void deleteDataStream(DeleteDataStreamAction.Request request, ActionListener<AcknowledgedResponse> listener) {
execute(DeleteDataStreamAction.INSTANCE, request, listener);
}
@Override
public ActionFuture<AcknowledgedResponse> deleteDataStream(DeleteDataStreamAction.Request request) {
return execute(DeleteDataStreamAction.INSTANCE, request);
}
@Override
public void getDataStreams(GetDataStreamsAction.Request request, ActionListener<GetDataStreamsAction.Response> listener) {
execute(GetDataStreamsAction.INSTANCE, request, listener);
}
@Override
public ActionFuture<GetDataStreamsAction.Response> getDataStreams(GetDataStreamsAction.Request request) {
return execute(GetDataStreamsAction.INSTANCE, request);
}
}
@Override

View File

@ -0,0 +1,116 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
public final class DataStream extends AbstractDiffable<DataStream> implements ToXContentObject {
private final String name;
private final String timeStampField;
private final List<String> indices;
public DataStream(String name, String timeStampField, List<String> indices) {
this.name = name;
this.timeStampField = timeStampField;
this.indices = indices;
}
public String getName() {
return name;
}
public String getTimeStampField() {
return timeStampField;
}
public List<String> getIndices() {
return indices;
}
public DataStream(StreamInput in) throws IOException {
this(in.readString(), in.readString(), in.readStringList());
}
public static Diff<DataStream> readDiffFrom(StreamInput in) throws IOException {
return readDiffFrom(DataStream::new, in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
out.writeString(timeStampField);
out.writeStringCollection(indices);
}
public static final ParseField NAME_FIELD = new ParseField("name");
public static final ParseField TIMESTAMP_FIELD_FIELD = new ParseField("timestamp_field");
public static final ParseField INDICES_FIELD = new ParseField("indices");
@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<DataStream, Void> PARSER = new ConstructingObjectParser<>("data_stream",
args -> new DataStream((String) args[0], (String) args[1], (List<String>) args[2]));
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), TIMESTAMP_FIELD_FIELD);
PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), INDICES_FIELD);
}
public static DataStream fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(NAME_FIELD.getPreferredName(), name);
builder.field(TIMESTAMP_FIELD_FIELD.getPreferredName(), timeStampField);
builder.field(INDICES_FIELD.getPreferredName(), indices);
builder.endObject();
return builder;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DataStream that = (DataStream) o;
return name.equals(that.name) &&
timeStampField.equals(that.timeStampField) &&
indices.equals(that.indices);
}
@Override
public int hashCode() {
return Objects.hash(name, timeStampField, indices);
}
}
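
DataStream supports both wire serialization and XContent, which the serialization tests later in this commit rely on. A minimal XContent round trip, with an illustrative name and backing index:

import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import java.util.Collections;

public class DataStreamXContentExample {
    public static void main(String[] args) throws Exception {
        DataStream original = new DataStream("logs-foo", "@timestamp",
            Collections.singletonList("logs-foo-000001"));
        // {"name":"logs-foo","timestamp_field":"@timestamp","indices":["logs-foo-000001"]}
        String json = Strings.toString(original);
        try (XContentParser parser = XContentType.JSON.xContent().createParser(
                NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json)) {
            DataStream parsed = DataStream.fromXContent(parser);
            assert original.equals(parsed);
        }
    }
}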

View File

@ -0,0 +1,59 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.admin.indices;
import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class RestCreateDataStreamAction extends BaseRestHandler {
@Override
public String getName() {
return "create_data_stream_action";
}
@Override
public List<Route> routes() {
return Collections.singletonList(
new Route(RestRequest.Method.PUT, "/_data_stream/{name}")
);
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
CreateDataStreamAction.Request putDataStreamRequest = new CreateDataStreamAction.Request(request.param("name"));
request.withContentOrSourceParamParserOrNull(parser -> {
Map<String, Object> body = parser.map();
String timeStampFieldName = (String) body.get(DataStream.TIMESTAMP_FIELD_FIELD.getPreferredName());
if (timeStampFieldName != null) {
putDataStreamRequest.setTimestampFieldName(timeStampFieldName);
}
});
return channel -> client.admin().indices().createDataStream(putDataStreamRequest, new RestToXContentListener<>(channel));
}
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.admin.indices;
import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
public class RestDeleteDataStreamAction extends BaseRestHandler {
@Override
public String getName() {
return "delete_data_stream_action";
}
@Override
public List<Route> routes() {
return Collections.singletonList(new Route(RestRequest.Method.DELETE, "/_data_stream/{name}"));
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
DeleteDataStreamAction.Request deleteDataStreamRequest = new DeleteDataStreamAction.Request(request.param("name"));
return channel -> client.admin().indices().deleteDataStream(deleteDataStreamRequest, new RestToXContentListener<>(channel));
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.admin.indices;
import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
public class RestGetDataStreamsAction extends BaseRestHandler {
@Override
public String getName() {
return "get_data_streams_action";
}
@Override
public List<Route> routes() {
return Arrays.asList(
new Route(RestRequest.Method.GET, "/_data_streams"),
new Route(RestRequest.Method.GET, "/_data_streams/{name}"));
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
String[] names = Strings.splitStringByCommaToArray(request.param("name"));
GetDataStreamsAction.Request getDataStreamsRequest = new GetDataStreamsAction.Request(names);
return channel -> client.admin().indices().getDataStreams(getDataStreamsRequest, new RestToXContentListener<>(channel));
}
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.datastream;
import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction.Request;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
public class CreateDataStreamRequestTests extends AbstractWireSerializingTestCase<Request> {
@Override
protected Writeable.Reader<Request> instanceReader() {
return Request::new;
}
@Override
protected Request createTestInstance() {
Request request = new Request(randomAlphaOfLength(8));
request.setTimestampFieldName(randomAlphaOfLength(8));
return request;
}
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.datastream;
import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction.Request;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
public class DeleteDataStreamRequestTests extends AbstractWireSerializingTestCase<Request> {
@Override
protected Writeable.Reader<Request> instanceReader() {
return Request::new;
}
@Override
protected Request createTestInstance() {
return new Request(randomAlphaOfLength(8));
}
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.datastream;
import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction.Request;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
public class GetDataStreamsRequestTests extends AbstractWireSerializingTestCase<Request> {
@Override
protected Writeable.Reader<Request> instanceReader() {
return Request::new;
}
@Override
protected Request createTestInstance() {
return new Request(generateRandomStringArray(8, 8, false));
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.datastream;
import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction.Response;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentParser.Token;
import org.elasticsearch.test.AbstractSerializingTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class GetDataStreamsResponseTests extends AbstractSerializingTestCase<Response> {
@Override
protected Writeable.Reader<Response> instanceReader() {
return Response::new;
}
@Override
protected Response doParseInstance(XContentParser parser) throws IOException {
List<DataStream> dataStreams = new ArrayList<>();
for (Token token = parser.nextToken(); token != Token.END_ARRAY; token = parser.nextToken()) {
if (token == Token.START_OBJECT) {
dataStreams.add(DataStream.fromXContent(parser));
}
}
return new Response(dataStreams);
}
@Override
protected Response createTestInstance() {
int numDataStreams = randomIntBetween(0, 8);
List<DataStream> dataStreams = new ArrayList<>();
for (int i = 0; i < numDataStreams; i++) {
dataStreams.add(new DataStream(randomAlphaOfLength(4), randomAlphaOfLength(4),
Arrays.asList(generateRandomStringArray(8, 4, false))));
}
return new Response(dataStreams);
}
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class DataStreamTests extends AbstractSerializingTestCase<DataStream> {
@Override
protected DataStream doParseInstance(XContentParser parser) throws IOException {
return DataStream.fromXContent(parser);
}
@Override
protected Writeable.Reader<DataStream> instanceReader() {
return DataStream::new;
}
@Override
protected DataStream createTestInstance() {
int numIndices = randomIntBetween(0, 128);
List<String> indices = new ArrayList<>(numIndices);
for (int i = 0; i < numIndices; i++) {
indices.add(randomAlphaOfLength(10));
}
return new DataStream(randomAlphaOfLength(10), randomAlphaOfLength(10), indices);
}
}

View File

@ -56,7 +56,7 @@ public class ClusterPrivilegeResolver {
private static final Set<String> MONITOR_WATCHER_PATTERN = Collections.singleton("cluster:monitor/xpack/watcher/*");
private static final Set<String> MONITOR_ROLLUP_PATTERN = Collections.singleton("cluster:monitor/xpack/rollup/*");
private static final Set<String> ALL_CLUSTER_PATTERN = Collections.unmodifiableSet(
Sets.newHashSet("cluster:*", "indices:admin/template/*"));
Sets.newHashSet("cluster:*", "indices:admin/template/*", "indices:admin/data_stream/*"));
private static final Set<String> MANAGE_ML_PATTERN = Collections.unmodifiableSet(
Sets.newHashSet("cluster:admin/xpack/ml/*", "cluster:monitor/xpack/ml/*"));
private static final Set<String> MANAGE_TRANSFORM_PATTERN = Collections.unmodifiableSet(
@ -205,7 +205,10 @@ public class ClusterPrivilegeResolver {
}
public static boolean isClusterAction(String actionName) {
return actionName.startsWith("cluster:") || actionName.startsWith("indices:admin/template/");
return actionName.startsWith("cluster:") ||
actionName.startsWith("indices:admin/template/") ||
// todo: hack until we implement security of data_streams
actionName.startsWith("indices:admin/data_stream/");
}
private static String actionToPattern(String text) {