[7.x] Rollover for data streams

This commit is contained in:
Dan Hermann 2020-04-23 12:04:34 -05:00 committed by GitHub
parent 55485cfa17
commit dd5c96c2ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 395 additions and 76 deletions

View File

@ -420,8 +420,8 @@ final class IndicesRequestConverters {
@Deprecated
static Request rollover(org.elasticsearch.action.admin.indices.rollover.RolloverRequest rolloverRequest) throws IOException {
String endpoint = new RequestConverters.EndpointBuilder().addPathPart(rolloverRequest.getAlias()).addPathPartAsIs("_rollover")
.addPathPart(rolloverRequest.getNewIndexName()).build();
String endpoint = new RequestConverters.EndpointBuilder().addPathPart(rolloverRequest.getRolloverTarget())
.addPathPartAsIs("_rollover").addPathPart(rolloverRequest.getNewIndexName()).build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
RequestConverters.Params params = new RequestConverters.Params();

View File

@ -943,9 +943,9 @@ public class IndicesRequestConvertersTests extends ESTestCase {
Request request = IndicesRequestConverters.rollover(rolloverRequest);
if (rolloverRequest.getNewIndexName() == null) {
Assert.assertEquals("/" + rolloverRequest.getAlias() + "/_rollover", request.getEndpoint());
Assert.assertEquals("/" + rolloverRequest.getRolloverTarget() + "/_rollover", request.getEndpoint());
} else {
Assert.assertEquals("/" + rolloverRequest.getAlias() + "/_rollover/" + rolloverRequest.getNewIndexName(),
Assert.assertEquals("/" + rolloverRequest.getRolloverTarget() + "/_rollover/" + rolloverRequest.getNewIndexName(),
request.getEndpoint());
}
Assert.assertEquals(HttpPost.METHOD_NAME, request.getMethod());

View File

@ -51,6 +51,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Locale;
import java.util.Objects;
public class CreateDataStreamAction extends ActionType<AcknowledgedResponse> {
@ -177,6 +178,13 @@ public class CreateDataStreamAction extends ActionType<AcknowledgedResponse> {
MetadataCreateIndexService.validateIndexOrAliasName(request.name,
(s1, s2) -> new IllegalArgumentException("data_stream [" + s1 + "] " + s2));
if (request.name.toLowerCase(Locale.ROOT).equals(request.name) == false) {
throw new IllegalArgumentException("data_stream [" + request.name + "] must be lowercase");
}
if (request.name.startsWith(".")) {
throw new IllegalArgumentException("data_stream [" + request.name + "] must not start with '.'");
}
String firstBackingIndexName = DataStream.getBackingIndexName(request.name, 1);
CreateIndexClusterStateUpdateRequest createIndexRequest =
new CreateIndexClusterStateUpdateRequest("initialize_data_stream", firstBackingIndexName, firstBackingIndexName)

View File

@ -25,6 +25,7 @@ import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasAction;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@ -34,7 +35,9 @@ import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
import org.elasticsearch.cluster.metadata.MetadataIndexAliasesService;
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Arrays;
@ -43,12 +46,19 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.elasticsearch.cluster.metadata.IndexAbstraction.Type.ALIAS;
import static org.elasticsearch.cluster.metadata.IndexAbstraction.Type.DATA_STREAM;
import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.findV1Templates;
import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.findV2Template;
/**
* Service responsible for handling rollover requests for write aliases and data streams
*/
public class MetadataRolloverService {
private static final Pattern INDEX_NAME_PATTERN = Pattern.compile("^.*-\\d+$");
private static final List<IndexAbstraction.Type> VALID_ROLLOVER_TARGETS = org.elasticsearch.common.collect.List.of(ALIAS, DATA_STREAM);
private final ThreadPool threadPool;
private final MetadataCreateIndexService createIndexService;
@ -77,17 +87,33 @@ public class MetadataRolloverService {
}
}
public RolloverResult rolloverClusterState(ClusterState currentState, String aliasName, String newIndexName,
public RolloverResult rolloverClusterState(ClusterState currentState, String rolloverTarget, String newIndexName,
CreateIndexRequest createIndexRequest, List<Condition<?>> metConditions,
boolean silent) throws Exception {
validate(currentState.metadata(), rolloverTarget, newIndexName, createIndexRequest);
final IndexAbstraction indexAbstraction = currentState.metadata().getIndicesLookup().get(rolloverTarget);
switch (indexAbstraction.getType()) {
case ALIAS:
return rolloverAlias(currentState, (IndexAbstraction.Alias) indexAbstraction, rolloverTarget, newIndexName,
createIndexRequest, metConditions, silent);
case DATA_STREAM:
return rolloverDataStream(currentState, (IndexAbstraction.DataStream) indexAbstraction, rolloverTarget,
createIndexRequest, metConditions, silent);
default:
// the validate method above prevents this case
throw new IllegalStateException("unable to roll over type [" + indexAbstraction.getType().getDisplayName() + "]");
}
}
private RolloverResult rolloverAlias(ClusterState currentState, IndexAbstraction.Alias alias, String aliasName,
String newIndexName, CreateIndexRequest createIndexRequest, List<Condition<?>> metConditions,
boolean silent) throws Exception {
final Metadata metadata = currentState.metadata();
validate(metadata, aliasName);
final IndexAbstraction alias = metadata.getIndicesLookup().get(aliasName);
final IndexMetadata indexMetadata = alias.getWriteIndex();
final AliasMetadata aliasMetadata = indexMetadata.getAliases().get(alias.getName());
final String sourceProvidedName = indexMetadata.getSettings().get(IndexMetadata.SETTING_INDEX_PROVIDED_NAME,
indexMetadata.getIndex().getName());
final String sourceIndexName = indexMetadata.getIndex().getName();
final IndexMetadata writeIndex = alias.getWriteIndex();
final AliasMetadata aliasMetadata = writeIndex.getAliases().get(alias.getName());
final String sourceProvidedName = writeIndex.getSettings().get(IndexMetadata.SETTING_INDEX_PROVIDED_NAME,
writeIndex.getIndex().getName());
final String sourceIndexName = writeIndex.getIndex().getName();
final String unresolvedName = (newIndexName != null)
? newIndexName
: generateRolloverIndexName(sourceProvidedName, indexNameExpressionResolver);
@ -113,6 +139,28 @@ public class MetadataRolloverService {
return new RolloverResult(rolloverIndexName, sourceIndexName, newState);
}
private RolloverResult rolloverDataStream(ClusterState currentState, IndexAbstraction.DataStream dataStream, String dataStreamName,
CreateIndexRequest createIndexRequest, List<Condition<?>> metConditions,
boolean silent) throws Exception {
final DataStream ds = dataStream.getDataStream();
final IndexMetadata originalWriteIndex = dataStream.getWriteIndex();
final String newWriteIndexName = DataStream.getBackingIndexName(ds.getName(), ds.getGeneration() + 1);
CreateIndexClusterStateUpdateRequest createIndexClusterStateRequest =
prepareDataStreamCreateIndexRequest(newWriteIndexName, createIndexRequest);
ClusterState newState = createIndexService.applyCreateIndexRequest(currentState, createIndexClusterStateRequest, silent,
(builder, indexMetadata) -> builder.put(ds.rollover(indexMetadata.getIndex())));
RolloverInfo rolloverInfo = new RolloverInfo(dataStreamName, metConditions, threadPool.absoluteTimeInMillis());
newState = ClusterState.builder(newState)
.metadata(Metadata.builder(newState.metadata())
.put(IndexMetadata.builder(newState.metadata().index(originalWriteIndex.getIndex()))
.putRolloverInfo(rolloverInfo)))
.build();
return new RolloverResult(newWriteIndexName, originalWriteIndex.getIndex().getName(), newState);
}
static String generateRolloverIndexName(String sourceIndexName, IndexNameExpressionResolver indexNameExpressionResolver) {
String resolvedName = indexNameExpressionResolver.resolveDateMathExpression(sourceIndexName);
final boolean isDateMath = sourceIndexName.equals(resolvedName) == false;
@ -129,15 +177,28 @@ public class MetadataRolloverService {
}
}
static CreateIndexClusterStateUpdateRequest prepareDataStreamCreateIndexRequest(final String targetIndexName,
CreateIndexRequest createIndexRequest) {
Settings settings = Settings.builder().put("index.hidden", true).build();
return prepareCreateIndexRequest(targetIndexName, targetIndexName, "rollover_data_stream", createIndexRequest, settings);
}
static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(
final String providedIndexName, final String targetIndexName, CreateIndexRequest createIndexRequest) {
return prepareCreateIndexRequest(providedIndexName, targetIndexName, "rollover_index", createIndexRequest, null);
}
static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(final String providedIndexName, final String targetIndexName,
CreateIndexRequest createIndexRequest) {
createIndexRequest.cause("rollover_index");
createIndexRequest.index(targetIndexName);
return new CreateIndexClusterStateUpdateRequest(
"rollover_index", targetIndexName, providedIndexName)
final String cause, CreateIndexRequest createIndexRequest,
Settings settings) {
Settings.Builder b = Settings.builder().put(createIndexRequest.settings());
if (settings != null) {
b.put(settings);
}
return new CreateIndexClusterStateUpdateRequest(cause, targetIndexName, providedIndexName)
.ackTimeout(createIndexRequest.timeout())
.masterNodeTimeout(createIndexRequest.masterNodeTimeout())
.settings(createIndexRequest.settings())
.settings(b.build())
.aliases(createIndexRequest.aliases())
.waitForActiveShards(ActiveShardCount.NONE) // not waiting for shards here, will wait on the alias switch operation
.mappings(createIndexRequest.mappings())
@ -191,17 +252,30 @@ public class MetadataRolloverService {
}
}
static void validate(Metadata metadata, String aliasName) {
final IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(aliasName);
static void validate(Metadata metadata, String rolloverTarget, String newIndexName, CreateIndexRequest request) {
final IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(rolloverTarget);
if (indexAbstraction == null) {
throw new IllegalArgumentException("source alias does not exist");
throw new IllegalArgumentException("rollover target [" + rolloverTarget + "] does not exist");
}
if (indexAbstraction.getType() != IndexAbstraction.Type.ALIAS) {
throw new IllegalArgumentException("source alias is a [" + indexAbstraction.getType().getDisplayName() +
"], but an [" + IndexAbstraction.Type.ALIAS.getDisplayName() + "] was expected");
if (VALID_ROLLOVER_TARGETS.contains(indexAbstraction.getType()) == false) {
throw new IllegalArgumentException("rollover target is a [" + indexAbstraction.getType().getDisplayName() + "] but one of [" +
Strings.collectionToCommaDelimitedString(VALID_ROLLOVER_TARGETS.stream().map(IndexAbstraction.Type::getDisplayName)
.collect(Collectors.toList())) + "] was expected");
}
if (indexAbstraction.getWriteIndex() == null) {
throw new IllegalArgumentException("source alias [" + indexAbstraction.getName() + "] does not point to a write index");
throw new IllegalArgumentException(
"rollover target [" + indexAbstraction.getName() + "] does not point to a write index");
}
if (indexAbstraction.getType() == DATA_STREAM) {
if (Strings.isNullOrEmpty(newIndexName) == false) {
throw new IllegalArgumentException("new index name may not be specified when rolling over a data stream");
}
if ((request.settings().equals(Settings.EMPTY) == false) ||
(request.aliases().size() > 0) ||
(request.mappings().size() > 0)) {
throw new IllegalArgumentException(
"aliases, mappings, and index settings may not be specified when rolling over a data stream");
}
}
}
}

View File

@ -42,7 +42,7 @@ import java.util.Map;
import static org.elasticsearch.action.ValidateActions.addValidationError;
/**
* Request class to swap index under an alias upon satisfying conditions
* Request class to swap index under an alias or increment data stream generation upon satisfying conditions
*
* Note: there is a new class with the same name for the Java HLRC that uses a typeless format.
* Any changes done to this class should also go to that client class.
@ -90,7 +90,7 @@ public class RolloverRequest extends AcknowledgedRequest<RolloverRequest> implem
CreateIndexRequest.ALIASES, ObjectParser.ValueType.OBJECT);
}
private String alias;
private String rolloverTarget;
private String newIndexName;
private boolean dryRun;
private Map<String, Condition<?>> conditions = new HashMap<>(2);
@ -99,7 +99,7 @@ public class RolloverRequest extends AcknowledgedRequest<RolloverRequest> implem
public RolloverRequest(StreamInput in) throws IOException {
super(in);
alias = in.readString();
rolloverTarget = in.readString();
newIndexName = in.readOptionalString();
dryRun = in.readBoolean();
int size = in.readVInt();
@ -112,16 +112,16 @@ public class RolloverRequest extends AcknowledgedRequest<RolloverRequest> implem
RolloverRequest() {}
public RolloverRequest(String alias, String newIndexName) {
this.alias = alias;
public RolloverRequest(String rolloverTarget, String newIndexName) {
this.rolloverTarget = rolloverTarget;
this.newIndexName = newIndexName;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = createIndexRequest.validate();
if (alias == null) {
validationException = addValidationError("index alias is missing", validationException);
if (rolloverTarget == null) {
validationException = addValidationError("rollover target is missing", validationException);
}
return validationException;
}
@ -129,7 +129,7 @@ public class RolloverRequest extends AcknowledgedRequest<RolloverRequest> implem
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(alias);
out.writeString(rolloverTarget);
out.writeOptionalString(newIndexName);
out.writeBoolean(dryRun);
out.writeVInt(conditions.size());
@ -143,19 +143,19 @@ public class RolloverRequest extends AcknowledgedRequest<RolloverRequest> implem
@Override
public String[] indices() {
return new String[] {alias};
return new String[] {rolloverTarget};
}
@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
return IndicesOptions.strictSingleIndexIncludeDataStreamNoExpandForbidClosed();
}
/**
* Sets the alias to rollover to another index
* Sets the rollover target to rollover to another index
*/
public void setAlias(String alias) {
this.alias = alias;
public void setRolloverTarget(String rolloverTarget) {
this.rolloverTarget = rolloverTarget;
}
/**
@ -220,8 +220,8 @@ public class RolloverRequest extends AcknowledgedRequest<RolloverRequest> implem
return conditions;
}
public String getAlias() {
return alias;
public String getRolloverTarget() {
return rolloverTarget;
}
public String getNewIndexName() {

View File

@ -34,8 +34,8 @@ public class RolloverRequestBuilder extends MasterNodeOperationRequestBuilder<Ro
super(client, action, new RolloverRequest());
}
public RolloverRequestBuilder setAlias(String alias) {
this.request.setAlias(alias);
public RolloverRequestBuilder setRolloverTarget(String rolloverTarget) {
this.request.setRolloverTarget(rolloverTarget);
return this;
}

View File

@ -89,7 +89,11 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
@Override
protected ClusterBlockException checkBlock(RolloverRequest request, ClusterState state) {
IndicesOptions indicesOptions = IndicesOptions.fromOptions(true, true,
request.indicesOptions().expandWildcardsOpen(), request.indicesOptions().expandWildcardsClosed());
request.indicesOptions().expandWildcardsOpen(), request.indicesOptions().expandWildcardsClosed(),
request.indicesOptions().expandWildcardsHidden(), true,
request.indicesOptions().forbidClosedIndices(), request.indicesOptions().ignoreAliases(),
request.indicesOptions().ignoreThrottled(), request.indicesOptions().includeDataStreams());
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE,
indexNameExpressionResolver.concreteIndexNames(state, indicesOptions, request.indices()));
}
@ -105,14 +109,14 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
final ActionListener<RolloverResponse> listener) throws Exception {
MetadataRolloverService.RolloverResult preResult =
rolloverService.rolloverClusterState(state,
rolloverRequest.getAlias(), rolloverRequest.getNewIndexName(), rolloverRequest.getCreateIndexRequest(),
rolloverRequest.getRolloverTarget(), rolloverRequest.getNewIndexName(), rolloverRequest.getCreateIndexRequest(),
Collections.emptyList(), true);
Metadata metadata = state.metadata();
String sourceIndexName = preResult.sourceIndexName;
String rolloverIndexName = preResult.rolloverIndexName;
IndicesStatsRequest statsRequest = new IndicesStatsRequest().indices(rolloverRequest.getAlias())
IndicesStatsRequest statsRequest = new IndicesStatsRequest().indices(rolloverRequest.getRolloverTarget())
.clear()
.indicesOptions(IndicesOptions.fromOptions(true, false, true, true))
.indicesOptions(IndicesOptions.fromOptions(true, false, true, true, false, true, false, false, false, true))
.docs(true);
statsRequest.setParentTask(clusterService.localNode().getId(), task.getId());
client.execute(IndicesStatsAction.INSTANCE, statsRequest,
@ -142,11 +146,11 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
.preferV2Templates(IndexMetadata.PREFER_V2_TEMPLATES_SETTING.get(originalIndexSettings));
}
MetadataRolloverService.RolloverResult rolloverResult = rolloverService.rolloverClusterState(currentState,
rolloverRequest.getAlias(), rolloverRequest.getNewIndexName(), rolloverRequest.getCreateIndexRequest(),
metConditions, false);
rolloverRequest.getRolloverTarget(), rolloverRequest.getNewIndexName(),
rolloverRequest.getCreateIndexRequest(), metConditions, false);
if (rolloverResult.sourceIndexName.equals(sourceIndexName) == false) {
throw new ElasticsearchException("Concurrent modification of alias [{}] during rollover",
rolloverRequest.getAlias());
rolloverRequest.getRolloverTarget());
}
return rolloverResult.clusterState;
}

View File

@ -1723,7 +1723,7 @@ public abstract class AbstractClient implements Client {
@Override
public RolloverRequestBuilder prepareRolloverIndex(String alias) {
return new RolloverRequestBuilder(this, RolloverAction.INSTANCE).setAlias(alias);
return new RolloverRequestBuilder(this, RolloverAction.INSTANCE).setRolloverTarget(alias);
}
@Override

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.Index;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
@ -68,6 +69,29 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
return generation;
}
/**
* Performs a rollover on a {@code DataStream} instance and returns a new instance containing
* the updated list of backing indices and incremented generation.
*
* @param newWriteIndex the new write backing index. Must conform to the naming convention for
* backing indices on data streams. See {@link #getBackingIndexName}.
* @return new {@code DataStream} instance with the rollover operation applied
*/
public DataStream rollover(Index newWriteIndex) {
assert newWriteIndex.getName().equals(getBackingIndexName(name, generation + 1));
List<Index> backingIndices = new ArrayList<>(indices);
backingIndices.add(newWriteIndex);
return new DataStream(name, timeStampField, backingIndices, generation + 1);
}
/**
* Generates the name of the index that conforms to the naming convention for backing indices
* on data streams given the specified data stream name and generation.
*
* @param dataStreamName name of the data stream
* @param generation generation of the data stream
* @return backing index name
*/
public static String getBackingIndexName(String dataStreamName, long generation) {
return String.format(Locale.ROOT, "%s-%06d", dataStreamName, generation);
}

View File

@ -326,5 +326,9 @@ public interface IndexAbstraction {
public boolean isHidden() {
return false;
}
public org.elasticsearch.cluster.metadata.DataStream getDataStream() {
return dataStream;
}
}
}

View File

@ -109,6 +109,26 @@ public class CreateDataStreamRequestTests extends AbstractWireSerializingTestCas
assertThat(e.getMessage(), containsString("must not contain the following characters"));
}
public void testCreateDataStreamWithUppercaseCharacters() throws Exception {
final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
final String dataStreamName = "MAY_NOT_USE_UPPERCASE";
ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> CreateDataStreamAction.TransportAction.createDataStream(metadataCreateIndexService, cs, req));
assertThat(e.getMessage(), containsString("data_stream [" + dataStreamName + "] must be lowercase"));
}
public void testCreateDataStreamStartingWithPeriod() throws Exception {
final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
final String dataStreamName = ".may_not_start_with_period";
ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> CreateDataStreamAction.TransportAction.createDataStream(metadataCreateIndexService, cs, req));
assertThat(e.getMessage(), containsString("data_stream [" + dataStreamName + "] must not start with '.'"));
}
private static MetadataCreateIndexService getMetadataCreateIndexService() throws Exception {
MetadataCreateIndexService s = mock(MetadataCreateIndexService.class);
when(s.applyCreateIndexRequest(any(ClusterState.class), any(CreateIndexClusterStateUpdateRequest.class), anyBoolean()))

View File

@ -20,15 +20,19 @@
package org.elasticsearch.action.admin.indices.rollover;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.AliasAction;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.AliasValidator;
import org.elasticsearch.cluster.metadata.ComponentTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamTests;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@ -41,10 +45,13 @@ import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.IndexEventListener;
@ -54,6 +61,7 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@ -187,7 +195,7 @@ public class MetadataRolloverServiceTests extends ESTestCase {
assertTrue(foundRemoveWrite);
}
public void testValidation() {
public void testAliasValidation() {
String index1 = randomAlphaOfLength(10);
String aliasWithWriteIndex = randomAlphaOfLength(10);
String index2 = randomAlphaOfLength(10);
@ -211,18 +219,59 @@ public class MetadataRolloverServiceTests extends ESTestCase {
}
metadataBuilder.put(indexTwoBuilder);
Metadata metadata = metadataBuilder.build();
CreateIndexRequest req = new CreateIndexRequest();
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () ->
MetadataRolloverService.validate(metadata, aliasWithNoWriteIndex));
assertThat(exception.getMessage(), equalTo("source alias [" + aliasWithNoWriteIndex + "] does not point to a write index"));
MetadataRolloverService.validate(metadata, aliasWithNoWriteIndex, randomAlphaOfLength(5), req));
assertThat(exception.getMessage(),
equalTo("rollover target [" + aliasWithNoWriteIndex + "] does not point to a write index"));
exception = expectThrows(IllegalArgumentException.class, () ->
MetadataRolloverService.validate(metadata, randomFrom(index1, index2)));
assertThat(exception.getMessage(), equalTo("source alias is a [concrete index], but an [alias] was expected"));
MetadataRolloverService.validate(metadata, randomFrom(index1, index2), randomAlphaOfLength(5), req));
assertThat(exception.getMessage(),
equalTo("rollover target is a [concrete index] but one of [alias,data_stream] was expected"));
final String aliasName = randomAlphaOfLength(5);
exception = expectThrows(IllegalArgumentException.class, () ->
MetadataRolloverService.validate(metadata, randomAlphaOfLength(5))
MetadataRolloverService.validate(metadata, aliasName, randomAlphaOfLength(5), req)
);
assertThat(exception.getMessage(), equalTo("source alias does not exist"));
MetadataRolloverService.validate(metadata, aliasWithWriteIndex);
assertThat(exception.getMessage(), equalTo("rollover target [" + aliasName + "] does not exist"));
MetadataRolloverService.validate(metadata, aliasWithWriteIndex, randomAlphaOfLength(5), req);
}
public void testDataStreamValidation() throws IOException {
Metadata.Builder md = Metadata.builder();
DataStream randomDataStream = DataStreamTests.randomInstance();
for (Index index : randomDataStream.getIndices()) {
md.put(DataStreamTestHelper.getIndexMetadataBuilderForIndex(index));
}
md.put(randomDataStream);
Metadata metadata = md.build();
CreateIndexRequest req = new CreateIndexRequest();
MetadataRolloverService.validate(metadata, randomDataStream.getName(), null, req);
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () ->
MetadataRolloverService.validate(metadata, randomDataStream.getName(), randomAlphaOfLength(5), req));
assertThat(exception.getMessage(),
equalTo("new index name may not be specified when rolling over a data stream"));
CreateIndexRequest aliasReq = new CreateIndexRequest().alias(new Alias("no_aliases_permitted"));
exception = expectThrows(IllegalArgumentException.class, () ->
MetadataRolloverService.validate(metadata, randomDataStream.getName(), null, aliasReq));
assertThat(exception.getMessage(),
equalTo("aliases, mappings, and index settings may not be specified when rolling over a data stream"));
String mapping = Strings.toString(JsonXContent.contentBuilder().startObject().startObject("_doc").endObject().endObject());
CreateIndexRequest mappingReq = new CreateIndexRequest().mapping(mapping);
exception = expectThrows(IllegalArgumentException.class, () ->
MetadataRolloverService.validate(metadata, randomDataStream.getName(), null, mappingReq));
assertThat(exception.getMessage(),
equalTo("aliases, mappings, and index settings may not be specified when rolling over a data stream"));
CreateIndexRequest settingReq = new CreateIndexRequest().settings(Settings.builder().put("foo", "bar"));
exception = expectThrows(IllegalArgumentException.class, () ->
MetadataRolloverService.validate(metadata, randomDataStream.getName(), null, settingReq));
assertThat(exception.getMessage(),
equalTo("aliases, mappings, and index settings may not be specified when rolling over a data stream"));
}
public void testGenerateRolloverIndexName() {
@ -263,6 +312,29 @@ public class MetadataRolloverServiceTests extends ESTestCase {
assertThat(createIndexRequest.cause(), equalTo("rollover_index"));
}
public void testCreateIndexRequestForDataStream() {
DataStream dataStream = DataStreamTests.randomInstance();
final String newWriteIndexName = DataStream.getBackingIndexName(dataStream.getName(), dataStream.getGeneration() + 1);
final RolloverRequest rolloverRequest = new RolloverRequest(dataStream.getName(), randomAlphaOfLength(10));
final ActiveShardCount activeShardCount = randomBoolean() ? ActiveShardCount.ALL : ActiveShardCount.ONE;
rolloverRequest.getCreateIndexRequest().waitForActiveShards(activeShardCount);
final Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build();
rolloverRequest.getCreateIndexRequest().settings(settings);
final CreateIndexClusterStateUpdateRequest createIndexRequest =
MetadataRolloverService.prepareDataStreamCreateIndexRequest(newWriteIndexName, rolloverRequest.getCreateIndexRequest());
for (String settingKey : settings.keySet()) {
assertThat(settings.get(settingKey), equalTo(createIndexRequest.settings().get(settingKey)));
}
assertThat(createIndexRequest.settings().get("index.hidden"), equalTo("true"));
assertThat(createIndexRequest.index(), equalTo(newWriteIndexName));
assertThat(createIndexRequest.cause(), equalTo("rollover_data_stream"));
}
public void testRejectDuplicateAlias() {
final IndexTemplateMetadata template = IndexTemplateMetadata.builder("test-template")
.patterns(Arrays.asList("foo-*", "bar-*"))
@ -324,7 +396,7 @@ public class MetadataRolloverServiceTests extends ESTestCase {
MetadataRolloverService.checkNoDuplicatedAliasInIndexTemplate(metadata, indexName, aliasName, Boolean.TRUE);
// not hidden will throw
final IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () ->
MetadataRolloverService.checkNoDuplicatedAliasInIndexTemplate(metadata, indexName, aliasName, false));
MetadataRolloverService.checkNoDuplicatedAliasInIndexTemplate(metadata, indexName, aliasName, randomFrom(Boolean.FALSE, null)));
assertThat(ex.getMessage(), containsString("index template [test-template]"));
}
@ -344,7 +416,7 @@ public class MetadataRolloverServiceTests extends ESTestCase {
MetadataRolloverService.checkNoDuplicatedAliasInIndexTemplate(metadata, indexName, aliasName, Boolean.TRUE);
// not hidden will throw
final IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () ->
MetadataRolloverService.checkNoDuplicatedAliasInIndexTemplate(metadata, indexName, aliasName, false));
MetadataRolloverService.checkNoDuplicatedAliasInIndexTemplate(metadata, indexName, aliasName, randomFrom(Boolean.FALSE, null)));
assertThat(ex.getMessage(), containsString("index template [test-template]"));
}
@ -367,7 +439,7 @@ public class MetadataRolloverServiceTests extends ESTestCase {
MetadataRolloverService.checkNoDuplicatedAliasInIndexTemplate(metadata, indexName, aliasName, Boolean.TRUE);
// not hidden will throw
final IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () ->
MetadataRolloverService.checkNoDuplicatedAliasInIndexTemplate(metadata, indexName, aliasName, false));
MetadataRolloverService.checkNoDuplicatedAliasInIndexTemplate(metadata, indexName, aliasName, randomFrom(Boolean.FALSE, null)));
assertThat(ex.getMessage(), containsString("index template [test-template]"));
}
@ -412,7 +484,7 @@ public class MetadataRolloverServiceTests extends ESTestCase {
long before = testThreadPool.absoluteTimeInMillis();
MetadataRolloverService.RolloverResult rolloverResult =
rolloverService.rolloverClusterState(clusterState,aliasName, newIndexName, createIndexRequest, metConditions,
rolloverService.rolloverClusterState(clusterState, aliasName, newIndexName, createIndexRequest, metConditions,
randomBoolean());
long after = testThreadPool.absoluteTimeInMillis();
@ -441,6 +513,68 @@ public class MetadataRolloverServiceTests extends ESTestCase {
}
}
public void testRolloverClusterStateForDataStream() throws Exception {
final DataStream dataStream = DataStreamTests.randomInstance();
Metadata.Builder builder = Metadata.builder();
for (Index index : dataStream.getIndices()) {
builder.put(DataStreamTestHelper.getIndexMetadataBuilderForIndex(index));
}
builder.put(dataStream);
final ClusterState clusterState = ClusterState.builder(new ClusterName("test")).metadata(builder).build();
ThreadPool testThreadPool = new TestThreadPool(getTestName());
try {
ClusterService clusterService = ClusterServiceUtils.createClusterService(testThreadPool);
Environment env = mock(Environment.class);
when(env.sharedDataFile()).thenReturn(null);
AllocationService allocationService = mock(AllocationService.class);
when(allocationService.reroute(any(ClusterState.class), any(String.class))).then(i -> i.getArguments()[0]);
IndicesService indicesService = mockIndicesServices();
IndexNameExpressionResolver mockIndexNameExpressionResolver = mock(IndexNameExpressionResolver.class);
when(mockIndexNameExpressionResolver.resolveDateMathExpression(any())).then(returnsFirstArg());
MetadataCreateIndexService createIndexService = new MetadataCreateIndexService(Settings.EMPTY,
clusterService, indicesService, allocationService, null, env, null, testThreadPool, null, Collections.emptyList(), false);
MetadataIndexAliasesService indexAliasesService = new MetadataIndexAliasesService(clusterService, indicesService,
new AliasValidator(), null, xContentRegistry());
MetadataRolloverService rolloverService = new MetadataRolloverService(testThreadPool, createIndexService, indexAliasesService,
mockIndexNameExpressionResolver);
MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong());
List<Condition<?>> metConditions = Collections.singletonList(condition);
CreateIndexRequest createIndexRequest = new CreateIndexRequest("_na_");
long before = testThreadPool.absoluteTimeInMillis();
MetadataRolloverService.RolloverResult rolloverResult =
rolloverService.rolloverClusterState(clusterState, dataStream.getName(), null, createIndexRequest, metConditions,
randomBoolean());
long after = testThreadPool.absoluteTimeInMillis();
String sourceIndexName = DataStream.getBackingIndexName(dataStream.getName(), dataStream.getGeneration());
String newIndexName = DataStream.getBackingIndexName(dataStream.getName(), dataStream.getGeneration() + 1);
assertEquals(sourceIndexName, rolloverResult.sourceIndexName);
assertEquals(newIndexName, rolloverResult.rolloverIndexName);
Metadata rolloverMetadata = rolloverResult.clusterState.metadata();
assertEquals(dataStream.getIndices().size() + 1, rolloverMetadata.indices().size());
IndexMetadata rolloverIndexMetadata = rolloverMetadata.index(newIndexName);
IndexAbstraction ds = rolloverMetadata.getIndicesLookup().get(dataStream.getName());
assertThat(ds.getType(), equalTo(IndexAbstraction.Type.DATA_STREAM));
assertThat(ds.getIndices(), hasSize(dataStream.getIndices().size() + 1));
assertThat(ds.getIndices(), hasItem(rolloverMetadata.index(sourceIndexName)));
assertThat(ds.getIndices(), hasItem(rolloverIndexMetadata));
assertThat(ds.getWriteIndex(), equalTo(rolloverIndexMetadata));
RolloverInfo info = rolloverMetadata.index(sourceIndexName).getRolloverInfos().get(dataStream.getName());
assertThat(info.getTime(), lessThanOrEqualTo(after));
assertThat(info.getTime(), greaterThanOrEqualTo(before));
assertThat(info.getMetConditions(), hasSize(1));
assertThat(info.getMetConditions().get(0).value(), equalTo(condition.value()));
} finally {
testThreadPool.shutdown();
}
}
private IndicesService mockIndicesServices() throws Exception {
/*
* Throws Exception because Eclipse uses the lower bound for

View File

@ -142,7 +142,7 @@ public class RolloverIT extends ESIntegTestCase {
}
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
() -> client().admin().indices().prepareRolloverIndex("alias").dryRun(randomBoolean()).get());
assertThat(exception.getMessage(), equalTo("source alias [alias] does not point to a write index"));
assertThat(exception.getMessage(), equalTo("rollover target [alias] does not point to a write index"));
}
public void testRolloverWithIndexSettings() throws Exception {

View File

@ -158,7 +158,7 @@ public class RolloverRequestTests extends ESTestCase {
try (StreamInput in = new NamedWriteableAwareStreamInput(bytes.streamInput(), writeableRegistry)) {
RolloverRequest cloneRequest = new RolloverRequest(in);
assertThat(cloneRequest.getNewIndexName(), equalTo(originalRequest.getNewIndexName()));
assertThat(cloneRequest.getAlias(), equalTo(originalRequest.getAlias()));
assertThat(cloneRequest.getRolloverTarget(), equalTo(originalRequest.getRolloverTarget()));
for (Map.Entry<String, Condition<?>> entry : cloneRequest.getConditions().entrySet()) {
Condition<?> condition = originalRequest.getConditions().get(entry.getKey());
//here we compare the string representation as there is some information loss when serializing
@ -218,7 +218,7 @@ public class RolloverRequestTests extends ESTestCase {
ActionRequestValidationException validationException = rolloverRequest.validate();
assertNotNull(validationException);
assertEquals(1, validationException.validationErrors().size());
assertEquals("index alias is missing", validationException.validationErrors().get(0));
assertEquals("rollover target is missing", validationException.validationErrors().get(0));
}
private static List<Consumer<RolloverRequest>> conditionsGenerator = new ArrayList<>();

View File

@ -27,6 +27,10 @@ import org.elasticsearch.test.AbstractSerializingTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import static org.elasticsearch.cluster.metadata.DataStream.getBackingIndexName;
import static org.hamcrest.Matchers.equalTo;
public class DataStreamTests extends AbstractSerializingTestCase<DataStream> {
@ -34,16 +38,16 @@ public class DataStreamTests extends AbstractSerializingTestCase<DataStream> {
int numIndices = randomIntBetween(0, 128);
List<Index> indices = new ArrayList<>(numIndices);
for (int i = 0; i < numIndices; i++) {
indices.add(new Index(randomAlphaOfLength(10), UUIDs.randomBase64UUID(random())));
indices.add(new Index(randomAlphaOfLength(10).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())));
}
return indices;
}
public static DataStream randomInstance() {
List<Index> indices = randomIndexInstances();
long generation = randomLongBetween(1, 128);
String dataStreamName = randomAlphaOfLength(10);
indices.add(new Index(DataStream.getBackingIndexName(dataStreamName, generation), UUIDs.randomBase64UUID(random())));
long generation = indices.size() + randomLongBetween(1, 128);
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
indices.add(new Index(getBackingIndexName(dataStreamName, generation), UUIDs.randomBase64UUID(random())));
return new DataStream(dataStreamName, randomAlphaOfLength(10), indices, generation);
}
@ -62,4 +66,16 @@ public class DataStreamTests extends AbstractSerializingTestCase<DataStream> {
return randomInstance();
}
public void testRollover() {
DataStream ds = randomInstance();
Index newWriteIndex = new Index(getBackingIndexName(ds.getName(), ds.getGeneration() + 1), UUIDs.randomBase64UUID(random()));
DataStream rolledDs = ds.rollover(newWriteIndex);
assertThat(rolledDs.getName(), equalTo(ds.getName()));
assertThat(rolledDs.getTimeStampField(), equalTo(ds.getTimeStampField()));
assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + 1));
assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size() + 1));
assertTrue(rolledDs.getIndices().containsAll(ds.getIndices()));
assertTrue(rolledDs.getIndices().contains(newWriteIndex));
}
}

View File

@ -26,6 +26,8 @@ import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
@ -39,8 +41,10 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.test.ESIntegTestCase;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
@ -84,10 +88,32 @@ public class DataStreamIT extends ESIntegTestCase {
int numDocsFoo = randomIntBetween(2, 16);
indexDocs("metrics-foo", numDocsFoo);
verifyDocs("metrics-bar", numDocsBar);
verifyDocs("metrics-foo", numDocsFoo);
verifyDocs("metrics-bar", numDocsBar, 1, 1);
verifyDocs("metrics-foo", numDocsFoo, 1, 1);
// TODO: execute rollover and index some more data.
RolloverResponse rolloverResponse = client().admin().indices().rolloverIndex(new RolloverRequest("metrics-foo", null)).get();
assertThat(rolloverResponse.getNewIndex(), equalTo("metrics-foo-000002"));
assertTrue(rolloverResponse.isRolledOver());
rolloverResponse = client().admin().indices().rolloverIndex(new RolloverRequest("metrics-bar", null)).get();
assertThat(rolloverResponse.getNewIndex(), equalTo("metrics-bar-000002"));
assertTrue(rolloverResponse.isRolledOver());
getIndexResponse = client().admin().indices().getIndex(new GetIndexRequest().indices("metrics-foo-000002")).actionGet();
assertThat(getIndexResponse.getSettings().get("metrics-foo-000002"), notNullValue());
assertThat(getIndexResponse.getSettings().get("metrics-foo-000002").getAsBoolean("index.hidden", null), is(true));
getIndexResponse = client().admin().indices().getIndex(new GetIndexRequest().indices("metrics-bar-000002")).actionGet();
assertThat(getIndexResponse.getSettings().get("metrics-bar-000002"), notNullValue());
assertThat(getIndexResponse.getSettings().get("metrics-bar-000002").getAsBoolean("index.hidden", null), is(true));
int numDocsBar2 = randomIntBetween(2, 16);
indexDocs("metrics-bar", numDocsBar2);
int numDocsFoo2 = randomIntBetween(2, 16);
indexDocs("metrics-foo", numDocsFoo2);
verifyDocs("metrics-bar", numDocsBar + numDocsBar2, 1, 2);
verifyDocs("metrics-foo", numDocsFoo + numDocsFoo2, 1, 2);
DeleteDataStreamAction.Request deleteDataStreamRequest = new DeleteDataStreamAction.Request("metrics-*");
client().admin().indices().deleteDataStream(deleteDataStreamRequest).actionGet();
@ -96,8 +122,12 @@ public class DataStreamIT extends ESIntegTestCase {
expectThrows(IndexNotFoundException.class,
() -> client().admin().indices().getIndex(new GetIndexRequest().indices("metrics-bar-000001")).actionGet());
expectThrows(IndexNotFoundException.class,
() -> client().admin().indices().getIndex(new GetIndexRequest().indices("metrics-bar-000002")).actionGet());
expectThrows(IndexNotFoundException.class,
() -> client().admin().indices().getIndex(new GetIndexRequest().indices("metrics-foo-000001")).actionGet());
expectThrows(IndexNotFoundException.class,
() -> client().admin().indices().getIndex(new GetIndexRequest().indices("metrics-foo-000002")).actionGet());
}
public void testOtherWriteOps() throws Exception {
@ -163,13 +193,18 @@ public class DataStreamIT extends ESIntegTestCase {
client().admin().indices().refresh(new RefreshRequest(dataStream)).actionGet();
}
private static void verifyDocs(String dataStream, long expectedNumHits) {
private static void verifyDocs(String dataStream, long expectedNumHits, long minGeneration, long maxGeneration) {
SearchRequest searchRequest = new SearchRequest(dataStream);
searchRequest.source().size((int) expectedNumHits);
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(expectedNumHits));
List<String> expectedIndices = new ArrayList<>();
for (long k = minGeneration; k <= maxGeneration; k++) {
expectedIndices.add(DataStream.getBackingIndexName(dataStream, k));
}
Arrays.stream(searchResponse.getHits().getHits()).forEach(hit -> {
assertThat(hit.getIndex(), equalTo(DataStream.getBackingIndexName(dataStream, 1)));
assertTrue(expectedIndices.contains(hit.getIndex()));
});
}

View File

@ -75,7 +75,7 @@ public class RolloverStepTests extends AbstractStepMasterTimeoutTestCase<Rollove
assertNotNull(request);
assertEquals(1, request.indices().length);
assertEquals(alias, request.indices()[0]);
assertEquals(alias, request.getAlias());
assertEquals(alias, request.getRolloverTarget());
assertFalse(request.isDryRun());
assertEquals(0, request.getConditions().size());
}

View File

@ -92,7 +92,7 @@ public class WaitForRolloverReadyStepTests extends AbstractStepTestCase<WaitForR
assertNotNull(request);
assertEquals(1, request.indices().length);
assertEquals(alias, request.indices()[0]);
assertEquals(alias, request.getAlias());
assertEquals(alias, request.getRolloverTarget());
assertEquals(expectedConditions.size(), request.getConditions().size());
assertTrue(request.isDryRun());
Set<Object> expectedConditionValues = expectedConditions.stream().map(Condition::value).collect(Collectors.toSet());