diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/action/GetSLMStatusAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/action/GetSLMStatusAction.java
new file mode 100644
index 00000000000..83a7d8da589
--- /dev/null
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/action/GetSLMStatusAction.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.core.slm.action;
+
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.support.master.AcknowledgedRequest;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.xpack.core.ilm.OperationMode;
+
+import java.io.IOException;
+
+public class GetSLMStatusAction extends ActionType<GetSLMStatusAction.Response> {
+    public static final GetSLMStatusAction INSTANCE = new GetSLMStatusAction();
+    public static final String NAME = "cluster:admin/slm/status";
+
+    protected GetSLMStatusAction() {
+        super(NAME, GetSLMStatusAction.Response::new);
+    }
+
+    public static class Response extends ActionResponse implements ToXContentObject {
+
+        private OperationMode mode;
+
+        public Response(StreamInput in) throws IOException {
+            super(in);
+            this.mode = in.readEnum(OperationMode.class);
+        }
+
+        public Response(OperationMode mode) {
+            this.mode = mode;
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            out.writeEnum(this.mode);
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.startObject();
+            builder.field("operation_mode", this.mode);
+            builder.endObject();
+            return builder;
+        }
+    }
+
+    public static class Request extends AcknowledgedRequest<GetSLMStatusAction.Request> {
+
+        public Request(StreamInput in) throws IOException {
+            super(in);
+        }
+
+        public Request() {
+        }
+
+        @Override
+        public ActionRequestValidationException validate() {
+            return null;
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+        }
+    }
+}
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/action/StartSLMAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/action/StartSLMAction.java
new file mode 100644
index 00000000000..faea61c379e
--- /dev/null
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/action/StartSLMAction.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.core.slm.action;
+
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.support.master.AcknowledgedRequest;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.common.io.stream.StreamInput;
+
+import java.io.IOException;
+
+public class StartSLMAction extends ActionType<AcknowledgedResponse> {
+    public static final StartSLMAction INSTANCE = new StartSLMAction();
+    public static final String NAME = "cluster:admin/slm/start";
+
+    protected StartSLMAction() {
+        super(NAME, AcknowledgedResponse::new);
+    }
+
+    public static class Request extends AcknowledgedRequest<StartSLMAction.Request> {
+
+        public Request(StreamInput in) throws IOException {
+            super(in);
+        }
+
+        public Request() {
+        }
+
+        @Override
+        public ActionRequestValidationException validate() {
+            return null;
+        }
+
+        @Override
+        public int hashCode() {
+            return 86;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj == null) {
+                return false;
+            }
+            if (obj.getClass() != getClass()) {
+                return false;
+            }
+            return true;
+        }
+    }
+}
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/action/StopSLMAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/action/StopSLMAction.java
new file mode 100644
index 00000000000..a1bbbafa070
--- /dev/null
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/action/StopSLMAction.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.core.slm.action;
+
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.support.master.AcknowledgedRequest;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.common.io.stream.StreamInput;
+
+import java.io.IOException;
+
+public class StopSLMAction extends ActionType<AcknowledgedResponse> {
+    public static final StopSLMAction INSTANCE = new StopSLMAction();
+    public static final String NAME = "cluster:admin/slm/stop";
+
+    protected StopSLMAction() {
+        super(NAME, AcknowledgedResponse::new);
+    }
+
+    public static class Request extends AcknowledgedRequest<Request> {
+
+        public Request(StreamInput in) throws IOException {
+            super(in);
+        }
+
+        public Request() {
+        }
+
+        @Override
+        public ActionRequestValidationException validate() {
+            return null;
+        }
+
+        @Override
+        public int hashCode() {
+            return 85;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj == null) {
+                return false;
+            }
+            if (obj.getClass() != getClass()) {
+                return false;
+            }
+            return true;
+        }
+    }
+}
diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleRestIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleRestIT.java
index cfe5b2f0e06..645fe458d0a 100644
--- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleRestIT.java
+++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleRestIT.java
@@ -240,6 +240,98 @@ public class SnapshotLifecycleRestIT extends ESRestTestCase {
         });
     }
 
