From 856d9bfbc1807c0e7187d0e5ebad8371972c07f3 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Tue, 10 Mar 2020 08:30:47 -0400 Subject: [PATCH] [ML] fixing data frame analysis test when two jobs are started in succession quickly (#53192) (#53332) A previous change (#53029) is causing analysis jobs to wait for certain indices to be made available. While it is good for jobs to wait, they could fail early on _start. This change will cause the persistent task to continually retry node assignment when the failure is due to shards not being available. If the shards are not available by the time `timeout` is reached by the predicate, it is treated as a _start failure and the task is canceled. For tasks seeking a new assignment after a node failure, that behavior is unchanged. closes #53188 --- .../ml/integration/ClassificationIT.java | 1 - .../xpack/ml/integration/RegressionIT.java | 1 - ...ransportStartDataFrameAnalyticsAction.java | 45 ++++++++++++++++--- 3 files changed, 38 insertions(+), 9 deletions(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java index 345a9ab0b6d..5327435f936 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java @@ -417,7 +417,6 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase { assertEvaluation(ALIAS_TO_NESTED_FIELD, KEYWORD_FIELD_VALUES, "ml."
+ predictedClassField); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/53188") public void testTwoJobsWithSameRandomizeSeedUseSameTrainingSet() throws Exception { String sourceIndex = "classification_two_jobs_with_same_randomize_seed_source"; String dependentVariable = KEYWORD_FIELD; diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java index 60a57e6fe86..0a40012e5d3 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java @@ -271,7 +271,6 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase { assertMlResultsFieldMappings(destIndex, predictedClassField, "double"); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/53188") public void testTwoJobsWithSameRandomizeSeedUseSameTrainingSet() throws Exception { String sourceIndex = "regression_two_jobs_with_same_randomize_seed_source"; indexData(sourceIndex, 100, 0); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java index 9e7f5ff2d63..a00ed00bb9e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.action; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import 
org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceAlreadyExistsException; @@ -96,6 +97,7 @@ public class TransportStartDataFrameAnalyticsAction extends TransportMasterNodeAction { private static final Logger logger = LogManager.getLogger(TransportStartDataFrameAnalyticsAction.class); + private static final String PRIMARY_SHARDS_INACTIVE = "not all primary shards are active"; private final XPackLicenseState licenseState; private final Client client; @@ -409,8 +411,26 @@ public class TransportStartDataFrameAnalyticsAction @Override public void onTimeout(TimeValue timeout) { - listener.onFailure(new ElasticsearchException( - "Starting data frame analytics [" + task.getParams().getId() + "] timed out after [" + timeout + "]")); + logger.error( + () -> new ParameterizedMessage("[{}] timed out when starting task after [{}]. Assignment explanation [{}]", + task.getParams().getId(), + timeout, + predicate.assignmentExplanation)); + if (predicate.assignmentExplanation != null) { + cancelAnalyticsStart(task, + new ElasticsearchStatusException( + "Could not start data frame analytics task, timed out after [{}] waiting for task assignment. 
" + + "Assignment explanation [{}]", + RestStatus.TOO_MANY_REQUESTS, + timeout, + predicate.assignmentExplanation), + listener); + } else { + listener.onFailure(new ElasticsearchException( + "Starting data frame analytics [{}] timed out after [{}]", + task.getParams().getId(), + timeout)); + } } }); } @@ -435,6 +455,7 @@ public class TransportStartDataFrameAnalyticsAction private static class AnalyticsPredicate implements Predicate> { private volatile Exception exception; + private volatile String assignmentExplanation; @Override public boolean test(PersistentTasksCustomMetaData.PersistentTask persistentTask) { @@ -449,9 +470,15 @@ public class TransportStartDataFrameAnalyticsAction return true; } - if (assignment != null && assignment.equals(PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT) == false && - assignment.isAssigned() == false) { - // Assignment has failed despite passing our "fast fail" validation + if (assignment != null + && assignment.equals(PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT) == false + && assignment.isAssigned() == false) { + assignmentExplanation = assignment.getExplanation(); + // Assignment failed due to primary shard check. + // This is hopefully intermittent and we should allow another assignment attempt. 
+ if (assignmentExplanation.contains(PRIMARY_SHARDS_INACTIVE)) { + return false; + } exception = new ElasticsearchStatusException("Could not start data frame analytics task, allocation explanation [" + assignment.getExplanation() + "]", RestStatus.TOO_MANY_REQUESTS); return true; @@ -582,8 +609,12 @@ public class TransportStartDataFrameAnalyticsAction MlStatsIndex.indexPattern(), AnomalyDetectorsIndex.jobStateIndexPattern()); if (unavailableIndices.size() != 0) { - String reason = "Not opening data frame analytics job [" + id + - "], because not all primary shards are active for the following indices [" + String.join(",", unavailableIndices) + "]"; + String reason = "Not opening data frame analytics job [" + + id + + "], because " + + PRIMARY_SHARDS_INACTIVE + + " for the following indices [" + + String.join(",", unavailableIndices) + "]"; logger.debug(reason); return new PersistentTasksCustomMetaData.Assignment(null, reason); }