Register DFA jobs on putAnalytics rather than via a separate method (#55458) (#55708)

This commit is contained in:
Przemysław Witek 2020-04-24 10:59:32 +02:00 committed by GitHub
parent b8379872a7
commit c89917c799
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 13 additions and 41 deletions

View File

@ -100,7 +100,6 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
null,
null,
null));
registerAnalytics(config);
putAnalytics(config);
assertIsStopped(jobId);
@ -148,7 +147,6 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
indexData(sourceIndex, 300, 0, KEYWORD_FIELD);
DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Classification(KEYWORD_FIELD));
registerAnalytics(config);
putAnalytics(config);
assertIsStopped(jobId);
@ -209,7 +207,6 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
destIndex,
null,
new Classification(dependentVariable, BoostedTreeParams.builder().build(), null, null, numTopClasses, 50.0, null));
registerAnalytics(config);
putAnalytics(config);
assertIsStopped(jobId);
@ -305,7 +302,6 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
indexData(sourceIndex, 350, 0, KEYWORD_FIELD);
DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Classification(KEYWORD_FIELD));
registerAnalytics(config);
putAnalytics(config);
assertIsStopped(jobId);
@ -374,7 +370,6 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
}
DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Classification(KEYWORD_FIELD));
registerAnalytics(config);
putAnalytics(config);
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> startAnalytics(jobId));
@ -393,7 +388,6 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
QueryBuilder query = QueryBuilders.boolQuery().filter(QueryBuilders.termsQuery(KEYWORD_FIELD, KEYWORD_FIELD_VALUES));
DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Classification(KEYWORD_FIELD), query);
registerAnalytics(config);
putAnalytics(config);
// Should not throw
@ -409,7 +403,6 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
indexData(sourceIndex, 100, 0, NESTED_FIELD);
DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Classification(NESTED_FIELD));
registerAnalytics(config);
putAnalytics(config);
startAnalytics(jobId);
waitUntilAnalyticsIsStopped(jobId);
@ -428,7 +421,6 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
indexData(sourceIndex, 100, 0, KEYWORD_FIELD);
DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Classification(ALIAS_TO_KEYWORD_FIELD));
registerAnalytics(config);
putAnalytics(config);
startAnalytics(jobId);
waitUntilAnalyticsIsStopped(jobId);
@ -447,7 +439,6 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
indexData(sourceIndex, 100, 0, NESTED_FIELD);
DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Classification(ALIAS_TO_NESTED_FIELD));
registerAnalytics(config);
putAnalytics(config);
startAnalytics(jobId);
waitUntilAnalyticsIsStopped(jobId);
@ -482,7 +473,6 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
DataFrameAnalyticsConfig firstJob = buildAnalytics(firstJobId, sourceIndex, firstJobDestIndex, null,
new Classification(dependentVariable, boostedTreeParams, null, null, 1, 50.0, null));
registerAnalytics(firstJob);
putAnalytics(firstJob);
String secondJobId = "classification_two_jobs_with_same_randomize_seed_2";
@ -492,7 +482,6 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
DataFrameAnalyticsConfig secondJob = buildAnalytics(secondJobId, sourceIndex, secondJobDestIndex, null,
new Classification(dependentVariable, boostedTreeParams, null, null, 1, 50.0, randomizeSeed));
registerAnalytics(secondJob);
putAnalytics(secondJob);
// Let's run both jobs in parallel and wait until they are finished
@ -515,7 +504,6 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
assertThat(upgradeMode(), is(false));
DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Classification(KEYWORD_FIELD));
registerAnalytics(config);
putAnalytics(config);
startAnalytics(jobId);
assertThat(analyticsTaskList(), hasSize(1));
@ -561,7 +549,6 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
assertThat(upgradeMode(), is(false));
DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Classification(KEYWORD_FIELD));
registerAnalytics(config);
putAnalytics(config);
setUpgradeModeTo(true);
@ -581,7 +568,6 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
indexData(sourceIndex, 100, 0, KEYWORD_FIELD);
DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Classification(KEYWORD_FIELD));
registerAnalytics(config);
putAnalytics(config);
startAnalytics(jobId);
waitUntilAnalyticsIsStopped(jobId);

View File

