[ILM] Fix Move To Step API causing ILM to hang (#34618)
The Move To Step API now checks to see if the target step is an AsyncActionStep, and if so, runs it. Previously, AsyncActionSteps would only be run when they are entered by executing the previous step, so if an AsyncActionStep was entered via the Move To Step API, ILM would never touch that index again.
This commit is contained in:
parent
5c88356ad6
commit
f6ac0e4bbc
|
@ -77,26 +77,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
||||||
.put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, "alias"));
|
.put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, "alias"));
|
||||||
|
|
||||||
// create policy
|
// create policy
|
||||||
Map<String, LifecycleAction> warmActions = new HashMap<>();
|
createFullPolicy(TimeValue.ZERO);
|
||||||
warmActions.put(ForceMergeAction.NAME, new ForceMergeAction(1));
|
|
||||||
warmActions.put(AllocateAction.NAME, new AllocateAction(1, singletonMap("_name", "node-1,node-2"), null, null));
|
|
||||||
warmActions.put(ShrinkAction.NAME, new ShrinkAction(1));
|
|
||||||
Map<String, Phase> phases = new HashMap<>();
|
|
||||||
phases.put("hot", new Phase("hot", TimeValue.ZERO, singletonMap(RolloverAction.NAME,
|
|
||||||
new RolloverAction(null, null, 1L))));
|
|
||||||
phases.put("warm", new Phase("warm", TimeValue.ZERO, warmActions));
|
|
||||||
phases.put("cold", new Phase("cold", TimeValue.ZERO, singletonMap(AllocateAction.NAME,
|
|
||||||
new AllocateAction(0, singletonMap("_name", "node-3"), null, null))));
|
|
||||||
phases.put("delete", new Phase("delete", TimeValue.ZERO, singletonMap(DeleteAction.NAME, new DeleteAction())));
|
|
||||||
LifecyclePolicy lifecyclePolicy = new LifecyclePolicy(policy, phases);
|
|
||||||
// PUT policy
|
|
||||||
XContentBuilder builder = jsonBuilder();
|
|
||||||
lifecyclePolicy.toXContent(builder, null);
|
|
||||||
final StringEntity entity = new StringEntity(
|
|
||||||
"{ \"policy\":" + Strings.toString(builder) + "}", ContentType.APPLICATION_JSON);
|
|
||||||
Request request = new Request("PUT", "_ilm/" + policy);
|
|
||||||
request.setEntity(entity);
|
|
||||||
assertOK(client().performRequest(request));
|
|
||||||
// update policy on index
|
// update policy on index
|
||||||
updatePolicy(originalIndex, policy);
|
updatePolicy(originalIndex, policy);
|
||||||
// index document {"foo": "bar"} to trigger rollover
|
// index document {"foo": "bar"} to trigger rollover
|
||||||
|
@ -106,6 +87,74 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
||||||
assertBusy(() -> assertFalse(indexExists(originalIndex)));
|
assertBusy(() -> assertFalse(indexExists(originalIndex)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testMoveToAllocateStep() throws Exception {
|
||||||
|
String originalIndex = index + "-000001";
|
||||||
|
createIndexWithSettings(originalIndex, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 4)
|
||||||
|
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||||
|
.put("index.routing.allocation.include._name", "node-0")
|
||||||
|
.put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, "alias"));
|
||||||
|
|
||||||
|
// create policy
|
||||||
|
createFullPolicy(TimeValue.timeValueHours(10));
|
||||||
|
// update policy on index
|
||||||
|
updatePolicy(originalIndex, policy);
|
||||||
|
|
||||||
|
// move to a step
|
||||||
|
Request moveToStepRequest = new Request("POST", "_ilm/move/" + originalIndex);
|
||||||
|
assertBusy(() -> assertTrue(getStepKeyForIndex(originalIndex).equals(new StepKey("new", "complete", "complete"))));
|
||||||
|
moveToStepRequest.setJsonEntity("{\n" +
|
||||||
|
" \"current_step\": {\n" +
|
||||||
|
" \"phase\": \"new\",\n" +
|
||||||
|
" \"action\": \"complete\",\n" +
|
||||||
|
" \"name\": \"complete\"\n" +
|
||||||
|
" },\n" +
|
||||||
|
" \"next_step\": {\n" +
|
||||||
|
" \"phase\": \"cold\",\n" +
|
||||||
|
" \"action\": \"allocate\",\n" +
|
||||||
|
" \"name\": \"allocate\"\n" +
|
||||||
|
" }\n" +
|
||||||
|
"}");
|
||||||
|
client().performRequest(moveToStepRequest);
|
||||||
|
assertBusy(() -> assertFalse(indexExists(originalIndex)));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void testMoveToRolloverStep() throws Exception {
|
||||||
|
String originalIndex = index + "-000001";
|
||||||
|
String shrunkenOriginalIndex = ShrinkAction.SHRUNKEN_INDEX_PREFIX + originalIndex;
|
||||||
|
String secondIndex = index + "-000002";
|
||||||
|
createIndexWithSettings(originalIndex, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 4)
|
||||||
|
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||||
|
.put("index.routing.allocation.include._name", "node-0")
|
||||||
|
.put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, "alias"));
|
||||||
|
|
||||||
|
createFullPolicy(TimeValue.timeValueHours(10));
|
||||||
|
// update policy on index
|
||||||
|
updatePolicy(originalIndex, policy);
|
||||||
|
|
||||||
|
// move to a step
|
||||||
|
Request moveToStepRequest = new Request("POST", "_ilm/move/" + originalIndex);
|
||||||
|
// index document to trigger rollover
|
||||||
|
index(client(), originalIndex, "_id", "foo", "bar");
|
||||||
|
logger.info(getStepKeyForIndex(originalIndex));
|
||||||
|
moveToStepRequest.setJsonEntity("{\n" +
|
||||||
|
" \"current_step\": {\n" +
|
||||||
|
" \"phase\": \"new\",\n" +
|
||||||
|
" \"action\": \"complete\",\n" +
|
||||||
|
" \"name\": \"complete\"\n" +
|
||||||
|
" },\n" +
|
||||||
|
" \"next_step\": {\n" +
|
||||||
|
" \"phase\": \"hot\",\n" +
|
||||||
|
" \"action\": \"rollover\",\n" +
|
||||||
|
" \"name\": \"attempt_rollover\"\n" +
|
||||||
|
" }\n" +
|
||||||
|
"}");
|
||||||
|
client().performRequest(moveToStepRequest);
|
||||||
|
assertBusy(() -> assertTrue(indexExists(secondIndex)));
|
||||||
|
assertBusy(() -> assertFalse(indexExists(shrunkenOriginalIndex)));
|
||||||
|
assertBusy(() -> assertFalse(indexExists(originalIndex)));
|
||||||
|
}
|
||||||
|
|
||||||
public void testRolloverAction() throws Exception {
|
public void testRolloverAction() throws Exception {
|
||||||
String originalIndex = index + "-000001";
|
String originalIndex = index + "-000001";
|
||||||
String secondIndex = index + "-000002";
|
String secondIndex = index + "-000002";
|
||||||
|
@ -296,6 +345,29 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void createFullPolicy(TimeValue hotTime) throws IOException {
|
||||||
|
Map<String, LifecycleAction> warmActions = new HashMap<>();
|
||||||
|
warmActions.put(ForceMergeAction.NAME, new ForceMergeAction(1));
|
||||||
|
warmActions.put(AllocateAction.NAME, new AllocateAction(1, singletonMap("_name", "node-1,node-2"), null, null));
|
||||||
|
warmActions.put(ShrinkAction.NAME, new ShrinkAction(1));
|
||||||
|
Map<String, Phase> phases = new HashMap<>();
|
||||||
|
phases.put("hot", new Phase("hot", hotTime, singletonMap(RolloverAction.NAME,
|
||||||
|
new RolloverAction(null, null, 1L))));
|
||||||
|
phases.put("warm", new Phase("warm", TimeValue.ZERO, warmActions));
|
||||||
|
phases.put("cold", new Phase("cold", TimeValue.ZERO, singletonMap(AllocateAction.NAME,
|
||||||
|
new AllocateAction(0, singletonMap("_name", "node-3"), null, null))));
|
||||||
|
phases.put("delete", new Phase("delete", TimeValue.ZERO, singletonMap(DeleteAction.NAME, new DeleteAction())));
|
||||||
|
LifecyclePolicy lifecyclePolicy = new LifecyclePolicy(policy, phases);
|
||||||
|
// PUT policy
|
||||||
|
XContentBuilder builder = jsonBuilder();
|
||||||
|
lifecyclePolicy.toXContent(builder, null);
|
||||||
|
final StringEntity entity = new StringEntity(
|
||||||
|
"{ \"policy\":" + Strings.toString(builder) + "}", ContentType.APPLICATION_JSON);
|
||||||
|
Request request = new Request("PUT", "_ilm/" + policy);
|
||||||
|
request.setEntity(entity);
|
||||||
|
assertOK(client().performRequest(request));
|
||||||
|
}
|
||||||
|
|
||||||
private void createNewSingletonPolicy(String phaseName, LifecycleAction action) throws IOException {
|
private void createNewSingletonPolicy(String phaseName, LifecycleAction action) throws IOException {
|
||||||
createNewSingletonPolicy(phaseName, action, TimeValue.ZERO);
|
createNewSingletonPolicy(phaseName, action, TimeValue.ZERO);
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,8 +24,8 @@ import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.xpack.core.XPackField;
|
import org.elasticsearch.xpack.core.XPackField;
|
||||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleExecutionState;
|
|
||||||
import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata;
|
import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata;
|
||||||
|
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleExecutionState;
|
||||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
|
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
|
||||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
|
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
|
||||||
import org.elasticsearch.xpack.core.indexlifecycle.OperationMode;
|
import org.elasticsearch.xpack.core.indexlifecycle.OperationMode;
|
||||||
|
@ -76,6 +76,11 @@ public class IndexLifecycleService extends AbstractComponent
|
||||||
this::updatePollInterval);
|
this::updatePollInterval);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void maybeRunAsyncAction(ClusterState clusterState, IndexMetaData indexMetaData, StepKey nextStepKey) {
|
||||||
|
String policyName = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexMetaData.getSettings());
|
||||||
|
lifecycleRunner.maybeRunAsyncAction(clusterState, indexMetaData, policyName, nextStepKey);
|
||||||
|
}
|
||||||
|
|
||||||
public ClusterState moveClusterStateToStep(ClusterState currentState, String indexName, StepKey currentStepKey, StepKey nextStepKey) {
|
public ClusterState moveClusterStateToStep(ClusterState currentState, String indexName, StepKey currentStepKey, StepKey nextStepKey) {
|
||||||
return IndexLifecycleRunner.moveClusterStateToStep(indexName, currentState, currentStepKey, nextStepKey,
|
return IndexLifecycleRunner.moveClusterStateToStep(indexName, currentState, currentStepKey, nextStepKey,
|
||||||
nowSupplier, policyRegistry);
|
nowSupplier, policyRegistry);
|
||||||
|
|
|
@ -62,6 +62,18 @@ public class TransportMoveToStepAction extends TransportMasterNodeAction<Request
|
||||||
request.getNextStepKey());
|
request.getNextStepKey());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||||
|
IndexMetaData newIndexMetaData = newState.metaData().index(indexMetaData.getIndex());
|
||||||
|
if (newIndexMetaData == null) {
|
||||||
|
// The index has somehow been deleted - there shouldn't be any opportunity for this to happen, but just in case.
|
||||||
|
logger.debug("index [" + indexMetaData.getIndex() + "] has been deleted after moving to step [" +
|
||||||
|
request.getNextStepKey() + "], skipping async action check");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
indexLifecycleService.maybeRunAsyncAction(newState, newIndexMetaData, request.getNextStepKey());
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Response newResponse(boolean acknowledged) {
|
protected Response newResponse(boolean acknowledged) {
|
||||||
return new Response(acknowledged);
|
return new Response(acknowledged);
|
||||||
|
|
Loading…
Reference in New Issue