more changes + updated api

This commit is contained in:
Tal Levy 2018-04-03 17:42:25 -07:00
parent 1ac1ee413f
commit 47dcc8fe9c
24 changed files with 465 additions and 169 deletions

View File

@ -38,8 +38,6 @@ import static org.elasticsearch.xpack.core.indexlifecycle.ObjectParserUtils.conv
* particular point in the lifecycle of an index.
*/
public class Phase implements ToXContentObject, Writeable {
public static final String PHASE_COMPLETED = "ACTIONS COMPLETED";
private static final Logger logger = ESLoggerFactory.getLogger(Phase.class);
public static final ParseField AFTER_FIELD = new ParseField("after");

View File

@ -5,6 +5,9 @@
*/
package org.elasticsearch.xpack.core.indexlifecycle;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import java.util.Objects;
/**
@ -23,7 +26,6 @@ public abstract class Step {
return key;
}
public StepKey getNextStepKey() {
return nextStepKey;
}
@ -32,6 +34,28 @@ public abstract class Step {
return true;
}
@Override
public int hashCode() {
return Objects.hash(key, nextStepKey);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Step other = (Step) obj;
return Objects.equals(key, other.key) && Objects.equals(nextStepKey, other.nextStepKey);
}
@Override
public String toString() {
return key + " => " + nextStepKey;
}
public static class StepKey {
private final String phase;
private final String action;
@ -76,6 +100,5 @@ public abstract class Step {
public String toString() {
return String.format("[%s][%s][%s]", phase, action, name);
}
}
}

View File

@ -20,6 +20,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
public class GetLifecycleAction
@ -51,38 +52,42 @@ public class GetLifecycleAction
public static class Response extends ActionResponse implements ToXContentObject {
private LifecyclePolicy policy;
private List<LifecyclePolicy> policies;
public Response() {
}
public Response(LifecyclePolicy policy) {
this.policy = policy;
public Response(List<LifecyclePolicy> policies) {
this.policies = policies;
}
public LifecyclePolicy getPolicy() {
return policy;
public List<LifecyclePolicy> getPolicies() {
return policies;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
policy.toXContent(builder, params);
builder.startObject();
for (LifecyclePolicy policy : policies) {
builder.field(policy.getName(), policy);
}
builder.endObject();
return builder;
}
@Override
public void readFrom(StreamInput in) throws IOException {
policy = new LifecyclePolicy(in);
policies = in.readList(LifecyclePolicy::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
policy.writeTo(out);
out.writeList(policies);
}
@Override
public int hashCode() {
return Objects.hash(policy);
return Objects.hash(policies);
}
@Override
@ -94,7 +99,7 @@ public class GetLifecycleAction
return false;
}
Response other = (Response) obj;
return Objects.equals(policy, other.policy);
return Objects.equals(policies, other.policies);
}
@Override
@ -105,20 +110,21 @@ public class GetLifecycleAction
}
public static class Request extends AcknowledgedRequest<Request> {
private String[] policyNames;
public static final ParseField POLICY_FIELD = new ParseField("policy");
private String policyName;
public Request(String policyName) {
this.policyName = policyName;
public Request(String... policyNames) {
if (policyNames == null) {
throw new IllegalArgumentException("ids cannot be null");
}
this.policyNames = policyNames;
}
public Request() {
policyNames = Strings.EMPTY_ARRAY;
}
public String getPolicyName() {
return policyName;
public String[] getPolicyNames() {
return policyNames;
}
@Override
@ -129,18 +135,18 @@ public class GetLifecycleAction
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
policyName = in.readString();
policyNames = in.readStringArray();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(policyName);
out.writeStringArray(policyNames);
}
@Override
public int hashCode() {
return Objects.hash(policyName);
return policyNames.hashCode();
}
@Override
@ -152,7 +158,7 @@ public class GetLifecycleAction
return false;
}
Request other = (Request) obj;
return Objects.equals(policyName, other.policyName);
return Objects.equals(policyNames, other.policyNames);
}
}

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.core.indexlifecycle;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
@ -29,8 +28,7 @@ public class MockAction implements LifecycleAction {
a -> new MockAction((List<MockStep>) a[0]));
static {
PARSER.declareField(ConstructingObjectParser.constructorArg(), (p, c) -> p.list(),
new ParseField("steps"), ObjectParser.ValueType.OBJECT_ARRAY);
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), MockStep.PARSER, new ParseField("steps"));
}
public static MockAction parse(XContentParser parser) {
@ -95,6 +93,6 @@ public class MockAction implements LifecycleAction {
@Override
public String toString() {
return Strings.toString(this);
return Strings.toString(this, true, true);
}
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.core.indexlifecycle;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
@ -17,17 +18,7 @@ public class MockActionTests extends AbstractSerializingTestCase<MockAction> {
@Override
protected MockAction createTestInstance() {
List<MockStep> steps = new ArrayList<>();
int stepCount = randomIntBetween(2, 10);
Step.StepKey currentStepKey = randomStepKey();
Step.StepKey nextStepKey = null;
for (int i = 0; i < stepCount - 1; i++) {
nextStepKey = randomStepKey();
steps.add(new MockStep(currentStepKey, nextStepKey));
currentStepKey = nextStepKey;
}
steps.add(new MockStep(currentStepKey, null));
return new MockAction(steps);
return randomMockAction(null);
}
@Override
@ -52,7 +43,22 @@ public class MockActionTests extends AbstractSerializingTestCase<MockAction> {
return new MockAction(steps);
}
private Step.StepKey randomStepKey() {
// TODO(talevy): design this in a way that we can build up a proper LifecyclePolicy where the steps connect
public static MockAction randomMockAction(@Nullable Step.StepKey nextExternalStepKey) {
List<MockStep> steps = new ArrayList<>();
int stepCount = randomIntBetween(2, 10);
Step.StepKey currentStepKey = randomStepKey();
Step.StepKey nextStepKey;
for (int i = 0; i < stepCount - 1; i++) {
nextStepKey = randomStepKey();
steps.add(new MockStep(currentStepKey, nextStepKey));
currentStepKey = nextStepKey;
}
steps.add(new MockStep(currentStepKey, nextExternalStepKey));
return new MockAction(steps);
}
private static Step.StepKey randomStepKey() {
return new Step.StepKey(randomAlphaOfLength(5),
randomAlphaOfLength(5), randomAlphaOfLength(5));
}

View File

@ -5,23 +5,47 @@
*/
package org.elasticsearch.xpack.core.indexlifecycle;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
public class MockStep extends Step implements ToXContent, Writeable {
public static final String NAME = "TEST_STEP";
static final ConstructingObjectParser<MockStep, Void> PARSER = new ConstructingObjectParser<>(NAME,
a -> new MockStep((StepKey) a[0], (StepKey) a[1]));
private static final ConstructingObjectParser<StepKey, Void> KEY_PARSER = new ConstructingObjectParser<>("TEST_KEY",
a -> new StepKey((String) a[0], (String) a[1], (String) a[2]));
static {
KEY_PARSER.declareString(ConstructingObjectParser.constructorArg(), new ParseField("phase"));
KEY_PARSER.declareString(ConstructingObjectParser.constructorArg(), new ParseField("action"));
KEY_PARSER.declareString(ConstructingObjectParser.constructorArg(), new ParseField("name"));
PARSER.declareObject(ConstructingObjectParser.constructorArg(), KEY_PARSER::apply,
new ParseField("step_key"));
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), KEY_PARSER::apply,
new ParseField("next_step_key"));
}
public MockStep(StepKey stepKey, Step.StepKey nextStepKey) {
super(stepKey, nextStepKey);
}
public MockStep(StreamInput in) throws IOException {
super(new StepKey(in.readString(), in.readString(), in.readString()),
new StepKey(in.readString(), in.readString(), in.readString()));
super(new StepKey(in.readString(), in.readString(), in.readString()), readOptionalNextStepKey(in));
}
private static StepKey readOptionalNextStepKey(StreamInput in) throws IOException {
if (in.readBoolean()) {
return new StepKey(in.readString(), in.readString(), in.readString());
}
return null;
}
@Override
@ -29,9 +53,13 @@ public class MockStep extends Step implements ToXContent, Writeable {
out.writeString(getKey().getPhase());
out.writeString(getKey().getAction());
out.writeString(getKey().getName());
out.writeString(getNextStepKey().getPhase());
out.writeString(getNextStepKey().getAction());
out.writeString(getNextStepKey().getName());
boolean hasNextStep = getNextStepKey() != null;
out.writeBoolean(hasNextStep);
if (hasNextStep) {
out.writeString(getNextStepKey().getPhase());
out.writeString(getNextStepKey().getAction());
out.writeString(getNextStepKey().getName());
}
}
@Override
@ -45,16 +73,23 @@ public class MockStep extends Step implements ToXContent, Writeable {
builder.field("name", getKey().getName());
}
builder.endObject();
builder.startObject("next_step_key");
{
builder.field("phase", getNextStepKey().getPhase());
builder.field("action", getNextStepKey().getAction());
builder.field("name", getNextStepKey().getName());
if (getNextStepKey() != null) {
builder.startObject("next_step_key");
{
builder.field("phase", getNextStepKey().getPhase());
builder.field("action", getNextStepKey().getAction());
builder.field("name", getNextStepKey().getName());
}
builder.endObject();
}
builder.endObject();
}
builder.endObject();
return builder;
}
@Override
public String toString() {
return Strings.toString(this, true, true);
}
}

