adds _rollover api

This commit is contained in:
Areek Zillur 2016-06-03 14:52:54 -04:00
parent a98856663b
commit a30437f250
16 changed files with 1207 additions and 1 deletions

View File

@ -107,6 +107,8 @@ import org.elasticsearch.action.admin.indices.recovery.RecoveryAction;
import org.elasticsearch.action.admin.indices.recovery.TransportRecoveryAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.TransportRefreshAction;
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
import org.elasticsearch.action.admin.indices.rollover.TransportRolloverAction;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsAction;
import org.elasticsearch.action.admin.indices.segments.TransportIndicesSegmentsAction;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction;
@ -289,6 +291,7 @@ public class ActionModule extends AbstractModule {
registerAction(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class);
registerAction(CreateIndexAction.INSTANCE, TransportCreateIndexAction.class);
registerAction(ShrinkAction.INSTANCE, TransportShrinkAction.class);
registerAction(RolloverAction.INSTANCE, TransportRolloverAction.class);
registerAction(DeleteIndexAction.INSTANCE, TransportDeleteIndexAction.class);
registerAction(GetIndexAction.INSTANCE, TransportGetIndexAction.class);
registerAction(OpenIndexAction.INSTANCE, TransportOpenIndexAction.class);

View File

@ -28,7 +28,7 @@ public class IndicesAliasesClusterStateUpdateRequest extends ClusterStateUpdateR
AliasAction[] actions;
IndicesAliasesClusterStateUpdateRequest() {
public IndicesAliasesClusterStateUpdateRequest() {
}

View File

@ -0,0 +1,105 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.rollover;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import java.util.Locale;
public class Condition {
public enum ConditionType {
MAX_SIZE((byte) 0),
MAX_AGE((byte) 1),
MAX_DOCS((byte) 2);
private final byte id;
ConditionType(byte id) {
this.id = id;
}
public byte getId() {
return id;
}
public static ConditionType fromId(byte id) {
if (id == 0) {
return MAX_SIZE;
} else if (id == 1) {
return MAX_AGE;
} else if (id == 2) {
return MAX_DOCS;
} else {
throw new IllegalArgumentException("no condition type [" + id + "]");
}
}
public static ConditionType fromString(String type) {
final String typeString = type.toLowerCase(Locale.ROOT);
switch (typeString) {
case "max_size":
return MAX_SIZE;
case "max_age":
return MAX_AGE;
case "max_docs":
return MAX_DOCS;
default:
throw new IllegalArgumentException("no condition type [" + type + "]");
}
}
public static long parseFromString(ConditionType condition, String value) {
switch (condition) {
case MAX_SIZE:
return ByteSizeValue.parseBytesSizeValue(value, MAX_SIZE.name().toLowerCase(Locale.ROOT)).getBytes();
case MAX_AGE:
return TimeValue.parseTimeValue(value, MAX_AGE.name().toLowerCase(Locale.ROOT)).getMillis();
case MAX_DOCS:
try {
return Long.valueOf(value);
} catch (NumberFormatException e) {
throw new ElasticsearchParseException("Failed to parse setting [{}] with value [{}] as long", e,
MAX_DOCS.name().toLowerCase(Locale.ROOT), value);
}
default:
throw new ElasticsearchParseException("condition [" + condition + "] not recognized");
}
}
}
private final ConditionType type;
private final long value;
public Condition(ConditionType type, long value) {
this.type = type;
this.value = value;
}
public ConditionType getType() {
return type;
}
public long getValue() {
return value;
}
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.rollover;
import org.elasticsearch.action.Action;
import org.elasticsearch.client.ElasticsearchClient;
/**
*/
public class RolloverAction extends Action<RolloverRequest, RolloverResponse, RolloverRequestBuilder> {
public static final RolloverAction INSTANCE = new RolloverAction();
public static final String NAME = "indices:admin/rollover";
private RolloverAction() {
super(NAME);
}
@Override
public RolloverResponse newResponse() {
return new RolloverResponse();
}
@Override
public RolloverRequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RolloverRequestBuilder(client, this);
}
}

View File

@ -0,0 +1,164 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.rollover;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.action.ValidateActions.addValidationError;
/**
* Request class to swap index under an alias given some predicates
* TODO: documentation
*/
public class RolloverRequest extends AcknowledgedRequest<RolloverRequest> implements IndicesRequest {
private String sourceAlias;
private String optionalTargetAlias;
private List<Condition> conditions = new ArrayList<>();
RolloverRequest() {}
public RolloverRequest(String sourceAlias, String optionalTargetAlias) {
this.sourceAlias = sourceAlias;
this.optionalTargetAlias = optionalTargetAlias;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (sourceAlias == null) {
validationException = addValidationError("source alias is missing", validationException);
}
return validationException;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
sourceAlias = in.readString();
optionalTargetAlias = in.readOptionalString();
int size = in.readVInt();
for (int i = 0; i < size; i++) {
Condition.ConditionType type = Condition.ConditionType.fromId(in.readByte());
long value = in.readLong();
this.conditions.add(new Condition(type, value));
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(sourceAlias);
out.writeOptionalString(optionalTargetAlias);
out.writeVInt(conditions.size());
for (Condition condition : conditions) {
out.writeByte(condition.getType().getId());
out.writeLong(condition.getValue());
}
}
@Override
public String[] indices() {
return new String[] {sourceAlias};
}
@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
}
public void setSourceAlias(String sourceAlias) {
this.sourceAlias = sourceAlias;
}
public void setOptionalTargetAlias(String optionalTargetAlias) {
this.optionalTargetAlias = optionalTargetAlias;
}
public void addMaxIndexAgeCondition(String age) {
addCondition(Condition.ConditionType.MAX_AGE, Condition.ConditionType.parseFromString(Condition.ConditionType.MAX_AGE, age));
}
public void addMaxIndexDocsCondition(String docs) {
addCondition(Condition.ConditionType.MAX_DOCS, Condition.ConditionType.parseFromString(Condition.ConditionType.MAX_DOCS, docs));
}
public void addMaxIndexSizeCondition(String size) {
addCondition(Condition.ConditionType.MAX_SIZE, Condition.ConditionType.parseFromString(Condition.ConditionType.MAX_SIZE, size));
}
private void addCondition(Condition.ConditionType conditionType, long value) {
this.conditions.add(new Condition(conditionType, value));
}
public List<Condition> getConditions() {
return conditions;
}
public String getSourceAlias() {
return sourceAlias;
}
public String getOptionalTargetAlias() {
return optionalTargetAlias;
}
public void source(BytesReference source) {
XContentType xContentType = XContentFactory.xContentType(source);
if (xContentType != null) {
try (XContentParser parser = XContentFactory.xContent(xContentType).createParser(source)) {
source(parser.map());
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse source for rollover index", e);
}
} else {
throw new ElasticsearchParseException("failed to parse content type for rollover index source");
}
}
private void source(Map<String, Object> map) {
for (Map.Entry<String, Object> entry : map.entrySet()) {
if (entry.getKey().equals("conditions")) {
final Map<String, String> conditions = (Map<String, String>) entry.getValue();
for (Map.Entry<String, String> conditionEntry : conditions.entrySet()) {
final Condition.ConditionType conditionType = Condition.ConditionType.fromString(conditionEntry.getKey());
this.addCondition(conditionType, Condition.ConditionType.parseFromString(conditionType,
conditionEntry.getValue()));
}
} else {
throw new ElasticsearchParseException("unknown property [" + entry.getKey() + "]");
}
}
}
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.rollover;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
/**
* TODO: Documentation
*/
public class RolloverRequestBuilder extends MasterNodeOperationRequestBuilder<RolloverRequest, RolloverResponse,
RolloverRequestBuilder> {
public RolloverRequestBuilder(ElasticsearchClient client, RolloverAction action) {
super(client, action, new RolloverRequest());
}
public RolloverRequestBuilder setSourceAlias(String sourceAlias) {
this.request.setSourceAlias(sourceAlias);
return this;
}
public RolloverRequestBuilder setOptionalTargetAlias(String optionalTargetAlias) {
this.request.setOptionalTargetAlias(optionalTargetAlias);
return this;
}
public RolloverRequestBuilder addMaxIndexSizeCondition(String size) {
this.request.addMaxIndexSizeCondition(size);
return this;
}
public RolloverRequestBuilder addMaxIndexAgeCondition(String age) {
this.request.addMaxIndexAgeCondition(age);
return this;
}
public RolloverRequestBuilder addMaxIndexDocsCondition(String docs) {
this.request.addMaxIndexDocsCondition(docs);
return this;
}
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.rollover;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
public final class RolloverResponse extends ActionResponse {
private String oldIndex;
private String newIndex;
RolloverResponse() {
}
RolloverResponse(String oldIndex, String newIndex) {
this.oldIndex = oldIndex;
this.newIndex = newIndex;
}
public String getOldIndex() {
return oldIndex;
}
public String getNewIndex() {
return newIndex;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
oldIndex = in.readString();
newIndex = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(oldIndex);
out.writeString(newIndex);
}
}

View File

@ -0,0 +1,257 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.rollover;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.AliasAction;
import org.elasticsearch.cluster.metadata.AliasOrIndex;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.metadata.MetaDataIndexAliasesService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.List;
/**
* Main class to swap the index pointed to by an alias, given some predicates
*/
public class TransportRolloverAction extends TransportMasterNodeAction<RolloverRequest, RolloverResponse> {
private final MetaDataCreateIndexService createIndexService;
private final MetaDataIndexAliasesService indexAliasesService;
private final Client client;
@Inject
public TransportRolloverAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, MetaDataCreateIndexService createIndexService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
MetaDataIndexAliasesService indexAliasesService, Client client) {
super(settings, RolloverAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver,
RolloverRequest::new);
this.createIndexService = createIndexService;
this.indexAliasesService = indexAliasesService;
this.client = client;
}
@Override
protected String executor() {
// we go async right away
return ThreadPool.Names.SAME;
}
@Override
protected RolloverResponse newResponse() {
return new RolloverResponse();
}
@Override
protected ClusterBlockException checkBlock(RolloverRequest request, ClusterState state) {
IndicesOptions indicesOptions = IndicesOptions.fromOptions(true, true,
request.indicesOptions().expandWildcardsOpen(), request.indicesOptions().expandWildcardsClosed());
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE,
indexNameExpressionResolver.concreteIndexNames(state, indicesOptions, request.indices()));
}
@Override
protected void masterOperation(final RolloverRequest rolloverRequest, final ClusterState state,
final ActionListener<RolloverResponse> listener) {
final MetaData metaData = state.metaData();
validate(metaData, rolloverRequest);
final AliasOrIndex aliasOrIndex = metaData.getAliasAndIndexLookup().get(rolloverRequest.getSourceAlias());
final IndexMetaData indexMetaData = aliasOrIndex.getIndices().get(0);
final String sourceIndexName = indexMetaData.getIndex().getName();
client.admin().indices().prepareStats(sourceIndexName).clear().setDocs(true).setStore(true)
.execute(new ActionListener<IndicesStatsResponse>() {
@Override
public void onResponse(IndicesStatsResponse indicesStatsResponse) {
final IndexMetaData sourceIndex = metaData.index(sourceIndexName);
DocsStats docsStats = indicesStatsResponse.getTotal().getDocs();
long docCount = docsStats == null ? 0 : docsStats.getCount();
StoreStats storeStats = indicesStatsResponse.getTotal().getStore();
long sizeInBytes = storeStats == null ? 0 : storeStats.getSizeInBytes();
long creationDate = sourceIndex.getCreationDate();
if (satisfiesConditions(rolloverRequest.getConditions(), docCount, sizeInBytes, creationDate)) {
final String rolloverIndexName = generateRolloverIndexName(sourceIndexName);
boolean createRolloverIndex = metaData.index(rolloverIndexName) == null;
if (createRolloverIndex) {
CreateIndexClusterStateUpdateRequest updateRequest =
prepareCreateIndexRequest(rolloverIndexName, rolloverRequest);
createIndexService.createIndex(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse response) {
indexAliasesService.indicesAliases(
prepareIndicesAliasesRequest(sourceIndexName, rolloverIndexName, rolloverRequest),
new IndicesAliasesListener(sourceIndexName, rolloverIndexName, listener));
}
@Override
public void onFailure(Throwable t) {
if (t instanceof IndexAlreadyExistsException) {
logger.trace("[{}] failed to create rollover index", t, updateRequest.index());
} else {
logger.debug("[{}] failed to create rollover index", t, updateRequest.index());
}
listener.onFailure(t);
}
});
} else {
indexAliasesService.indicesAliases(
prepareIndicesAliasesRequest(sourceIndexName, rolloverIndexName, rolloverRequest),
new IndicesAliasesListener(sourceIndexName, rolloverIndexName, listener));
}
} else {
// conditions not met
listener.onResponse(new RolloverResponse(sourceIndexName, sourceIndexName));
}
}
@Override
public void onFailure(Throwable e) {
listener.onFailure(e);
}
});
}
private static final class IndicesAliasesListener implements ActionListener<ClusterStateUpdateResponse> {
private final ActionListener<RolloverResponse> listener;
private final String oldIndex;
private final String newIndex;
public IndicesAliasesListener(String oldIndex, String newIndex, ActionListener<RolloverResponse> listener) {
this.oldIndex = oldIndex;
this.newIndex = newIndex;
this.listener = listener;
}
@Override
public void onResponse(ClusterStateUpdateResponse clusterStateUpdateResponse) {
listener.onResponse(new RolloverResponse(oldIndex, newIndex));
}
@Override
public void onFailure(Throwable e) {
listener.onFailure(e);
}
}
static IndicesAliasesClusterStateUpdateRequest prepareIndicesAliasesRequest(String concreteSourceIndex, String targetIndex,
RolloverRequest request) {
final IndicesAliasesClusterStateUpdateRequest updateRequest = new IndicesAliasesClusterStateUpdateRequest()
.ackTimeout(request.ackTimeout())
.masterNodeTimeout(request.masterNodeTimeout());
final AliasAction[] actions;
if (request.getOptionalTargetAlias() != null) {
actions = new AliasAction[3];
actions[0] = new AliasAction(AliasAction.Type.ADD, targetIndex, request.getSourceAlias());
actions[1] = new AliasAction(AliasAction.Type.ADD, targetIndex, request.getOptionalTargetAlias());
actions[2] = new AliasAction(AliasAction.Type.REMOVE, concreteSourceIndex, request.getSourceAlias());
} else {
actions = new AliasAction[2];
actions[0] = new AliasAction(AliasAction.Type.ADD, targetIndex, request.getSourceAlias());
actions[1] = new AliasAction(AliasAction.Type.REMOVE, concreteSourceIndex, request.getSourceAlias());
}
updateRequest.actions(actions);
return updateRequest;
}
static String generateRolloverIndexName(String sourceIndexName) {
int numberIndex = sourceIndexName.lastIndexOf("-");
int counter = 1;
String indexPrefix = sourceIndexName;
if (numberIndex != -1) {
try {
counter = Integer.parseInt(sourceIndexName.substring(numberIndex + 1));
counter++;
indexPrefix = sourceIndexName.substring(0, numberIndex);
} catch (NumberFormatException ignored) {
}
}
return String.join("-", indexPrefix, String.valueOf(counter));
}
static boolean satisfiesConditions(List<Condition> requestConditions, long docCount, long sizeInBytes,
long creationDate) {
for (Condition condition: requestConditions) {
switch (condition.getType()) {
case MAX_SIZE:
if (sizeInBytes < condition.getValue()) {
return false;
}
break;
case MAX_AGE:
long currentAge = System.currentTimeMillis() - creationDate;
if (currentAge < condition.getValue()) {
return false;
}
break;
case MAX_DOCS:
if (docCount < condition.getValue()) {
return false;
}
break;
}
}
return true;
}
static void validate(MetaData metaData, RolloverRequest request) {
final AliasOrIndex aliasOrIndex = metaData.getAliasAndIndexLookup().get(request.getSourceAlias());
if (aliasOrIndex == null) {
throw new IllegalArgumentException("source alias does not exist");
}
if (aliasOrIndex.isAlias() == false) {
throw new IllegalArgumentException("source alias is a concrete index");
}
if (aliasOrIndex.getIndices().size() != 1) {
throw new IllegalArgumentException("source alias maps to multiple indices");
}
}
static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(final String targetIndexName,
final RolloverRequest rolloverRequest) {
return new CreateIndexClusterStateUpdateRequest(rolloverRequest,
"rollover_index", targetIndexName, true)
.ackTimeout(rolloverRequest.timeout())
.masterNodeTimeout(rolloverRequest.masterNodeTimeout());
}
}

