incorporate feedback

This commit is contained in:
Areek Zillur 2016-06-07 22:38:47 -04:00
parent 76c59b7417
commit 134a4e5e52
10 changed files with 252 additions and 135 deletions

View File

@ -22,12 +22,9 @@ package org.elasticsearch.action.admin.indices.rollover;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcherSupplier;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import java.io.IOException;
import java.util.Set;
public abstract class Condition<T> implements NamedWriteable {
@ -35,67 +32,9 @@ public abstract class Condition<T> implements NamedWriteable {
new ObjectParser<>("conditions", null);
static {
PARSER.declareString((conditions, s) ->
conditions.add(new MaxAge(TimeValue.parseTimeValue(s, MaxAge.NAME))), new ParseField(MaxAge.NAME));
conditions.add(new MaxAgeCondition(TimeValue.parseTimeValue(s, MaxAgeCondition.NAME))), new ParseField(MaxAgeCondition.NAME));
PARSER.declareLong((conditions, value) ->
conditions.add(new MaxDocs(value)), new ParseField(MaxDocs.NAME));
}
public static class MaxAge extends Condition<TimeValue> {
public final static String NAME = "max_age";
public MaxAge(TimeValue value) {
super(NAME);
this.value = value;
}
public MaxAge(StreamInput in) throws IOException {
super(NAME);
this.value = TimeValue.timeValueMillis(in.readLong());
}
@Override
public boolean matches(TimeValue value) {
return this.value.getMillis() <= value.getMillis();
}
@Override
public String getWriteableName() {
return NAME;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(value.getMillis());
}
}
public static class MaxDocs extends Condition<Long> {
public final static String NAME = "max_docs";
public MaxDocs(Long value) {
super(NAME);
this.value = value;
}
public MaxDocs(StreamInput in) throws IOException {
super(NAME);
this.value = in.readLong();
}
@Override
public boolean matches(Long value) {
return this.value <= value;
}
@Override
public String getWriteableName() {
return NAME;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(value);
}
conditions.add(new MaxDocsCondition(value)), new ParseField(MaxDocsCondition.NAME));
}
protected T value;
@ -105,10 +44,30 @@ public abstract class Condition<T> implements NamedWriteable {
this.name = name;
}
public abstract boolean matches(T value);
public abstract Result evaluate(final Stats stats);
@Override
public final String toString() {
return "[" + name + ": " + value + "]";
}
public static class Stats {
public final long numDocs;
public final long indexCreated;
public Stats(long numDocs, long indexCreated) {
this.numDocs = numDocs;
this.indexCreated = indexCreated;
}
}
public static class Result {
public final Condition condition;
public final boolean matched;
protected Result(Condition condition, boolean matched) {
this.condition = condition;
this.matched = matched;
}
}
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.rollover;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import java.io.IOException;
public class MaxAgeCondition extends Condition<TimeValue> {
public final static String NAME = "max_age";
public MaxAgeCondition(TimeValue value) {
super(NAME);
this.value = value;
}
public MaxAgeCondition(StreamInput in) throws IOException {
super(NAME);
this.value = TimeValue.timeValueMillis(in.readLong());
}
@Override
public Result evaluate(final Stats stats) {
long indexAge = System.currentTimeMillis() - stats.indexCreated;
return new Result(this, this.value.getMillis() <= indexAge);
}
@Override
public String getWriteableName() {
return NAME;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(value.getMillis());
}
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.rollover;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
public class MaxDocsCondition extends Condition<Long> {
public final static String NAME = "max_docs";
public MaxDocsCondition(Long value) {
super(NAME);
this.value = value;
}
public MaxDocsCondition(StreamInput in) throws IOException {
super(NAME);
this.value = in.readLong();
}
@Override
public Result evaluate(final Stats stats) {
return new Result(this, this.value <= stats.numDocs);
}
@Override
public String getWriteableName() {
return NAME;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(value);
}
}

View File

@ -115,11 +115,11 @@ public class RolloverRequest extends AcknowledgedRequest<RolloverRequest> implem
}
public void addMaxIndexAgeCondition(TimeValue age) {
this.conditions.add(new Condition.MaxAge(age));
this.conditions.add(new MaxAgeCondition(age));
}
public void addMaxIndexDocsCondition(long docs) {
this.conditions.add(new Condition.MaxDocs(docs));
this.conditions.add(new MaxDocsCondition(docs));
}
public boolean isSimulate() {

View File

@ -44,14 +44,16 @@ public final class RolloverResponse extends ActionResponse implements ToXContent
RolloverResponse() {
}
RolloverResponse(String oldIndex, String newIndex, Set<Map.Entry<String, Boolean>> conditionStatus,
RolloverResponse(String oldIndex, String newIndex, Set<Condition.Result> conditionResults,
boolean simulate, boolean rolledOver, boolean rolloverIndexCreated) {
this.oldIndex = oldIndex;
this.newIndex = newIndex;
this.simulate = simulate;
this.rolledOver = rolledOver;
this.rolloverIndexCreated = rolloverIndexCreated;
this.conditionStatus = conditionStatus;
this.conditionStatus = conditionResults.stream()
.map(result -> new AbstractMap.SimpleEntry<>(result.condition.toString(), result.matched))
.collect(Collectors.toSet());
}
public String getOldIndex() {

View File

@ -41,16 +41,12 @@ import org.elasticsearch.cluster.metadata.MetaDataIndexAliasesService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.AbstractMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
@ -104,27 +100,20 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
client.admin().indices().prepareStats(sourceIndexName).clear().setDocs(true).execute(
new ActionListener<IndicesStatsResponse>() {
@Override
public void onResponse(IndicesStatsResponse indicesStatsResponse) {
final IndexMetaData sourceIndex = metaData.index(sourceIndexName);
DocsStats docsStats = indicesStatsResponse.getTotal().getDocs();
long docCount = docsStats == null ? 0 : docsStats.getCount();
long indexAge = System.currentTimeMillis() - sourceIndex.getCreationDate();
final Set<Map.Entry<Condition, Boolean>> evaluatedConditions =
evaluateConditions(rolloverRequest.getConditions(), docCount, indexAge);
final Set<Map.Entry<String, Boolean>> conditionStatus = evaluatedConditions.stream()
.map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey().toString(), entry.getValue()))
.collect(Collectors.toSet());
public void onResponse(IndicesStatsResponse statsResponse) {
final Set<Condition.Result> conditionResults = evaluateConditions(rolloverRequest.getConditions(),
statsResponse.getTotal().getDocs(), metaData.index(sourceIndexName));
final String rolloverIndexName = generateRolloverIndexName(sourceIndexName);
final boolean createRolloverIndex = metaData.index(rolloverIndexName) == null;
if (rolloverRequest.isSimulate()) {
listener.onResponse(
new RolloverResponse(sourceIndexName, rolloverIndexName, conditionStatus, true, false,
new RolloverResponse(sourceIndexName, rolloverIndexName, conditionResults, true, false,
false));
return;
}
if (conditionStatus.stream().allMatch(Map.Entry::getValue)) {
if (conditionResults.stream().anyMatch(result -> result.matched)) {
boolean createRolloverIndex = metaData.index(rolloverIndexName) == null;
final RolloverResponse rolloverResponse =
new RolloverResponse(sourceIndexName, rolloverIndexName, conditionStatus, false, true,
new RolloverResponse(sourceIndexName, rolloverIndexName, conditionResults, false, true,
createRolloverIndex);
if (createRolloverIndex) {
CreateIndexClusterStateUpdateRequest updateRequest =
@ -139,11 +128,6 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
@Override
public void onFailure(Throwable t) {
if (t instanceof IndexAlreadyExistsException) {
logger.trace("[{}] failed to create rollover index", t, updateRequest.index());
} else {
logger.debug("[{}] failed to create rollover index", t, updateRequest.index());
}
listener.onFailure(t);
}
});
@ -155,7 +139,7 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
} else {
// conditions not met
listener.onResponse(
new RolloverResponse(sourceIndexName, sourceIndexName, conditionStatus, false, false,
new RolloverResponse(sourceIndexName, sourceIndexName, conditionResults, false, false,
false)
);
}
@ -167,9 +151,10 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
}
}
);
}
private static final class IndicesAliasesListener implements ActionListener<ClusterStateUpdateResponse> {
private final ActionListener<RolloverResponse> listener;
@ -203,36 +188,26 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
return updateRequest;
}
static final Pattern INDEX_NAME_PATTERN = Pattern.compile("^.*-(\\d)+$");
static String generateRolloverIndexName(String sourceIndexName) {
if (INDEX_NAME_PATTERN.matcher(sourceIndexName).matches()) {
int numberIndex = sourceIndexName.lastIndexOf("-");
int counter = 1;
String indexPrefix = sourceIndexName;
if (numberIndex != -1) {
try {
counter = Integer.parseInt(sourceIndexName.substring(numberIndex + 1));
counter++;
indexPrefix = sourceIndexName.substring(0, numberIndex);
} catch (NumberFormatException ignored) {
assert numberIndex != -1 : "no separator '-' found";
int counter = Integer.parseInt(sourceIndexName.substring(numberIndex + 1));
return String.join("-", sourceIndexName.substring(0, numberIndex), String.valueOf(++counter));
} else {
throw new IllegalArgumentException("index name [" + sourceIndexName + "] does not match pattern '^.*-(\\d)+$'");
}
}
return String.join("-", indexPrefix, String.valueOf(counter));
}
static Set<Map.Entry<Condition, Boolean>> evaluateConditions(Set<Condition> conditions, long docCount, long indexAge) {
Set<Map.Entry<Condition, Boolean>> result = new HashSet<>(conditions.size());
for (Condition condition: conditions) {
if (condition instanceof Condition.MaxAge) {
Condition.MaxAge maxAge = (Condition.MaxAge) condition;
final TimeValue age = TimeValue.timeValueMillis(indexAge);
result.add(new AbstractMap.SimpleEntry<>(condition, maxAge.matches(age)));
} else if (condition instanceof Condition.MaxDocs) {
final Condition.MaxDocs maxDocs = (Condition.MaxDocs) condition;
result.add(new AbstractMap.SimpleEntry<>(condition, maxDocs.matches(docCount)));
} else {
throw new IllegalArgumentException("unknown condition [" + condition.getClass().getSimpleName() + "]");
}
}
return result;
static Set<Condition.Result> evaluateConditions(final Set<Condition> conditions,
final DocsStats docsStats, final IndexMetaData metaData) {
final long numDocs = docsStats == null ? 0 : docsStats.getCount();
final Condition.Stats stats = new Condition.Stats(numDocs, metaData.getCreationDate());
return conditions.stream()
.map(condition -> condition.evaluate(stats))
.collect(Collectors.toSet());
}
static void validate(MetaData metaData, RolloverRequest request) {

View File

@ -20,6 +20,8 @@
package org.elasticsearch.indices;
import org.elasticsearch.action.admin.indices.rollover.Condition;
import org.elasticsearch.action.admin.indices.rollover.MaxAgeCondition;
import org.elasticsearch.action.admin.indices.rollover.MaxDocsCondition;
import org.elasticsearch.action.update.UpdateHelper;
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
import org.elasticsearch.common.geo.ShapesAvailability;
@ -86,8 +88,8 @@ public class IndicesModule extends AbstractModule {
}
private void registerBuildInWritables() {
namedWritableRegistry.register(Condition.class, Condition.MaxAge.NAME, Condition.MaxAge::new);
namedWritableRegistry.register(Condition.class, Condition.MaxDocs.NAME, Condition.MaxDocs::new);
namedWritableRegistry.register(Condition.class, MaxAgeCondition.NAME, MaxAgeCondition::new);
namedWritableRegistry.register(Condition.class, MaxDocsCondition.NAME, MaxDocsCondition::new);
}
private void registerBuiltInMappers() {

View File

@ -27,15 +27,30 @@ import static org.hamcrest.Matchers.equalTo;
public class ConditionTests extends ESTestCase {
public void testMaxAge() throws Exception {
final Condition.MaxAge maxAge = new Condition.MaxAge(TimeValue.timeValueMillis(10));
assertThat(maxAge.matches(TimeValue.timeValueMillis(randomIntBetween(0, 9))), equalTo(false));
assertThat(maxAge.matches(TimeValue.timeValueMillis(randomIntBetween(10, 100))), equalTo(true));
final MaxAgeCondition maxAgeCondition = new MaxAgeCondition(TimeValue.timeValueHours(1));
long indexCreatedMatch = System.currentTimeMillis() - TimeValue.timeValueMinutes(61).getMillis();
Condition.Result evaluate = maxAgeCondition.evaluate(new Condition.Stats(0, indexCreatedMatch));
assertThat(evaluate.condition, equalTo(maxAgeCondition));
assertThat(evaluate.matched, equalTo(true));
long indexCreatedNotMatch = System.currentTimeMillis() - TimeValue.timeValueMinutes(59).getMillis();
evaluate = maxAgeCondition.evaluate(new Condition.Stats(0, indexCreatedNotMatch));
assertThat(evaluate.condition, equalTo(maxAgeCondition));
assertThat(evaluate.matched, equalTo(false));
}
public void testMaxDocs() throws Exception {
final Condition.MaxDocs maxDocs = new Condition.MaxDocs(10L);
assertThat(maxDocs.matches((long) randomIntBetween(0, 9)), equalTo(false));
assertThat(maxDocs.matches((long) randomIntBetween(10, 100)), equalTo(true));
final MaxDocsCondition maxDocsCondition = new MaxDocsCondition(100L);
long maxDocsMatch = randomIntBetween(100, 1000);
Condition.Result evaluate = maxDocsCondition.evaluate(new Condition.Stats(maxDocsMatch, 0));
assertThat(evaluate.condition, equalTo(maxDocsCondition));
assertThat(evaluate.matched, equalTo(true));
long maxDocsNotMatch = randomIntBetween(0, 99);
evaluate = maxDocsCondition.evaluate(new Condition.Stats(0, maxDocsNotMatch));
assertThat(evaluate.condition, equalTo(maxDocsCondition));
assertThat(evaluate.matched, equalTo(false));
}
}

View File

@ -98,7 +98,7 @@ public class RolloverIT extends ESIntegTestCase {
assertThat(response.isRolloverIndexCreated(), equalTo(false));
assertThat(response.getConditionStatus().size(), equalTo(1));
final Map.Entry<String, Boolean> conditionEntry = response.getConditionStatus().iterator().next();
assertThat(conditionEntry.getKey(), equalTo(new Condition.MaxAge(TimeValue.timeValueHours(4)).toString()));
assertThat(conditionEntry.getKey(), equalTo(new MaxAgeCondition(TimeValue.timeValueHours(4)).toString()));
assertThat(conditionEntry.getValue(), equalTo(false));
final ClusterState state = client().admin().cluster().prepareState().get().getState();
final IndexMetaData oldIndex = state.metaData().index("test_index");

View File

@ -27,13 +27,69 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.elasticsearch.action.admin.indices.rollover.TransportRolloverAction.evaluateConditions;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
public class TransportRolloverActionTests extends ESTestCase {
public void testEvaluateConditions() throws Exception {
MaxDocsCondition maxDocsCondition = new MaxDocsCondition(100L);
MaxAgeCondition maxAgeCondition = new MaxAgeCondition(TimeValue.timeValueHours(2));
long matchMaxDocs = randomIntBetween(100, 1000);
long notMatchMaxDocs = randomIntBetween(0, 99);
final Settings settings = Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID())
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build();
final IndexMetaData metaData = IndexMetaData.builder(randomAsciiOfLength(10))
.creationDate(System.currentTimeMillis() - TimeValue.timeValueHours(3).getMillis())
.settings(settings)
.build();
final HashSet<Condition> conditions = Sets.newHashSet(maxDocsCondition, maxAgeCondition);
Set<Condition.Result> results = evaluateConditions(conditions, new DocsStats(matchMaxDocs, 0L), metaData);
assertThat(results.size(), equalTo(2));
for (Condition.Result result : results) {
assertThat(result.matched, equalTo(true));
}
results = evaluateConditions(conditions, new DocsStats(notMatchMaxDocs, 0), metaData);
assertThat(results.size(), equalTo(2));
for (Condition.Result result : results) {
if (result.condition instanceof MaxAgeCondition) {
assertThat(result.matched, equalTo(true));
} else if (result.condition instanceof MaxDocsCondition) {
assertThat(result.matched, equalTo(false));
} else {
fail("unknown condition result found " + result.condition);
}
}
results = evaluateConditions(conditions, null, metaData);
assertThat(results.size(), equalTo(2));
for (Condition.Result result : results) {
if (result.condition instanceof MaxAgeCondition) {
assertThat(result.matched, equalTo(true));
} else if (result.condition instanceof MaxDocsCondition) {
assertThat(result.matched, equalTo(false));
} else {
fail("unknown condition result found " + result.condition);
}
}
}
public void testCreateUpdateAliasRequest() throws Exception {
String sourceAlias = randomAsciiOfLength(10);
String sourceIndex = randomAsciiOfLength(10);
@ -94,17 +150,15 @@ public class TransportRolloverActionTests extends ESTestCase {
}
public void testGenerateRolloverIndexName() throws Exception {
String indexNotEndingInNumbers = randomAsciiOfLength(10) + "A";
assertThat(TransportRolloverAction.generateRolloverIndexName(indexNotEndingInNumbers),
not(equalTo(indexNotEndingInNumbers)));
assertThat(TransportRolloverAction.generateRolloverIndexName(indexNotEndingInNumbers),
equalTo(indexNotEndingInNumbers + "-1"));
String invalidIndexName = randomAsciiOfLength(10) + "A";
expectThrows(IllegalArgumentException.class, () ->
TransportRolloverAction.generateRolloverIndexName(invalidIndexName));
int num = randomIntBetween(0, 100);
final String indexPrefix = randomAsciiOfLength(10);
String indexEndingInNumbers = indexPrefix + "-" + num;
assertThat(TransportRolloverAction.generateRolloverIndexName(indexEndingInNumbers),
equalTo(indexPrefix + "-" + (num + 1)));
assertThat(TransportRolloverAction.generateRolloverIndexName("index-name-1"), equalTo("index-name-2"));
assertThat(TransportRolloverAction.generateRolloverIndexName("index-name"), equalTo("index-name-1"));
assertThat(TransportRolloverAction.generateRolloverIndexName("index-name-2"), equalTo("index-name-3"));
}
}