@ -15,6 +15,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.SecuritySettingsSourceField;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.integration.MlRestTestStateCleaner;
import org.elasticsearch.xpack.core.ml.notifications.NotificationsIndex;
import org.elasticsearch.xpack.core.rollup.job.RollupJob;
@ -1210,7 +1211,10 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
public void clearMlState() throws Exception {
new MlRestTestStateCleaner(logger, adminClient()).clearMlMetadata();
// Don't check rollup jobs because we clear them in the superclass.
waitForPendingTasks(adminClient(), taskName -> taskName.startsWith(RollupJob.NAME));
// Don't check analytics jobs as they are independent of anomaly detection jobs and should not be created by this test.
waitForPendingTasks(
adminClient(),
taskName -> taskName.startsWith(RollupJob.NAME) || taskName.contains(MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME));
}
private static class DatafeedBuilder {

View File

@ -16,6 +16,7 @@ import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.test.SecuritySettingsSourceField;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.integration.MlRestTestStateCleaner;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
@ -827,6 +828,7 @@ public class MlJobIT extends ESRestTestCase {
@After
public void clearMlState() throws Exception {
new MlRestTestStateCleaner(logger, adminClient()).clearMlMetadata();
ESRestTestCase.waitForPendingTasks(adminClient());
// Don't check analytics jobs as they are independent of anomaly detection jobs and should not be created by this test.
waitForPendingTasks(adminClient(), taskName -> taskName.contains(MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME));
}
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.integration;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.admin.indices.get.GetIndexAction;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
@ -76,7 +77,7 @@ import static org.hamcrest.Matchers.nullValue;
*/
abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTestCase {
private List<DataFrameAnalyticsConfig> analytics = new ArrayList<>();
private final List<DataFrameAnalyticsConfig> analytics = new ArrayList<>();
@Override
protected void cleanUpResources() {
@ -91,7 +92,8 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest
assertThat(deleteAnalytics(config.getId()).isAcknowledged(), is(true));
assertThat(searchStoredProgress(config.getId()).getHits().getTotalHits().value, equalTo(0L));
} catch (Exception e) {
// ignore
// just log and ignore
logger.error(new ParameterizedMessage("[{}] Could not clean up analytics job config", config.getId()), e);
}
}
}
@ -110,13 +112,10 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest
}
}
protected void registerAnalytics(DataFrameAnalyticsConfig config) {
protected PutDataFrameAnalyticsAction.Response putAnalytics(DataFrameAnalyticsConfig config) {
if (analytics.add(config) == false) {
throw new IllegalArgumentException("analytics config [" + config.getId() + "] is already registered");
}
}
protected PutDataFrameAnalyticsAction.Response putAnalytics(DataFrameAnalyticsConfig config) {
PutDataFrameAnalyticsAction.Request request = new PutDataFrameAnalyticsAction.Request(config);
return client().execute(PutDataFrameAnalyticsAction.INSTANCE, request).actionGet();
}

View File

@ -71,7 +71,6 @@ public class OutlierDetectionWithMissingFieldsIT extends MlNativeDataFrameAnalyt
String id = "test_outlier_detection_with_missing_fields";
DataFrameAnalyticsConfig config = buildAnalytics(id, sourceIndex, sourceIndex + "-results", null,
new OutlierDetection.Builder().build());
registerAnalytics(config);
putAnalytics(config);
assertIsStopped(id);

View File

@ -69,7 +69,6 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
null,
null)
);
registerAnalytics(config);
putAnalytics(config);
assertIsStopped(jobId);
@ -127,7 +126,6 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
indexData(sourceIndex, 350, 0);
DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Regression(DEPENDENT_VARIABLE_FIELD));
registerAnalytics(config);
putAnalytics(config);
assertIsStopped(jobId);
@ -183,7 +181,6 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
destIndex,
null,
new Regression(DEPENDENT_VARIABLE_FIELD, BoostedTreeParams.builder().build(), null, 50.0, null));
registerAnalytics(config);
putAnalytics(config);
assertIsStopped(jobId);
@ -243,7 +240,6 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
indexData(sourceIndex, 350, 0);
DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Regression(DEPENDENT_VARIABLE_FIELD));
registerAnalytics(config);
putAnalytics(config);
assertIsStopped(jobId);
@ -312,7 +308,6 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
DataFrameAnalyticsConfig firstJob = buildAnalytics(firstJobId, sourceIndex, firstJobDestIndex, null,
new Regression(DEPENDENT_VARIABLE_FIELD, boostedTreeParams, null, 50.0, null));
registerAnalytics(firstJob);
putAnalytics(firstJob);
String secondJobId = "regression_two_jobs_with_same_randomize_seed_2";
@ -322,7 +317,6 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
DataFrameAnalyticsConfig secondJob = buildAnalytics(secondJobId, sourceIndex, secondJobDestIndex, null,
new Regression(DEPENDENT_VARIABLE_FIELD, boostedTreeParams, null, 50.0, randomizeSeed));
registerAnalytics(secondJob);
putAnalytics(secondJob);
// Let's run both jobs in parallel and wait until they are finished
@ -344,7 +338,6 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
indexData(sourceIndex, 100, 0);
DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Regression(DEPENDENT_VARIABLE_FIELD));
registerAnalytics(config);
putAnalytics(config);
startAnalytics(jobId);
waitUntilAnalyticsIsStopped(jobId);
@ -384,7 +377,6 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
destIndex,
null,
new Regression(DISCRETE_NUMERICAL_FEATURE_FIELD, BoostedTreeParams.builder().build(), null, null, null));
registerAnalytics(config);
putAnalytics(config);
assertIsStopped(jobId);

