This commit is contained in:
parent
1cff6897f3
commit
1fe2705826
|
@ -18,6 +18,7 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
|||
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
|
||||
import org.elasticsearch.threadpool.Scheduler;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.core.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
|
||||
|
||||
import java.time.Clock;
|
||||
|
@ -123,6 +124,10 @@ public class MlDailyMaintenanceService implements Releasable {
|
|||
|
||||
private void triggerTasks() {
|
||||
try {
|
||||
if (MlMetadata.getMlMetadata(clusterService.state()).isUpgradeMode()) {
|
||||
LOGGER.warn("skipping scheduled [ML] maintenance tasks because upgrade mode is enabled");
|
||||
return;
|
||||
}
|
||||
LOGGER.info("triggering scheduled [ML] maintenance tasks");
|
||||
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request(),
|
||||
ActionListener.wrap(
|
||||
|
|
|
@ -16,6 +16,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
|
|||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.core.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
@ -28,6 +29,8 @@ import static org.elasticsearch.mock.orig.Mockito.verify;
|
|||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.same;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class MlDailyMaintenanceServiceTests extends ESTestCase {
|
||||
|
@ -43,11 +46,6 @@ public class MlDailyMaintenanceServiceTests extends ESTestCase {
|
|||
client = mock(Client.class);
|
||||
when(client.threadPool()).thenReturn(threadPool);
|
||||
clusterService = mock(ClusterService.class);
|
||||
ClusterState state = ClusterState.builder(new ClusterName("MlDailyMaintenanceServiceTests"))
|
||||
.metadata(Metadata.builder().putCustom(PersistentTasksCustomMetadata.TYPE, PersistentTasksCustomMetadata.builder().build()))
|
||||
.nodes(DiscoveryNodes.builder().build())
|
||||
.build();
|
||||
when(clusterService.state()).thenReturn(state);
|
||||
mlAssignmentNotifier = mock(MlAssignmentNotifier.class);
|
||||
}
|
||||
|
||||
|
@ -57,6 +55,8 @@ public class MlDailyMaintenanceServiceTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testScheduledTriggering() throws InterruptedException {
|
||||
when(clusterService.state()).thenReturn(createClusterState(false));
|
||||
|
||||
int triggerCount = randomIntBetween(2, 4);
|
||||
CountDownLatch latch = new CountDownLatch(triggerCount);
|
||||
try (MlDailyMaintenanceService service = createService(latch, client)) {
|
||||
|
@ -68,10 +68,33 @@ public class MlDailyMaintenanceServiceTests extends ESTestCase {
|
|||
verify(mlAssignmentNotifier, Mockito.atLeast(triggerCount - 1)).auditUnassignedMlTasks(any(), any());
|
||||
}
|
||||
|
||||
public void testScheduledTriggeringWhileUpgradeModeIsEnabled() throws InterruptedException {
|
||||
when(clusterService.state()).thenReturn(createClusterState(true));
|
||||
|
||||
int triggerCount = randomIntBetween(2, 4);
|
||||
CountDownLatch latch = new CountDownLatch(triggerCount);
|
||||
try (MlDailyMaintenanceService service = createService(latch, client)) {
|
||||
service.start();
|
||||
latch.await(5, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
verify(clusterService, times(triggerCount - 1)).state();
|
||||
verifyNoMoreInteractions(client, clusterService, mlAssignmentNotifier);
|
||||
}
|
||||
|
||||
private MlDailyMaintenanceService createService(CountDownLatch latch, Client client) {
|
||||
return new MlDailyMaintenanceService(threadPool, client, clusterService, mlAssignmentNotifier, () -> {
|
||||
latch.countDown();
|
||||
return TimeValue.timeValueMillis(100);
|
||||
});
|
||||
}
|
||||
|
||||
private static ClusterState createClusterState(boolean isUpgradeMode) {
|
||||
return ClusterState.builder(new ClusterName("MlDailyMaintenanceServiceTests"))
|
||||
.metadata(Metadata.builder()
|
||||
.putCustom(PersistentTasksCustomMetadata.TYPE, PersistentTasksCustomMetadata.builder().build())
|
||||
.putCustom(MlMetadata.TYPE, new MlMetadata.Builder().isUpgradeMode(isUpgradeMode).build()))
|
||||
.nodes(DiscoveryNodes.builder().build())
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue