mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-05 20:48:22 +00:00
add move-to-step endpoint for lifecycles (#30636)
This change introduces a new rest endpoint for lifecycles that allows users to explicitely jump to earlier or later steps in the policy's execution. This is useful for re-running tasks that may be stuck, or were incorrectly configured. Endpoint can be found in this format: POST _xpack/index_lifecycle/_move/<index_name> { current_step: ... next_step: ... } This operates on a per-index basis and does not resolve the param to multiple indices. The action is validated so that the index's state is only modified if all of the following are true: - <index_name> has an existing policy associated with it - current_step is the actual step the index is currently on (for sanity) - next_step is a valid step within the policy-step-registry * respond to reviewer refactor to stop using MoveToNextStepUpdateTask directly * remove getPolicyRegistry * rename validateMoveToNextStep
This commit is contained in:
parent
190a7efd2a
commit
ca9f307b0a
@ -5,7 +5,17 @@
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import java.util.Locale;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
@ -42,7 +52,7 @@ public abstract class Step {
|
||||
return false;
|
||||
}
|
||||
Step other = (Step) obj;
|
||||
return Objects.equals(key, other.key) &&
|
||||
return Objects.equals(key, other.key) &&
|
||||
Objects.equals(nextStepKey, other.nextStepKey);
|
||||
}
|
||||
|
||||
@ -51,17 +61,45 @@ public abstract class Step {
|
||||
return key + " => " + nextStepKey;
|
||||
}
|
||||
|
||||
public static final class StepKey {
|
||||
public static final class StepKey implements Writeable, ToXContentObject {
|
||||
private final String phase;
|
||||
private final String action;
|
||||
private final String name;
|
||||
|
||||
public static final ParseField PHASE_FIELD = new ParseField("phase");
|
||||
public static final ParseField ACTION_FIELD = new ParseField("action");
|
||||
public static final ParseField NAME_FIELD = new ParseField("name");
|
||||
private static final ConstructingObjectParser<StepKey, Void> PARSER =
|
||||
new ConstructingObjectParser<>("stepkey", a -> new StepKey((String) a[0], (String) a[1], (String) a[2]));
|
||||
static {
|
||||
PARSER.declareString(ConstructingObjectParser.constructorArg(), PHASE_FIELD);
|
||||
PARSER.declareString(ConstructingObjectParser.constructorArg(), ACTION_FIELD);
|
||||
PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME_FIELD);
|
||||
}
|
||||
|
||||
public StepKey(String phase, String action, String name) {
|
||||
this.phase = phase;
|
||||
this.action = action;
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public StepKey(StreamInput in) throws IOException {
|
||||
this.phase = in.readString();
|
||||
this.action = in.readString();
|
||||
this.name = in.readString();
|
||||
}
|
||||
|
||||
public static StepKey parse(XContentParser parser) {
|
||||
return PARSER.apply(parser, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeString(phase);
|
||||
out.writeString(action);
|
||||
out.writeString(name);
|
||||
}
|
||||
|
||||
public String getPhase() {
|
||||
return phase;
|
||||
}
|
||||
@ -88,14 +126,24 @@ public abstract class Step {
|
||||
return false;
|
||||
}
|
||||
StepKey other = (StepKey) obj;
|
||||
return Objects.equals(phase, other.phase) &&
|
||||
Objects.equals(action, other.action) &&
|
||||
return Objects.equals(phase, other.phase) &&
|
||||
Objects.equals(action, other.action) &&
|
||||
Objects.equals(name, other.name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format(Locale.ROOT, "[%s][%s][%s]", phase, action, name);
|
||||
return Strings.toString(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field(PHASE_FIELD.getPreferredName(), phase);
|
||||
builder.field(ACTION_FIELD.getPreferredName(), action);
|
||||
builder.field(NAME_FIELD.getPreferredName(), name);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,193 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle.action;
|
||||
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.action.ActionRequestBuilder;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedRequest;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
public class MoveToStepAction extends Action<MoveToStepAction.Request, MoveToStepAction.Response, MoveToStepAction.RequestBuilder> {
|
||||
public static final MoveToStepAction INSTANCE = new MoveToStepAction();
|
||||
public static final String NAME = "cluster:admin/xpack/index_lifecycle/_move/post";
|
||||
|
||||
protected MoveToStepAction() {
|
||||
super(NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
|
||||
return new RequestBuilder(client, this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Response newResponse() {
|
||||
return new Response();
|
||||
}
|
||||
|
||||
public static class RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder> {
|
||||
|
||||
protected RequestBuilder(ElasticsearchClient client, Action<Request, Response, RequestBuilder> action) {
|
||||
super(client, action, new Request());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class Response extends AcknowledgedResponse implements ToXContentObject {
|
||||
|
||||
public Response() {
|
||||
}
|
||||
|
||||
public Response(boolean acknowledged) {
|
||||
super(acknowledged);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
readAcknowledged(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
writeAcknowledged(out);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(isAcknowledged());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (obj.getClass() != getClass()) {
|
||||
return false;
|
||||
}
|
||||
Response other = (Response) obj;
|
||||
return Objects.equals(isAcknowledged(), other.isAcknowledged());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return Strings.toString(this, true, true);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class Request extends AcknowledgedRequest<Request> implements ToXContentObject {
|
||||
static final ParseField CURRENT_KEY_FIELD = new ParseField("current_step");
|
||||
static final ParseField NEXT_KEY_FIELD = new ParseField("next_step");
|
||||
private static final ConstructingObjectParser<Request, String> PARSER =
|
||||
new ConstructingObjectParser<>("move_to_step_request", false,
|
||||
(a, index) -> {
|
||||
StepKey currentStepKey = (StepKey) a[0];
|
||||
StepKey nextStepKey = (StepKey) a[1];
|
||||
return new Request(index, currentStepKey, nextStepKey);
|
||||
});
|
||||
static {
|
||||
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, name) -> StepKey.parse(p), CURRENT_KEY_FIELD);
|
||||
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, name) -> StepKey.parse(p), NEXT_KEY_FIELD);
|
||||
}
|
||||
|
||||
private String index;
|
||||
private StepKey currentStepKey;
|
||||
private StepKey nextStepKey;
|
||||
|
||||
public Request(String index, StepKey currentStepKey, StepKey nextStepKey) {
|
||||
this.index = index;
|
||||
this.currentStepKey = currentStepKey;
|
||||
this.nextStepKey = nextStepKey;
|
||||
}
|
||||
|
||||
public Request() {
|
||||
}
|
||||
|
||||
public String getIndex() {
|
||||
return index;
|
||||
}
|
||||
|
||||
public StepKey getCurrentStepKey() {
|
||||
return currentStepKey;
|
||||
}
|
||||
|
||||
public StepKey getNextStepKey() {
|
||||
return nextStepKey;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionRequestValidationException validate() {
|
||||
return null;
|
||||
}
|
||||
|
||||
public static Request parseRequest(String name, XContentParser parser) {
|
||||
return PARSER.apply(parser, name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
this.index = in.readString();
|
||||
this.currentStepKey = new StepKey(in);
|
||||
this.nextStepKey = new StepKey(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeString(index);
|
||||
currentStepKey.writeTo(out);
|
||||
nextStepKey.writeTo(out);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(index, currentStepKey, nextStepKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (obj.getClass() != getClass()) {
|
||||
return false;
|
||||
}
|
||||
Request other = (Request) obj;
|
||||
return Objects.equals(index, other.index) && Objects.equals(currentStepKey, other.currentStepKey)
|
||||
&& Objects.equals(nextStepKey, other.nextStepKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return Strings.toString(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
return builder.startObject()
|
||||
.field(CURRENT_KEY_FIELD.getPreferredName(), currentStepKey)
|
||||
.field(NEXT_KEY_FIELD.getPreferredName(), nextStepKey)
|
||||
.endObject();
|
||||
}
|
||||
}
|
||||
}
|
@ -6,16 +6,29 @@
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.EqualsHashCodeTestUtils;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.test.AbstractSerializingTestCase;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
|
||||
|
||||
public class StepKeyTests extends ESTestCase {
|
||||
public class StepKeyTests extends AbstractSerializingTestCase<StepKey> {
|
||||
|
||||
public StepKey createRandomInstance() {
|
||||
@Override
|
||||
public StepKey createTestInstance() {
|
||||
return new StepKey(randomAlphaOfLength(10), randomAlphaOfLength(10), randomAlphaOfLength(10));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Writeable.Reader<StepKey> instanceReader() {
|
||||
return StepKey::new;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected StepKey doParseInstance(XContentParser parser) {
|
||||
return StepKey.parse(parser);
|
||||
}
|
||||
|
||||
@Override
|
||||
public StepKey mutateInstance(StepKey instance) {
|
||||
String phase = instance.getPhase();
|
||||
String action = instance.getAction();
|
||||
@ -37,11 +50,4 @@ public class StepKeyTests extends ESTestCase {
|
||||
|
||||
return new StepKey(phase, action, step);
|
||||
}
|
||||
|
||||
public void testHashcodeAndEquals() {
|
||||
for (int runs = 0; runs < 20; runs++) {
|
||||
EqualsHashCodeTestUtils.checkEqualsAndHashCode(createRandomInstance(),
|
||||
instance -> new StepKey(instance.getPhase(), instance.getAction(), instance.getName()), this::mutateInstance);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,68 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle.action;
|
||||
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.test.AbstractStreamableXContentTestCase;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.StepKeyTests;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.MoveToStepAction.Request;
|
||||
import org.junit.Before;
|
||||
|
||||
public class MoveToStepRequestTests extends AbstractStreamableXContentTestCase<Request> {
|
||||
|
||||
private String index;
|
||||
private static final StepKeyTests stepKeyTests = new StepKeyTests();
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
index = randomAlphaOfLength(5);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Request createTestInstance() {
|
||||
return new Request(index, stepKeyTests.createTestInstance(), stepKeyTests.createTestInstance());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Request createBlankInstance() {
|
||||
return new Request();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Request doParseInstance(XContentParser parser) {
|
||||
return Request.parseRequest(index, parser);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean supportsUnknownFields() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Request mutateInstance(Request request) {
|
||||
String index = request.getIndex();
|
||||
StepKey currentStepKey = request.getCurrentStepKey();
|
||||
StepKey nextStepKey = request.getNextStepKey();
|
||||
|
||||
switch (between(0, 2)) {
|
||||
case 0:
|
||||
index += randomAlphaOfLength(5);
|
||||
break;
|
||||
case 1:
|
||||
currentStepKey = stepKeyTests.mutateInstance(currentStepKey);
|
||||
break;
|
||||
case 2:
|
||||
nextStepKey = stepKeyTests.mutateInstance(nextStepKey);
|
||||
break;
|
||||
default:
|
||||
throw new AssertionError("Illegal randomisation branch");
|
||||
}
|
||||
|
||||
return new Request(index, currentStepKey, nextStepKey);
|
||||
}
|
||||
}
|
@ -0,0 +1,29 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle.action;
|
||||
|
||||
import org.elasticsearch.test.AbstractStreamableTestCase;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.MoveToStepAction.Response;
|
||||
|
||||
public class MoveToStepResponseTests extends AbstractStreamableTestCase<Response> {
|
||||
|
||||
@Override
|
||||
protected Response createTestInstance() {
|
||||
return new Response(randomBoolean());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Response createBlankInstance() {
|
||||
return new Response();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Response mutateInstance(Response response) {
|
||||
return new Response(response.isAcknowledged() == false);
|
||||
}
|
||||
|
||||
}
|
@ -36,12 +36,15 @@ import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.RolloverAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.DeleteLifecycleAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.GetLifecycleAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.MoveToStepAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.PutLifecycleAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.action.RestDeleteLifecycleAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.action.RestGetLifecycleAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.action.RestMoveToStepAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.action.RestPutLifecycleAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.action.TransportDeleteLifcycleAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.action.TransportGetLifecycleAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.action.TransportMoveToStepAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.action.TransportPutLifecycleAction;
|
||||
|
||||
import java.time.Clock;
|
||||
@ -129,18 +132,26 @@ public class IndexLifecycle extends Plugin implements ActionPlugin {
|
||||
public List<RestHandler> getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings,
|
||||
IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
Supplier<DiscoveryNodes> nodesInCluster) {
|
||||
if (enabled == false) {
|
||||
return emptyList();
|
||||
}
|
||||
return Arrays.asList(
|
||||
new RestPutLifecycleAction(settings, restController),
|
||||
new RestGetLifecycleAction(settings, restController),
|
||||
new RestDeleteLifecycleAction(settings, restController));
|
||||
new RestDeleteLifecycleAction(settings, restController),
|
||||
new RestMoveToStepAction(settings, restController));
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
|
||||
if (enabled == false) {
|
||||
return emptyList();
|
||||
}
|
||||
return Arrays.asList(
|
||||
new ActionHandler<>(PutLifecycleAction.INSTANCE, TransportPutLifecycleAction.class),
|
||||
new ActionHandler<>(GetLifecycleAction.INSTANCE, TransportGetLifecycleAction.class),
|
||||
new ActionHandler<>(DeleteLifecycleAction.INSTANCE, TransportDeleteLifcycleAction.class));
|
||||
new ActionHandler<>(DeleteLifecycleAction.INSTANCE, TransportDeleteLifcycleAction.class),
|
||||
new ActionHandler<>(MoveToStepAction.INSTANCE, TransportMoveToStepAction.class));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -60,7 +60,7 @@ public class IndexLifecycleRunner {
|
||||
} else if (currentStep instanceof AsyncWaitStep) {
|
||||
if (fromClusterStateChange == false) {
|
||||
((AsyncWaitStep) currentStep).evaluateCondition(indexMetaData.getIndex(), new AsyncWaitStep.Listener() {
|
||||
|
||||
|
||||
@Override
|
||||
public void onResponse(boolean conditionMet, ToXContentObject stepInfo) {
|
||||
logger.debug("cs-change-async-wait-callback. current-step:" + currentStep.getKey());
|
||||
@ -70,18 +70,18 @@ public class IndexLifecycleRunner {
|
||||
setStepInfo(indexMetaData.getIndex(), policy, currentStep.getKey(), stepInfo);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
moveToErrorStep(indexMetaData.getIndex(), policy, currentStep.getKey(), e);
|
||||
}
|
||||
|
||||
|
||||
});
|
||||
}
|
||||
} else if (currentStep instanceof AsyncActionStep) {
|
||||
if (fromClusterStateChange == false) {
|
||||
((AsyncActionStep) currentStep).performAction(indexMetaData, currentState, new AsyncActionStep.Listener() {
|
||||
|
||||
|
||||
@Override
|
||||
public void onResponse(boolean complete) {
|
||||
logger.debug("cs-change-async-action-callback. current-step:" + currentStep.getKey());
|
||||
@ -89,7 +89,7 @@ public class IndexLifecycleRunner {
|
||||
moveToStep(indexMetaData.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
moveToErrorStep(indexMetaData.getIndex(), policy, currentStep.getKey(), e);
|
||||
@ -118,7 +118,7 @@ public class IndexLifecycleRunner {
|
||||
* it is illegal for the step to be set with the phase and/or action unset,
|
||||
* or for the step to be unset with the phase and/or action set. All three
|
||||
* settings must be either present or missing.
|
||||
*
|
||||
*
|
||||
* @param indexSettings
|
||||
* the index settings to extract the {@link StepKey} from.
|
||||
*/
|
||||
@ -146,6 +146,31 @@ public class IndexLifecycleRunner {
|
||||
}
|
||||
}
|
||||
|
||||
static ClusterState moveClusterStateToStep(String indexName, ClusterState currentState, StepKey currentStepKey,
|
||||
StepKey nextStepKey, LongSupplier nowSupplier,
|
||||
PolicyStepsRegistry stepRegistry) {
|
||||
IndexMetaData idxMeta = currentState.getMetaData().index(indexName);
|
||||
Settings indexSettings = idxMeta.getSettings();
|
||||
String indexPolicySetting = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexSettings);
|
||||
|
||||
// policy could be updated in-between execution
|
||||
if (Strings.isNullOrEmpty(indexPolicySetting)) {
|
||||
throw new IllegalArgumentException("index [" + indexName + "] is not associated with an Index Lifecycle Policy");
|
||||
}
|
||||
|
||||
if (currentStepKey.equals(IndexLifecycleRunner.getCurrentStepKey(indexSettings)) == false) {
|
||||
throw new IllegalArgumentException("index [" + indexName + "] is not on current step [" + currentStepKey + "]");
|
||||
}
|
||||
|
||||
try {
|
||||
stepRegistry.getStep(indexPolicySetting, nextStepKey);
|
||||
} catch (IllegalStateException e) {
|
||||
throw new IllegalArgumentException(e.getMessage());
|
||||
}
|
||||
|
||||
return IndexLifecycleRunner.moveClusterStateToNextStep(idxMeta.getIndex(), currentState, currentStepKey, nextStepKey, nowSupplier);
|
||||
}
|
||||
|
||||
static ClusterState moveClusterStateToNextStep(Index index, ClusterState clusterState, StepKey currentStep, StepKey nextStep,
|
||||
LongSupplier nowSupplier) {
|
||||
IndexMetaData idxMeta = clusterState.getMetaData().index(index);
|
||||
@ -209,7 +234,7 @@ public class IndexLifecycleRunner {
|
||||
logger.debug("moveToStep[" + policy + "] [" + index.getName() + "]" + currentStepKey + " -> "
|
||||
+ nextStepKey);
|
||||
clusterService.submitStateUpdateTask("ILM", new MoveToNextStepUpdateTask(index, policy, currentStepKey,
|
||||
nextStepKey, nowSupplier, newState -> runPolicy(newState.getMetaData().index(index), newState)));
|
||||
nextStepKey, nowSupplier, stepRegistry, newState -> runPolicy(newState.getMetaData().index(index), newState)));
|
||||
}
|
||||
|
||||
private void moveToErrorStep(Index index, String policy, StepKey currentStepKey, Exception e) {
|
||||
|
@ -23,6 +23,7 @@ import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
|
||||
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
|
||||
|
||||
import java.io.Closeable;
|
||||
@ -61,6 +62,11 @@ public class IndexLifecycleService extends AbstractComponent
|
||||
clusterService.addListener(this);
|
||||
}
|
||||
|
||||
public ClusterState moveClusterStateToStep(ClusterState currentState, String indexName, StepKey currentStepKey, StepKey nextStepKey) {
|
||||
return IndexLifecycleRunner.moveClusterStateToStep(indexName, currentState, currentStepKey, nextStepKey,
|
||||
nowSupplier, policyRegistry);
|
||||
}
|
||||
|
||||
SchedulerEngine getScheduler() {
|
||||
return scheduler.get();
|
||||
}
|
||||
@ -142,7 +148,7 @@ public class IndexLifecycleService extends AbstractComponent
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
|
||||
public void triggerPolicies(ClusterState clusterState, boolean fromClusterStateChange) {
|
||||
// loop through all indices in cluster state and filter for ones that are
|
||||
// managed by the Index Lifecycle Service they have a index.lifecycle.name setting
|
||||
|
@ -21,16 +21,18 @@ public class MoveToNextStepUpdateTask extends ClusterStateUpdateTask {
|
||||
private final Step.StepKey currentStepKey;
|
||||
private final Step.StepKey nextStepKey;
|
||||
private final Listener listener;
|
||||
private LongSupplier nowSupplier;
|
||||
private final LongSupplier nowSupplier;
|
||||
private final PolicyStepsRegistry policyStepsRegistry;
|
||||
|
||||
public MoveToNextStepUpdateTask(Index index, String policy, Step.StepKey currentStepKey, Step.StepKey nextStepKey,
|
||||
LongSupplier nowSupplier, Listener listener) {
|
||||
LongSupplier nowSupplier, PolicyStepsRegistry policyStepsRegistry, Listener listener) {
|
||||
this.index = index;
|
||||
this.policy = policy;
|
||||
this.currentStepKey = currentStepKey;
|
||||
this.nextStepKey = nextStepKey;
|
||||
this.nowSupplier = nowSupplier;
|
||||
this.listener = listener;
|
||||
this.policyStepsRegistry = policyStepsRegistry;
|
||||
}
|
||||
|
||||
Index getIndex() {
|
||||
|
@ -0,0 +1,42 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*
|
||||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle.action;
|
||||
|
||||
import org.elasticsearch.client.node.NodeClient;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.rest.BaseRestHandler;
|
||||
import org.elasticsearch.rest.RestController;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.action.RestToXContentListener;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.MoveToStepAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.IndexLifecycle;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class RestMoveToStepAction extends BaseRestHandler {
|
||||
|
||||
public RestMoveToStepAction(Settings settings, RestController controller) {
|
||||
super(settings);
|
||||
controller.registerHandler(RestRequest.Method.POST, IndexLifecycle.BASE_PATH + "_move/{name}", this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "xpack_lifecycle_move_to_step_action";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
|
||||
String index = restRequest.param("name");
|
||||
XContentParser parser = restRequest.contentParser();
|
||||
MoveToStepAction.Request request = MoveToStepAction.Request.parseRequest(index, parser);
|
||||
request.timeout(restRequest.paramAsTime("timeout", request.timeout()));
|
||||
request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout()));
|
||||
return channel -> client.execute(MoveToStepAction.INSTANCE, request, new RestToXContentListener<>(channel));
|
||||
}
|
||||
}
|
@ -0,0 +1,76 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*
|
||||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle.action;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
||||
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.MoveToStepAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.MoveToStepAction.Request;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.MoveToStepAction.Response;
|
||||
import org.elasticsearch.xpack.indexlifecycle.IndexLifecycleService;
|
||||
|
||||
public class TransportMoveToStepAction extends TransportMasterNodeAction<Request, Response> {
|
||||
IndexLifecycleService indexLifecycleService;
|
||||
@Inject
|
||||
public TransportMoveToStepAction(Settings settings, TransportService transportService, ClusterService clusterService,
|
||||
ThreadPool threadPool, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
IndexLifecycleService indexLifecycleService) {
|
||||
super(settings, MoveToStepAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver,
|
||||
Request::new);
|
||||
this.indexLifecycleService = indexLifecycleService;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Response newResponse() {
|
||||
return new Response();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) {
|
||||
IndexMetaData indexMetaData = state.metaData().index(request.getIndex());
|
||||
if (indexMetaData == null) {
|
||||
listener.onFailure(new IllegalArgumentException("index [" + request.getIndex() + "] does not exist"));
|
||||
return;
|
||||
}
|
||||
clusterService.submitStateUpdateTask("index[" + request.getIndex() + "]-move-to-step",
|
||||
new AckedClusterStateUpdateTask<Response>(request, listener) {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
return indexLifecycleService.moveClusterStateToStep(currentState, request.getIndex(), request.getCurrentStepKey(),
|
||||
request.getNextStepKey());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Response newResponse(boolean acknowledged) {
|
||||
return new Response(acknowledged);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
|
||||
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
|
||||
}
|
||||
}
|
@ -42,6 +42,8 @@ import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.SortedMap;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||
|
||||
private PolicyStepsRegistry createOneStepPolicyStepRegistry(String policyName, Step step) {
|
||||
@ -332,7 +334,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||
Settings indexSettings = Settings.EMPTY;
|
||||
StepKey stepKey = IndexLifecycleRunner.getCurrentStepKey(indexSettings);
|
||||
assertNull(stepKey);
|
||||
|
||||
|
||||
String phase = randomAlphaOfLength(20);
|
||||
String action = randomAlphaOfLength(20);
|
||||
String step = randomAlphaOfLength(20);
|
||||
@ -346,7 +348,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||
assertEquals(phase, stepKey.getPhase());
|
||||
assertEquals(action, stepKey.getAction());
|
||||
assertEquals(step, stepKey.getName());
|
||||
|
||||
|
||||
phase = randomAlphaOfLength(20);
|
||||
action = randomAlphaOfLength(20);
|
||||
step = randomBoolean() ? null : "";
|
||||
@ -357,7 +359,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||
.build();
|
||||
AssertionError error3 = expectThrows(AssertionError.class, () -> IndexLifecycleRunner.getCurrentStepKey(indexSettings3));
|
||||
assertEquals("Current phase is not empty: " + phase, error3.getMessage());
|
||||
|
||||
|
||||
phase = randomBoolean() ? null : "";
|
||||
action = randomAlphaOfLength(20);
|
||||
step = randomBoolean() ? null : "";
|
||||
@ -368,7 +370,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||
.build();
|
||||
AssertionError error4 = expectThrows(AssertionError.class, () -> IndexLifecycleRunner.getCurrentStepKey(indexSettings4));
|
||||
assertEquals("Current action is not empty: " + action, error4.getMessage());
|
||||
|
||||
|
||||
phase = randomBoolean() ? null : "";
|
||||
action = randomAlphaOfLength(20);
|
||||
step = randomAlphaOfLength(20);
|
||||
@ -379,7 +381,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||
.build();
|
||||
AssertionError error5 = expectThrows(AssertionError.class, () -> IndexLifecycleRunner.getCurrentStepKey(indexSettings5));
|
||||
assertEquals(null, error5.getMessage());
|
||||
|
||||
|
||||
phase = randomBoolean() ? null : "";
|
||||
action = randomBoolean() ? null : "";
|
||||
step = randomAlphaOfLength(20);
|
||||
@ -391,7 +393,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||
AssertionError error6 = expectThrows(AssertionError.class, () -> IndexLifecycleRunner.getCurrentStepKey(indexSettings6));
|
||||
assertEquals(null, error6.getMessage());
|
||||
}
|
||||
|
||||
|
||||
public void testGetCurrentStep() {
|
||||
SortedMap<String, LifecyclePolicy> lifecyclePolicyMap = null; // Not used in the methods tested here
|
||||
String policyName = "policy_1";
|
||||
@ -491,13 +493,13 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||
.build();
|
||||
IllegalStateException exception = expectThrows(IllegalStateException.class,
|
||||
() -> IndexLifecycleRunner.getCurrentStep(registry, policyName, invalidIndexSettings));
|
||||
assertEquals("step [[phase_1][action_1][step_3]] does not exist", exception.getMessage());
|
||||
assertEquals("step [{\"phase\":\"phase_1\",\"action\":\"action_1\",\"name\":\"step_3\"}] does not exist", exception.getMessage());
|
||||
|
||||
exception = expectThrows(IllegalStateException.class,
|
||||
() -> IndexLifecycleRunner.getCurrentStep(registry, "policy_does_not_exist", invalidIndexSettings));
|
||||
assertEquals("policy [policy_does_not_exist] does not exist", exception.getMessage());
|
||||
}
|
||||
|
||||
|
||||
public void testMoveClusterStateToNextStep() {
|
||||
String indexName = "my_index";
|
||||
StepKey currentStep = new StepKey("current_phase", "current_action", "current_step");
|
||||
@ -573,6 +575,89 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||
assertClusterStateOnNextStep(clusterState, index, currentStep, nextStep, newClusterState, now);
|
||||
}
|
||||
|
||||
public void testSuccessfulValidatedMoveClusterStateToNextStep() {
|
||||
String indexName = "my_index";
|
||||
String policyName = "my_policy";
|
||||
StepKey currentStepKey = new StepKey("current_phase", "current_action", "current_step");
|
||||
StepKey nextStepKey = new StepKey("next_phase", "next_action", "next_step");
|
||||
long now = randomNonNegativeLong();
|
||||
Step step = new MockStep(nextStepKey, nextStepKey);
|
||||
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
|
||||
|
||||
Builder indexSettingsBuilder = Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, policyName)
|
||||
.put(LifecycleSettings.LIFECYCLE_PHASE, currentStepKey.getPhase())
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION, currentStepKey.getAction())
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP, currentStepKey.getName());
|
||||
ClusterState clusterState = buildClusterState(indexName, indexSettingsBuilder);
|
||||
Index index = clusterState.metaData().index(indexName).getIndex();
|
||||
ClusterState newClusterState = IndexLifecycleRunner.moveClusterStateToStep(indexName, clusterState, currentStepKey,
|
||||
nextStepKey, () -> now, stepRegistry);
|
||||
assertClusterStateOnNextStep(clusterState, index, currentStepKey, nextStepKey, newClusterState, now);
|
||||
}
|
||||
|
||||
public void testValidatedMoveClusterStateToNextStepWithoutPolicy() {
|
||||
String indexName = "my_index";
|
||||
String policyName = randomBoolean() ? null : "";
|
||||
StepKey currentStepKey = new StepKey("current_phase", "current_action", "current_step");
|
||||
StepKey nextStepKey = new StepKey("next_phase", "next_action", "next_step");
|
||||
long now = randomNonNegativeLong();
|
||||
Step step = new MockStep(nextStepKey, nextStepKey);
|
||||
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
|
||||
|
||||
Builder indexSettingsBuilder = Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, policyName)
|
||||
.put(LifecycleSettings.LIFECYCLE_PHASE, currentStepKey.getPhase())
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION, currentStepKey.getAction())
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP, currentStepKey.getName());
|
||||
ClusterState clusterState = buildClusterState(indexName, indexSettingsBuilder);
|
||||
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
|
||||
() -> IndexLifecycleRunner.moveClusterStateToStep(indexName, clusterState, currentStepKey,
|
||||
nextStepKey, () -> now, stepRegistry));
|
||||
assertThat(exception.getMessage(), equalTo("index [my_index] is not associated with an Index Lifecycle Policy"));
|
||||
}
|
||||
|
||||
public void testValidatedMoveClusterStateToNextStepInvalidCurrentStep() {
|
||||
String indexName = "my_index";
|
||||
String policyName = "my_policy";
|
||||
StepKey currentStepKey = new StepKey("current_phase", "current_action", "current_step");
|
||||
StepKey notCurrentStepKey = new StepKey("not_current_phase", "not_current_action", "not_current_step");
|
||||
StepKey nextStepKey = new StepKey("next_phase", "next_action", "next_step");
|
||||
long now = randomNonNegativeLong();
|
||||
Step step = new MockStep(nextStepKey, nextStepKey);
|
||||
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
|
||||
|
||||
Builder indexSettingsBuilder = Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, policyName)
|
||||
.put(LifecycleSettings.LIFECYCLE_PHASE, currentStepKey.getPhase())
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION, currentStepKey.getAction())
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP, currentStepKey.getName());
|
||||
ClusterState clusterState = buildClusterState(indexName, indexSettingsBuilder);
|
||||
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
|
||||
() -> IndexLifecycleRunner.moveClusterStateToStep(indexName, clusterState, notCurrentStepKey,
|
||||
nextStepKey, () -> now, stepRegistry));
|
||||
assertThat(exception.getMessage(), equalTo("index [my_index] is not on current step " +
|
||||
"[{\"phase\":\"not_current_phase\",\"action\":\"not_current_action\",\"name\":\"not_current_step\"}]"));
|
||||
}
|
||||
|
||||
public void testValidatedMoveClusterStateToNextStepInvalidNextStep() {
|
||||
String indexName = "my_index";
|
||||
String policyName = "my_policy";
|
||||
StepKey currentStepKey = new StepKey("current_phase", "current_action", "current_step");
|
||||
StepKey nextStepKey = new StepKey("next_phase", "next_action", "next_step");
|
||||
long now = randomNonNegativeLong();
|
||||
Step step = new MockStep(currentStepKey, nextStepKey);
|
||||
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
|
||||
|
||||
Builder indexSettingsBuilder = Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, policyName)
|
||||
.put(LifecycleSettings.LIFECYCLE_PHASE, currentStepKey.getPhase())
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION, currentStepKey.getAction())
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP, currentStepKey.getName());
|
||||
ClusterState clusterState = buildClusterState(indexName, indexSettingsBuilder);
|
||||
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
|
||||
() -> IndexLifecycleRunner.moveClusterStateToStep(indexName, clusterState, currentStepKey,
|
||||
nextStepKey, () -> now, stepRegistry));
|
||||
assertThat(exception.getMessage(),
|
||||
equalTo("step [{\"phase\":\"next_phase\",\"action\":\"next_action\",\"name\":\"next_step\"}] does not exist"));
|
||||
}
|
||||
|
||||
public void testMoveClusterStateToErrorStep() throws IOException {
|
||||
String indexName = "my_index";
|
||||
StepKey currentStep = new StepKey("current_phase", "current_action", "current_step");
|
||||
@ -927,15 +1012,15 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||
return false;
|
||||
}
|
||||
MoveToErrorStepUpdateTask task = (MoveToErrorStepUpdateTask) argument;
|
||||
return Objects.equals(index, task.getIndex()) &&
|
||||
Objects.equals(policy, task.getPolicy())&&
|
||||
return Objects.equals(index, task.getIndex()) &&
|
||||
Objects.equals(policy, task.getPolicy())&&
|
||||
Objects.equals(currentStepKey, task.getCurrentStepKey()) &&
|
||||
Objects.equals(cause.getClass(), task.getCause().getClass()) &&
|
||||
Objects.equals(cause.getMessage(), task.getCause().getMessage());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
private static class SetStepInfoUpdateTaskMatcher extends ArgumentMatcher<SetStepInfoUpdateTask> {
|
||||
|
||||
private Index index;
|
||||
@ -956,8 +1041,8 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||
return false;
|
||||
}
|
||||
SetStepInfoUpdateTask task = (SetStepInfoUpdateTask) argument;
|
||||
return Objects.equals(index, task.getIndex()) &&
|
||||
Objects.equals(policy, task.getPolicy())&&
|
||||
return Objects.equals(index, task.getIndex()) &&
|
||||
Objects.equals(policy, task.getPolicy())&&
|
||||
Objects.equals(currentStepKey, task.getCurrentStepKey()) &&
|
||||
Objects.equals(stepInfo, task.getStepInfo());
|
||||
}
|
||||
|
@ -15,18 +15,29 @@ import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.MockStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Phase;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Step;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.TestLifecycleType;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class MoveToNextStepUpdateTaskTests extends ESTestCase {
|
||||
|
||||
String policy;
|
||||
ClusterState clusterState;
|
||||
Index index;
|
||||
PolicyStepsRegistry stepsRegistry;
|
||||
|
||||
@Before
|
||||
public void setupClusterState() {
|
||||
@ -41,6 +52,15 @@ public class MoveToNextStepUpdateTaskTests extends ESTestCase {
|
||||
.put(IndexMetaData.builder(indexMetadata))
|
||||
.build();
|
||||
clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).build();
|
||||
|
||||
|
||||
Step currentStep = new MockStep(new StepKey("current-phase", "current-action", "current-name"), null);
|
||||
Step nextStep = new MockStep(new StepKey("next-phase", "next-action", "next-name"), null);
|
||||
Map<StepKey, Step> stepMap = new HashMap<>();
|
||||
stepMap.put(currentStep.getKey(), currentStep);
|
||||
stepMap.put(nextStep.getKey(), nextStep);
|
||||
Map<String, Map<Step.StepKey, Step>> policyMap = Collections.singletonMap(policy, stepMap);
|
||||
stepsRegistry = new PolicyStepsRegistry(null, null, policyMap);
|
||||
}
|
||||
|
||||
public void testExecuteSuccessfullyMoved() {
|
||||
@ -52,7 +72,8 @@ public class MoveToNextStepUpdateTaskTests extends ESTestCase {
|
||||
|
||||
SetOnce<Boolean> changed = new SetOnce<>();
|
||||
MoveToNextStepUpdateTask.Listener listener = (c) -> changed.set(true);
|
||||
MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, nextStepKey, () -> now, listener);
|
||||
MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, nextStepKey, () -> now,
|
||||
stepsRegistry, listener);
|
||||
ClusterState newState = task.execute(clusterState);
|
||||
StepKey actualKey = IndexLifecycleRunner.getCurrentStepKey(newState.metaData().index(index).getSettings());
|
||||
assertThat(actualKey, equalTo(nextStepKey));
|
||||
@ -63,27 +84,50 @@ public class MoveToNextStepUpdateTaskTests extends ESTestCase {
|
||||
assertTrue(changed.get());
|
||||
}
|
||||
|
||||
public void testExecuteNoopifferentStep() {
|
||||
public void testExecuteDifferentCurrentStep() {
|
||||
StepKey currentStepKey = new StepKey("current-phase", "current-action", "current-name");
|
||||
StepKey notCurrentStepKey = new StepKey("not-current", "not-current", "not-current");
|
||||
long now = randomNonNegativeLong();
|
||||
setStateToKey(notCurrentStepKey);
|
||||
MoveToNextStepUpdateTask.Listener listener = (c) -> {
|
||||
};
|
||||
MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, null, () -> now, listener);
|
||||
MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, null, () -> now,
|
||||
stepsRegistry, listener);
|
||||
ClusterState newState = task.execute(clusterState);
|
||||
assertThat(newState, sameInstance(clusterState));
|
||||
assertSame(newState, clusterState);
|
||||
}
|
||||
|
||||
public void testExecuteNoopDifferentPolicy() {
|
||||
public void testExecuteDifferentPolicy() {
|
||||
StepKey currentStepKey = new StepKey("current-phase", "current-action", "current-name");
|
||||
long now = randomNonNegativeLong();
|
||||
setStateToKey(currentStepKey);
|
||||
setStatePolicy("not-" + policy);
|
||||
MoveToNextStepUpdateTask.Listener listener = (c) -> {};
|
||||
MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, null, () -> now, listener);
|
||||
MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, null, () -> now,
|
||||
stepsRegistry, listener);
|
||||
ClusterState newState = task.execute(clusterState);
|
||||
assertThat(newState, sameInstance(clusterState));
|
||||
assertSame(newState, clusterState);
|
||||
}
|
||||
|
||||
public void testExecuteSuccessfulMoveWithInvalidNextStep() {
|
||||
StepKey currentStepKey = new StepKey("current-phase", "current-action", "current-name");
|
||||
StepKey invalidNextStep = new StepKey("next-invalid", "next-invalid", "next-invalid");
|
||||
long now = randomNonNegativeLong();
|
||||
|
||||
setStateToKey(currentStepKey);
|
||||
|
||||
SetOnce<Boolean> changed = new SetOnce<>();
|
||||
MoveToNextStepUpdateTask.Listener listener = (c) -> changed.set(true);
|
||||
MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, invalidNextStep, () -> now,
|
||||
stepsRegistry, listener);
|
||||
ClusterState newState = task.execute(clusterState);
|
||||
StepKey actualKey = IndexLifecycleRunner.getCurrentStepKey(newState.metaData().index(index).getSettings());
|
||||
assertThat(actualKey, equalTo(invalidNextStep));
|
||||
assertThat(LifecycleSettings.LIFECYCLE_PHASE_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(now));
|
||||
assertThat(LifecycleSettings.LIFECYCLE_ACTION_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(now));
|
||||
assertThat(LifecycleSettings.LIFECYCLE_STEP_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(now));
|
||||
task.clusterStateProcessed("source", clusterState, newState);
|
||||
assertTrue(changed.get());
|
||||
}
|
||||
|
||||
public void testClusterProcessedWithNoChange() {
|
||||
@ -92,7 +136,8 @@ public class MoveToNextStepUpdateTaskTests extends ESTestCase {
|
||||
setStateToKey(currentStepKey);
|
||||
SetOnce<Boolean> changed = new SetOnce<>();
|
||||
MoveToNextStepUpdateTask.Listener listener = (c) -> changed.set(true);
|
||||
MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, null, () -> now, listener);
|
||||
MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, null, () -> now,
|
||||
stepsRegistry, listener);
|
||||
task.clusterStateProcessed("source", clusterState, clusterState);
|
||||
assertNull(changed.get());
|
||||
}
|
||||
@ -106,7 +151,8 @@ public class MoveToNextStepUpdateTaskTests extends ESTestCase {
|
||||
|
||||
SetOnce<Boolean> changed = new SetOnce<>();
|
||||
MoveToNextStepUpdateTask.Listener listener = (c) -> changed.set(true);
|
||||
MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, nextStepKey, () -> now, listener);
|
||||
MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, nextStepKey, () -> now,
|
||||
stepsRegistry, listener);
|
||||
Exception expectedException = new RuntimeException();
|
||||
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
|
||||
() -> task.onFailure(randomAlphaOfLength(10), expectedException));
|
||||
|
@ -0,0 +1,21 @@
|
||||
{
|
||||
"xpack.index_lifecycle.move_to_step": {
|
||||
"documentation": "http://www.elastic.co/guide/en/index_lifecycle/current/index_lifecycle.html",
|
||||
"methods": [ "POST" ],
|
||||
"url": {
|
||||
"path": "/_xpack/index_lifecycle/_move/{index}",
|
||||
"paths": ["/_xpack/index_lifecycle/_move/{index}"],
|
||||
"parts": {
|
||||
"index": {
|
||||
"type" : "string",
|
||||
"description" : "The name of the index whose lifecycle step is to change"
|
||||
}
|
||||
},
|
||||
"params": {
|
||||
}
|
||||
},
|
||||
"body": {
|
||||
"description": "The new lifecycle step to move to"
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,188 @@
|
||||
---
|
||||
setup:
|
||||
- do:
|
||||
cluster.health:
|
||||
wait_for_status: yellow
|
||||
- do:
|
||||
acknowlege: true
|
||||
xpack.index_lifecycle.put_lifecycle:
|
||||
lifecycle: "my_moveable_timeseries_lifecycle"
|
||||
body: |
|
||||
{
|
||||
"policy": {
|
||||
"type": "timeseries",
|
||||
"phases": {
|
||||
"warm": {
|
||||
"after": "1000s",
|
||||
"actions": {
|
||||
"forcemerge": {
|
||||
"max_num_segments": 10000
|
||||
}
|
||||
}
|
||||
},
|
||||
"hot": {
|
||||
"after": "1000s",
|
||||
"actions": { }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
- do:
|
||||
acknowledge: true
|
||||
xpack.index_lifecycle.get_lifecycle:
|
||||
lifecycle: "my_moveable_timeseries_lifecycle"
|
||||
|
||||
- do:
|
||||
indices.create:
|
||||
index: my_index
|
||||
body:
|
||||
settings:
|
||||
index.lifecycle.name: "my_moveable_timeseries_lifecycle"
|
||||
|
||||
- do:
|
||||
indices.create:
|
||||
index: my_index_no_policy
|
||||
|
||||
---
|
||||
teardown:
|
||||
|
||||
- do:
|
||||
acknowledge: true
|
||||
indices.delete:
|
||||
index: my_index
|
||||
|
||||
- do:
|
||||
acknowledge: true
|
||||
indices.delete:
|
||||
index: my_index_no_policy
|
||||
|
||||
- do:
|
||||
acknowledge: true
|
||||
xpack.index_lifecycle.delete_lifecycle:
|
||||
lifecycle: "my_moveable_timeseries_lifecycle"
|
||||
|
||||
- do:
|
||||
catch: missing
|
||||
xpack.index_lifecycle.get_lifecycle:
|
||||
lifecycle: "my_moveable_timeseries_lifecycle"
|
||||
|
||||
---
|
||||
"Test Basic Move To Step":
|
||||
|
||||
- do:
|
||||
acknowledge: true
|
||||
xpack.index_lifecycle.move_to_step:
|
||||
index: "my_index"
|
||||
body:
|
||||
current_step:
|
||||
phase: "hot"
|
||||
action: "pre-pre-readonly"
|
||||
name: "after"
|
||||
next_step:
|
||||
phase: "warm"
|
||||
action: "forcemerge"
|
||||
name: "forcemerge"
|
||||
|
||||
- do:
|
||||
acknowledge: true
|
||||
indices.get:
|
||||
index: my_index
|
||||
- match: { my_index.settings.index.lifecycle.name: "my_moveable_timeseries_lifecycle" }
|
||||
- match: { my_index.settings.index.lifecycle.step: "forcemerge" }
|
||||
- match: { my_index.settings.index.lifecycle.action: "forcemerge" }
|
||||
- match: { my_index.settings.index.lifecycle.phase: "warm" }
|
||||
|
||||
---
|
||||
"Test Invalid Move To Step With Incorrect Current Step":
|
||||
|
||||
- do:
|
||||
catch: bad_request
|
||||
xpack.index_lifecycle.move_to_step:
|
||||
index: "my_index"
|
||||
body:
|
||||
current_step:
|
||||
phase: "warm"
|
||||
action: "forcemerge"
|
||||
name: "forcemerge"
|
||||
next_step:
|
||||
phase: "warm"
|
||||
action: "forcemerge"
|
||||
name: "forcemerge"
|
||||
- match: { error.root_cause.0.type: "illegal_argument_exception" }
|
||||
- match: { error.root_cause.0.reason: "index [my_index] is not on current step [{\"phase\":\"warm\",\"action\":\"forcemerge\",\"name\":\"forcemerge\"}]" }
|
||||
|
||||
- do:
|
||||
acknowledge: true
|
||||
indices.get:
|
||||
index: my_index
|
||||
- match: { my_index.settings.index.lifecycle.name: "my_moveable_timeseries_lifecycle" }
|
||||
- match: { my_index.settings.index.lifecycle.step: "after" }
|
||||
- match: { my_index.settings.index.lifecycle.action: "pre-pre-readonly" }
|
||||
- match: { my_index.settings.index.lifecycle.phase: "hot" }
|
||||
|
||||
---
|
||||
"Test Invalid Move To Step With Invalid Next Step":
|
||||
|
||||
- do:
|
||||
catch: bad_request
|
||||
xpack.index_lifecycle.move_to_step:
|
||||
index: "my_index"
|
||||
body:
|
||||
current_step:
|
||||
phase: "hot"
|
||||
action: "pre-pre-readonly"
|
||||
name: "after"
|
||||
next_step:
|
||||
phase: "invalid"
|
||||
action: "invalid"
|
||||
name: "invalid"
|
||||
- match: { error.root_cause.0.type: "illegal_argument_exception" }
|
||||
- match: { error.root_cause.0.reason: "step [{\"phase\":\"invalid\",\"action\":\"invalid\",\"name\":\"invalid\"}] does not exist" }
|
||||
|
||||
- do:
|
||||
acknowledge: true
|
||||
indices.get:
|
||||
index: my_index
|
||||
- match: { my_index.settings.index.lifecycle.name: "my_moveable_timeseries_lifecycle" }
|
||||
- match: { my_index.settings.index.lifecycle.step: "after" }
|
||||
- match: { my_index.settings.index.lifecycle.action: "pre-pre-readonly" }
|
||||
- match: { my_index.settings.index.lifecycle.phase: "hot" }
|
||||
|
||||
---
|
||||
"Test Invalid Move To Step With Invalid Policy":
|
||||
|
||||
- do:
|
||||
catch: bad_request
|
||||
xpack.index_lifecycle.move_to_step:
|
||||
index: "my_index_no_policy"
|
||||
body:
|
||||
current_step:
|
||||
phase: "hot"
|
||||
action: "pre-pre-readonly"
|
||||
name: "after"
|
||||
next_step:
|
||||
phase: "warm"
|
||||
action: "forcemerge"
|
||||
name: "forcemerge"
|
||||
- match: { error.root_cause.0.type: "illegal_argument_exception" }
|
||||
- match: { error.root_cause.0.reason: "index [my_index_no_policy] is not associated with an Index Lifecycle Policy" }
|
||||
|
||||
---
|
||||
"Test Invalid Move To Step With Invalid Index":
|
||||
|
||||
- do:
|
||||
catch: bad_request
|
||||
xpack.index_lifecycle.move_to_step:
|
||||
index: "does_not_exist"
|
||||
body:
|
||||
current_step:
|
||||
phase: "hot"
|
||||
action: "pre-pre-readonly"
|
||||
name: "after"
|
||||
next_step:
|
||||
phase: "warm"
|
||||
action: "forcemerge"
|
||||
name: "forcemerge"
|
||||
- match: { error.root_cause.0.type: "illegal_argument_exception" }
|
||||
- match: { error.root_cause.0.reason: "index [does_not_exist] does not exist" }
|
Loading…
x
Reference in New Issue
Block a user