simplify conditions and rollover request

This commit is contained in:
Areek Zillur 2016-06-06 16:10:36 -04:00
parent a30437f250
commit 3a2cc22aff
5 changed files with 153 additions and 207 deletions

View File

@ -19,87 +19,82 @@
package org.elasticsearch.action.admin.indices.rollover;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcherSupplier;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import java.util.Locale;
import java.io.IOException;
import java.util.Set;
public class Condition {
public abstract class Condition<T> implements NamedWriteable {
public static ObjectParser<Set<Condition>, ParseFieldMatcherSupplier> PARSER =
new ObjectParser<>("conditions", null);
static {
PARSER.declareString((conditions, s) ->
conditions.add(new MaxAge(TimeValue.parseTimeValue(s, MaxAge.NAME))), new ParseField(MaxAge.NAME));
PARSER.declareLong((conditions, value) ->
conditions.add(new MaxDocs(value)), new ParseField(MaxDocs.NAME));
}
public enum ConditionType {
MAX_SIZE((byte) 0),
MAX_AGE((byte) 1),
MAX_DOCS((byte) 2);
public static class MaxAge extends Condition<TimeValue> {
public final static String NAME = "max_age";
private final byte id;
ConditionType(byte id) {
this.id = id;
public MaxAge(TimeValue value) {
this.value = value;
}
public byte getId() {
return id;
public MaxAge(StreamInput in) throws IOException {
this.value = TimeValue.timeValueMillis(in.readLong());
}
public static ConditionType fromId(byte id) {
if (id == 0) {
return MAX_SIZE;
} else if (id == 1) {
return MAX_AGE;
} else if (id == 2) {
return MAX_DOCS;
} else {
throw new IllegalArgumentException("no condition type [" + id + "]");
}
@Override
public boolean matches(TimeValue value) {
return this.value.getMillis() <= value.getMillis();
}
public static ConditionType fromString(String type) {
final String typeString = type.toLowerCase(Locale.ROOT);
switch (typeString) {
case "max_size":
return MAX_SIZE;
case "max_age":
return MAX_AGE;
case "max_docs":
return MAX_DOCS;
default:
throw new IllegalArgumentException("no condition type [" + type + "]");
}
@Override
public String getWriteableName() {
return NAME;
}
public static long parseFromString(ConditionType condition, String value) {
switch (condition) {
case MAX_SIZE:
return ByteSizeValue.parseBytesSizeValue(value, MAX_SIZE.name().toLowerCase(Locale.ROOT)).getBytes();
case MAX_AGE:
return TimeValue.parseTimeValue(value, MAX_AGE.name().toLowerCase(Locale.ROOT)).getMillis();
case MAX_DOCS:
try {
return Long.valueOf(value);
} catch (NumberFormatException e) {
throw new ElasticsearchParseException("Failed to parse setting [{}] with value [{}] as long", e,
MAX_DOCS.name().toLowerCase(Locale.ROOT), value);
}
default:
throw new ElasticsearchParseException("condition [" + condition + "] not recognized");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(value.getMillis());
}
}
private final ConditionType type;
private final long value;
public static class MaxDocs extends Condition<Long> {
public final static String NAME = "max_docs";
public Condition(ConditionType type, long value) {
this.type = type;
this.value = value;
public MaxDocs(Long value) {
this.value = value;
}
public MaxDocs(StreamInput in) throws IOException {
this.value = in.readLong();
}
@Override
public boolean matches(Long value) {
return this.value <= value;
}
@Override
public String getWriteableName() {
return NAME;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(value);
}
}
public ConditionType getType() {
return type;
}
protected T value;
public long getValue() {
return value;
}
public abstract boolean matches(T value);
}

View File

@ -23,17 +23,21 @@ import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.ParseFieldMatcherSupplier;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.HashSet;
import java.util.Set;
import static org.elasticsearch.action.ValidateActions.addValidationError;
@ -44,14 +48,20 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
public class RolloverRequest extends AcknowledgedRequest<RolloverRequest> implements IndicesRequest {
private String sourceAlias;
private String optionalTargetAlias;
private List<Condition> conditions = new ArrayList<>();
private Set<Condition> conditions = new HashSet<>(2);
public static ObjectParser<Set<Condition>, ParseFieldMatcherSupplier> TLP_PARSER =
new ObjectParser<>("conditions", null);
static {
TLP_PARSER.declareField((parser, conditions, parseFieldMatcherSupplier) ->
Condition.PARSER.parse(parser, conditions, () -> ParseFieldMatcher.EMPTY),
new ParseField("conditions"), ObjectParser.ValueType.OBJECT);
}
RolloverRequest() {}
public RolloverRequest(String sourceAlias, String optionalTargetAlias) {
public RolloverRequest(String sourceAlias) {
this.sourceAlias = sourceAlias;
this.optionalTargetAlias = optionalTargetAlias;
}
@Override
@ -67,12 +77,9 @@ public class RolloverRequest extends AcknowledgedRequest<RolloverRequest> implem
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
sourceAlias = in.readString();
optionalTargetAlias = in.readOptionalString();
int size = in.readVInt();
for (int i = 0; i < size; i++) {
Condition.ConditionType type = Condition.ConditionType.fromId(in.readByte());
long value = in.readLong();
this.conditions.add(new Condition(type, value));
this.conditions.add(in.readNamedWriteable(Condition.class));
}
}
@ -80,11 +87,9 @@ public class RolloverRequest extends AcknowledgedRequest<RolloverRequest> implem
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(sourceAlias);
out.writeOptionalString(optionalTargetAlias);
out.writeVInt(conditions.size());
for (Condition condition : conditions) {
out.writeByte(condition.getType().getId());
out.writeLong(condition.getValue());
out.writeNamedWriteable(condition);
}
}
@ -102,27 +107,15 @@ public class RolloverRequest extends AcknowledgedRequest<RolloverRequest> implem
this.sourceAlias = sourceAlias;
}
public void setOptionalTargetAlias(String optionalTargetAlias) {
this.optionalTargetAlias = optionalTargetAlias;
public void addMaxIndexAgeCondition(TimeValue age) {
this.conditions.add(new Condition.MaxAge(age));
}
public void addMaxIndexAgeCondition(String age) {
addCondition(Condition.ConditionType.MAX_AGE, Condition.ConditionType.parseFromString(Condition.ConditionType.MAX_AGE, age));
public void addMaxIndexDocsCondition(long docs) {
this.conditions.add(new Condition.MaxDocs(docs));
}
public void addMaxIndexDocsCondition(String docs) {
addCondition(Condition.ConditionType.MAX_DOCS, Condition.ConditionType.parseFromString(Condition.ConditionType.MAX_DOCS, docs));
}
public void addMaxIndexSizeCondition(String size) {
addCondition(Condition.ConditionType.MAX_SIZE, Condition.ConditionType.parseFromString(Condition.ConditionType.MAX_SIZE, size));
}
private void addCondition(Condition.ConditionType conditionType, long value) {
this.conditions.add(new Condition(conditionType, value));
}
public List<Condition> getConditions() {
public Set<Condition> getConditions() {
return conditions;
}
@ -130,15 +123,11 @@ public class RolloverRequest extends AcknowledgedRequest<RolloverRequest> implem
return sourceAlias;
}
public String getOptionalTargetAlias() {
return optionalTargetAlias;
}
public void source(BytesReference source) {
XContentType xContentType = XContentFactory.xContentType(source);
if (xContentType != null) {
try (XContentParser parser = XContentFactory.xContent(xContentType).createParser(source)) {
source(parser.map());
TLP_PARSER.parse(parser, this.conditions, () -> ParseFieldMatcher.EMPTY);
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse source for rollover index", e);
}
@ -146,19 +135,4 @@ public class RolloverRequest extends AcknowledgedRequest<RolloverRequest> implem
throw new ElasticsearchParseException("failed to parse content type for rollover index source");
}
}
private void source(Map<String, Object> map) {
for (Map.Entry<String, Object> entry : map.entrySet()) {
if (entry.getKey().equals("conditions")) {
final Map<String, String> conditions = (Map<String, String>) entry.getValue();
for (Map.Entry<String, String> conditionEntry : conditions.entrySet()) {
final Condition.ConditionType conditionType = Condition.ConditionType.fromString(conditionEntry.getKey());
this.addCondition(conditionType, Condition.ConditionType.parseFromString(conditionType,
conditionEntry.getValue()));
}
} else {
throw new ElasticsearchParseException("unknown property [" + entry.getKey() + "]");
}
}
}
}

View File

@ -20,6 +20,7 @@ package org.elasticsearch.action.admin.indices.rollover;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.unit.TimeValue;
/**
@ -36,22 +37,12 @@ public class RolloverRequestBuilder extends MasterNodeOperationRequestBuilder<Ro
return this;
}
public RolloverRequestBuilder setOptionalTargetAlias(String optionalTargetAlias) {
this.request.setOptionalTargetAlias(optionalTargetAlias);
return this;
}
public RolloverRequestBuilder addMaxIndexSizeCondition(String size) {
this.request.addMaxIndexSizeCondition(size);
return this;
}
public RolloverRequestBuilder addMaxIndexAgeCondition(String age) {
public RolloverRequestBuilder addMaxIndexAgeCondition(TimeValue age) {
this.request.addMaxIndexAgeCondition(age);
return this;
}
public RolloverRequestBuilder addMaxIndexDocsCondition(String docs) {
public RolloverRequestBuilder addMaxIndexDocsCondition(long docs) {
this.request.addMaxIndexDocsCondition(docs);
return this;
}

View File

@ -41,13 +41,13 @@ import org.elasticsearch.cluster.metadata.MetaDataIndexAliasesService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.List;
import java.util.Set;
/**
* Main class to swap the index pointed to by an alias, given some predicates
@ -97,56 +97,55 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
final AliasOrIndex aliasOrIndex = metaData.getAliasAndIndexLookup().get(rolloverRequest.getSourceAlias());
final IndexMetaData indexMetaData = aliasOrIndex.getIndices().get(0);
final String sourceIndexName = indexMetaData.getIndex().getName();
client.admin().indices().prepareStats(sourceIndexName).clear().setDocs(true).setStore(true)
.execute(new ActionListener<IndicesStatsResponse>() {
@Override
public void onResponse(IndicesStatsResponse indicesStatsResponse) {
final IndexMetaData sourceIndex = metaData.index(sourceIndexName);
DocsStats docsStats = indicesStatsResponse.getTotal().getDocs();
long docCount = docsStats == null ? 0 : docsStats.getCount();
StoreStats storeStats = indicesStatsResponse.getTotal().getStore();
long sizeInBytes = storeStats == null ? 0 : storeStats.getSizeInBytes();
long creationDate = sourceIndex.getCreationDate();
if (satisfiesConditions(rolloverRequest.getConditions(), docCount, sizeInBytes, creationDate)) {
final String rolloverIndexName = generateRolloverIndexName(sourceIndexName);
boolean createRolloverIndex = metaData.index(rolloverIndexName) == null;
if (createRolloverIndex) {
CreateIndexClusterStateUpdateRequest updateRequest =
prepareCreateIndexRequest(rolloverIndexName, rolloverRequest);
createIndexService.createIndex(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse response) {
indexAliasesService.indicesAliases(
prepareIndicesAliasesRequest(sourceIndexName, rolloverIndexName, rolloverRequest),
new IndicesAliasesListener(sourceIndexName, rolloverIndexName, listener));
}
@Override
public void onFailure(Throwable t) {
if (t instanceof IndexAlreadyExistsException) {
logger.trace("[{}] failed to create rollover index", t, updateRequest.index());
} else {
logger.debug("[{}] failed to create rollover index", t, updateRequest.index());
client.admin().indices().prepareStats(sourceIndexName).clear().setDocs(true).execute(
new ActionListener<IndicesStatsResponse>() {
@Override
public void onResponse(IndicesStatsResponse indicesStatsResponse) {
final IndexMetaData sourceIndex = metaData.index(sourceIndexName);
DocsStats docsStats = indicesStatsResponse.getTotal().getDocs();
long docCount = docsStats == null ? 0 : docsStats.getCount();
long indexAge = System.currentTimeMillis() - sourceIndex.getCreationDate();
if (satisfiesConditions(rolloverRequest.getConditions(), docCount, indexAge)) {
final String rolloverIndexName = generateRolloverIndexName(sourceIndexName);
boolean createRolloverIndex = metaData.index(rolloverIndexName) == null;
if (createRolloverIndex) {
CreateIndexClusterStateUpdateRequest updateRequest =
prepareCreateIndexRequest(rolloverIndexName, rolloverRequest);
createIndexService.createIndex(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse response) {
indexAliasesService.indicesAliases(
prepareIndicesAliasesRequest(sourceIndexName, rolloverIndexName, rolloverRequest),
new IndicesAliasesListener(sourceIndexName, rolloverIndexName, listener));
}
listener.onFailure(t);
}
});
@Override
public void onFailure(Throwable t) {
if (t instanceof IndexAlreadyExistsException) {
logger.trace("[{}] failed to create rollover index", t, updateRequest.index());
} else {
logger.debug("[{}] failed to create rollover index", t, updateRequest.index());
}
listener.onFailure(t);
}
});
} else {
indexAliasesService.indicesAliases(
prepareIndicesAliasesRequest(sourceIndexName, rolloverIndexName, rolloverRequest),
new IndicesAliasesListener(sourceIndexName, rolloverIndexName, listener));
}
} else {
indexAliasesService.indicesAliases(
prepareIndicesAliasesRequest(sourceIndexName, rolloverIndexName, rolloverRequest),
new IndicesAliasesListener(sourceIndexName, rolloverIndexName, listener));
// conditions not met
listener.onResponse(new RolloverResponse(sourceIndexName, sourceIndexName));
}
} else {
// conditions not met
listener.onResponse(new RolloverResponse(sourceIndexName, sourceIndexName));
}
@Override
public void onFailure(Throwable e) {
listener.onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
listener.onFailure(e);
}
});
);
}
@ -178,17 +177,9 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
final IndicesAliasesClusterStateUpdateRequest updateRequest = new IndicesAliasesClusterStateUpdateRequest()
.ackTimeout(request.ackTimeout())
.masterNodeTimeout(request.masterNodeTimeout());
final AliasAction[] actions;
if (request.getOptionalTargetAlias() != null) {
actions = new AliasAction[3];
actions[0] = new AliasAction(AliasAction.Type.ADD, targetIndex, request.getSourceAlias());
actions[1] = new AliasAction(AliasAction.Type.ADD, targetIndex, request.getOptionalTargetAlias());
actions[2] = new AliasAction(AliasAction.Type.REMOVE, concreteSourceIndex, request.getSourceAlias());
} else {
actions = new AliasAction[2];
actions[0] = new AliasAction(AliasAction.Type.ADD, targetIndex, request.getSourceAlias());
actions[1] = new AliasAction(AliasAction.Type.REMOVE, concreteSourceIndex, request.getSourceAlias());
}
AliasAction[] actions = new AliasAction[2];
actions[0] = new AliasAction(AliasAction.Type.ADD, targetIndex, request.getSourceAlias());
actions[1] = new AliasAction(AliasAction.Type.REMOVE, concreteSourceIndex, request.getSourceAlias());
updateRequest.actions(actions);
return updateRequest;
}
@ -208,26 +199,21 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
return String.join("-", indexPrefix, String.valueOf(counter));
}
static boolean satisfiesConditions(List<Condition> requestConditions, long docCount, long sizeInBytes,
long creationDate) {
for (Condition condition: requestConditions) {
switch (condition.getType()) {
case MAX_SIZE:
if (sizeInBytes < condition.getValue()) {
return false;
}
break;
case MAX_AGE:
long currentAge = System.currentTimeMillis() - creationDate;
if (currentAge < condition.getValue()) {
return false;
}
break;
case MAX_DOCS:
if (docCount < condition.getValue()) {
return false;
}
break;
static boolean satisfiesConditions(Set<Condition> conditions, long docCount, long indexAge) {
for (Condition condition: conditions) {
if (condition instanceof Condition.MaxAge) {
Condition.MaxAge maxAge = (Condition.MaxAge) condition;
final TimeValue age = TimeValue.timeValueMillis(indexAge);
if (maxAge.matches(age) == false) {
return false;
}
} else if (condition instanceof Condition.MaxDocs) {
final Condition.MaxDocs maxDocs = (Condition.MaxDocs) condition;
if (maxDocs.matches(docCount) == false) {
return false;
}
} else {
throw new IllegalArgumentException("unknown condition [" + condition.getClass().getSimpleName() + "]");
}
}
return true;

View File

@ -43,8 +43,8 @@ public class RestRolloverIndexAction extends BaseRestHandler {
@Inject
public RestRolloverIndexAction(Settings settings, RestController controller, Client client) {
super(settings, client);
controller.registerHandler(RestRequest.Method.PUT, "/{alias}/_rollover/{optional_alias}", this);
controller.registerHandler(RestRequest.Method.POST, "/{alias}/_rollover/{optional_alias}", this);
controller.registerHandler(RestRequest.Method.PUT, "/{alias}/_rollover", this);
controller.registerHandler(RestRequest.Method.POST, "/{alias}/_rollover", this);
}
@Override
@ -52,7 +52,7 @@ public class RestRolloverIndexAction extends BaseRestHandler {
if (request.param("alias") == null) {
throw new IllegalArgumentException("no source alias");
}
RolloverRequest rolloverIndexRequest = new RolloverRequest(request.param("alias"), request.param("optional_alias"));
RolloverRequest rolloverIndexRequest = new RolloverRequest(request.param("alias"));
if (request.hasContent()) {
rolloverIndexRequest.source(request.content());
}