View File

@ -5,30 +5,15 @@
*/
package org.elasticsearch.xpack.core.indexlifecycle;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.indexlifecycle.AllocateAction;
import org.elasticsearch.xpack.core.indexlifecycle.DeleteAction;
import org.elasticsearch.xpack.core.indexlifecycle.ForceMergeAction;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleAction;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
import org.elasticsearch.xpack.core.indexlifecycle.Phase;
import org.elasticsearch.xpack.core.indexlifecycle.ReplicasAction;
import org.elasticsearch.xpack.core.indexlifecycle.RolloverAction;
import org.elasticsearch.xpack.core.indexlifecycle.ShrinkAction;
import org.elasticsearch.xpack.core.indexlifecycle.TimeseriesLifecycleType;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.elasticsearch.xpack.core.indexlifecycle.TimeseriesLifecycleType.ORDERED_VALID_COLD_ACTIONS;
@ -41,7 +26,6 @@ import static org.elasticsearch.xpack.core.indexlifecycle.TimeseriesLifecycleTyp
import static org.elasticsearch.xpack.core.indexlifecycle.TimeseriesLifecycleType.VALID_PHASES;
import static org.elasticsearch.xpack.core.indexlifecycle.TimeseriesLifecycleType.VALID_WARM_ACTIONS;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
public class TimeseriesLifecycleTypeTests extends ESTestCase {

View File

@ -23,6 +23,6 @@ public class GetLifecycleRequestTests extends AbstractStreamableTestCase<GetLife
@Override
protected Request mutateInstance(Request request) {
return new Request(request.getPolicyName() + randomAlphaOfLengthBetween(1, 10));
return new Request(request.getPolicyNames() + randomAlphaOfLengthBetween(1, 10));
}
}