View File

@ -80,6 +80,9 @@ import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequestBuilder;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequestBuilder;
import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequestBuilder;
@ -800,4 +803,19 @@ public interface IndicesAdminClient extends ElasticsearchClient {
*/
void shrinkIndex(ShrinkRequest request, ActionListener<ShrinkResponse> listener);
/**
* Swaps the index pointed to by an alias given all provided conditions are satisfied
*/
RolloverRequestBuilder prepareRolloverIndex(String sourceAlias);
/**
* Swaps the index pointed to by an alias given all provided conditions are satisfied
*/
ActionFuture<RolloverResponse> rolloversIndex(RolloverRequest request);
/**
* Swaps the index pointed to by an alias given all provided conditions are satisfied
*/
void rolloverIndex(RolloverRequest request, ActionListener<RolloverResponse> listener);
}

View File

@ -196,6 +196,10 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequestBuilder;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequestBuilder;
import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsAction;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest;
@ -1704,6 +1708,21 @@ public abstract class AbstractClient extends AbstractComponent implements Client
execute(ShrinkAction.INSTANCE, request, listener);
}
@Override
public RolloverRequestBuilder prepareRolloverIndex(String sourceAlias) {
return new RolloverRequestBuilder(this, RolloverAction.INSTANCE).setSourceAlias(sourceAlias);
}
@Override
public ActionFuture<RolloverResponse> rolloversIndex(RolloverRequest request) {
return execute(RolloverAction.INSTANCE, request);
}
@Override
public void rolloverIndex(RolloverRequest request, ActionListener<RolloverResponse> listener) {
execute(RolloverAction.INSTANCE, request, listener);
}
@Override
public ActionFuture<GetSettingsResponse> getSettings(GetSettingsRequest request) {
return execute(GetSettingsAction.INSTANCE, request);

View File

@ -72,6 +72,7 @@ import org.elasticsearch.rest.action.admin.cluster.storedscripts.RestGetStoredSc
import org.elasticsearch.rest.action.admin.cluster.storedscripts.RestPutSearchTemplateAction;
import org.elasticsearch.rest.action.admin.cluster.storedscripts.RestPutStoredScriptAction;
import org.elasticsearch.rest.action.admin.cluster.tasks.RestPendingClusterTasksAction;
import org.elasticsearch.rest.action.admin.indices.RestRolloverIndexAction;
import org.elasticsearch.rest.action.admin.indices.RestShrinkIndexAction;
import org.elasticsearch.rest.action.admin.indices.alias.RestIndicesAliasesAction;
import org.elasticsearch.rest.action.admin.indices.alias.delete.RestIndexDeleteAliasesAction;
@ -211,6 +212,7 @@ public class NetworkModule extends AbstractModule {
RestIndicesAliasesAction.class,
RestCreateIndexAction.class,
RestShrinkIndexAction.class,
RestRolloverIndexAction.class,
RestDeleteIndexAction.class,
RestCloseIndexAction.class,
RestOpenIndexAction.class,

View File

@ -0,0 +1,72 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.admin.indices;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.action.support.RestBuilderListener;
import static org.elasticsearch.rest.RestStatus.OK;
/**
*
*/
public class RestRolloverIndexAction extends BaseRestHandler {
@Inject
public RestRolloverIndexAction(Settings settings, RestController controller, Client client) {
super(settings, client);
controller.registerHandler(RestRequest.Method.PUT, "/{alias}/_rollover/{optional_alias}", this);
controller.registerHandler(RestRequest.Method.POST, "/{alias}/_rollover/{optional_alias}", this);
}
@Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
if (request.param("alias") == null) {
throw new IllegalArgumentException("no source alias");
}
RolloverRequest rolloverIndexRequest = new RolloverRequest(request.param("alias"), request.param("optional_alias"));
if (request.hasContent()) {
rolloverIndexRequest.source(request.content());
}
rolloverIndexRequest.timeout(request.paramAsTime("timeout", rolloverIndexRequest.timeout()));
rolloverIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", rolloverIndexRequest.masterNodeTimeout()));
client.admin().indices().rolloverIndex(rolloverIndexRequest, new RestBuilderListener<RolloverResponse>(channel) {
@Override
public RestResponse buildResponse(RolloverResponse rolloverResponse, XContentBuilder builder) throws Exception {
builder.startObject();
builder.field("old_index", rolloverResponse.getOldIndex());
builder.field("new_index", rolloverResponse.getNewIndex());
builder.endObject();
return new BytesRestResponse(OK, builder);
}
});
}
}

View File

@ -0,0 +1,105 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.rollover;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.test.ESIntegTestCase;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
public class RolloverIT extends ESIntegTestCase {
public void testRolloverOnEmptyIndex() throws Exception {
assertAcked(prepareCreate("test_index").addAlias(new Alias("test_alias")).get());
final RolloverResponse response = client().admin().indices().prepareRolloverIndex("test_alias").get();
assertThat(response.getOldIndex(), equalTo("test_index"));
assertThat(response.getNewIndex(), equalTo("test_index-1"));
final ClusterState state = client().admin().cluster().prepareState().get().getState();
final IndexMetaData oldIndex = state.metaData().index("test_index");
assertFalse(oldIndex.getAliases().containsKey("test_alias"));
final IndexMetaData newIndex = state.metaData().index("test_index-1");
assertTrue(newIndex.getAliases().containsKey("test_alias"));
}
public void testRollover() throws Exception {
assertAcked(prepareCreate("test_index").addAlias(new Alias("test_alias")).get());
index("test_index", "type1", "1", "field", "value");
flush("test_index");
final RolloverResponse response = client().admin().indices().prepareRolloverIndex("test_alias").get();
assertThat(response.getOldIndex(), equalTo("test_index"));
assertThat(response.getNewIndex(), equalTo("test_index-1"));
final ClusterState state = client().admin().cluster().prepareState().get().getState();
final IndexMetaData oldIndex = state.metaData().index("test_index");
assertFalse(oldIndex.getAliases().containsKey("test_alias"));
final IndexMetaData newIndex = state.metaData().index("test_index-1");
assertTrue(newIndex.getAliases().containsKey("test_alias"));
}
public void testRolloverConditionsNotMet() throws Exception {
assertAcked(prepareCreate("test_index").addAlias(new Alias("test_alias")).get());
index("test_index", "type1", "1", "field", "value");
flush("test_index");
final RolloverResponse response = client().admin().indices().prepareRolloverIndex("test_alias")
.addMaxIndexAgeCondition("7d").get();
assertThat(response.getOldIndex(), equalTo("test_index"));
assertThat(response.getNewIndex(), equalTo("test_index"));
final ClusterState state = client().admin().cluster().prepareState().get().getState();
final IndexMetaData oldIndex = state.metaData().index("test_index");
assertTrue(oldIndex.getAliases().containsKey("test_alias"));
final IndexMetaData newIndex = state.metaData().index("test_index-1");
assertNull(newIndex);
}
public void testRolloverWithOptionalTargetAlias() throws Exception {
assertAcked(prepareCreate("test_index").addAlias(new Alias("test_alias")).get());
index("test_index", "type1", "1", "field", "value");
flush("test_index");
final RolloverResponse response = client().admin().indices().prepareRolloverIndex("test_alias")
.setOptionalTargetAlias("test_alias_2").get();
assertThat(response.getOldIndex(), equalTo("test_index"));
assertThat(response.getNewIndex(), equalTo("test_index-1"));
final ClusterState state = client().admin().cluster().prepareState().get().getState();
final IndexMetaData oldIndex = state.metaData().index("test_index");
assertFalse(oldIndex.getAliases().containsKey("test_alias"));
final IndexMetaData newIndex = state.metaData().index("test_index-1");
assertTrue(newIndex.getAliases().containsKey("test_alias"));
assertTrue(newIndex.getAliases().containsKey("test_alias_2"));
}
public void testRolloverOnExistingIndex() throws Exception {
assertAcked(prepareCreate("test_index").addAlias(new Alias("test_alias")).get());
index("test_index", "type1", "1", "field", "value");
assertAcked(prepareCreate("test_index-1").get());
index("test_index-1", "type1", "1", "field", "value");
flush("test_index", "test_index-1");
final RolloverResponse response = client().admin().indices().prepareRolloverIndex("test_alias").get();
assertThat(response.getOldIndex(), equalTo("test_index"));
assertThat(response.getNewIndex(), equalTo("test_index-1"));
final ClusterState state = client().admin().cluster().prepareState().get().getState();
final IndexMetaData oldIndex = state.metaData().index("test_index");
assertFalse(oldIndex.getAliases().containsKey("test_alias"));
final IndexMetaData newIndex = state.metaData().index("test_index-1");
assertTrue(newIndex.getAliases().containsKey("test_alias"));
}
}

View File

@ -0,0 +1,192 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.rollover;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesClusterStateUpdateRequest;
import org.elasticsearch.cluster.metadata.AliasAction;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
public class TransportRolloverActionTests extends ESTestCase {
public void testSatisfyConditions() throws Exception {
List<Condition> conditions = Collections.emptyList();
assertTrue(TransportRolloverAction.satisfiesConditions(conditions, randomLong(), randomLong(),
randomLong()));
conditions = Collections.singletonList(
new Condition(Condition.ConditionType.MAX_AGE, 10L));
assertTrue(TransportRolloverAction.satisfiesConditions(conditions, randomLong(), randomLong(),
System.currentTimeMillis() - randomIntBetween(10, 100)));
assertFalse(TransportRolloverAction.satisfiesConditions(conditions, randomLong(), randomLong(),
System.currentTimeMillis() - randomIntBetween(1, 9)));
conditions = Collections.singletonList(
new Condition(Condition.ConditionType.MAX_SIZE, 10L));
assertTrue(TransportRolloverAction.satisfiesConditions(conditions, randomLong(), randomIntBetween(10, 100),
randomLong()));
assertFalse(TransportRolloverAction.satisfiesConditions(conditions, randomLong(), randomIntBetween(1, 9),
randomLong()));
conditions = Collections.singletonList(
new Condition(Condition.ConditionType.MAX_DOCS, 10L));
assertTrue(TransportRolloverAction.satisfiesConditions(conditions, randomIntBetween(10, 100), randomLong(),
randomLong()));
assertFalse(TransportRolloverAction.satisfiesConditions(conditions, randomIntBetween(1, 9), randomLong(),
randomLong()));
conditions = Arrays.asList(new Condition(Condition.ConditionType.MAX_AGE, 100L),
new Condition(Condition.ConditionType.MAX_DOCS, 1000L));
assertTrue(TransportRolloverAction.satisfiesConditions(conditions, randomIntBetween(1000, 1500),
randomLong(), System.currentTimeMillis() - randomIntBetween(100, 500)));
assertFalse(TransportRolloverAction.satisfiesConditions(conditions, randomIntBetween(1, 999),
randomLong(), System.currentTimeMillis() - randomIntBetween(100, 500)));
assertFalse(TransportRolloverAction.satisfiesConditions(conditions, randomIntBetween(1000, 1500),
randomLong(), System.currentTimeMillis() - randomIntBetween(1, 99)));
}
public void testCreateUpdateAliasRequest() throws Exception {
String sourceAlias = randomAsciiOfLength(10);
String sourceIndex = randomAsciiOfLength(10);
String targetIndex = randomAsciiOfLength(10);
final RolloverRequest rolloverRequest = new RolloverRequest(sourceAlias, null);
final IndicesAliasesClusterStateUpdateRequest updateRequest =
TransportRolloverAction.prepareIndicesAliasesRequest(sourceIndex, targetIndex, rolloverRequest);
final AliasAction[] actions = updateRequest.actions();
assertThat(actions.length, equalTo(2));
boolean foundAdd = false;
boolean foundRemove = false;
for (AliasAction action : actions) {
if (action.actionType() == AliasAction.Type.ADD) {
foundAdd = true;
assertThat(action.index(), equalTo(targetIndex));
assertThat(action.alias(), equalTo(sourceAlias));
} else if (action.actionType() == AliasAction.Type.REMOVE) {
foundRemove = true;
assertThat(action.index(), equalTo(sourceIndex));
assertThat(action.alias(), equalTo(sourceAlias));
}
}
assertTrue(foundAdd);
assertTrue(foundRemove);
}
public void testCreateUpdateAliasRequestWithOptionalTargetAlias() throws Exception {
String sourceAlias = randomAsciiOfLength(10);
String optionalTargetAlias = randomAsciiOfLength(10);
String sourceIndex = randomAsciiOfLength(10);
String targetIndex = randomAsciiOfLength(10);
final RolloverRequest rolloverRequest = new RolloverRequest(sourceAlias, optionalTargetAlias);
final IndicesAliasesClusterStateUpdateRequest updateRequest =
TransportRolloverAction.prepareIndicesAliasesRequest(sourceIndex, targetIndex, rolloverRequest);
final AliasAction[] actions = updateRequest.actions();
assertThat(actions.length, equalTo(3));
boolean foundAdd = false;
boolean foundRemove = false;
for (AliasAction action : actions) {
if (action.actionType() == AliasAction.Type.ADD) {
foundAdd = true;
assertThat(action.index(), equalTo(targetIndex));
assertThat(action.alias(), anyOf(equalTo(sourceAlias),
equalTo(optionalTargetAlias)));
} else if (action.actionType() == AliasAction.Type.REMOVE) {
foundRemove = true;
assertThat(action.index(), equalTo(sourceIndex));
assertThat(action.alias(), equalTo(sourceAlias));
}
}
assertTrue(foundAdd);
assertTrue(foundRemove);
}
public void testValidation() throws Exception {
String index1 = randomAsciiOfLength(10);
String alias = randomAsciiOfLength(10);
String index2 = randomAsciiOfLength(10);
String aliasWithMultipleIndices = randomAsciiOfLength(10);
final Settings settings = Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID())
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build();
final MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder(index1)
.settings(settings)
.putAlias(AliasMetaData.builder(alias))
.putAlias(AliasMetaData.builder(aliasWithMultipleIndices))
)
.put(IndexMetaData.builder(index2)
.settings(settings)
.putAlias(AliasMetaData.builder(aliasWithMultipleIndices))
).build();
try {
TransportRolloverAction.validate(metaData, new RolloverRequest(aliasWithMultipleIndices,
randomBoolean() ? null : randomAsciiOfLength(10)));
fail("expected to throw exception");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("source alias maps to multiple indices"));
}
try {
TransportRolloverAction.validate(metaData, new RolloverRequest(randomFrom(index1, index2),
randomBoolean() ? null : randomAsciiOfLength(10)));
fail("expected to throw exception");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("source alias is a concrete index"));
}
try {
TransportRolloverAction.validate(metaData, new RolloverRequest(randomAsciiOfLength(5),
randomBoolean() ? null : randomAsciiOfLength(10)));
fail("expected to throw exception");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("source alias does not exist"));
}
TransportRolloverAction.validate(metaData, new RolloverRequest(alias,
randomBoolean() ? null : randomAsciiOfLength(10)));
}
public void testGenerateRolloverIndexName() throws Exception {
String index = randomAsciiOfLength(10);
assertThat(TransportRolloverAction.generateRolloverIndexName(index), not(equalTo(index)));
assertThat(TransportRolloverAction.generateRolloverIndexName("index"), equalTo("index-1"));
assertThat(TransportRolloverAction.generateRolloverIndexName("index-1"), equalTo("index-2"));
assertThat(TransportRolloverAction.generateRolloverIndexName("index-name-1"), equalTo("index-name-2"));
assertThat(TransportRolloverAction.generateRolloverIndexName("index-name"), equalTo("index-name-1"));
}
}