View File

@ -98,7 +98,6 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
String id = "test_outlier_detection_with_few_docs";
DataFrameAnalyticsConfig config = buildAnalytics(id, sourceIndex, sourceIndex + "-results", null,
new OutlierDetection.Builder().build());
registerAnalytics(config);
putAnalytics(config);
assertIsStopped(id);
@ -184,7 +183,6 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
String id = "test_outlier_detection_with_enough_docs_to_scroll";
DataFrameAnalyticsConfig config = buildAnalytics(id, sourceIndex, sourceIndex + "-results", "custom_ml",
new OutlierDetection.Builder().build());
registerAnalytics(config);
putAnalytics(config);
assertIsStopped(id);
@ -259,7 +257,6 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
String id = "test_outlier_detection_with_more_fields_than_docvalue_limit";
DataFrameAnalyticsConfig config = buildAnalytics(id, sourceIndex, sourceIndex + "-results", null,
new OutlierDetection.Builder().build());
registerAnalytics(config);
putAnalytics(config);
assertIsStopped(id);
@ -328,7 +325,6 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
String id = "test_stop_outlier_detection_with_enough_docs_to_scroll";
DataFrameAnalyticsConfig config = buildAnalytics(id, sourceIndex, sourceIndex + "-results", "custom_ml",
new OutlierDetection.Builder().build());
registerAnalytics(config);
putAnalytics(config);
assertIsStopped(id);
@ -398,7 +394,6 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
.setDest(new DataFrameAnalyticsDest(destIndex, null))
.setAnalysis(new OutlierDetection.Builder().build())
.build();
registerAnalytics(config);
putAnalytics(config);
assertIsStopped(id);
@ -460,7 +455,6 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
String id = "test_outlier_detection_with_pre_existing_dest_index";
DataFrameAnalyticsConfig config = buildAnalytics(id, sourceIndex, destIndex, null, new OutlierDetection.Builder().build());
registerAnalytics(config);
putAnalytics(config);
assertIsStopped(id);
@ -524,7 +518,6 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
.setModelMemoryLimit(modelMemoryLimit)
.build();
registerAnalytics(config);
putAnalytics(config);
assertIsStopped(id);
@ -569,7 +562,6 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
.setAllowLazyStart(true)
.build();
registerAnalytics(config);
putAnalytics(config);
assertIsStopped(id);
@ -619,7 +611,6 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
String id = "test_outlier_detection_stop_and_restart";
DataFrameAnalyticsConfig config = buildAnalytics(id, sourceIndex, sourceIndex + "-results", "custom_ml",
new OutlierDetection.Builder().build());
registerAnalytics(config);
putAnalytics(config);
assertIsStopped(id);
@ -697,7 +688,6 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
.setOutlierFraction(0.04)
.setStandardizationEnabled(true)
.build());
registerAnalytics(config);
putAnalytics(config);
assertIsStopped(id);