diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 8a320749fc8..2a9ca0e1552 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -718,8 +718,9 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin, MlMemoryTracker memoryTracker = new MlMemoryTracker(settings, clusterService, threadPool, jobManager, jobResultsProvider, dataFrameAnalyticsConfigProvider); this.memoryTracker.set(memoryTracker); - MlLifeCycleService mlLifeCycleService = new MlLifeCycleService(environment, clusterService, datafeedManager, - autodetectProcessManager, memoryTracker); + MlLifeCycleService mlLifeCycleService = + new MlLifeCycleService( + environment, clusterService, datafeedManager, autodetectProcessManager, dataFrameAnalyticsManager, memoryTracker); MlAssignmentNotifier mlAssignmentNotifier = new MlAssignmentNotifier(anomalyDetectionAuditor, dataFrameAnalyticsAuditor, threadPool, new MlConfigMigrator(settings, client, clusterService, indexNameExpressionResolver), clusterService); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java index 7309afa6b3a..241d8f40ec9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java @@ -9,6 +9,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.env.Environment; import org.elasticsearch.xpack.ml.datafeed.DatafeedManager; +import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsManager; import org.elasticsearch.xpack.ml.process.MlMemoryTracker; import org.elasticsearch.xpack.ml.process.NativeController; import org.elasticsearch.xpack.ml.process.NativeControllerHolder; @@ -22,14 +23,17 @@ public class MlLifeCycleService { private final ClusterService clusterService; private final DatafeedManager datafeedManager; private final AutodetectProcessManager autodetectProcessManager; + private final DataFrameAnalyticsManager analyticsManager; private final MlMemoryTracker memoryTracker; public MlLifeCycleService(Environment environment, ClusterService clusterService, DatafeedManager datafeedManager, - AutodetectProcessManager autodetectProcessManager, MlMemoryTracker memoryTracker) { + AutodetectProcessManager autodetectProcessManager, DataFrameAnalyticsManager analyticsManager, + MlMemoryTracker memoryTracker) { this.environment = environment; this.clusterService = clusterService; this.datafeedManager = datafeedManager; this.autodetectProcessManager = autodetectProcessManager; + this.analyticsManager = analyticsManager; this.memoryTracker = memoryTracker; clusterService.addLifecycleListener(new LifecycleListener() { @Override @@ -42,6 +46,9 @@ public class MlLifeCycleService { public synchronized void stop() { try { if (MachineLearningFeatureSet.isRunningOnMlPlatform(false)) { + // This prevents data frame analytics from being marked as failed due to exceptions occurring while the node is shutting + // down. + analyticsManager.markNodeAsShuttingDown(); // This prevents datafeeds from sending data to autodetect processes WITHOUT stopping the // datafeeds, so they get reallocated. We have to do this first, otherwise the datafeeds // could fail if they send data to a dead autodetect process. diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java index 8095c98bcfa..4c868ad7dfd 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java @@ -50,6 +50,7 @@ import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor; import java.time.Clock; import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; @@ -67,6 +68,8 @@ public class DataFrameAnalyticsManager { private final AnalyticsProcessManager processManager; private final DataFrameAnalyticsAuditor auditor; private final IndexNameExpressionResolver expressionResolver; + /** Indicates whether the node is shutting down. */ + private final AtomicBoolean nodeShuttingDown = new AtomicBoolean(); public DataFrameAnalyticsManager(NodeClient client, DataFrameAnalyticsConfigProvider configProvider, AnalyticsProcessManager processManager, DataFrameAnalyticsAuditor auditor, @@ -392,4 +395,12 @@ public class DataFrameAnalyticsManager { public void stop(DataFrameAnalyticsTask task) { processManager.stop(task); } + + public boolean isNodeShuttingDown() { + return nodeShuttingDown.get(); + } + + public void markNodeAsShuttingDown() { + nodeShuttingDown.set(true); + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java index d046fbc137d..177dfdb313b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java @@ -32,8 +32,10 @@ import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.query.IdsQueryBuilder; import org.elasticsearch.index.reindex.BulkByScrollTask; import org.elasticsearch.persistent.AllocatedPersistentTask; +import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.tasks.TaskResult; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction; @@ -106,6 +108,14 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S return statsHolder; } + @Override + protected void init(PersistentTasksService persistentTasksService, + TaskManager taskManager, + String persistentTaskId, + long allocationId) { + super.init(persistentTasksService, taskManager, persistentTaskId, allocationId); + } + @Override protected void onCancelled() { stop(getReasonCancelled(), StopDataFrameAnalyticsAction.DEFAULT_TIMEOUT); @@ -200,10 +210,16 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S } public void setFailed(Exception error) { + if (analyticsManager.isNodeShuttingDown()) { + LOGGER.warn( + new ParameterizedMessage("[{}] *Not* setting task to failed because the node is being shutdown", taskParams.getId()), + error); + return; + } LOGGER.error(new ParameterizedMessage("[{}] Setting task to failed", taskParams.getId()), error); String reason = ExceptionsHelper.unwrapCause(error).getMessage(); - DataFrameAnalyticsTaskState newTaskState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.FAILED, - getAllocationId(), reason); + DataFrameAnalyticsTaskState newTaskState = + new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.FAILED, getAllocationId(), reason); updatePersistentTaskState( newTaskState, ActionListener.wrap( @@ -370,5 +386,4 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S } return StartingState.RESUMING_ANALYZING; } - } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTaskTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTaskTests.java index 666456eb63c..1407ece3701 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTaskTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTaskTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.dataframe; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexRequest; @@ -12,16 +13,25 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.persistent.PersistentTasksService; +import org.elasticsearch.persistent.UpdatePersistentTaskStatusAction; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsActionResponseTests; +import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState; import org.elasticsearch.xpack.core.ml.utils.PhaseProgress; import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask.StartingState; +import org.elasticsearch.xpack.ml.dataframe.stats.ProgressTracker; +import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor; import org.mockito.ArgumentCaptor; import org.mockito.InOrder; import org.mockito.stubbing.Answer; @@ -33,9 +43,13 @@ import java.util.List; import static org.hamcrest.Matchers.equalTo; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; public class DataFrameAnalyticsTaskTests extends ESTestCase { @@ -155,6 +169,57 @@ public class DataFrameAnalyticsTaskTests extends ESTestCase { ".ml-state-dummy"); } + public void testSetFailed() { + testSetFailed(false); + } + + public void testSetFailedDuringNodeShutdown() { + testSetFailed(true); + } + + private void testSetFailed(boolean nodeShuttingDown) { + ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); + Client client = mock(Client.class); + when(client.settings()).thenReturn(Settings.EMPTY); + when(client.threadPool()).thenReturn(threadPool); + ClusterService clusterService = mock(ClusterService.class); + DataFrameAnalyticsManager analyticsManager = mock(DataFrameAnalyticsManager.class); + when(analyticsManager.isNodeShuttingDown()).thenReturn(nodeShuttingDown); + DataFrameAnalyticsAuditor auditor = mock(DataFrameAnalyticsAuditor.class); + PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, mock(ThreadPool.class), client); + TaskManager taskManager = mock(TaskManager.class); + + StartDataFrameAnalyticsAction.TaskParams taskParams = + new StartDataFrameAnalyticsAction.TaskParams( + "job-id", + Version.CURRENT, + Arrays.asList( + new PhaseProgress(ProgressTracker.REINDEXING, 0), + new PhaseProgress(ProgressTracker.LOADING_DATA, 0), + new PhaseProgress(ProgressTracker.WRITING_RESULTS, 0)), + false); + DataFrameAnalyticsTask task = + new DataFrameAnalyticsTask( + 123, "type", "action", null, Collections.emptyMap(), client, clusterService, analyticsManager, auditor, taskParams); + task.init(persistentTasksService, taskManager, "task-id", 42); + Exception exception = new Exception("some exception"); + + task.setFailed(exception); + + verify(analyticsManager).isNodeShuttingDown(); + verify(client, atLeastOnce()).settings(); + verify(client, atLeastOnce()).threadPool(); + if (nodeShuttingDown == false) { + verify(client).execute( + same(UpdatePersistentTaskStatusAction.INSTANCE), + eq(new UpdatePersistentTaskStatusAction.Request( + "task-id", 42, new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.FAILED, 42, "some exception"))), + any()); + } + verifyNoMoreInteractions(client, clusterService, analyticsManager, auditor, taskManager); + } + @SuppressWarnings("unchecked") private static Answer withResponse(Response response) { return invocationOnMock -> { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/DataFrameAnalyticsManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/DataFrameAnalyticsManagerTests.java new file mode 100644 index 00000000000..3ad87df9429 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/DataFrameAnalyticsManagerTests.java @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.dataframe.process; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsManager; +import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider; +import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor; + +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.mock; + +public class DataFrameAnalyticsManagerTests extends ESTestCase { + + public void testNodeShuttingDown() { + DataFrameAnalyticsManager manager = + new DataFrameAnalyticsManager( + mock(NodeClient.class), + mock(DataFrameAnalyticsConfigProvider.class), + mock(AnalyticsProcessManager.class), + mock(DataFrameAnalyticsAuditor.class), + mock(IndexNameExpressionResolver.class)); + assertThat(manager.isNodeShuttingDown(), is(false)); + + manager.markNodeAsShuttingDown(); + assertThat(manager.isNodeShuttingDown(), is(true)); + } +}