View File

@ -0,0 +1,35 @@
{
"indices.rollover": {
"documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/master/indices-rollover-index.html",
"methods": ["PUT", "POST"],
"url": {
"path": "/{alias}/_rollover/{target_alias}",
"paths": ["/{alias}/_rollover/{target_alias}"],
"parts": {
"alias": {
"type" : "string",
"required" : true,
"description" : "The name of the alias to rollover"
},
"target_alias": {
"type" : "string",
"required" : false,
"description" : "The name of optional target alias"
}
},
"params": {
"timeout": {
"type" : "time",
"description" : "Explicit operation timeout"
},
"master_timeout": {
"type" : "time",
"description" : "Specify timeout for connection to master"
}
}
},
"body": {
"description" : "The conditions that needs to be met for executing rollover"
}
}
}

View File

@ -0,0 +1,69 @@
---
"Rollover index via API":
# create index with alias
- do:
indices.create:
index: logs
body:
settings:
number_of_replicas: "0"
aliases:
logs_index: {}
logs_search: {}
# index document
- do:
index:
index: logs
type: test
id: "1"
body: { "foo": "hello world" }
- do:
get:
index: logs_search
type: test
id: "1"
- match: { _index: logs }
- match: { _type: test }
- match: { _id: "1" }
- match: { _source: { foo: "hello world" } }
# perform alias rollover
- do:
indices.rollover:
alias: "logs_search"
body:
condition.max_docs: 1
- match: { old_index: logs }
- match: { new_index: logs-1 }
# ensure new index is created
- do:
indices.exists:
index: logs-1
- is_true: ''
# index into new index
- do:
index:
index: logs-1
type: test
id: "2"
body: { "foo": "hello world" }
# check alias points to the new index
- do:
get:
index: logs_search
type: test
id: "2"
- match: { _index: logs-1 }
- match: { _type: test }
- match: { _id: "2" }
- match: { _source: { foo: "hello world" } }