Separate SLM stop/start/status API from ILM (#47710)
* Separate SLM stop/start/status API from ILM This separates a start/stop/status API for SLM from being tied to ILM's operation mode. These APIs look like: ``` POST /_slm/stop POST /_slm/start GET /_slm/status ``` This allows administrators to have fine-grained control over preventing periodic snapshots and deletions while performing cluster maintenance. Relates to #43663 * Allow going from RUNNING to STOPPED * Align with the OperationMode rules * Fix slmStopping method * Make OperationModeUpdateTask constructor private * Wipe snapshots better in test
This commit is contained in:
parent
a492864a9d
commit
fb7abe9fa4
|
@ -0,0 +1,75 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License;
|
||||||
|
* you may not use this file except in compliance with the Elastic License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.xpack.core.slm.action;
|
||||||
|
|
||||||
|
import org.elasticsearch.action.ActionRequestValidationException;
|
||||||
|
import org.elasticsearch.action.ActionResponse;
|
||||||
|
import org.elasticsearch.action.ActionType;
|
||||||
|
import org.elasticsearch.action.support.master.AcknowledgedRequest;
|
||||||
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
|
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
|
import org.elasticsearch.xpack.core.ilm.OperationMode;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
public class GetSLMStatusAction extends ActionType<GetSLMStatusAction.Response> {
|
||||||
|
public static final GetSLMStatusAction INSTANCE = new GetSLMStatusAction();
|
||||||
|
public static final String NAME = "cluster:admin/slm/status";
|
||||||
|
|
||||||
|
protected GetSLMStatusAction() {
|
||||||
|
super(NAME, GetSLMStatusAction.Response::new);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Response extends ActionResponse implements ToXContentObject {
|
||||||
|
|
||||||
|
private OperationMode mode;
|
||||||
|
|
||||||
|
public Response(StreamInput in) throws IOException {
|
||||||
|
super(in);
|
||||||
|
this.mode = in.readEnum(OperationMode.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Response(OperationMode mode) {
|
||||||
|
this.mode = mode;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void writeTo(StreamOutput out) throws IOException {
|
||||||
|
out.writeEnum(this.mode);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||||
|
builder.startObject();
|
||||||
|
builder.field("operation_mode", this.mode);
|
||||||
|
builder.endObject();
|
||||||
|
return builder;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Request extends AcknowledgedRequest<GetSLMStatusAction.Request> {
|
||||||
|
|
||||||
|
public Request(StreamInput in) throws IOException {
|
||||||
|
super(in);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Request() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ActionRequestValidationException validate() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void writeTo(StreamOutput out) throws IOException {
|
||||||
|
super.writeTo(out);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,55 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License;
|
||||||
|
* you may not use this file except in compliance with the Elastic License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.xpack.core.slm.action;
|
||||||
|
|
||||||
|
import org.elasticsearch.action.ActionRequestValidationException;
|
||||||
|
import org.elasticsearch.action.ActionType;
|
||||||
|
import org.elasticsearch.action.support.master.AcknowledgedRequest;
|
||||||
|
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||||
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
public class StartSLMAction extends ActionType<AcknowledgedResponse> {
|
||||||
|
public static final StartSLMAction INSTANCE = new StartSLMAction();
|
||||||
|
public static final String NAME = "cluster:admin/slm/start";
|
||||||
|
|
||||||
|
protected StartSLMAction() {
|
||||||
|
super(NAME, AcknowledgedResponse::new);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Request extends AcknowledgedRequest<StartSLMAction.Request> {
|
||||||
|
|
||||||
|
public Request(StreamInput in) throws IOException {
|
||||||
|
super(in);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Request() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ActionRequestValidationException validate() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return 86;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object obj) {
|
||||||
|
if (obj == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (obj.getClass() != getClass()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,55 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License;
|
||||||
|
* you may not use this file except in compliance with the Elastic License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.xpack.core.slm.action;
|
||||||
|
|
||||||
|
import org.elasticsearch.action.ActionRequestValidationException;
|
||||||
|
import org.elasticsearch.action.ActionType;
|
||||||
|
import org.elasticsearch.action.support.master.AcknowledgedRequest;
|
||||||
|
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||||
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
public class StopSLMAction extends ActionType<AcknowledgedResponse> {
|
||||||
|
public static final StopSLMAction INSTANCE = new StopSLMAction();
|
||||||
|
public static final String NAME = "cluster:admin/slm/stop";
|
||||||
|
|
||||||
|
protected StopSLMAction() {
|
||||||
|
super(NAME, AcknowledgedResponse::new);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Request extends AcknowledgedRequest<Request> {
|
||||||
|
|
||||||
|
public Request(StreamInput in) throws IOException {
|
||||||
|
super(in);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Request() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ActionRequestValidationException validate() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return 85;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object obj) {
|
||||||
|
if (obj == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (obj.getClass() != getClass()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -240,6 +240,98 @@ public class SnapshotLifecycleRestIT extends ESRestTestCase {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public void testStartStopStatus() throws Exception {
|
||||||
|
final String indexName = "test";
|
||||||
|
final String policyName = "start-stop-policy";
|
||||||
|
final String repoId = "start-stop-repo";
|
||||||
|
int docCount = randomIntBetween(10, 50);
|
||||||
|
for (int i = 0; i < docCount; i++) {
|
||||||
|
index(client(), indexName, "" + i, "foo", "bar");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a snapshot repo
|
||||||
|
initializeRepo(repoId);
|
||||||
|
|
||||||
|
// Stop SLM so nothing happens
|
||||||
|
client().performRequest(new Request("POST", "/_slm/stop"));
|
||||||
|
|
||||||
|
assertBusy(() -> {
|
||||||
|
logger.info("--> waiting for SLM to stop");
|
||||||
|
assertThat(EntityUtils.toString(client().performRequest(new Request("GET", "/_slm/status")).getEntity()),
|
||||||
|
containsString("STOPPED"));
|
||||||
|
});
|
||||||
|
|
||||||
|
try {
|
||||||
|
createSnapshotPolicy(policyName, "snap", "*/1 * * * * ?", repoId, indexName, true,
|
||||||
|
new SnapshotRetentionConfiguration(TimeValue.ZERO, null, null));
|
||||||
|
long start = System.currentTimeMillis();
|
||||||
|
final String snapshotName = executePolicy(policyName);
|
||||||
|
|
||||||
|
// Check that the executed snapshot is created
|
||||||
|
assertBusy(() -> {
|
||||||
|
try {
|
||||||
|
logger.info("--> checking for snapshot creation...");
|
||||||
|
Response response = client().performRequest(new Request("GET", "/_snapshot/" + repoId + "/" + snapshotName));
|
||||||
|
Map<String, Object> snapshotResponseMap;
|
||||||
|
try (InputStream is = response.getEntity().getContent()) {
|
||||||
|
snapshotResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
|
||||||
|
}
|
||||||
|
assertThat(snapshotResponseMap.size(), greaterThan(0));
|
||||||
|
final Map<String, Object> metadata = extractMetadata(snapshotResponseMap, snapshotName);
|
||||||
|
assertNotNull(metadata);
|
||||||
|
assertThat(metadata.get("policy"), equalTo(policyName));
|
||||||
|
assertHistoryIsPresent(policyName, true, repoId, CREATE_OPERATION);
|
||||||
|
} catch (ResponseException e) {
|
||||||
|
fail("expected snapshot to exist but it does not: " + EntityUtils.toString(e.getResponse().getEntity()));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Sleep for up to a second, but at least 1 second since we scheduled the policy so we can
|
||||||
|
// ensure it *would* have run if SLM were running
|
||||||
|
Thread.sleep(Math.min(0, TimeValue.timeValueSeconds(1).millis() - Math.min(0, System.currentTimeMillis() - start)));
|
||||||
|
|
||||||
|
client().performRequest(new Request("POST", "/_slm/_execute_retention"));
|
||||||
|
|
||||||
|
// Retention and the manually executed policy should still have run,
|
||||||
|
// but only the one we manually ran.
|
||||||
|
assertBusy(() -> {
|
||||||
|
logger.info("--> checking for stats updates...");
|
||||||
|
Map<String, Object> stats = getSLMStats();
|
||||||
|
Map<String, Object> policyStats = policyStatsAsMap(stats);
|
||||||
|
Map<String, Object> policyIdStats = (Map<String, Object>) policyStats.get(policyName);
|
||||||
|
int snapsTaken = (int) policyIdStats.get(SnapshotLifecycleStats.SnapshotPolicyStats.SNAPSHOTS_TAKEN.getPreferredName());
|
||||||
|
int totalTaken = (int) stats.get(SnapshotLifecycleStats.TOTAL_TAKEN.getPreferredName());
|
||||||
|
int totalFailed = (int) stats.get(SnapshotLifecycleStats.TOTAL_FAILED.getPreferredName());
|
||||||
|
int totalDeleted = (int) stats.get(SnapshotLifecycleStats.TOTAL_DELETIONS.getPreferredName());
|
||||||
|
assertThat(snapsTaken, equalTo(1));
|
||||||
|
assertThat(totalTaken, equalTo(1));
|
||||||
|
assertThat(totalDeleted, equalTo(1));
|
||||||
|
assertThat(totalFailed, equalTo(0));
|
||||||
|
});
|
||||||
|
|
||||||
|
assertBusy(() -> {
|
||||||
|
try {
|
||||||
|
Map<String, List<Map<?, ?>>> snaps = wipeSnapshots();
|
||||||
|
logger.info("--> checking for wiped snapshots: {}", snaps);
|
||||||
|
assertThat(snaps.size(), equalTo(0));
|
||||||
|
} catch (ResponseException e) {
|
||||||
|
logger.error("got exception wiping snapshots", e);
|
||||||
|
fail("got exception: " + EntityUtils.toString(e.getResponse().getEntity()));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} finally {
|
||||||
|
client().performRequest(new Request("POST", "/_slm/start"));
|
||||||
|
|
||||||
|
assertBusy(() -> {
|
||||||
|
logger.info("--> waiting for SLM to start");
|
||||||
|
assertThat(EntityUtils.toString(client().performRequest(new Request("GET", "/_slm/status")).getEntity()),
|
||||||
|
containsString("RUNNING"));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public void testBasicTimeBasedRetenion() throws Exception {
|
public void testBasicTimeBasedRetenion() throws Exception {
|
||||||
final String indexName = "test";
|
final String indexName = "test";
|
||||||
|
|
|
@ -66,9 +66,12 @@ import org.elasticsearch.xpack.core.slm.SnapshotLifecycleMetadata;
|
||||||
import org.elasticsearch.xpack.core.slm.action.DeleteSnapshotLifecycleAction;
|
import org.elasticsearch.xpack.core.slm.action.DeleteSnapshotLifecycleAction;
|
||||||
import org.elasticsearch.xpack.core.slm.action.ExecuteSnapshotLifecycleAction;
|
import org.elasticsearch.xpack.core.slm.action.ExecuteSnapshotLifecycleAction;
|
||||||
import org.elasticsearch.xpack.core.slm.action.ExecuteSnapshotRetentionAction;
|
import org.elasticsearch.xpack.core.slm.action.ExecuteSnapshotRetentionAction;
|
||||||
|
import org.elasticsearch.xpack.core.slm.action.GetSLMStatusAction;
|
||||||
import org.elasticsearch.xpack.core.slm.action.GetSnapshotLifecycleAction;
|
import org.elasticsearch.xpack.core.slm.action.GetSnapshotLifecycleAction;
|
||||||
import org.elasticsearch.xpack.core.slm.action.GetSnapshotLifecycleStatsAction;
|
import org.elasticsearch.xpack.core.slm.action.GetSnapshotLifecycleStatsAction;
|
||||||
import org.elasticsearch.xpack.core.slm.action.PutSnapshotLifecycleAction;
|
import org.elasticsearch.xpack.core.slm.action.PutSnapshotLifecycleAction;
|
||||||
|
import org.elasticsearch.xpack.core.slm.action.StartSLMAction;
|
||||||
|
import org.elasticsearch.xpack.core.slm.action.StopSLMAction;
|
||||||
import org.elasticsearch.xpack.core.slm.history.SnapshotHistoryStore;
|
import org.elasticsearch.xpack.core.slm.history.SnapshotHistoryStore;
|
||||||
import org.elasticsearch.xpack.core.slm.history.SnapshotLifecycleTemplateRegistry;
|
import org.elasticsearch.xpack.core.slm.history.SnapshotLifecycleTemplateRegistry;
|
||||||
import org.elasticsearch.xpack.ilm.action.RestDeleteLifecycleAction;
|
import org.elasticsearch.xpack.ilm.action.RestDeleteLifecycleAction;
|
||||||
|
@ -98,15 +101,21 @@ import org.elasticsearch.xpack.slm.SnapshotRetentionTask;
|
||||||
import org.elasticsearch.xpack.slm.action.RestDeleteSnapshotLifecycleAction;
|
import org.elasticsearch.xpack.slm.action.RestDeleteSnapshotLifecycleAction;
|
||||||
import org.elasticsearch.xpack.slm.action.RestExecuteSnapshotLifecycleAction;
|
import org.elasticsearch.xpack.slm.action.RestExecuteSnapshotLifecycleAction;
|
||||||
import org.elasticsearch.xpack.slm.action.RestExecuteSnapshotRetentionAction;
|
import org.elasticsearch.xpack.slm.action.RestExecuteSnapshotRetentionAction;
|
||||||
|
import org.elasticsearch.xpack.slm.action.RestGetSLMStatusAction;
|
||||||
import org.elasticsearch.xpack.slm.action.RestGetSnapshotLifecycleAction;
|
import org.elasticsearch.xpack.slm.action.RestGetSnapshotLifecycleAction;
|
||||||
import org.elasticsearch.xpack.slm.action.RestGetSnapshotLifecycleStatsAction;
|
import org.elasticsearch.xpack.slm.action.RestGetSnapshotLifecycleStatsAction;
|
||||||
import org.elasticsearch.xpack.slm.action.RestPutSnapshotLifecycleAction;
|
import org.elasticsearch.xpack.slm.action.RestPutSnapshotLifecycleAction;
|
||||||
|
import org.elasticsearch.xpack.slm.action.RestStartSLMAction;
|
||||||
|
import org.elasticsearch.xpack.slm.action.RestStopSLMAction;
|
||||||
import org.elasticsearch.xpack.slm.action.TransportDeleteSnapshotLifecycleAction;
|
import org.elasticsearch.xpack.slm.action.TransportDeleteSnapshotLifecycleAction;
|
||||||
import org.elasticsearch.xpack.slm.action.TransportExecuteSnapshotLifecycleAction;
|
import org.elasticsearch.xpack.slm.action.TransportExecuteSnapshotLifecycleAction;
|
||||||
import org.elasticsearch.xpack.slm.action.TransportExecuteSnapshotRetentionAction;
|
import org.elasticsearch.xpack.slm.action.TransportExecuteSnapshotRetentionAction;
|
||||||
|
import org.elasticsearch.xpack.slm.action.TransportGetSLMStatusAction;
|
||||||
import org.elasticsearch.xpack.slm.action.TransportGetSnapshotLifecycleAction;
|
import org.elasticsearch.xpack.slm.action.TransportGetSnapshotLifecycleAction;
|
||||||
import org.elasticsearch.xpack.slm.action.TransportGetSnapshotLifecycleStatsAction;
|
import org.elasticsearch.xpack.slm.action.TransportGetSnapshotLifecycleStatsAction;
|
||||||
import org.elasticsearch.xpack.slm.action.TransportPutSnapshotLifecycleAction;
|
import org.elasticsearch.xpack.slm.action.TransportPutSnapshotLifecycleAction;
|
||||||
|
import org.elasticsearch.xpack.slm.action.TransportStartSLMAction;
|
||||||
|
import org.elasticsearch.xpack.slm.action.TransportStopSLMAction;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.time.Clock;
|
import java.time.Clock;
|
||||||
|
@ -251,7 +260,10 @@ public class IndexLifecycle extends Plugin implements ActionPlugin {
|
||||||
new RestGetSnapshotLifecycleAction(restController),
|
new RestGetSnapshotLifecycleAction(restController),
|
||||||
new RestExecuteSnapshotLifecycleAction(restController),
|
new RestExecuteSnapshotLifecycleAction(restController),
|
||||||
new RestGetSnapshotLifecycleStatsAction(restController),
|
new RestGetSnapshotLifecycleStatsAction(restController),
|
||||||
new RestExecuteSnapshotRetentionAction(restController)
|
new RestExecuteSnapshotRetentionAction(restController),
|
||||||
|
new RestStopSLMAction(restController),
|
||||||
|
new RestStartSLMAction(restController),
|
||||||
|
new RestGetSLMStatusAction(restController)
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
return handlers;
|
return handlers;
|
||||||
|
@ -281,7 +293,10 @@ public class IndexLifecycle extends Plugin implements ActionPlugin {
|
||||||
new ActionHandler<>(GetSnapshotLifecycleAction.INSTANCE, TransportGetSnapshotLifecycleAction.class),
|
new ActionHandler<>(GetSnapshotLifecycleAction.INSTANCE, TransportGetSnapshotLifecycleAction.class),
|
||||||
new ActionHandler<>(ExecuteSnapshotLifecycleAction.INSTANCE, TransportExecuteSnapshotLifecycleAction.class),
|
new ActionHandler<>(ExecuteSnapshotLifecycleAction.INSTANCE, TransportExecuteSnapshotLifecycleAction.class),
|
||||||
new ActionHandler<>(GetSnapshotLifecycleStatsAction.INSTANCE, TransportGetSnapshotLifecycleStatsAction.class),
|
new ActionHandler<>(GetSnapshotLifecycleStatsAction.INSTANCE, TransportGetSnapshotLifecycleStatsAction.class),
|
||||||
new ActionHandler<>(ExecuteSnapshotRetentionAction.INSTANCE, TransportExecuteSnapshotRetentionAction.class)
|
new ActionHandler<>(ExecuteSnapshotRetentionAction.INSTANCE, TransportExecuteSnapshotRetentionAction.class),
|
||||||
|
new ActionHandler<>(StartSLMAction.INSTANCE, TransportStartSLMAction.class),
|
||||||
|
new ActionHandler<>(StopSLMAction.INSTANCE, TransportStopSLMAction.class),
|
||||||
|
new ActionHandler<>(GetSLMStatusAction.INSTANCE, TransportGetSLMStatusAction.class)
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
return actions;
|
return actions;
|
||||||
|
|
|
@ -307,8 +307,7 @@ public class IndexLifecycleService
|
||||||
}
|
}
|
||||||
|
|
||||||
public void submitOperationModeUpdate(OperationMode mode) {
|
public void submitOperationModeUpdate(OperationMode mode) {
|
||||||
clusterService.submitStateUpdateTask("ilm_operation_mode_update",
|
clusterService.submitStateUpdateTask("ilm_operation_mode_update", OperationModeUpdateTask.ilmMode(mode));
|
||||||
new OperationModeUpdateTask(mode));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -10,20 +10,37 @@ import org.apache.logging.log4j.Logger;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||||
import org.elasticsearch.cluster.metadata.MetaData;
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
|
import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.xpack.core.ilm.OperationMode;
|
import org.elasticsearch.xpack.core.ilm.OperationMode;
|
||||||
import org.elasticsearch.xpack.core.ilm.IndexLifecycleMetadata;
|
import org.elasticsearch.xpack.core.ilm.IndexLifecycleMetadata;
|
||||||
import org.elasticsearch.xpack.core.slm.SnapshotLifecycleMetadata;
|
import org.elasticsearch.xpack.core.slm.SnapshotLifecycleMetadata;
|
||||||
|
|
||||||
public class OperationModeUpdateTask extends ClusterStateUpdateTask {
|
public class OperationModeUpdateTask extends ClusterStateUpdateTask {
|
||||||
private static final Logger logger = LogManager.getLogger(OperationModeUpdateTask.class);
|
private static final Logger logger = LogManager.getLogger(OperationModeUpdateTask.class);
|
||||||
private final OperationMode mode;
|
@Nullable
|
||||||
|
private final OperationMode ilmMode;
|
||||||
|
@Nullable
|
||||||
|
private final OperationMode slmMode;
|
||||||
|
|
||||||
public OperationModeUpdateTask(OperationMode mode) {
|
private OperationModeUpdateTask(OperationMode ilmMode, OperationMode slmMode) {
|
||||||
this.mode = mode;
|
this.ilmMode = ilmMode;
|
||||||
|
this.slmMode = slmMode;
|
||||||
}
|
}
|
||||||
|
|
||||||
OperationMode getOperationMode() {
|
public static OperationModeUpdateTask ilmMode(OperationMode mode) {
|
||||||
return mode;
|
return new OperationModeUpdateTask(mode, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static OperationModeUpdateTask slmMode(OperationMode mode) {
|
||||||
|
return new OperationModeUpdateTask(null, mode);
|
||||||
|
}
|
||||||
|
|
||||||
|
OperationMode getILMOperationMode() {
|
||||||
|
return ilmMode;
|
||||||
|
}
|
||||||
|
|
||||||
|
OperationMode getSLMOperationMode() {
|
||||||
|
return slmMode;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -35,20 +52,26 @@ public class OperationModeUpdateTask extends ClusterStateUpdateTask {
|
||||||
}
|
}
|
||||||
|
|
||||||
private ClusterState updateILMState(final ClusterState currentState) {
|
private ClusterState updateILMState(final ClusterState currentState) {
|
||||||
|
if (ilmMode == null) {
|
||||||
|
return currentState;
|
||||||
|
}
|
||||||
IndexLifecycleMetadata currentMetadata = currentState.metaData().custom(IndexLifecycleMetadata.TYPE);
|
IndexLifecycleMetadata currentMetadata = currentState.metaData().custom(IndexLifecycleMetadata.TYPE);
|
||||||
if (currentMetadata != null && currentMetadata.getOperationMode().isValidChange(mode) == false) {
|
if (currentMetadata != null && currentMetadata.getOperationMode().isValidChange(ilmMode) == false) {
|
||||||
return currentState;
|
return currentState;
|
||||||
} else if (currentMetadata == null) {
|
} else if (currentMetadata == null) {
|
||||||
currentMetadata = IndexLifecycleMetadata.EMPTY;
|
currentMetadata = IndexLifecycleMetadata.EMPTY;
|
||||||
}
|
}
|
||||||
|
|
||||||
final OperationMode newMode;
|
final OperationMode newMode;
|
||||||
if (currentMetadata.getOperationMode().isValidChange(mode)) {
|
if (currentMetadata.getOperationMode().isValidChange(ilmMode)) {
|
||||||
newMode = mode;
|
newMode = ilmMode;
|
||||||
} else {
|
} else {
|
||||||
newMode = currentMetadata.getOperationMode();
|
newMode = currentMetadata.getOperationMode();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (newMode.equals(ilmMode) == false) {
|
||||||
|
logger.info("updating ILM operation mode to {}", newMode);
|
||||||
|
}
|
||||||
return ClusterState.builder(currentState)
|
return ClusterState.builder(currentState)
|
||||||
.metaData(MetaData.builder(currentState.metaData())
|
.metaData(MetaData.builder(currentState.metaData())
|
||||||
.putCustom(IndexLifecycleMetadata.TYPE,
|
.putCustom(IndexLifecycleMetadata.TYPE,
|
||||||
|
@ -57,20 +80,26 @@ public class OperationModeUpdateTask extends ClusterStateUpdateTask {
|
||||||
}
|
}
|
||||||
|
|
||||||
private ClusterState updateSLMState(final ClusterState currentState) {
|
private ClusterState updateSLMState(final ClusterState currentState) {
|
||||||
|
if (slmMode == null) {
|
||||||
|
return currentState;
|
||||||
|
}
|
||||||
SnapshotLifecycleMetadata currentMetadata = currentState.metaData().custom(SnapshotLifecycleMetadata.TYPE);
|
SnapshotLifecycleMetadata currentMetadata = currentState.metaData().custom(SnapshotLifecycleMetadata.TYPE);
|
||||||
if (currentMetadata != null && currentMetadata.getOperationMode().isValidChange(mode) == false) {
|
if (currentMetadata != null && currentMetadata.getOperationMode().isValidChange(slmMode) == false) {
|
||||||
return currentState;
|
return currentState;
|
||||||
} else if (currentMetadata == null) {
|
} else if (currentMetadata == null) {
|
||||||
currentMetadata = SnapshotLifecycleMetadata.EMPTY;
|
currentMetadata = SnapshotLifecycleMetadata.EMPTY;
|
||||||
}
|
}
|
||||||
|
|
||||||
final OperationMode newMode;
|
final OperationMode newMode;
|
||||||
if (currentMetadata.getOperationMode().isValidChange(mode)) {
|
if (currentMetadata.getOperationMode().isValidChange(slmMode)) {
|
||||||
newMode = mode;
|
newMode = slmMode;
|
||||||
} else {
|
} else {
|
||||||
newMode = currentMetadata.getOperationMode();
|
newMode = currentMetadata.getOperationMode();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (newMode.equals(slmMode) == false) {
|
||||||
|
logger.info("updating SLM operation mode to {}", newMode);
|
||||||
|
}
|
||||||
return ClusterState.builder(currentState)
|
return ClusterState.builder(currentState)
|
||||||
.metaData(MetaData.builder(currentState.metaData())
|
.metaData(MetaData.builder(currentState.metaData())
|
||||||
.putCustom(SnapshotLifecycleMetadata.TYPE,
|
.putCustom(SnapshotLifecycleMetadata.TYPE,
|
||||||
|
@ -81,6 +110,6 @@ public class OperationModeUpdateTask extends ClusterStateUpdateTask {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(String source, Exception e) {
|
public void onFailure(String source, Exception e) {
|
||||||
logger.error("unable to update lifecycle metadata with new mode [" + mode + "]", e);
|
logger.error("unable to update lifecycle metadata with new ilm mode [" + ilmMode + "], slm mode [" + slmMode + "]", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,7 +52,7 @@ public class TransportStartILMAction extends TransportMasterNodeAction<StartILMR
|
||||||
new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {
|
new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {
|
||||||
@Override
|
@Override
|
||||||
public ClusterState execute(ClusterState currentState) {
|
public ClusterState execute(ClusterState currentState) {
|
||||||
return (new OperationModeUpdateTask(OperationMode.RUNNING)).execute(currentState);
|
return (OperationModeUpdateTask.ilmMode(OperationMode.RUNNING)).execute(currentState);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -52,7 +52,7 @@ public class TransportStopILMAction extends TransportMasterNodeAction<StopILMReq
|
||||||
new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {
|
new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {
|
||||||
@Override
|
@Override
|
||||||
public ClusterState execute(ClusterState currentState) {
|
public ClusterState execute(ClusterState currentState) {
|
||||||
return (new OperationModeUpdateTask(OperationMode.STOPPING)).execute(currentState);
|
return (OperationModeUpdateTask.ilmMode(OperationMode.STOPPING)).execute(currentState);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -23,6 +23,7 @@ import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
|
||||||
import org.elasticsearch.xpack.core.slm.SnapshotLifecycleMetadata;
|
import org.elasticsearch.xpack.core.slm.SnapshotLifecycleMetadata;
|
||||||
import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicy;
|
import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicy;
|
||||||
import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicyMetadata;
|
import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicyMetadata;
|
||||||
|
import org.elasticsearch.xpack.ilm.OperationModeUpdateTask;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.time.Clock;
|
import java.time.Clock;
|
||||||
|
@ -65,10 +66,13 @@ public class SnapshotLifecycleService implements LocalNodeMasterListener, Closea
|
||||||
if (this.isMaster) {
|
if (this.isMaster) {
|
||||||
final ClusterState state = event.state();
|
final ClusterState state = event.state();
|
||||||
|
|
||||||
if (ilmStoppedOrStopping(state)) {
|
if (slmStoppedOrStopping(state)) {
|
||||||
if (scheduler.scheduledJobIds().size() > 0) {
|
if (scheduler.scheduledJobIds().size() > 0) {
|
||||||
cancelSnapshotJobs();
|
cancelSnapshotJobs();
|
||||||
}
|
}
|
||||||
|
if (slmStopping(state)) {
|
||||||
|
submitOperationModeUpdate(OperationMode.STOPPED);
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -82,8 +86,8 @@ public class SnapshotLifecycleService implements LocalNodeMasterListener, Closea
|
||||||
this.isMaster = true;
|
this.isMaster = true;
|
||||||
scheduler.register(snapshotTask);
|
scheduler.register(snapshotTask);
|
||||||
final ClusterState state = clusterService.state();
|
final ClusterState state = clusterService.state();
|
||||||
if (ilmStoppedOrStopping(state)) {
|
if (slmStoppedOrStopping(state)) {
|
||||||
// ILM is currently stopped, so don't schedule jobs
|
// SLM is currently stopped, so don't schedule jobs
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
scheduleSnapshotJobs(state);
|
scheduleSnapshotJobs(state);
|
||||||
|
@ -102,15 +106,29 @@ public class SnapshotLifecycleService implements LocalNodeMasterListener, Closea
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns true if ILM is in the stopped or stopped state
|
* Returns true if SLM is in the stopping or stopped state
|
||||||
*/
|
*/
|
||||||
static boolean ilmStoppedOrStopping(ClusterState state) {
|
static boolean slmStoppedOrStopping(ClusterState state) {
|
||||||
return Optional.ofNullable((SnapshotLifecycleMetadata) state.metaData().custom(SnapshotLifecycleMetadata.TYPE))
|
return Optional.ofNullable((SnapshotLifecycleMetadata) state.metaData().custom(SnapshotLifecycleMetadata.TYPE))
|
||||||
.map(SnapshotLifecycleMetadata::getOperationMode)
|
.map(SnapshotLifecycleMetadata::getOperationMode)
|
||||||
.map(mode -> OperationMode.STOPPING == mode || OperationMode.STOPPED == mode)
|
.map(mode -> OperationMode.STOPPING == mode || OperationMode.STOPPED == mode)
|
||||||
.orElse(false);
|
.orElse(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns true if SLM is in the stopping state
|
||||||
|
*/
|
||||||
|
static boolean slmStopping(ClusterState state) {
|
||||||
|
return Optional.ofNullable((SnapshotLifecycleMetadata) state.metaData().custom(SnapshotLifecycleMetadata.TYPE))
|
||||||
|
.map(SnapshotLifecycleMetadata::getOperationMode)
|
||||||
|
.map(mode -> OperationMode.STOPPING == mode)
|
||||||
|
.orElse(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void submitOperationModeUpdate(OperationMode mode) {
|
||||||
|
clusterService.submitStateUpdateTask("slm_operation_mode_update", OperationModeUpdateTask.slmMode(mode));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Schedule all non-scheduled snapshot jobs contained in the cluster state
|
* Schedule all non-scheduled snapshot jobs contained in the cluster state
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -92,8 +92,12 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
|
||||||
SnapshotRetentionService.SLM_RETENTION_MANUAL_JOB_ID + " but it was " + event.getJobName();
|
SnapshotRetentionService.SLM_RETENTION_MANUAL_JOB_ID + " but it was " + event.getJobName();
|
||||||
|
|
||||||
final ClusterState state = clusterService.state();
|
final ClusterState state = clusterService.state();
|
||||||
if (SnapshotLifecycleService.ilmStoppedOrStopping(state)) {
|
|
||||||
logger.debug("skipping SLM retention as ILM is currently stopped or stopping");
|
// Skip running retention if SLM is disabled, however, even if it's
|
||||||
|
// disabled we allow manual running.
|
||||||
|
if (SnapshotLifecycleService.slmStoppedOrStopping(state) &&
|
||||||
|
event.getJobName().equals(SnapshotRetentionService.SLM_RETENTION_MANUAL_JOB_ID) == false) {
|
||||||
|
logger.debug("skipping SLM retention as SLM is currently stopped or stopping");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,34 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License;
|
||||||
|
* you may not use this file except in compliance with the Elastic License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.xpack.slm.action;
|
||||||
|
|
||||||
|
import org.elasticsearch.client.node.NodeClient;
|
||||||
|
import org.elasticsearch.rest.BaseRestHandler;
|
||||||
|
import org.elasticsearch.rest.RestController;
|
||||||
|
import org.elasticsearch.rest.RestRequest;
|
||||||
|
import org.elasticsearch.rest.action.RestToXContentListener;
|
||||||
|
import org.elasticsearch.xpack.core.slm.action.GetSLMStatusAction;
|
||||||
|
|
||||||
|
public class RestGetSLMStatusAction extends BaseRestHandler {
|
||||||
|
|
||||||
|
public RestGetSLMStatusAction(RestController controller) {
|
||||||
|
controller.registerHandler(RestRequest.Method.GET, "/_slm/status", this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getName() {
|
||||||
|
return "slm_get_operation_mode_action";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
|
||||||
|
GetSLMStatusAction.Request request = new GetSLMStatusAction.Request();
|
||||||
|
request.timeout(restRequest.paramAsTime("timeout", request.timeout()));
|
||||||
|
request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout()));
|
||||||
|
return channel -> client.execute(GetSLMStatusAction.INSTANCE, request, new RestToXContentListener<>(channel));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,34 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License;
|
||||||
|
* you may not use this file except in compliance with the Elastic License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.xpack.slm.action;
|
||||||
|
|
||||||
|
import org.elasticsearch.client.node.NodeClient;
|
||||||
|
import org.elasticsearch.rest.BaseRestHandler;
|
||||||
|
import org.elasticsearch.rest.RestController;
|
||||||
|
import org.elasticsearch.rest.RestRequest;
|
||||||
|
import org.elasticsearch.rest.action.RestToXContentListener;
|
||||||
|
import org.elasticsearch.xpack.core.slm.action.StartSLMAction;
|
||||||
|
|
||||||
|
public class RestStartSLMAction extends BaseRestHandler {
|
||||||
|
|
||||||
|
public RestStartSLMAction(RestController controller) {
|
||||||
|
controller.registerHandler(RestRequest.Method.POST, "/_slm/start", this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getName() {
|
||||||
|
return "slm_start_action";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
|
||||||
|
StartSLMAction.Request request = new StartSLMAction.Request();
|
||||||
|
request.timeout(restRequest.paramAsTime("timeout", request.timeout()));
|
||||||
|
request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout()));
|
||||||
|
return channel -> client.execute(StartSLMAction.INSTANCE, request, new RestToXContentListener<>(channel));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,34 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License;
|
||||||
|
* you may not use this file except in compliance with the Elastic License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.xpack.slm.action;
|
||||||
|
|
||||||
|
import org.elasticsearch.client.node.NodeClient;
|
||||||
|
import org.elasticsearch.rest.BaseRestHandler;
|
||||||
|
import org.elasticsearch.rest.RestController;
|
||||||
|
import org.elasticsearch.rest.RestRequest;
|
||||||
|
import org.elasticsearch.rest.action.RestToXContentListener;
|
||||||
|
import org.elasticsearch.xpack.core.slm.action.StopSLMAction;
|
||||||
|
|
||||||
|
public class RestStopSLMAction extends BaseRestHandler {
|
||||||
|
|
||||||
|
public RestStopSLMAction(RestController controller) {
|
||||||
|
controller.registerHandler(RestRequest.Method.POST, "/_slm/stop", this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getName() {
|
||||||
|
return "slm_stop_action";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
|
||||||
|
StopSLMAction.Request request = new StopSLMAction.Request();
|
||||||
|
request.timeout(restRequest.paramAsTime("timeout", request.timeout()));
|
||||||
|
request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout()));
|
||||||
|
return channel -> client.execute(StopSLMAction.INSTANCE, request, new RestToXContentListener<>(channel));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,65 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License;
|
||||||
|
* you may not use this file except in compliance with the Elastic License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.xpack.slm.action;
|
||||||
|
|
||||||
|
import org.elasticsearch.action.ActionListener;
|
||||||
|
import org.elasticsearch.action.support.ActionFilters;
|
||||||
|
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
||||||
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
|
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||||
|
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||||
|
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||||
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
|
import org.elasticsearch.common.inject.Inject;
|
||||||
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
import org.elasticsearch.xpack.core.ilm.OperationMode;
|
||||||
|
import org.elasticsearch.xpack.core.slm.SnapshotLifecycleMetadata;
|
||||||
|
import org.elasticsearch.xpack.core.slm.action.GetSLMStatusAction;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
public class TransportGetSLMStatusAction extends TransportMasterNodeAction<GetSLMStatusAction.Request, GetSLMStatusAction.Response> {
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public TransportGetSLMStatusAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
|
||||||
|
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||||
|
super(GetSLMStatusAction.NAME, transportService, clusterService, threadPool, actionFilters,
|
||||||
|
GetSLMStatusAction.Request::new, indexNameExpressionResolver);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String executor() {
|
||||||
|
return ThreadPool.Names.SAME;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected GetSLMStatusAction.Response read(StreamInput in) throws IOException {
|
||||||
|
return new GetSLMStatusAction.Response(in);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void masterOperation(GetSLMStatusAction.Request request,
|
||||||
|
ClusterState state, ActionListener<GetSLMStatusAction.Response> listener) {
|
||||||
|
SnapshotLifecycleMetadata metadata = state.metaData().custom(SnapshotLifecycleMetadata.TYPE);
|
||||||
|
final GetSLMStatusAction.Response response;
|
||||||
|
if (metadata == null) {
|
||||||
|
// no need to actually install metadata just yet, but safe to say it is not stopped
|
||||||
|
response = new GetSLMStatusAction.Response(OperationMode.RUNNING);
|
||||||
|
} else {
|
||||||
|
response = new GetSLMStatusAction.Response(metadata.getOperationMode());
|
||||||
|
}
|
||||||
|
listener.onResponse(response);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ClusterBlockException checkBlock(GetSLMStatusAction.Request request, ClusterState state) {
|
||||||
|
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,69 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License;
|
||||||
|
* you may not use this file except in compliance with the Elastic License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.xpack.slm.action;
|
||||||
|
|
||||||
|
import org.elasticsearch.action.ActionListener;
|
||||||
|
import org.elasticsearch.action.support.ActionFilters;
|
||||||
|
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||||
|
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
||||||
|
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
|
||||||
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
|
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||||
|
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||||
|
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||||
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
|
import org.elasticsearch.common.inject.Inject;
|
||||||
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
import org.elasticsearch.xpack.core.ilm.OperationMode;
|
||||||
|
import org.elasticsearch.xpack.core.slm.action.StartSLMAction;
|
||||||
|
import org.elasticsearch.xpack.ilm.OperationModeUpdateTask;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
public class TransportStartSLMAction extends TransportMasterNodeAction<StartSLMAction.Request, AcknowledgedResponse> {
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public TransportStartSLMAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
|
||||||
|
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||||
|
super(StartSLMAction.NAME, transportService, clusterService, threadPool, actionFilters, StartSLMAction.Request::new,
|
||||||
|
indexNameExpressionResolver);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String executor() {
|
||||||
|
return ThreadPool.Names.SAME;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected AcknowledgedResponse read(StreamInput in) throws IOException {
|
||||||
|
return new AcknowledgedResponse(in);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void masterOperation(StartSLMAction.Request request, ClusterState state,
|
||||||
|
ActionListener<AcknowledgedResponse> listener) {
|
||||||
|
clusterService.submitStateUpdateTask("slm_operation_mode_update",
|
||||||
|
new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {
|
||||||
|
@Override
|
||||||
|
public ClusterState execute(ClusterState currentState) {
|
||||||
|
return (OperationModeUpdateTask.slmMode(OperationMode.RUNNING)).execute(currentState);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected AcknowledgedResponse newResponse(boolean acknowledged) {
|
||||||
|
return new AcknowledgedResponse(acknowledged);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ClusterBlockException checkBlock(StartSLMAction.Request request, ClusterState state) {
|
||||||
|
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,69 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License;
|
||||||
|
* you may not use this file except in compliance with the Elastic License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.xpack.slm.action;
|
||||||
|
|
||||||
|
import org.elasticsearch.action.ActionListener;
|
||||||
|
import org.elasticsearch.action.support.ActionFilters;
|
||||||
|
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||||
|
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
||||||
|
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
|
||||||
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
|
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||||
|
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||||
|
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||||
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
|
import org.elasticsearch.common.inject.Inject;
|
||||||
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
import org.elasticsearch.xpack.core.ilm.OperationMode;
|
||||||
|
import org.elasticsearch.xpack.core.slm.action.StopSLMAction;
|
||||||
|
import org.elasticsearch.xpack.ilm.OperationModeUpdateTask;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
public class TransportStopSLMAction extends TransportMasterNodeAction<StopSLMAction.Request, AcknowledgedResponse> {
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public TransportStopSLMAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
|
||||||
|
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||||
|
super(StopSLMAction.NAME, transportService, clusterService, threadPool, actionFilters, StopSLMAction.Request::new,
|
||||||
|
indexNameExpressionResolver);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String executor() {
|
||||||
|
return ThreadPool.Names.SAME;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected AcknowledgedResponse read(StreamInput in) throws IOException {
|
||||||
|
return new AcknowledgedResponse(in);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void masterOperation(StopSLMAction.Request request, ClusterState state,
|
||||||
|
ActionListener<AcknowledgedResponse> listener) {
|
||||||
|
clusterService.submitStateUpdateTask("slm_operation_mode_update",
|
||||||
|
new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {
|
||||||
|
@Override
|
||||||
|
public ClusterState execute(ClusterState currentState) {
|
||||||
|
return (OperationModeUpdateTask.slmMode(OperationMode.STOPPING)).execute(currentState);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected AcknowledgedResponse newResponse(boolean acknowledged) {
|
||||||
|
return new AcknowledgedResponse(acknowledged);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ClusterBlockException checkBlock(StopSLMAction.Request request, ClusterState state) {
|
||||||
|
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
|
||||||
|
}
|
||||||
|
}
|
|
@ -289,7 +289,7 @@ public class IndexLifecycleServiceTests extends ESTestCase {
|
||||||
|
|
||||||
doAnswer(invocationOnMock -> {
|
doAnswer(invocationOnMock -> {
|
||||||
OperationModeUpdateTask task = (OperationModeUpdateTask) invocationOnMock.getArguments()[1];
|
OperationModeUpdateTask task = (OperationModeUpdateTask) invocationOnMock.getArguments()[1];
|
||||||
assertThat(task.getOperationMode(), equalTo(OperationMode.STOPPED));
|
assertThat(task.getILMOperationMode(), equalTo(OperationMode.STOPPED));
|
||||||
moveToMaintenance.set(true);
|
moveToMaintenance.set(true);
|
||||||
return null;
|
return null;
|
||||||
}).when(clusterService).submitStateUpdateTask(eq("ilm_operation_mode_update"), any(OperationModeUpdateTask.class));
|
}).when(clusterService).submitStateUpdateTask(eq("ilm_operation_mode_update"), any(OperationModeUpdateTask.class));
|
||||||
|
|
|
@ -71,7 +71,7 @@ public class OperationModeUpdateTaskTests extends ESTestCase {
|
||||||
.build());
|
.build());
|
||||||
}
|
}
|
||||||
ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).build();
|
ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).build();
|
||||||
OperationModeUpdateTask task = new OperationModeUpdateTask(requestMode);
|
OperationModeUpdateTask task = OperationModeUpdateTask.ilmMode(requestMode);
|
||||||
ClusterState newState = task.execute(state);
|
ClusterState newState = task.execute(state);
|
||||||
if (assertSameClusterState) {
|
if (assertSameClusterState) {
|
||||||
assertSame(state, newState);
|
assertSame(state, newState);
|
||||||
|
|
|
@ -50,6 +50,7 @@ import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
@ -402,6 +403,47 @@ public class SnapshotRetentionTaskTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testRunManuallyWhileStopping() throws Exception {
|
||||||
|
doTestRunManuallyDuringMode(OperationMode.STOPPING);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testRunManuallyWhileStopped() throws Exception {
|
||||||
|
doTestRunManuallyDuringMode(OperationMode.STOPPED);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doTestRunManuallyDuringMode(OperationMode mode) throws Exception {
|
||||||
|
try (ThreadPool threadPool = new TestThreadPool("slm-test");
|
||||||
|
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
|
||||||
|
Client noOpClient = new NoOpClient("slm-test")) {
|
||||||
|
final String policyId = "policy";
|
||||||
|
final String repoId = "repo";
|
||||||
|
SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy(policyId, "snap", "1 * * * * ?",
|
||||||
|
repoId, null, new SnapshotRetentionConfiguration(TimeValue.timeValueDays(30), null, null));
|
||||||
|
|
||||||
|
ClusterState state = createState(mode, policy);
|
||||||
|
ClusterServiceUtils.setState(clusterService, state);
|
||||||
|
|
||||||
|
AtomicBoolean retentionWasRun = new AtomicBoolean(false);
|
||||||
|
MockSnapshotRetentionTask task = new MockSnapshotRetentionTask(noOpClient, clusterService,
|
||||||
|
new SnapshotLifecycleTaskTests.VerifyingHistoryStore(noOpClient, ZoneOffset.UTC, (historyItem) -> { }),
|
||||||
|
threadPool,
|
||||||
|
() -> {
|
||||||
|
retentionWasRun.set(true);
|
||||||
|
return Collections.emptyMap();
|
||||||
|
},
|
||||||
|
(deletionPolicyId, repo, snapId, slmStats, listener) -> { },
|
||||||
|
System::nanoTime);
|
||||||
|
|
||||||
|
long time = System.currentTimeMillis();
|
||||||
|
task.triggered(new SchedulerEngine.Event(SnapshotRetentionService.SLM_RETENTION_MANUAL_JOB_ID, time, time));
|
||||||
|
|
||||||
|
assertTrue("retention should be run manually even if SLM is disabled", retentionWasRun.get());
|
||||||
|
|
||||||
|
threadPool.shutdownNow();
|
||||||
|
threadPool.awaitTermination(10, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public ClusterState createState(SnapshotLifecyclePolicy... policies) {
|
public ClusterState createState(SnapshotLifecyclePolicy... policies) {
|
||||||
return createState(OperationMode.RUNNING, policies);
|
return createState(OperationMode.RUNNING, policies);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue