Adds step info to ClusterStateWaitSteps (#30609)

The `ClusterStateWaitStep.isConditionMet()` method now returns a
`Result` object which contains a boolean for if the condition is met
and an `ToXContentObject` to provide information in the case where the
condition is not met.
If the condition is not met, the step information is stored in the
cluster state
This commit is contained in:
Colin Goodheart-Smithe 2018-05-16 08:15:23 +01:00 committed by GitHub
parent 8c12a59c5e
commit 79be4d2976
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 781 additions and 58 deletions

View File

@ -17,13 +17,19 @@ import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import java.io.IOException;
import java.util.Collections;
import java.util.Objects;
@ -47,11 +53,11 @@ public class AllocationRoutedStep extends ClusterStateWaitStep {
}
@Override
public boolean isConditionMet(Index index, ClusterState clusterState) {
public Result isConditionMet(Index index, ClusterState clusterState) {
if (ActiveShardCount.ALL.enoughShardsActive(clusterState, index.getName()) == false) {
logger.debug("[{}] lifecycle action for index [{}] cannot make progress because not all shards are active",
getKey().getAction(), index.getName());
return false;
return new Result(false, new Info(-1, false));
}
IndexMetaData idxMeta = clusterState.metaData().index(index);
if (idxMeta == null) {
@ -88,10 +94,10 @@ public class AllocationRoutedStep extends ClusterStateWaitStep {
logger.debug(
"[{}] lifecycle action for index [{}] waiting for [{}] shards " + "to be allocated to nodes matching the given filters",
getKey().getAction(), index, allocationPendingAllShards);
return false;
return new Result(false, new Info(allocationPendingAllShards, true));
} else {
logger.debug("[{}] lifecycle action for index [{}] complete", getKey().getAction(), index);
return true;
return new Result(true, null);
}
}
@ -112,4 +118,74 @@ public class AllocationRoutedStep extends ClusterStateWaitStep {
return super.equals(obj) &&
Objects.equals(waitOnAllShardCopies, other.waitOnAllShardCopies);
}
public static final class Info implements ToXContentObject {
private final long numberShardsLeftToAllocate;
private final boolean allShardsActive;
private final String message;
static final ParseField SHARDS_TO_ALLOCATE = new ParseField("shards_left_to_allocate");
static final ParseField ALL_SHARDS_ACTIVE = new ParseField("all_shards_active");
static final ParseField MESSAGE = new ParseField("message");
static final ConstructingObjectParser<Info, Void> PARSER = new ConstructingObjectParser<>("allocation_routed_step_info",
a -> new Info((long) a[0], (boolean) a[1]));
static {
PARSER.declareLong(ConstructingObjectParser.constructorArg(), SHARDS_TO_ALLOCATE);
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), ALL_SHARDS_ACTIVE);
PARSER.declareString((i, s) -> {}, MESSAGE);
}
public Info(long numberShardsLeftToMerge, boolean allShardsActive) {
this.numberShardsLeftToAllocate = numberShardsLeftToMerge;
this.allShardsActive = allShardsActive;
if (allShardsActive == false) {
message = "Waiting for all shard copies to be active";
} else {
message = "Waiting for [" + numberShardsLeftToAllocate + "] shards "
+ "to be allocated to nodes matching the given filters";
}
}
public long getNumberShardsLeftToAllocate() {
return numberShardsLeftToAllocate;
}
public boolean allShardsActive() {
return allShardsActive;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(MESSAGE.getPreferredName(), message);
builder.field(SHARDS_TO_ALLOCATE.getPreferredName(), numberShardsLeftToAllocate);
builder.field(ALL_SHARDS_ACTIVE.getPreferredName(), allShardsActive);
builder.endObject();
return builder;
}
@Override
public int hashCode() {
return Objects.hash(numberShardsLeftToAllocate, allShardsActive);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Info other = (Info) obj;
return Objects.equals(numberShardsLeftToAllocate, other.numberShardsLeftToAllocate) &&
Objects.equals(allShardsActive, other.allShardsActive);
}
@Override
public String toString() {
return Strings.toString(this);
}
}
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.core.indexlifecycle;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.index.Index;
public abstract class ClusterStateWaitStep extends Step {
@ -14,6 +15,24 @@ public abstract class ClusterStateWaitStep extends Step {
super(key, nextStepKey);
}
public abstract boolean isConditionMet(Index index, ClusterState clusterState);
public abstract Result isConditionMet(Index index, ClusterState clusterState);
public static class Result {
private final boolean complete;
private final ToXContentObject infomationContext;
public Result(boolean complete, ToXContentObject infomationContext) {
this.complete = complete;
this.infomationContext = infomationContext;
}
public boolean isComplete() {
return complete;
}
public ToXContentObject getInfomationContext() {
return infomationContext;
}
}
}

View File

@ -24,11 +24,11 @@ public class PhaseAfterStep extends ClusterStateWaitStep {
}
@Override
public boolean isConditionMet(Index index, ClusterState clusterState) {
public Result isConditionMet(Index index, ClusterState clusterState) {
IndexMetaData indexMetaData = clusterState.metaData().index(index);
long lifecycleDate = indexMetaData.getSettings()
.getAsLong(LifecycleSettings.LIFECYCLE_INDEX_CREATION_DATE, -1L);
return nowSupplier.getAsLong() >= lifecycleDate + after.getMillis();
return new Result(nowSupplier.getAsLong() >= lifecycleDate + after.getMillis(), null);
}
TimeValue getAfter() {

View File

@ -8,9 +8,15 @@ package org.elasticsearch.xpack.core.indexlifecycle;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import java.io.IOException;
import java.util.Objects;
public class ReplicasAllocatedStep extends ClusterStateWaitStep {
@ -27,14 +33,20 @@ public class ReplicasAllocatedStep extends ClusterStateWaitStep {
}
@Override
public boolean isConditionMet(Index index, ClusterState clusterState) {
public Result isConditionMet(Index index, ClusterState clusterState) {
IndexMetaData idxMeta = clusterState.metaData().index(index);
if (idxMeta == null) {
throw new IndexNotFoundException("Index not found when executing " + getKey().getAction() + " lifecycle action.",
index.getName());
}
// We only want to make progress if the cluster state reflects the number of replicas change and all shards are active
return idxMeta.getNumberOfReplicas() == numberReplicas && ActiveShardCount.ALL.enoughShardsActive(clusterState, index.getName());
boolean allShardsActive = ActiveShardCount.ALL.enoughShardsActive(clusterState, index.getName());
boolean isConditionMet = idxMeta.getNumberOfReplicas() == numberReplicas && allShardsActive;
if (isConditionMet) {
return new Result(true, null);
} else {
return new Result(false, new Info(numberReplicas, idxMeta.getNumberOfReplicas(), allShardsActive));
}
}
@Override
@ -54,4 +66,85 @@ public class ReplicasAllocatedStep extends ClusterStateWaitStep {
return super.equals(obj) &&
Objects.equals(numberReplicas, other.numberReplicas);
}
public static final class Info implements ToXContentObject {
private final long expectedReplicas;
private final long actualReplicas;
private final boolean allShardsActive;
private final String message;
static final ParseField EXPECTED_REPLICAS = new ParseField("expected_replicas");
static final ParseField ACTUAL_REPLICAS = new ParseField("actual_replicas");
static final ParseField ALL_SHARDS_ACTIVE = new ParseField("all_shards_active");
static final ParseField MESSAGE = new ParseField("message");
static final ConstructingObjectParser<Info, Void> PARSER = new ConstructingObjectParser<>("replicas_allocated_step_info",
a -> new Info((long) a[0], (long) a[1], (boolean) a[2]));
static {
PARSER.declareLong(ConstructingObjectParser.constructorArg(), EXPECTED_REPLICAS);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), ACTUAL_REPLICAS);
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), ALL_SHARDS_ACTIVE);
PARSER.declareString((i, s) -> {}, MESSAGE);
}
public Info(long expectedReplicas, long actualReplicas, boolean allShardsActive) {
this.expectedReplicas = expectedReplicas;
this.actualReplicas = actualReplicas;
this.allShardsActive = allShardsActive;
if (actualReplicas != expectedReplicas) {
message = "Waiting for " + IndexMetaData.SETTING_NUMBER_OF_REPLICAS + " to be updated to " + expectedReplicas;
} else if (allShardsActive == false) {
message = "Waiting for all shard copies to be active";
} else {
message = "";
}
}
public long getExpectedReplicas() {
return expectedReplicas;
}
public long getActualReplicas() {
return actualReplicas;
}
public boolean allShardsActive() {
return allShardsActive;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(MESSAGE.getPreferredName(), message);
builder.field(EXPECTED_REPLICAS.getPreferredName(), expectedReplicas);
builder.field(ACTUAL_REPLICAS.getPreferredName(), actualReplicas);
builder.field(ALL_SHARDS_ACTIVE.getPreferredName(), allShardsActive);
builder.endObject();
return builder;
}
@Override
public int hashCode() {
return Objects.hash(expectedReplicas, actualReplicas, allShardsActive);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Info other = (Info) obj;
return Objects.equals(expectedReplicas, other.expectedReplicas) &&
Objects.equals(actualReplicas, other.actualReplicas) &&
Objects.equals(allShardsActive, other.allShardsActive);
}
@Override
public String toString() {
return Strings.toString(this);
}
}
}

