add _retry API to index lifecycle policies (#30769)
This commit is contained in:
parent
bdf70e4f2f
commit
2f2832db04
|
@ -0,0 +1,160 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle.action;
|
||||
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.action.ActionRequestBuilder;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.IndicesRequest;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.action.support.IndicesOptions.Option;
|
||||
import org.elasticsearch.action.support.IndicesOptions.WildcardStates;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedRequest;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Objects;
|
||||
|
||||
public class RetryAction extends Action<RetryAction.Request, RetryAction.Response, RetryAction.RequestBuilder> {
|
||||
public static final RetryAction INSTANCE = new RetryAction();
|
||||
public static final String NAME = "indices:admin/xpack/index_lifecycle/_retry/post";
|
||||
|
||||
protected RetryAction() {
|
||||
super(NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
|
||||
return new RequestBuilder(client, this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Response newResponse() {
|
||||
return new Response();
|
||||
}
|
||||
|
||||
public static class RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder> {
|
||||
|
||||
protected RequestBuilder(ElasticsearchClient client, Action<Request, Response, RequestBuilder> action) {
|
||||
super(client, action, new Request());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class Response extends AcknowledgedResponse implements ToXContentObject {
|
||||
|
||||
public Response() {
|
||||
}
|
||||
|
||||
public Response(boolean acknowledged) {
|
||||
super(acknowledged);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
readAcknowledged(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
writeAcknowledged(out);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(isAcknowledged());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (obj.getClass() != getClass()) {
|
||||
return false;
|
||||
}
|
||||
Response other = (Response) obj;
|
||||
return Objects.equals(isAcknowledged(), other.isAcknowledged());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return Strings.toString(this, true, true);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class Request extends AcknowledgedRequest<Request> implements IndicesRequest.Replaceable {
|
||||
private String[] indices;
|
||||
|
||||
public Request(String... indices) {
|
||||
this.indices = indices;
|
||||
}
|
||||
|
||||
public Request() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Request indices(String... indices) {
|
||||
this.indices = indices;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] indices() {
|
||||
return indices;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndicesOptions indicesOptions() {
|
||||
// Re-run should only resolve to open concrete indices (not aliases)
|
||||
return new IndicesOptions(EnumSet.of(Option.IGNORE_ALIASES), EnumSet.of(WildcardStates.OPEN));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionRequestValidationException validate() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
this.indices = in.readStringArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeStringArray(indices);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Arrays.hashCode(indices);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (obj.getClass() != getClass()) {
|
||||
return false;
|
||||
}
|
||||
Request other = (Request) obj;
|
||||
return Arrays.equals(indices, other.indices);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,27 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle.action;
|
||||
|
||||
import org.elasticsearch.test.AbstractStreamableTestCase;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.RetryAction.Request;
|
||||
|
||||
public class ReRunRequestTests extends AbstractStreamableTestCase<Request> {
|
||||
|
||||
@Override
|
||||
protected Request createTestInstance() {
|
||||
String[] indices = new String[randomIntBetween(1, 10)];
|
||||
for (int i = 0; i < indices.length; i++) {
|
||||
indices[i] = randomAlphaOfLengthBetween(2, 5);
|
||||
}
|
||||
return new Request(indices);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Request createBlankInstance() {
|
||||
return new Request();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle.action;
|
||||
|
||||
import org.elasticsearch.test.AbstractStreamableTestCase;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.RetryAction.Response;
|
||||
|
||||
public class ReRunResponseTests extends AbstractStreamableTestCase<Response> {
|
||||
|
||||
@Override
|
||||
protected Response createTestInstance() {
|
||||
return new Response(randomBoolean());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Response createBlankInstance() {
|
||||
return new Response();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Response mutateInstance(Response response) {
|
||||
return new Response(response.isAcknowledged() == false);
|
||||
}
|
||||
|
||||
}
|
|
@ -38,14 +38,17 @@ import org.elasticsearch.xpack.core.indexlifecycle.action.DeleteLifecycleAction;
|
|||
import org.elasticsearch.xpack.core.indexlifecycle.action.GetLifecycleAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.MoveToStepAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.PutLifecycleAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.RetryAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.action.RestDeleteLifecycleAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.action.RestGetLifecycleAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.action.RestMoveToStepAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.action.RestPutLifecycleAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.action.RestRetryAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.action.TransportDeleteLifcycleAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.action.TransportGetLifecycleAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.action.TransportMoveToStepAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.action.TransportPutLifecycleAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.action.TransportRetryAction;
|
||||
|
||||
import java.time.Clock;
|
||||
import java.util.ArrayList;
|
||||
|
@ -140,7 +143,9 @@ public class IndexLifecycle extends Plugin implements ActionPlugin {
|
|||
new RestPutLifecycleAction(settings, restController),
|
||||
new RestGetLifecycleAction(settings, restController),
|
||||
new RestDeleteLifecycleAction(settings, restController),
|
||||
new RestMoveToStepAction(settings, restController));
|
||||
new RestMoveToStepAction(settings, restController),
|
||||
new RestRetryAction(settings, restController)
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -152,7 +157,8 @@ public class IndexLifecycle extends Plugin implements ActionPlugin {
|
|||
new ActionHandler<>(PutLifecycleAction.INSTANCE, TransportPutLifecycleAction.class),
|
||||
new ActionHandler<>(GetLifecycleAction.INSTANCE, TransportGetLifecycleAction.class),
|
||||
new ActionHandler<>(DeleteLifecycleAction.INSTANCE, TransportDeleteLifcycleAction.class),
|
||||
new ActionHandler<>(MoveToStepAction.INSTANCE, TransportMoveToStepAction.class));
|
||||
new ActionHandler<>(MoveToStepAction.INSTANCE, TransportMoveToStepAction.class),
|
||||
new ActionHandler<>(RetryAction.INSTANCE, TransportRetryAction.class));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.elasticsearch.xpack.core.indexlifecycle.TerminalPolicyStep;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.function.LongSupplier;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class IndexLifecycleRunner {
|
||||
private static final Logger logger = ESLoggerFactory.getLogger(IndexLifecycleRunner.class);
|
||||
|
@ -127,7 +128,7 @@ public class IndexLifecycleRunner {
|
|||
* @param indexSettings
|
||||
* the index settings to extract the {@link StepKey} from.
|
||||
*/
|
||||
static StepKey getCurrentStepKey(Settings indexSettings) {
|
||||
public static StepKey getCurrentStepKey(Settings indexSettings) {
|
||||
String currentPhase = LifecycleSettings.LIFECYCLE_PHASE_SETTING.get(indexSettings);
|
||||
String currentAction = LifecycleSettings.LIFECYCLE_ACTION_SETTING.get(indexSettings);
|
||||
String currentStep = LifecycleSettings.LIFECYCLE_STEP_SETTING.get(indexSettings);
|
||||
|
@ -199,13 +200,35 @@ public class IndexLifecycleRunner {
|
|||
return newClusterStateBuilder.build();
|
||||
}
|
||||
|
||||
ClusterState moveClusterStateToFailedStep(ClusterState currentState, String[] indices) {
|
||||
ClusterState newState = currentState;
|
||||
for (String index : indices) {
|
||||
IndexMetaData indexMetaData = currentState.metaData().index(index);
|
||||
if (indexMetaData == null) {
|
||||
throw new IllegalArgumentException("index [" + index + "] does not exist");
|
||||
}
|
||||
StepKey currentStepKey = IndexLifecycleRunner.getCurrentStepKey(indexMetaData.getSettings());
|
||||
String failedStep = LifecycleSettings.LIFECYCLE_FAILED_STEP_SETTING.get(indexMetaData.getSettings());
|
||||
if (currentStepKey != null && ErrorStep.NAME.equals(currentStepKey.getName())
|
||||
&& Strings.isNullOrEmpty(failedStep) == false) {
|
||||
StepKey nextStepKey = new StepKey(currentStepKey.getPhase(), currentStepKey.getAction(), failedStep);
|
||||
newState = moveClusterStateToStep(index, currentState, currentStepKey, nextStepKey, nowSupplier, stepRegistry);
|
||||
} else {
|
||||
throw new IllegalArgumentException("cannot retry an action for an index ["
|
||||
+ index + "] that has not encountered an error when running a Lifecycle Policy");
|
||||
}
|
||||
}
|
||||
return newState;
|
||||
}
|
||||
|
||||
private static Settings.Builder moveIndexSettingsToNextStep(Settings existingSettings, StepKey currentStep, StepKey nextStep,
|
||||
LongSupplier nowSupplier) {
|
||||
long nowAsMillis = nowSupplier.getAsLong();
|
||||
Settings.Builder newSettings = Settings.builder().put(existingSettings).put(LifecycleSettings.LIFECYCLE_PHASE, nextStep.getPhase())
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION, nextStep.getAction()).put(LifecycleSettings.LIFECYCLE_STEP, nextStep.getName())
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP_TIME, nowAsMillis)
|
||||
// clear any step info from the current step
|
||||
// clear any step info or error-related settings from the current step
|
||||
.put(LifecycleSettings.LIFECYCLE_FAILED_STEP, (String) null)
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP_INFO, (String) null);
|
||||
if (currentStep.getPhase().equals(nextStep.getPhase()) == false) {
|
||||
newSettings.put(LifecycleSettings.LIFECYCLE_PHASE_TIME, nowAsMillis);
|
||||
|
|
|
@ -67,6 +67,10 @@ public class IndexLifecycleService extends AbstractComponent
|
|||
nowSupplier, policyRegistry);
|
||||
}
|
||||
|
||||
public ClusterState moveClusterStateToFailedStep(ClusterState currentState, String[] indices) {
|
||||
return lifecycleRunner.moveClusterStateToFailedStep(currentState, indices);
|
||||
}
|
||||
|
||||
SchedulerEngine getScheduler() {
|
||||
return scheduler.get();
|
||||
}
|
||||
|
@ -75,6 +79,14 @@ public class IndexLifecycleService extends AbstractComponent
|
|||
return scheduledJob;
|
||||
}
|
||||
|
||||
public LongSupplier getNowSupplier() {
|
||||
return nowSupplier;
|
||||
}
|
||||
|
||||
public PolicyStepsRegistry getPolicyRegistry() {
|
||||
return policyRegistry;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterChanged(ClusterChangedEvent event) {
|
||||
if (event.localNodeMaster()) { // only act if we are master, otherwise keep idle until elected
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*
|
||||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle.action;
|
||||
|
||||
import org.elasticsearch.client.node.NodeClient;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.rest.BaseRestHandler;
|
||||
import org.elasticsearch.rest.RestController;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.action.RestToXContentListener;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.RetryAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.IndexLifecycle;
|
||||
|
||||
public class RestRetryAction extends BaseRestHandler {
|
||||
|
||||
public RestRetryAction(Settings settings, RestController controller) {
|
||||
super(settings);
|
||||
controller.registerHandler(RestRequest.Method.POST, IndexLifecycle.BASE_PATH + "_retry/{index}", this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "xpack_lifecycle_retry_action";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
|
||||
String[] indices = Strings.splitStringByCommaToArray(restRequest.param("index"));
|
||||
RetryAction.Request request = new RetryAction.Request(indices);
|
||||
request.timeout(restRequest.paramAsTime("timeout", request.timeout()));
|
||||
request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout()));
|
||||
return channel -> client.execute(RetryAction.INSTANCE, request, new RestToXContentListener<>(channel));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,71 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*
|
||||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle.action;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
||||
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.RetryAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.RetryAction.Request;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.RetryAction.Response;
|
||||
import org.elasticsearch.xpack.indexlifecycle.IndexLifecycleService;
|
||||
|
||||
public class TransportRetryAction extends TransportMasterNodeAction<Request, Response> {
|
||||
|
||||
IndexLifecycleService indexLifecycleService;
|
||||
|
||||
@Inject
|
||||
public TransportRetryAction(Settings settings, TransportService transportService, ClusterService clusterService,
|
||||
ThreadPool threadPool, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
IndexLifecycleService indexLifecycleService) {
|
||||
super(settings, RetryAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver,
|
||||
Request::new);
|
||||
this.indexLifecycleService = indexLifecycleService;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Response newResponse() {
|
||||
return new Response();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) {
|
||||
clusterService.submitStateUpdateTask("ilm-re-run",
|
||||
new AckedClusterStateUpdateTask<Response>(request, listener) {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
return indexLifecycleService.moveClusterStateToFailedStep(currentState, request.indices());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Response newResponse(boolean acknowledged) {
|
||||
return new Response(acknowledged);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
|
||||
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
|
||||
}
|
||||
}
|
|
@ -27,7 +27,6 @@ import org.elasticsearch.xpack.core.indexlifecycle.AsyncWaitStep;
|
|||
import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateWaitStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.ErrorStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.InitializePolicyContextStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicyMetadata;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.MockStep;
|
||||
|
@ -675,6 +674,82 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
assertClusterStateOnErrorStep(clusterState, index, currentStep, newClusterState, cause, now);
|
||||
}
|
||||
|
||||
public void testMoveClusterStateToFailedStep() {
|
||||
String indexName = "my_index";
|
||||
String[] indices = new String[] { indexName };
|
||||
String policyName = "my_policy";
|
||||
long now = randomNonNegativeLong();
|
||||
StepKey failedStepKey = new StepKey("current_phase", "current_action", "current_step");
|
||||
StepKey errorStepKey = new StepKey(failedStepKey.getPhase(), failedStepKey.getAction(), ErrorStep.NAME);
|
||||
Step step = new MockStep(failedStepKey, null);
|
||||
PolicyStepsRegistry policyRegistry = createOneStepPolicyStepRegistry(policyName, step);
|
||||
Settings.Builder indexSettingsBuilder = Settings.builder()
|
||||
.put(LifecycleSettings.LIFECYCLE_NAME, policyName)
|
||||
.put(LifecycleSettings.LIFECYCLE_PHASE, errorStepKey.getPhase())
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION, errorStepKey.getAction())
|
||||
.put(LifecycleSettings.LIFECYCLE_FAILED_STEP, failedStepKey.getName())
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP, errorStepKey.getName());
|
||||
ClusterState clusterState = buildClusterState(indexName, indexSettingsBuilder);
|
||||
Index index = clusterState.metaData().index(indexName).getIndex();
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(policyRegistry, null, () -> now);
|
||||
ClusterState nextClusterState = runner.moveClusterStateToFailedStep(clusterState, indices);
|
||||
IndexLifecycleRunnerTests.assertClusterStateOnNextStep(clusterState, index, errorStepKey, failedStepKey,
|
||||
nextClusterState, now);
|
||||
}
|
||||
|
||||
public void testMoveClusterStateToFailedStepIndexNotFound() {
|
||||
String existingIndexName = "my_index";
|
||||
String invalidIndexName = "does_not_exist";
|
||||
ClusterState clusterState = buildClusterState(existingIndexName, Settings.builder());
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(null, null, () -> 0L);
|
||||
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
|
||||
() -> runner.moveClusterStateToFailedStep(clusterState, new String[] { invalidIndexName }));
|
||||
assertThat(exception.getMessage(), equalTo("index [" + invalidIndexName + "] does not exist"));
|
||||
}
|
||||
//
|
||||
public void testMoveClusterStateToFailedStepInvalidPolicySetting() {
|
||||
String indexName = "my_index";
|
||||
String[] indices = new String[] { indexName };
|
||||
String policyName = "my_policy";
|
||||
long now = randomNonNegativeLong();
|
||||
StepKey failedStepKey = new StepKey("current_phase", "current_action", "current_step");
|
||||
StepKey errorStepKey = new StepKey(failedStepKey.getPhase(), failedStepKey.getAction(), ErrorStep.NAME);
|
||||
Step step = new MockStep(failedStepKey, null);
|
||||
PolicyStepsRegistry policyRegistry = createOneStepPolicyStepRegistry(policyName, step);
|
||||
Settings.Builder indexSettingsBuilder = Settings.builder()
|
||||
.put(LifecycleSettings.LIFECYCLE_NAME, (String) null)
|
||||
.put(LifecycleSettings.LIFECYCLE_PHASE, errorStepKey.getPhase())
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION, errorStepKey.getAction())
|
||||
.put(LifecycleSettings.LIFECYCLE_FAILED_STEP, failedStepKey.getName())
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP, errorStepKey.getName());
|
||||
ClusterState clusterState = buildClusterState(indexName, indexSettingsBuilder);
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(policyRegistry, null, () -> now);
|
||||
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
|
||||
() -> runner.moveClusterStateToFailedStep(clusterState, indices));
|
||||
assertThat(exception.getMessage(), equalTo("index [" + indexName + "] is not associated with an Index Lifecycle Policy"));
|
||||
}
|
||||
|
||||
public void testMoveClusterStateToFailedNotOnError() {
|
||||
String indexName = "my_index";
|
||||
String[] indices = new String[] { indexName };
|
||||
String policyName = "my_policy";
|
||||
long now = randomNonNegativeLong();
|
||||
StepKey failedStepKey = new StepKey("current_phase", "current_action", "current_step");
|
||||
Step step = new MockStep(failedStepKey, null);
|
||||
PolicyStepsRegistry policyRegistry = createOneStepPolicyStepRegistry(policyName, step);
|
||||
Settings.Builder indexSettingsBuilder = Settings.builder()
|
||||
.put(LifecycleSettings.LIFECYCLE_NAME, (String) null)
|
||||
.put(LifecycleSettings.LIFECYCLE_PHASE, failedStepKey.getPhase())
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION, failedStepKey.getAction())
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP, failedStepKey.getName());
|
||||
ClusterState clusterState = buildClusterState(indexName, indexSettingsBuilder);
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(policyRegistry, null, () -> now);
|
||||
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
|
||||
() -> runner.moveClusterStateToFailedStep(clusterState, indices));
|
||||
assertThat(exception.getMessage(), equalTo("cannot retry an action for an index [" + indices[0]
|
||||
+ "] that has not encountered an error when running a Lifecycle Policy"));
|
||||
}
|
||||
|
||||
public void testAddStepInfoToClusterState() throws IOException {
|
||||
String indexName = "my_index";
|
||||
StepKey currentStep = new StepKey("current_phase", "current_action", "current_step");
|
||||
|
@ -716,7 +791,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
return ClusterState.builder(new ClusterName("my_cluster")).metaData(metadata).build();
|
||||
}
|
||||
|
||||
private void assertClusterStateOnNextStep(ClusterState oldClusterState, Index index, StepKey currentStep, StepKey nextStep,
|
||||
public static void assertClusterStateOnNextStep(ClusterState oldClusterState, Index index, StepKey currentStep, StepKey nextStep,
|
||||
ClusterState newClusterState, long now) {
|
||||
assertNotSame(oldClusterState, newClusterState);
|
||||
MetaData newMetadata = newClusterState.metaData();
|
||||
|
@ -741,7 +816,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
assertEquals(now, (long) LifecycleSettings.LIFECYCLE_ACTION_TIME_SETTING.get(newIndexSettings));
|
||||
}
|
||||
assertEquals(now, (long) LifecycleSettings.LIFECYCLE_STEP_TIME_SETTING.get(newIndexSettings));
|
||||
assertFalse(LifecycleSettings.LIFECYCLE_FAILED_STEP_SETTING.exists(newIndexSettings));
|
||||
assertEquals("", LifecycleSettings.LIFECYCLE_FAILED_STEP_SETTING.get(newIndexSettings));
|
||||
assertEquals("", LifecycleSettings.LIFECYCLE_STEP_INFO_SETTING.get(newIndexSettings));
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
{
|
||||
"xpack.index_lifecycle.retry": {
|
||||
"documentation": "http://www.elastic.co/guide/en/index_lifecycle/current/index_lifecycle.html",
|
||||
"methods": [ "POST" ],
|
||||
"url": {
|
||||
"path": "/_xpack/index_lifecycle/_retry/{index}",
|
||||
"paths": ["/_xpack/index_lifecycle/_retry/{index}"],
|
||||
"parts": {
|
||||
"index": {
|
||||
"type" : "string",
|
||||
"description" : "The name of the indices (comma-separated) whose failed lifecycle step is to be retry"
|
||||
}
|
||||
},
|
||||
"params": {
|
||||
}
|
||||
},
|
||||
"body": null
|
||||
}
|
||||
}
|
|
@ -0,0 +1,107 @@
|
|||
---
|
||||
setup:
|
||||
- do:
|
||||
cluster.health:
|
||||
wait_for_status: yellow
|
||||
|
||||
- do:
|
||||
acknowlege: true
|
||||
xpack.index_lifecycle.put_lifecycle:
|
||||
lifecycle: "my_lifecycle"
|
||||
body: |
|
||||
{
|
||||
"policy": {
|
||||
"type": "timeseries",
|
||||
"phases": {
|
||||
"warm": {
|
||||
"after": "1000s",
|
||||
"actions": {
|
||||
"forcemerge": {
|
||||
"max_num_segments": 10000
|
||||
}
|
||||
}
|
||||
},
|
||||
"hot": {
|
||||
"after": "1000s",
|
||||
"actions": { }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
- do:
|
||||
acknowledge: true
|
||||
xpack.index_lifecycle.get_lifecycle:
|
||||
lifecycle: "my_lifecycle"
|
||||
|
||||
---
|
||||
teardown:
|
||||
|
||||
- do:
|
||||
acknowledge: true
|
||||
indices.delete:
|
||||
index: my_index
|
||||
|
||||
- do:
|
||||
acknowledge: true
|
||||
xpack.index_lifecycle.delete_lifecycle:
|
||||
lifecycle: "my_lifecycle"
|
||||
|
||||
- do:
|
||||
catch: missing
|
||||
xpack.index_lifecycle.get_lifecycle:
|
||||
lifecycle: "my_lifecycle"
|
||||
|
||||
---
|
||||
"Test Invalid Retry With Non-errored Policy":
|
||||
|
||||
- do:
|
||||
indices.create:
|
||||
index: my_index
|
||||
body:
|
||||
settings:
|
||||
index.lifecycle.name: "my_lifecycle"
|
||||
|
||||
- do:
|
||||
catch: bad_request
|
||||
xpack.index_lifecycle.retry:
|
||||
index: "my_index"
|
||||
- match: { error.root_cause.0.type: "illegal_argument_exception" }
|
||||
- match: { error.root_cause.0.reason: "cannot retry an action for an index [my_index] that has not encountered an error when running a Lifecycle Policy" }
|
||||
|
||||
- do:
|
||||
acknowledge: true
|
||||
indices.get:
|
||||
index: my_index
|
||||
- match: { my_index.settings.index.lifecycle.name: "my_lifecycle" }
|
||||
- match: { my_index.settings.index.lifecycle.step: "after" }
|
||||
- match: { my_index.settings.index.lifecycle.action: "after" }
|
||||
- match: { my_index.settings.index.lifecycle.phase: "new" }
|
||||
|
||||
|
||||
---
|
||||
"Test Invalid Retry With No Policy":
|
||||
|
||||
- do:
|
||||
indices.create:
|
||||
index: my_index
|
||||
|
||||
- do:
|
||||
catch: bad_request
|
||||
xpack.index_lifecycle.retry:
|
||||
index: "my_index"
|
||||
- match: { error.root_cause.0.type: "illegal_argument_exception" }
|
||||
- match: { error.root_cause.0.reason: "cannot retry an action for an index [my_index] that has not encountered an error when running a Lifecycle Policy" }
|
||||
|
||||
---
|
||||
"Test Invalid Re-run With Invalid Index":
|
||||
- do:
|
||||
indices.create:
|
||||
index: my_index
|
||||
|
||||
- do:
|
||||
catch: bad_request
|
||||
xpack.index_lifecycle.retry:
|
||||
index: "does_not_exist"
|
||||
- match: { error.root_cause.0.type: "illegal_argument_exception" }
|
||||
- match: { error.root_cause.0.reason: "index [does_not_exist] does not exist" }
|
Loading…
Reference in New Issue