From d5bb574e1ec6d1385b7f06a21f416985f5dbbc3a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Witek?= Date: Tue, 14 Apr 2020 14:09:02 +0200 Subject: [PATCH] [7.x] Unassign DFA tasks in SetUpgradeModeAction (#54523) (#55143) --- .../ml/integration/ClassificationIT.java | 57 +++++++++++++++++++ ...NativeDataFrameAnalyticsIntegTestCase.java | 25 +++++++- .../action/TransportSetUpgradeModeAction.java | 42 +++++++------- ...ransportStartDataFrameAnalyticsAction.java | 2 - ...TransportStopDataFrameAnalyticsAction.java | 2 - 5 files changed, 100 insertions(+), 28 deletions(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java index c01f5de8af3..056e6a4daf4 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java @@ -40,8 +40,10 @@ import java.util.Map; import java.util.Set; import static java.util.stream.Collectors.toList; +import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -51,6 +53,8 @@ import static org.hamcrest.Matchers.in; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.startsWith; public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase { @@ -498,6 +502,59 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase { assertThat(secondRunTrainingRowsIds, equalTo(firstRunTrainingRowsIds)); } + public void testSetUpgradeMode_ExistingTaskGetsUnassigned() throws Exception { + initialize("classification_set_upgrade_mode"); + indexData(sourceIndex, 300, 0, KEYWORD_FIELD); + + assertThat(upgradeMode(), is(false)); + + DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Classification(KEYWORD_FIELD)); + registerAnalytics(config); + putAnalytics(config); + startAnalytics(jobId); + assertThat(analyticsTaskList(), hasSize(1)); + assertThat(analyticsAssignedTaskList(), hasSize(1)); + + setUpgradeModeTo(true); + assertThat(analyticsTaskList(), hasSize(1)); + assertThat(analyticsAssignedTaskList(), is(empty())); + + GetDataFrameAnalyticsStatsAction.Response.Stats analyticsStats = getAnalyticsStats(jobId); + assertThat(analyticsStats.getAssignmentExplanation(), is(equalTo(AWAITING_UPGRADE.getExplanation()))); + assertThat(analyticsStats.getNode(), is(nullValue())); + + setUpgradeModeTo(false); + assertThat(analyticsTaskList(), hasSize(1)); + assertBusy(() -> assertThat(analyticsAssignedTaskList(), hasSize(1))); + + analyticsStats = getAnalyticsStats(jobId); + assertThat(analyticsStats.getAssignmentExplanation(), is(not(equalTo(AWAITING_UPGRADE.getExplanation())))); + + waitUntilAnalyticsIsStopped(jobId); + } + + public void testSetUpgradeMode_NewTaskDoesNotStart() throws Exception { + initialize("classification_set_upgrade_mode_task_should_not_start"); + indexData(sourceIndex, 100, 0, KEYWORD_FIELD); + + assertThat(upgradeMode(), is(false)); + + DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Classification(KEYWORD_FIELD)); + registerAnalytics(config); + putAnalytics(config); + + setUpgradeModeTo(true); + + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> startAnalytics(config.getId())); + assertThat(e.status(), is(equalTo(RestStatus.TOO_MANY_REQUESTS))); + assertThat( + e.getMessage(), + is(equalTo("Cannot perform cluster:admin/xpack/ml/data_frame/analytics/start action while upgrade mode is enabled"))); + + assertThat(analyticsTaskList(), is(empty())); + assertThat(analyticsAssignedTaskList(), is(empty())); + } + public void testDeleteExpiredData_RemovesUnusedState() throws Exception { initialize("classification_delete_expired_data"); indexData(sourceIndex, 100, 0, KEYWORD_FIELD); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java index a9c6c9eb91d..b49e3ca0282 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java @@ -15,13 +15,17 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.tasks.TaskInfo; +import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.DeleteDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.EvaluateDataFrameAction; import org.elasticsearch.xpack.core.ml.action.ExplainDataFrameAnalyticsAction; @@ -48,6 +52,8 @@ import org.hamcrest.Matchers; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -60,6 +66,7 @@ import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItems; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; @@ -152,7 +159,7 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest GetDataFrameAnalyticsStatsAction.Response response = client().execute(GetDataFrameAnalyticsStatsAction.INSTANCE, request) .actionGet(); List stats = response.getResponse().results(); - assertThat("Got: " + stats.toString(), stats.size(), equalTo(1)); + assertThat("Got: " + stats.toString(), stats, hasSize(1)); return stats.get(0); } @@ -196,7 +203,7 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest GetDataFrameAnalyticsStatsAction.Response.Stats stats = getAnalyticsStats(id); assertThat(stats.getId(), equalTo(id)); List progress = stats.getProgress(); - assertThat(progress.size(), equalTo(4)); + assertThat(progress, hasSize(4)); assertThat(progress.get(0).getPhase(), equalTo("reindexing")); assertThat(progress.get(1).getPhase(), equalTo("loading_data")); assertThat(progress.get(2).getPhase(), equalTo("analyzing")); @@ -221,6 +228,18 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest assertThat("Hits were: " + Strings.toString(searchResponse.getHits()), searchResponse.getHits().getHits(), arrayWithSize(1)); } + protected Collection> analyticsTaskList() { + ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState(); + PersistentTasksCustomMetadata persistentTasks = masterClusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); + return persistentTasks != null + ? persistentTasks.findTasks(MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, task -> true) + : Collections.emptyList(); + } + + protected List analyticsAssignedTaskList() { + return client().admin().cluster().prepareListTasks().setActions(MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME + "[c]").get().getTasks(); + } + /** * Asserts whether the audit messages fetched from index match provided prefixes. * More specifically, in order to pass: @@ -284,7 +303,7 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest SearchResponse searchResponse = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern()) .setQuery(QueryBuilders.idsQuery().addIds(stateDocId)) .get(); - assertThat(searchResponse.getHits().getHits().length, equalTo(1)); + assertThat("Hits were: " + Strings.toString(searchResponse.getHits()), searchResponse.getHits().getHits(), is(arrayWithSize(1))); } protected static void assertMlResultsFieldMappings(String index, String predictedClassField, String expectedType) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java index 1f6c7beaf4f..c4684b44ebf 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java @@ -40,7 +40,10 @@ import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.utils.TypedChainTaskExecutor; import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @@ -52,9 +55,13 @@ import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE; import static org.elasticsearch.xpack.core.ml.MlTasks.DATAFEED_TASK_NAME; import static org.elasticsearch.xpack.core.ml.MlTasks.JOB_TASK_NAME; +import static org.elasticsearch.xpack.core.ml.MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME; public class TransportSetUpgradeModeAction extends TransportMasterNodeAction { + private static final Set ML_TASK_NAMES = + Collections.unmodifiableSet(new HashSet<>(Arrays.asList(JOB_TASK_NAME, DATAFEED_TASK_NAME, DATA_FRAME_ANALYTICS_TASK_NAME))); + private static final Logger logger = LogManager.getLogger(TransportSetUpgradeModeAction.class); private final AtomicBoolean isRunning = new AtomicBoolean(false); @@ -124,12 +131,12 @@ public class TransportSetUpgradeModeAction extends TransportMasterNodeAction We have unassigned the tasks, respond to the listener. ActionListener>> unassignPersistentTasksListener = ActionListener.wrap( - unassigndPersistentTasks -> { + unassignedPersistentTasks -> { // Wait for our tasks to all stop client.admin() .cluster() .prepareListTasks() - .setActions(DATAFEED_TASK_NAME + "[c]", JOB_TASK_NAME + "[c]") + .setActions(ML_TASK_NAMES.stream().map(taskName -> taskName + "[c]").toArray(String[]::new)) // There is a chance that we failed un-allocating a task due to allocation_id being changed // This call will timeout in that case and return an error .setWaitForCompletion(true) @@ -161,8 +168,8 @@ public class TransportSetUpgradeModeAction extends TransportMasterNodeAction <.2> - If we are disabling the option, we need to wait to make sure all the job and datafeed tasks no longer have the upgrade mode - assignment + If we are disabling the option, we need to wait to make sure all the job, datafeed and analytics tasks no longer have the + upgrade mode assignment We make no guarantees around which tasks will be running again once upgrade_mode is disabled. @@ -198,16 +205,10 @@ public class TransportSetUpgradeModeAction extends TransportMasterNodeAction - // Wait for jobs to not be "Awaiting upgrade" - persistentTasksCustomMetadata.findTasks(JOB_TASK_NAME, - (t) -> t.getAssignment().equals(AWAITING_UPGRADE)) - .isEmpty() && - - // Wait for datafeeds to not be "Awaiting upgrade" - persistentTasksCustomMetadata.findTasks(DATAFEED_TASK_NAME, - (t) -> t.getAssignment().equals(AWAITING_UPGRADE)) - .isEmpty(), + // Wait for jobs, datafeeds and analytics not to be "Awaiting upgrade" + persistentTasksCustomMetadata -> + persistentTasksCustomMetadata.tasks().stream() + .noneMatch(t -> ML_TASK_NAMES.contains(t.getTaskName()) && t.getAssignment().equals(AWAITING_UPGRADE)), request.timeout(), ActionListener.wrap(r -> wrappedListener.onResponse(new AcknowledgedResponse(true)), wrappedListener::onFailure) ); @@ -242,9 +243,9 @@ public class TransportSetUpgradeModeAction extends TransportMasterNodeAction - * The reason for unassigning both types is that we want the Datafeed to attempt re-assignment once `upgrade_mode` is + * The reason for unassigning both Job and Datafeed is that we want the Datafeed to attempt re-assignment once `upgrade_mode` is * disabled. *

* If we do not force an allocation change for the Datafeed tasks, they will never start again, since they were isolated. @@ -256,18 +257,17 @@ public class TransportSetUpgradeModeAction extends TransportMasterNodeAction>> listener) { - List> datafeedAndJobTasks = tasksCustomMetadata + List> mlTasks = tasksCustomMetadata .tasks() .stream() - .filter(persistentTask -> (persistentTask.getTaskName().equals(MlTasks.JOB_TASK_NAME) || - persistentTask.getTaskName().equals(MlTasks.DATAFEED_TASK_NAME))) + .filter(persistentTask -> ML_TASK_NAMES.contains(persistentTask.getTaskName())) // We want to always have the same ordering of which tasks we un-allocate first. // However, the order in which the distributed tasks handle the un-allocation event is not guaranteed. .sorted(Comparator.comparing(PersistentTask::getTaskName)) .collect(Collectors.toList()); logger.info("Un-assigning persistent tasks : " + - datafeedAndJobTasks.stream().map(PersistentTask::getId).collect(Collectors.joining(", ", "[ ", " ]"))); + mlTasks.stream().map(PersistentTask::getId).collect(Collectors.joining(", ", "[ ", " ]"))); TypedChainTaskExecutor> chainTaskExecutor = new TypedChainTaskExecutor<>(client.threadPool().executor(executor()), @@ -278,7 +278,7 @@ public class TransportSetUpgradeModeAction extends TransportMasterNodeAction ExceptionsHelper.unwrapCause(ex) instanceof ResourceNotFoundException == false); - for (PersistentTask task : datafeedAndJobTasks) { + for (PersistentTask task : mlTasks) { chainTaskExecutor.add( chainedTask -> persistentTasksClusterService.unassignPersistentTask(task.getId(), task.getAllocationId(), diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java index a27c2d19930..affbffc572c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java @@ -90,8 +90,6 @@ import static org.elasticsearch.xpack.ml.MachineLearning.MAX_OPEN_JOBS_PER_NODE; /** * Starts the persistent task for running data frame analytics. - * - * TODO Add to the upgrade mode action */ public class TransportStartDataFrameAnalyticsAction extends TransportMasterNodeAction { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsAction.java index 895e09bc28c..123e038c5ca 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsAction.java @@ -53,8 +53,6 @@ import java.util.stream.Stream; /** * Stops the persistent task for running data frame analytics. - * - * TODO Add to the upgrade mode action */ public class TransportStopDataFrameAnalyticsAction extends TransportTasksAction