View File

@ -90,10 +90,12 @@ public class SegmentCountStep extends AsyncWaitStep {
private final long numberShardsLeftToMerge;
static final ParseField SHARDS_TO_MERGE = new ParseField("shards_left_to_merge");
static final ParseField MESSAGE = new ParseField("message");
static final ConstructingObjectParser<Info, Void> PARSER = new ConstructingObjectParser<>("segment_count_step_info",
a -> new Info((long) a[0]));
static {
PARSER.declareLong(ConstructingObjectParser.constructorArg(), SHARDS_TO_MERGE);
PARSER.declareString((i, s) -> {}, MESSAGE);
}
public Info(long numberShardsLeftToMerge) {
@ -107,6 +109,8 @@ public class SegmentCountStep extends AsyncWaitStep {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(MESSAGE.getPreferredName(),
"Waiting for [" + numberShardsLeftToMerge + "] shards " + "to forcemerge");
builder.field(SHARDS_TO_MERGE.getPreferredName(), numberShardsLeftToMerge);
builder.endObject();
return builder;

View File

@ -7,12 +7,18 @@ package org.elasticsearch.xpack.core.indexlifecycle;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.Index;
import java.io.IOException;
import java.util.Objects;
public class ShrunkShardsAllocatedStep extends ClusterStateWaitStep {
public static final String NAME = "enough-shards-allocated";
public static final String NAME = "shrunk-shards-allocated";
private final int numberOfShards;
private String shrunkIndexPrefix;
@ -31,12 +37,21 @@ public class ShrunkShardsAllocatedStep extends ClusterStateWaitStep {
}
@Override
public boolean isConditionMet(Index index, ClusterState clusterState) {
public Result isConditionMet(Index index, ClusterState clusterState) {
// We only want to make progress if all shards of the shrunk index are
// active
return clusterState.metaData().index(shrunkIndexPrefix + index.getName()) != null
&& clusterState.metaData().index(shrunkIndexPrefix + index.getName()).getNumberOfShards() == numberOfShards
&& ActiveShardCount.ALL.enoughShardsActive(clusterState, shrunkIndexPrefix + index.getName());
boolean indexExists = clusterState.metaData().index(shrunkIndexPrefix + index.getName()) != null;
if (indexExists == false) {
return new Result(false, new Info(false, -1, -1, false));
}
boolean allShardsActive = ActiveShardCount.ALL.enoughShardsActive(clusterState, shrunkIndexPrefix + index.getName());
int numShrunkIndexShards = clusterState.metaData().index(shrunkIndexPrefix + index.getName()).getNumberOfShards();
boolean isConditionMet = numShrunkIndexShards == numberOfShards && allShardsActive;
if (isConditionMet) {
return new Result(true, null);
} else {
return new Result(false, new Info(true, numberOfShards, numShrunkIndexShards, allShardsActive));
}
}
@Override
@ -57,4 +72,97 @@ public class ShrunkShardsAllocatedStep extends ClusterStateWaitStep {
Objects.equals(numberOfShards, other.numberOfShards) &&
Objects.equals(shrunkIndexPrefix, other.shrunkIndexPrefix);
}
public static final class Info implements ToXContentObject {
private final int expectedShards;
private final int actualShards;
private final boolean shrunkIndexExists;
private final boolean allShardsActive;
private final String message;
static final ParseField EXPECTED_SHARDS = new ParseField("expected_shards");
static final ParseField ACTUAL_SHARDS = new ParseField("actual_shards");
static final ParseField SHRUNK_INDEX_EXISTS = new ParseField("shrunk_index_exists");
static final ParseField ALL_SHARDS_ACTIVE = new ParseField("all_shards_active");
static final ParseField MESSAGE = new ParseField("message");
static final ConstructingObjectParser<Info, Void> PARSER = new ConstructingObjectParser<>("shrunk_shards_allocated_step_info",
a -> new Info((boolean) a[0], (int) a[1], (int) a[2], (boolean) a[3]));
static {
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), SHRUNK_INDEX_EXISTS);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), EXPECTED_SHARDS);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), ACTUAL_SHARDS);
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), ALL_SHARDS_ACTIVE);
PARSER.declareString((i, s) -> {}, MESSAGE);
}
public Info(boolean shrunkIndexExists, int expectedShards, int actualShards, boolean allShardsActive) {
this.expectedShards = expectedShards;
this.actualShards = actualShards;
this.shrunkIndexExists = shrunkIndexExists;
this.allShardsActive = allShardsActive;
if (shrunkIndexExists == false) {
message = "Waiting for shrunk index to be created";
} else if (actualShards != expectedShards) {
message = "Waiting for shrunk index shards to be " + expectedShards;
} else if (allShardsActive == false) {
message = "Waiting for all shard copies to be active";
} else {
message = "";
}
}
public int getExpectedShards() {
return expectedShards;
}
public int getActualShards() {
return actualShards;
}
public boolean shrunkIndexExists() {
return shrunkIndexExists;
}
public boolean allShardsActive() {
return allShardsActive;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(MESSAGE.getPreferredName(), message);
builder.field(SHRUNK_INDEX_EXISTS.getPreferredName(), shrunkIndexExists);
builder.field(EXPECTED_SHARDS.getPreferredName(), expectedShards);
builder.field(ACTUAL_SHARDS.getPreferredName(), actualShards);
builder.field(ALL_SHARDS_ACTIVE.getPreferredName(), allShardsActive);
builder.endObject();
return builder;
}
@Override
public int hashCode() {
return Objects.hash(shrunkIndexExists, expectedShards, actualShards, allShardsActive);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Info other = (Info) obj;
return Objects.equals(shrunkIndexExists, other.shrunkIndexExists) &&
Objects.equals(expectedShards, other.expectedShards) &&
Objects.equals(actualShards, other.actualShards) &&
Objects.equals(allShardsActive, other.allShardsActive);
}
@Override
public String toString() {
return Strings.toString(this);
}
}
}

