[ML] Fix minor race condition in dataframe analytics _stop (#53029) (#53164)

Tests have been periodically failing due to a race condition on checking a recently `STOPPED` task's state. The `.ml-state` index is not created until the task has already been transitioned to `STARTED`. This allows the `_start` API call to return. But, if a user (or test) immediately attempts to `_stop` that job, the job could stop and the task be removed BEFORE the `.ml-state` and `.ml-stats` indices are created/updated.

This change moves toward the task cleaning up after itself in its main execution thread. `stop` flips the task's `isStopping` flag, and we now check `isStopping` at every necessary method, allowing the task to stop gracefully.

closes #53007
This commit is contained in:
Benjamin Trent 2020-03-05 09:59:18 -05:00 committed by GitHub
parent 181ee3ae0b
commit af0b1c2860
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 49 additions and 6 deletions

View File

@ -266,7 +266,6 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
"Finished analysis");
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/53007")
public void testStopOutlierDetectionWithEnoughDocumentsToScroll() throws Exception {
String sourceIndex = "test-stop-outlier-detection-with-enough-docs-to-scroll";

View File

@ -49,6 +49,7 @@ import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlStatsIndex;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.ExplainDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
@ -509,6 +510,10 @@ public class TransportStartDataFrameAnalyticsAction
String[] concreteIndices = resolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), indexNames);
List<String> unavailableIndices = new ArrayList<>(concreteIndices.length);
for (String index : concreteIndices) {
// This is OK as indices are created on demand
if (clusterState.metaData().hasIndex(index) == false) {
continue;
}
IndexRoutingTable routingTable = clusterState.getRoutingTable().index(index);
if (routingTable == null || routingTable.allPrimaryShardsActive() == false) {
unavailableIndices.add(index);
@ -571,7 +576,11 @@ public class TransportStartDataFrameAnalyticsAction
String id = params.getId();
List<String> unavailableIndices =
verifyIndicesPrimaryShardsAreActive(clusterState, resolver, AnomalyDetectorsIndex.configIndexName());
verifyIndicesPrimaryShardsAreActive(clusterState,
resolver,
AnomalyDetectorsIndex.configIndexName(),
MlStatsIndex.indexPattern(),
AnomalyDetectorsIndex.jobStateIndexPattern());
if (unavailableIndices.size() != 0) {
String reason = "Not opening data frame analytics job [" + id +
"], because not all primary shards are active for the following indices [" + String.join(",", unavailableIndices) + "]";

View File

@ -134,6 +134,11 @@ public class DataFrameAnalyticsManager {
}
private void executeStartingJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config) {
if (task.isStopping()) {
LOGGER.debug("[{}] task is stopping. Marking as complete before starting job.", task.getParams().getId());
task.markAsCompleted();
return;
}
DataFrameAnalyticsTaskState reindexingState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.REINDEXING,
task.getAllocationId(), null);
DataFrameAnalyticsTask.StartingState startingState = DataFrameAnalyticsTask.determineStartingState(
@ -163,6 +168,11 @@ public class DataFrameAnalyticsManager {
}
private void executeJobInMiddleOfReindexing(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config) {
if (task.isStopping()) {
LOGGER.debug("[{}] task is stopping. Marking as complete before restarting reindexing.", task.getParams().getId());
task.markAsCompleted();
return;
}
ClientHelper.executeAsyncWithOrigin(client,
ML_ORIGIN,
DeleteIndexAction.INSTANCE,
@ -182,7 +192,8 @@ public class DataFrameAnalyticsManager {
private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config) {
if (task.isStopping()) {
// The task was requested to stop before we started reindexing
LOGGER.debug("[{}] task is stopping. Marking as complete before starting reindexing and analysis.",
task.getParams().getId());
task.markAsCompleted();
return;
}
@ -190,8 +201,12 @@ public class DataFrameAnalyticsManager {
// Reindexing is complete; start analytics
ActionListener<BulkByScrollResponse> reindexCompletedListener = ActionListener.wrap(
reindexResponse -> {
// If the reindex task is canceled, this listener is called.
// Consequently, we should not signal reindex completion.
if (task.isStopping()) {
LOGGER.debug("[{}] Stopping before starting analytics process", config.getId());
LOGGER.debug("[{}] task is stopping. Marking as complete before marking reindex as finished.",
task.getParams().getId());
task.markAsCompleted();
return;
}
task.setReindexingTaskId(null);
@ -256,13 +271,26 @@ public class DataFrameAnalyticsManager {
}
private void startAnalytics(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config) {
if (task.isStopping()) {
LOGGER.debug("[{}] task is stopping. Marking as complete before starting analysis.", task.getParams().getId());
task.markAsCompleted();
return;
}
// Update state to ANALYZING and start process
ActionListener<DataFrameDataExtractorFactory> dataExtractorFactoryListener = ActionListener.wrap(
dataExtractorFactory -> {
DataFrameAnalyticsTaskState analyzingState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.ANALYZING,
task.getAllocationId(), null);
task.updatePersistentTaskState(analyzingState, ActionListener.wrap(
updatedTask -> processManager.runJob(task, config, dataExtractorFactory),
updatedTask -> {
if (task.isStopping()) {
LOGGER.debug("[{}] task is stopping. Marking as complete before starting native process.",
task.getParams().getId());
task.markAsCompleted();
return;
}
processManager.runJob(task, config, dataExtractorFactory);
},
error -> {
Throwable cause = ExceptionsHelper.unwrapCause(error);
if (cause instanceof ResourceNotFoundException) {

View File

@ -103,6 +103,8 @@ public class AnalyticsProcessManager {
ProcessContext processContext = new ProcessContext(config);
synchronized (processContextByAllocation) {
if (task.isStopping()) {
LOGGER.debug("[{}] task is stopping. Marking as complete before creating process context.",
task.getParams().getId());
// The task was requested to stop before we created the process context
auditor.info(config.getId(), Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_ANALYSIS);
task.markAsCompleted();
@ -329,7 +331,6 @@ public class AnalyticsProcessManager {
processContext.stop();
} else {
LOGGER.debug("[{}] No process context to stop", task.getParams().getId());
task.markAsCompleted();
}
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.dataframe.process;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
@ -12,6 +13,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigTests;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.OutlierDetectionTests;
@ -28,6 +30,7 @@ import org.junit.Before;
import org.mockito.InOrder;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import static org.hamcrest.Matchers.equalTo;
@ -107,12 +110,15 @@ public class AnalyticsProcessManagerTests extends ESTestCase {
// Verifies that runJob is a no-op when the task has already been asked to stop:
// no process context is created and the task is simply marked as completed.
public void testRunJob_TaskIsStopping() {
// Simulate a task whose stop flag was flipped before the job began executing.
when(task.isStopping()).thenReturn(true);
// getParams() must be stubbed because the stopping path reads the task id for logging.
when(task.getParams()).thenReturn(
new StartDataFrameAnalyticsAction.TaskParams("data_frame_id", Version.CURRENT, Collections.emptyList(), false));
processManager.runJob(task, dataFrameAnalyticsConfig, dataExtractorFactory);
// A stopping task must not register a process context.
assertThat(processManager.getProcessContextCount(), equalTo(0));
// Expected call order: check the stop flag, read the params (for the debug log),
// then mark the task as completed — and nothing else touches the task.
InOrder inOrder = inOrder(task);
inOrder.verify(task).isStopping();
inOrder.verify(task).getParams();
inOrder.verify(task).markAsCompleted();
verifyNoMoreInteractions(task);
}