View File

@ -18,24 +18,23 @@ import org.elasticsearch.xpack.core.indexlifecycle.TestLifecycleType;
import org.elasticsearch.xpack.core.indexlifecycle.action.GetLifecycleAction.Response;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class GetLifecycleResponseTests extends AbstractStreamableTestCase<GetLifecycleAction.Response> {
private String lifecycleName;
@Before
public void setup() {
lifecycleName = randomAlphaOfLength(20); // NORELEASE we need to randomise the lifecycle name rather
// than use the same name for all instances
}
@Override
protected Response createTestInstance() {
return new Response(new LifecyclePolicy(TestLifecycleType.INSTANCE, lifecycleName, Collections.emptyMap()));
String randomPrefix = randomAlphaOfLength(5);
List<LifecyclePolicy> policies = new ArrayList<>();
for (int i = 0; i < randomIntBetween(0, 2); i++) {
policies.add(new LifecyclePolicy(TestLifecycleType.INSTANCE, randomPrefix + i, Collections.emptyMap()));
}
return new Response(policies);
}
@Override
@ -51,22 +50,16 @@ public class GetLifecycleResponseTests extends AbstractStreamableTestCase<GetLif
@Override
protected Response mutateInstance(Response response) {
LifecyclePolicy policy = response.getPolicy();
String name = policy.getName();
Map<String, Phase> phases = policy.getPhases();
switch (between(0, 1)) {
case 0:
name = name + randomAlphaOfLengthBetween(1, 5);
break;
case 1:
phases = new HashMap<>(phases);
String newPhaseName = randomAlphaOfLengthBetween(1, 10);
phases.put(name, new Phase(newPhaseName, TimeValue.timeValueSeconds(randomIntBetween(1, 1000)),
Collections.emptyMap()));
break;
default:
throw new AssertionError("Illegal randomisation branch");
List<LifecyclePolicy> policies = new ArrayList<>(response.getPolicies());
if (policies.size() > 0) {
if (randomBoolean()) {
policies.add(new LifecyclePolicy(TestLifecycleType.INSTANCE, randomAlphaOfLength(2), Collections.emptyMap()));
} else {
policies.remove(policies.size() - 1);
}
} else {
policies.add(new LifecyclePolicy(TestLifecycleType.INSTANCE, randomAlphaOfLength(2), Collections.emptyMap()));
}
return new Response(new LifecyclePolicy(TestLifecycleType.INSTANCE, name, phases));
return new Response(policies);
}
}

View File

@ -19,27 +19,25 @@ import java.util.function.Function;
public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
private static final Logger logger = ESLoggerFactory.getLogger(ExecuteStepsUpdateTask.class);
private final String policy;
private final Index index;
private final Step startStep;
private final Function<ClusterState, Step> getCurrentStepInClusterState;
private final Function<Step.StepKey, Step> getStepFromRegistry;
private final BiFunction<ClusterState, Step, ClusterState> moveClusterStateToNextStep;
private final PolicyStepsRegistry policyStepsRegistry;
public ExecuteStepsUpdateTask(Index index, Step startStep, Function<ClusterState, Step> getCurrentStepInClusterState,
BiFunction<ClusterState, Step, ClusterState> moveClusterStateToNextStep,
Function<Step.StepKey, Step> getStepFromRegistry) {
public ExecuteStepsUpdateTask(String policy, Index index, Step startStep, PolicyStepsRegistry policyStepsRegistry) {
this.policy = policy;
this.index = index;
this.startStep = startStep;
this.getCurrentStepInClusterState = getCurrentStepInClusterState;
this.moveClusterStateToNextStep = moveClusterStateToNextStep;
this.getStepFromRegistry = getStepFromRegistry;
this.policyStepsRegistry = policyStepsRegistry;
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
public ClusterState execute(ClusterState currentState) {
Step currentStep = startStep;
if (currentStep.equals(getCurrentStepInClusterState.apply(currentState))) {
Step registeredCurrentStep = IndexLifecycleRunner.getCurrentStep(policyStepsRegistry, policy,
currentState.metaData().index(index).getSettings());
if (currentStep.equals(registeredCurrentStep)) {
// We can do cluster state steps all together until we
// either get to a step that isn't a cluster state step or a
// cluster state wait step returns not completed
@ -49,7 +47,7 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
// move
// the cluster state to the next step
currentState = ((ClusterStateActionStep) currentStep).performAction(index, currentState);
currentState = moveClusterStateToNextStep.apply(currentState, currentStep);
currentState = IndexLifecycleRunner.moveClusterStateToNextStep(index, currentState, currentStep.getNextStepKey());
} else {
// cluster state wait step so evaluate the
// condition, if the condition is met move to the
@ -59,13 +57,13 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
// condition again
boolean complete = ((ClusterStateWaitStep) currentStep).isConditionMet(index, currentState);
if (complete) {
currentState = moveClusterStateToNextStep.apply(currentState, currentStep);
currentState = IndexLifecycleRunner.moveClusterStateToNextStep(index, currentState, currentStep.getNextStepKey());
} else {
logger.warn("condition not met, returning existing state");
return currentState;
}
}
currentStep = getStepFromRegistry.apply(currentStep.getNextStepKey());
currentStep = policyStepsRegistry.getStep(policy, currentStep.getNextStepKey());
}
return currentState;
} else {

View File

@ -27,7 +27,6 @@ import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
@ -46,10 +45,10 @@ public class IndexLifecycleMetadata implements MetaData.Custom {
}, POLICIES_FIELD);
}
private final SortedMap<String, LifecyclePolicy> policies;
private final Map<String, LifecyclePolicy> policies;
public IndexLifecycleMetadata(SortedMap<String, LifecyclePolicy> policies) {
this.policies = Collections.unmodifiableSortedMap(policies);
public IndexLifecycleMetadata(Map<String, LifecyclePolicy> policies) {
this.policies = Collections.unmodifiableMap(policies);
}
public IndexLifecycleMetadata(StreamInput in) throws IOException {
@ -70,7 +69,7 @@ public class IndexLifecycleMetadata implements MetaData.Custom {
}
}
public SortedMap<String, LifecyclePolicy> getPolicies() {
public Map<String, LifecyclePolicy> getPolicies() {
return policies;
}

View File

@ -7,7 +7,6 @@ package org.elasticsearch.xpack.indexlifecycle;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
@ -34,7 +33,7 @@ public class IndexLifecycleRunner {
}
public void runPolicy(String policy, Index index, Settings indexSettings, Cause cause) {
Step currentStep = getCurrentStep(policy, indexSettings);
Step currentStep = getCurrentStep(stepRegistry, policy, indexSettings);
logger.warn("running policy with current-step[" + currentStep.getKey() + "]");
if (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep) {
if (cause != Cause.SCHEDULE_TRIGGER) {
@ -92,13 +91,10 @@ public class IndexLifecycleRunner {
private void executeClusterStateSteps(Index index, String policy, Step step) {
assert step instanceof ClusterStateActionStep || step instanceof ClusterStateWaitStep;
clusterService.submitStateUpdateTask("ILM", new ExecuteStepsUpdateTask(index, step,
(currentState) -> getCurrentStep(policy, currentState.getMetaData().index(index).getSettings()),
(currentState, currentStep) -> moveClusterStateToNextStep(index, currentState, currentStep.getNextStepKey()),
(stepKey) -> stepRegistry.getStep(policy, stepKey)));
clusterService.submitStateUpdateTask("ILM", new ExecuteStepsUpdateTask(policy, index, step, stepRegistry));
}
private StepKey getCurrentStepKey(Settings indexSettings) {
static StepKey getCurrentStepKey(Settings indexSettings) {
String currentPhase = LifecycleSettings.LIFECYCLE_PHASE_SETTING.get(indexSettings);
String currentAction = LifecycleSettings.LIFECYCLE_ACTION_SETTING.get(indexSettings);
String currentStep = LifecycleSettings.LIFECYCLE_STEP_SETTING.get(indexSettings);
@ -113,7 +109,7 @@ public class IndexLifecycleRunner {
}
}
private Step getCurrentStep(String policy, Settings indexSettings) {
static Step getCurrentStep(PolicyStepsRegistry stepRegistry, String policy, Settings indexSettings) {
StepKey currentStepKey = getCurrentStepKey(indexSettings);
if (currentStepKey == null) {
return stepRegistry.getFirstStep(policy);
@ -122,7 +118,7 @@ public class IndexLifecycleRunner {
}
}
private ClusterState moveClusterStateToNextStep(Index index, ClusterState clusterState, StepKey nextStep) {
static ClusterState moveClusterStateToNextStep(Index index, ClusterState clusterState, StepKey nextStep) {
ClusterState.Builder newClusterStateBuilder = ClusterState.builder(clusterState);
IndexMetaData idxMeta = clusterState.getMetaData().index(index);
Builder indexSettings = Settings.builder().put(idxMeta.getSettings()).put(LifecycleSettings.LIFECYCLE_PHASE, nextStep.getPhase())
@ -136,9 +132,8 @@ public class IndexLifecycleRunner {
private void moveToStep(Index index, String policy, StepKey currentStepKey, StepKey nextStepKey, Cause cause) {
logger.error("moveToStep[" + policy + "] [" + index.getName() + "]" + currentStepKey + " -> "
+ nextStepKey + ". because:" + cause.name());
clusterService.submitStateUpdateTask("ILM", new MoveToNextStepUpdateTask(index, policy,
currentStepKey, (c) -> moveClusterStateToNextStep(index, c, nextStepKey),
(s) -> getCurrentStepKey(s), (c) -> runPolicy(index, c, cause)));
clusterService.submitStateUpdateTask("ILM", new MoveToNextStepUpdateTask(index, policy, currentStepKey,
nextStepKey, newState -> runPolicy(index, newState, cause)));
}
public enum Cause {

View File

@ -11,43 +11,32 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateActionStep;
import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateWaitStep;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
import org.elasticsearch.xpack.core.indexlifecycle.Step;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
public class MoveToNextStepUpdateTask extends ClusterStateUpdateTask {
private static final Logger logger = ESLoggerFactory.getLogger(MoveToNextStepUpdateTask.class);
private final Index index;
private final String policy;
private final Step.StepKey currentStepKey;
private final Function<ClusterState, ClusterState> moveClusterStateToNextStep;
private final Function<Settings, Step.StepKey> getCurrentStepKey;
private final Consumer<ClusterState> runPolicy;
private final Step.StepKey nextStepKey;
private final Listener listener;
public MoveToNextStepUpdateTask(Index index, String policy, Step.StepKey currentStepKey,
Function<ClusterState, ClusterState> moveClusterStateToNextStep,
Function<Settings, Step.StepKey> getCurrentStepKey,
Consumer<ClusterState> runPolicy) {
public MoveToNextStepUpdateTask(Index index, String policy, Step.StepKey currentStepKey, Step.StepKey nextStepKey,
Listener listener) {
this.index = index;
this.policy = policy;
this.currentStepKey = currentStepKey;
this.moveClusterStateToNextStep = moveClusterStateToNextStep;
this.getCurrentStepKey = getCurrentStepKey;
this.runPolicy = runPolicy;
this.nextStepKey = nextStepKey;
this.listener = listener;
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
Settings indexSettings = currentState.getMetaData().index(index).getSettings();
if (policy.equals(LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexSettings))
&& currentStepKey.equals(getCurrentStepKey.apply(indexSettings))) {
return moveClusterStateToNextStep.apply(currentState);
&& currentStepKey.equals(IndexLifecycleRunner.getCurrentStepKey(indexSettings))) {
return IndexLifecycleRunner.moveClusterStateToNextStep(index, currentState, nextStepKey);
} else {
// either the policy has changed or the step is now
// not the same as when we submitted the update task. In
@ -62,7 +51,7 @@ public class MoveToNextStepUpdateTask extends ClusterStateUpdateTask {
// we moved to the new step in the execute method so we should
// execute the next step
if (oldState != newState) {
runPolicy.accept(newState);
listener.onClusterStateProcessed(newState);
}
}
@ -71,4 +60,8 @@ public class MoveToNextStepUpdateTask extends ClusterStateUpdateTask {
throw new RuntimeException(e); // NORELEASE implement error handling
}
@FunctionalInterface
public interface Listener {
void onClusterStateProcessed(ClusterState clusterState);
}
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.indexlifecycle;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.Diff;
@ -22,14 +23,12 @@ import java.util.TreeMap;
import java.util.function.LongSupplier;
public class PolicyStepsRegistry {
// keeps track of existing policies in the cluster state
SortedMap<String, LifecyclePolicy> lifecyclePolicyMap;
private SortedMap<String, LifecyclePolicy> lifecyclePolicyMap;
// keeps track of what the first step in a policy is
Map<String, Step> firstStepMap;
private Map<String, Step> firstStepMap;
// keeps track of a mapping from policy/step-name to respective Step
Map<String, Map<Step.StepKey, Step>> stepMap;
private Map<String, Map<Step.StepKey, Step>> stepMap;
public PolicyStepsRegistry() {
this.lifecyclePolicyMap = new TreeMap<>();
@ -37,6 +36,26 @@ public class PolicyStepsRegistry {
this.stepMap = new HashMap<>();
}
PolicyStepsRegistry(SortedMap<String, LifecyclePolicy> lifecyclePolicyMap,
Map<String, Step> firstStepMap, Map<String, Map<Step.StepKey, Step>> stepMap) {
this.lifecyclePolicyMap = lifecyclePolicyMap;
this.firstStepMap = firstStepMap;
this.stepMap = stepMap;
}
SortedMap<String, LifecyclePolicy> getLifecyclePolicyMap() {
return lifecyclePolicyMap;
}
Map<String, Step> getFirstStepMap() {
return firstStepMap;
}
Map<String, Map<Step.StepKey, Step>> getStepMap() {
return stepMap;
}
@SuppressWarnings("unchecked")
public void update(ClusterState currentState, Client client, LongSupplier nowSupplier) {
IndexLifecycleMetadata meta = currentState.metaData().custom(IndexLifecycleMetadata.TYPE);
@ -72,12 +91,16 @@ public class PolicyStepsRegistry {
* as String values in the cluster state.
* @param policy the policy from which to fetch the associated steps from
* @param stepKey the key to the requested {@link Step}
* @return
* @return step
*/
public Step getStep(String policy, Step.StepKey stepKey) {
Step step = stepMap.getOrDefault(policy, Collections.emptyMap()).get(stepKey);
Map<Step.StepKey, Step> steps = stepMap.get(policy);
if (steps == null) {
throw new IllegalStateException("policy [" + policy + "] does not exist");
}
Step step = steps.get(stepKey);
if (step == null) {
step = firstStepMap.get(policy);
throw new IllegalStateException("step [" + stepKey + "] does not exist");
}
return step;
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.indexlifecycle.action;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
@ -20,8 +21,8 @@ public class RestGetLifecycleAction extends BaseRestHandler {
public RestGetLifecycleAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(RestRequest.Method.GET,
IndexLifecycle.BASE_PATH + "{name}", this);
controller.registerHandler(RestRequest.Method.GET, IndexLifecycle.BASE_PATH, this);
controller.registerHandler(RestRequest.Method.GET, IndexLifecycle.BASE_PATH + "{name}", this);
}
@Override
@ -31,8 +32,8 @@ public class RestGetLifecycleAction extends BaseRestHandler {
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
String lifecycleName = restRequest.param("name");
GetLifecycleAction.Request getLifecycleRequest = new GetLifecycleAction.Request(lifecycleName);
String[] lifecycleNames = Strings.splitStringByCommaToArray(restRequest.param("name"));
GetLifecycleAction.Request getLifecycleRequest = new GetLifecycleAction.Request(lifecycleNames);
getLifecycleRequest.timeout(restRequest.paramAsTime("timeout", getLifecycleRequest.timeout()));
getLifecycleRequest.masterNodeTimeout(restRequest.paramAsTime("master_timeout", getLifecycleRequest.masterNodeTimeout()));

View File

@ -24,6 +24,10 @@ import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
import org.elasticsearch.xpack.core.indexlifecycle.action.GetLifecycleAction.Request;
import org.elasticsearch.xpack.core.indexlifecycle.action.GetLifecycleAction.Response;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class TransportGetLifecycleAction extends TransportMasterNodeAction<Request, Response> {
@Inject
@ -47,14 +51,24 @@ public class TransportGetLifecycleAction extends TransportMasterNodeAction<Reque
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
IndexLifecycleMetadata metadata = clusterService.state().metaData().custom(IndexLifecycleMetadata.TYPE);
if (metadata == null) {
listener.onFailure(new ResourceNotFoundException("Lifecycle policy not found: {}", request.getPolicyName()));
listener.onFailure(new ResourceNotFoundException("Lifecycle policy not found: {}", Arrays.toString(request.getPolicyNames())));
} else {
LifecyclePolicy policy = metadata.getPolicies().get(request.getPolicyName());
if (policy == null) {
listener.onFailure(new ResourceNotFoundException("Lifecycle policy not found: {}", request.getPolicyName()));
List<LifecyclePolicy> requestedPolicies;
// if no policies explicitly provided, behave as if `*` was specified
if (request.getPolicyNames().length == 0) {
requestedPolicies = new ArrayList<>(metadata.getPolicies().values());
} else {
listener.onResponse(new Response(policy));
requestedPolicies = new ArrayList<>(request.getPolicyNames().length);
for (String name : request.getPolicyNames()) {
LifecyclePolicy policy = metadata.getPolicies().get(name);
if (policy == null) {
listener.onFailure(new ResourceNotFoundException("Lifecycle policy not found: {}", name));
return;
}
requestedPolicies.add(policy);
}
}
listener.onResponse(new Response(requestedPolicies));
}
}

View File

@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.indexlifecycle;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
public class ExecuteStepsUpdateTaskTests extends ESTestCase {
@Before
public void prepareServices() {
}
public void test() {
// TODO: stub
}
}

View File

@ -13,7 +13,6 @@ import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.Scope;
@ -85,7 +84,7 @@ public class IndexLifecycleInitialisationIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(XPackPlugin.class, CommonAnalysisPlugin.class, ReindexPlugin.class);
return Arrays.asList(XPackPlugin.class, CommonAnalysisPlugin.class);
}
@Override

View File

@ -88,7 +88,7 @@ public class IndexLifecycleMetadataTests extends AbstractDiffableSerializationTe
@Override
protected MetaData.Custom mutateInstance(MetaData.Custom instance) {
IndexLifecycleMetadata metadata = (IndexLifecycleMetadata) instance;
SortedMap<String, LifecyclePolicy> policies = metadata.getPolicies();
Map<String, LifecyclePolicy> policies = metadata.getPolicies();
policies = new TreeMap<>(policies);
String policyName = randomAlphaOfLength(10);
policies.put(policyName, new LifecyclePolicy(TestLifecycleType.INSTANCE, policyName, Collections.emptyMap()));

View File

@ -0,0 +1,21 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.indexlifecycle;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
public class IndexLifecycleRunnerTests extends ESTestCase {
@Before
public void prepareServices() {
}
public void test() {
// TODO: stub
}
}

View File

@ -183,7 +183,7 @@ public class IndexLifecycleServiceTests extends ESTestCase {
public void testServiceSetupOnFirstClusterChange() {
TimeValue pollInterval = TimeValue.timeValueSeconds(randomIntBetween(1, 59));
MetaData metaData = MetaData.builder() .persistentSettings(settings(Version.CURRENT)
MetaData metaData = MetaData.builder().persistentSettings(settings(Version.CURRENT)
.put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING.getKey(), pollInterval).build())
.build();
ClusterState state = ClusterState.builder(ClusterName.DEFAULT)

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.indexlifecycle;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable.Reader;
@ -19,6 +20,7 @@ import org.elasticsearch.xpack.core.indexlifecycle.LifecycleAction;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleType;
import org.elasticsearch.xpack.core.indexlifecycle.MockAction;
import org.elasticsearch.xpack.core.indexlifecycle.MockActionTests;
import org.elasticsearch.xpack.core.indexlifecycle.MockStep;
import org.elasticsearch.xpack.core.indexlifecycle.Phase;
import org.elasticsearch.xpack.core.indexlifecycle.PhaseAfterStep;
@ -66,14 +68,20 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
@Override
protected LifecyclePolicy createTestInstance() {
lifecycleName = randomAlphaOfLengthBetween(1, 20);
return randomLifecyclePolicy(null);
}
static LifecyclePolicy randomLifecyclePolicy(@Nullable String lifecycleName) {
if (lifecycleName == null) {
lifecycleName = randomAlphaOfLengthBetween(1, 20);
}
int numberPhases = randomInt(5);
Map<String, Phase> phases = new HashMap<>(numberPhases);
for (int i = 0; i < numberPhases; i++) {
TimeValue after = TimeValue.parseTimeValue(randomTimeValue(0, 1000000000, "s", "m", "h", "d"), "test_after");
Map<String, LifecycleAction> actions = new HashMap<>();
if (randomBoolean()) {
MockAction action = new MockAction(Collections.emptyList());
MockAction action = MockActionTests.randomMockAction(null);
actions.put(action.getWriteableName(), action);
}
String phaseName = randomAlphaOfLength(10);

View File

@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.indexlifecycle;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
public class MoveToNextStepUpdateTaskTests extends ESTestCase {
@Before
public void prepareServices() {
}
public void test() {
// TODO: stub
}
}

View File

@ -0,0 +1,162 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.indexlifecycle;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
import org.elasticsearch.xpack.core.indexlifecycle.MockStep;
import org.elasticsearch.xpack.core.indexlifecycle.Step;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.xpack.indexlifecycle.LifecyclePolicyTests.randomLifecyclePolicy;
import static org.hamcrest.Matchers.equalTo;
public class PolicyStepsRegistryTests extends ESTestCase {
private static final Step.StepKey MOCK_STEP_KEY = new Step.StepKey("mock", "mock", "mock");
public void testGetFirstStep() {
String policyName = randomAlphaOfLengthBetween(2, 10);
Step expectedFirstStep = new MockStep(MOCK_STEP_KEY, null);
Map<String, Step> firstStepMap = Collections.singletonMap(policyName, expectedFirstStep);
PolicyStepsRegistry registry = new PolicyStepsRegistry(null, firstStepMap, null);
Step actualFirstStep = registry.getFirstStep(policyName);
assertThat(actualFirstStep, equalTo(expectedFirstStep));
}
public void testGetFirstStepUnknownPolicy() {
String policyName = randomAlphaOfLengthBetween(2, 10);
Step expectedFirstStep = new MockStep(MOCK_STEP_KEY, null);
Map<String, Step> firstStepMap = Collections.singletonMap(policyName, expectedFirstStep);
PolicyStepsRegistry registry = new PolicyStepsRegistry(null, firstStepMap, null);
Step actualFirstStep = registry.getFirstStep(policyName + "unknown");
assertNull(actualFirstStep);
}
public void testGetStep() {
String policyName = randomAlphaOfLengthBetween(2, 10);
Step expectedStep = new MockStep(MOCK_STEP_KEY, null);
Map<String, Map<Step.StepKey, Step>> stepMap =
Collections.singletonMap(policyName, Collections.singletonMap(MOCK_STEP_KEY, expectedStep));
PolicyStepsRegistry registry = new PolicyStepsRegistry(null, null, stepMap);
Step actualStep = registry.getStep(policyName, MOCK_STEP_KEY);
assertThat(actualStep, equalTo(expectedStep));
}
public void testGetStepUnknownPolicy() {
String policyName = randomAlphaOfLengthBetween(2, 10);
PolicyStepsRegistry registry = new PolicyStepsRegistry(null, null, Collections.emptyMap());
IllegalStateException exception = expectThrows(IllegalStateException.class, () -> registry.getStep(policyName, MOCK_STEP_KEY));
assertThat(exception.getMessage(), equalTo("policy [" + policyName +"] does not exist"));
}
public void testGetStepUnknownStepKey() {
String policyName = randomAlphaOfLengthBetween(2, 10);
Step expectedStep = new MockStep(MOCK_STEP_KEY, null);
Map<String, Map<Step.StepKey, Step>> stepMap =
Collections.singletonMap(policyName, Collections.singletonMap(MOCK_STEP_KEY, expectedStep));
PolicyStepsRegistry registry = new PolicyStepsRegistry(null, null, stepMap);
Step.StepKey unknownStepKey = new Step.StepKey(MOCK_STEP_KEY.getPhase(),
MOCK_STEP_KEY.getAction(),MOCK_STEP_KEY.getName() + "not");
IllegalStateException exception = expectThrows(IllegalStateException.class, () -> registry.getStep(policyName, unknownStepKey));
assertThat(exception.getMessage(), equalTo("step [" + unknownStepKey +"] does not exist"));
}
public void testUpdateFromNothingToSomethingToNothing() {
LifecyclePolicy newPolicy = randomLifecyclePolicy(null);
List<Step> policySteps = newPolicy.toSteps(null, () -> 0L);
Map<String, LifecyclePolicy> policyMap = Collections.singletonMap(newPolicy.getName(), newPolicy);
MetaData metaData = MetaData.builder()
.persistentSettings(settings(Version.CURRENT).build())
.putCustom(IndexLifecycleMetadata.TYPE, new IndexLifecycleMetadata(policyMap))
.build();
String nodeId = randomAlphaOfLength(10);
DiscoveryNode masterNode = DiscoveryNode.createLocal(settings(Version.CURRENT)
.put(Node.NODE_MASTER_SETTING.getKey(), true).build(),
new TransportAddress(TransportAddress.META_ADDRESS, 9300), nodeId);
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build())
.build();
// start with empty registry
PolicyStepsRegistry registry = new PolicyStepsRegistry();
// add new policy
registry.update(currentState, null, () -> 0L);
assertThat(registry.getFirstStep(newPolicy.getName()), equalTo(policySteps.get(0)));
assertThat(registry.getLifecyclePolicyMap().size(), equalTo(1));
assertThat(registry.getFirstStepMap().size(), equalTo(1));
assertThat(registry.getStepMap().size(), equalTo(1));
Map<Step.StepKey, Step> registeredStepsForPolicy = registry.getStepMap().get(newPolicy.getName());
assertThat(registeredStepsForPolicy.size(), equalTo(policySteps.size()));
for (Step step : policySteps) {
assertThat(registeredStepsForPolicy.get(step.getKey()), equalTo(step));
assertThat(registry.getStep(newPolicy.getName(), step.getKey()), equalTo(step));
}
Map<String, LifecyclePolicy> registryPolicyMap = registry.getLifecyclePolicyMap();
Map<String, Step> registryFirstStepMap = registry.getFirstStepMap();
Map<String, Map<Step.StepKey, Step>> registryStepMap = registry.getStepMap();
registry.update(currentState, null, () -> 0L);
assertThat(registry.getLifecyclePolicyMap(), equalTo(registryPolicyMap));
assertThat(registry.getFirstStepMap(), equalTo(registryFirstStepMap));
assertThat(registry.getStepMap(), equalTo(registryStepMap));
// remove policy
currentState = ClusterState.builder(currentState)
.metaData(
MetaData.builder(metaData)
.putCustom(IndexLifecycleMetadata.TYPE, new IndexLifecycleMetadata(Collections.emptyMap()))).build();
registry.update(currentState, null, () -> 0L);
assertTrue(registry.getLifecyclePolicyMap().isEmpty());
assertTrue(registry.getFirstStepMap().isEmpty());
assertTrue(registry.getStepMap().isEmpty());
}
public void testUpdateChangedPolicy() {
String policyName = randomAlphaOfLengthBetween(5, 10);
LifecyclePolicy newPolicy = randomLifecyclePolicy(policyName);
List<Step> policySteps = newPolicy.toSteps(null, () -> 0L);
Map<String, LifecyclePolicy> policyMap = Collections.singletonMap(newPolicy.getName(), newPolicy);
MetaData metaData = MetaData.builder()
.persistentSettings(settings(Version.CURRENT).build())
.putCustom(IndexLifecycleMetadata.TYPE, new IndexLifecycleMetadata(policyMap))
.build();
String nodeId = randomAlphaOfLength(10);
DiscoveryNode masterNode = DiscoveryNode.createLocal(settings(Version.CURRENT)
.put(Node.NODE_MASTER_SETTING.getKey(), true).build(),
new TransportAddress(TransportAddress.META_ADDRESS, 9300), nodeId);
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build())
.build();
PolicyStepsRegistry registry = new PolicyStepsRegistry();
// add new policy
registry.update(currentState, null, () -> 0L);
// swap out policy
newPolicy = randomLifecyclePolicy(policyName);
currentState = ClusterState.builder(currentState)
.metaData(
MetaData.builder(metaData)
.putCustom(IndexLifecycleMetadata.TYPE,
new IndexLifecycleMetadata(Collections.singletonMap(policyName, newPolicy)))).build();
registry.update(currentState, null, () -> 0L);
// TODO(talevy): assert changes... right now we do not support updates to policies. will require internal cleanup
}
}