+
+    @SuppressWarnings("unchecked")
+    public void testStartStopStatus() throws Exception {
+        final String indexName = "test";
+        final String policyName = "start-stop-policy";
+        final String repoId = "start-stop-repo";
+        int docCount = randomIntBetween(10, 50);
+        for (int i = 0; i < docCount; i++) {
+            index(client(), indexName, "" + i, "foo", "bar");
+        }
+
+        // Create a snapshot repo
+        initializeRepo(repoId);
+
+        // Stop SLM so nothing happens
+        client().performRequest(new Request("POST", "/_slm/stop"));
+
+        assertBusy(() -> {
+            logger.info("--> waiting for SLM to stop");
+            assertThat(EntityUtils.toString(client().performRequest(new Request("GET", "/_slm/status")).getEntity()),
+                containsString("STOPPED"));
+        });
+
+        try {
+            createSnapshotPolicy(policyName, "snap", "*/1 * * * * ?", repoId, indexName, true,
+                new SnapshotRetentionConfiguration(TimeValue.ZERO, null, null));
+            long start = System.currentTimeMillis();
+            final String snapshotName = executePolicy(policyName);
+
+            // Check that the executed snapshot is created
+            assertBusy(() -> {
+                try {
+                    logger.info("--> checking for snapshot creation...");
+                    Response response = client().performRequest(new Request("GET", "/_snapshot/" + repoId + "/" + snapshotName));
+                    Map<String, Object> snapshotResponseMap;
+                    try (InputStream is = response.getEntity().getContent()) {
+                        snapshotResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
+                    }
+                    assertThat(snapshotResponseMap.size(), greaterThan(0));
+                    final Map<String, Object> metadata = extractMetadata(snapshotResponseMap, snapshotName);
+                    assertNotNull(metadata);
+                    assertThat(metadata.get("policy"), equalTo(policyName));
+                    assertHistoryIsPresent(policyName, true, repoId, CREATE_OPERATION);
+                } catch (ResponseException e) {
+                    fail("expected snapshot to exist but it does not: " + EntityUtils.toString(e.getResponse().getEntity()));
+                }
+            });
+
+            // Sleep for up to a second, but at least 1 second since we scheduled the policy so we can
+            // ensure it *would* have run if SLM were running
+            Thread.sleep(Math.min(0, TimeValue.timeValueSeconds(1).millis() - Math.min(0, System.currentTimeMillis() - start)));
+
+            client().performRequest(new Request("POST", "/_slm/_execute_retention"));
+
+            // Retention and the manually executed policy should still have run,
+            // but only the one we manually ran.
+            assertBusy(() -> {
+                logger.info("--> checking for stats updates...");
+                Map<String, Object> stats = getSLMStats();
+                Map<String, Object> policyStats = policyStatsAsMap(stats);
+                Map<String, Object> policyIdStats = (Map<String, Object>) policyStats.get(policyName);
+                int snapsTaken = (int) policyIdStats.get(SnapshotLifecycleStats.SnapshotPolicyStats.SNAPSHOTS_TAKEN.getPreferredName());
+                int totalTaken = (int) stats.get(SnapshotLifecycleStats.TOTAL_TAKEN.getPreferredName());
+                int totalFailed = (int) stats.get(SnapshotLifecycleStats.TOTAL_FAILED.getPreferredName());
+                int totalDeleted = (int) stats.get(SnapshotLifecycleStats.TOTAL_DELETIONS.getPreferredName());
+                assertThat(snapsTaken, equalTo(1));
+                assertThat(totalTaken, equalTo(1));
+                assertThat(totalDeleted, equalTo(1));
+                assertThat(totalFailed, equalTo(0));
+            });
+
+            assertBusy(() -> {
+                try {
+                    Map<String, List<Map<?, ?>>> snaps = wipeSnapshots();
+                    logger.info("--> checking for wiped snapshots: {}", snaps);
+                    assertThat(snaps.size(), equalTo(0));
+                } catch (ResponseException e) {
+                    logger.error("got exception wiping snapshots", e);
+                    fail("got exception: " + EntityUtils.toString(e.getResponse().getEntity()));
+                }
+            });
+        } finally {
+            client().performRequest(new Request("POST", "/_slm/start"));
+
+            assertBusy(() -> {
+                logger.info("--> waiting for SLM to start");
+                assertThat(EntityUtils.toString(client().performRequest(new Request("GET", "/_slm/status")).getEntity()),
+                    containsString("RUNNING"));
+            });
+        }
+    }
+
     @SuppressWarnings("unchecked")
     public void testBasicTimeBasedRetenion() throws Exception {
         final String indexName = "test";
diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java
index 6fda8955d67..28c945e36e7 100644
--- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java
+++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java
@@ -66,9 +66,12 @@ import org.elasticsearch.xpack.core.slm.SnapshotLifecycleMetadata;
 import org.elasticsearch.xpack.core.slm.action.DeleteSnapshotLifecycleAction;
 import org.elasticsearch.xpack.core.slm.action.ExecuteSnapshotLifecycleAction;
 import org.elasticsearch.xpack.core.slm.action.ExecuteSnapshotRetentionAction;
+import org.elasticsearch.xpack.core.slm.action.GetSLMStatusAction;
 import org.elasticsearch.xpack.core.slm.action.GetSnapshotLifecycleAction;
 import org.elasticsearch.xpack.core.slm.action.GetSnapshotLifecycleStatsAction;
 import org.elasticsearch.xpack.core.slm.action.PutSnapshotLifecycleAction;
+import org.elasticsearch.xpack.core.slm.action.StartSLMAction;
+import org.elasticsearch.xpack.core.slm.action.StopSLMAction;
 import org.elasticsearch.xpack.core.slm.history.SnapshotHistoryStore;
 import org.elasticsearch.xpack.core.slm.history.SnapshotLifecycleTemplateRegistry;
 import org.elasticsearch.xpack.ilm.action.RestDeleteLifecycleAction;
@@ -98,15 +101,21 @@ import org.elasticsearch.xpack.slm.SnapshotRetentionTask;
 import org.elasticsearch.xpack.slm.action.RestDeleteSnapshotLifecycleAction;
 import org.elasticsearch.xpack.slm.action.RestExecuteSnapshotLifecycleAction;
 import org.elasticsearch.xpack.slm.action.RestExecuteSnapshotRetentionAction;
+import org.elasticsearch.xpack.slm.action.RestGetSLMStatusAction;
 import org.elasticsearch.xpack.slm.action.RestGetSnapshotLifecycleAction;
 import org.elasticsearch.xpack.slm.action.RestGetSnapshotLifecycleStatsAction;
 import org.elasticsearch.xpack.slm.action.RestPutSnapshotLifecycleAction;
+import org.elasticsearch.xpack.slm.action.RestStartSLMAction;
+import org.elasticsearch.xpack.slm.action.RestStopSLMAction;
 import org.elasticsearch.xpack.slm.action.TransportDeleteSnapshotLifecycleAction;
 import org.elasticsearch.xpack.slm.action.TransportExecuteSnapshotLifecycleAction;
 import org.elasticsearch.xpack.slm.action.TransportExecuteSnapshotRetentionAction;
+import org.elasticsearch.xpack.slm.action.TransportGetSLMStatusAction;
 import org.elasticsearch.xpack.slm.action.TransportGetSnapshotLifecycleAction;
 import org.elasticsearch.xpack.slm.action.TransportGetSnapshotLifecycleStatsAction;
 import org.elasticsearch.xpack.slm.action.TransportPutSnapshotLifecycleAction;
+import org.elasticsearch.xpack.slm.action.TransportStartSLMAction;
+import org.elasticsearch.xpack.slm.action.TransportStopSLMAction;
 
 import java.io.IOException;
 import java.time.Clock;
@@ -251,7 +260,10 @@ public class IndexLifecycle extends Plugin implements ActionPlugin {
                 new RestGetSnapshotLifecycleAction(restController),
                 new RestExecuteSnapshotLifecycleAction(restController),
                 new RestGetSnapshotLifecycleStatsAction(restController),
-                new RestExecuteSnapshotRetentionAction(restController)
+                new RestExecuteSnapshotRetentionAction(restController),
+                new RestStopSLMAction(restController),
+                new RestStartSLMAction(restController),
+                new RestGetSLMStatusAction(restController)
             ));
         }
         return handlers;
