[7.x] [ML] Do not mark the DFA job as FAILED when a failure occurs after the node is shutdown (#61331) (#61526)

This commit is contained in:
Przemysław Witek 2020-08-26 09:53:13 +02:00 committed by GitHub
parent 5fa839b906
commit 11c2710e7f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 138 additions and 6 deletions

View File

@ -718,8 +718,9 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin,
MlMemoryTracker memoryTracker = new MlMemoryTracker(settings, clusterService, threadPool, jobManager, jobResultsProvider, MlMemoryTracker memoryTracker = new MlMemoryTracker(settings, clusterService, threadPool, jobManager, jobResultsProvider,
dataFrameAnalyticsConfigProvider); dataFrameAnalyticsConfigProvider);
this.memoryTracker.set(memoryTracker); this.memoryTracker.set(memoryTracker);
MlLifeCycleService mlLifeCycleService = new MlLifeCycleService(environment, clusterService, datafeedManager, MlLifeCycleService mlLifeCycleService =
autodetectProcessManager, memoryTracker); new MlLifeCycleService(
environment, clusterService, datafeedManager, autodetectProcessManager, dataFrameAnalyticsManager, memoryTracker);
MlAssignmentNotifier mlAssignmentNotifier = new MlAssignmentNotifier(anomalyDetectionAuditor, dataFrameAnalyticsAuditor, threadPool, MlAssignmentNotifier mlAssignmentNotifier = new MlAssignmentNotifier(anomalyDetectionAuditor, dataFrameAnalyticsAuditor, threadPool,
new MlConfigMigrator(settings, client, clusterService, indexNameExpressionResolver), clusterService); new MlConfigMigrator(settings, client, clusterService, indexNameExpressionResolver), clusterService);

View File

@ -9,6 +9,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager; import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsManager;
import org.elasticsearch.xpack.ml.process.MlMemoryTracker; import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
import org.elasticsearch.xpack.ml.process.NativeController; import org.elasticsearch.xpack.ml.process.NativeController;
import org.elasticsearch.xpack.ml.process.NativeControllerHolder; import org.elasticsearch.xpack.ml.process.NativeControllerHolder;
@ -22,14 +23,17 @@ public class MlLifeCycleService {
private final ClusterService clusterService; private final ClusterService clusterService;
private final DatafeedManager datafeedManager; private final DatafeedManager datafeedManager;
private final AutodetectProcessManager autodetectProcessManager; private final AutodetectProcessManager autodetectProcessManager;
private final DataFrameAnalyticsManager analyticsManager;
private final MlMemoryTracker memoryTracker; private final MlMemoryTracker memoryTracker;
public MlLifeCycleService(Environment environment, ClusterService clusterService, DatafeedManager datafeedManager, public MlLifeCycleService(Environment environment, ClusterService clusterService, DatafeedManager datafeedManager,
AutodetectProcessManager autodetectProcessManager, MlMemoryTracker memoryTracker) { AutodetectProcessManager autodetectProcessManager, DataFrameAnalyticsManager analyticsManager,
MlMemoryTracker memoryTracker) {
this.environment = environment; this.environment = environment;
this.clusterService = clusterService; this.clusterService = clusterService;
this.datafeedManager = datafeedManager; this.datafeedManager = datafeedManager;
this.autodetectProcessManager = autodetectProcessManager; this.autodetectProcessManager = autodetectProcessManager;
this.analyticsManager = analyticsManager;
this.memoryTracker = memoryTracker; this.memoryTracker = memoryTracker;
clusterService.addLifecycleListener(new LifecycleListener() { clusterService.addLifecycleListener(new LifecycleListener() {
@Override @Override
@ -42,6 +46,9 @@ public class MlLifeCycleService {
public synchronized void stop() { public synchronized void stop() {
try { try {
if (MachineLearningFeatureSet.isRunningOnMlPlatform(false)) { if (MachineLearningFeatureSet.isRunningOnMlPlatform(false)) {
// This prevents data frame analytics from being marked as failed due to exceptions occurring while the node is shutting
// down.
analyticsManager.markNodeAsShuttingDown();
// This prevents datafeeds from sending data to autodetect processes WITHOUT stopping the // This prevents datafeeds from sending data to autodetect processes WITHOUT stopping the
// datafeeds, so they get reallocated. We have to do this first, otherwise the datafeeds // datafeeds, so they get reallocated. We have to do this first, otherwise the datafeeds
// could fail if they send data to a dead autodetect process. // could fail if they send data to a dead autodetect process.

View File

@ -50,6 +50,7 @@ import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
import java.time.Clock; import java.time.Clock;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier; import java.util.function.Supplier;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
@ -67,6 +68,8 @@ public class DataFrameAnalyticsManager {
private final AnalyticsProcessManager processManager; private final AnalyticsProcessManager processManager;
private final DataFrameAnalyticsAuditor auditor; private final DataFrameAnalyticsAuditor auditor;
private final IndexNameExpressionResolver expressionResolver; private final IndexNameExpressionResolver expressionResolver;
/** Indicates whether the node is shutting down. */
private final AtomicBoolean nodeShuttingDown = new AtomicBoolean();
public DataFrameAnalyticsManager(NodeClient client, DataFrameAnalyticsConfigProvider configProvider, public DataFrameAnalyticsManager(NodeClient client, DataFrameAnalyticsConfigProvider configProvider,
AnalyticsProcessManager processManager, DataFrameAnalyticsAuditor auditor, AnalyticsProcessManager processManager, DataFrameAnalyticsAuditor auditor,
@ -392,4 +395,12 @@ public class DataFrameAnalyticsManager {
public void stop(DataFrameAnalyticsTask task) { public void stop(DataFrameAnalyticsTask task) {
processManager.stop(task); processManager.stop(task);
} }
public boolean isNodeShuttingDown() {
return nodeShuttingDown.get();
}
public void markNodeAsShuttingDown() {
nodeShuttingDown.set(true);
}
} }

View File

@ -32,8 +32,10 @@ import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.query.IdsQueryBuilder; import org.elasticsearch.index.query.IdsQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollTask; import org.elasticsearch.index.reindex.BulkByScrollTask;
import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.tasks.TaskResult; import org.elasticsearch.tasks.TaskResult;
import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
@ -106,6 +108,14 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S
return statsHolder; return statsHolder;
} }
@Override
protected void init(PersistentTasksService persistentTasksService,
TaskManager taskManager,
String persistentTaskId,
long allocationId) {
super.init(persistentTasksService, taskManager, persistentTaskId, allocationId);
}
@Override @Override
protected void onCancelled() { protected void onCancelled() {
stop(getReasonCancelled(), StopDataFrameAnalyticsAction.DEFAULT_TIMEOUT); stop(getReasonCancelled(), StopDataFrameAnalyticsAction.DEFAULT_TIMEOUT);
@ -200,10 +210,16 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S
} }
public void setFailed(Exception error) { public void setFailed(Exception error) {
if (analyticsManager.isNodeShuttingDown()) {
LOGGER.warn(
new ParameterizedMessage("[{}] *Not* setting task to failed because the node is being shutdown", taskParams.getId()),
error);
return;
}
LOGGER.error(new ParameterizedMessage("[{}] Setting task to failed", taskParams.getId()), error); LOGGER.error(new ParameterizedMessage("[{}] Setting task to failed", taskParams.getId()), error);
String reason = ExceptionsHelper.unwrapCause(error).getMessage(); String reason = ExceptionsHelper.unwrapCause(error).getMessage();
DataFrameAnalyticsTaskState newTaskState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.FAILED, DataFrameAnalyticsTaskState newTaskState =
getAllocationId(), reason); new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.FAILED, getAllocationId(), reason);
updatePersistentTaskState( updatePersistentTaskState(
newTaskState, newTaskState,
ActionListener.wrap( ActionListener.wrap(
@ -370,5 +386,4 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S
} }
return StartingState.RESUMING_ANALYZING; return StartingState.RESUMING_ANALYZING;
} }
} }

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.xpack.ml.dataframe; package org.elasticsearch.xpack.ml.dataframe;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
@ -12,16 +13,25 @@ import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.persistent.UpdatePersistentTaskStatusAction;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchHits;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsActionResponseTests; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsActionResponseTests;
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
import org.elasticsearch.xpack.core.ml.utils.PhaseProgress; import org.elasticsearch.xpack.core.ml.utils.PhaseProgress;
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask.StartingState; import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask.StartingState;
import org.elasticsearch.xpack.ml.dataframe.stats.ProgressTracker;
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.InOrder; import org.mockito.InOrder;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
@ -33,9 +43,13 @@ import java.util.List;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
public class DataFrameAnalyticsTaskTests extends ESTestCase { public class DataFrameAnalyticsTaskTests extends ESTestCase {
@ -155,6 +169,57 @@ public class DataFrameAnalyticsTaskTests extends ESTestCase {
".ml-state-dummy"); ".ml-state-dummy");
} }
public void testSetFailed() {
testSetFailed(false);
}
public void testSetFailedDuringNodeShutdown() {
testSetFailed(true);
}
private void testSetFailed(boolean nodeShuttingDown) {
ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY);
when(client.threadPool()).thenReturn(threadPool);
ClusterService clusterService = mock(ClusterService.class);
DataFrameAnalyticsManager analyticsManager = mock(DataFrameAnalyticsManager.class);
when(analyticsManager.isNodeShuttingDown()).thenReturn(nodeShuttingDown);
DataFrameAnalyticsAuditor auditor = mock(DataFrameAnalyticsAuditor.class);
PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, mock(ThreadPool.class), client);
TaskManager taskManager = mock(TaskManager.class);
StartDataFrameAnalyticsAction.TaskParams taskParams =
new StartDataFrameAnalyticsAction.TaskParams(
"job-id",
Version.CURRENT,
Arrays.asList(
new PhaseProgress(ProgressTracker.REINDEXING, 0),
new PhaseProgress(ProgressTracker.LOADING_DATA, 0),
new PhaseProgress(ProgressTracker.WRITING_RESULTS, 0)),
false);
DataFrameAnalyticsTask task =
new DataFrameAnalyticsTask(
123, "type", "action", null, Collections.emptyMap(), client, clusterService, analyticsManager, auditor, taskParams);
task.init(persistentTasksService, taskManager, "task-id", 42);
Exception exception = new Exception("some exception");
task.setFailed(exception);
verify(analyticsManager).isNodeShuttingDown();
verify(client, atLeastOnce()).settings();
verify(client, atLeastOnce()).threadPool();
if (nodeShuttingDown == false) {
verify(client).execute(
same(UpdatePersistentTaskStatusAction.INSTANCE),
eq(new UpdatePersistentTaskStatusAction.Request(
"task-id", 42, new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.FAILED, 42, "some exception"))),
any());
}
verifyNoMoreInteractions(client, clusterService, analyticsManager, auditor, taskManager);
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private static <Response> Answer<Response> withResponse(Response response) { private static <Response> Answer<Response> withResponse(Response response) {
return invocationOnMock -> { return invocationOnMock -> {

View File

@ -0,0 +1,33 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.dataframe.process;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsManager;
import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;
public class DataFrameAnalyticsManagerTests extends ESTestCase {
public void testNodeShuttingDown() {
DataFrameAnalyticsManager manager =
new DataFrameAnalyticsManager(
mock(NodeClient.class),
mock(DataFrameAnalyticsConfigProvider.class),
mock(AnalyticsProcessManager.class),
mock(DataFrameAnalyticsAuditor.class),
mock(IndexNameExpressionResolver.class));
assertThat(manager.isNodeShuttingDown(), is(false));
manager.markNodeAsShuttingDown();
assertThat(manager.isNodeShuttingDown(), is(true));
}
}