[7.x][ML] Extract DataFrameAnalyticsTask into its own class (#46402) (#46426)

This refactors `DataFrameAnalyticsTask` into its own class.
The task has quite a lot of functionality now and I believe it would
make the code more readable to have it live as its own class rather than
an inner class of the start action class.

Backport of #46402
Author: Dimitris Athanasiou, 2019-09-06 14:13:46 +03:00 (committed by GitHub)
parent 27889b3d98
commit a6834068e3
10 changed files with 280 additions and 251 deletions
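For orientation, the visible change at every call site is an import swap: references to the nested
`TransportStartDataFrameAnalyticsAction.DataFrameAnalyticsTask` become references to the top-level
`org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask`; behaviour is unchanged. As a minimal
standalone sketch (a toy class for illustration, not Elasticsearch code), this is the stored-progress
doc-id scheme the moved class exposes and that several hunks below re-qualify:

    public class ProgressDocIdDemo {
        // Mirrors DataFrameAnalyticsTask.progressDocId(String) from the diff below.
        static String progressDocId(String analyticsId) {
            return "data_frame_analytics-" + analyticsId + "-progress";
        }

        public static void main(String[] args) {
            // Prints: data_frame_analytics-my-analytics-progress
            System.out.println(progressDocId("my-analytics"));
        }
    }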


@@ -24,7 +24,7 @@ import org.elasticsearch.xpack.core.ml.dataframe.analyses.OutlierDetection;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.Regression;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.utils.PhaseProgress;
-import org.elasticsearch.xpack.ml.action.TransportStartDataFrameAnalyticsAction;
+import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
import java.util.ArrayList;
import java.util.List;
@@ -138,7 +138,7 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest
protected SearchResponse searchStoredProgress(String id) {
return client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern())
-.setQuery(QueryBuilders.idsQuery().addIds(TransportStartDataFrameAnalyticsAction.DataFrameAnalyticsTask.progressDocId(id)))
+.setQuery(QueryBuilders.idsQuery().addIds(DataFrameAnalyticsTask.progressDocId(id)))
.get();
}


@@ -42,6 +42,7 @@ import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
+import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;
@@ -161,7 +162,7 @@ public class TransportDeleteDataFrameAnalyticsAction
ActionListener<BulkByScrollResponse> listener) {
DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern());
request.setQuery(QueryBuilders.idsQuery().addIds(
-TransportStartDataFrameAnalyticsAction.DataFrameAnalyticsTask.progressDocId(analyticsId)));
+DataFrameAnalyticsTask.progressDocId(analyticsId)));
request.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()));
request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
request.setAbortOnVersionConflict(false);


@@ -46,7 +46,7 @@ import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.PhaseProgress;
-import org.elasticsearch.xpack.ml.action.TransportStartDataFrameAnalyticsAction.DataFrameAnalyticsTask;
+import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
import org.elasticsearch.xpack.ml.dataframe.StoredProgress;
import java.io.IOException;


@@ -7,22 +7,14 @@ package org.elasticsearch.xpack.ml.action;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
@@ -33,16 +25,12 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.IndexNotFoundException;
-import org.elasticsearch.index.reindex.BulkByScrollTask;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.persistent.AllocatedPersistentTask;
@@ -53,7 +41,6 @@ import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.TaskId;
-import org.elasticsearch.tasks.TaskResult;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
@@ -61,7 +48,6 @@ import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.EstimateMemoryUsageAction;
-import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
@@ -69,13 +55,11 @@ import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
-import org.elasticsearch.xpack.core.ml.utils.PhaseProgress;
-import org.elasticsearch.xpack.core.watcher.watch.Payload;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsManager;
+import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
import org.elasticsearch.xpack.ml.dataframe.MappingsMerger;
import org.elasticsearch.xpack.ml.dataframe.SourceDestValidator;
-import org.elasticsearch.xpack.ml.dataframe.StoredProgress;
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory;
import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
import org.elasticsearch.xpack.ml.job.JobNodeSelector;
@@ -83,16 +67,12 @@ import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE;
import static org.elasticsearch.xpack.ml.MachineLearning.MAX_OPEN_JOBS_PER_NODE;
@@ -409,226 +389,6 @@ public class TransportStartDataFrameAnalyticsAction
);
}
public static class DataFrameAnalyticsTask extends AllocatedPersistentTask implements StartDataFrameAnalyticsAction.TaskMatcher {
private final Client client;
private final ClusterService clusterService;
private final DataFrameAnalyticsManager analyticsManager;
private final StartDataFrameAnalyticsAction.TaskParams taskParams;
@Nullable
private volatile Long reindexingTaskId;
private volatile boolean isReindexingFinished;
private volatile boolean isStopping;
private final ProgressTracker progressTracker = new ProgressTracker();
public DataFrameAnalyticsTask(long id, String type, String action, TaskId parentTask, Map<String, String> headers,
Client client, ClusterService clusterService, DataFrameAnalyticsManager analyticsManager,
StartDataFrameAnalyticsAction.TaskParams taskParams) {
super(id, type, action, MlTasks.DATA_FRAME_ANALYTICS_TASK_ID_PREFIX + taskParams.getId(), parentTask, headers);
this.client = Objects.requireNonNull(client);
this.clusterService = Objects.requireNonNull(clusterService);
this.analyticsManager = Objects.requireNonNull(analyticsManager);
this.taskParams = Objects.requireNonNull(taskParams);
}
public StartDataFrameAnalyticsAction.TaskParams getParams() {
return taskParams;
}
public void setReindexingTaskId(Long reindexingTaskId) {
this.reindexingTaskId = reindexingTaskId;
}
public void setReindexingFinished() {
isReindexingFinished = true;
}
public boolean isStopping() {
return isStopping;
}
public ProgressTracker getProgressTracker() {
return progressTracker;
}
@Override
protected void onCancelled() {
stop(getReasonCancelled(), TimeValue.ZERO);
}
@Override
public void markAsCompleted() {
persistProgress(() -> super.markAsCompleted());
}
@Override
public void markAsFailed(Exception e) {
persistProgress(() -> super.markAsFailed(e));
}
public void stop(String reason, TimeValue timeout) {
isStopping = true;
ActionListener<Void> reindexProgressListener = ActionListener.wrap(
aVoid -> doStop(reason, timeout),
e -> {
LOGGER.error(new ParameterizedMessage("[{}] Error updating reindexing progress", taskParams.getId()), e);
// We should log the error but it shouldn't stop us from stopping the task
doStop(reason, timeout);
}
);
// We need to update reindexing progress before we cancel the task
updateReindexTaskProgress(reindexProgressListener);
}
private void doStop(String reason, TimeValue timeout) {
if (reindexingTaskId != null) {
cancelReindexingTask(reason, timeout);
}
analyticsManager.stop(this);
}
private void cancelReindexingTask(String reason, TimeValue timeout) {
TaskId reindexTaskId = new TaskId(clusterService.localNode().getId(), reindexingTaskId);
LOGGER.debug("[{}] Cancelling reindex task [{}]", taskParams.getId(), reindexTaskId);
CancelTasksRequest cancelReindex = new CancelTasksRequest();
cancelReindex.setTaskId(reindexTaskId);
cancelReindex.setReason(reason);
cancelReindex.setTimeout(timeout);
CancelTasksResponse cancelReindexResponse = client.admin().cluster().cancelTasks(cancelReindex).actionGet();
Throwable firstError = null;
if (cancelReindexResponse.getNodeFailures().isEmpty() == false) {
firstError = cancelReindexResponse.getNodeFailures().get(0).getRootCause();
}
if (cancelReindexResponse.getTaskFailures().isEmpty() == false) {
firstError = cancelReindexResponse.getTaskFailures().get(0).getCause();
}
// There is a chance that the task has finished by the time we cancel it, in which case we'll get
// a ResourceNotFoundException, which we can ignore.
if (firstError != null && firstError instanceof ResourceNotFoundException == false) {
throw ExceptionsHelper.serverError("[" + taskParams.getId() + "] Error cancelling reindex task", firstError);
} else {
LOGGER.debug("[{}] Reindex task was successfully cancelled", taskParams.getId());
}
}
public void updateState(DataFrameAnalyticsState state, @Nullable String reason) {
DataFrameAnalyticsTaskState newTaskState = new DataFrameAnalyticsTaskState(state, getAllocationId(), reason);
updatePersistentTaskState(newTaskState, ActionListener.wrap(
updatedTask -> LOGGER.info("[{}] Successfully updated task state to [{}]", getParams().getId(), state),
e -> LOGGER.error(new ParameterizedMessage("[{}] Could not update task state to [{}] with reason [{}]",
getParams().getId(), state, reason), e)
));
}
public void updateReindexTaskProgress(ActionListener<Void> listener) {
TaskId reindexTaskId = getReindexTaskId();
if (reindexTaskId == null) {
// The reindex task is not present, which means it either has not started yet or has finished.
// We keep track of whether reindexing has finished so we can use that to tell whether the progress is 100.
if (isReindexingFinished) {
progressTracker.reindexingPercent.set(100);
}
listener.onResponse(null);
return;
}
GetTaskRequest getTaskRequest = new GetTaskRequest();
getTaskRequest.setTaskId(reindexTaskId);
client.admin().cluster().getTask(getTaskRequest, ActionListener.wrap(
taskResponse -> {
TaskResult taskResult = taskResponse.getTask();
BulkByScrollTask.Status taskStatus = (BulkByScrollTask.Status) taskResult.getTask().getStatus();
int progress = taskStatus.getTotal() == 0 ? 0 : (int) (taskStatus.getCreated() * 100.0 / taskStatus.getTotal());
progressTracker.reindexingPercent.set(progress);
listener.onResponse(null);
},
error -> {
if (error instanceof ResourceNotFoundException) {
// The reindex task is not present, which means it either has not started yet or has finished.
// We keep track of whether reindexing has finished so we can use that to tell whether the progress is 100.
if (isReindexingFinished) {
progressTracker.reindexingPercent.set(100);
}
listener.onResponse(null);
} else {
listener.onFailure(error);
}
}
));
}
@Nullable
private TaskId getReindexTaskId() {
try {
return new TaskId(clusterService.localNode().getId(), reindexingTaskId);
} catch (NullPointerException e) {
// This may happen if no reindexing task id is set, which means the task either has not started yet or has finished
return null;
}
}
private void persistProgress(Runnable runnable) {
GetDataFrameAnalyticsStatsAction.Request getStatsRequest = new GetDataFrameAnalyticsStatsAction.Request(taskParams.getId());
executeAsyncWithOrigin(client, ML_ORIGIN, GetDataFrameAnalyticsStatsAction.INSTANCE, getStatsRequest, ActionListener.wrap(
statsResponse -> {
GetDataFrameAnalyticsStatsAction.Response.Stats stats = statsResponse.getResponse().results().get(0);
IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias());
indexRequest.id(progressDocId(taskParams.getId()));
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
try (XContentBuilder jsonBuilder = JsonXContent.contentBuilder()) {
new StoredProgress(stats.getProgress()).toXContent(jsonBuilder, Payload.XContent.EMPTY_PARAMS);
indexRequest.source(jsonBuilder);
}
executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, ActionListener.wrap(
indexResponse -> {
LOGGER.debug("[{}] Successfully indexed progress document", taskParams.getId());
runnable.run();
},
indexError -> {
LOGGER.error(new ParameterizedMessage(
"[{}] cannot persist progress as an error occurred while indexing", taskParams.getId()), indexError);
runnable.run();
}
));
},
e -> {
LOGGER.error(new ParameterizedMessage(
"[{}] cannot persist progress as an error occurred while retrieving stats", taskParams.getId()), e);
runnable.run();
}
));
}
public static String progressDocId(String id) {
return "data_frame_analytics-" + id + "-progress";
}
public static class ProgressTracker {
public static final String REINDEXING = "reindexing";
public static final String LOADING_DATA = "loading_data";
public static final String ANALYZING = "analyzing";
public static final String WRITING_RESULTS = "writing_results";
public final AtomicInteger reindexingPercent = new AtomicInteger(0);
public final AtomicInteger loadingDataPercent = new AtomicInteger(0);
public final AtomicInteger analyzingPercent = new AtomicInteger(0);
public final AtomicInteger writingResultsPercent = new AtomicInteger(0);
public List<PhaseProgress> report() {
return Arrays.asList(
new PhaseProgress(REINDEXING, reindexingPercent.get()),
new PhaseProgress(LOADING_DATA, loadingDataPercent.get()),
new PhaseProgress(ANALYZING, analyzingPercent.get()),
new PhaseProgress(WRITING_RESULTS, writingResultsPercent.get())
);
}
}
}
static List<String> verifyIndicesPrimaryShardsAreActive(ClusterState clusterState, String... indexNames) {
IndexNameExpressionResolver resolver = new IndexNameExpressionResolver();
String[] concreteIndices = resolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), indexNames);


@@ -34,6 +34,7 @@ import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
+import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
import java.util.ArrayList;
@@ -50,7 +51,7 @@ import java.util.stream.Collectors;
* TODO Add to the upgrade mode action
*/
public class TransportStopDataFrameAnalyticsAction
-extends TransportTasksAction<TransportStartDataFrameAnalyticsAction.DataFrameAnalyticsTask, StopDataFrameAnalyticsAction.Request,
+extends TransportTasksAction<DataFrameAnalyticsTask, StopDataFrameAnalyticsAction.Request,
StopDataFrameAnalyticsAction.Response, StopDataFrameAnalyticsAction.Response> {
private static final Logger logger = LogManager.getLogger(TransportStopDataFrameAnalyticsAction.class);
@@ -222,7 +223,7 @@ public class TransportStopDataFrameAnalyticsAction
@Override
protected void taskOperation(StopDataFrameAnalyticsAction.Request request,
-TransportStartDataFrameAnalyticsAction.DataFrameAnalyticsTask task,
+DataFrameAnalyticsTask task,
ActionListener<StopDataFrameAnalyticsAction.Response> listener) {
DataFrameAnalyticsTaskState stoppingState =
new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.STOPPING, task.getAllocationId(), null);


@@ -31,7 +31,6 @@ import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
-import org.elasticsearch.xpack.ml.action.TransportStartDataFrameAnalyticsAction.DataFrameAnalyticsTask;
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory;
import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
import org.elasticsearch.xpack.ml.dataframe.process.AnalyticsProcessManager;


@@ -0,0 +1,268 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.dataframe;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.reindex.BulkByScrollTask;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskResult;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.PhaseProgress;
import org.elasticsearch.xpack.core.watcher.watch.Payload;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements StartDataFrameAnalyticsAction.TaskMatcher {
private static final Logger LOGGER = LogManager.getLogger(DataFrameAnalyticsTask.class);
private final Client client;
private final ClusterService clusterService;
private final DataFrameAnalyticsManager analyticsManager;
private final StartDataFrameAnalyticsAction.TaskParams taskParams;
@Nullable
private volatile Long reindexingTaskId;
private volatile boolean isReindexingFinished;
private volatile boolean isStopping;
private final ProgressTracker progressTracker = new ProgressTracker();
public DataFrameAnalyticsTask(long id, String type, String action, TaskId parentTask, Map<String, String> headers,
Client client, ClusterService clusterService, DataFrameAnalyticsManager analyticsManager,
StartDataFrameAnalyticsAction.TaskParams taskParams) {
super(id, type, action, MlTasks.DATA_FRAME_ANALYTICS_TASK_ID_PREFIX + taskParams.getId(), parentTask, headers);
this.client = Objects.requireNonNull(client);
this.clusterService = Objects.requireNonNull(clusterService);
this.analyticsManager = Objects.requireNonNull(analyticsManager);
this.taskParams = Objects.requireNonNull(taskParams);
}
public StartDataFrameAnalyticsAction.TaskParams getParams() {
return taskParams;
}
public void setReindexingTaskId(Long reindexingTaskId) {
this.reindexingTaskId = reindexingTaskId;
}
public void setReindexingFinished() {
isReindexingFinished = true;
}
public boolean isStopping() {
return isStopping;
}
public ProgressTracker getProgressTracker() {
return progressTracker;
}
@Override
protected void onCancelled() {
stop(getReasonCancelled(), TimeValue.ZERO);
}
@Override
public void markAsCompleted() {
persistProgress(() -> super.markAsCompleted());
}
@Override
public void markAsFailed(Exception e) {
persistProgress(() -> super.markAsFailed(e));
}
public void stop(String reason, TimeValue timeout) {
isStopping = true;
ActionListener<Void> reindexProgressListener = ActionListener.wrap(
aVoid -> doStop(reason, timeout),
e -> {
LOGGER.error(new ParameterizedMessage("[{}] Error updating reindexing progress", taskParams.getId()), e);
// We should log the error but it shouldn't stop us from stopping the task
doStop(reason, timeout);
}
);
// We need to update reindexing progress before we cancel the task
updateReindexTaskProgress(reindexProgressListener);
}
private void doStop(String reason, TimeValue timeout) {
if (reindexingTaskId != null) {
cancelReindexingTask(reason, timeout);
}
analyticsManager.stop(this);
}
private void cancelReindexingTask(String reason, TimeValue timeout) {
TaskId reindexTaskId = new TaskId(clusterService.localNode().getId(), reindexingTaskId);
LOGGER.debug("[{}] Cancelling reindex task [{}]", taskParams.getId(), reindexTaskId);
CancelTasksRequest cancelReindex = new CancelTasksRequest();
cancelReindex.setTaskId(reindexTaskId);
cancelReindex.setReason(reason);
cancelReindex.setTimeout(timeout);
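// Note: cancelTasks(...).actionGet() blocks this thread until the cancel request has completed.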
CancelTasksResponse cancelReindexResponse = client.admin().cluster().cancelTasks(cancelReindex).actionGet();
Throwable firstError = null;
if (cancelReindexResponse.getNodeFailures().isEmpty() == false) {
firstError = cancelReindexResponse.getNodeFailures().get(0).getRootCause();
}
if (cancelReindexResponse.getTaskFailures().isEmpty() == false) {
firstError = cancelReindexResponse.getTaskFailures().get(0).getCause();
}
// There is a chance that the task has finished by the time we cancel it, in which case we'll get
// a ResourceNotFoundException, which we can ignore.
if (firstError != null && firstError instanceof ResourceNotFoundException == false) {
throw ExceptionsHelper.serverError("[" + taskParams.getId() + "] Error cancelling reindex task", firstError);
} else {
LOGGER.debug("[{}] Reindex task was successfully cancelled", taskParams.getId());
}
}
public void updateState(DataFrameAnalyticsState state, @Nullable String reason) {
DataFrameAnalyticsTaskState newTaskState = new DataFrameAnalyticsTaskState(state, getAllocationId(), reason);
updatePersistentTaskState(newTaskState, ActionListener.wrap(
updatedTask -> LOGGER.info("[{}] Successfully updated task state to [{}]", getParams().getId(), state),
e -> LOGGER.error(new ParameterizedMessage("[{}] Could not update task state to [{}] with reason [{}]",
getParams().getId(), state, reason), e)
));
}
public void updateReindexTaskProgress(ActionListener<Void> listener) {
TaskId reindexTaskId = getReindexTaskId();
if (reindexTaskId == null) {
// The reindex task is not present, which means it either has not started yet or has finished.
// We keep track of whether reindexing has finished so we can use that to tell whether the progress is 100.
if (isReindexingFinished) {
progressTracker.reindexingPercent.set(100);
}
listener.onResponse(null);
return;
}
GetTaskRequest getTaskRequest = new GetTaskRequest();
getTaskRequest.setTaskId(reindexTaskId);
client.admin().cluster().getTask(getTaskRequest, ActionListener.wrap(
taskResponse -> {
TaskResult taskResult = taskResponse.getTask();
BulkByScrollTask.Status taskStatus = (BulkByScrollTask.Status) taskResult.getTask().getStatus();
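// Convert the reindex task's doc counts to a percentage, guarding against a total of 0; e.g. 150 created out of 600 total -> 25.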
int progress = taskStatus.getTotal() == 0 ? 0 : (int) (taskStatus.getCreated() * 100.0 / taskStatus.getTotal());
progressTracker.reindexingPercent.set(progress);
listener.onResponse(null);
},
error -> {
if (error instanceof ResourceNotFoundException) {
// The reindex task is not present, which means it either has not started yet or has finished.
// We keep track of whether reindexing has finished so we can use that to tell whether the progress is 100.
if (isReindexingFinished) {
progressTracker.reindexingPercent.set(100);
}
listener.onResponse(null);
} else {
listener.onFailure(error);
}
}
));
}
@Nullable
private TaskId getReindexTaskId() {
try {
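// Auto-unboxing of reindexingTaskId (a Long) throws the NullPointerException caught below when no id has been set.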
return new TaskId(clusterService.localNode().getId(), reindexingTaskId);
} catch (NullPointerException e) {
// This may happen if no reindexing task id is set, which means the task either has not started yet or has finished
return null;
}
}
private void persistProgress(Runnable runnable) {
GetDataFrameAnalyticsStatsAction.Request getStatsRequest = new GetDataFrameAnalyticsStatsAction.Request(taskParams.getId());
executeAsyncWithOrigin(client, ML_ORIGIN, GetDataFrameAnalyticsStatsAction.INSTANCE, getStatsRequest, ActionListener.wrap(
statsResponse -> {
GetDataFrameAnalyticsStatsAction.Response.Stats stats = statsResponse.getResponse().results().get(0);
IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias());
indexRequest.id(progressDocId(taskParams.getId()));
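// An immediate refresh makes the progress document searchable as soon as the index request returns.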
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
try (XContentBuilder jsonBuilder = JsonXContent.contentBuilder()) {
new StoredProgress(stats.getProgress()).toXContent(jsonBuilder, Payload.XContent.EMPTY_PARAMS);
indexRequest.source(jsonBuilder);
}
executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, ActionListener.wrap(
indexResponse -> {
LOGGER.debug("[{}] Successfully indexed progress document", taskParams.getId());
runnable.run();
},
indexError -> {
LOGGER.error(new ParameterizedMessage(
"[{}] cannot persist progress as an error occurred while indexing", taskParams.getId()), indexError);
runnable.run();
}
));
},
e -> {
LOGGER.error(new ParameterizedMessage(
"[{}] cannot persist progress as an error occurred while retrieving stats", taskParams.getId()), e);
runnable.run();
}
));
}
public static String progressDocId(String id) {
return "data_frame_analytics-" + id + "-progress";
}
public static class ProgressTracker {
public static final String REINDEXING = "reindexing";
public static final String LOADING_DATA = "loading_data";
public static final String ANALYZING = "analyzing";
public static final String WRITING_RESULTS = "writing_results";
public final AtomicInteger reindexingPercent = new AtomicInteger(0);
public final AtomicInteger loadingDataPercent = new AtomicInteger(0);
public final AtomicInteger analyzingPercent = new AtomicInteger(0);
public final AtomicInteger writingResultsPercent = new AtomicInteger(0);
public List<PhaseProgress> report() {
return Arrays.asList(
new PhaseProgress(REINDEXING, reindexingPercent.get()),
new PhaseProgress(LOADING_DATA, loadingDataPercent.get()),
new PhaseProgress(ANALYZING, analyzingPercent.get()),
new PhaseProgress(WRITING_RESULTS, writingResultsPercent.get())
);
}
}
}
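
For illustration, a minimal, self-contained sketch (toy types for demonstration, not Elasticsearch
code; `Phase` stands in for `PhaseProgress`) of how the four-phase `ProgressTracker` above is used:
each phase owns an atomic percentage that is updated independently, and `report()` snapshots all four:

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    public class ProgressTrackerDemo {
        // Stand-in for org.elasticsearch.xpack.core.ml.utils.PhaseProgress.
        static final class Phase {
            final String name;
            final int percent;
            Phase(String name, int percent) { this.name = name; this.percent = percent; }
            @Override public String toString() { return name + "=" + percent + "%"; }
        }

        final AtomicInteger reindexingPercent = new AtomicInteger(0);
        final AtomicInteger loadingDataPercent = new AtomicInteger(0);
        final AtomicInteger analyzingPercent = new AtomicInteger(0);
        final AtomicInteger writingResultsPercent = new AtomicInteger(0);

        List<Phase> report() {
            // Snapshot all four phases, mirroring ProgressTracker.report() above.
            return Arrays.asList(
                new Phase("reindexing", reindexingPercent.get()),
                new Phase("loading_data", loadingDataPercent.get()),
                new Phase("analyzing", analyzingPercent.get()),
                new Phase("writing_results", writingResultsPercent.get()));
        }

        public static void main(String[] args) {
            ProgressTrackerDemo tracker = new ProgressTrackerDemo();
            tracker.reindexingPercent.set(100); // reindexing finished
            tracker.loadingDataPercent.set(40); // loading data underway
            // Prints: [reindexing=100%, loading_data=40%, analyzing=0%, writing_results=0%]
            System.out.println(tracker.report());
        }
    }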


@@ -19,7 +19,7 @@ import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.DataFrameAnalysis;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
-import org.elasticsearch.xpack.ml.action.TransportStartDataFrameAnalyticsAction.DataFrameAnalyticsTask;
+import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractor;
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory;
import org.elasticsearch.xpack.ml.dataframe.process.customprocessing.CustomProcessor;


@@ -9,7 +9,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.Nullable;
-import org.elasticsearch.xpack.ml.action.TransportStartDataFrameAnalyticsAction.DataFrameAnalyticsTask.ProgressTracker;
+import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask.ProgressTracker;
import org.elasticsearch.xpack.ml.dataframe.process.results.AnalyticsResult;
import org.elasticsearch.xpack.ml.dataframe.process.results.RowResults;


@@ -8,7 +8,7 @@ package org.elasticsearch.xpack.ml.dataframe.process;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.DataFrameAnalysis;
-import org.elasticsearch.xpack.ml.action.TransportStartDataFrameAnalyticsAction.DataFrameAnalyticsTask.ProgressTracker;
+import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask.ProgressTracker;
import org.elasticsearch.xpack.ml.dataframe.process.results.AnalyticsResult;
import org.elasticsearch.xpack.ml.dataframe.process.results.RowResults;
import org.junit.Before;