@@ -281,7 +293,10 @@ public class IndexLifecycle extends Plugin implements ActionPlugin {
                 new ActionHandler<>(GetSnapshotLifecycleAction.INSTANCE, TransportGetSnapshotLifecycleAction.class),
                 new ActionHandler<>(ExecuteSnapshotLifecycleAction.INSTANCE, TransportExecuteSnapshotLifecycleAction.class),
                 new ActionHandler<>(GetSnapshotLifecycleStatsAction.INSTANCE, TransportGetSnapshotLifecycleStatsAction.class),
-                new ActionHandler<>(ExecuteSnapshotRetentionAction.INSTANCE, TransportExecuteSnapshotRetentionAction.class)
+                new ActionHandler<>(ExecuteSnapshotRetentionAction.INSTANCE, TransportExecuteSnapshotRetentionAction.class),
+                new ActionHandler<>(StartSLMAction.INSTANCE, TransportStartSLMAction.class),
+                new ActionHandler<>(StopSLMAction.INSTANCE, TransportStopSLMAction.class),
+                new ActionHandler<>(GetSLMStatusAction.INSTANCE, TransportGetSLMStatusAction.class)
             ));
         }
         return actions;
diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java
index 1cf860bf5c7..7ab0d5a7e58 100644
--- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java
+++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java
@@ -307,8 +307,7 @@ public class IndexLifecycleService
     }
 
     public void submitOperationModeUpdate(OperationMode mode) {
-        clusterService.submitStateUpdateTask("ilm_operation_mode_update",
-            new OperationModeUpdateTask(mode));
+        clusterService.submitStateUpdateTask("ilm_operation_mode_update", OperationModeUpdateTask.ilmMode(mode));
     }
 
     /**
diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/OperationModeUpdateTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/OperationModeUpdateTask.java
index 53d4a5307b0..9bb4a8df3d4 100644
--- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/OperationModeUpdateTask.java
+++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/OperationModeUpdateTask.java
@@ -10,20 +10,37 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.xpack.core.ilm.OperationMode;
 import org.elasticsearch.xpack.core.ilm.IndexLifecycleMetadata;
 import org.elasticsearch.xpack.core.slm.SnapshotLifecycleMetadata;
 
 public class OperationModeUpdateTask extends ClusterStateUpdateTask {
     private static final Logger logger = LogManager.getLogger(OperationModeUpdateTask.class);
-    private final OperationMode mode;
+    @Nullable
+    private final OperationMode ilmMode;
+    @Nullable
+    private final OperationMode slmMode;
 
-    public OperationModeUpdateTask(OperationMode mode) {
-        this.mode = mode;
+    private OperationModeUpdateTask(OperationMode ilmMode, OperationMode slmMode) {
+        this.ilmMode = ilmMode;
+        this.slmMode = slmMode;
     }
 
-    OperationMode getOperationMode() {
-        return mode;
+    public static OperationModeUpdateTask ilmMode(OperationMode mode) {
+        return new OperationModeUpdateTask(mode, null);
+    }
+
+    public static OperationModeUpdateTask slmMode(OperationMode mode) {
+        return new OperationModeUpdateTask(null, mode);
+    }
+
+    OperationMode getILMOperationMode() {
+        return ilmMode;
+    }
+
+    OperationMode getSLMOperationMode() {
+        return slmMode;
     }
 
     @Override
@@ -35,20 +52,26 @@ public class OperationModeUpdateTask extends ClusterStateUpdateTask {
     }
 
     private ClusterState updateILMState(final ClusterState currentState) {
+        if (ilmMode == null) {
+            return currentState;
+        }
         IndexLifecycleMetadata currentMetadata = currentState.metaData().custom(IndexLifecycleMetadata.TYPE);
-        if (currentMetadata != null && currentMetadata.getOperationMode().isValidChange(mode) == false) {
+        if (currentMetadata != null && currentMetadata.getOperationMode().isValidChange(ilmMode) == false) {
             return currentState;
         } else if (currentMetadata == null) {
             currentMetadata = IndexLifecycleMetadata.EMPTY;
         }
 
         final OperationMode newMode;
-        if (currentMetadata.getOperationMode().isValidChange(mode)) {
-            newMode = mode;
+        if (currentMetadata.getOperationMode().isValidChange(ilmMode)) {
+            newMode = ilmMode;
         } else {
             newMode = currentMetadata.getOperationMode();
         }
 
+        if (newMode.equals(ilmMode) == false) {
+            logger.info("updating ILM operation mode to {}", newMode);
+        }
         return ClusterState.builder(currentState)
             .metaData(MetaData.builder(currentState.metaData())
                     .putCustom(IndexLifecycleMetadata.TYPE,
@@ -57,20 +80,26 @@ public class OperationModeUpdateTask extends ClusterStateUpdateTask {
     }
 
     private ClusterState updateSLMState(final ClusterState currentState) {
+        if (slmMode == null) {
+            return currentState;
+        }
         SnapshotLifecycleMetadata currentMetadata = currentState.metaData().custom(SnapshotLifecycleMetadata.TYPE);
-        if (currentMetadata != null && currentMetadata.getOperationMode().isValidChange(mode) == false) {
+        if (currentMetadata != null && currentMetadata.getOperationMode().isValidChange(slmMode) == false) {
             return currentState;
         } else if (currentMetadata == null) {
             currentMetadata = SnapshotLifecycleMetadata.EMPTY;
         }
 
         final OperationMode newMode;
-        if (currentMetadata.getOperationMode().isValidChange(mode)) {
-            newMode = mode;
+        if (currentMetadata.getOperationMode().isValidChange(slmMode)) {
+            newMode = slmMode;
         } else {
             newMode = currentMetadata.getOperationMode();
         }
 
+        if (newMode.equals(slmMode) == false) {
+            logger.info("updating SLM operation mode to {}", newMode);
+        }
         return ClusterState.builder(currentState)
             .metaData(MetaData.builder(currentState.metaData())
                 .putCustom(SnapshotLifecycleMetadata.TYPE,
@@ -81,6 +110,6 @@ public class OperationModeUpdateTask extends ClusterStateUpdateTask {
 
     @Override
     public void onFailure(String source, Exception e) {
-        logger.error("unable to update lifecycle metadata with new mode [" + mode + "]", e);
+        logger.error("unable to update lifecycle metadata with new ilm mode [" + ilmMode + "], slm mode [" + slmMode + "]", e);
     }
 }
diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportStartILMAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportStartILMAction.java
index dbe2f3c9a35..d0c2388a1ab 100644
--- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportStartILMAction.java
+++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportStartILMAction.java
@@ -52,7 +52,7 @@ public class TransportStartILMAction extends TransportMasterNodeAction<StartILMR
                 new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {
                 @Override
                 public ClusterState execute(ClusterState currentState) {
-                        return (new OperationModeUpdateTask(OperationMode.RUNNING)).execute(currentState);
+                        return (OperationModeUpdateTask.ilmMode(OperationMode.RUNNING)).execute(currentState);
                 }
 
                 @Override
diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportStopILMAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportStopILMAction.java
index 2cd135b7c85..9d8c120fa44 100644
--- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportStopILMAction.java
+++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportStopILMAction.java
@@ -52,7 +52,7 @@ public class TransportStopILMAction extends TransportMasterNodeAction<StopILMReq
                 new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {
                 @Override
                 public ClusterState execute(ClusterState currentState) {
-                        return (new OperationModeUpdateTask(OperationMode.STOPPING)).execute(currentState);
+                        return (OperationModeUpdateTask.ilmMode(OperationMode.STOPPING)).execute(currentState);
                 }
 
                 @Override
diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleService.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleService.java
index 0d27584d83e..c0f8e651587 100644
--- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleService.java
+++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleService.java
@@ -23,6 +23,7 @@ import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
 import org.elasticsearch.xpack.core.slm.SnapshotLifecycleMetadata;
 import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicy;
 import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicyMetadata;
+import org.elasticsearch.xpack.ilm.OperationModeUpdateTask;
 
 import java.io.Closeable;
 import java.time.Clock;
@@ -65,10 +66,13 @@ public class SnapshotLifecycleService implements LocalNodeMasterListener, Closea
         if (this.isMaster) {
             final ClusterState state = event.state();
 
-            if (ilmStoppedOrStopping(state)) {
+            if (slmStoppedOrStopping(state)) {
                 if (scheduler.scheduledJobIds().size() > 0) {
                     cancelSnapshotJobs();
                 }
+                if (slmStopping(state)) {
+                    submitOperationModeUpdate(OperationMode.STOPPED);
+                }
                 return;
             }
 
@@ -82,8 +86,8 @@ public class SnapshotLifecycleService implements LocalNodeMasterListener, Closea
         this.isMaster = true;
         scheduler.register(snapshotTask);
         final ClusterState state = clusterService.state();
-        if (ilmStoppedOrStopping(state)) {
-            // ILM is currently stopped, so don't schedule jobs
+        if (slmStoppedOrStopping(state)) {
+            // SLM is currently stopped, so don't schedule jobs
             return;
         }
         scheduleSnapshotJobs(state);
@@ -102,15 +106,29 @@ public class SnapshotLifecycleService implements LocalNodeMasterListener, Closea
     }
 
     /**
-     * Returns true if ILM is in the stopped or stopped state
+     * Returns true if SLM is in the stopping or stopped state
      */
-    static boolean ilmStoppedOrStopping(ClusterState state) {
+    static boolean slmStoppedOrStopping(ClusterState state) {
         return Optional.ofNullable((SnapshotLifecycleMetadata) state.metaData().custom(SnapshotLifecycleMetadata.TYPE))
             .map(SnapshotLifecycleMetadata::getOperationMode)
             .map(mode -> OperationMode.STOPPING == mode || OperationMode.STOPPED == mode)
             .orElse(false);
     }
 
+    /**
+     * Returns true if SLM is in the stopping state
+     */
+    static boolean slmStopping(ClusterState state) {
+        return Optional.ofNullable((SnapshotLifecycleMetadata) state.metaData().custom(SnapshotLifecycleMetadata.TYPE))
+            .map(SnapshotLifecycleMetadata::getOperationMode)
+            .map(mode -> OperationMode.STOPPING == mode)
+            .orElse(false);
+    }
+
+    public void submitOperationModeUpdate(OperationMode mode) {
+        clusterService.submitStateUpdateTask("slm_operation_mode_update", OperationModeUpdateTask.slmMode(mode));
+    }
+
     /**
      * Schedule all non-scheduled snapshot jobs contained in the cluster state
      */
diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java
index 48111c13d8d..f278f118125 100644
--- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java
+++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java
@@ -92,8 +92,12 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
                 SnapshotRetentionService.SLM_RETENTION_MANUAL_JOB_ID + " but it was " + event.getJobName();
 
         final ClusterState state = clusterService.state();
-        if (SnapshotLifecycleService.ilmStoppedOrStopping(state)) {
-            logger.debug("skipping SLM retention as ILM is currently stopped or stopping");
+
+        // Skip running retention if SLM is disabled, however, even if it's
+        // disabled we allow manual running.
+        if (SnapshotLifecycleService.slmStoppedOrStopping(state) &&
+            event.getJobName().equals(SnapshotRetentionService.SLM_RETENTION_MANUAL_JOB_ID) == false) {
+            logger.debug("skipping SLM retention as SLM is currently stopped or stopping");
             return;
         }
 
diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/RestGetSLMStatusAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/RestGetSLMStatusAction.java
new file mode 100644
index 00000000000..26f3197a761
--- /dev/null
+++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/RestGetSLMStatusAction.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.slm.action;
+
+import org.elasticsearch.client.node.NodeClient;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestController;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.action.RestToXContentListener;
+import org.elasticsearch.xpack.core.slm.action.GetSLMStatusAction;
+
+public class RestGetSLMStatusAction extends BaseRestHandler {
+
+    public RestGetSLMStatusAction(RestController controller) {
+        controller.registerHandler(RestRequest.Method.GET, "/_slm/status", this);
+    }
+
+    @Override
+    public String getName() {
+        return "slm_get_operation_mode_action";
+    }
+
+    @Override
+    protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
+        GetSLMStatusAction.Request request = new GetSLMStatusAction.Request();
+        request.timeout(restRequest.paramAsTime("timeout", request.timeout()));
+        request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout()));
+        return channel -> client.execute(GetSLMStatusAction.INSTANCE, request, new RestToXContentListener<>(channel));
+    }
+}
diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/RestStartSLMAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/RestStartSLMAction.java
new file mode 100644
index 00000000000..87dc7d2bb22
--- /dev/null
+++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/RestStartSLMAction.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.slm.action;
+
+import org.elasticsearch.client.node.NodeClient;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestController;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.action.RestToXContentListener;
+import org.elasticsearch.xpack.core.slm.action.StartSLMAction;
+
+public class RestStartSLMAction extends BaseRestHandler {
+
+    public RestStartSLMAction(RestController controller) {
+        controller.registerHandler(RestRequest.Method.POST, "/_slm/start", this);
+    }
+
+    @Override
+    public String getName() {
+        return "slm_start_action";
+    }
+
+    @Override
+    protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
+        StartSLMAction.Request request = new StartSLMAction.Request();
+        request.timeout(restRequest.paramAsTime("timeout", request.timeout()));
+        request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout()));
+        return channel -> client.execute(StartSLMAction.INSTANCE, request, new RestToXContentListener<>(channel));
+    }
+}
diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/RestStopSLMAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/RestStopSLMAction.java
new file mode 100644
index 00000000000..ac74b37d587
--- /dev/null
+++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/RestStopSLMAction.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.slm.action;
+
+import org.elasticsearch.client.node.NodeClient;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestController;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.action.RestToXContentListener;
+import org.elasticsearch.xpack.core.slm.action.StopSLMAction;
+
+public class RestStopSLMAction extends BaseRestHandler {
+
+    public RestStopSLMAction(RestController controller) {
+        controller.registerHandler(RestRequest.Method.POST, "/_slm/stop", this);
+    }
+
+    @Override
+    public String getName() {
+        return "slm_stop_action";
+    }
+
+    @Override
+    protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
+        StopSLMAction.Request request = new StopSLMAction.Request();
+        request.timeout(restRequest.paramAsTime("timeout", request.timeout()));
+        request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout()));
+        return channel -> client.execute(StopSLMAction.INSTANCE, request, new RestToXContentListener<>(channel));
+    }
+}
diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportGetSLMStatusAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportGetSLMStatusAction.java
new file mode 100644
index 00000000000..f880229ef8d
--- /dev/null
+++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportGetSLMStatusAction.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.slm.action;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.master.TransportMasterNodeAction;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.block.ClusterBlockException;
+import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.ilm.OperationMode;
+import org.elasticsearch.xpack.core.slm.SnapshotLifecycleMetadata;
+import org.elasticsearch.xpack.core.slm.action.GetSLMStatusAction;
+
+import java.io.IOException;
+
+public class TransportGetSLMStatusAction extends TransportMasterNodeAction<GetSLMStatusAction.Request, GetSLMStatusAction.Response> {
+
+    @Inject
+    public TransportGetSLMStatusAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
+                                       ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
+        super(GetSLMStatusAction.NAME, transportService, clusterService, threadPool, actionFilters,
+            GetSLMStatusAction.Request::new, indexNameExpressionResolver);
+    }
+
+    @Override
+    protected String executor() {
+        return ThreadPool.Names.SAME;
+    }
+
+    @Override
+    protected GetSLMStatusAction.Response read(StreamInput in) throws IOException {
+        return new GetSLMStatusAction.Response(in);
+    }
+
+    @Override
+    protected void masterOperation(GetSLMStatusAction.Request request,
+                                   ClusterState state, ActionListener<GetSLMStatusAction.Response> listener) {
+        SnapshotLifecycleMetadata metadata = state.metaData().custom(SnapshotLifecycleMetadata.TYPE);
+        final GetSLMStatusAction.Response response;
+        if (metadata == null) {
+            // no need to actually install metadata just yet, but safe to say it is not stopped
+            response = new GetSLMStatusAction.Response(OperationMode.RUNNING);
+        } else {
+            response = new GetSLMStatusAction.Response(metadata.getOperationMode());
+        }
+        listener.onResponse(response);
+    }
+
+    @Override
+    protected ClusterBlockException checkBlock(GetSLMStatusAction.Request request, ClusterState state) {
+        return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
+    }
+}
+
diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportStartSLMAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportStartSLMAction.java
new file mode 100644
index 00000000000..c52ecf2e121
--- /dev/null
+++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportStartSLMAction.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.slm.action;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.action.support.master.TransportMasterNodeAction;
+import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.block.ClusterBlockException;
+import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.ilm.OperationMode;
+import org.elasticsearch.xpack.core.slm.action.StartSLMAction;
+import org.elasticsearch.xpack.ilm.OperationModeUpdateTask;
+
+import java.io.IOException;
+
+public class TransportStartSLMAction extends TransportMasterNodeAction<StartSLMAction.Request, AcknowledgedResponse> {
+
+    @Inject
+    public TransportStartSLMAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
+                                   ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
+        super(StartSLMAction.NAME, transportService, clusterService, threadPool, actionFilters, StartSLMAction.Request::new,
+            indexNameExpressionResolver);
+    }
+
+    @Override
+    protected String executor() {
+        return ThreadPool.Names.SAME;
+    }
+
+    @Override
+    protected AcknowledgedResponse read(StreamInput in) throws IOException {
+        return new AcknowledgedResponse(in);
+    }
+
+    @Override
+    protected void masterOperation(StartSLMAction.Request request, ClusterState state,
+                                   ActionListener<AcknowledgedResponse> listener) {
+        clusterService.submitStateUpdateTask("slm_operation_mode_update",
+            new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {
+                @Override
+                public ClusterState execute(ClusterState currentState) {
+                    return (OperationModeUpdateTask.slmMode(OperationMode.RUNNING)).execute(currentState);
+                }
+
+                @Override
+                protected AcknowledgedResponse newResponse(boolean acknowledged) {
+                    return new AcknowledgedResponse(acknowledged);
+                }
+            });
+    }
+
+    @Override
+    protected ClusterBlockException checkBlock(StartSLMAction.Request request, ClusterState state) {
+        return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
+    }
+}
diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportStopSLMAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportStopSLMAction.java
new file mode 100644
index 00000000000..ad9c7b17688
--- /dev/null
+++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportStopSLMAction.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.slm.action;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.action.support.master.TransportMasterNodeAction;
+import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.block.ClusterBlockException;
+import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.ilm.OperationMode;
+import org.elasticsearch.xpack.core.slm.action.StopSLMAction;
+import org.elasticsearch.xpack.ilm.OperationModeUpdateTask;
+
+import java.io.IOException;
+
+public class TransportStopSLMAction extends TransportMasterNodeAction<StopSLMAction.Request, AcknowledgedResponse> {
+
+    @Inject
+    public TransportStopSLMAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
+                                  ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
+        super(StopSLMAction.NAME, transportService, clusterService, threadPool, actionFilters, StopSLMAction.Request::new,
+            indexNameExpressionResolver);
+    }
+
+    @Override
+    protected String executor() {
+        return ThreadPool.Names.SAME;
+    }
+
+    @Override
+    protected AcknowledgedResponse read(StreamInput in) throws IOException {
+        return new AcknowledgedResponse(in);
+    }
+
+    @Override
+    protected void masterOperation(StopSLMAction.Request request, ClusterState state,
+                                   ActionListener<AcknowledgedResponse> listener) {
+        clusterService.submitStateUpdateTask("slm_operation_mode_update",
+            new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {
+                @Override
+                public ClusterState execute(ClusterState currentState) {
+                    return (OperationModeUpdateTask.slmMode(OperationMode.STOPPING)).execute(currentState);
+                }
+
+                @Override
+                protected AcknowledgedResponse newResponse(boolean acknowledged) {
+                    return new AcknowledgedResponse(acknowledged);
+                }
+            });
+    }
+
+    @Override
+    protected ClusterBlockException checkBlock(StopSLMAction.Request request, ClusterState state) {
+        return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
+    }
+}
diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java
index 87151627b02..996d208aae9 100644
--- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java
+++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java
@@ -289,7 +289,7 @@ public class IndexLifecycleServiceTests extends ESTestCase {
 
         doAnswer(invocationOnMock -> {
             OperationModeUpdateTask task = (OperationModeUpdateTask) invocationOnMock.getArguments()[1];
-            assertThat(task.getOperationMode(), equalTo(OperationMode.STOPPED));
+            assertThat(task.getILMOperationMode(), equalTo(OperationMode.STOPPED));
             moveToMaintenance.set(true);
             return null;
         }).when(clusterService).submitStateUpdateTask(eq("ilm_operation_mode_update"), any(OperationModeUpdateTask.class));
diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/OperationModeUpdateTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/OperationModeUpdateTaskTests.java
index f3ed5924cfe..7e361ca9b87 100644
--- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/OperationModeUpdateTaskTests.java
+++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/OperationModeUpdateTaskTests.java
@@ -71,7 +71,7 @@ public class OperationModeUpdateTaskTests extends ESTestCase {
                 .build());
         }
         ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).build();
-        OperationModeUpdateTask task = new OperationModeUpdateTask(requestMode);
+        OperationModeUpdateTask task = OperationModeUpdateTask.ilmMode(requestMode);
         ClusterState newState = task.execute(state);
         if (assertSameClusterState) {
             assertSame(state, newState);
diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java
index 24d137f6839..dff01f8e9b0 100644
--- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java
+++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java
@@ -50,6 +50,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -402,6 +403,47 @@ public class SnapshotRetentionTaskTests extends ESTestCase {
         }
     }
 
+    public void testRunManuallyWhileStopping() throws Exception {
+        doTestRunManuallyDuringMode(OperationMode.STOPPING);
+    }
+
+    public void testRunManuallyWhileStopped() throws Exception {
+        doTestRunManuallyDuringMode(OperationMode.STOPPED);
+    }
+
+    private void doTestRunManuallyDuringMode(OperationMode mode) throws Exception {
+        try (ThreadPool threadPool = new TestThreadPool("slm-test");
+             ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
+             Client noOpClient = new NoOpClient("slm-test")) {
+            final String policyId = "policy";
+            final String repoId = "repo";
+            SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy(policyId, "snap", "1 * * * * ?",
+                repoId, null, new SnapshotRetentionConfiguration(TimeValue.timeValueDays(30), null, null));
+
+            ClusterState state = createState(mode, policy);
+            ClusterServiceUtils.setState(clusterService, state);
+
+            AtomicBoolean retentionWasRun = new AtomicBoolean(false);
+            MockSnapshotRetentionTask task = new MockSnapshotRetentionTask(noOpClient, clusterService,
+                new SnapshotLifecycleTaskTests.VerifyingHistoryStore(noOpClient, ZoneOffset.UTC, (historyItem) -> { }),
+                threadPool,
+                () -> {
+                    retentionWasRun.set(true);
+                    return Collections.emptyMap();
+                },
+                (deletionPolicyId, repo, snapId, slmStats, listener) -> { },
+                System::nanoTime);
+
+            long time = System.currentTimeMillis();
+            task.triggered(new SchedulerEngine.Event(SnapshotRetentionService.SLM_RETENTION_MANUAL_JOB_ID, time, time));
+
+            assertTrue("retention should be run manually even if SLM is disabled", retentionWasRun.get());
+
+            threadPool.shutdownNow();
+            threadPool.awaitTermination(10, TimeUnit.SECONDS);
+        }
+    }
+
     public ClusterState createState(SnapshotLifecyclePolicy... policies) {
         return createState(OperationMode.RUNNING, policies);
     }