mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-22 21:05:23 +00:00
* Add API to execute SLM retention on-demand (#47405) This is a backport of #47405 This commit adds the `/_slm/_execute_retention` API endpoint. This endpoint kicks off SLM retention and then returns immediately. This in particular allows us to run retention without scheduling it (for entirely manual invocation) or perform a one-off cleanup. This commit also includes HLRC for the new API, and fixes an issue in SLMSnapshotBlockingIntegTests where retention invoked prior to the test completing could resurrect an index the internal test cluster cleanup had already deleted. Resolves #46508 Relates to #43663
This commit is contained in:
parent
6acc5ca8d1
commit
2e3eb4b24e
@ -37,6 +37,7 @@ import org.elasticsearch.client.indexlifecycle.StopILMRequest;
|
||||
import org.elasticsearch.client.slm.DeleteSnapshotLifecyclePolicyRequest;
|
||||
import org.elasticsearch.client.slm.ExecuteSnapshotLifecyclePolicyRequest;
|
||||
import org.elasticsearch.client.slm.ExecuteSnapshotLifecyclePolicyResponse;
|
||||
import org.elasticsearch.client.slm.ExecuteSnapshotLifecycleRetentionRequest;
|
||||
import org.elasticsearch.client.slm.GetSnapshotLifecyclePolicyRequest;
|
||||
import org.elasticsearch.client.slm.GetSnapshotLifecyclePolicyResponse;
|
||||
import org.elasticsearch.client.slm.GetSnapshotLifecycleStatsRequest;
|
||||
@ -467,6 +468,44 @@ public class IndexLifecycleClient {
|
||||
options, ExecuteSnapshotLifecyclePolicyResponse::fromXContent, listener, emptySet());
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute snapshot lifecycle retention
|
||||
* See <pre>
|
||||
* https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/
|
||||
* java-rest-high-ilm-slm-execute-snapshot-lifecycle-retention.html
|
||||
* </pre>
|
||||
* for more.
|
||||
* @param request the request
|
||||
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
|
||||
* @return the response
|
||||
* @throws IOException in case there is a problem sending the request or parsing back the response
|
||||
*/
|
||||
public AcknowledgedResponse executeSnapshotLifecycleRetention(ExecuteSnapshotLifecycleRetentionRequest request,
|
||||
RequestOptions options) throws IOException {
|
||||
return restHighLevelClient.performRequestAndParseEntity(request, IndexLifecycleRequestConverters::executeSnapshotLifecycleRetention,
|
||||
options, AcknowledgedResponse::fromXContent, emptySet());
|
||||
}
|
||||
|
||||
/**
|
||||
* Asynchronously execute snapshot lifecycle retention
|
||||
* See <pre>
|
||||
* https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/
|
||||
* java-rest-high-ilm-slm-execute-snapshot-lifecycle-retention.html
|
||||
* </pre>
|
||||
* for more.
|
||||
* @param request the request
|
||||
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
|
||||
* @param listener the listener to be notified upon request completion
|
||||
* @return cancellable that may be used to cancel the request
|
||||
*/
|
||||
public Cancellable executeSnapshotLifecycleRetentionAsync(
|
||||
ExecuteSnapshotLifecycleRetentionRequest request, RequestOptions options,
|
||||
ActionListener<AcknowledgedResponse> listener) {
|
||||
return restHighLevelClient.performRequestAsyncAndParseEntity(
|
||||
request, IndexLifecycleRequestConverters::executeSnapshotLifecycleRetention,
|
||||
options, AcknowledgedResponse::fromXContent, listener, emptySet());
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve snapshot lifecycle statistics.
|
||||
* See <pre>
|
||||
|
@ -34,6 +34,7 @@ import org.elasticsearch.client.indexlifecycle.StartILMRequest;
|
||||
import org.elasticsearch.client.indexlifecycle.StopILMRequest;
|
||||
import org.elasticsearch.client.slm.DeleteSnapshotLifecyclePolicyRequest;
|
||||
import org.elasticsearch.client.slm.ExecuteSnapshotLifecyclePolicyRequest;
|
||||
import org.elasticsearch.client.slm.ExecuteSnapshotLifecycleRetentionRequest;
|
||||
import org.elasticsearch.client.slm.GetSnapshotLifecyclePolicyRequest;
|
||||
import org.elasticsearch.client.slm.GetSnapshotLifecycleStatsRequest;
|
||||
import org.elasticsearch.client.slm.PutSnapshotLifecyclePolicyRequest;
|
||||
@ -217,6 +218,18 @@ final class IndexLifecycleRequestConverters {
|
||||
return request;
|
||||
}
|
||||
|
||||
static Request executeSnapshotLifecycleRetention(ExecuteSnapshotLifecycleRetentionRequest executeSnapshotLifecycleRetentionRequest) {
|
||||
Request request = new Request(HttpPost.METHOD_NAME,
|
||||
new RequestConverters.EndpointBuilder()
|
||||
.addPathPartAsIs("_slm/_execute_retention")
|
||||
.build());
|
||||
RequestConverters.Params params = new RequestConverters.Params();
|
||||
params.withMasterTimeout(executeSnapshotLifecycleRetentionRequest.masterNodeTimeout());
|
||||
params.withTimeout(executeSnapshotLifecycleRetentionRequest.timeout());
|
||||
request.addParameters(params.asMap());
|
||||
return request;
|
||||
}
|
||||
|
||||
static Request getSnapshotLifecycleStats(GetSnapshotLifecycleStatsRequest getSnapshotLifecycleStatsRequest) {
|
||||
String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_slm/stats").build();
|
||||
Request request = new Request(HttpGet.METHOD_NAME, endpoint);
|
||||
|
@ -0,0 +1,25 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.client.slm;
|
||||
|
||||
import org.elasticsearch.client.TimedRequest;
|
||||
|
||||
public class ExecuteSnapshotLifecycleRetentionRequest extends TimedRequest {
|
||||
}
|
@ -57,6 +57,7 @@ import org.elasticsearch.client.indices.CreateIndexRequest;
|
||||
import org.elasticsearch.client.slm.DeleteSnapshotLifecyclePolicyRequest;
|
||||
import org.elasticsearch.client.slm.ExecuteSnapshotLifecyclePolicyRequest;
|
||||
import org.elasticsearch.client.slm.ExecuteSnapshotLifecyclePolicyResponse;
|
||||
import org.elasticsearch.client.slm.ExecuteSnapshotLifecycleRetentionRequest;
|
||||
import org.elasticsearch.client.slm.GetSnapshotLifecyclePolicyRequest;
|
||||
import org.elasticsearch.client.slm.GetSnapshotLifecyclePolicyResponse;
|
||||
import org.elasticsearch.client.slm.GetSnapshotLifecycleStatsRequest;
|
||||
@ -987,6 +988,44 @@ public class ILMDocumentationIT extends ESRestHighLevelClientTestCase {
|
||||
// end::slm-delete-snapshot-lifecycle-policy-execute-async
|
||||
|
||||
assertTrue(deleteResp.isAcknowledged());
|
||||
|
||||
//////// EXECUTE RETENTION
|
||||
// tag::slm-execute-snapshot-lifecycle-retention
|
||||
ExecuteSnapshotLifecycleRetentionRequest req =
|
||||
new ExecuteSnapshotLifecycleRetentionRequest();
|
||||
// end::slm-execute-snapshot-lifecycle-retention
|
||||
|
||||
// tag::slm-execute-snapshot-lifecycle-retention-execute
|
||||
AcknowledgedResponse retentionResp =
|
||||
client.indexLifecycle()
|
||||
.executeSnapshotLifecycleRetention(req,
|
||||
RequestOptions.DEFAULT);
|
||||
// end::slm-execute-snapshot-lifecycle-retention-execute
|
||||
|
||||
// tag::slm-execute-snapshot-lifecycle-retention-response
|
||||
final boolean acked = retentionResp.isAcknowledged();
|
||||
// end::slm-execute-snapshot-lifecycle-retention-response
|
||||
|
||||
// tag::slm-execute-snapshot-lifecycle-policy-execute-listener
|
||||
ActionListener<AcknowledgedResponse> retentionListener =
|
||||
new ActionListener<AcknowledgedResponse>() {
|
||||
@Override
|
||||
public void onResponse(AcknowledgedResponse r) {
|
||||
assert r.isAcknowledged(); // <1>
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
// <2>
|
||||
}
|
||||
};
|
||||
// end::slm-execute-snapshot-lifecycle-retention-execute-listener
|
||||
|
||||
// tag::slm-execute-snapshot-lifecycle-retention-execute-async
|
||||
client.indexLifecycle()
|
||||
.executeSnapshotLifecycleRetentionAsync(req,
|
||||
RequestOptions.DEFAULT, retentionListener);
|
||||
// end::slm-execute-snapshot-lifecycle-retention-execute-async
|
||||
}
|
||||
|
||||
private void assertSnapshotExists(final RestHighLevelClient client, final String repo, final String snapshotName) throws Exception {
|
||||
|
@ -0,0 +1,35 @@
|
||||
--
|
||||
:api: slm-execute-snapshot-lifecycle-retention
|
||||
:request: ExecuteSnapshotLifecycleRetentionRequest
|
||||
:response: AcknowledgedResponse
|
||||
--
|
||||
[role="xpack"]
|
||||
[id="{upid}-{api}"]
|
||||
=== Execute Snapshot Lifecycle Retention API
|
||||
|
||||
|
||||
[id="{upid}-{api}-request"]
|
||||
==== Request
|
||||
|
||||
The Execute Snapshot Lifecycle Retention API allows you to execute Snapshot Lifecycle Management
|
||||
Retention immediately, rather than waiting for its regularly scheduled execution.
|
||||
|
||||
["source","java",subs="attributes,callouts,macros"]
|
||||
--------------------------------------------------
|
||||
include-tagged::{doc-tests-file}[{api}-request]
|
||||
--------------------------------------------------
|
||||
|
||||
[id="{upid}-{api}-response"]
|
||||
==== Response
|
||||
|
||||
The returned +{response}+ contains a boolean for whether the request was
|
||||
acknowledged by the master node.
|
||||
|
||||
["source","java",subs="attributes,callouts,macros"]
|
||||
--------------------------------------------------
|
||||
include-tagged::{doc-tests-file}[{api}-response]
|
||||
--------------------------------------------------
|
||||
|
||||
include::../execution.asciidoc[]
|
||||
|
||||
|
@ -0,0 +1,63 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.core.slm.action;
|
||||
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.ActionType;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedRequest;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class ExecuteSnapshotRetentionAction extends ActionType<AcknowledgedResponse> {
|
||||
public static final ExecuteSnapshotRetentionAction INSTANCE = new ExecuteSnapshotRetentionAction();
|
||||
public static final String NAME = "cluster:admin/slm/execute-retention";
|
||||
|
||||
protected ExecuteSnapshotRetentionAction() {
|
||||
super(NAME, AcknowledgedResponse::new);
|
||||
}
|
||||
|
||||
public static class Request extends AcknowledgedRequest<ExecuteSnapshotRetentionAction.Request> implements ToXContentObject {
|
||||
|
||||
public Request() { }
|
||||
|
||||
public Request(StreamInput in) throws IOException {
|
||||
super(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionRequestValidationException validate() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return super.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (obj.getClass() != getClass()) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
@ -65,6 +65,7 @@ import org.elasticsearch.xpack.core.ilm.action.StopILMAction;
|
||||
import org.elasticsearch.xpack.core.slm.SnapshotLifecycleMetadata;
|
||||
import org.elasticsearch.xpack.core.slm.action.DeleteSnapshotLifecycleAction;
|
||||
import org.elasticsearch.xpack.core.slm.action.ExecuteSnapshotLifecycleAction;
|
||||
import org.elasticsearch.xpack.core.slm.action.ExecuteSnapshotRetentionAction;
|
||||
import org.elasticsearch.xpack.core.slm.action.GetSnapshotLifecycleAction;
|
||||
import org.elasticsearch.xpack.core.slm.action.GetSnapshotLifecycleStatsAction;
|
||||
import org.elasticsearch.xpack.core.slm.action.PutSnapshotLifecycleAction;
|
||||
@ -96,11 +97,13 @@ import org.elasticsearch.xpack.slm.SnapshotRetentionService;
|
||||
import org.elasticsearch.xpack.slm.SnapshotRetentionTask;
|
||||
import org.elasticsearch.xpack.slm.action.RestDeleteSnapshotLifecycleAction;
|
||||
import org.elasticsearch.xpack.slm.action.RestExecuteSnapshotLifecycleAction;
|
||||
import org.elasticsearch.xpack.slm.action.RestExecuteSnapshotRetentionAction;
|
||||
import org.elasticsearch.xpack.slm.action.RestGetSnapshotLifecycleAction;
|
||||
import org.elasticsearch.xpack.slm.action.RestGetSnapshotLifecycleStatsAction;
|
||||
import org.elasticsearch.xpack.slm.action.RestPutSnapshotLifecycleAction;
|
||||
import org.elasticsearch.xpack.slm.action.TransportDeleteSnapshotLifecycleAction;
|
||||
import org.elasticsearch.xpack.slm.action.TransportExecuteSnapshotLifecycleAction;
|
||||
import org.elasticsearch.xpack.slm.action.TransportExecuteSnapshotRetentionAction;
|
||||
import org.elasticsearch.xpack.slm.action.TransportGetSnapshotLifecycleAction;
|
||||
import org.elasticsearch.xpack.slm.action.TransportGetSnapshotLifecycleStatsAction;
|
||||
import org.elasticsearch.xpack.slm.action.TransportPutSnapshotLifecycleAction;
|
||||
@ -247,7 +250,8 @@ public class IndexLifecycle extends Plugin implements ActionPlugin {
|
||||
new RestDeleteSnapshotLifecycleAction(restController),
|
||||
new RestGetSnapshotLifecycleAction(restController),
|
||||
new RestExecuteSnapshotLifecycleAction(restController),
|
||||
new RestGetSnapshotLifecycleStatsAction(restController)
|
||||
new RestGetSnapshotLifecycleStatsAction(restController),
|
||||
new RestExecuteSnapshotRetentionAction(restController)
|
||||
));
|
||||
}
|
||||
return handlers;
|
||||
@ -276,7 +280,8 @@ public class IndexLifecycle extends Plugin implements ActionPlugin {
|
||||
new ActionHandler<>(DeleteSnapshotLifecycleAction.INSTANCE, TransportDeleteSnapshotLifecycleAction.class),
|
||||
new ActionHandler<>(GetSnapshotLifecycleAction.INSTANCE, TransportGetSnapshotLifecycleAction.class),
|
||||
new ActionHandler<>(ExecuteSnapshotLifecycleAction.INSTANCE, TransportExecuteSnapshotLifecycleAction.class),
|
||||
new ActionHandler<>(GetSnapshotLifecycleStatsAction.INSTANCE, TransportGetSnapshotLifecycleStatsAction.class)
|
||||
new ActionHandler<>(GetSnapshotLifecycleStatsAction.INSTANCE, TransportGetSnapshotLifecycleStatsAction.class),
|
||||
new ActionHandler<>(ExecuteSnapshotRetentionAction.INSTANCE, TransportExecuteSnapshotRetentionAction.class)
|
||||
));
|
||||
}
|
||||
return actions;
|
||||
|
@ -31,10 +31,13 @@ import java.util.function.Supplier;
|
||||
public class SnapshotRetentionService implements LocalNodeMasterListener, Closeable {
|
||||
|
||||
static final String SLM_RETENTION_JOB_ID = "slm-retention-job";
|
||||
static final String SLM_RETENTION_MANUAL_JOB_ID = "slm-execute-manual-retention-job";
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(SnapshotRetentionService.class);
|
||||
|
||||
private final SchedulerEngine scheduler;
|
||||
private final SnapshotRetentionTask retentionTask;
|
||||
private final Clock clock;
|
||||
|
||||
private volatile String slmRetentionSchedule;
|
||||
private volatile boolean isMaster = false;
|
||||
@ -43,8 +46,10 @@ public class SnapshotRetentionService implements LocalNodeMasterListener, Closea
|
||||
Supplier<SnapshotRetentionTask> taskSupplier,
|
||||
ClusterService clusterService,
|
||||
Clock clock) {
|
||||
this.clock = clock;
|
||||
this.scheduler = new SchedulerEngine(settings, clock);
|
||||
this.scheduler.register(taskSupplier.get());
|
||||
this.retentionTask = taskSupplier.get();
|
||||
this.scheduler.register(this.retentionTask);
|
||||
this.slmRetentionSchedule = LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING.get(settings);
|
||||
clusterService.addLocalNodeMasterListener(this);
|
||||
clusterService.getClusterSettings().addSettingsUpdateConsumer(LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING,
|
||||
@ -91,6 +96,16 @@ public class SnapshotRetentionService implements LocalNodeMasterListener, Closea
|
||||
this.scheduler.scheduledJobIds().forEach(this.scheduler::remove);
|
||||
}
|
||||
|
||||
/**
|
||||
* Manually trigger snapshot retention
|
||||
*/
|
||||
public void triggerRetention() {
|
||||
if (this.isMaster) {
|
||||
long now = clock.millis();
|
||||
this.retentionTask.triggered(new SchedulerEngine.Event(SLM_RETENTION_MANUAL_JOB_ID, now, now));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executorName() {
|
||||
return ThreadPool.Names.SNAPSHOT;
|
||||
|
@ -84,8 +84,10 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
|
||||
|
||||
@Override
|
||||
public void triggered(SchedulerEngine.Event event) {
|
||||
assert event.getJobName().equals(SnapshotRetentionService.SLM_RETENTION_JOB_ID) :
|
||||
"expected id to be " + SnapshotRetentionService.SLM_RETENTION_JOB_ID + " but it was " + event.getJobName();
|
||||
assert event.getJobName().equals(SnapshotRetentionService.SLM_RETENTION_JOB_ID) ||
|
||||
event.getJobName().equals(SnapshotRetentionService.SLM_RETENTION_MANUAL_JOB_ID):
|
||||
"expected id to be " + SnapshotRetentionService.SLM_RETENTION_JOB_ID + " or " +
|
||||
SnapshotRetentionService.SLM_RETENTION_MANUAL_JOB_ID + " but it was " + event.getJobName();
|
||||
|
||||
final ClusterState state = clusterService.state();
|
||||
if (SnapshotLifecycleService.ilmStoppedOrStopping(state)) {
|
||||
|
@ -0,0 +1,34 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.slm.action;
|
||||
|
||||
import org.elasticsearch.client.node.NodeClient;
|
||||
import org.elasticsearch.rest.BaseRestHandler;
|
||||
import org.elasticsearch.rest.RestController;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.action.RestToXContentListener;
|
||||
import org.elasticsearch.xpack.core.slm.action.ExecuteSnapshotRetentionAction;
|
||||
|
||||
public class RestExecuteSnapshotRetentionAction extends BaseRestHandler {
|
||||
|
||||
public RestExecuteSnapshotRetentionAction(RestController controller) {
|
||||
controller.registerHandler(RestRequest.Method.POST, "/_slm/_execute_retention", this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "slm_execute_retention";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
|
||||
ExecuteSnapshotRetentionAction.Request req = new ExecuteSnapshotRetentionAction.Request();
|
||||
req.timeout(request.paramAsTime("timeout", req.timeout()));
|
||||
req.masterNodeTimeout(request.paramAsTime("master_timeout", req.masterNodeTimeout()));
|
||||
return channel -> client.execute(ExecuteSnapshotRetentionAction.INSTANCE, req, new RestToXContentListener<>(channel));
|
||||
}
|
||||
}
|
@ -0,0 +1,72 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.slm.action;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.core.slm.action.ExecuteSnapshotRetentionAction;
|
||||
import org.elasticsearch.xpack.slm.SnapshotRetentionService;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class TransportExecuteSnapshotRetentionAction
|
||||
extends TransportMasterNodeAction<ExecuteSnapshotRetentionAction.Request, AcknowledgedResponse> {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(TransportExecuteSnapshotRetentionAction.class);
|
||||
|
||||
private final SnapshotRetentionService retentionService;
|
||||
|
||||
@Inject
|
||||
public TransportExecuteSnapshotRetentionAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
SnapshotRetentionService retentionService) {
|
||||
super(ExecuteSnapshotRetentionAction.NAME, transportService, clusterService, threadPool, actionFilters,
|
||||
ExecuteSnapshotRetentionAction.Request::new, indexNameExpressionResolver);
|
||||
this.retentionService = retentionService;
|
||||
}
|
||||
@Override
|
||||
protected String executor() {
|
||||
return ThreadPool.Names.GENERIC;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AcknowledgedResponse read(StreamInput in) throws IOException {
|
||||
return new AcknowledgedResponse(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void masterOperation(final ExecuteSnapshotRetentionAction.Request request,
|
||||
final ClusterState state,
|
||||
final ActionListener<AcknowledgedResponse> listener) {
|
||||
try {
|
||||
logger.info("manually triggering SLM snapshot retention");
|
||||
this.retentionService.triggerRetention();
|
||||
listener.onResponse(new AcknowledgedResponse(true));
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(new ElasticsearchException("failed to execute snapshot lifecycle retention", e));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClusterBlockException checkBlock(ExecuteSnapshotRetentionAction.Request request, ClusterState state) {
|
||||
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
|
||||
}
|
||||
}
|
@ -8,23 +8,27 @@ package org.elasticsearch.xpack.slm;
|
||||
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.SnapshotsInProgress;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.repositories.RepositoriesService;
|
||||
import org.elasticsearch.snapshots.ConcurrentSnapshotExecutionException;
|
||||
import org.elasticsearch.snapshots.SnapshotMissingException;
|
||||
import org.elasticsearch.snapshots.mockstore.MockRepository;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
|
||||
import org.elasticsearch.xpack.core.XPackSettings;
|
||||
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
|
||||
import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicy;
|
||||
import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicyItem;
|
||||
import org.elasticsearch.xpack.core.slm.SnapshotRetentionConfiguration;
|
||||
import org.elasticsearch.xpack.core.slm.action.ExecuteSnapshotLifecycleAction;
|
||||
import org.elasticsearch.xpack.core.slm.action.ExecuteSnapshotRetentionAction;
|
||||
import org.elasticsearch.xpack.core.slm.action.GetSnapshotLifecycleAction;
|
||||
import org.elasticsearch.xpack.core.slm.action.PutSnapshotLifecycleAction;
|
||||
import org.elasticsearch.xpack.ilm.IndexLifecycle;
|
||||
@ -43,17 +47,27 @@ import static org.hamcrest.Matchers.greaterThan;
|
||||
/**
|
||||
* Tests for Snapshot Lifecycle Management that require a slow or blocked snapshot repo (using {@link MockRepository}
|
||||
*/
|
||||
@TestLogging(value = "org.elasticsearch.snapshots.mockstore:DEBUG", reason = "d")
|
||||
@TestLogging(value = "org.elasticsearch.snapshots.mockstore:DEBUG",
|
||||
reason = "https://github.com/elastic/elasticsearch/issues/46508")
|
||||
public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
|
||||
|
||||
private static final String REPO = "repo-id";
|
||||
|
||||
@After
|
||||
public void resetSLMSettings() {
|
||||
// unset retention settings
|
||||
client().admin().cluster().prepareUpdateSettings()
|
||||
.setTransientSettings(Settings.builder()
|
||||
.put(LifecycleSettings.SLM_RETENTION_SCHEDULE, (String) null)
|
||||
.build())
|
||||
.get();
|
||||
public void resetSLMSettings() throws Exception {
|
||||
// Cancel/delete all snapshots
|
||||
assertBusy(() -> {
|
||||
logger.info("--> wiping all snapshots after test");
|
||||
client().admin().cluster().prepareGetSnapshots(REPO).get().getSnapshots()
|
||||
.forEach(snapshotInfo -> {
|
||||
try {
|
||||
client().admin().cluster().prepareDeleteSnapshot(REPO, snapshotInfo.snapshotId().getName()).get();
|
||||
} catch (Exception e) {
|
||||
logger.warn("exception cleaning up snapshot " + snapshotInfo.snapshotId().getName(), e);
|
||||
fail("exception cleanup up snapshot");
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -96,21 +110,20 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
|
||||
public void testSnapshotInProgress() throws Exception {
|
||||
final String indexName = "test";
|
||||
final String policyName = "test-policy";
|
||||
final String repoId = "my-repo";
|
||||
int docCount = 20;
|
||||
for (int i = 0; i < docCount; i++) {
|
||||
index(indexName, "_doc", i + "", Collections.singletonMap("foo", "bar"));
|
||||
}
|
||||
|
||||
// Create a snapshot repo
|
||||
initializeRepo(repoId);
|
||||
initializeRepo(REPO);
|
||||
|
||||
logger.info("--> creating policy {}", policyName);
|
||||
createSnapshotPolicy(policyName, "snap", "1 2 3 4 5 ?", repoId, indexName, true);
|
||||
createSnapshotPolicy(policyName, "snap", "1 2 3 4 5 ?", REPO, indexName, true);
|
||||
|
||||
logger.info("--> blocking master from completing snapshot");
|
||||
blockAllDataNodes(repoId);
|
||||
blockMasterFromFinalizingSnapshotOnIndexFile(repoId);
|
||||
blockAllDataNodes(REPO);
|
||||
blockMasterFromFinalizingSnapshotOnIndexFile(REPO);
|
||||
|
||||
logger.info("--> executing snapshot lifecycle");
|
||||
final String snapshotName = executePolicy(policyName);
|
||||
@ -132,31 +145,29 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
|
||||
});
|
||||
|
||||
logger.info("--> unblocking snapshots");
|
||||
unblockAllDataNodes(repoId);
|
||||
unblockRepo(repoId);
|
||||
unblockAllDataNodes(REPO);
|
||||
unblockRepo(REPO);
|
||||
|
||||
// Cancel/delete the snapshot
|
||||
try {
|
||||
client().admin().cluster().prepareDeleteSnapshot(repoId, snapshotName).get();
|
||||
client().admin().cluster().prepareDeleteSnapshot(REPO, snapshotName).get();
|
||||
} catch (SnapshotMissingException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/46508")
|
||||
public void testRetentionWhileSnapshotInProgress() throws Exception {
|
||||
final String indexName = "test";
|
||||
final String policyId = "slm-policy";
|
||||
final String repoId = "slm-repo";
|
||||
int docCount = 20;
|
||||
for (int i = 0; i < docCount; i++) {
|
||||
index(indexName, "_doc", i + "", Collections.singletonMap("foo", "bar"));
|
||||
}
|
||||
|
||||
initializeRepo(repoId);
|
||||
initializeRepo(REPO);
|
||||
|
||||
logger.info("--> creating policy {}", policyId);
|
||||
createSnapshotPolicy(policyId, "snap", "1 2 3 4 5 ?", repoId, indexName, true,
|
||||
createSnapshotPolicy(policyId, "snap", "1 2 3 4 5 ?", REPO, indexName, true,
|
||||
new SnapshotRetentionConfiguration(TimeValue.timeValueSeconds(0), null, null));
|
||||
|
||||
// Create a snapshot and wait for it to be complete (need something that can be deleted)
|
||||
@ -165,7 +176,7 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
|
||||
assertBusy(() -> {
|
||||
try {
|
||||
SnapshotsStatusResponse s =
|
||||
client().admin().cluster().prepareSnapshotStatus(repoId).setSnapshots(completedSnapshotName).get();
|
||||
client().admin().cluster().prepareSnapshotStatus(REPO).setSnapshots(completedSnapshotName).get();
|
||||
assertThat("expected a snapshot but none were returned", s.getSnapshots().size(), equalTo(1));
|
||||
SnapshotStatus status = s.getSnapshots().get(0);
|
||||
logger.info("--> waiting for snapshot {} to be completed, got: {}", completedSnapshotName, status.getState());
|
||||
@ -185,59 +196,74 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
|
||||
|
||||
// Take another snapshot, but before doing that, block it from completing
|
||||
logger.info("--> blocking nodes from completing snapshot");
|
||||
blockAllDataNodes(repoId);
|
||||
final String secondSnapName = executePolicy(policyId);
|
||||
|
||||
// Check that the executed snapshot shows up in the SLM output as in_progress
|
||||
assertBusy(() -> {
|
||||
GetSnapshotLifecycleAction.Response getResp =
|
||||
client().execute(GetSnapshotLifecycleAction.INSTANCE, new GetSnapshotLifecycleAction.Request(policyId)).get();
|
||||
logger.info("--> checking for in progress snapshot...");
|
||||
|
||||
assertThat(getResp.getPolicies().size(), greaterThan(0));
|
||||
SnapshotLifecyclePolicyItem item = getResp.getPolicies().get(0);
|
||||
assertNotNull(item.getSnapshotInProgress());
|
||||
SnapshotLifecyclePolicyItem.SnapshotInProgress inProgress = item.getSnapshotInProgress();
|
||||
assertThat(inProgress.getSnapshotId().getName(), equalTo(secondSnapName));
|
||||
assertThat(inProgress.getStartTime(), greaterThan(0L));
|
||||
assertThat(inProgress.getState(), anyOf(equalTo(SnapshotsInProgress.State.INIT), equalTo(SnapshotsInProgress.State.STARTED)));
|
||||
assertNull(inProgress.getFailure());
|
||||
});
|
||||
|
||||
// Run retention every second
|
||||
client().admin().cluster().prepareUpdateSettings()
|
||||
.setTransientSettings(Settings.builder()
|
||||
.put(LifecycleSettings.SLM_RETENTION_SCHEDULE, "*/1 * * * * ?")
|
||||
.build())
|
||||
.get();
|
||||
// Guarantee that retention gets a chance to run before unblocking, I know sleeps are not
|
||||
// ideal, but we don't currently have a way to force retention to run, so waiting at least
|
||||
// a second is the best we can do for now.
|
||||
Thread.sleep(1500);
|
||||
|
||||
logger.info("--> unblocking snapshots");
|
||||
unblockRepo(repoId);
|
||||
unblockAllDataNodes(repoId);
|
||||
|
||||
// Check that the snapshot created by the policy has been removed by retention
|
||||
assertBusy(() -> {
|
||||
// Trigger a cluster state update so that it re-checks for a snapshot in progress
|
||||
client().admin().cluster().prepareReroute().get();
|
||||
logger.info("--> waiting for snapshot to be deleted");
|
||||
try {
|
||||
SnapshotsStatusResponse s =
|
||||
client().admin().cluster().prepareSnapshotStatus(repoId).setSnapshots(completedSnapshotName).get();
|
||||
assertNull("expected no snapshot but one was returned", s.getSnapshots().get(0));
|
||||
} catch (SnapshotMissingException e) {
|
||||
// Great, we wanted it to be deleted!
|
||||
}
|
||||
});
|
||||
|
||||
// Cancel/delete the snapshot
|
||||
try {
|
||||
client().admin().cluster().prepareDeleteSnapshot(repoId, secondSnapName).get();
|
||||
} catch (SnapshotMissingException e) {
|
||||
// ignore
|
||||
blockAllDataNodes(REPO);
|
||||
blockMasterFromFinalizingSnapshotOnIndexFile(REPO);
|
||||
final String secondSnapName = executePolicy(policyId);
|
||||
|
||||
// Check that the executed snapshot shows up in the SLM output as in_progress
|
||||
assertBusy(() -> {
|
||||
GetSnapshotLifecycleAction.Response getResp =
|
||||
client().execute(GetSnapshotLifecycleAction.INSTANCE, new GetSnapshotLifecycleAction.Request(policyId)).get();
|
||||
logger.info("--> checking for in progress snapshot...");
|
||||
|
||||
assertThat(getResp.getPolicies().size(), greaterThan(0));
|
||||
SnapshotLifecyclePolicyItem item = getResp.getPolicies().get(0);
|
||||
assertNotNull(item.getSnapshotInProgress());
|
||||
SnapshotLifecyclePolicyItem.SnapshotInProgress inProgress = item.getSnapshotInProgress();
|
||||
assertThat(inProgress.getSnapshotId().getName(), equalTo(secondSnapName));
|
||||
assertThat(inProgress.getStartTime(), greaterThan(0L));
|
||||
assertThat(inProgress.getState(), anyOf(equalTo(SnapshotsInProgress.State.INIT),
|
||||
equalTo(SnapshotsInProgress.State.STARTED)));
|
||||
assertNull(inProgress.getFailure());
|
||||
});
|
||||
|
||||
// Run retention
|
||||
logger.info("--> triggering retention");
|
||||
assertTrue(client().execute(ExecuteSnapshotRetentionAction.INSTANCE,
|
||||
new ExecuteSnapshotRetentionAction.Request()).get().isAcknowledged());
|
||||
|
||||
logger.info("--> unblocking snapshots");
|
||||
unblockRepo(REPO);
|
||||
unblockAllDataNodes(REPO);
|
||||
|
||||
// Check that the snapshot created by the policy has been removed by retention
|
||||
assertBusy(() -> {
|
||||
// Trigger a cluster state update so that it re-checks for a snapshot in progress
|
||||
client().admin().cluster().prepareReroute().get();
|
||||
logger.info("--> waiting for snapshot to be deleted");
|
||||
try {
|
||||
SnapshotsStatusResponse s =
|
||||
client().admin().cluster().prepareSnapshotStatus(REPO).setSnapshots(completedSnapshotName).get();
|
||||
assertNull("expected no snapshot but one was returned", s.getSnapshots().get(0));
|
||||
} catch (SnapshotMissingException e) {
|
||||
// Great, we wanted it to be deleted!
|
||||
}
|
||||
});
|
||||
|
||||
// Cancel the ongoing snapshot to cancel it
|
||||
assertBusy(() -> {
|
||||
try {
|
||||
logger.info("--> cancelling snapshot {}", secondSnapName);
|
||||
client().admin().cluster().prepareDeleteSnapshot(REPO, secondSnapName).get();
|
||||
} catch (ConcurrentSnapshotExecutionException e) {
|
||||
logger.info("--> attempted to stop second snapshot", e);
|
||||
// just wait and retry
|
||||
fail("attempted to stop second snapshot but a snapshot or delete was in progress");
|
||||
}
|
||||
});
|
||||
|
||||
// Assert that the history document has been written for taking the snapshot and deleting it
|
||||
assertBusy(() -> {
|
||||
SearchResponse resp = client().prepareSearch(".slm-history*")
|
||||
.setQuery(QueryBuilders.matchQuery("snapshot_name", completedSnapshotName)).get();
|
||||
logger.info("--> checking history written for {}, got: {}",
|
||||
completedSnapshotName, Strings.arrayToCommaDelimitedString(resp.getHits().getHits()));
|
||||
assertThat(resp.getHits().getTotalHits().value, equalTo(2L));
|
||||
});
|
||||
} finally {
|
||||
unblockRepo(REPO);
|
||||
unblockAllDataNodes(REPO);
|
||||
}
|
||||
}
|
||||
|
||||
@ -251,13 +277,13 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
|
||||
.get();
|
||||
}
|
||||
|
||||
private void createSnapshotPolicy(String policyName, String snapshotNamePattern, String schedule, String repoId,
|
||||
private void createSnapshotPolicy(String policyName, String snapshotNamePattern, String schedule, String REPO,
|
||||
String indexPattern, boolean ignoreUnavailable) {
|
||||
createSnapshotPolicy(policyName, snapshotNamePattern, schedule, repoId, indexPattern,
|
||||
createSnapshotPolicy(policyName, snapshotNamePattern, schedule, REPO, indexPattern,
|
||||
ignoreUnavailable, SnapshotRetentionConfiguration.EMPTY);
|
||||
}
|
||||
|
||||
private void createSnapshotPolicy(String policyName, String snapshotNamePattern, String schedule, String repoId,
|
||||
private void createSnapshotPolicy(String policyName, String snapshotNamePattern, String schedule, String REPO,
|
||||
String indexPattern, boolean ignoreUnavailable,
|
||||
SnapshotRetentionConfiguration retention) {
|
||||
Map<String, Object> snapConfig = new HashMap<>();
|
||||
@ -272,7 +298,7 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
|
||||
}
|
||||
}
|
||||
SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy(policyName, snapshotNamePattern, schedule,
|
||||
repoId, snapConfig, retention);
|
||||
REPO, snapConfig, retention);
|
||||
|
||||
PutSnapshotLifecycleAction.Request putLifecycle = new PutSnapshotLifecycleAction.Request(policyName, policy);
|
||||
try {
|
||||
|
@ -26,6 +26,8 @@ import org.elasticsearch.xpack.core.watcher.watch.ClockMock;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
@ -68,14 +70,51 @@ public class SnapshotRetentionServiceTests extends ESTestCase {
|
||||
}
|
||||
}
|
||||
|
||||
public void testManualTriggering() {
|
||||
final DiscoveryNode discoveryNode = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(),
|
||||
Collections.emptyMap(), DiscoveryNodeRole.BUILT_IN_ROLES, Version.CURRENT);
|
||||
ClockMock clock = new ClockMock();
|
||||
AtomicInteger invoked = new AtomicInteger(0);
|
||||
|
||||
try (ThreadPool threadPool = new TestThreadPool("test");
|
||||
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool, discoveryNode, clusterSettings);
|
||||
SnapshotRetentionService service = new SnapshotRetentionService(Settings.EMPTY,
|
||||
() -> new FakeRetentionTask(event -> {
|
||||
assertThat(event.getJobName(), equalTo(SnapshotRetentionService.SLM_RETENTION_MANUAL_JOB_ID));
|
||||
invoked.incrementAndGet();
|
||||
}), clusterService, clock)) {
|
||||
|
||||
service.onMaster();
|
||||
service.triggerRetention();
|
||||
assertThat(invoked.get(), equalTo(1));
|
||||
|
||||
service.offMaster();
|
||||
service.triggerRetention();
|
||||
assertThat(invoked.get(), equalTo(1));
|
||||
|
||||
service.onMaster();
|
||||
service.triggerRetention();
|
||||
assertThat(invoked.get(), equalTo(2));
|
||||
|
||||
threadPool.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
private static class FakeRetentionTask extends SnapshotRetentionTask {
|
||||
private final Consumer<SchedulerEngine.Event> onTrigger;
|
||||
|
||||
FakeRetentionTask() {
|
||||
this(evt -> {});
|
||||
}
|
||||
|
||||
FakeRetentionTask(Consumer<SchedulerEngine.Event> onTrigger) {
|
||||
super(fakeClient(), null, System::nanoTime, mock(SnapshotHistoryStore.class), mock(ThreadPool.class));
|
||||
this.onTrigger = onTrigger;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void triggered(SchedulerEngine.Event event) {
|
||||
super.triggered(event);
|
||||
onTrigger.accept(event);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,19 @@
|
||||
{
|
||||
"slm.execute_retention":{
|
||||
"documentation":{
|
||||
"url":"https://www.elastic.co/guide/en/elasticsearch/reference/current/slm-api-execute-retention.html"
|
||||
},
|
||||
"stability":"stable",
|
||||
"url":{
|
||||
"paths":[
|
||||
{
|
||||
"path":"/_slm/_execute_retention",
|
||||
"methods":[
|
||||
"POST"
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
"params":{}
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user