View File

@ -7,9 +7,14 @@ package org.elasticsearch.xpack.core.indexlifecycle;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.Index;
import java.io.IOException;
import java.util.Objects;
public class ShrunkenIndexCheckStep extends ClusterStateWaitStep {
@ -26,14 +31,19 @@ public class ShrunkenIndexCheckStep extends ClusterStateWaitStep {
}
@Override
public boolean isConditionMet(Index index, ClusterState clusterState) {
public Result isConditionMet(Index index, ClusterState clusterState) {
String shrunkenIndexSource = IndexMetaData.INDEX_SHRINK_SOURCE_NAME.get(
clusterState.metaData().index(index).getSettings());
if (Strings.isNullOrEmpty(shrunkenIndexSource)) {
throw new IllegalStateException("step[" + NAME + "] is checking an un-shrunken index[" + index.getName() + "]");
}
return index.getName().equals(shrunkIndexPrefix + shrunkenIndexSource) &&
boolean isConditionMet = index.getName().equals(shrunkIndexPrefix + shrunkenIndexSource) &&
clusterState.metaData().index(shrunkenIndexSource) == null;
if (isConditionMet) {
return new Result(true, null);
} else {
return new Result(false, new Info(shrunkenIndexSource));
}
}
@Override
@ -53,4 +63,59 @@ public class ShrunkenIndexCheckStep extends ClusterStateWaitStep {
return super.equals(obj) &&
Objects.equals(shrunkIndexPrefix, other.shrunkIndexPrefix);
}
public static final class Info implements ToXContentObject {
private final String originalIndexName;
private final String message;
static final ParseField ORIGINAL_INDEX_NAME = new ParseField("original_index_name");
static final ParseField MESSAGE = new ParseField("message");
static final ConstructingObjectParser<Info, Void> PARSER = new ConstructingObjectParser<>("shrunken_index_check_step_info",
a -> new Info((String) a[0]));
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), ORIGINAL_INDEX_NAME);
PARSER.declareString((i, s) -> {}, MESSAGE);
}
public Info(String originalIndexName) {
this.originalIndexName = originalIndexName;
this.message = "Waiting for original index [" + originalIndexName + "] to be deleted";
}
public String getOriginalIndexName() {
return originalIndexName;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(MESSAGE.getPreferredName(), message);
builder.field(ORIGINAL_INDEX_NAME.getPreferredName(), originalIndexName);
builder.endObject();
return builder;
}
@Override
public int hashCode() {
return Objects.hash(originalIndexName);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Info other = (Info) obj;
return Objects.equals(originalIndexName, other.originalIndexName);
}
@Override
public String toString() {
return Strings.toString(this);
}
}
}

View File

@ -0,0 +1,59 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.indexlifecycle;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase;
import org.elasticsearch.test.EqualsHashCodeTestUtils;
import org.elasticsearch.xpack.core.indexlifecycle.AllocationRoutedStep.Info;
import java.io.IOException;
public class AllocationRoutedStepInfoTests extends AbstractXContentTestCase<AllocationRoutedStep.Info> {
@Override
protected Info createTestInstance() {
return new Info(randomNonNegativeLong(), randomBoolean());
}
@Override
protected Info doParseInstance(XContentParser parser) throws IOException {
return Info.PARSER.apply(parser, null);
}
@Override
protected boolean supportsUnknownFields() {
return false;
}
public final void testEqualsAndHashcode() {
for (int runs = 0; runs < NUMBER_OF_TEST_RUNS; runs++) {
EqualsHashCodeTestUtils.checkEqualsAndHashCode(createTestInstance(), this::copyInstance, this::mutateInstance);
}
}
protected final Info copyInstance(Info instance) throws IOException {
return new Info(instance.getNumberShardsLeftToAllocate(), instance.allShardsActive());
}
protected Info mutateInstance(Info instance) throws IOException {
long shardsToAllocate = instance.getNumberShardsLeftToAllocate();
boolean allShardsActive = instance.allShardsActive();
switch (between(0, 1)) {
case 0:
shardsToAllocate += between(1, 20);
break;
case 1:
allShardsActive = allShardsActive == false;
break;
default:
throw new AssertionError("Illegal randomisation branch");
}
return new Info(shardsToAllocate, allShardsActive);
}
}

View File

@ -25,6 +25,7 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.Node;
import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateWaitStep.Result;
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
import java.util.Map;
@ -97,7 +98,8 @@ public class AllocationRoutedStepTests extends AbstractStepTestCase<AllocationRo
.addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", true, ShardRoutingState.STARTED));
AllocationRoutedStep step = createRandomInstance();
assertAllocateStatus(index, 1, 0, step, existingSettings, node1Settings, node2Settings, indexRoutingTable, true);
assertAllocateStatus(index, 1, 0, step, existingSettings, node1Settings, node2Settings, indexRoutingTable,
new ClusterStateWaitStep.Result(true, null));
}
public void testConditionMetOnlyOneCopyAllocated() {
@ -131,7 +133,8 @@ public class AllocationRoutedStepTests extends AbstractStepTestCase<AllocationRo
ShardRoutingState.STARTED));
AllocationRoutedStep step = new AllocationRoutedStep(randomStepKey(), randomStepKey(), false);
assertAllocateStatus(index, 1, 0, step, existingSettings, node1Settings, node2Settings, indexRoutingTable, true);
assertAllocateStatus(index, 1, 0, step, existingSettings, node1Settings, node2Settings, indexRoutingTable,
new ClusterStateWaitStep.Result(true, null));
}
public void testExecuteAllocateNotComplete() throws Exception {
@ -164,7 +167,8 @@ public class AllocationRoutedStepTests extends AbstractStepTestCase<AllocationRo
.addShard(TestShardRouting.newShardRouting(new ShardId(index, 1), "node2", true, ShardRoutingState.STARTED));
AllocationRoutedStep step = createRandomInstance();
assertAllocateStatus(index, 2, 0, step, existingSettings, node1Settings, node2Settings, indexRoutingTable, false);
assertAllocateStatus(index, 2, 0, step, existingSettings, node1Settings, node2Settings, indexRoutingTable,
new ClusterStateWaitStep.Result(false, new AllocationRoutedStep.Info(1, true)));
}
public void testExecuteAllocateNotCompleteOnlyOneCopyAllocated() throws Exception {
@ -199,7 +203,8 @@ public class AllocationRoutedStepTests extends AbstractStepTestCase<AllocationRo
ShardRoutingState.STARTED));
AllocationRoutedStep step = new AllocationRoutedStep(randomStepKey(), randomStepKey(), true);
assertAllocateStatus(index, 2, 0, step, existingSettings, node1Settings, node2Settings, indexRoutingTable, false);
assertAllocateStatus(index, 2, 0, step, existingSettings, node1Settings, node2Settings, indexRoutingTable,
new ClusterStateWaitStep.Result(false, new AllocationRoutedStep.Info(1, true)));
}
public void testExecuteAllocateUnassigned() throws Exception {
@ -233,7 +238,8 @@ public class AllocationRoutedStepTests extends AbstractStepTestCase<AllocationRo
new UnassignedInfo(randomFrom(Reason.values()), "the shard is intentionally unassigned")));
AllocationRoutedStep step = createRandomInstance();
assertAllocateStatus(index, 2, 0, step, existingSettings, node1Settings, node2Settings, indexRoutingTable, false);
assertAllocateStatus(index, 2, 0, step, existingSettings, node1Settings, node2Settings, indexRoutingTable,
new ClusterStateWaitStep.Result(false, new AllocationRoutedStep.Info(-1, false)));
}
public void testExecuteIndexMissing() throws Exception {
@ -248,8 +254,8 @@ public class AllocationRoutedStepTests extends AbstractStepTestCase<AllocationRo
}
private void assertAllocateStatus(Index index, int shards, int replicas, AllocationRoutedStep step, Settings.Builder existingSettings,
Settings.Builder node1Settings, Settings.Builder node2Settings,
IndexRoutingTable.Builder indexRoutingTable, boolean expectComplete) {
Settings.Builder node1Settings, Settings.Builder node2Settings, IndexRoutingTable.Builder indexRoutingTable,
ClusterStateWaitStep.Result expectedResult) {
IndexMetaData indexMetadata = IndexMetaData.builder(index.getName()).settings(existingSettings).numberOfShards(shards)
.numberOfReplicas(replicas).build();
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.<String, IndexMetaData> builder().fPut(index.getName(),
@ -262,6 +268,8 @@ public class AllocationRoutedStepTests extends AbstractStepTestCase<AllocationRo
.add(DiscoveryNode.createLocal(node2Settings.build(), new TransportAddress(TransportAddress.META_ADDRESS, 9201),
"node2")))
.routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build();
assertEquals(expectComplete, step.isConditionMet(index, clusterState));
Result actualResult = step.isConditionMet(index, clusterState);
assertEquals(expectedResult.isComplete(), actualResult.isComplete());
assertEquals(expectedResult.getInfomationContext(), actualResult.getInfomationContext());
}
}

View File

@ -13,6 +13,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateWaitStep.Result;
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
import java.util.concurrent.TimeUnit;
@ -75,7 +76,9 @@ public class PhaseAfterStepTests extends AbstractStepTestCase<PhaseAfterStep> {
long after = randomNonNegativeLong();
long now = creationDate + after + randomIntBetween(0, 2);
PhaseAfterStep step = new PhaseAfterStep(() -> now, TimeValue.timeValueMillis(after), null, null);
assertTrue(step.isConditionMet(index, clusterState));
Result result = step.isConditionMet(index, clusterState);
assertTrue(result.isComplete());
assertNull(result.getInfomationContext());
}
public void testConditionNotMet() {
@ -93,6 +96,8 @@ public class PhaseAfterStepTests extends AbstractStepTestCase<PhaseAfterStep> {
long after = randomNonNegativeLong();
long now = creationDate + after - randomIntBetween(1, 1000);
PhaseAfterStep step = new PhaseAfterStep(() -> now, TimeValue.timeValueMillis(after), null, null);
assertFalse(step.isConditionMet(index, clusterState));
Result result = step.isConditionMet(index, clusterState);
assertFalse(result.isComplete());
assertNull(result.getInfomationContext());
}
}

View File

@ -0,0 +1,63 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.indexlifecycle;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase;
import org.elasticsearch.test.EqualsHashCodeTestUtils;
import org.elasticsearch.xpack.core.indexlifecycle.ReplicasAllocatedStep.Info;
import java.io.IOException;
public class ReplicasAllocatedStepInfoTests extends AbstractXContentTestCase<ReplicasAllocatedStep.Info> {
@Override
protected Info createTestInstance() {
return new Info(randomNonNegativeLong(), randomNonNegativeLong(), randomBoolean());
}
@Override
protected Info doParseInstance(XContentParser parser) throws IOException {
return Info.PARSER.apply(parser, null);
}
@Override
protected boolean supportsUnknownFields() {
return false;
}
public final void testEqualsAndHashcode() {
for (int runs = 0; runs < NUMBER_OF_TEST_RUNS; runs++) {
EqualsHashCodeTestUtils.checkEqualsAndHashCode(createTestInstance(), this::copyInstance, this::mutateInstance);
}
}
protected final Info copyInstance(Info instance) throws IOException {
return new Info(instance.getExpectedReplicas(), instance.getActualReplicas(), instance.allShardsActive());
}
protected Info mutateInstance(Info instance) throws IOException {
long expectedReplicas = instance.getExpectedReplicas();
long actualReplicas = instance.getActualReplicas();
boolean allShardsActive = instance.allShardsActive();
switch (between(0, 2)) {
case 0:
expectedReplicas += between(1, 20);
break;
case 1:
actualReplicas += between(1, 20);
break;
case 2:
allShardsActive = allShardsActive == false;
break;
default:
throw new AssertionError("Illegal randomisation branch");
}
return new Info(expectedReplicas, actualReplicas, allShardsActive);
}
}

View File

@ -22,6 +22,7 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.Node;
import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateWaitStep.Result;
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
public class ReplicasAllocatedStepTests extends AbstractStepTestCase<ReplicasAllocatedStep> {
@ -87,7 +88,9 @@ public class ReplicasAllocatedStepTests extends AbstractStepTestCase<ReplicasAll
nodeId, true, ShardRoutingState.STARTED)))
.build())
.build();
assertTrue(step.isConditionMet(indexMetadata.getIndex(), clusterState));
Result result = step.isConditionMet(indexMetadata.getIndex(), clusterState);
assertTrue(result.isComplete());
assertNull(result.getInfomationContext());
}
public void testConditionNotMetAllocation() {
@ -117,7 +120,10 @@ public class ReplicasAllocatedStepTests extends AbstractStepTestCase<ReplicasAll
.build())
.build();
assertFalse(step.isConditionMet(indexMetadata.getIndex(), clusterState));
Result result = step.isConditionMet(indexMetadata.getIndex(), clusterState);
assertFalse(result.isComplete());
assertEquals(new ReplicasAllocatedStep.Info(step.getNumberReplicas(), step.getNumberReplicas(), false),
result.getInfomationContext());
}
public void testConditionNotMetNumberReplicas() {
@ -147,7 +153,10 @@ public class ReplicasAllocatedStepTests extends AbstractStepTestCase<ReplicasAll
.build())
.build();
assertFalse(step.isConditionMet(indexMetadata.getIndex(), clusterState));
Result result = step.isConditionMet(indexMetadata.getIndex(), clusterState);
assertFalse(result.isComplete());
assertEquals(new ReplicasAllocatedStep.Info(step.getNumberReplicas(), indexMetadata.getNumberOfReplicas(), true),
result.getInfomationContext());
}
public void testConditionIndexMissing() throws Exception {

View File

@ -0,0 +1,67 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.indexlifecycle;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase;
import org.elasticsearch.test.EqualsHashCodeTestUtils;
import org.elasticsearch.xpack.core.indexlifecycle.ShrunkShardsAllocatedStep.Info;
import java.io.IOException;
public class ShrunkShardsAllocatedStepInfoTests extends AbstractXContentTestCase<ShrunkShardsAllocatedStep.Info> {
@Override
protected Info createTestInstance() {
return new Info(randomBoolean(), randomIntBetween(0, 10000), randomIntBetween(0, 10000), randomBoolean());
}
@Override
protected Info doParseInstance(XContentParser parser) throws IOException {
return Info.PARSER.apply(parser, null);
}
@Override
protected boolean supportsUnknownFields() {
return false;
}
public final void testEqualsAndHashcode() {
for (int runs = 0; runs < NUMBER_OF_TEST_RUNS; runs++) {
EqualsHashCodeTestUtils.checkEqualsAndHashCode(createTestInstance(), this::copyInstance, this::mutateInstance);
}
}
protected final Info copyInstance(Info instance) throws IOException {
return new Info(instance.shrunkIndexExists(), instance.getExpectedShards(), instance.getActualShards(), instance.allShardsActive());
}
protected Info mutateInstance(Info instance) throws IOException {
boolean shrunkIndexExists = instance.shrunkIndexExists();
int expectedShards = instance.getExpectedShards();
int actualShards = instance.getActualShards();
boolean allShardsActive = instance.allShardsActive();
switch (between(0, 3)) {
case 0:
shrunkIndexExists = shrunkIndexExists == false;
break;
case 1:
expectedShards += between(1, 20);
break;
case 2:
actualShards += between(1, 20);
break;
case 3:
allShardsActive = allShardsActive == false;
break;
default:
throw new AssertionError("Illegal randomisation branch");
}
return new Info(shrunkIndexExists, expectedShards, actualShards, allShardsActive);
}
}

View File

@ -20,6 +20,7 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.Node;
import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateWaitStep.Result;
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
public class ShrunkShardsAllocatedStepTests extends AbstractStepTestCase<ShrunkShardsAllocatedStep> {
@ -101,7 +102,9 @@ public class ShrunkShardsAllocatedStepTests extends AbstractStepTestCase<ShrunkS
.nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build())
.routingTable(RoutingTable.builder().add(builder.build()).build()).build();
assertTrue(step.isConditionMet(originalIndexMetadata.getIndex(), clusterState));
Result result = step.isConditionMet(originalIndexMetadata.getIndex(), clusterState);
assertTrue(result.isComplete());
assertNull(result.getInfomationContext());
}
public void testConditionNotMetBecauseOfActive() {
@ -139,7 +142,10 @@ public class ShrunkShardsAllocatedStepTests extends AbstractStepTestCase<ShrunkS
.nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build())
.routingTable(RoutingTable.builder().add(builder.build()).build()).build();
assertFalse(step.isConditionMet(originalIndexMetadata.getIndex(), clusterState));
Result result = step.isConditionMet(originalIndexMetadata.getIndex(), clusterState);
assertFalse(result.isComplete());
assertEquals(new ShrunkShardsAllocatedStep.Info(true, shrinkNumberOfShards, shrinkNumberOfShards, false),
result.getInfomationContext());
}
public void testConditionNotMetBecauseOfShardCount() {
@ -178,7 +184,10 @@ public class ShrunkShardsAllocatedStepTests extends AbstractStepTestCase<ShrunkS
.nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build())
.routingTable(RoutingTable.builder().add(builder.build()).build()).build();
assertFalse(step.isConditionMet(originalIndexMetadata.getIndex(), clusterState));
Result result = step.isConditionMet(originalIndexMetadata.getIndex(), clusterState);
assertFalse(result.isComplete());
assertEquals(new ShrunkShardsAllocatedStep.Info(true, shrinkNumberOfShards, actualShrinkNumberShards, true),
result.getInfomationContext());
}
public void testConditionNotMetBecauseOfShrunkIndexDoesntExistYet() {
@ -203,6 +212,8 @@ public class ShrunkShardsAllocatedStepTests extends AbstractStepTestCase<ShrunkS
.nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build())
.build();
assertFalse(step.isConditionMet(originalIndexMetadata.getIndex(), clusterState));
Result result = step.isConditionMet(originalIndexMetadata.getIndex(), clusterState);
assertFalse(result.isComplete());
assertEquals(new ShrunkShardsAllocatedStep.Info(false, -1, -1, false), result.getInfomationContext());
}
}

View File

@ -0,0 +1,47 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.indexlifecycle;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase;
import org.elasticsearch.test.EqualsHashCodeTestUtils;
import org.elasticsearch.xpack.core.indexlifecycle.ShrunkenIndexCheckStep.Info;
import java.io.IOException;
public class ShrunkenIndexCheckStepInfoTests extends AbstractXContentTestCase<ShrunkenIndexCheckStep.Info> {
@Override
protected Info createTestInstance() {
return new Info(randomAlphaOfLengthBetween(10, 20));
}
@Override
protected Info doParseInstance(XContentParser parser) throws IOException {
return Info.PARSER.apply(parser, null);
}
@Override
protected boolean supportsUnknownFields() {
return false;
}
public final void testEqualsAndHashcode() {
for (int runs = 0; runs < NUMBER_OF_TEST_RUNS; runs++) {
EqualsHashCodeTestUtils.checkEqualsAndHashCode(createTestInstance(), this::copyInstance, this::mutateInstance);
}
}
protected final Info copyInstance(Info instance) throws IOException {
return new Info(instance.getOriginalIndexName());
}
protected Info mutateInstance(Info instance) throws IOException {
return new Info(randomValueOtherThan(instance.getOriginalIndexName(), () -> randomAlphaOfLengthBetween(10, 20)));
}
}

View File

