[PURIFY] Revert "Move data stream transport and rest action to xpack (#59593)" (#28)

This commit reverts commit 2a89e13e43. Relicensing data streams from OSS to XPack.

Signed-off-by: Peter Nied <petern@amazon.com>
This commit is contained in:
Nick Knize 2021-02-02 16:25:35 -06:00 committed by Peter Nied
parent c856534394
commit 3f168ac85c
30 changed files with 2049 additions and 121 deletions

View File

@ -45,37 +45,6 @@ import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.core.MainRequest;
import org.elasticsearch.client.core.MainResponse;
import org.elasticsearch.client.ml.dataframe.DataFrameAnalysis;
import org.elasticsearch.client.ml.dataframe.evaluation.classification.AccuracyMetric;
import org.elasticsearch.client.ml.dataframe.evaluation.classification.AucRocMetric;
import org.elasticsearch.client.ml.dataframe.evaluation.classification.Classification;
import org.elasticsearch.client.ml.dataframe.evaluation.classification.MulticlassConfusionMatrixMetric;
import org.elasticsearch.client.ml.dataframe.evaluation.classification.PrecisionMetric;
import org.elasticsearch.client.ml.dataframe.evaluation.classification.RecallMetric;
import org.elasticsearch.client.ml.dataframe.evaluation.outlierdetection.ConfusionMatrixMetric;
import org.elasticsearch.client.ml.dataframe.evaluation.outlierdetection.OutlierDetection;
import org.elasticsearch.client.ml.dataframe.evaluation.regression.HuberMetric;
import org.elasticsearch.client.ml.dataframe.evaluation.regression.MeanSquaredErrorMetric;
import org.elasticsearch.client.ml.dataframe.evaluation.regression.MeanSquaredLogarithmicErrorMetric;
import org.elasticsearch.client.ml.dataframe.evaluation.regression.RSquaredMetric;
import org.elasticsearch.client.ml.dataframe.evaluation.regression.Regression;
import org.elasticsearch.client.ml.dataframe.stats.classification.ClassificationStats;
import org.elasticsearch.client.ml.dataframe.stats.outlierdetection.OutlierDetectionStats;
import org.elasticsearch.client.ml.dataframe.stats.regression.RegressionStats;
import org.elasticsearch.client.ml.inference.preprocessing.CustomWordEmbedding;
import org.elasticsearch.client.ml.inference.preprocessing.FrequencyEncoding;
import org.elasticsearch.client.ml.inference.preprocessing.NGram;
import org.elasticsearch.client.ml.inference.preprocessing.OneHotEncoding;
import org.elasticsearch.client.ml.inference.preprocessing.TargetMeanEncoding;
import org.elasticsearch.client.ml.inference.trainedmodel.ClassificationConfig;
import org.elasticsearch.client.ml.inference.trainedmodel.RegressionConfig;
import org.elasticsearch.client.ml.inference.trainedmodel.ensemble.Ensemble;
import org.elasticsearch.client.ml.inference.trainedmodel.ensemble.Exponent;
import org.elasticsearch.client.ml.inference.trainedmodel.ensemble.LogisticRegression;
import org.elasticsearch.client.ml.inference.trainedmodel.ensemble.WeightedMode;
import org.elasticsearch.client.ml.inference.trainedmodel.ensemble.WeightedSum;
import org.elasticsearch.client.ml.inference.trainedmodel.langident.LangIdentNeuralNetwork;
import org.elasticsearch.client.ml.inference.trainedmodel.tree.Tree;
import org.elasticsearch.client.transform.transforms.SyncConfig;
import org.elasticsearch.client.transform.transforms.TimeSyncConfig;
import org.elasticsearch.common.CheckedFunction;
@ -128,7 +97,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.elasticsearch.client.ml.dataframe.evaluation.MlEvaluationNamedXContentProvider.registeredMetricName;
import static org.elasticsearch.common.xcontent.XContentHelper.toXContent;
import static org.hamcrest.CoreMatchers.endsWith;
import static org.hamcrest.CoreMatchers.equalTo;

View File

@ -27,13 +27,6 @@ import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.MainResponse;
import org.elasticsearch.client.xpack.XPackInfoRequest;
import org.elasticsearch.client.xpack.XPackInfoResponse;
import org.elasticsearch.client.xpack.XPackInfoResponse.BuildInfo;
import org.elasticsearch.client.xpack.XPackInfoResponse.FeatureSetsInfo;
import org.elasticsearch.client.xpack.XPackInfoResponse.LicenseInfo;
import org.elasticsearch.client.xpack.XPackUsageRequest;
import org.elasticsearch.client.xpack.XPackUsageResponse;
import java.io.IOException;
import java.time.Instant;
@ -94,58 +87,6 @@ public class MiscellaneousDocumentationIT extends ESRestHighLevelClientTestCase
assertTrue(response);
}
public void testXPackInfo() throws Exception {
RestHighLevelClient client = highLevelClient();
{
//tag::x-pack-info-execute
XPackInfoRequest request = new XPackInfoRequest();
request.setVerbose(true); // <1>
request.setCategories(EnumSet.of( // <2>
XPackInfoRequest.Category.BUILD,
XPackInfoRequest.Category.LICENSE,
XPackInfoRequest.Category.FEATURES));
XPackInfoResponse response = client.xpack().info(request, RequestOptions.DEFAULT);
//end::x-pack-info-execute
//tag::x-pack-info-response
BuildInfo build = response.getBuildInfo(); // <1>
LicenseInfo license = response.getLicenseInfo(); // <2>
assertThat(license.getExpiryDate(), is(greaterThan(Instant.now().toEpochMilli()))); // <3>
FeatureSetsInfo features = response.getFeatureSetsInfo(); // <4>
//end::x-pack-info-response
assertNotNull(response.getBuildInfo());
assertNotNull(response.getLicenseInfo());
assertNotNull(response.getFeatureSetsInfo());
}
{
XPackInfoRequest request = new XPackInfoRequest();
// tag::x-pack-info-execute-listener
ActionListener<XPackInfoResponse> listener = new ActionListener<XPackInfoResponse>() {
@Override
public void onResponse(XPackInfoResponse indexResponse) {
// <1>
}
@Override
public void onFailure(Exception e) {
// <2>
}
};
// end::x-pack-info-execute-listener
// Replace the empty listener by a blocking listener in test
final CountDownLatch latch = new CountDownLatch(1);
listener = new LatchedActionListener<>(listener, latch);
// tag::x-pack-info-execute-async
client.xpack().infoAsync(request, RequestOptions.DEFAULT, listener); // <1>
// end::x-pack-info-execute-async
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}
public void testInitializationFromClientBuilder() throws IOException {
//tag::rest-high-level-client-init
RestHighLevelClient client = new RestHighLevelClient(

View File

@ -20,7 +20,7 @@
package org.elasticsearch.client.indices;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.xpack.core.action.DataStreamsStatsAction;
import org.elasticsearch.action.admin.indices.datastream.DataStreamsStatsAction;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.client.AbstractResponseTestCase;
import org.elasticsearch.common.unit.ByteSizeValue;

View File

@ -19,8 +19,8 @@
package org.elasticsearch.client.indices;
import org.elasticsearch.xpack.core.action.GetDataStreamAction;
import org.elasticsearch.xpack.core.action.GetDataStreamAction.Response.DataStreamInfo;
import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction;
import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction.Response.DataStreamInfo;
import org.elasticsearch.client.AbstractResponseTestCase;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.DataStream;

View File

@ -106,7 +106,6 @@ import org.elasticsearch.action.admin.indices.cache.clear.TransportClearIndicesC
import org.elasticsearch.action.admin.indices.close.CloseIndexAction;
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
import org.elasticsearch.action.admin.indices.create.AutoCreateAction;
import org.elasticsearch.action.admin.indices.create.AutoCreateAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.admin.indices.dangling.delete.DeleteDanglingIndexAction;
@ -117,6 +116,10 @@ import org.elasticsearch.action.admin.indices.dangling.import_index.ImportDangli
import org.elasticsearch.action.admin.indices.dangling.import_index.TransportImportDanglingIndexAction;
import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesAction;
import org.elasticsearch.action.admin.indices.dangling.list.TransportListDanglingIndicesAction;
import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction;
import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction;
import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction;
import org.elasticsearch.action.admin.indices.datastream.DataStreamsStatsAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsAction;
@ -308,10 +311,12 @@ import org.elasticsearch.rest.action.admin.indices.RestAddIndexBlockAction;
import org.elasticsearch.rest.action.admin.indices.RestAnalyzeAction;
import org.elasticsearch.rest.action.admin.indices.RestClearIndicesCacheAction;
import org.elasticsearch.rest.action.admin.indices.RestCloseIndexAction;
import org.elasticsearch.rest.action.admin.indices.RestCreateDataStreamAction;
import org.elasticsearch.rest.action.admin.indices.RestCreateIndexAction;
import org.elasticsearch.rest.action.admin.indices.RestDataStreamsStatsAction;
import org.elasticsearch.rest.action.admin.indices.RestDeleteComponentTemplateAction;
import org.elasticsearch.rest.action.admin.indices.RestDeleteComposableIndexTemplateAction;
import org.elasticsearch.rest.action.admin.indices.RestDeleteComposableIndexTemplateAction;
import org.elasticsearch.rest.action.admin.indices.RestDeleteDataStreamAction;
import org.elasticsearch.rest.action.admin.indices.RestDeleteIndexAction;
import org.elasticsearch.rest.action.admin.indices.RestDeleteIndexTemplateAction;
import org.elasticsearch.rest.action.admin.indices.RestFlushAction;
@ -319,7 +324,7 @@ import org.elasticsearch.rest.action.admin.indices.RestForceMergeAction;
import org.elasticsearch.rest.action.admin.indices.RestGetAliasesAction;
import org.elasticsearch.rest.action.admin.indices.RestGetComponentTemplateAction;
import org.elasticsearch.rest.action.admin.indices.RestGetComposableIndexTemplateAction;
import org.elasticsearch.rest.action.admin.indices.RestGetComposableIndexTemplateAction;
import org.elasticsearch.rest.action.admin.indices.RestGetDataStreamsAction;
import org.elasticsearch.rest.action.admin.indices.RestGetFieldMappingAction;
import org.elasticsearch.rest.action.admin.indices.RestGetIndexTemplateAction;
import org.elasticsearch.rest.action.admin.indices.RestGetIndicesAction;
@ -593,7 +598,6 @@ public class ActionModule extends AbstractModule {
actions.register(RecoveryAction.INSTANCE, TransportRecoveryAction.class);
actions.register(NodesReloadSecureSettingsAction.INSTANCE, TransportNodesReloadSecureSettingsAction.class);
actions.register(AutoCreateAction.INSTANCE, AutoCreateAction.TransportAction.class);
actions.register(ResolveIndexAction.INSTANCE, ResolveIndexAction.TransportAction.class);
//Indexed scripts
actions.register(PutStoredScriptAction.INSTANCE, TransportPutStoredScriptAction.class);
@ -612,6 +616,13 @@ public class ActionModule extends AbstractModule {
actionPlugins.stream().flatMap(p -> p.getActions().stream()).forEach(actions::register);
// Data streams:
actions.register(CreateDataStreamAction.INSTANCE, CreateDataStreamAction.TransportAction.class);
actions.register(DeleteDataStreamAction.INSTANCE, DeleteDataStreamAction.TransportAction.class);
actions.register(GetDataStreamAction.INSTANCE, GetDataStreamAction.TransportAction.class);
actions.register(ResolveIndexAction.INSTANCE, ResolveIndexAction.TransportAction.class);
actions.register(DataStreamsStatsAction.INSTANCE, DataStreamsStatsAction.TransportAction.class);
// Persistent tasks:
actions.register(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class);
actions.register(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class);
@ -718,7 +729,6 @@ public class ActionModule extends AbstractModule {
registerHandler.accept(new RestUpgradeAction());
registerHandler.accept(new RestUpgradeStatusAction());
registerHandler.accept(new RestClearIndicesCacheAction());
registerHandler.accept(new RestResolveIndexAction());
registerHandler.accept(new RestIndexAction());
registerHandler.accept(new CreateHandler());
@ -771,6 +781,13 @@ public class ActionModule extends AbstractModule {
registerHandler.accept(new RestImportDanglingIndexAction());
registerHandler.accept(new RestDeleteDanglingIndexAction());
// Data Stream API
registerHandler.accept(new RestCreateDataStreamAction());
registerHandler.accept(new RestDeleteDataStreamAction());
registerHandler.accept(new RestGetDataStreamsAction());
registerHandler.accept(new RestResolveIndexAction());
registerHandler.accept(new RestDataStreamsStatsAction());
// CAT API
registerHandler.accept(new RestAllocationAction());
registerHandler.accept(new RestShardsAction());

View File

@ -0,0 +1,148 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.datastream;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService;
import org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService.CreateDataStreamClusterStateUpdateRequest;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Objects;
public class CreateDataStreamAction extends ActionType<AcknowledgedResponse> {
public static final CreateDataStreamAction INSTANCE = new CreateDataStreamAction();
public static final String NAME = "indices:admin/data_stream/create";
private CreateDataStreamAction() {
super(NAME, AcknowledgedResponse::new);
}
public static class Request extends AcknowledgedRequest<Request> implements IndicesRequest {
private final String name;
public Request(String name) {
this.name = name;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (Strings.hasText(name) == false) {
validationException = ValidateActions.addValidationError("name is missing", validationException);
}
return validationException;
}
public Request(StreamInput in) throws IOException {
super(in);
this.name = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(name);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return name.equals(request.name);
}
@Override
public int hashCode() {
return Objects.hash(name);
}
@Override
public String[] indices() {
return new String[]{name};
}
@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
}
}
public static class TransportAction extends TransportMasterNodeAction<Request, AcknowledgedResponse> {
private final MetadataCreateDataStreamService metadataCreateDataStreamService;
@Inject
public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
MetadataCreateDataStreamService metadataCreateDataStreamService) {
super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver);
this.metadataCreateDataStreamService = metadataCreateDataStreamService;
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected AcknowledgedResponse read(StreamInput in) throws IOException {
return new AcknowledgedResponse(in);
}
@Override
protected void masterOperation(Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) throws Exception {
CreateDataStreamClusterStateUpdateRequest updateRequest = new CreateDataStreamClusterStateUpdateRequest(
request.name,
request.masterNodeTimeout(),
request.timeout()
);
metadataCreateDataStreamService.createDataStream(updateRequest, listener);
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
}
}

View File

@ -0,0 +1,441 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.datastream;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.PointValues;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastRequest;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexAbstractionResolver;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.SortedMap;
import java.util.stream.Stream;
public class DataStreamsStatsAction extends ActionType<DataStreamsStatsAction.Response> {
public static final DataStreamsStatsAction INSTANCE = new DataStreamsStatsAction();
public static final String NAME = "indices:monitor/data_stream/stats";
public DataStreamsStatsAction() {
super(NAME, DataStreamsStatsAction.Response::new);
}
public static class Request extends BroadcastRequest<Request> {
public Request() {
super((String[]) null);
}
public Request(StreamInput in) throws IOException {
super(in);
}
}
public static class Response extends BroadcastResponse {
private final int dataStreamCount;
private final int backingIndices;
private final ByteSizeValue totalStoreSize;
private final DataStreamStats[] dataStreams;
public Response(int totalShards, int successfulShards, int failedShards, List<DefaultShardOperationFailedException> shardFailures,
int dataStreamCount, int backingIndices, ByteSizeValue totalStoreSize, DataStreamStats[] dataStreams) {
super(totalShards, successfulShards, failedShards, shardFailures);
this.dataStreamCount = dataStreamCount;
this.backingIndices = backingIndices;
this.totalStoreSize = totalStoreSize;
this.dataStreams = dataStreams;
}
public Response(StreamInput in) throws IOException {
super(in);
this.dataStreamCount = in.readVInt();
this.backingIndices = in.readVInt();
this.totalStoreSize = new ByteSizeValue(in);
this.dataStreams = in.readArray(DataStreamStats::new, DataStreamStats[]::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(dataStreamCount);
out.writeVInt(backingIndices);
totalStoreSize.writeTo(out);
out.writeArray(dataStreams);
}
@Override
protected void addCustomXContentFields(XContentBuilder builder, Params params) throws IOException {
builder.field("data_stream_count", dataStreamCount);
builder.field("backing_indices", backingIndices);
builder.humanReadableField("total_store_size_bytes", "total_store_size", totalStoreSize);
builder.array("data_streams", (Object[]) dataStreams);
}
public int getDataStreamCount() {
return dataStreamCount;
}
public int getBackingIndices() {
return backingIndices;
}
public ByteSizeValue getTotalStoreSize() {
return totalStoreSize;
}
public DataStreamStats[] getDataStreams() {
return dataStreams;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
Response response = (Response) obj;
return dataStreamCount == response.dataStreamCount &&
backingIndices == response.backingIndices &&
Objects.equals(totalStoreSize, response.totalStoreSize) &&
Arrays.equals(dataStreams, response.dataStreams);
}
@Override
public int hashCode() {
int result = Objects.hash(dataStreamCount, backingIndices, totalStoreSize);
result = 31 * result + Arrays.hashCode(dataStreams);
return result;
}
@Override
public String toString() {
return "Response{" +
"dataStreamCount=" + dataStreamCount +
", backingIndices=" + backingIndices +
", totalStoreSize=" + totalStoreSize +
", dataStreams=" + Arrays.toString(dataStreams) +
'}';
}
}
public static class DataStreamStats implements ToXContentObject, Writeable {
private final String dataStream;
private final int backingIndices;
private final ByteSizeValue storeSize;
private final long maximumTimestamp;
public DataStreamStats(String dataStream, int backingIndices, ByteSizeValue storeSize, long maximumTimestamp) {
this.dataStream = dataStream;
this.backingIndices = backingIndices;
this.storeSize = storeSize;
this.maximumTimestamp = maximumTimestamp;
}
public DataStreamStats(StreamInput in) throws IOException {
this.dataStream = in.readString();
this.backingIndices = in.readVInt();
this.storeSize = new ByteSizeValue(in);
this.maximumTimestamp = in.readVLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(dataStream);
out.writeVInt(backingIndices);
storeSize.writeTo(out);
out.writeVLong(maximumTimestamp);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("data_stream", dataStream);
builder.field("backing_indices", backingIndices);
builder.humanReadableField("store_size_bytes", "store_size", storeSize);
builder.field("maximum_timestamp", maximumTimestamp);
builder.endObject();
return builder;
}
public String getDataStream() {
return dataStream;
}
public int getBackingIndices() {
return backingIndices;
}
public ByteSizeValue getStoreSize() {
return storeSize;
}
public long getMaximumTimestamp() {
return maximumTimestamp;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
DataStreamStats that = (DataStreamStats) obj;
return backingIndices == that.backingIndices &&
maximumTimestamp == that.maximumTimestamp &&
Objects.equals(dataStream, that.dataStream) &&
Objects.equals(storeSize, that.storeSize);
}
@Override
public int hashCode() {
return Objects.hash(dataStream, backingIndices, storeSize, maximumTimestamp);
}
@Override
public String toString() {
return "DataStreamStats{" +
"dataStream='" + dataStream + '\'' +
", backingIndices=" + backingIndices +
", storeSize=" + storeSize +
", maximumTimestamp=" + maximumTimestamp +
'}';
}
}
public static class DataStreamShardStats implements Writeable {
private final ShardRouting shardRouting;
private final StoreStats storeStats;
private final long maxTimestamp;
public DataStreamShardStats(ShardRouting shardRouting, StoreStats storeStats, long maxTimestamp) {
this.shardRouting = shardRouting;
this.storeStats = storeStats;
this.maxTimestamp = maxTimestamp;
}
public DataStreamShardStats(StreamInput in) throws IOException {
this.shardRouting = new ShardRouting(in);
this.storeStats = new StoreStats(in);
this.maxTimestamp = in.readVLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
shardRouting.writeTo(out);
storeStats.writeTo(out);
out.writeVLong(maxTimestamp);
}
public ShardRouting getShardRouting() {
return shardRouting;
}
public StoreStats getStoreStats() {
return storeStats;
}
public long getMaxTimestamp() {
return maxTimestamp;
}
}
private static class AggregatedStats {
Set<String> backingIndices = new HashSet<>();
long storageBytes = 0L;
long maxTimestamp = 0L;
}
public static class TransportAction extends TransportBroadcastByNodeAction<Request, Response, DataStreamShardStats> {
private final ClusterService clusterService;
private final IndicesService indicesService;
private final IndexAbstractionResolver indexAbstractionResolver;
@Inject
public TransportAction(ClusterService clusterService, TransportService transportService, IndicesService indicesService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(DataStreamsStatsAction.NAME, clusterService, transportService, actionFilters, indexNameExpressionResolver,
Request::new, ThreadPool.Names.MANAGEMENT);
this.clusterService = clusterService;
this.indicesService = indicesService;
this.indexAbstractionResolver = new IndexAbstractionResolver(indexNameExpressionResolver);
}
@Override
protected Request readRequestFrom(StreamInput in) throws IOException {
return new Request(in);
}
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, Request request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, Request request, String[] concreteIndices) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, concreteIndices);
}
@Override
protected ShardsIterator shards(ClusterState clusterState, Request request, String[] concreteIndices) {
String[] requestIndices = request.indices();
if (requestIndices == null || requestIndices.length == 0) {
requestIndices = new String[]{"*"};
}
List<String> abstractionNames = indexAbstractionResolver.resolveIndexAbstractions(requestIndices, request.indicesOptions(),
clusterState.getMetadata(), true); // Always include data streams for data streams stats api
SortedMap<String, IndexAbstraction> indicesLookup = clusterState.getMetadata().getIndicesLookup();
String[] concreteDatastreamIndices = abstractionNames.stream().flatMap(abstractionName -> {
IndexAbstraction indexAbstraction = indicesLookup.get(abstractionName);
assert indexAbstraction != null;
if (indexAbstraction.getType() == IndexAbstraction.Type.DATA_STREAM) {
IndexAbstraction.DataStream dataStream = (IndexAbstraction.DataStream) indexAbstraction;
List<IndexMetadata> indices = dataStream.getIndices();
return indices.stream().map(idx -> idx.getIndex().getName());
} else {
return Stream.empty();
}
}).toArray(String[]::new);
return clusterState.getRoutingTable().allShards(concreteDatastreamIndices);
}
@Override
protected DataStreamShardStats shardOperation(Request request, ShardRouting shardRouting) throws IOException {
IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex());
IndexShard indexShard = indexService.getShard(shardRouting.shardId().id());
// if we don't have the routing entry yet, we need it stats wise, we treat it as if the shard is not ready yet
if (indexShard.routingEntry() == null) {
throw new ShardNotFoundException(indexShard.shardId());
}
StoreStats storeStats = indexShard.storeStats();
IndexAbstraction indexAbstraction = clusterService.state().getMetadata().getIndicesLookup().get(shardRouting.getIndexName());
assert indexAbstraction != null;
IndexAbstraction.DataStream dataStream = indexAbstraction.getParentDataStream();
assert dataStream != null;
long maxTimestamp = 0L;
try (Engine.Searcher searcher = indexShard.acquireSearcher("data_stream_stats")) {
IndexReader indexReader = searcher.getIndexReader();
String fieldName = dataStream.getDataStream().getTimeStampField().getName();
byte[] maxPackedValue = PointValues.getMaxPackedValue(indexReader, fieldName);
if (maxPackedValue != null) {
maxTimestamp = LongPoint.decodeDimension(maxPackedValue, 0);
}
}
return new DataStreamShardStats(
indexShard.routingEntry(),
storeStats,
maxTimestamp
);
}
@Override
protected DataStreamShardStats readShardResult(StreamInput in) throws IOException {
return new DataStreamShardStats(in);
}
@Override
protected Response newResponse(Request request, int totalShards, int successfulShards,
int failedShards, List<DataStreamShardStats> dataStreamShardStats,
List<DefaultShardOperationFailedException> shardFailures,
ClusterState clusterState) {
Map<String, AggregatedStats> aggregatedDataStreamsStats = new HashMap<>();
Set<String> allBackingIndices = new HashSet<>();
long totalStoreSizeBytes = 0L;
SortedMap<String, IndexAbstraction> indicesLookup = clusterState.getMetadata().getIndicesLookup();
for (DataStreamShardStats shardStat : dataStreamShardStats) {
String indexName = shardStat.getShardRouting().getIndexName();
IndexAbstraction indexAbstraction = indicesLookup.get(indexName);
IndexAbstraction.DataStream dataStream = indexAbstraction.getParentDataStream();
assert dataStream != null;
// Aggregate global stats
totalStoreSizeBytes += shardStat.getStoreStats().sizeInBytes();
allBackingIndices.add(indexName);
// Aggregate data stream stats
AggregatedStats stats = aggregatedDataStreamsStats.computeIfAbsent(dataStream.getName(), s -> new AggregatedStats());
stats.storageBytes += shardStat.getStoreStats().sizeInBytes();
stats.maxTimestamp = Math.max(stats.maxTimestamp, shardStat.getMaxTimestamp());
stats.backingIndices.add(indexName);
}
DataStreamStats[] dataStreamStats = aggregatedDataStreamsStats.entrySet().stream()
.map(entry -> new DataStreamStats(
entry.getKey(),
entry.getValue().backingIndices.size(),
new ByteSizeValue(entry.getValue().storageBytes),
entry.getValue().maxTimestamp))
.toArray(DataStreamStats[]::new);
return new Response(
totalShards,
successfulShards,
failedShards,
shardFailures,
aggregatedDataStreamsStats.size(),
allBackingIndices.size(),
new ByteSizeValue(totalStoreSizeBytes),
dataStreamStats
);
}
}
}

View File

@ -0,0 +1,233 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.datastream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.MetadataDeleteIndexService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.index.Index;
import org.elasticsearch.snapshots.SnapshotInProgressException;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import static org.elasticsearch.action.ValidateActions.addValidationError;
public class DeleteDataStreamAction extends ActionType<AcknowledgedResponse> {
private static final Logger logger = LogManager.getLogger(DeleteDataStreamAction.class);
public static final DeleteDataStreamAction INSTANCE = new DeleteDataStreamAction();
public static final String NAME = "indices:admin/data_stream/delete";
private DeleteDataStreamAction() {
super(NAME, AcknowledgedResponse::new);
}
public static class Request extends MasterNodeRequest<Request> implements IndicesRequest.Replaceable {
private String[] names;
public Request(String[] names) {
this.names = Objects.requireNonNull(names);
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (CollectionUtils.isEmpty(names)) {
validationException = addValidationError("no data stream(s) specified", validationException);
}
return validationException;
}
public Request(StreamInput in) throws IOException {
super(in);
this.names = in.readStringArray();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(names);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return Arrays.equals(names, request.names);
}
@Override
public int hashCode() {
return Arrays.hashCode(names);
}
@Override
public String[] indices() {
return names;
}
@Override
public IndicesOptions indicesOptions() {
// this doesn't really matter since data stream name resolution isn't affected by IndicesOptions and
// a data stream's backing indices are retrieved from its metadata
return IndicesOptions.fromOptions(false, true, true, true, false, false, true, false);
}
@Override
public boolean includeDataStreams() {
return true;
}
@Override
public IndicesRequest indices(String... indices) {
this.names = indices;
return this;
}
}
public static class TransportAction extends TransportMasterNodeAction<Request, AcknowledgedResponse> {
private final MetadataDeleteIndexService deleteIndexService;
@Inject
public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
MetadataDeleteIndexService deleteIndexService) {
super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver);
this.deleteIndexService = deleteIndexService;
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected AcknowledgedResponse read(StreamInput in) throws IOException {
return new AcknowledgedResponse(in);
}
@Override
protected void masterOperation(Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) throws Exception {
clusterService.submitStateUpdateTask("remove-data-stream [" + Strings.arrayToCommaDelimitedString(request.names) + "]",
new ClusterStateUpdateTask(Priority.HIGH) {
@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}
@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}
@Override
public ClusterState execute(ClusterState currentState) {
return removeDataStream(deleteIndexService, currentState, request);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(new AcknowledgedResponse(true));
}
});
}
static ClusterState removeDataStream(MetadataDeleteIndexService deleteIndexService, ClusterState currentState, Request request) {
Set<String> dataStreams = new HashSet<>();
Set<String> snapshottingDataStreams = new HashSet<>();
for (String name : request.names) {
for (String dataStreamName : currentState.metadata().dataStreams().keySet()) {
if (Regex.simpleMatch(name, dataStreamName)) {
dataStreams.add(dataStreamName);
}
}
snapshottingDataStreams.addAll(SnapshotsService.snapshottingDataStreams(currentState, dataStreams));
}
if (snapshottingDataStreams.isEmpty() == false) {
throw new SnapshotInProgressException("Cannot delete data streams that are being snapshotted: " + snapshottingDataStreams +
". Try again after snapshot finishes or cancel the currently running snapshot.");
}
Set<Index> backingIndicesToRemove = new HashSet<>();
for (String dataStreamName : dataStreams) {
DataStream dataStream = currentState.metadata().dataStreams().get(dataStreamName);
assert dataStream != null;
backingIndicesToRemove.addAll(dataStream.getIndices());
}
// first delete the data streams and then the indices:
// (this to avoid data stream validation from failing when deleting an index that is part of a data stream
// without updating the data stream)
// TODO: change order when delete index api also updates the data stream the index to be removed is member of
Metadata.Builder metadata = Metadata.builder(currentState.metadata());
for (String ds : dataStreams) {
logger.info("removing data stream [{}]", ds);
metadata.removeDataStream(ds);
}
currentState = ClusterState.builder(currentState).metadata(metadata).build();
return deleteIndexService.deleteIndices(currentState, backingIndicesToRemove);
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
}
}

View File

@ -0,0 +1,323 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.datastream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterStateHealth;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.Index;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response> {
public static final GetDataStreamAction INSTANCE = new GetDataStreamAction();
public static final String NAME = "indices:admin/data_stream/get";
private GetDataStreamAction() {
super(NAME, Response::new);
}
public static class Request extends MasterNodeReadRequest<Request> implements IndicesRequest.Replaceable {
private String[] names;
public Request(String[] names) {
this.names = names;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
public Request(StreamInput in) throws IOException {
super(in);
this.names = in.readOptionalStringArray();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalStringArray(names);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return Arrays.equals(names, request.names);
}
@Override
public int hashCode() {
return Arrays.hashCode(names);
}
@Override
public String[] indices() {
return names;
}
@Override
public IndicesOptions indicesOptions() {
// this doesn't really matter since data stream name resolution isn't affected by IndicesOptions and
// a data stream's backing indices are retrieved from its metadata
return IndicesOptions.fromOptions(false, true, true, true, false, false, true, false);
}
@Override
public boolean includeDataStreams() {
return true;
}
@Override
public IndicesRequest indices(String... indices) {
this.names = indices;
return this;
}
}
public static class Response extends ActionResponse implements ToXContentObject {
public static final ParseField DATASTREAMS_FIELD = new ParseField("data_streams");
public static class DataStreamInfo extends AbstractDiffable<DataStreamInfo> implements ToXContentObject {
public static final ParseField STATUS_FIELD = new ParseField("status");
public static final ParseField INDEX_TEMPLATE_FIELD = new ParseField("template");
public static final ParseField ILM_POLICY_FIELD = new ParseField("ilm_policy");
DataStream dataStream;
ClusterHealthStatus dataStreamStatus;
@Nullable String indexTemplate;
@Nullable String ilmPolicyName;
public DataStreamInfo(DataStream dataStream, ClusterHealthStatus dataStreamStatus, @Nullable String indexTemplate,
@Nullable String ilmPolicyName) {
this.dataStream = dataStream;
this.dataStreamStatus = dataStreamStatus;
this.indexTemplate = indexTemplate;
this.ilmPolicyName = ilmPolicyName;
}
public DataStreamInfo(StreamInput in) throws IOException {
this(new DataStream(in), ClusterHealthStatus.readFrom(in), in.readOptionalString(), in.readOptionalString());
}
public DataStream getDataStream() {
return dataStream;
}
public ClusterHealthStatus getDataStreamStatus() {
return dataStreamStatus;
}
@Nullable
public String getIndexTemplate() {
return indexTemplate;
}
@Nullable
public String getIlmPolicy() {
return ilmPolicyName;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
dataStream.writeTo(out);
dataStreamStatus.writeTo(out);
out.writeOptionalString(indexTemplate);
out.writeOptionalString(ilmPolicyName);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(DataStream.NAME_FIELD.getPreferredName(), dataStream.getName());
builder.field(DataStream.TIMESTAMP_FIELD_FIELD.getPreferredName(), dataStream.getTimeStampField());
builder.field(DataStream.INDICES_FIELD.getPreferredName(), dataStream.getIndices());
builder.field(DataStream.GENERATION_FIELD.getPreferredName(), dataStream.getGeneration());
builder.field(STATUS_FIELD.getPreferredName(), dataStreamStatus);
if (indexTemplate != null) {
builder.field(INDEX_TEMPLATE_FIELD.getPreferredName(), indexTemplate);
}
if (ilmPolicyName != null) {
builder.field(ILM_POLICY_FIELD.getPreferredName(), ilmPolicyName);
}
builder.endObject();
return builder;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DataStreamInfo that = (DataStreamInfo) o;
return dataStream.equals(that.dataStream) &&
dataStreamStatus == that.dataStreamStatus &&
Objects.equals(indexTemplate, that.indexTemplate) &&
Objects.equals(ilmPolicyName, that.ilmPolicyName);
}
@Override
public int hashCode() {
return Objects.hash(dataStream, dataStreamStatus, indexTemplate, ilmPolicyName);
}
}
private final List<DataStreamInfo> dataStreams;
public Response(List<DataStreamInfo> dataStreams) {
this.dataStreams = dataStreams;
}
public Response(StreamInput in) throws IOException {
this(in.readList(DataStreamInfo::new));
}
public List<DataStreamInfo> getDataStreams() {
return dataStreams;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeList(dataStreams);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.startArray(DATASTREAMS_FIELD.getPreferredName());
for (DataStreamInfo dataStream : dataStreams) {
dataStream.toXContent(builder, params);
}
builder.endArray();
builder.endObject();
return builder;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Response response = (Response) o;
return dataStreams.equals(response.dataStreams);
}
@Override
public int hashCode() {
return Objects.hash(dataStreams);
}
}
public static class TransportAction extends TransportMasterNodeReadAction<Request, Response> {
private static final Logger logger = LogManager.getLogger(TransportAction.class);
@Inject
public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver);
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected Response read(StreamInput in) throws IOException {
return new Response(in);
}
@Override
protected void masterOperation(Request request, ClusterState state,
ActionListener<Response> listener) throws Exception {
List<DataStream> dataStreams = getDataStreams(state, indexNameExpressionResolver, request);
List<Response.DataStreamInfo> dataStreamInfos = new ArrayList<>(dataStreams.size());
for (DataStream dataStream : dataStreams) {
String indexTemplate = MetadataIndexTemplateService.findV2Template(state.metadata(), dataStream.getName(), false);
String ilmPolicyName = null;
if (indexTemplate != null) {
Settings settings = MetadataIndexTemplateService.resolveSettings(state.metadata(), indexTemplate);
ilmPolicyName = settings.get("index.lifecycle.name");
} else {
logger.warn("couldn't find any matching template for data stream [{}]. has it been restored (and possibly renamed)" +
"from a snapshot?", dataStream.getName());
}
ClusterStateHealth streamHealth = new ClusterStateHealth(state,
dataStream.getIndices().stream().map(Index::getName).toArray(String[]::new));
dataStreamInfos.add(new Response.DataStreamInfo(dataStream, streamHealth.getStatus(), indexTemplate, ilmPolicyName));
}
listener.onResponse(new Response(dataStreamInfos));
}
static List<DataStream> getDataStreams(ClusterState clusterState, IndexNameExpressionResolver iner, Request request) {
List<String> results = iner.dataStreamNames(clusterState, request.indicesOptions(), request.names);
Map<String, DataStream> dataStreams = clusterState.metadata().dataStreams();
return results.stream()
.map(dataStreams::get)
.sorted(Comparator.comparing(DataStream::getName))
.collect(Collectors.toList());
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
}
}

View File

@ -39,6 +39,9 @@ import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction;
import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction;
import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
@ -841,6 +844,36 @@ public interface IndicesAdminClient extends ElasticsearchClient {
*/
void rolloverIndex(RolloverRequest request, ActionListener<RolloverResponse> listener);
/**
* Store a data stream
*/
void createDataStream(CreateDataStreamAction.Request request, ActionListener<AcknowledgedResponse> listener);
/**
* Store a data stream
*/
ActionFuture<AcknowledgedResponse> createDataStream(CreateDataStreamAction.Request request);
/**
* Delete a data stream
*/
void deleteDataStream(DeleteDataStreamAction.Request request, ActionListener<AcknowledgedResponse> listener);
/**
* Delete a data stream
*/
ActionFuture<AcknowledgedResponse> deleteDataStream(DeleteDataStreamAction.Request request);
/**
* Get data streams
*/
void getDataStreams(GetDataStreamAction.Request request, ActionListener<GetDataStreamAction.Response> listener);
/**
* Get data streams
*/
ActionFuture<GetDataStreamAction.Response> getDataStreams(GetDataStreamAction.Request request);
/**
* Resolves names and wildcard expressions to indices, aliases, and data streams
*/

View File

@ -169,6 +169,9 @@ import org.elasticsearch.action.admin.indices.dangling.import_index.ImportDangli
import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesAction;
import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesRequest;
import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesResponse;
import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction;
import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction;
import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
@ -1814,6 +1817,36 @@ public abstract class AbstractClient implements Client {
execute(GetSettingsAction.INSTANCE, request, listener);
}
@Override
public void createDataStream(CreateDataStreamAction.Request request, ActionListener<AcknowledgedResponse> listener) {
execute(CreateDataStreamAction.INSTANCE, request, listener);
}
@Override
public ActionFuture<AcknowledgedResponse> createDataStream(CreateDataStreamAction.Request request) {
return execute(CreateDataStreamAction.INSTANCE, request);
}
@Override
public void deleteDataStream(DeleteDataStreamAction.Request request, ActionListener<AcknowledgedResponse> listener) {
execute(DeleteDataStreamAction.INSTANCE, request, listener);
}
@Override
public ActionFuture<AcknowledgedResponse> deleteDataStream(DeleteDataStreamAction.Request request) {
return execute(DeleteDataStreamAction.INSTANCE, request);
}
@Override
public void getDataStreams(GetDataStreamAction.Request request, ActionListener<GetDataStreamAction.Response> listener) {
execute(GetDataStreamAction.INSTANCE, request, listener);
}
@Override
public ActionFuture<GetDataStreamAction.Response> getDataStreams(GetDataStreamAction.Request request) {
return execute(GetDataStreamAction.INSTANCE, request);
}
@Override
public void resolveIndex(ResolveIndexAction.Request request,
ActionListener<ResolveIndexAction.Response> listener) {

View File

@ -0,0 +1,50 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.admin.indices;
import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
public class RestCreateDataStreamAction extends BaseRestHandler {
@Override
public String getName() {
return "create_data_stream_action";
}
@Override
public List<Route> routes() {
return Collections.singletonList(
new Route(RestRequest.Method.PUT, "/_data_stream/{name}")
);
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
CreateDataStreamAction.Request putDataStreamRequest = new CreateDataStreamAction.Request(request.param("name"));
return channel -> client.admin().indices().createDataStream(putDataStreamRequest, new RestToXContentListener<>(channel));
}
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.admin.indices;
import org.elasticsearch.action.admin.indices.datastream.DataStreamsStatsAction;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
public class RestDataStreamsStatsAction extends BaseRestHandler {
@Override
public String getName() {
return "data_stream_stats_action";
}
@Override
public List<Route> routes() {
return Arrays.asList(
new Route(RestRequest.Method.GET, "/_data_stream/_stats"),
new Route(RestRequest.Method.GET, "/_data_stream/{name}/_stats")
);
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
DataStreamsStatsAction.Request dataStreamsStatsRequest = new DataStreamsStatsAction.Request();
boolean forbidClosedIndices = request.paramAsBoolean("forbid_closed_indices", true);
IndicesOptions defaultIndicesOption = forbidClosedIndices ? dataStreamsStatsRequest.indicesOptions()
: IndicesOptions.strictExpandOpen();
assert dataStreamsStatsRequest.indicesOptions() == IndicesOptions.strictExpandOpenAndForbidClosed() : "DataStreamStats default " +
"indices options changed";
dataStreamsStatsRequest.indicesOptions(IndicesOptions.fromRequest(request, defaultIndicesOption));
dataStreamsStatsRequest.indices(Strings.splitStringByCommaToArray(request.param("name")));
return channel -> client.execute(DataStreamsStatsAction.INSTANCE, dataStreamsStatsRequest, new RestToXContentListener<>(channel));
}
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.admin.indices;
import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
public class RestDeleteDataStreamAction extends BaseRestHandler {
@Override
public String getName() {
return "delete_data_stream_action";
}
@Override
public List<Route> routes() {
return Collections.singletonList(new Route(RestRequest.Method.DELETE, "/_data_stream/{name}"));
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
DeleteDataStreamAction.Request deleteDataStreamRequest = new DeleteDataStreamAction.Request(
Strings.splitStringByCommaToArray(request.param("name")));
return channel -> client.admin().indices().deleteDataStream(deleteDataStreamRequest, new RestToXContentListener<>(channel));
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.admin.indices;
import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import java.io.IOException;
import java.util.List;
public class RestGetDataStreamsAction extends BaseRestHandler {
@Override
public String getName() {
return "get_data_streams_action";
}
@Override
public List<Route> routes() {
return org.elasticsearch.common.collect.List.of(
new Route(RestRequest.Method.GET, "/_data_stream"),
new Route(RestRequest.Method.GET, "/_data_stream/{name}")
);
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
GetDataStreamAction.Request getDataStreamsRequest = new GetDataStreamAction.Request(
Strings.splitStringByCommaToArray(request.param("name")));
return channel -> client.admin().indices().getDataStreams(getDataStreamsRequest, new RestToXContentListener<>(channel));
}
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.datastream;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction.Request;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
public class CreateDataStreamRequestTests extends AbstractWireSerializingTestCase<Request> {
@Override
protected Writeable.Reader<Request> instanceReader() {
return Request::new;
}
@Override
protected Request createTestInstance() {
return new Request(randomAlphaOfLength(8));
}
public void testValidateRequest() {
CreateDataStreamAction.Request req = new CreateDataStreamAction.Request("my-data-stream");
ActionRequestValidationException e = req.validate();
assertNull(e);
}
public void testValidateRequestWithoutName() {
CreateDataStreamAction.Request req = new CreateDataStreamAction.Request("");
ActionRequestValidationException e = req.validate();
assertNotNull(e);
assertThat(e.validationErrors().size(), equalTo(1));
assertThat(e.validationErrors().get(0), containsString("name is missing"));
}
}

View File

@ -0,0 +1,76 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.datastream;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
public class DataStreamsStatsResponseTests extends AbstractWireSerializingTestCase<DataStreamsStatsAction.Response> {
@Override
protected Writeable.Reader<DataStreamsStatsAction.Response> instanceReader() {
return DataStreamsStatsAction.Response::new;
}
@Override
protected DataStreamsStatsAction.Response createTestInstance() {
return randomStatsResponse();
}
public static DataStreamsStatsAction.Response randomStatsResponse() {
int dataStreamCount = randomInt(10);
int backingIndicesTotal = 0;
long totalStoreSize = 0L;
ArrayList<DataStreamsStatsAction.DataStreamStats> dataStreamStats = new ArrayList<>();
for (int i = 0; i < dataStreamCount; i++) {
String dataStreamName = randomAlphaOfLength(8).toLowerCase(Locale.getDefault());
int backingIndices = randomInt(5);
backingIndicesTotal += backingIndices;
long storeSize = randomLongBetween(250, 1000000000);
totalStoreSize += storeSize;
long maximumTimestamp = randomRecentTimestamp();
dataStreamStats.add(new DataStreamsStatsAction.DataStreamStats(dataStreamName, backingIndices,
new ByteSizeValue(storeSize), maximumTimestamp));
}
int totalShards = randomIntBetween(backingIndicesTotal, backingIndicesTotal * 3);
int successfulShards = randomInt(totalShards);
int failedShards = totalShards - successfulShards;
List<DefaultShardOperationFailedException> exceptions = new ArrayList<>();
for (int i = 0; i < failedShards; i++) {
exceptions.add(new DefaultShardOperationFailedException(randomAlphaOfLength(8).toLowerCase(Locale.getDefault()),
randomInt(totalShards), new ElasticsearchException("boom")));
}
return new DataStreamsStatsAction.Response(totalShards, successfulShards, failedShards, exceptions,
dataStreamCount, backingIndicesTotal, new ByteSizeValue(totalStoreSize),
dataStreamStats.toArray(new DataStreamsStatsAction.DataStreamStats[0]));
}
private static long randomRecentTimestamp() {
long base = System.currentTimeMillis();
return randomLongBetween(base - TimeUnit.HOURS.toMillis(1), base);
}
}

View File

@ -0,0 +1,222 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.datastream;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction.Request;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.MetadataDeleteIndexService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInProgressException;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class DeleteDataStreamRequestTests extends AbstractWireSerializingTestCase<Request> {
@Override
protected Writeable.Reader<Request> instanceReader() {
return Request::new;
}
@Override
protected Request createTestInstance() {
return new Request(randomArray(1, 3, String[]::new, () -> randomAlphaOfLength(6)));
}
public void testValidateRequest() {
DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(new String[]{"my-data-stream"});
ActionRequestValidationException e = req.validate();
assertNull(e);
}
public void testValidateRequestWithoutName() {
DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(new String[0]);
ActionRequestValidationException e = req.validate();
assertNotNull(e);
assertThat(e.validationErrors().size(), equalTo(1));
assertThat(e.validationErrors().get(0), containsString("no data stream(s) specified"));
}
public void testDeleteDataStream() {
final String dataStreamName = "my-data-stream";
final List<String> otherIndices = randomSubsetOf(org.elasticsearch.common.collect.List.of("foo", "bar", "baz"));
ClusterState cs = getClusterStateWithDataStreams(
org.elasticsearch.common.collect.List.of(new Tuple<>(dataStreamName, 2)), otherIndices);
DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(new String[]{dataStreamName});
ClusterState newState = DeleteDataStreamAction.TransportAction.removeDataStream(getMetadataDeleteIndexService(), cs, req);
assertThat(newState.metadata().dataStreams().size(), equalTo(0));
assertThat(newState.metadata().indices().size(), equalTo(otherIndices.size()));
for (String indexName : otherIndices) {
assertThat(newState.metadata().indices().get(indexName).getIndex().getName(), equalTo(indexName));
}
}
public void testDeleteMultipleDataStreams() {
String[] dataStreamNames = {"foo", "bar", "baz", "eggplant"};
ClusterState cs = getClusterStateWithDataStreams(org.elasticsearch.common.collect.List.of(
new Tuple<>(dataStreamNames[0], randomIntBetween(1, 3)),
new Tuple<>(dataStreamNames[1], randomIntBetween(1, 3)),
new Tuple<>(dataStreamNames[2], randomIntBetween(1, 3)),
new Tuple<>(dataStreamNames[3], randomIntBetween(1, 3))
), org.elasticsearch.common.collect.List.of());
DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(new String[]{"ba*", "eggplant"});
ClusterState newState = DeleteDataStreamAction.TransportAction.removeDataStream(getMetadataDeleteIndexService(), cs, req);
assertThat(newState.metadata().dataStreams().size(), equalTo(1));
DataStream remainingDataStream = newState.metadata().dataStreams().get(dataStreamNames[0]);
assertNotNull(remainingDataStream);
assertThat(newState.metadata().indices().size(), equalTo(remainingDataStream.getIndices().size()));
for (Index i : remainingDataStream.getIndices()) {
assertThat(newState.metadata().indices().get(i.getName()).getIndex(), equalTo(i));
}
}
public void testDeleteSnapshottingDataStream() {
final String dataStreamName = "my-data-stream1";
final String dataStreamName2 = "my-data-stream2";
final List<String> otherIndices = randomSubsetOf(Arrays.asList("foo", "bar", "baz"));
ClusterState cs = getClusterStateWithDataStreams(Arrays.asList(new Tuple<>(dataStreamName, 2), new Tuple<>(dataStreamName2, 2)),
otherIndices);
SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.of(Arrays.asList(
createEntry(dataStreamName, "repo1", false),
createEntry(dataStreamName2, "repo2", true)));
ClusterState snapshotCs = ClusterState.builder(cs).putCustom(SnapshotsInProgress.TYPE, snapshotsInProgress).build();
DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(new String[]{dataStreamName});
SnapshotInProgressException e = expectThrows(SnapshotInProgressException.class,
() -> DeleteDataStreamAction.TransportAction.removeDataStream(getMetadataDeleteIndexService(), snapshotCs, req));
assertThat(e.getMessage(), equalTo("Cannot delete data streams that are being snapshotted: [my-data-stream1]. Try again after " +
"snapshot finishes or cancel the currently running snapshot."));
}
private SnapshotsInProgress.Entry createEntry(String dataStreamName, String repo, boolean partial) {
return new SnapshotsInProgress.Entry(new Snapshot(repo, new SnapshotId("", "")), false, partial,
SnapshotsInProgress.State.STARTED, Collections.emptyList(), Collections.singletonList(dataStreamName), 0, 1,
ImmutableOpenMap.of(), null, null, null);
}
public void testDeleteNonexistentDataStream() {
final String dataStreamName = "my-data-stream";
String[] dataStreamNames = {"foo", "bar", "baz", "eggplant"};
ClusterState cs = getClusterStateWithDataStreams(org.elasticsearch.common.collect.List.of(
new Tuple<>(dataStreamNames[0], randomIntBetween(1, 3)),
new Tuple<>(dataStreamNames[1], randomIntBetween(1, 3)),
new Tuple<>(dataStreamNames[2], randomIntBetween(1, 3)),
new Tuple<>(dataStreamNames[3], randomIntBetween(1, 3))
), org.elasticsearch.common.collect.List.of());
DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(new String[]{dataStreamName});
ClusterState newState = DeleteDataStreamAction.TransportAction.removeDataStream(getMetadataDeleteIndexService(), cs, req);
assertThat(newState.metadata().dataStreams().size(), equalTo(cs.metadata().dataStreams().size()));
assertThat(newState.metadata().dataStreams().keySet(),
containsInAnyOrder(cs.metadata().dataStreams().keySet().toArray(Strings.EMPTY_ARRAY)));
}
@SuppressWarnings("unchecked")
private static MetadataDeleteIndexService getMetadataDeleteIndexService() {
MetadataDeleteIndexService s = mock(MetadataDeleteIndexService.class);
when(s.deleteIndices(any(ClusterState.class), any(Set.class)))
.thenAnswer(mockInvocation -> {
ClusterState currentState = (ClusterState) mockInvocation.getArguments()[0];
Set<Index> indices = (Set<Index>) mockInvocation.getArguments()[1];
final Metadata.Builder b = Metadata.builder(currentState.metadata());
for (Index index : indices) {
b.remove(index.getName());
}
return ClusterState.builder(currentState).metadata(b.build()).build();
});
return s;
}
/**
* Constructs {@code ClusterState} with the specified data streams and indices.
*
* @param dataStreams The names of the data streams to create with their respective number of backing indices
* @param indexNames The names of indices to create that do not back any data streams
*/
public static ClusterState getClusterStateWithDataStreams(List<Tuple<String, Integer>> dataStreams, List<String> indexNames) {
Metadata.Builder builder = Metadata.builder();
List<IndexMetadata> allIndices = new ArrayList<>();
for (Tuple<String, Integer> dsTuple : dataStreams) {
List<IndexMetadata> backingIndices = new ArrayList<>();
for (int backingIndexNumber = 1; backingIndexNumber <= dsTuple.v2(); backingIndexNumber++) {
backingIndices.add(createIndexMetadata(DataStream.getDefaultBackingIndexName(dsTuple.v1(), backingIndexNumber), true));
}
allIndices.addAll(backingIndices);
DataStream ds = new DataStream(dsTuple.v1(), createTimestampField("@timestamp"),
backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()), dsTuple.v2());
builder.put(ds);
}
for (String indexName : indexNames) {
allIndices.add(createIndexMetadata(indexName, false));
}
for (IndexMetadata index : allIndices) {
builder.put(index, false);
}
return ClusterState.builder(new ClusterName("_name")).metadata(builder).build();
}
private static IndexMetadata createIndexMetadata(String name, boolean hidden) {
Settings.Builder b = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put("index.hidden", hidden);
return IndexMetadata.builder(name)
.settings(b)
.numberOfShards(1)
.numberOfReplicas(1)
.build();
}
}

View File

@ -0,0 +1,143 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.datastream;
import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction.Request;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import java.util.List;
import static org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamRequestTests.getClusterStateWithDataStreams;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
public class GetDataStreamsRequestTests extends AbstractWireSerializingTestCase<Request> {
@Override
protected Writeable.Reader<Request> instanceReader() {
return Request::new;
}
@Override
protected Request createTestInstance() {
final String[] searchParameter;
switch (randomIntBetween(1, 4)) {
case 1:
searchParameter = generateRandomStringArray(3, 8, false, false);
break;
case 2:
String[] parameters = generateRandomStringArray(3, 8, false, false);
for (int k = 0; k < parameters.length; k++) {
parameters[k] = parameters[k] + "*";
}
searchParameter = parameters;
break;
case 3:
searchParameter = new String[]{"*"};
break;
default:
searchParameter = null;
break;
}
return new Request(searchParameter);
}
public void testGetDataStream() {
final String dataStreamName = "my-data-stream";
ClusterState cs = getClusterStateWithDataStreams(
org.elasticsearch.common.collect.List.of(new Tuple<>(dataStreamName, 1)), org.elasticsearch.common.collect.List.of());
GetDataStreamAction.Request req = new GetDataStreamAction.Request(new String[]{dataStreamName});
List<DataStream> dataStreams = GetDataStreamAction.TransportAction.getDataStreams(cs, new IndexNameExpressionResolver(), req);
assertThat(dataStreams.size(), equalTo(1));
assertThat(dataStreams.get(0).getName(), equalTo(dataStreamName));
}
public void testGetDataStreamsWithWildcards() {
final String[] dataStreamNames = {"my-data-stream", "another-data-stream"};
ClusterState cs = getClusterStateWithDataStreams(
org.elasticsearch.common.collect.List.of(new Tuple<>(dataStreamNames[0], 1), new Tuple<>(dataStreamNames[1], 1)),
org.elasticsearch.common.collect.List.of());
GetDataStreamAction.Request req = new GetDataStreamAction.Request(new String[]{dataStreamNames[1].substring(0, 5) + "*"});
List<DataStream> dataStreams = GetDataStreamAction.TransportAction.getDataStreams(cs, new IndexNameExpressionResolver(), req);
assertThat(dataStreams.size(), equalTo(1));
assertThat(dataStreams.get(0).getName(), equalTo(dataStreamNames[1]));
req = new GetDataStreamAction.Request(new String[]{"*"});
dataStreams = GetDataStreamAction.TransportAction.getDataStreams(cs, new IndexNameExpressionResolver(), req);
assertThat(dataStreams.size(), equalTo(2));
assertThat(dataStreams.get(0).getName(), equalTo(dataStreamNames[1]));
assertThat(dataStreams.get(1).getName(), equalTo(dataStreamNames[0]));
req = new GetDataStreamAction.Request((String[]) null);
dataStreams = GetDataStreamAction.TransportAction.getDataStreams(cs, new IndexNameExpressionResolver(), req);
assertThat(dataStreams.size(), equalTo(2));
assertThat(dataStreams.get(0).getName(), equalTo(dataStreamNames[1]));
assertThat(dataStreams.get(1).getName(), equalTo(dataStreamNames[0]));
req = new GetDataStreamAction.Request(new String[]{"matches-none*"});
dataStreams = GetDataStreamAction.TransportAction.getDataStreams(cs, new IndexNameExpressionResolver(), req);
assertThat(dataStreams.size(), equalTo(0));
}
public void testGetDataStreamsWithoutWildcards() {
final String[] dataStreamNames = {"my-data-stream", "another-data-stream"};
ClusterState cs = getClusterStateWithDataStreams(
org.elasticsearch.common.collect.List.of(new Tuple<>(dataStreamNames[0], 1), new Tuple<>(dataStreamNames[1], 1)),
org.elasticsearch.common.collect.List.of());
GetDataStreamAction.Request req = new GetDataStreamAction.Request(new String[]{dataStreamNames[0], dataStreamNames[1]});
List<DataStream> dataStreams = GetDataStreamAction.TransportAction.getDataStreams(cs, new IndexNameExpressionResolver(), req);
assertThat(dataStreams.size(), equalTo(2));
assertThat(dataStreams.get(0).getName(), equalTo(dataStreamNames[1]));
assertThat(dataStreams.get(1).getName(), equalTo(dataStreamNames[0]));
req = new GetDataStreamAction.Request(new String[]{dataStreamNames[1]});
dataStreams = GetDataStreamAction.TransportAction.getDataStreams(cs, new IndexNameExpressionResolver(), req);
assertThat(dataStreams.size(), equalTo(1));
assertThat(dataStreams.get(0).getName(), equalTo(dataStreamNames[1]));
req = new GetDataStreamAction.Request(new String[]{dataStreamNames[0]});
dataStreams = GetDataStreamAction.TransportAction.getDataStreams(cs, new IndexNameExpressionResolver(), req);
assertThat(dataStreams.size(), equalTo(1));
assertThat(dataStreams.get(0).getName(), equalTo(dataStreamNames[0]));
GetDataStreamAction.Request req2 = new GetDataStreamAction.Request(new String[]{"foo"});
IndexNotFoundException e = expectThrows(IndexNotFoundException.class,
() -> GetDataStreamAction.TransportAction.getDataStreams(cs, new IndexNameExpressionResolver(), req2));
assertThat(e.getMessage(), containsString("no such index [foo]"));
}
public void testGetNonexistentDataStream() {
final String dataStreamName = "my-data-stream";
ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
GetDataStreamAction.Request req = new GetDataStreamAction.Request(new String[]{dataStreamName});
IndexNotFoundException e = expectThrows(IndexNotFoundException.class,
() -> GetDataStreamAction.TransportAction.getDataStreams(cs, new IndexNameExpressionResolver(), req));
assertThat(e.getMessage(), containsString("no such index [" + dataStreamName + "]"));
}
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.datastream;
import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction.Response;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.DataStreamTests;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import java.util.ArrayList;
import java.util.List;
public class GetDataStreamsResponseTests extends AbstractWireSerializingTestCase<Response> {
@Override
protected Writeable.Reader<Response> instanceReader() {
return Response::new;
}
@Override
protected Response createTestInstance() {
int numDataStreams = randomIntBetween(0, 8);
List<Response.DataStreamInfo> dataStreams = new ArrayList<>();
for (int i = 0; i < numDataStreams; i++) {
dataStreams.add(new Response.DataStreamInfo(DataStreamTests.randomInstance(), ClusterHealthStatus.GREEN,
randomAlphaOfLengthBetween(2, 10), randomAlphaOfLengthBetween(2, 10)));
}
return new Response(dataStreams);
}
}

View File

@ -20,8 +20,8 @@
package org.elasticsearch.action.admin.indices.mapping.put;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamRequestTests;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
@ -163,7 +163,7 @@ public class PutMappingRequestTests extends ESTestCase {
tuple(dataStreamNames[1], randomIntBetween(1, 3)),
tuple(dataStreamNames[2], randomIntBetween(1, 3)));
ClusterState cs = DataStreamTestHelper.getClusterStateWithDataStreams(dsMetadata,
ClusterState cs = DeleteDataStreamRequestTests.getClusterStateWithDataStreams(dsMetadata,
org.elasticsearch.common.collect.List.of("index1", "index2", "index3"));
cs = addAliases(cs, org.elasticsearch.common.collect.List.of(
tuple("alias1", org.elasticsearch.common.collect.List.of(tuple("index1", false), tuple("index2", true))),
@ -185,7 +185,7 @@ public class PutMappingRequestTests extends ESTestCase {
tuple(dataStreamNames[1], randomIntBetween(1, 3)),
tuple(dataStreamNames[2], randomIntBetween(1, 3)));
ClusterState cs = DataStreamTestHelper.getClusterStateWithDataStreams(dsMetadata,
ClusterState cs = DeleteDataStreamRequestTests.getClusterStateWithDataStreams(dsMetadata,
org.elasticsearch.common.collect.List.of("index1", "index2", "index3"));
cs = addAliases(cs, org.elasticsearch.common.collect.List.of(
tuple("alias1", org.elasticsearch.common.collect.List.of(tuple("index1", false), tuple("index2", true))),
@ -209,7 +209,7 @@ public class PutMappingRequestTests extends ESTestCase {
tuple(dataStreamNames[1], randomIntBetween(1, 3)),
tuple(dataStreamNames[2], randomIntBetween(1, 3)));
ClusterState cs = DataStreamTestHelper.getClusterStateWithDataStreams(dsMetadata,
ClusterState cs = DeleteDataStreamRequestTests.getClusterStateWithDataStreams(dsMetadata,
org.elasticsearch.common.collect.List.of("index1", "index2", "index3"));
cs = addAliases(cs, org.elasticsearch.common.collect.List.of(
tuple("alias1", org.elasticsearch.common.collect.List.of(tuple("index1", false), tuple("index2", true))),
@ -233,7 +233,7 @@ public class PutMappingRequestTests extends ESTestCase {
tuple(dataStreamNames[1], randomIntBetween(1, 3)),
tuple(dataStreamNames[2], randomIntBetween(1, 3)));
ClusterState cs = DataStreamTestHelper.getClusterStateWithDataStreams(dsMetadata,
ClusterState cs = DeleteDataStreamRequestTests.getClusterStateWithDataStreams(dsMetadata,
org.elasticsearch.common.collect.List.of("index1", "index2", "index3"));
final ClusterState cs2 = addAliases(cs, org.elasticsearch.common.collect.List.of(
tuple("alias1", org.elasticsearch.common.collect.List.of(tuple("index1", false), tuple("index2", true))),
@ -253,7 +253,7 @@ public class PutMappingRequestTests extends ESTestCase {
tuple(dataStreamNames[1], randomIntBetween(1, 3)),
tuple(dataStreamNames[2], randomIntBetween(1, 3)));
ClusterState cs = DataStreamTestHelper.getClusterStateWithDataStreams(dsMetadata,
ClusterState cs = DeleteDataStreamRequestTests.getClusterStateWithDataStreams(dsMetadata,
org.elasticsearch.common.collect.List.of("index1", "index2", "index3"));
final ClusterState cs2 = addAliases(cs, org.elasticsearch.common.collect.List.of(
tuple("alias1", org.elasticsearch.common.collect.List.of(tuple("index1", false), tuple("index2", false))),

View File

@ -34,6 +34,7 @@ import org.elasticsearch.cluster.metadata.AliasValidator;
import org.elasticsearch.cluster.metadata.ComponentTemplate;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamTests;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@ -261,7 +262,7 @@ public class MetadataRolloverServiceTests extends ESTestCase {
public void testDataStreamValidation() throws IOException {
Metadata.Builder md = Metadata.builder();
DataStream randomDataStream = DataStreamTestHelper.randomInstance();
DataStream randomDataStream = DataStreamTests.randomInstance();
for (Index index : randomDataStream.getIndices()) {
md.put(DataStreamTestHelper.getIndexMetadataBuilderForIndex(index));
}
@ -335,7 +336,7 @@ public class MetadataRolloverServiceTests extends ESTestCase {
}
public void testCreateIndexRequestForDataStream() {
DataStream dataStream = DataStreamTestHelper.randomInstance();
DataStream dataStream = DataStreamTests.randomInstance();
final String newWriteIndexName = DataStream.getDefaultBackingIndexName(dataStream.getName(), dataStream.getGeneration() + 1);
final RolloverRequest rolloverRequest = new RolloverRequest(dataStream.getName(), randomAlphaOfLength(10));
final ActiveShardCount activeShardCount = randomBoolean() ? ActiveShardCount.ALL : ActiveShardCount.ONE;
@ -540,7 +541,7 @@ public class MetadataRolloverServiceTests extends ESTestCase {
}
public void testRolloverClusterStateForDataStream() throws Exception {
final DataStream dataStream = DataStreamTestHelper.randomInstance();
final DataStream dataStream = DataStreamTests.randomInstance();
ComposableIndexTemplate template = new ComposableIndexTemplate(Collections.singletonList(dataStream.getName() + "*"),
null, null, null, null, null, new ComposableIndexTemplate.DataStreamTemplate());
Metadata.Builder builder = Metadata.builder();
@ -636,7 +637,7 @@ public class MetadataRolloverServiceTests extends ESTestCase {
final boolean useDataStream = randomBoolean();
final Metadata.Builder builder = Metadata.builder();
if (useDataStream) {
DataStream dataStream = DataStreamTestHelper.randomInstance();
DataStream dataStream = DataStreamTests.randomInstance();
rolloverTarget = dataStream.getName();
sourceIndexName = dataStream.getIndices().get(dataStream.getIndices().size() - 1).getName();
defaultRolloverIndexName = DataStream.getDefaultBackingIndexName(dataStream.getName(), dataStream.getGeneration() + 1);
@ -693,7 +694,7 @@ public class MetadataRolloverServiceTests extends ESTestCase {
}
public void testRolloverClusterStateForDataStreamNoTemplate() throws Exception {
final DataStream dataStream = DataStreamTestHelper.randomInstance();
final DataStream dataStream = DataStreamTests.randomInstance();
Metadata.Builder builder = Metadata.builder();
for (Index index : dataStream.getIndices()) {
builder.put(DataStreamTestHelper.getIndexMetadataBuilderForIndex(index));

View File

@ -19,7 +19,6 @@
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.cluster.DataStreamTestHelper;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.test.AbstractNamedWriteableTestCase;
@ -37,7 +36,7 @@ public class DataStreamMetadataTests extends AbstractNamedWriteableTestCase<Data
}
Map<String, DataStream> dataStreams = new HashMap<>();
for (int i = 0; i < randomIntBetween(1, 5); i++) {
dataStreams.put(randomAlphaOfLength(5), DataStreamTestHelper.randomInstance());
dataStreams.put(randomAlphaOfLength(5), DataStreamTests.randomInstance());
}
return new DataStreamMetadata(dataStreams);
}

View File

@ -18,7 +18,6 @@
*/
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.cluster.DataStreamTestHelper;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
@ -36,6 +35,23 @@ import static org.hamcrest.Matchers.equalTo;
public class DataStreamTests extends AbstractSerializingTestCase<DataStream> {
public static List<Index> randomIndexInstances() {
int numIndices = randomIntBetween(0, 128);
List<Index> indices = new ArrayList<>(numIndices);
for (int i = 0; i < numIndices; i++) {
indices.add(new Index(randomAlphaOfLength(10).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())));
}
return indices;
}
public static DataStream randomInstance() {
List<Index> indices = randomIndexInstances();
long generation = indices.size() + randomLongBetween(1, 128);
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
indices.add(new Index(getDefaultBackingIndexName(dataStreamName, generation), UUIDs.randomBase64UUID(random())));
return new DataStream(dataStreamName, createTimestampField("@timestamp"), indices, generation);
}
@Override
protected DataStream doParseInstance(XContentParser parser) throws IOException {
return DataStream.fromXContent(parser);
@ -48,11 +64,11 @@ public class DataStreamTests extends AbstractSerializingTestCase<DataStream> {
@Override
protected DataStream createTestInstance() {
return DataStreamTestHelper.randomInstance();
return randomInstance();
}
public void testRollover() {
DataStream ds = DataStreamTestHelper.randomInstance();
DataStream ds = randomInstance();
Index newWriteIndex = new Index(getDefaultBackingIndexName(ds.getName(), ds.getGeneration() + 1), UUIDs.randomBase64UUID(random()));
DataStream rolledDs = ds.rollover(newWriteIndex);

View File

@ -16,8 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamRequestTests;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.DataStreamTestHelper;

View File

@ -23,9 +23,9 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse.IndexResult;
import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamRequestTests;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.DataStreamTestHelper;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.block.ClusterBlock;
@ -401,7 +401,7 @@ public class MetadataIndexStateServiceTests extends ESTestCase {
dataStreamsToCreate.add(new Tuple<>(dataStreamName, numBackingIndices));
writeIndices.add(DataStream.getDefaultBackingIndexName(dataStreamName, numBackingIndices));
}
ClusterState cs = DataStreamTestHelper.getClusterStateWithDataStreams(dataStreamsToCreate,
ClusterState cs = DeleteDataStreamRequestTests.getClusterStateWithDataStreams(dataStreamsToCreate,
org.elasticsearch.common.collect.List.of());
ClusterService clusterService = mock(ClusterService.class);

View File

@ -1289,7 +1289,7 @@ public class MetadataTests extends ESTestCase {
.put("component_template_" + randomAlphaOfLength(3), ComponentTemplateTests.randomInstance())
.put("index_template_v2_" + randomAlphaOfLength(3), ComposableIndexTemplateTests.randomInstance());
DataStream randomDataStream = DataStreamTestHelper.randomInstance();
DataStream randomDataStream = DataStreamTests.randomInstance();
for (Index index : randomDataStream.getIndices()) {
md.put(DataStreamTestHelper.getIndexMetadataBuilderForIndex(index));
}

View File

@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster;
import org.apache.lucene.util.LuceneTestCase;
@ -89,6 +90,7 @@ public final class DataStreamTestHelper {
" }";
}
public static List<Index> randomIndexInstances() {
int numIndices = ESTestCase.randomIntBetween(0, 128);
List<Index> indices = new ArrayList<>(numIndices);
@ -148,4 +150,6 @@ public final class DataStreamTestHelper {
return IndexMetadata.builder(name).settings(b).numberOfShards(1).numberOfReplicas(1).build();
}
}

View File

@ -20,11 +20,14 @@
package org.elasticsearch.test;
import com.carrotsearch.hppc.ObjectArrayList;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction;
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
@ -76,6 +79,7 @@ public abstract class TestCluster implements Closeable {
* Wipes any data that a test can leave behind: indices, templates (except exclude templates) and repositories
*/
public void wipe(Set<String> excludeTemplates) {
wipeAllDataStreams();
wipeIndices("_all");
wipeAllTemplates(excludeTemplates);
wipeRepositories();
@ -131,6 +135,18 @@ public abstract class TestCluster implements Closeable {
@Override
public abstract void close() throws IOException;
/**
* Deletes all data streams from the test cluster.
*/
public void wipeAllDataStreams() {
// Feature flag may not be enabled in all gradle modules that use ESIntegTestCase
if (size() > 0) {
AcknowledgedResponse response =
client().admin().indices().deleteDataStream(new DeleteDataStreamAction.Request(new String[]{"*"})).actionGet();
assertAcked(response);
}
}
/**
* Deletes the given indices from the tests cluster. If no index name is passed to this method
* all indices are removed.

View File

@ -652,9 +652,7 @@ public abstract class ESRestTestCase extends ESTestCase {
protected static void wipeDataStreams() throws IOException {
try {
if (hasXPack()) {
adminClient().performRequest(new Request("DELETE", "_data_stream/*"));
}
} catch (ResponseException e) {
// We hit a version of ES that doesn't serialize DeleteDataStreamAction.Request#wildcardExpressionsOriginallySpecified field or
// that doesn't support data streams so it's safe to ignore