[ML] fixing data frame analysis test when two jobs are started in succession quickly (#53192) (#53332)

A previous change (#53029) is causing analysis jobs to wait for certain indices to be made available. While this it is good for jobs to wait, they could fail early on _start. 

This change will cause the persistent task to continually retry node assignment when the failure is due to shards not being available.

If the shards are not available by the time `timeout` is reached by the predicate, it is treated as a _start failure and the task is canceled. 

For tasks seeking a new assignment after a node failure, that behavior is unchanged.


closes #53188
This commit is contained in:
Benjamin Trent 2020-03-10 08:30:47 -04:00 committed by GitHub
parent 5912895838
commit 856d9bfbc1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 38 additions and 9 deletions

View File

@ -417,7 +417,6 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
assertEvaluation(ALIAS_TO_NESTED_FIELD, KEYWORD_FIELD_VALUES, "ml." + predictedClassField);
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/53188")
public void testTwoJobsWithSameRandomizeSeedUseSameTrainingSet() throws Exception {
String sourceIndex = "classification_two_jobs_with_same_randomize_seed_source";
String dependentVariable = KEYWORD_FIELD;

View File

@ -271,7 +271,6 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
assertMlResultsFieldMappings(destIndex, predictedClassField, "double");
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/53188")
public void testTwoJobsWithSameRandomizeSeedUseSameTrainingSet() throws Exception {
String sourceIndex = "regression_two_jobs_with_same_randomize_seed_source";
indexData(sourceIndex, 100, 0);

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.action;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
@ -96,6 +97,7 @@ public class TransportStartDataFrameAnalyticsAction
extends TransportMasterNodeAction<StartDataFrameAnalyticsAction.Request, AcknowledgedResponse> {
private static final Logger logger = LogManager.getLogger(TransportStartDataFrameAnalyticsAction.class);
private static final String PRIMARY_SHARDS_INACTIVE = "not all primary shards are active";
private final XPackLicenseState licenseState;
private final Client client;
@ -409,8 +411,26 @@ public class TransportStartDataFrameAnalyticsAction
@Override
public void onTimeout(TimeValue timeout) {
logger.error(
() -> new ParameterizedMessage("[{}] timed out when starting task after [{}]. Assignment explanation [{}]",
task.getParams().getId(),
timeout,
predicate.assignmentExplanation));
if (predicate.assignmentExplanation != null) {
cancelAnalyticsStart(task,
new ElasticsearchStatusException(
"Could not start data frame analytics task, timed out after [{}] waiting for task assignment. "
+ "Assignment explanation [{}]",
RestStatus.TOO_MANY_REQUESTS,
timeout,
predicate.assignmentExplanation),
listener);
} else {
listener.onFailure(new ElasticsearchException(
"Starting data frame analytics [" + task.getParams().getId() + "] timed out after [" + timeout + "]"));
"Starting data frame analytics [{}] timed out after [{}]",
task.getParams().getId(),
timeout));
}
}
});
}
@ -435,6 +455,7 @@ public class TransportStartDataFrameAnalyticsAction
private static class AnalyticsPredicate implements Predicate<PersistentTasksCustomMetaData.PersistentTask<?>> {
private volatile Exception exception;
private volatile String assignmentExplanation;
@Override
public boolean test(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
@ -449,9 +470,15 @@ public class TransportStartDataFrameAnalyticsAction
return true;
}
if (assignment != null && assignment.equals(PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT) == false &&
assignment.isAssigned() == false) {
// Assignment has failed despite passing our "fast fail" validation
if (assignment != null
&& assignment.equals(PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT) == false
&& assignment.isAssigned() == false) {
assignmentExplanation = assignment.getExplanation();
// Assignment failed due to primary shard check.
// This is hopefully intermittent and we should allow another assignment attempt.
if (assignmentExplanation.contains(PRIMARY_SHARDS_INACTIVE)) {
return false;
}
exception = new ElasticsearchStatusException("Could not start data frame analytics task, allocation explanation [" +
assignment.getExplanation() + "]", RestStatus.TOO_MANY_REQUESTS);
return true;
@ -582,8 +609,12 @@ public class TransportStartDataFrameAnalyticsAction
MlStatsIndex.indexPattern(),
AnomalyDetectorsIndex.jobStateIndexPattern());
if (unavailableIndices.size() != 0) {
String reason = "Not opening data frame analytics job [" + id +
"], because not all primary shards are active for the following indices [" + String.join(",", unavailableIndices) + "]";
String reason = "Not opening data frame analytics job ["
+ id
+ "], because "
+ PRIMARY_SHARDS_INACTIVE
+ " for the following indices ["
+ String.join(",", unavailableIndices) + "]";
logger.debug(reason);
return new PersistentTasksCustomMetaData.Assignment(null, reason);
}