* ILM use Priority.IMMEDIATE for stop ILM cluster update (#54909) This changes the priority of the cluster state update that stops ILM altogether to `IMMEDIATE`. We've chosen to change this as it can be useful to temporarily stop ILM if a cluster is overwhelmed, but a `NORMAL` priority can see the "stop ILM update" not make it up the tasks queue. On the same note, we're keeping the `start ILM` cluster update priority to `NORMAL` on purpose such that we only start `ILM` if the cluster can handle it. (cherry picked from commit d67df3a7cd2a8619c2c9efac4dde3ba83271f2fa) Signed-off-by: Andrei Dan <andrei.dan@elastic.co>
This commit is contained in:
parent
2655dfa2fe
commit
b8df265b42
|
@ -18,6 +18,7 @@ import org.elasticsearch.cluster.ClusterStateListener;
|
|||
import org.elasticsearch.cluster.LocalNodeMasterListener;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.component.Lifecycle.State;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -363,7 +364,13 @@ public class IndexLifecycleService
|
|||
}
|
||||
|
||||
public void submitOperationModeUpdate(OperationMode mode) {
|
||||
clusterService.submitStateUpdateTask("ilm_operation_mode_update", OperationModeUpdateTask.ilmMode(mode));
|
||||
OperationModeUpdateTask ilmOperationModeUpdateTask;
|
||||
if (mode == OperationMode.STOPPING || mode == OperationMode.STOPPED) {
|
||||
ilmOperationModeUpdateTask = OperationModeUpdateTask.ilmMode(Priority.IMMEDIATE, mode);
|
||||
} else {
|
||||
ilmOperationModeUpdateTask = OperationModeUpdateTask.ilmMode(Priority.NORMAL, mode);
|
||||
}
|
||||
clusterService.submitStateUpdateTask("ilm_operation_mode_update {OperationMode " + mode.name() + "}", ilmOperationModeUpdateTask);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -11,10 +11,17 @@ import org.elasticsearch.cluster.ClusterState;
|
|||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.metadata.Metadata;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.xpack.core.ilm.OperationMode;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.xpack.core.ilm.IndexLifecycleMetadata;
|
||||
import org.elasticsearch.xpack.core.ilm.OperationMode;
|
||||
import org.elasticsearch.xpack.core.slm.SnapshotLifecycleMetadata;
|
||||
|
||||
/**
|
||||
* This task updates the operation mode state for ILM.
|
||||
*
|
||||
* As stopping ILM proved to be an action we want to sometimes take in order to allow clusters to stabilise when under heavy load this
|
||||
* task might run at {@link Priority#IMMEDIATE} priority so please make sure to keep this task as lightweight as possible.
|
||||
*/
|
||||
public class OperationModeUpdateTask extends ClusterStateUpdateTask {
|
||||
private static final Logger logger = LogManager.getLogger(OperationModeUpdateTask.class);
|
||||
@Nullable
|
||||
|
@ -22,17 +29,22 @@ public class OperationModeUpdateTask extends ClusterStateUpdateTask {
|
|||
@Nullable
|
||||
private final OperationMode slmMode;
|
||||
|
||||
private OperationModeUpdateTask(OperationMode ilmMode, OperationMode slmMode) {
|
||||
private OperationModeUpdateTask(Priority priority, OperationMode ilmMode, OperationMode slmMode) {
|
||||
super(priority);
|
||||
this.ilmMode = ilmMode;
|
||||
this.slmMode = slmMode;
|
||||
}
|
||||
|
||||
public static OperationModeUpdateTask ilmMode(OperationMode mode) {
|
||||
return new OperationModeUpdateTask(mode, null);
|
||||
return ilmMode(Priority.NORMAL, mode);
|
||||
}
|
||||
|
||||
public static OperationModeUpdateTask ilmMode(Priority priority, OperationMode mode) {
|
||||
return new OperationModeUpdateTask(priority, mode, null);
|
||||
}
|
||||
|
||||
public static OperationModeUpdateTask slmMode(OperationMode mode) {
|
||||
return new OperationModeUpdateTask(null, mode);
|
||||
return new OperationModeUpdateTask(Priority.NORMAL, null, mode);
|
||||
}
|
||||
|
||||
OperationMode getILMOperationMode() {
|
||||
|
|
|
@ -16,6 +16,7 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
|
|||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
@ -49,7 +50,8 @@ public class TransportStopILMAction extends TransportMasterNodeAction<StopILMReq
|
|||
@Override
|
||||
protected void masterOperation(StopILMRequest request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
|
||||
clusterService.submitStateUpdateTask("ilm_operation_mode_update",
|
||||
new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {
|
||||
new AckedClusterStateUpdateTask<AcknowledgedResponse>(Priority.IMMEDIATE, request, listener) {
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
return (OperationModeUpdateTask.ilmMode(OperationMode.STOPPING)).execute(currentState);
|
||||
|
|
|
@ -18,6 +18,7 @@ import org.elasticsearch.cluster.metadata.Metadata;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.component.Lifecycle.State;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
|
@ -40,8 +41,10 @@ import org.elasticsearch.xpack.core.ilm.ShrinkAction;
|
|||
import org.elasticsearch.xpack.core.ilm.ShrinkStep;
|
||||
import org.elasticsearch.xpack.core.ilm.Step;
|
||||
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
|
||||
import org.hamcrest.Description;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.mockito.ArgumentMatcher;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.time.Clock;
|
||||
|
@ -62,9 +65,11 @@ import static org.elasticsearch.xpack.core.ilm.LifecyclePolicyTestsUtils.newTest
|
|||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Matchers.argThat;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class IndexLifecycleServiceTests extends ESTestCase {
|
||||
|
@ -242,7 +247,8 @@ public class IndexLifecycleServiceTests extends ESTestCase {
|
|||
doAnswer(invocationOnMock -> {
|
||||
changedOperationMode.set(true);
|
||||
return null;
|
||||
}).when(clusterService).submitStateUpdateTask(eq("ilm_operation_mode_update"), any(OperationModeUpdateTask.class));
|
||||
}).when(clusterService).submitStateUpdateTask(eq("ilm_operation_mode_update {OperationMode STOPPED}"),
|
||||
any(OperationModeUpdateTask.class));
|
||||
indexLifecycleService.applyClusterState(event);
|
||||
indexLifecycleService.triggerPolicies(currentState, true);
|
||||
assertTrue(changedOperationMode.get());
|
||||
|
@ -294,7 +300,8 @@ public class IndexLifecycleServiceTests extends ESTestCase {
|
|||
assertThat(task.getILMOperationMode(), equalTo(OperationMode.STOPPED));
|
||||
moveToMaintenance.set(true);
|
||||
return null;
|
||||
}).when(clusterService).submitStateUpdateTask(eq("ilm_operation_mode_update"), any(OperationModeUpdateTask.class));
|
||||
}).when(clusterService).submitStateUpdateTask(eq("ilm_operation_mode_update {OperationMode STOPPED}"),
|
||||
any(OperationModeUpdateTask.class));
|
||||
|
||||
indexLifecycleService.applyClusterState(event);
|
||||
indexLifecycleService.triggerPolicies(currentState, randomBoolean());
|
||||
|
@ -310,6 +317,40 @@ public class IndexLifecycleServiceTests extends ESTestCase {
|
|||
doTestExceptionStillProcessesOtherIndices(true);
|
||||
}
|
||||
|
||||
public void testOperationModeUpdateTaskPriority() {
|
||||
indexLifecycleService.submitOperationModeUpdate(OperationMode.STOPPING);
|
||||
verifyOperationModeUpdateTaskPriority(OperationMode.STOPPING, Priority.IMMEDIATE);
|
||||
indexLifecycleService.submitOperationModeUpdate(OperationMode.STOPPED);
|
||||
verifyOperationModeUpdateTaskPriority(OperationMode.STOPPED, Priority.IMMEDIATE);
|
||||
indexLifecycleService.submitOperationModeUpdate(OperationMode.RUNNING);
|
||||
verifyOperationModeUpdateTaskPriority(OperationMode.RUNNING, Priority.NORMAL);
|
||||
}
|
||||
|
||||
private void verifyOperationModeUpdateTaskPriority(OperationMode mode, Priority expectedPriority) {
|
||||
verify(clusterService).submitStateUpdateTask(
|
||||
Mockito.eq("ilm_operation_mode_update {OperationMode " + mode.name() +"}"),
|
||||
argThat(new ArgumentMatcher<OperationModeUpdateTask>() {
|
||||
|
||||
Priority actualPriority = null;
|
||||
|
||||
@Override
|
||||
public boolean matches(Object argument) {
|
||||
if (argument instanceof OperationModeUpdateTask == false) {
|
||||
return false;
|
||||
}
|
||||
actualPriority = ((OperationModeUpdateTask) argument).priority();
|
||||
return actualPriority == expectedPriority;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void describeTo(Description description) {
|
||||
description.appendText("the cluster state update task priority must be "+ expectedPriority+" but got: ")
|
||||
.appendText(actualPriority.name());
|
||||
}
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void doTestExceptionStillProcessesOtherIndices(boolean useOnMaster) {
|
||||
String policy1 = randomAlphaOfLengthBetween(1, 20);
|
||||
|
|
|
@ -0,0 +1,76 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.ilm.action;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.core.ilm.StopILMRequest;
|
||||
import org.hamcrest.Description;
|
||||
import org.mockito.ArgumentMatcher;
|
||||
|
||||
import static org.mockito.Matchers.argThat;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
public class TransportStopILMActionTests extends ESTestCase {
|
||||
|
||||
private static final ActionListener<AcknowledgedResponse> EMPTY_LISTENER = new ActionListener<AcknowledgedResponse>() {
|
||||
@Override
|
||||
public void onResponse(AcknowledgedResponse response) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
|
||||
}
|
||||
};
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testStopILMClusterStatePriorityIsImmediate() {
|
||||
ClusterService clusterService = mock(ClusterService.class);
|
||||
|
||||
TransportStopILMAction transportStopILMAction = new TransportStopILMAction(mock(TransportService.class),
|
||||
clusterService, mock(ThreadPool.class), mock(ActionFilters.class), mock(IndexNameExpressionResolver.class));
|
||||
StopILMRequest request = new StopILMRequest();
|
||||
transportStopILMAction.masterOperation(request, ClusterState.EMPTY_STATE, EMPTY_LISTENER);
|
||||
|
||||
verify(clusterService).submitStateUpdateTask(
|
||||
eq("ilm_operation_mode_update"),
|
||||
argThat(new ArgumentMatcher<AckedClusterStateUpdateTask<AcknowledgedResponse>>() {
|
||||
|
||||
Priority actualPriority = null;
|
||||
|
||||
@Override
|
||||
public boolean matches(Object argument) {
|
||||
if (argument instanceof AckedClusterStateUpdateTask == false) {
|
||||
return false;
|
||||
}
|
||||
actualPriority = ((AckedClusterStateUpdateTask<AcknowledgedResponse>) argument).priority();
|
||||
return actualPriority == Priority.IMMEDIATE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void describeTo(Description description) {
|
||||
description.appendText("the cluster state update task priority must be URGENT but got: ")
|
||||
.appendText(actualPriority.name());
|
||||
}
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue