From 3f168ac85cbd5834fbcd8e622e8993fb53c80398 Mon Sep 17 00:00:00 2001 From: Nick Knize Date: Tue, 2 Feb 2021 16:25:35 -0600 Subject: [PATCH] [PURIFY] Revert "Move data stream transport and rest action to xpack (#59593)" (#28) This commit reverts commit 2a89e13e43d62ec9c443bfaba42c9a1da7e81223. Relicensing data streams from OSS to XPack. Signed-off-by: Peter Nied --- .../client/RestHighLevelClientTests.java | 32 -- .../MiscellaneousDocumentationIT.java | 59 --- .../DataStreamsStatsResponseTests.java | 2 +- .../indices/GetDataStreamResponseTests.java | 4 +- .../elasticsearch/action/ActionModule.java | 27 +- .../datastream/CreateDataStreamAction.java | 148 ++++++ .../datastream/DataStreamsStatsAction.java | 441 ++++++++++++++++++ .../datastream/DeleteDataStreamAction.java | 233 +++++++++ .../datastream/GetDataStreamAction.java | 323 +++++++++++++ .../client/IndicesAdminClient.java | 33 ++ .../client/support/AbstractClient.java | 33 ++ .../indices/RestCreateDataStreamAction.java | 50 ++ .../indices/RestDataStreamsStatsAction.java | 60 +++ .../indices/RestDeleteDataStreamAction.java | 50 ++ .../indices/RestGetDataStreamsAction.java | 52 +++ .../CreateDataStreamRequestTests.java | 55 +++ .../DataStreamsStatsResponseTests.java | 76 +++ .../DeleteDataStreamRequestTests.java | 222 +++++++++ .../GetDataStreamsRequestTests.java | 143 ++++++ .../GetDataStreamsResponseTests.java | 47 ++ .../mapping/put/PutMappingRequestTests.java | 12 +- .../MetadataRolloverServiceTests.java | 11 +- .../metadata/DataStreamMetadataTests.java | 3 +- .../cluster/metadata/DataStreamTests.java | 22 +- .../MetadataDeleteIndexServiceTests.java | 2 + .../MetadataIndexStateServiceTests.java | 4 +- .../cluster/metadata/MetadataTests.java | 2 +- .../cluster/DataStreamTestHelper.java | 4 + .../org/elasticsearch/test/TestCluster.java | 16 + .../test/rest/ESRestTestCase.java | 4 +- 30 files changed, 2049 insertions(+), 121 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DataStreamsStatsAction.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamAction.java create mode 100644 server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestCreateDataStreamAction.java create mode 100644 server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestDataStreamsStatsAction.java create mode 100644 server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestDeleteDataStreamAction.java create mode 100644 server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetDataStreamsAction.java create mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java create mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DataStreamsStatsResponseTests.java create mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java create mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsRequestTests.java create mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsResponseTests.java diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java index 56d1fc4f1a5..9b2e0020252 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java @@ -45,37 +45,6 @@ import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.client.core.MainRequest; import org.elasticsearch.client.core.MainResponse; -import org.elasticsearch.client.ml.dataframe.DataFrameAnalysis; -import org.elasticsearch.client.ml.dataframe.evaluation.classification.AccuracyMetric; -import org.elasticsearch.client.ml.dataframe.evaluation.classification.AucRocMetric; -import org.elasticsearch.client.ml.dataframe.evaluation.classification.Classification; -import org.elasticsearch.client.ml.dataframe.evaluation.classification.MulticlassConfusionMatrixMetric; -import org.elasticsearch.client.ml.dataframe.evaluation.classification.PrecisionMetric; -import org.elasticsearch.client.ml.dataframe.evaluation.classification.RecallMetric; -import org.elasticsearch.client.ml.dataframe.evaluation.outlierdetection.ConfusionMatrixMetric; -import org.elasticsearch.client.ml.dataframe.evaluation.outlierdetection.OutlierDetection; -import org.elasticsearch.client.ml.dataframe.evaluation.regression.HuberMetric; -import org.elasticsearch.client.ml.dataframe.evaluation.regression.MeanSquaredErrorMetric; -import org.elasticsearch.client.ml.dataframe.evaluation.regression.MeanSquaredLogarithmicErrorMetric; -import org.elasticsearch.client.ml.dataframe.evaluation.regression.RSquaredMetric; -import org.elasticsearch.client.ml.dataframe.evaluation.regression.Regression; -import org.elasticsearch.client.ml.dataframe.stats.classification.ClassificationStats; -import org.elasticsearch.client.ml.dataframe.stats.outlierdetection.OutlierDetectionStats; -import org.elasticsearch.client.ml.dataframe.stats.regression.RegressionStats; -import org.elasticsearch.client.ml.inference.preprocessing.CustomWordEmbedding; -import org.elasticsearch.client.ml.inference.preprocessing.FrequencyEncoding; -import org.elasticsearch.client.ml.inference.preprocessing.NGram; -import org.elasticsearch.client.ml.inference.preprocessing.OneHotEncoding; -import org.elasticsearch.client.ml.inference.preprocessing.TargetMeanEncoding; -import org.elasticsearch.client.ml.inference.trainedmodel.ClassificationConfig; -import org.elasticsearch.client.ml.inference.trainedmodel.RegressionConfig; -import org.elasticsearch.client.ml.inference.trainedmodel.ensemble.Ensemble; -import org.elasticsearch.client.ml.inference.trainedmodel.ensemble.Exponent; -import org.elasticsearch.client.ml.inference.trainedmodel.ensemble.LogisticRegression; -import org.elasticsearch.client.ml.inference.trainedmodel.ensemble.WeightedMode; -import org.elasticsearch.client.ml.inference.trainedmodel.ensemble.WeightedSum; -import org.elasticsearch.client.ml.inference.trainedmodel.langident.LangIdentNeuralNetwork; -import org.elasticsearch.client.ml.inference.trainedmodel.tree.Tree; import org.elasticsearch.client.transform.transforms.SyncConfig; import org.elasticsearch.client.transform.transforms.TimeSyncConfig; import org.elasticsearch.common.CheckedFunction; @@ -128,7 +97,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.elasticsearch.client.ml.dataframe.evaluation.MlEvaluationNamedXContentProvider.registeredMetricName; import static org.elasticsearch.common.xcontent.XContentHelper.toXContent; import static org.hamcrest.CoreMatchers.endsWith; import static org.hamcrest.CoreMatchers.equalTo; diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MiscellaneousDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MiscellaneousDocumentationIT.java index 70eef41375e..a6368338d14 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MiscellaneousDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MiscellaneousDocumentationIT.java @@ -27,13 +27,6 @@ import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.core.MainResponse; -import org.elasticsearch.client.xpack.XPackInfoRequest; -import org.elasticsearch.client.xpack.XPackInfoResponse; -import org.elasticsearch.client.xpack.XPackInfoResponse.BuildInfo; -import org.elasticsearch.client.xpack.XPackInfoResponse.FeatureSetsInfo; -import org.elasticsearch.client.xpack.XPackInfoResponse.LicenseInfo; -import org.elasticsearch.client.xpack.XPackUsageRequest; -import org.elasticsearch.client.xpack.XPackUsageResponse; import java.io.IOException; import java.time.Instant; @@ -94,58 +87,6 @@ public class MiscellaneousDocumentationIT extends ESRestHighLevelClientTestCase assertTrue(response); } - public void testXPackInfo() throws Exception { - RestHighLevelClient client = highLevelClient(); - { - //tag::x-pack-info-execute - XPackInfoRequest request = new XPackInfoRequest(); - request.setVerbose(true); // <1> - request.setCategories(EnumSet.of( // <2> - XPackInfoRequest.Category.BUILD, - XPackInfoRequest.Category.LICENSE, - XPackInfoRequest.Category.FEATURES)); - XPackInfoResponse response = client.xpack().info(request, RequestOptions.DEFAULT); - //end::x-pack-info-execute - - //tag::x-pack-info-response - BuildInfo build = response.getBuildInfo(); // <1> - LicenseInfo license = response.getLicenseInfo(); // <2> - assertThat(license.getExpiryDate(), is(greaterThan(Instant.now().toEpochMilli()))); // <3> - FeatureSetsInfo features = response.getFeatureSetsInfo(); // <4> - //end::x-pack-info-response - - assertNotNull(response.getBuildInfo()); - assertNotNull(response.getLicenseInfo()); - assertNotNull(response.getFeatureSetsInfo()); - } - { - XPackInfoRequest request = new XPackInfoRequest(); - // tag::x-pack-info-execute-listener - ActionListener listener = new ActionListener() { - @Override - public void onResponse(XPackInfoResponse indexResponse) { - // <1> - } - - @Override - public void onFailure(Exception e) { - // <2> - } - }; - // end::x-pack-info-execute-listener - - // Replace the empty listener by a blocking listener in test - final CountDownLatch latch = new CountDownLatch(1); - listener = new LatchedActionListener<>(listener, latch); - - // tag::x-pack-info-execute-async - client.xpack().infoAsync(request, RequestOptions.DEFAULT, listener); // <1> - // end::x-pack-info-execute-async - - assertTrue(latch.await(30L, TimeUnit.SECONDS)); - } - } - public void testInitializationFromClientBuilder() throws IOException { //tag::rest-high-level-client-init RestHighLevelClient client = new RestHighLevelClient( diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/indices/DataStreamsStatsResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/indices/DataStreamsStatsResponseTests.java index 78fd777d249..f95bcad1740 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/indices/DataStreamsStatsResponseTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/indices/DataStreamsStatsResponseTests.java @@ -20,7 +20,7 @@ package org.elasticsearch.client.indices; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.xpack.core.action.DataStreamsStatsAction; +import org.elasticsearch.action.admin.indices.datastream.DataStreamsStatsAction; import org.elasticsearch.action.support.DefaultShardOperationFailedException; import org.elasticsearch.client.AbstractResponseTestCase; import org.elasticsearch.common.unit.ByteSizeValue; diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/indices/GetDataStreamResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/indices/GetDataStreamResponseTests.java index 34b5b2af9bf..6192a1c5dbf 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/indices/GetDataStreamResponseTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/indices/GetDataStreamResponseTests.java @@ -19,8 +19,8 @@ package org.elasticsearch.client.indices; -import org.elasticsearch.xpack.core.action.GetDataStreamAction; -import org.elasticsearch.xpack.core.action.GetDataStreamAction.Response.DataStreamInfo; +import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction; +import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction.Response.DataStreamInfo; import org.elasticsearch.client.AbstractResponseTestCase; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.DataStream; diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 96ff879c686..e45b5f68cb6 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -106,7 +106,6 @@ import org.elasticsearch.action.admin.indices.cache.clear.TransportClearIndicesC import org.elasticsearch.action.admin.indices.close.CloseIndexAction; import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction; import org.elasticsearch.action.admin.indices.create.AutoCreateAction; -import org.elasticsearch.action.admin.indices.create.AutoCreateAction; import org.elasticsearch.action.admin.indices.create.CreateIndexAction; import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction; import org.elasticsearch.action.admin.indices.dangling.delete.DeleteDanglingIndexAction; @@ -117,6 +116,10 @@ import org.elasticsearch.action.admin.indices.dangling.import_index.ImportDangli import org.elasticsearch.action.admin.indices.dangling.import_index.TransportImportDanglingIndexAction; import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesAction; import org.elasticsearch.action.admin.indices.dangling.list.TransportListDanglingIndicesAction; +import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction; +import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction; +import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction; +import org.elasticsearch.action.admin.indices.datastream.DataStreamsStatsAction; import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction; import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsAction; @@ -308,10 +311,12 @@ import org.elasticsearch.rest.action.admin.indices.RestAddIndexBlockAction; import org.elasticsearch.rest.action.admin.indices.RestAnalyzeAction; import org.elasticsearch.rest.action.admin.indices.RestClearIndicesCacheAction; import org.elasticsearch.rest.action.admin.indices.RestCloseIndexAction; +import org.elasticsearch.rest.action.admin.indices.RestCreateDataStreamAction; import org.elasticsearch.rest.action.admin.indices.RestCreateIndexAction; +import org.elasticsearch.rest.action.admin.indices.RestDataStreamsStatsAction; import org.elasticsearch.rest.action.admin.indices.RestDeleteComponentTemplateAction; import org.elasticsearch.rest.action.admin.indices.RestDeleteComposableIndexTemplateAction; -import org.elasticsearch.rest.action.admin.indices.RestDeleteComposableIndexTemplateAction; +import org.elasticsearch.rest.action.admin.indices.RestDeleteDataStreamAction; import org.elasticsearch.rest.action.admin.indices.RestDeleteIndexAction; import org.elasticsearch.rest.action.admin.indices.RestDeleteIndexTemplateAction; import org.elasticsearch.rest.action.admin.indices.RestFlushAction; @@ -319,7 +324,7 @@ import org.elasticsearch.rest.action.admin.indices.RestForceMergeAction; import org.elasticsearch.rest.action.admin.indices.RestGetAliasesAction; import org.elasticsearch.rest.action.admin.indices.RestGetComponentTemplateAction; import org.elasticsearch.rest.action.admin.indices.RestGetComposableIndexTemplateAction; -import org.elasticsearch.rest.action.admin.indices.RestGetComposableIndexTemplateAction; +import org.elasticsearch.rest.action.admin.indices.RestGetDataStreamsAction; import org.elasticsearch.rest.action.admin.indices.RestGetFieldMappingAction; import org.elasticsearch.rest.action.admin.indices.RestGetIndexTemplateAction; import org.elasticsearch.rest.action.admin.indices.RestGetIndicesAction; @@ -593,7 +598,6 @@ public class ActionModule extends AbstractModule { actions.register(RecoveryAction.INSTANCE, TransportRecoveryAction.class); actions.register(NodesReloadSecureSettingsAction.INSTANCE, TransportNodesReloadSecureSettingsAction.class); actions.register(AutoCreateAction.INSTANCE, AutoCreateAction.TransportAction.class); - actions.register(ResolveIndexAction.INSTANCE, ResolveIndexAction.TransportAction.class); //Indexed scripts actions.register(PutStoredScriptAction.INSTANCE, TransportPutStoredScriptAction.class); @@ -612,6 +616,13 @@ public class ActionModule extends AbstractModule { actionPlugins.stream().flatMap(p -> p.getActions().stream()).forEach(actions::register); + // Data streams: + actions.register(CreateDataStreamAction.INSTANCE, CreateDataStreamAction.TransportAction.class); + actions.register(DeleteDataStreamAction.INSTANCE, DeleteDataStreamAction.TransportAction.class); + actions.register(GetDataStreamAction.INSTANCE, GetDataStreamAction.TransportAction.class); + actions.register(ResolveIndexAction.INSTANCE, ResolveIndexAction.TransportAction.class); + actions.register(DataStreamsStatsAction.INSTANCE, DataStreamsStatsAction.TransportAction.class); + // Persistent tasks: actions.register(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class); actions.register(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class); @@ -718,7 +729,6 @@ public class ActionModule extends AbstractModule { registerHandler.accept(new RestUpgradeAction()); registerHandler.accept(new RestUpgradeStatusAction()); registerHandler.accept(new RestClearIndicesCacheAction()); - registerHandler.accept(new RestResolveIndexAction()); registerHandler.accept(new RestIndexAction()); registerHandler.accept(new CreateHandler()); @@ -771,6 +781,13 @@ public class ActionModule extends AbstractModule { registerHandler.accept(new RestImportDanglingIndexAction()); registerHandler.accept(new RestDeleteDanglingIndexAction()); + // Data Stream API + registerHandler.accept(new RestCreateDataStreamAction()); + registerHandler.accept(new RestDeleteDataStreamAction()); + registerHandler.accept(new RestGetDataStreamsAction()); + registerHandler.accept(new RestResolveIndexAction()); + registerHandler.accept(new RestDataStreamsStatsAction()); + // CAT API registerHandler.accept(new RestAllocationAction()); registerHandler.accept(new RestShardsAction()); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java new file mode 100644 index 00000000000..eab167ba5f6 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java @@ -0,0 +1,148 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.datastream; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.ValidateActions; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService; +import org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService.CreateDataStreamClusterStateUpdateRequest; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.Objects; + +public class CreateDataStreamAction extends ActionType { + + public static final CreateDataStreamAction INSTANCE = new CreateDataStreamAction(); + public static final String NAME = "indices:admin/data_stream/create"; + + private CreateDataStreamAction() { + super(NAME, AcknowledgedResponse::new); + } + + public static class Request extends AcknowledgedRequest implements IndicesRequest { + + private final String name; + + public Request(String name) { + this.name = name; + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (Strings.hasText(name) == false) { + validationException = ValidateActions.addValidationError("name is missing", validationException); + } + return validationException; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.name = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(name); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return name.equals(request.name); + } + + @Override + public int hashCode() { + return Objects.hash(name); + } + + @Override + public String[] indices() { + return new String[]{name}; + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.strictSingleIndexNoExpandForbidClosed(); + } + } + + public static class TransportAction extends TransportMasterNodeAction { + + private final MetadataCreateDataStreamService metadataCreateDataStreamService; + + @Inject + public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + MetadataCreateDataStreamService metadataCreateDataStreamService) { + super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver); + this.metadataCreateDataStreamService = metadataCreateDataStreamService; + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected AcknowledgedResponse read(StreamInput in) throws IOException { + return new AcknowledgedResponse(in); + } + + @Override + protected void masterOperation(Request request, ClusterState state, + ActionListener listener) throws Exception { + CreateDataStreamClusterStateUpdateRequest updateRequest = new CreateDataStreamClusterStateUpdateRequest( + request.name, + request.masterNodeTimeout(), + request.timeout() + ); + metadataCreateDataStreamService.createDataStream(updateRequest, listener); + } + + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + } + +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DataStreamsStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DataStreamsStatsAction.java new file mode 100644 index 00000000000..2ca3a9c19c1 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DataStreamsStatsAction.java @@ -0,0 +1,441 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.datastream; + +import org.apache.lucene.document.LongPoint; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.PointValues; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.DefaultShardOperationFailedException; +import org.elasticsearch.action.support.broadcast.BroadcastRequest; +import org.elasticsearch.action.support.broadcast.BroadcastResponse; +import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexAbstraction; +import org.elasticsearch.cluster.metadata.IndexAbstractionResolver; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardsIterator; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardNotFoundException; +import org.elasticsearch.index.store.StoreStats; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.SortedMap; +import java.util.stream.Stream; + +public class DataStreamsStatsAction extends ActionType { + + public static final DataStreamsStatsAction INSTANCE = new DataStreamsStatsAction(); + public static final String NAME = "indices:monitor/data_stream/stats"; + + public DataStreamsStatsAction() { + super(NAME, DataStreamsStatsAction.Response::new); + } + + public static class Request extends BroadcastRequest { + public Request() { + super((String[]) null); + } + + public Request(StreamInput in) throws IOException { + super(in); + } + } + + public static class Response extends BroadcastResponse { + private final int dataStreamCount; + private final int backingIndices; + private final ByteSizeValue totalStoreSize; + private final DataStreamStats[] dataStreams; + + public Response(int totalShards, int successfulShards, int failedShards, List shardFailures, + int dataStreamCount, int backingIndices, ByteSizeValue totalStoreSize, DataStreamStats[] dataStreams) { + super(totalShards, successfulShards, failedShards, shardFailures); + this.dataStreamCount = dataStreamCount; + this.backingIndices = backingIndices; + this.totalStoreSize = totalStoreSize; + this.dataStreams = dataStreams; + } + + public Response(StreamInput in) throws IOException { + super(in); + this.dataStreamCount = in.readVInt(); + this.backingIndices = in.readVInt(); + this.totalStoreSize = new ByteSizeValue(in); + this.dataStreams = in.readArray(DataStreamStats::new, DataStreamStats[]::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVInt(dataStreamCount); + out.writeVInt(backingIndices); + totalStoreSize.writeTo(out); + out.writeArray(dataStreams); + } + + @Override + protected void addCustomXContentFields(XContentBuilder builder, Params params) throws IOException { + builder.field("data_stream_count", dataStreamCount); + builder.field("backing_indices", backingIndices); + builder.humanReadableField("total_store_size_bytes", "total_store_size", totalStoreSize); + builder.array("data_streams", (Object[]) dataStreams); + } + + public int getDataStreamCount() { + return dataStreamCount; + } + + public int getBackingIndices() { + return backingIndices; + } + + public ByteSizeValue getTotalStoreSize() { + return totalStoreSize; + } + + public DataStreamStats[] getDataStreams() { + return dataStreams; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + Response response = (Response) obj; + return dataStreamCount == response.dataStreamCount && + backingIndices == response.backingIndices && + Objects.equals(totalStoreSize, response.totalStoreSize) && + Arrays.equals(dataStreams, response.dataStreams); + } + + @Override + public int hashCode() { + int result = Objects.hash(dataStreamCount, backingIndices, totalStoreSize); + result = 31 * result + Arrays.hashCode(dataStreams); + return result; + } + + @Override + public String toString() { + return "Response{" + + "dataStreamCount=" + dataStreamCount + + ", backingIndices=" + backingIndices + + ", totalStoreSize=" + totalStoreSize + + ", dataStreams=" + Arrays.toString(dataStreams) + + '}'; + } + } + + public static class DataStreamStats implements ToXContentObject, Writeable { + private final String dataStream; + private final int backingIndices; + private final ByteSizeValue storeSize; + private final long maximumTimestamp; + + public DataStreamStats(String dataStream, int backingIndices, ByteSizeValue storeSize, long maximumTimestamp) { + this.dataStream = dataStream; + this.backingIndices = backingIndices; + this.storeSize = storeSize; + this.maximumTimestamp = maximumTimestamp; + } + + public DataStreamStats(StreamInput in) throws IOException { + this.dataStream = in.readString(); + this.backingIndices = in.readVInt(); + this.storeSize = new ByteSizeValue(in); + this.maximumTimestamp = in.readVLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(dataStream); + out.writeVInt(backingIndices); + storeSize.writeTo(out); + out.writeVLong(maximumTimestamp); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("data_stream", dataStream); + builder.field("backing_indices", backingIndices); + builder.humanReadableField("store_size_bytes", "store_size", storeSize); + builder.field("maximum_timestamp", maximumTimestamp); + builder.endObject(); + return builder; + } + + public String getDataStream() { + return dataStream; + } + + public int getBackingIndices() { + return backingIndices; + } + + public ByteSizeValue getStoreSize() { + return storeSize; + } + + public long getMaximumTimestamp() { + return maximumTimestamp; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + DataStreamStats that = (DataStreamStats) obj; + return backingIndices == that.backingIndices && + maximumTimestamp == that.maximumTimestamp && + Objects.equals(dataStream, that.dataStream) && + Objects.equals(storeSize, that.storeSize); + } + + @Override + public int hashCode() { + return Objects.hash(dataStream, backingIndices, storeSize, maximumTimestamp); + } + + @Override + public String toString() { + return "DataStreamStats{" + + "dataStream='" + dataStream + '\'' + + ", backingIndices=" + backingIndices + + ", storeSize=" + storeSize + + ", maximumTimestamp=" + maximumTimestamp + + '}'; + } + } + + public static class DataStreamShardStats implements Writeable { + private final ShardRouting shardRouting; + private final StoreStats storeStats; + private final long maxTimestamp; + + public DataStreamShardStats(ShardRouting shardRouting, StoreStats storeStats, long maxTimestamp) { + this.shardRouting = shardRouting; + this.storeStats = storeStats; + this.maxTimestamp = maxTimestamp; + } + + public DataStreamShardStats(StreamInput in) throws IOException { + this.shardRouting = new ShardRouting(in); + this.storeStats = new StoreStats(in); + this.maxTimestamp = in.readVLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + shardRouting.writeTo(out); + storeStats.writeTo(out); + out.writeVLong(maxTimestamp); + } + + public ShardRouting getShardRouting() { + return shardRouting; + } + + public StoreStats getStoreStats() { + return storeStats; + } + + public long getMaxTimestamp() { + return maxTimestamp; + } + } + + private static class AggregatedStats { + Set backingIndices = new HashSet<>(); + long storageBytes = 0L; + long maxTimestamp = 0L; + } + + public static class TransportAction extends TransportBroadcastByNodeAction { + + private final ClusterService clusterService; + private final IndicesService indicesService; + private final IndexAbstractionResolver indexAbstractionResolver; + + @Inject + public TransportAction(ClusterService clusterService, TransportService transportService, IndicesService indicesService, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + super(DataStreamsStatsAction.NAME, clusterService, transportService, actionFilters, indexNameExpressionResolver, + Request::new, ThreadPool.Names.MANAGEMENT); + this.clusterService = clusterService; + this.indicesService = indicesService; + this.indexAbstractionResolver = new IndexAbstractionResolver(indexNameExpressionResolver); + } + + @Override + protected Request readRequestFrom(StreamInput in) throws IOException { + return new Request(in); + } + + @Override + protected ClusterBlockException checkGlobalBlock(ClusterState state, Request request) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + } + + @Override + protected ClusterBlockException checkRequestBlock(ClusterState state, Request request, String[] concreteIndices) { + return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, concreteIndices); + } + + @Override + protected ShardsIterator shards(ClusterState clusterState, Request request, String[] concreteIndices) { + String[] requestIndices = request.indices(); + if (requestIndices == null || requestIndices.length == 0) { + requestIndices = new String[]{"*"}; + } + List abstractionNames = indexAbstractionResolver.resolveIndexAbstractions(requestIndices, request.indicesOptions(), + clusterState.getMetadata(), true); // Always include data streams for data streams stats api + SortedMap indicesLookup = clusterState.getMetadata().getIndicesLookup(); + + String[] concreteDatastreamIndices = abstractionNames.stream().flatMap(abstractionName -> { + IndexAbstraction indexAbstraction = indicesLookup.get(abstractionName); + assert indexAbstraction != null; + if (indexAbstraction.getType() == IndexAbstraction.Type.DATA_STREAM) { + IndexAbstraction.DataStream dataStream = (IndexAbstraction.DataStream) indexAbstraction; + List indices = dataStream.getIndices(); + return indices.stream().map(idx -> idx.getIndex().getName()); + } else { + return Stream.empty(); + } + }).toArray(String[]::new); + return clusterState.getRoutingTable().allShards(concreteDatastreamIndices); + } + + @Override + protected DataStreamShardStats shardOperation(Request request, ShardRouting shardRouting) throws IOException { + IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex()); + IndexShard indexShard = indexService.getShard(shardRouting.shardId().id()); + // if we don't have the routing entry yet, we need it stats wise, we treat it as if the shard is not ready yet + if (indexShard.routingEntry() == null) { + throw new ShardNotFoundException(indexShard.shardId()); + } + StoreStats storeStats = indexShard.storeStats(); + IndexAbstraction indexAbstraction = clusterService.state().getMetadata().getIndicesLookup().get(shardRouting.getIndexName()); + assert indexAbstraction != null; + IndexAbstraction.DataStream dataStream = indexAbstraction.getParentDataStream(); + assert dataStream != null; + long maxTimestamp = 0L; + try (Engine.Searcher searcher = indexShard.acquireSearcher("data_stream_stats")) { + IndexReader indexReader = searcher.getIndexReader(); + String fieldName = dataStream.getDataStream().getTimeStampField().getName(); + byte[] maxPackedValue = PointValues.getMaxPackedValue(indexReader, fieldName); + if (maxPackedValue != null) { + maxTimestamp = LongPoint.decodeDimension(maxPackedValue, 0); + } + } + return new DataStreamShardStats( + indexShard.routingEntry(), + storeStats, + maxTimestamp + ); + } + + @Override + protected DataStreamShardStats readShardResult(StreamInput in) throws IOException { + return new DataStreamShardStats(in); + } + + @Override + protected Response newResponse(Request request, int totalShards, int successfulShards, + int failedShards, List dataStreamShardStats, + List shardFailures, + ClusterState clusterState) { + Map aggregatedDataStreamsStats = new HashMap<>(); + Set allBackingIndices = new HashSet<>(); + long totalStoreSizeBytes = 0L; + + SortedMap indicesLookup = clusterState.getMetadata().getIndicesLookup(); + for (DataStreamShardStats shardStat : dataStreamShardStats) { + String indexName = shardStat.getShardRouting().getIndexName(); + IndexAbstraction indexAbstraction = indicesLookup.get(indexName); + IndexAbstraction.DataStream dataStream = indexAbstraction.getParentDataStream(); + assert dataStream != null; + + // Aggregate global stats + totalStoreSizeBytes += shardStat.getStoreStats().sizeInBytes(); + allBackingIndices.add(indexName); + + // Aggregate data stream stats + AggregatedStats stats = aggregatedDataStreamsStats.computeIfAbsent(dataStream.getName(), s -> new AggregatedStats()); + stats.storageBytes += shardStat.getStoreStats().sizeInBytes(); + stats.maxTimestamp = Math.max(stats.maxTimestamp, shardStat.getMaxTimestamp()); + stats.backingIndices.add(indexName); + } + + DataStreamStats[] dataStreamStats = aggregatedDataStreamsStats.entrySet().stream() + .map(entry -> new DataStreamStats( + entry.getKey(), + entry.getValue().backingIndices.size(), + new ByteSizeValue(entry.getValue().storageBytes), + entry.getValue().maxTimestamp)) + .toArray(DataStreamStats[]::new); + + return new Response( + totalShards, + successfulShards, + failedShards, + shardFailures, + aggregatedDataStreamsStats.size(), + allBackingIndices.size(), + new ByteSizeValue(totalStoreSizeBytes), + dataStreamStats + ); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java new file mode 100644 index 00000000000..7a0b93ff446 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java @@ -0,0 +1,233 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.datastream; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.MetadataDeleteIndexService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.index.Index; +import org.elasticsearch.snapshots.SnapshotInProgressException; +import org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +public class DeleteDataStreamAction extends ActionType { + + private static final Logger logger = LogManager.getLogger(DeleteDataStreamAction.class); + + public static final DeleteDataStreamAction INSTANCE = new DeleteDataStreamAction(); + public static final String NAME = "indices:admin/data_stream/delete"; + + private DeleteDataStreamAction() { + super(NAME, AcknowledgedResponse::new); + } + + public static class Request extends MasterNodeRequest implements IndicesRequest.Replaceable { + + private String[] names; + + public Request(String[] names) { + this.names = Objects.requireNonNull(names); + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (CollectionUtils.isEmpty(names)) { + validationException = addValidationError("no data stream(s) specified", validationException); + } + return validationException; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.names = in.readStringArray(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(names); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return Arrays.equals(names, request.names); + } + + @Override + public int hashCode() { + return Arrays.hashCode(names); + } + + @Override + public String[] indices() { + return names; + } + + @Override + public IndicesOptions indicesOptions() { + // this doesn't really matter since data stream name resolution isn't affected by IndicesOptions and + // a data stream's backing indices are retrieved from its metadata + return IndicesOptions.fromOptions(false, true, true, true, false, false, true, false); + } + + @Override + public boolean includeDataStreams() { + return true; + } + + @Override + public IndicesRequest indices(String... indices) { + this.names = indices; + return this; + } + } + + public static class TransportAction extends TransportMasterNodeAction { + + private final MetadataDeleteIndexService deleteIndexService; + + @Inject + public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + MetadataDeleteIndexService deleteIndexService) { + super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver); + this.deleteIndexService = deleteIndexService; + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected AcknowledgedResponse read(StreamInput in) throws IOException { + return new AcknowledgedResponse(in); + } + + @Override + protected void masterOperation(Request request, ClusterState state, + ActionListener listener) throws Exception { + clusterService.submitStateUpdateTask("remove-data-stream [" + Strings.arrayToCommaDelimitedString(request.names) + "]", + new ClusterStateUpdateTask(Priority.HIGH) { + + @Override + public TimeValue timeout() { + return request.masterNodeTimeout(); + } + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public ClusterState execute(ClusterState currentState) { + return removeDataStream(deleteIndexService, currentState, request); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(new AcknowledgedResponse(true)); + } + }); + } + + static ClusterState removeDataStream(MetadataDeleteIndexService deleteIndexService, ClusterState currentState, Request request) { + Set dataStreams = new HashSet<>(); + Set snapshottingDataStreams = new HashSet<>(); + for (String name : request.names) { + for (String dataStreamName : currentState.metadata().dataStreams().keySet()) { + if (Regex.simpleMatch(name, dataStreamName)) { + dataStreams.add(dataStreamName); + } + } + + snapshottingDataStreams.addAll(SnapshotsService.snapshottingDataStreams(currentState, dataStreams)); + } + + if (snapshottingDataStreams.isEmpty() == false) { + throw new SnapshotInProgressException("Cannot delete data streams that are being snapshotted: " + snapshottingDataStreams + + ". Try again after snapshot finishes or cancel the currently running snapshot."); + } + + Set backingIndicesToRemove = new HashSet<>(); + for (String dataStreamName : dataStreams) { + DataStream dataStream = currentState.metadata().dataStreams().get(dataStreamName); + assert dataStream != null; + backingIndicesToRemove.addAll(dataStream.getIndices()); + } + + // first delete the data streams and then the indices: + // (this to avoid data stream validation from failing when deleting an index that is part of a data stream + // without updating the data stream) + // TODO: change order when delete index api also updates the data stream the index to be removed is member of + Metadata.Builder metadata = Metadata.builder(currentState.metadata()); + for (String ds : dataStreams) { + logger.info("removing data stream [{}]", ds); + metadata.removeDataStream(ds); + } + currentState = ClusterState.builder(currentState).metadata(metadata).build(); + return deleteIndexService.deleteIndices(currentState, backingIndicesToRemove); + } + + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + } + +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamAction.java new file mode 100644 index 00000000000..16696eb7153 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamAction.java @@ -0,0 +1,323 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.datastream; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.MasterNodeReadRequest; +import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; +import org.elasticsearch.cluster.AbstractDiffable; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.health.ClusterStateHealth; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.Index; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +public class GetDataStreamAction extends ActionType { + + public static final GetDataStreamAction INSTANCE = new GetDataStreamAction(); + public static final String NAME = "indices:admin/data_stream/get"; + + private GetDataStreamAction() { + super(NAME, Response::new); + } + + public static class Request extends MasterNodeReadRequest implements IndicesRequest.Replaceable { + + private String[] names; + + public Request(String[] names) { + this.names = names; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.names = in.readOptionalStringArray(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeOptionalStringArray(names); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return Arrays.equals(names, request.names); + } + + @Override + public int hashCode() { + return Arrays.hashCode(names); + } + + @Override + public String[] indices() { + return names; + } + + @Override + public IndicesOptions indicesOptions() { + // this doesn't really matter since data stream name resolution isn't affected by IndicesOptions and + // a data stream's backing indices are retrieved from its metadata + return IndicesOptions.fromOptions(false, true, true, true, false, false, true, false); + } + + @Override + public boolean includeDataStreams() { + return true; + } + + @Override + public IndicesRequest indices(String... indices) { + this.names = indices; + return this; + } + } + + public static class Response extends ActionResponse implements ToXContentObject { + public static final ParseField DATASTREAMS_FIELD = new ParseField("data_streams"); + + public static class DataStreamInfo extends AbstractDiffable implements ToXContentObject { + + public static final ParseField STATUS_FIELD = new ParseField("status"); + public static final ParseField INDEX_TEMPLATE_FIELD = new ParseField("template"); + public static final ParseField ILM_POLICY_FIELD = new ParseField("ilm_policy"); + + DataStream dataStream; + ClusterHealthStatus dataStreamStatus; + @Nullable String indexTemplate; + @Nullable String ilmPolicyName; + + public DataStreamInfo(DataStream dataStream, ClusterHealthStatus dataStreamStatus, @Nullable String indexTemplate, + @Nullable String ilmPolicyName) { + this.dataStream = dataStream; + this.dataStreamStatus = dataStreamStatus; + this.indexTemplate = indexTemplate; + this.ilmPolicyName = ilmPolicyName; + } + + public DataStreamInfo(StreamInput in) throws IOException { + this(new DataStream(in), ClusterHealthStatus.readFrom(in), in.readOptionalString(), in.readOptionalString()); + } + + public DataStream getDataStream() { + return dataStream; + } + + public ClusterHealthStatus getDataStreamStatus() { + return dataStreamStatus; + } + + @Nullable + public String getIndexTemplate() { + return indexTemplate; + } + + @Nullable + public String getIlmPolicy() { + return ilmPolicyName; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + dataStream.writeTo(out); + dataStreamStatus.writeTo(out); + out.writeOptionalString(indexTemplate); + out.writeOptionalString(ilmPolicyName); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(DataStream.NAME_FIELD.getPreferredName(), dataStream.getName()); + builder.field(DataStream.TIMESTAMP_FIELD_FIELD.getPreferredName(), dataStream.getTimeStampField()); + builder.field(DataStream.INDICES_FIELD.getPreferredName(), dataStream.getIndices()); + builder.field(DataStream.GENERATION_FIELD.getPreferredName(), dataStream.getGeneration()); + builder.field(STATUS_FIELD.getPreferredName(), dataStreamStatus); + if (indexTemplate != null) { + builder.field(INDEX_TEMPLATE_FIELD.getPreferredName(), indexTemplate); + } + if (ilmPolicyName != null) { + builder.field(ILM_POLICY_FIELD.getPreferredName(), ilmPolicyName); + } + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DataStreamInfo that = (DataStreamInfo) o; + return dataStream.equals(that.dataStream) && + dataStreamStatus == that.dataStreamStatus && + Objects.equals(indexTemplate, that.indexTemplate) && + Objects.equals(ilmPolicyName, that.ilmPolicyName); + } + + @Override + public int hashCode() { + return Objects.hash(dataStream, dataStreamStatus, indexTemplate, ilmPolicyName); + } + } + + private final List dataStreams; + + public Response(List dataStreams) { + this.dataStreams = dataStreams; + } + + public Response(StreamInput in) throws IOException { + this(in.readList(DataStreamInfo::new)); + } + + public List getDataStreams() { + return dataStreams; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeList(dataStreams); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.startArray(DATASTREAMS_FIELD.getPreferredName()); + for (DataStreamInfo dataStream : dataStreams) { + dataStream.toXContent(builder, params); + } + builder.endArray(); + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Response response = (Response) o; + return dataStreams.equals(response.dataStreams); + } + + @Override + public int hashCode() { + return Objects.hash(dataStreams); + } + } + + public static class TransportAction extends TransportMasterNodeReadAction { + + private static final Logger logger = LogManager.getLogger(TransportAction.class); + + @Inject + public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver); + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected Response read(StreamInput in) throws IOException { + return new Response(in); + } + + @Override + protected void masterOperation(Request request, ClusterState state, + ActionListener listener) throws Exception { + List dataStreams = getDataStreams(state, indexNameExpressionResolver, request); + List dataStreamInfos = new ArrayList<>(dataStreams.size()); + for (DataStream dataStream : dataStreams) { + String indexTemplate = MetadataIndexTemplateService.findV2Template(state.metadata(), dataStream.getName(), false); + String ilmPolicyName = null; + if (indexTemplate != null) { + Settings settings = MetadataIndexTemplateService.resolveSettings(state.metadata(), indexTemplate); + ilmPolicyName = settings.get("index.lifecycle.name"); + } else { + logger.warn("couldn't find any matching template for data stream [{}]. has it been restored (and possibly renamed)" + + "from a snapshot?", dataStream.getName()); + } + ClusterStateHealth streamHealth = new ClusterStateHealth(state, + dataStream.getIndices().stream().map(Index::getName).toArray(String[]::new)); + dataStreamInfos.add(new Response.DataStreamInfo(dataStream, streamHealth.getStatus(), indexTemplate, ilmPolicyName)); + } + listener.onResponse(new Response(dataStreamInfos)); + } + + static List getDataStreams(ClusterState clusterState, IndexNameExpressionResolver iner, Request request) { + List results = iner.dataStreamNames(clusterState, request.indicesOptions(), request.names); + Map dataStreams = clusterState.metadata().dataStreams(); + + return results.stream() + .map(dataStreams::get) + .sorted(Comparator.comparing(DataStream::getName)) + .collect(Collectors.toList()); + } + + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + } + +} diff --git a/server/src/main/java/org/elasticsearch/client/IndicesAdminClient.java b/server/src/main/java/org/elasticsearch/client/IndicesAdminClient.java index e73f6a1cf48..f63ecd4d39e 100644 --- a/server/src/main/java/org/elasticsearch/client/IndicesAdminClient.java +++ b/server/src/main/java/org/elasticsearch/client/IndicesAdminClient.java @@ -39,6 +39,9 @@ import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction; +import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction; +import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; @@ -841,6 +844,36 @@ public interface IndicesAdminClient extends ElasticsearchClient { */ void rolloverIndex(RolloverRequest request, ActionListener listener); + /** + * Store a data stream + */ + void createDataStream(CreateDataStreamAction.Request request, ActionListener listener); + + /** + * Store a data stream + */ + ActionFuture createDataStream(CreateDataStreamAction.Request request); + + /** + * Delete a data stream + */ + void deleteDataStream(DeleteDataStreamAction.Request request, ActionListener listener); + + /** + * Delete a data stream + */ + ActionFuture deleteDataStream(DeleteDataStreamAction.Request request); + + /** + * Get data streams + */ + void getDataStreams(GetDataStreamAction.Request request, ActionListener listener); + + /** + * Get data streams + */ + ActionFuture getDataStreams(GetDataStreamAction.Request request); + /** * Resolves names and wildcard expressions to indices, aliases, and data streams */ diff --git a/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java b/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java index 12fae90ab95..d5ad6d73e1b 100644 --- a/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java @@ -169,6 +169,9 @@ import org.elasticsearch.action.admin.indices.dangling.import_index.ImportDangli import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesAction; import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesRequest; import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesResponse; +import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction; +import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction; +import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction; import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder; @@ -1814,6 +1817,36 @@ public abstract class AbstractClient implements Client { execute(GetSettingsAction.INSTANCE, request, listener); } + @Override + public void createDataStream(CreateDataStreamAction.Request request, ActionListener listener) { + execute(CreateDataStreamAction.INSTANCE, request, listener); + } + + @Override + public ActionFuture createDataStream(CreateDataStreamAction.Request request) { + return execute(CreateDataStreamAction.INSTANCE, request); + } + + @Override + public void deleteDataStream(DeleteDataStreamAction.Request request, ActionListener listener) { + execute(DeleteDataStreamAction.INSTANCE, request, listener); + } + + @Override + public ActionFuture deleteDataStream(DeleteDataStreamAction.Request request) { + return execute(DeleteDataStreamAction.INSTANCE, request); + } + + @Override + public void getDataStreams(GetDataStreamAction.Request request, ActionListener listener) { + execute(GetDataStreamAction.INSTANCE, request, listener); + } + + @Override + public ActionFuture getDataStreams(GetDataStreamAction.Request request) { + return execute(GetDataStreamAction.INSTANCE, request); + } + @Override public void resolveIndex(ResolveIndexAction.Request request, ActionListener listener) { diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestCreateDataStreamAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestCreateDataStreamAction.java new file mode 100644 index 00000000000..94dd6c9f0d0 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestCreateDataStreamAction.java @@ -0,0 +1,50 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.rest.action.admin.indices; + +import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +public class RestCreateDataStreamAction extends BaseRestHandler { + + @Override + public String getName() { + return "create_data_stream_action"; + } + + @Override + public List routes() { + return Collections.singletonList( + new Route(RestRequest.Method.PUT, "/_data_stream/{name}") + ); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + CreateDataStreamAction.Request putDataStreamRequest = new CreateDataStreamAction.Request(request.param("name")); + return channel -> client.admin().indices().createDataStream(putDataStreamRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestDataStreamsStatsAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestDataStreamsStatsAction.java new file mode 100644 index 00000000000..a9fc8816d48 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestDataStreamsStatsAction.java @@ -0,0 +1,60 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.rest.action.admin.indices; + +import org.elasticsearch.action.admin.indices.datastream.DataStreamsStatsAction; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.Strings; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +public class RestDataStreamsStatsAction extends BaseRestHandler { + @Override + public String getName() { + return "data_stream_stats_action"; + } + + @Override + public List routes() { + return Arrays.asList( + new Route(RestRequest.Method.GET, "/_data_stream/_stats"), + new Route(RestRequest.Method.GET, "/_data_stream/{name}/_stats") + ); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + DataStreamsStatsAction.Request dataStreamsStatsRequest = new DataStreamsStatsAction.Request(); + boolean forbidClosedIndices = request.paramAsBoolean("forbid_closed_indices", true); + IndicesOptions defaultIndicesOption = forbidClosedIndices ? dataStreamsStatsRequest.indicesOptions() + : IndicesOptions.strictExpandOpen(); + assert dataStreamsStatsRequest.indicesOptions() == IndicesOptions.strictExpandOpenAndForbidClosed() : "DataStreamStats default " + + "indices options changed"; + dataStreamsStatsRequest.indicesOptions(IndicesOptions.fromRequest(request, defaultIndicesOption)); + dataStreamsStatsRequest.indices(Strings.splitStringByCommaToArray(request.param("name"))); + return channel -> client.execute(DataStreamsStatsAction.INSTANCE, dataStreamsStatsRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestDeleteDataStreamAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestDeleteDataStreamAction.java new file mode 100644 index 00000000000..f963444ecb6 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestDeleteDataStreamAction.java @@ -0,0 +1,50 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.rest.action.admin.indices; + +import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.Strings; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +public class RestDeleteDataStreamAction extends BaseRestHandler { + + @Override + public String getName() { + return "delete_data_stream_action"; + } + + @Override + public List routes() { + return Collections.singletonList(new Route(RestRequest.Method.DELETE, "/_data_stream/{name}")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + DeleteDataStreamAction.Request deleteDataStreamRequest = new DeleteDataStreamAction.Request( + Strings.splitStringByCommaToArray(request.param("name"))); + return channel -> client.admin().indices().deleteDataStream(deleteDataStreamRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetDataStreamsAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetDataStreamsAction.java new file mode 100644 index 00000000000..569715c0e5e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetDataStreamsAction.java @@ -0,0 +1,52 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.rest.action.admin.indices; + +import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.Strings; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; + +import java.io.IOException; +import java.util.List; + +public class RestGetDataStreamsAction extends BaseRestHandler { + + @Override + public String getName() { + return "get_data_streams_action"; + } + + @Override + public List routes() { + return org.elasticsearch.common.collect.List.of( + new Route(RestRequest.Method.GET, "/_data_stream"), + new Route(RestRequest.Method.GET, "/_data_stream/{name}") + ); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + GetDataStreamAction.Request getDataStreamsRequest = new GetDataStreamAction.Request( + Strings.splitStringByCommaToArray(request.param("name"))); + return channel -> client.admin().indices().getDataStreams(getDataStreamsRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java new file mode 100644 index 00000000000..863c2f3245e --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java @@ -0,0 +1,55 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.datastream; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction.Request; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +public class CreateDataStreamRequestTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return Request::new; + } + + @Override + protected Request createTestInstance() { + return new Request(randomAlphaOfLength(8)); + } + + public void testValidateRequest() { + CreateDataStreamAction.Request req = new CreateDataStreamAction.Request("my-data-stream"); + ActionRequestValidationException e = req.validate(); + assertNull(e); + } + + public void testValidateRequestWithoutName() { + CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(""); + ActionRequestValidationException e = req.validate(); + assertNotNull(e); + assertThat(e.validationErrors().size(), equalTo(1)); + assertThat(e.validationErrors().get(0), containsString("name is missing")); + } + +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DataStreamsStatsResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DataStreamsStatsResponseTests.java new file mode 100644 index 00000000000..c64a342e605 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DataStreamsStatsResponseTests.java @@ -0,0 +1,76 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.datastream; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.support.DefaultShardOperationFailedException; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.concurrent.TimeUnit; + +public class DataStreamsStatsResponseTests extends AbstractWireSerializingTestCase { + @Override + protected Writeable.Reader instanceReader() { + return DataStreamsStatsAction.Response::new; + } + + @Override + protected DataStreamsStatsAction.Response createTestInstance() { + return randomStatsResponse(); + } + + public static DataStreamsStatsAction.Response randomStatsResponse() { + int dataStreamCount = randomInt(10); + int backingIndicesTotal = 0; + long totalStoreSize = 0L; + ArrayList dataStreamStats = new ArrayList<>(); + for (int i = 0; i < dataStreamCount; i++) { + String dataStreamName = randomAlphaOfLength(8).toLowerCase(Locale.getDefault()); + int backingIndices = randomInt(5); + backingIndicesTotal += backingIndices; + long storeSize = randomLongBetween(250, 1000000000); + totalStoreSize += storeSize; + long maximumTimestamp = randomRecentTimestamp(); + dataStreamStats.add(new DataStreamsStatsAction.DataStreamStats(dataStreamName, backingIndices, + new ByteSizeValue(storeSize), maximumTimestamp)); + } + int totalShards = randomIntBetween(backingIndicesTotal, backingIndicesTotal * 3); + int successfulShards = randomInt(totalShards); + int failedShards = totalShards - successfulShards; + List exceptions = new ArrayList<>(); + for (int i = 0; i < failedShards; i++) { + exceptions.add(new DefaultShardOperationFailedException(randomAlphaOfLength(8).toLowerCase(Locale.getDefault()), + randomInt(totalShards), new ElasticsearchException("boom"))); + } + return new DataStreamsStatsAction.Response(totalShards, successfulShards, failedShards, exceptions, + dataStreamCount, backingIndicesTotal, new ByteSizeValue(totalStoreSize), + dataStreamStats.toArray(new DataStreamsStatsAction.DataStreamStats[0])); + } + + private static long randomRecentTimestamp() { + long base = System.currentTimeMillis(); + return randomLongBetween(base - TimeUnit.HOURS.toMillis(1), base); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java new file mode 100644 index 00000000000..a899b97a65e --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java @@ -0,0 +1,222 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.datastream; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction.Request; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.SnapshotsInProgress; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.MetadataDeleteIndexService; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; +import org.elasticsearch.snapshots.Snapshot; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotInProgressException; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class DeleteDataStreamRequestTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return Request::new; + } + + @Override + protected Request createTestInstance() { + return new Request(randomArray(1, 3, String[]::new, () -> randomAlphaOfLength(6))); + } + + public void testValidateRequest() { + DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(new String[]{"my-data-stream"}); + ActionRequestValidationException e = req.validate(); + assertNull(e); + } + + public void testValidateRequestWithoutName() { + DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(new String[0]); + ActionRequestValidationException e = req.validate(); + assertNotNull(e); + assertThat(e.validationErrors().size(), equalTo(1)); + assertThat(e.validationErrors().get(0), containsString("no data stream(s) specified")); + } + + public void testDeleteDataStream() { + final String dataStreamName = "my-data-stream"; + final List otherIndices = randomSubsetOf(org.elasticsearch.common.collect.List.of("foo", "bar", "baz")); + ClusterState cs = getClusterStateWithDataStreams( + org.elasticsearch.common.collect.List.of(new Tuple<>(dataStreamName, 2)), otherIndices); + DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(new String[]{dataStreamName}); + ClusterState newState = DeleteDataStreamAction.TransportAction.removeDataStream(getMetadataDeleteIndexService(), cs, req); + assertThat(newState.metadata().dataStreams().size(), equalTo(0)); + assertThat(newState.metadata().indices().size(), equalTo(otherIndices.size())); + for (String indexName : otherIndices) { + assertThat(newState.metadata().indices().get(indexName).getIndex().getName(), equalTo(indexName)); + } + } + + public void testDeleteMultipleDataStreams() { + String[] dataStreamNames = {"foo", "bar", "baz", "eggplant"}; + ClusterState cs = getClusterStateWithDataStreams(org.elasticsearch.common.collect.List.of( + new Tuple<>(dataStreamNames[0], randomIntBetween(1, 3)), + new Tuple<>(dataStreamNames[1], randomIntBetween(1, 3)), + new Tuple<>(dataStreamNames[2], randomIntBetween(1, 3)), + new Tuple<>(dataStreamNames[3], randomIntBetween(1, 3)) + ), org.elasticsearch.common.collect.List.of()); + + DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(new String[]{"ba*", "eggplant"}); + ClusterState newState = DeleteDataStreamAction.TransportAction.removeDataStream(getMetadataDeleteIndexService(), cs, req); + assertThat(newState.metadata().dataStreams().size(), equalTo(1)); + DataStream remainingDataStream = newState.metadata().dataStreams().get(dataStreamNames[0]); + assertNotNull(remainingDataStream); + assertThat(newState.metadata().indices().size(), equalTo(remainingDataStream.getIndices().size())); + for (Index i : remainingDataStream.getIndices()) { + assertThat(newState.metadata().indices().get(i.getName()).getIndex(), equalTo(i)); + } + } + + public void testDeleteSnapshottingDataStream() { + final String dataStreamName = "my-data-stream1"; + final String dataStreamName2 = "my-data-stream2"; + final List otherIndices = randomSubsetOf(Arrays.asList("foo", "bar", "baz")); + + ClusterState cs = getClusterStateWithDataStreams(Arrays.asList(new Tuple<>(dataStreamName, 2), new Tuple<>(dataStreamName2, 2)), + otherIndices); + SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.of(Arrays.asList( + createEntry(dataStreamName, "repo1", false), + createEntry(dataStreamName2, "repo2", true))); + ClusterState snapshotCs = ClusterState.builder(cs).putCustom(SnapshotsInProgress.TYPE, snapshotsInProgress).build(); + + DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(new String[]{dataStreamName}); + SnapshotInProgressException e = expectThrows(SnapshotInProgressException.class, + () -> DeleteDataStreamAction.TransportAction.removeDataStream(getMetadataDeleteIndexService(), snapshotCs, req)); + + assertThat(e.getMessage(), equalTo("Cannot delete data streams that are being snapshotted: [my-data-stream1]. Try again after " + + "snapshot finishes or cancel the currently running snapshot.")); + } + + private SnapshotsInProgress.Entry createEntry(String dataStreamName, String repo, boolean partial) { + return new SnapshotsInProgress.Entry(new Snapshot(repo, new SnapshotId("", "")), false, partial, + SnapshotsInProgress.State.STARTED, Collections.emptyList(), Collections.singletonList(dataStreamName), 0, 1, + ImmutableOpenMap.of(), null, null, null); + } + + public void testDeleteNonexistentDataStream() { + final String dataStreamName = "my-data-stream"; + String[] dataStreamNames = {"foo", "bar", "baz", "eggplant"}; + ClusterState cs = getClusterStateWithDataStreams(org.elasticsearch.common.collect.List.of( + new Tuple<>(dataStreamNames[0], randomIntBetween(1, 3)), + new Tuple<>(dataStreamNames[1], randomIntBetween(1, 3)), + new Tuple<>(dataStreamNames[2], randomIntBetween(1, 3)), + new Tuple<>(dataStreamNames[3], randomIntBetween(1, 3)) + ), org.elasticsearch.common.collect.List.of()); + DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(new String[]{dataStreamName}); + ClusterState newState = DeleteDataStreamAction.TransportAction.removeDataStream(getMetadataDeleteIndexService(), cs, req); + assertThat(newState.metadata().dataStreams().size(), equalTo(cs.metadata().dataStreams().size())); + assertThat(newState.metadata().dataStreams().keySet(), + containsInAnyOrder(cs.metadata().dataStreams().keySet().toArray(Strings.EMPTY_ARRAY))); + } + + @SuppressWarnings("unchecked") + private static MetadataDeleteIndexService getMetadataDeleteIndexService() { + MetadataDeleteIndexService s = mock(MetadataDeleteIndexService.class); + when(s.deleteIndices(any(ClusterState.class), any(Set.class))) + .thenAnswer(mockInvocation -> { + ClusterState currentState = (ClusterState) mockInvocation.getArguments()[0]; + Set indices = (Set) mockInvocation.getArguments()[1]; + + final Metadata.Builder b = Metadata.builder(currentState.metadata()); + for (Index index : indices) { + b.remove(index.getName()); + } + + return ClusterState.builder(currentState).metadata(b.build()).build(); + }); + + return s; + } + + /** + * Constructs {@code ClusterState} with the specified data streams and indices. + * + * @param dataStreams The names of the data streams to create with their respective number of backing indices + * @param indexNames The names of indices to create that do not back any data streams + */ + public static ClusterState getClusterStateWithDataStreams(List> dataStreams, List indexNames) { + Metadata.Builder builder = Metadata.builder(); + + List allIndices = new ArrayList<>(); + for (Tuple dsTuple : dataStreams) { + List backingIndices = new ArrayList<>(); + for (int backingIndexNumber = 1; backingIndexNumber <= dsTuple.v2(); backingIndexNumber++) { + backingIndices.add(createIndexMetadata(DataStream.getDefaultBackingIndexName(dsTuple.v1(), backingIndexNumber), true)); + } + allIndices.addAll(backingIndices); + + DataStream ds = new DataStream(dsTuple.v1(), createTimestampField("@timestamp"), + backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()), dsTuple.v2()); + builder.put(ds); + } + + for (String indexName : indexNames) { + allIndices.add(createIndexMetadata(indexName, false)); + } + + for (IndexMetadata index : allIndices) { + builder.put(index, false); + } + + return ClusterState.builder(new ClusterName("_name")).metadata(builder).build(); + } + + private static IndexMetadata createIndexMetadata(String name, boolean hidden) { + Settings.Builder b = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put("index.hidden", hidden); + + return IndexMetadata.builder(name) + .settings(b) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsRequestTests.java new file mode 100644 index 00000000000..c0a29e00bf0 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsRequestTests.java @@ -0,0 +1,143 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.datastream; + +import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction.Request; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.util.List; + +import static org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamRequestTests.getClusterStateWithDataStreams; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +public class GetDataStreamsRequestTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return Request::new; + } + + @Override + protected Request createTestInstance() { + final String[] searchParameter; + switch (randomIntBetween(1, 4)) { + case 1: + searchParameter = generateRandomStringArray(3, 8, false, false); + break; + case 2: + String[] parameters = generateRandomStringArray(3, 8, false, false); + for (int k = 0; k < parameters.length; k++) { + parameters[k] = parameters[k] + "*"; + } + searchParameter = parameters; + break; + case 3: + searchParameter = new String[]{"*"}; + break; + default: + searchParameter = null; + break; + } + return new Request(searchParameter); + } + + public void testGetDataStream() { + final String dataStreamName = "my-data-stream"; + ClusterState cs = getClusterStateWithDataStreams( + org.elasticsearch.common.collect.List.of(new Tuple<>(dataStreamName, 1)), org.elasticsearch.common.collect.List.of()); + GetDataStreamAction.Request req = new GetDataStreamAction.Request(new String[]{dataStreamName}); + List dataStreams = GetDataStreamAction.TransportAction.getDataStreams(cs, new IndexNameExpressionResolver(), req); + assertThat(dataStreams.size(), equalTo(1)); + assertThat(dataStreams.get(0).getName(), equalTo(dataStreamName)); + } + + public void testGetDataStreamsWithWildcards() { + final String[] dataStreamNames = {"my-data-stream", "another-data-stream"}; + ClusterState cs = getClusterStateWithDataStreams( + org.elasticsearch.common.collect.List.of(new Tuple<>(dataStreamNames[0], 1), new Tuple<>(dataStreamNames[1], 1)), + org.elasticsearch.common.collect.List.of()); + + GetDataStreamAction.Request req = new GetDataStreamAction.Request(new String[]{dataStreamNames[1].substring(0, 5) + "*"}); + List dataStreams = GetDataStreamAction.TransportAction.getDataStreams(cs, new IndexNameExpressionResolver(), req); + assertThat(dataStreams.size(), equalTo(1)); + assertThat(dataStreams.get(0).getName(), equalTo(dataStreamNames[1])); + + req = new GetDataStreamAction.Request(new String[]{"*"}); + dataStreams = GetDataStreamAction.TransportAction.getDataStreams(cs, new IndexNameExpressionResolver(), req); + assertThat(dataStreams.size(), equalTo(2)); + assertThat(dataStreams.get(0).getName(), equalTo(dataStreamNames[1])); + assertThat(dataStreams.get(1).getName(), equalTo(dataStreamNames[0])); + + req = new GetDataStreamAction.Request((String[]) null); + dataStreams = GetDataStreamAction.TransportAction.getDataStreams(cs, new IndexNameExpressionResolver(), req); + assertThat(dataStreams.size(), equalTo(2)); + assertThat(dataStreams.get(0).getName(), equalTo(dataStreamNames[1])); + assertThat(dataStreams.get(1).getName(), equalTo(dataStreamNames[0])); + + req = new GetDataStreamAction.Request(new String[]{"matches-none*"}); + dataStreams = GetDataStreamAction.TransportAction.getDataStreams(cs, new IndexNameExpressionResolver(), req); + assertThat(dataStreams.size(), equalTo(0)); + } + + public void testGetDataStreamsWithoutWildcards() { + final String[] dataStreamNames = {"my-data-stream", "another-data-stream"}; + ClusterState cs = getClusterStateWithDataStreams( + org.elasticsearch.common.collect.List.of(new Tuple<>(dataStreamNames[0], 1), new Tuple<>(dataStreamNames[1], 1)), + org.elasticsearch.common.collect.List.of()); + + GetDataStreamAction.Request req = new GetDataStreamAction.Request(new String[]{dataStreamNames[0], dataStreamNames[1]}); + List dataStreams = GetDataStreamAction.TransportAction.getDataStreams(cs, new IndexNameExpressionResolver(), req); + assertThat(dataStreams.size(), equalTo(2)); + assertThat(dataStreams.get(0).getName(), equalTo(dataStreamNames[1])); + assertThat(dataStreams.get(1).getName(), equalTo(dataStreamNames[0])); + + req = new GetDataStreamAction.Request(new String[]{dataStreamNames[1]}); + dataStreams = GetDataStreamAction.TransportAction.getDataStreams(cs, new IndexNameExpressionResolver(), req); + assertThat(dataStreams.size(), equalTo(1)); + assertThat(dataStreams.get(0).getName(), equalTo(dataStreamNames[1])); + + req = new GetDataStreamAction.Request(new String[]{dataStreamNames[0]}); + dataStreams = GetDataStreamAction.TransportAction.getDataStreams(cs, new IndexNameExpressionResolver(), req); + assertThat(dataStreams.size(), equalTo(1)); + assertThat(dataStreams.get(0).getName(), equalTo(dataStreamNames[0])); + + GetDataStreamAction.Request req2 = new GetDataStreamAction.Request(new String[]{"foo"}); + IndexNotFoundException e = expectThrows(IndexNotFoundException.class, + () -> GetDataStreamAction.TransportAction.getDataStreams(cs, new IndexNameExpressionResolver(), req2)); + assertThat(e.getMessage(), containsString("no such index [foo]")); + } + + public void testGetNonexistentDataStream() { + final String dataStreamName = "my-data-stream"; + ClusterState cs = ClusterState.builder(new ClusterName("_name")).build(); + GetDataStreamAction.Request req = new GetDataStreamAction.Request(new String[]{dataStreamName}); + IndexNotFoundException e = expectThrows(IndexNotFoundException.class, + () -> GetDataStreamAction.TransportAction.getDataStreams(cs, new IndexNameExpressionResolver(), req)); + assertThat(e.getMessage(), containsString("no such index [" + dataStreamName + "]")); + } + +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsResponseTests.java new file mode 100644 index 00000000000..e90924d3ddf --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsResponseTests.java @@ -0,0 +1,47 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.datastream; + +import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction.Response; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.metadata.DataStreamTests; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.util.ArrayList; +import java.util.List; + +public class GetDataStreamsResponseTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return Response::new; + } + + @Override + protected Response createTestInstance() { + int numDataStreams = randomIntBetween(0, 8); + List dataStreams = new ArrayList<>(); + for (int i = 0; i < numDataStreams; i++) { + dataStreams.add(new Response.DataStreamInfo(DataStreamTests.randomInstance(), ClusterHealthStatus.GREEN, + randomAlphaOfLengthBetween(2, 10), randomAlphaOfLengthBetween(2, 10))); + } + return new Response(dataStreams); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingRequestTests.java index 354d38b2500..43ed4506bb4 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingRequestTests.java @@ -20,8 +20,8 @@ package org.elasticsearch.action.admin.indices.mapping.put; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamRequestTests; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.DataStreamTestHelper; import org.elasticsearch.cluster.metadata.AliasMetadata; import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; @@ -163,7 +163,7 @@ public class PutMappingRequestTests extends ESTestCase { tuple(dataStreamNames[1], randomIntBetween(1, 3)), tuple(dataStreamNames[2], randomIntBetween(1, 3))); - ClusterState cs = DataStreamTestHelper.getClusterStateWithDataStreams(dsMetadata, + ClusterState cs = DeleteDataStreamRequestTests.getClusterStateWithDataStreams(dsMetadata, org.elasticsearch.common.collect.List.of("index1", "index2", "index3")); cs = addAliases(cs, org.elasticsearch.common.collect.List.of( tuple("alias1", org.elasticsearch.common.collect.List.of(tuple("index1", false), tuple("index2", true))), @@ -185,7 +185,7 @@ public class PutMappingRequestTests extends ESTestCase { tuple(dataStreamNames[1], randomIntBetween(1, 3)), tuple(dataStreamNames[2], randomIntBetween(1, 3))); - ClusterState cs = DataStreamTestHelper.getClusterStateWithDataStreams(dsMetadata, + ClusterState cs = DeleteDataStreamRequestTests.getClusterStateWithDataStreams(dsMetadata, org.elasticsearch.common.collect.List.of("index1", "index2", "index3")); cs = addAliases(cs, org.elasticsearch.common.collect.List.of( tuple("alias1", org.elasticsearch.common.collect.List.of(tuple("index1", false), tuple("index2", true))), @@ -209,7 +209,7 @@ public class PutMappingRequestTests extends ESTestCase { tuple(dataStreamNames[1], randomIntBetween(1, 3)), tuple(dataStreamNames[2], randomIntBetween(1, 3))); - ClusterState cs = DataStreamTestHelper.getClusterStateWithDataStreams(dsMetadata, + ClusterState cs = DeleteDataStreamRequestTests.getClusterStateWithDataStreams(dsMetadata, org.elasticsearch.common.collect.List.of("index1", "index2", "index3")); cs = addAliases(cs, org.elasticsearch.common.collect.List.of( tuple("alias1", org.elasticsearch.common.collect.List.of(tuple("index1", false), tuple("index2", true))), @@ -233,7 +233,7 @@ public class PutMappingRequestTests extends ESTestCase { tuple(dataStreamNames[1], randomIntBetween(1, 3)), tuple(dataStreamNames[2], randomIntBetween(1, 3))); - ClusterState cs = DataStreamTestHelper.getClusterStateWithDataStreams(dsMetadata, + ClusterState cs = DeleteDataStreamRequestTests.getClusterStateWithDataStreams(dsMetadata, org.elasticsearch.common.collect.List.of("index1", "index2", "index3")); final ClusterState cs2 = addAliases(cs, org.elasticsearch.common.collect.List.of( tuple("alias1", org.elasticsearch.common.collect.List.of(tuple("index1", false), tuple("index2", true))), @@ -253,7 +253,7 @@ public class PutMappingRequestTests extends ESTestCase { tuple(dataStreamNames[1], randomIntBetween(1, 3)), tuple(dataStreamNames[2], randomIntBetween(1, 3))); - ClusterState cs = DataStreamTestHelper.getClusterStateWithDataStreams(dsMetadata, + ClusterState cs = DeleteDataStreamRequestTests.getClusterStateWithDataStreams(dsMetadata, org.elasticsearch.common.collect.List.of("index1", "index2", "index3")); final ClusterState cs2 = addAliases(cs, org.elasticsearch.common.collect.List.of( tuple("alias1", org.elasticsearch.common.collect.List.of(tuple("index1", false), tuple("index2", false))), diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java index e1bac1919b6..ff5509476e4 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.cluster.metadata.AliasValidator; import org.elasticsearch.cluster.metadata.ComponentTemplate; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.DataStreamTests; import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -261,7 +262,7 @@ public class MetadataRolloverServiceTests extends ESTestCase { public void testDataStreamValidation() throws IOException { Metadata.Builder md = Metadata.builder(); - DataStream randomDataStream = DataStreamTestHelper.randomInstance(); + DataStream randomDataStream = DataStreamTests.randomInstance(); for (Index index : randomDataStream.getIndices()) { md.put(DataStreamTestHelper.getIndexMetadataBuilderForIndex(index)); } @@ -335,7 +336,7 @@ public class MetadataRolloverServiceTests extends ESTestCase { } public void testCreateIndexRequestForDataStream() { - DataStream dataStream = DataStreamTestHelper.randomInstance(); + DataStream dataStream = DataStreamTests.randomInstance(); final String newWriteIndexName = DataStream.getDefaultBackingIndexName(dataStream.getName(), dataStream.getGeneration() + 1); final RolloverRequest rolloverRequest = new RolloverRequest(dataStream.getName(), randomAlphaOfLength(10)); final ActiveShardCount activeShardCount = randomBoolean() ? ActiveShardCount.ALL : ActiveShardCount.ONE; @@ -540,7 +541,7 @@ public class MetadataRolloverServiceTests extends ESTestCase { } public void testRolloverClusterStateForDataStream() throws Exception { - final DataStream dataStream = DataStreamTestHelper.randomInstance(); + final DataStream dataStream = DataStreamTests.randomInstance(); ComposableIndexTemplate template = new ComposableIndexTemplate(Collections.singletonList(dataStream.getName() + "*"), null, null, null, null, null, new ComposableIndexTemplate.DataStreamTemplate()); Metadata.Builder builder = Metadata.builder(); @@ -636,7 +637,7 @@ public class MetadataRolloverServiceTests extends ESTestCase { final boolean useDataStream = randomBoolean(); final Metadata.Builder builder = Metadata.builder(); if (useDataStream) { - DataStream dataStream = DataStreamTestHelper.randomInstance(); + DataStream dataStream = DataStreamTests.randomInstance(); rolloverTarget = dataStream.getName(); sourceIndexName = dataStream.getIndices().get(dataStream.getIndices().size() - 1).getName(); defaultRolloverIndexName = DataStream.getDefaultBackingIndexName(dataStream.getName(), dataStream.getGeneration() + 1); @@ -693,7 +694,7 @@ public class MetadataRolloverServiceTests extends ESTestCase { } public void testRolloverClusterStateForDataStreamNoTemplate() throws Exception { - final DataStream dataStream = DataStreamTestHelper.randomInstance(); + final DataStream dataStream = DataStreamTests.randomInstance(); Metadata.Builder builder = Metadata.builder(); for (Index index : dataStream.getIndices()) { builder.put(DataStreamTestHelper.getIndexMetadataBuilderForIndex(index)); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamMetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamMetadataTests.java index 46c3210a495..7ef4aad1a18 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamMetadataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamMetadataTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.cluster.metadata; -import org.elasticsearch.cluster.DataStreamTestHelper; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.test.AbstractNamedWriteableTestCase; @@ -37,7 +36,7 @@ public class DataStreamMetadataTests extends AbstractNamedWriteableTestCase dataStreams = new HashMap<>(); for (int i = 0; i < randomIntBetween(1, 5); i++) { - dataStreams.put(randomAlphaOfLength(5), DataStreamTestHelper.randomInstance()); + dataStreams.put(randomAlphaOfLength(5), DataStreamTests.randomInstance()); } return new DataStreamMetadata(dataStreams); } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java index 826d5940245..0bccae72461 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -18,7 +18,6 @@ */ package org.elasticsearch.cluster.metadata; -import org.elasticsearch.cluster.DataStreamTestHelper; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.XContentParser; @@ -36,6 +35,23 @@ import static org.hamcrest.Matchers.equalTo; public class DataStreamTests extends AbstractSerializingTestCase { + public static List randomIndexInstances() { + int numIndices = randomIntBetween(0, 128); + List indices = new ArrayList<>(numIndices); + for (int i = 0; i < numIndices; i++) { + indices.add(new Index(randomAlphaOfLength(10).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random()))); + } + return indices; + } + + public static DataStream randomInstance() { + List indices = randomIndexInstances(); + long generation = indices.size() + randomLongBetween(1, 128); + String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + indices.add(new Index(getDefaultBackingIndexName(dataStreamName, generation), UUIDs.randomBase64UUID(random()))); + return new DataStream(dataStreamName, createTimestampField("@timestamp"), indices, generation); + } + @Override protected DataStream doParseInstance(XContentParser parser) throws IOException { return DataStream.fromXContent(parser); @@ -48,11 +64,11 @@ public class DataStreamTests extends AbstractSerializingTestCase { @Override protected DataStream createTestInstance() { - return DataStreamTestHelper.randomInstance(); + return randomInstance(); } public void testRollover() { - DataStream ds = DataStreamTestHelper.randomInstance(); + DataStream ds = randomInstance(); Index newWriteIndex = new Index(getDefaultBackingIndexName(ds.getName(), ds.getGeneration() + 1), UUIDs.randomBase64UUID(random())); DataStream rolledDs = ds.rollover(newWriteIndex); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexServiceTests.java index e95e3698ce1..bdbc78b4f6f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexServiceTests.java @@ -16,8 +16,10 @@ * specific language governing permissions and limitations * under the License. */ + package org.elasticsearch.cluster.metadata; +import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamRequestTests; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.DataStreamTestHelper; diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexStateServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexStateServiceTests.java index b483b6372e2..83633386f47 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexStateServiceTests.java @@ -23,9 +23,9 @@ import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest; import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.admin.indices.close.CloseIndexResponse.IndexResult; +import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamRequestTests; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.DataStreamTestHelper; import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.block.ClusterBlock; @@ -401,7 +401,7 @@ public class MetadataIndexStateServiceTests extends ESTestCase { dataStreamsToCreate.add(new Tuple<>(dataStreamName, numBackingIndices)); writeIndices.add(DataStream.getDefaultBackingIndexName(dataStreamName, numBackingIndices)); } - ClusterState cs = DataStreamTestHelper.getClusterStateWithDataStreams(dataStreamsToCreate, + ClusterState cs = DeleteDataStreamRequestTests.getClusterStateWithDataStreams(dataStreamsToCreate, org.elasticsearch.common.collect.List.of()); ClusterService clusterService = mock(ClusterService.class); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java index c6c235b81f3..4ac77177b95 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java @@ -1289,7 +1289,7 @@ public class MetadataTests extends ESTestCase { .put("component_template_" + randomAlphaOfLength(3), ComponentTemplateTests.randomInstance()) .put("index_template_v2_" + randomAlphaOfLength(3), ComposableIndexTemplateTests.randomInstance()); - DataStream randomDataStream = DataStreamTestHelper.randomInstance(); + DataStream randomDataStream = DataStreamTests.randomInstance(); for (Index index : randomDataStream.getIndices()) { md.put(DataStreamTestHelper.getIndexMetadataBuilderForIndex(index)); } diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/DataStreamTestHelper.java b/test/framework/src/main/java/org/elasticsearch/cluster/DataStreamTestHelper.java index c8a7e3f1a8e..35178c49cdf 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/DataStreamTestHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/DataStreamTestHelper.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.elasticsearch.cluster; import org.apache.lucene.util.LuceneTestCase; @@ -89,6 +90,7 @@ public final class DataStreamTestHelper { " }"; } + public static List randomIndexInstances() { int numIndices = ESTestCase.randomIntBetween(0, 128); List indices = new ArrayList<>(numIndices); @@ -148,4 +150,6 @@ public final class DataStreamTestHelper { return IndexMetadata.builder(name).settings(b).numberOfShards(1).numberOfReplicas(1).build(); } + + } diff --git a/test/framework/src/main/java/org/elasticsearch/test/TestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/TestCluster.java index 9bb633b2efe..092a5b06efe 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/TestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/TestCluster.java @@ -20,11 +20,14 @@ package org.elasticsearch.test; import com.carrotsearch.hppc.ObjectArrayList; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction; import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexTemplateMetadata; @@ -76,6 +79,7 @@ public abstract class TestCluster implements Closeable { * Wipes any data that a test can leave behind: indices, templates (except exclude templates) and repositories */ public void wipe(Set excludeTemplates) { + wipeAllDataStreams(); wipeIndices("_all"); wipeAllTemplates(excludeTemplates); wipeRepositories(); @@ -131,6 +135,18 @@ public abstract class TestCluster implements Closeable { @Override public abstract void close() throws IOException; + /** + * Deletes all data streams from the test cluster. + */ + public void wipeAllDataStreams() { + // Feature flag may not be enabled in all gradle modules that use ESIntegTestCase + if (size() > 0) { + AcknowledgedResponse response = + client().admin().indices().deleteDataStream(new DeleteDataStreamAction.Request(new String[]{"*"})).actionGet(); + assertAcked(response); + } + } + /** * Deletes the given indices from the tests cluster. If no index name is passed to this method * all indices are removed. diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index f200d723415..93531cdb85b 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -652,9 +652,7 @@ public abstract class ESRestTestCase extends ESTestCase { protected static void wipeDataStreams() throws IOException { try { - if (hasXPack()) { - adminClient().performRequest(new Request("DELETE", "_data_stream/*")); - } + adminClient().performRequest(new Request("DELETE", "_data_stream/*")); } catch (ResponseException e) { // We hit a version of ES that doesn't serialize DeleteDataStreamAction.Request#wildcardExpressionsOriginallySpecified field or // that doesn't support data streams so it's safe to ignore