@ -10,6 +10,7 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateWaitStep.Result;
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
import static org.hamcrest.Matchers.equalTo;
@ -63,7 +64,9 @@ public class ShrunkenIndexCheckStepTests extends AbstractStepTestCase<ShrunkenIn
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).build();
assertTrue(step.isConditionMet(indexMetadata.getIndex(), clusterState));
Result result = step.isConditionMet(indexMetadata.getIndex(), clusterState);
assertTrue(result.isComplete());
assertNull(result.getInfomationContext());
}
public void testConditionNotMetBecauseNotSameShrunkenIndex() {
@ -78,7 +81,9 @@ public class ShrunkenIndexCheckStepTests extends AbstractStepTestCase<ShrunkenIn
.put(IndexMetaData.builder(shrinkIndexMetadata))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).build();
assertFalse(step.isConditionMet(shrinkIndexMetadata.getIndex(), clusterState));
Result result = step.isConditionMet(shrinkIndexMetadata.getIndex(), clusterState);
assertFalse(result.isComplete());
assertEquals(new ShrunkenIndexCheckStep.Info(sourceIndex), result.getInfomationContext());
}
public void testConditionNotMetBecauseSourceIndexExists() {
@ -98,7 +103,9 @@ public class ShrunkenIndexCheckStepTests extends AbstractStepTestCase<ShrunkenIn
.put(IndexMetaData.builder(shrinkIndexMetadata))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).build();
assertFalse(step.isConditionMet(shrinkIndexMetadata.getIndex(), clusterState));
Result result = step.isConditionMet(shrinkIndexMetadata.getIndex(), clusterState);
assertFalse(result.isComplete());
assertEquals(new ShrunkenIndexCheckStep.Info(sourceIndex), result.getInfomationContext());
}
public void testIllegalState() {

View File

@ -10,11 +10,13 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.index.Index;
import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateWaitStep;
import org.elasticsearch.xpack.core.indexlifecycle.InitializePolicyContextStep;
import org.elasticsearch.xpack.core.indexlifecycle.Step;
import java.io.IOException;
import java.util.function.LongSupplier;
public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
@ -48,7 +50,7 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
@Override
public ClusterState execute(ClusterState currentState) {
public ClusterState execute(ClusterState currentState) throws IOException {
Step currentStep = startStep;
Step registeredCurrentStep = IndexLifecycleRunner.getCurrentStep(policyStepsRegistry, policy,
currentState.metaData().index(index).getSettings());
@ -74,8 +76,8 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
// cluster state so it can be applied and we will
// wait for the next trigger to evaluate the
// condition again
boolean complete = ((ClusterStateWaitStep) currentStep).isConditionMet(index, currentState);
if (complete) {
ClusterStateWaitStep.Result result = ((ClusterStateWaitStep) currentStep).isConditionMet(index, currentState);
if (result.isComplete()) {
if (currentStep.getNextStepKey() == null) {
return currentState;
}
@ -83,7 +85,12 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
currentStep.getNextStepKey(), nowSupplier);
} else {
logger.debug("condition not met, returning existing state");
return currentState;
ToXContentObject stepInfo = result.getInfomationContext();
if (stepInfo == null) {
return currentState;
} else {
return IndexLifecycleRunner.addStepInfoToClusterState(index, currentState, stepInfo);
}
}
}
currentStep = policyStepsRegistry.getStep(policy, currentStep.getNextStepKey());

View File

@ -194,6 +194,17 @@ public class IndexLifecycleRunner {
return newClusterStateBuilder;
}
static ClusterState addStepInfoToClusterState(Index index, ClusterState clusterState, ToXContentObject stepInfo) throws IOException {
IndexMetaData idxMeta = clusterState.getMetaData().index(index);
XContentBuilder infoXContentBuilder = JsonXContent.contentBuilder();
stepInfo.toXContent(infoXContentBuilder, ToXContent.EMPTY_PARAMS);
String stepInfoString = BytesReference.bytes(infoXContentBuilder).utf8ToString();
Settings.Builder indexSettings = Settings.builder().put(idxMeta.getSettings())
.put(LifecycleSettings.LIFECYCLE_STEP_INFO_SETTING.getKey(), stepInfoString);
ClusterState.Builder newClusterStateBuilder = newClusterStateWithIndexSettings(index, clusterState, indexSettings);
return newClusterStateBuilder.build();
}
private void moveToStep(Index index, String policy, StepKey currentStepKey, StepKey nextStepKey) {
logger.debug("moveToStep[" + policy + "] [" + index.getName() + "]" + currentStepKey + " -> "
+ nextStepKey);

View File

@ -8,12 +8,8 @@ package org.elasticsearch.xpack.indexlifecycle;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.Index;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
import org.elasticsearch.xpack.core.indexlifecycle.Step;
@ -54,12 +50,7 @@ public class SetStepInfoUpdateTask extends ClusterStateUpdateTask {
Settings indexSettings = currentState.getMetaData().index(index).getSettings();
if (policy.equals(LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexSettings))
&& currentStepKey.equals(IndexLifecycleRunner.getCurrentStepKey(indexSettings))) {
XContentBuilder infoXContentBuilder = JsonXContent.contentBuilder();
stepInfo.toXContent(infoXContentBuilder, ToXContent.EMPTY_PARAMS);
String stepInfoString = BytesReference.bytes(infoXContentBuilder).utf8ToString();
Settings.Builder newSettings = Settings.builder().put(indexSettings).put(LifecycleSettings.LIFECYCLE_STEP_INFO_SETTING.getKey(),
stepInfoString);
return IndexLifecycleRunner.newClusterStateWithIndexSettings(index, currentState, newSettings).build();
return IndexLifecycleRunner.addStepInfoToClusterState(index, currentState, stepInfo);
} else {
// either the policy has changed or the step is now
// not the same as when we submitted the update task. In

View File

@ -33,6 +33,7 @@ import org.elasticsearch.xpack.indexlifecycle.IndexLifecycleRunnerTests.MockClus
import org.elasticsearch.xpack.indexlifecycle.IndexLifecycleRunnerTests.MockInitializePolicyContextStep;
import org.junit.Before;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@ -104,7 +105,7 @@ public class ExecuteStepsUpdateTaskTests extends ESTestCase {
policyStepsRegistry.update(clusterState, null, () -> 0L);
}
public void testExecuteAllUntilEndOfPolicy() {
public void testExecuteAllUntilEndOfPolicy() throws IOException {
Step startStep = policyStepsRegistry.getFirstStep(allClusterPolicyName);
long now = randomNonNegativeLong();
ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(allClusterPolicyName, index, startStep, policyStepsRegistry, () -> now);
@ -115,9 +116,10 @@ public class ExecuteStepsUpdateTaskTests extends ESTestCase {
assertThat(currentStepKey, equalTo(TerminalPolicyStep.KEY));
assertThat(LifecycleSettings.LIFECYCLE_PHASE_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(now));
assertThat(LifecycleSettings.LIFECYCLE_ACTION_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(now));
assertThat(LifecycleSettings.LIFECYCLE_STEP_INFO_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(""));
}
public void testExecuteMoveToNextActionStep() {
public void testExecuteMoveToNextActionStep() throws IOException {
secondStep.setWillComplete(false);
Step startStep = policyStepsRegistry.getFirstStep(mixedPolicyName);
long now = randomNonNegativeLong();
@ -129,9 +131,10 @@ public class ExecuteStepsUpdateTaskTests extends ESTestCase {
assertThat(currentStepKey, equalTo(secondStepKey));
assertThat(LifecycleSettings.LIFECYCLE_PHASE_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(now));
assertThat(LifecycleSettings.LIFECYCLE_ACTION_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(now));
assertThat(LifecycleSettings.LIFECYCLE_STEP_INFO_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(""));
}
public void testNeverExecuteNonClusterStateStep() {
public void testNeverExecuteNonClusterStateStep() throws IOException {
setStateToKey(thirdStepKey);
Step startStep = policyStepsRegistry.getStep(mixedPolicyName, thirdStepKey);
long now = randomNonNegativeLong();
@ -139,7 +142,7 @@ public class ExecuteStepsUpdateTaskTests extends ESTestCase {
assertThat(task.execute(clusterState), sameInstance(clusterState));
}
public void testExecuteUntilFirstNonClusterStateStep() {
public void testExecuteUntilFirstNonClusterStateStep() throws IOException {
setStateToKey(secondStepKey);
Step startStep = policyStepsRegistry.getStep(mixedPolicyName, secondStepKey);
long now = randomNonNegativeLong();
@ -151,9 +154,10 @@ public class ExecuteStepsUpdateTaskTests extends ESTestCase {
assertThat(secondStep.getExecuteCount(), equalTo(1L));
assertThat(LifecycleSettings.LIFECYCLE_PHASE_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(-1L));
assertThat(LifecycleSettings.LIFECYCLE_ACTION_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(-1L));
assertThat(LifecycleSettings.LIFECYCLE_STEP_INFO_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(""));
}
public void testExecuteIncompleteWaitStep() {
public void testExecuteIncompleteWaitStepNoInfo() throws IOException {
secondStep.setWillComplete(false);
setStateToKey(secondStepKey);
Step startStep = policyStepsRegistry.getStep(mixedPolicyName, secondStepKey);
@ -166,6 +170,26 @@ public class ExecuteStepsUpdateTaskTests extends ESTestCase {
assertThat(secondStep.getExecuteCount(), equalTo(1L));
assertThat(LifecycleSettings.LIFECYCLE_PHASE_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(-1L));
assertThat(LifecycleSettings.LIFECYCLE_ACTION_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(-1L));
assertThat(LifecycleSettings.LIFECYCLE_STEP_INFO_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(""));
}
public void testExecuteIncompleteWaitStepWithInfo() throws IOException {
secondStep.setWillComplete(false);
IndexLifecycleRunnerTests.RandomStepInfo stepInfo = new IndexLifecycleRunnerTests.RandomStepInfo();
secondStep.expectedInfo(stepInfo);
setStateToKey(secondStepKey);
Step startStep = policyStepsRegistry.getStep(mixedPolicyName, secondStepKey);
long now = randomNonNegativeLong();
ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, () -> now);
ClusterState newState = task.execute(clusterState);
StepKey currentStepKey = IndexLifecycleRunner.getCurrentStepKey(newState.metaData().index(index).getSettings());
assertThat(currentStepKey, equalTo(secondStepKey));
assertThat(firstStep.getExecuteCount(), equalTo(0L));
assertThat(secondStep.getExecuteCount(), equalTo(1L));
assertThat(LifecycleSettings.LIFECYCLE_PHASE_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(-1L));
assertThat(LifecycleSettings.LIFECYCLE_ACTION_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(-1L));
assertThat(LifecycleSettings.LIFECYCLE_STEP_INFO_SETTING.get(newState.metaData().index(index).getSettings()),
equalTo(stepInfo.toString()));
}
public void testOnFailure() {

View File

@ -12,6 +12,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.Settings.Builder;
@ -587,6 +588,20 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
assertClusterStateOnErrorStep(clusterState, index, currentStep, newClusterState, cause, now);
}
public void testAddStepInfoToClusterState() throws IOException {
String indexName = "my_index";
StepKey currentStep = new StepKey("current_phase", "current_action", "current_step");
RandomStepInfo stepInfo = new RandomStepInfo();
ClusterState clusterState = buildClusterState(indexName,
Settings.builder().put(LifecycleSettings.LIFECYCLE_PHASE, currentStep.getPhase())
.put(LifecycleSettings.LIFECYCLE_ACTION, currentStep.getAction())
.put(LifecycleSettings.LIFECYCLE_STEP, currentStep.getName()));
Index index = clusterState.metaData().index(indexName).getIndex();
ClusterState newClusterState = IndexLifecycleRunner.addStepInfoToClusterState(index, clusterState, stepInfo);
assertClusterStateStepInfo(clusterState, index, currentStep, newClusterState, stepInfo);
}
private ClusterState buildClusterState(String indexName, Settings.Builder indexSettingsBuilder) {
Settings indexSettings = indexSettingsBuilder.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
@ -651,7 +666,31 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
assertEquals(now, (long) LifecycleSettings.LIFECYCLE_STEP_TIME_SETTING.get(newIndexSettings));
}
private class RandomStepInfo implements ToXContentObject {
private void assertClusterStateStepInfo(ClusterState oldClusterState, Index index, StepKey currentStep, ClusterState newClusterState,
ToXContentObject stepInfo) throws IOException {
XContentBuilder stepInfoXContentBuilder = JsonXContent.contentBuilder();
stepInfo.toXContent(stepInfoXContentBuilder, ToXContent.EMPTY_PARAMS);
String expectedstepInfoValue = BytesReference.bytes(stepInfoXContentBuilder).utf8ToString();
assertNotSame(oldClusterState, newClusterState);
MetaData newMetadata = newClusterState.metaData();
assertNotSame(oldClusterState.metaData(), newMetadata);
IndexMetaData newIndexMetadata = newMetadata.getIndexSafe(index);
assertNotSame(oldClusterState.metaData().index(index), newIndexMetadata);
Settings newIndexSettings = newIndexMetadata.getSettings();
assertNotSame(oldClusterState.metaData().index(index).getSettings(), newIndexSettings);
assertEquals(currentStep.getPhase(), LifecycleSettings.LIFECYCLE_PHASE_SETTING.get(newIndexSettings));
assertEquals(currentStep.getAction(), LifecycleSettings.LIFECYCLE_ACTION_SETTING.get(newIndexSettings));
assertEquals(currentStep.getName(), LifecycleSettings.LIFECYCLE_STEP_SETTING.get(newIndexSettings));
assertEquals(expectedstepInfoValue, LifecycleSettings.LIFECYCLE_STEP_INFO_SETTING.get(newIndexSettings));
assertEquals(LifecycleSettings.LIFECYCLE_PHASE_TIME_SETTING.get(oldClusterState.metaData().index(index).getSettings()),
LifecycleSettings.LIFECYCLE_PHASE_TIME_SETTING.get(newIndexSettings));
assertEquals(LifecycleSettings.LIFECYCLE_ACTION_TIME_SETTING.get(oldClusterState.metaData().index(index).getSettings()),
LifecycleSettings.LIFECYCLE_ACTION_TIME_SETTING.get(newIndexSettings));
assertEquals(LifecycleSettings.LIFECYCLE_STEP_TIME_SETTING.get(oldClusterState.metaData().index(index).getSettings()),
LifecycleSettings.LIFECYCLE_STEP_TIME_SETTING.get(newIndexSettings));
}
static class RandomStepInfo implements ToXContentObject {
private final String key;
private final String value;
@ -685,6 +724,11 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
RandomStepInfo other = (RandomStepInfo) obj;
return Objects.equals(key, other.key) && Objects.equals(value, other.value);
}
@Override
public String toString() {
return Strings.toString(this);
}
}
private static class MockAsyncActionStep extends AsyncActionStep {
@ -802,6 +846,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
private RuntimeException exception;
private boolean willComplete;
private long executeCount = 0;
private ToXContentObject expectedInfo = null;
MockClusterStateWaitStep(StepKey key, StepKey nextStepKey) {
super(key, nextStepKey);
@ -815,17 +860,21 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
this.willComplete = willComplete;
}
void expectedInfo(ToXContentObject expectedInfo) {
this.expectedInfo = expectedInfo;
}
public long getExecuteCount() {
return executeCount;
}
@Override
public boolean isConditionMet(Index index, ClusterState clusterState) {
public Result isConditionMet(Index index, ClusterState clusterState) {
executeCount++;
if (exception != null) {
throw exception;
}
return willComplete;
return new Result(willComplete, expectedInfo);
}
}