[7.x][ML] Fix race condition when force stopping DF analytics job (#57680) (#57717)

When we force delete a DF analytics job, we currently first force
stop it and then we proceed with deleting the job config.
This may result in logging errors if the job config is deleted
before it is retrieved while the job is starting.

Instead of force stopping the job, it would make more sense to
try to stop the job gracefully first. So we now try that out first.
If normal stop fails, then we resort to force stopping the job to
ensure we can go through with the delete.

In addition, this commit introduces `timeout` for the delete action
and makes use of it in the child requests.

Backport of #57680
This commit is contained in:
Dimitris Athanasiou 2020-06-05 17:50:01 +03:00 committed by GitHub
parent c407b0f40d
commit f49a14ce6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 114 additions and 22 deletions

View File

@ -712,6 +712,9 @@ final class MLRequestConverters {
if (deleteRequest.getForce() != null) {
params.putParam("force", Boolean.toString(deleteRequest.getForce()));
}
if (deleteRequest.getTimeout() != null) {
params.withTimeout(deleteRequest.getTimeout());
}
request.addParameters(params.asMap());
return request;

View File

@ -21,6 +21,7 @@ package org.elasticsearch.client.ml;
import org.elasticsearch.client.Validatable;
import org.elasticsearch.client.ValidationException;
import org.elasticsearch.common.unit.TimeValue;
import java.util.Objects;
import java.util.Optional;
@ -32,6 +33,7 @@ public class DeleteDataFrameAnalyticsRequest implements Validatable {
private final String id;
private Boolean force;
private TimeValue timeout;
public DeleteDataFrameAnalyticsRequest(String id) {
this.id = id;
@ -55,6 +57,19 @@ public class DeleteDataFrameAnalyticsRequest implements Validatable {
this.force = force;
}
public TimeValue getTimeout() {
return timeout;
}
/**
* Sets the time to wait until the job is deleted.
*
* @param timeout The time to wait until the job is deleted.
*/
public void setTimeout(TimeValue timeout) {
this.timeout = timeout;
}
@Override
public Optional<ValidationException> validate() {
if (id == null) {
@ -69,11 +84,13 @@ public class DeleteDataFrameAnalyticsRequest implements Validatable {
if (o == null || getClass() != o.getClass()) return false;
DeleteDataFrameAnalyticsRequest other = (DeleteDataFrameAnalyticsRequest) o;
return Objects.equals(id, other.id) && Objects.equals(force, other.force);
return Objects.equals(id, other.id)
&& Objects.equals(force, other.force)
&& Objects.equals(timeout, other.timeout);
}
@Override
public int hashCode() {
return Objects.hash(id, force);
return Objects.hash(id, force, timeout);
}
}

View File

@ -816,9 +816,21 @@ public class MLRequestConvertersTests extends ESTestCase {
assertEquals(HttpDelete.METHOD_NAME, request.getMethod());
assertEquals("/_ml/data_frame/analytics/" + deleteRequest.getId(), request.getEndpoint());
assertNull(request.getEntity());
assertThat(request.getParameters().size(), equalTo(1));
assertEquals(Boolean.toString(true), request.getParameters().get("force"));
}
public void testDeleteDataFrameAnalytics_WithTimeout() {
DeleteDataFrameAnalyticsRequest deleteRequest = new DeleteDataFrameAnalyticsRequest(randomAlphaOfLength(10));
deleteRequest.setTimeout(TimeValue.timeValueSeconds(10));
Request request = MLRequestConverters.deleteDataFrameAnalytics(deleteRequest);
assertEquals(HttpDelete.METHOD_NAME, request.getMethod());
assertEquals("/_ml/data_frame/analytics/" + deleteRequest.getId(), request.getEndpoint());
assertNull(request.getEntity());
assertThat(request.getParameters().size(), equalTo(1));
assertEquals(request.getParameters().get("timeout"), "10s");
}
public void testEvaluateDataFrame() throws IOException {
EvaluateDataFrameRequest evaluateRequest = EvaluateDataFrameRequestTests.createRandom();
Request request = MLRequestConverters.evaluateDataFrame(evaluateRequest);

View File

@ -3090,9 +3090,10 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase {
DeleteDataFrameAnalyticsRequest request = new DeleteDataFrameAnalyticsRequest("my-analytics-config"); // <1>
// end::delete-data-frame-analytics-request
//tag::delete-data-frame-analytics-request-force
//tag::delete-data-frame-analytics-request-options
request.setForce(false); // <1>
//end::delete-data-frame-analytics-request-force
request.setTimeout(TimeValue.timeValueMinutes(1)); // <2>
//end::delete-data-frame-analytics-request-options
// tag::delete-data-frame-analytics-execute
AcknowledgedResponse response = client.machineLearning().deleteDataFrameAnalytics(request, RequestOptions.DEFAULT);

View File

@ -27,10 +27,11 @@ The following arguments are optional:
["source","java",subs="attributes,callouts,macros"]
---------------------------------------------------
include-tagged::{doc-tests-file}[{api}-request-force]
include-tagged::{doc-tests-file}[{api}-request-options]
---------------------------------------------------
<1> Use to forcefully delete a job that is not stopped. This method is quicker than stopping
and deleting the job. Defaults to `false`.
<2> Use to set the time to wait until the job is deleted. Defaults to 1 minute.
include::../execution.asciidoc[]

View File

@ -43,6 +43,11 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=job-id-data-frame-analytics]
(Optional, boolean) If `true`, it deletes a job that is not stopped; this method is
quicker than stopping and deleting the job.
`timeout`::
(Optional, <<time-units,time units>>) The time to wait for the job to be deleted.
Defaults to 1 minute.
[[ml-delete-dfanalytics-example]]
==== {api-examples-title}

View File

@ -15,11 +15,13 @@ import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
public class DeleteDataFrameAnalyticsAction extends ActionType<AcknowledgedResponse> {
@ -33,6 +35,10 @@ public class DeleteDataFrameAnalyticsAction extends ActionType<AcknowledgedRespo
public static class Request extends AcknowledgedRequest<Request> {
public static final ParseField FORCE = new ParseField("force");
public static final ParseField TIMEOUT = new ParseField("timeout");
// Default timeout matches that of delete by query
private static final TimeValue DEFAULT_TIMEOUT = new TimeValue(1, TimeUnit.MINUTES);
private String id;
private boolean force;
@ -47,9 +53,12 @@ public class DeleteDataFrameAnalyticsAction extends ActionType<AcknowledgedRespo
}
}
public Request() {}
public Request() {
timeout(DEFAULT_TIMEOUT);
}
public Request(String id) {
this();
this.id = ExceptionsHelper.requireNonNull(id, DataFrameAnalyticsConfig.ID);
}
@ -75,7 +84,9 @@ public class DeleteDataFrameAnalyticsAction extends ActionType<AcknowledgedRespo
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DeleteDataFrameAnalyticsAction.Request request = (DeleteDataFrameAnalyticsAction.Request) o;
return Objects.equals(id, request.id) && force == request.force;
return Objects.equals(id, request.id)
&& force == request.force
&& Objects.equals(timeout, request.timeout);
}
@Override
@ -89,7 +100,7 @@ public class DeleteDataFrameAnalyticsAction extends ActionType<AcknowledgedRespo
@Override
public int hashCode() {
return Objects.hash(id, force);
return Objects.hash(id, force, timeout);
}
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.ml.action.DeleteDataFrameAnalyticsAction.Request;
@ -18,6 +19,9 @@ public class DeleteDataFrameAnalyticsActionRequestTests extends AbstractWireSeri
protected Request createTestInstance() {
Request request = new Request(randomAlphaOfLength(10));
request.setForce(randomBoolean());
if (randomBoolean()) {
request.timeout(TimeValue.parseTimeValue(randomTimeValue(), "test"));
}
return request;
}

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.action;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
@ -26,6 +27,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
@ -109,33 +111,59 @@ public class TransportDeleteDataFrameAnalyticsAction
@Override
protected void masterOperation(Task task, DeleteDataFrameAnalyticsAction.Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) {
String id = request.getId();
TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
ParentTaskAssigningClient parentTaskClient = new ParentTaskAssigningClient(client, taskId);
if (request.isForce()) {
forceDelete(parentTaskClient, id, listener);
forceDelete(parentTaskClient, request, listener);
} else {
normalDelete(parentTaskClient, state, id, listener);
normalDelete(parentTaskClient, state, request, listener);
}
}
private void forceDelete(ParentTaskAssigningClient parentTaskClient, String id,
private void forceDelete(ParentTaskAssigningClient parentTaskClient, DeleteDataFrameAnalyticsAction.Request request,
ActionListener<AcknowledgedResponse> listener) {
logger.debug("[{}] Force deleting data frame analytics job", id);
logger.debug("[{}] Force deleting data frame analytics job", request.getId());
ActionListener<StopDataFrameAnalyticsAction.Response> stopListener = ActionListener.wrap(
stopResponse -> normalDelete(parentTaskClient, clusterService.state(), id, listener),
stopResponse -> normalDelete(parentTaskClient, clusterService.state(), request, listener),
listener::onFailure
);
StopDataFrameAnalyticsAction.Request request = new StopDataFrameAnalyticsAction.Request(id);
request.setForce(true);
executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, StopDataFrameAnalyticsAction.INSTANCE, request, stopListener);
stopJob(parentTaskClient, request, stopListener);
}
private void normalDelete(ParentTaskAssigningClient parentTaskClient, ClusterState state, String id,
ActionListener<AcknowledgedResponse> listener) {
private void stopJob(ParentTaskAssigningClient parentTaskClient, DeleteDataFrameAnalyticsAction.Request request,
ActionListener<StopDataFrameAnalyticsAction.Response> listener) {
// We first try to stop the job normally. Normal stop returns after the job was stopped.
// If that fails then we proceed to force stopping which returns as soon as the persistent task is removed.
// If we just did force stopping, then there is a chance we proceed to delete the config while it's
// still used from the running task which results in logging errors.
StopDataFrameAnalyticsAction.Request stopRequest = new StopDataFrameAnalyticsAction.Request(request.getId());
stopRequest.setTimeout(request.timeout());
ActionListener<StopDataFrameAnalyticsAction.Response> normalStopListener = ActionListener.wrap(
listener::onResponse,
normalStopFailure -> {
stopRequest.setForce(true);
executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, StopDataFrameAnalyticsAction.INSTANCE, stopRequest, ActionListener.wrap(
listener::onResponse,
forceStopFailure -> {
logger.error(new ParameterizedMessage("[{}] Failed to stop normally", request.getId()), normalStopFailure);
logger.error(new ParameterizedMessage("[{}] Failed to stop forcefully", request.getId()), forceStopFailure);
listener.onFailure(forceStopFailure);
}
));
}
);
executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, StopDataFrameAnalyticsAction.INSTANCE, stopRequest, normalStopListener);
}
private void normalDelete(ParentTaskAssigningClient parentTaskClient, ClusterState state,
DeleteDataFrameAnalyticsAction.Request request, ActionListener<AcknowledgedResponse> listener) {
String id = request.getId();
PersistentTasksCustomMetadata tasks = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
DataFrameAnalyticsState taskState = MlTasks.getDataFrameAnalyticsState(id, tasks);
if (taskState != DataFrameAnalyticsState.STOPPED) {
@ -178,14 +206,14 @@ public class TransportDeleteDataFrameAnalyticsAction
logger.warn("[{}] DBQ failure: {}", id, failure);
}
}
deleteStats(parentTaskClient, id, deleteStatsHandler);
deleteStats(parentTaskClient, id, request.timeout(), deleteStatsHandler);
},
listener::onFailure
);
// Step 2. Delete state
ActionListener<DataFrameAnalyticsConfig> configListener = ActionListener.wrap(
config -> deleteState(parentTaskClient, config, deleteStateHandler),
config -> deleteState(parentTaskClient, config, request.timeout(), deleteStateHandler),
listener::onFailure
);
@ -214,6 +242,7 @@ public class TransportDeleteDataFrameAnalyticsAction
private void deleteState(ParentTaskAssigningClient parentTaskClient,
DataFrameAnalyticsConfig config,
TimeValue timeout,
ActionListener<BulkByScrollResponse> listener) {
List<String> ids = new ArrayList<>();
ids.add(StoredProgress.documentId(config.getId()));
@ -224,22 +253,25 @@ public class TransportDeleteDataFrameAnalyticsAction
parentTaskClient,
AnomalyDetectorsIndex.jobStateIndexPattern(),
QueryBuilders.idsQuery().addIds(ids.toArray(new String[0])),
timeout,
listener
);
}
private void deleteStats(ParentTaskAssigningClient parentTaskClient,
String jobId,
TimeValue timeout,
ActionListener<BulkByScrollResponse> listener) {
executeDeleteByQuery(
parentTaskClient,
MlStatsIndex.indexPattern(),
QueryBuilders.termQuery(Fields.JOB_ID.getPreferredName(), jobId),
timeout,
listener
);
}
private void executeDeleteByQuery(ParentTaskAssigningClient parentTaskClient, String index, QueryBuilder query,
private void executeDeleteByQuery(ParentTaskAssigningClient parentTaskClient, String index, QueryBuilder query, TimeValue timeout,
ActionListener<BulkByScrollResponse> listener) {
DeleteByQueryRequest request = new DeleteByQueryRequest(index);
request.setQuery(query);
@ -247,6 +279,7 @@ public class TransportDeleteDataFrameAnalyticsAction
request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
request.setAbortOnVersionConflict(false);
request.setRefresh(true);
request.setTimeout(timeout);
executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, listener);
}

View File

@ -37,6 +37,7 @@ public class RestDeleteDataFrameAnalyticsAction extends BaseRestHandler {
String id = restRequest.param(DataFrameAnalyticsConfig.ID.getPreferredName());
DeleteDataFrameAnalyticsAction.Request request = new DeleteDataFrameAnalyticsAction.Request(id);
request.setForce(restRequest.paramAsBoolean(DeleteDataFrameAnalyticsAction.Request.FORCE.getPreferredName(), request.isForce()));
request.timeout(restRequest.paramAsTime(DeleteDataFrameAnalyticsAction.Request.TIMEOUT.getPreferredName(), request.timeout()));
return channel -> client.execute(DeleteDataFrameAnalyticsAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}

View File

@ -26,6 +26,10 @@
"type":"boolean",
"description":"True if the job should be forcefully deleted",
"default":false
},
"timeout":{
"type":"time",
"description":"Controls the time to wait until a job is deleted. Defaults to 1 minute"
}
}
}