From 74d7be805a2546c55749b3f971f1a1c8b392d32d Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Mon, 24 Sep 2018 12:54:32 -0700 Subject: [PATCH] Make certain ML node settings dynamic (#33565) (#33961) * Make certain ML node settings dynamic (#33565) * Changing to pull in updating settings and pass to constructor * adding note about only newly opened jobs getting updated value --- .../xpack/ml/MachineLearning.java | 12 ++++- .../ml/job/process/DataCountsReporter.java | 49 +++++++++++++++---- .../process/autodetect/AutodetectBuilder.java | 13 +++-- .../autodetect/AutodetectProcessManager.java | 12 +++-- .../NativeAutodetectProcessFactory.java | 17 +++++-- .../job/process/CountingInputStreamTests.java | 34 +++++++++++-- .../job/process/DataCountsReporterTests.java | 45 +++++++++++------ .../job/process/DummyDataCountsReporter.java | 5 +- .../AutodetectProcessManagerTests.java | 22 +++++++-- 9 files changed, 165 insertions(+), 44 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index cd13b2c8bb6..538521909a7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -292,8 +292,11 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu MAX_MACHINE_MEMORY_PERCENT, AutodetectBuilder.DONT_PERSIST_MODEL_STATE_SETTING, AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING, + AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC, DataCountsReporter.ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING, + DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING, DataCountsReporter.ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING, + DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING, AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE, AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE, AutodetectProcessManager.MIN_DISK_SPACE_OFF_HEAP)); @@ -379,7 +382,12 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu // This will only only happen when path.home is not set, which is disallowed in production throw new ElasticsearchException("Failed to create native process controller for Machine Learning"); } - autodetectProcessFactory = new NativeAutodetectProcessFactory(environment, settings, nativeController, client); + autodetectProcessFactory = new NativeAutodetectProcessFactory( + environment, + settings, + nativeController, + client, + clusterService); normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, settings, nativeController); } catch (IOException e) { // This also should not happen in production, as the MachineLearningFeatureSet should have @@ -397,7 +405,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)); AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(env, settings, client, threadPool, jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, - normalizerFactory, xContentRegistry, auditor); + normalizerFactory, xContentRegistry, auditor, clusterService); this.autodetectProcessManager.set(autodetectProcessManager); DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, jobResultsProvider, auditor, System::currentTimeMillis); DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java index d906ccf2f7a..a00c14078eb 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.job.process; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; @@ -42,15 +43,28 @@ public class DataCountsReporter extends AbstractComponent { * The max percentage of date parse errors allowed before * an exception is thrown. */ + @Deprecated public static final Setting ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING = Setting.intSetting("max.percent.date.errors", 25, - Property.NodeScope); - + Property.NodeScope, Property.Deprecated); + public static final Setting MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING = Setting.intSetting( + "xpack.ml.max_percent_date_errors", + ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING, + 0, + Property.Dynamic, + Property.NodeScope); /** * The max percentage of out of order records allowed before * an exception is thrown. */ + @Deprecated public static final Setting ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING = Setting - .intSetting("max.percent.outoforder.errors", 25, Property.NodeScope); + .intSetting("max.percent.outoforder.errors", 25, Property.NodeScope, Property.Deprecated); + public static final Setting MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING = Setting.intSetting( + "xpack.ml.max_percent_out_of_order_errors", + ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING, + 0, + Property.Dynamic, + Property.NodeScope); private static final TimeValue PERSIST_INTERVAL = TimeValue.timeValueMillis(10_000L); @@ -66,14 +80,15 @@ public class DataCountsReporter extends AbstractComponent { private long logEvery = 1; private long logCount = 0; - private final int acceptablePercentDateParseErrors; - private final int acceptablePercentOutOfOrderErrors; + private volatile int acceptablePercentDateParseErrors; + private volatile int acceptablePercentOutOfOrderErrors; private Function reportingBoundaryFunction; private DataStreamDiagnostics diagnostics; - public DataCountsReporter(Settings settings, Job job, DataCounts counts, JobDataCountsPersister dataCountsPersister) { + public DataCountsReporter(Settings settings, Job job, DataCounts counts, JobDataCountsPersister dataCountsPersister, + ClusterService clusterService) { super(settings); @@ -84,9 +99,12 @@ public class DataCountsReporter extends AbstractComponent { incrementalRecordStats = new DataCounts(job.getId()); diagnostics = new DataStreamDiagnostics(job, counts); - acceptablePercentDateParseErrors = ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING.get(settings); - acceptablePercentOutOfOrderErrors = ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING.get(settings); - + acceptablePercentDateParseErrors = MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING.get(settings); + acceptablePercentOutOfOrderErrors = MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING.get(settings); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING, this::setAcceptablePercentDateParseErrors); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING, this::setAcceptablePercentOutOfOrderErrors); reportingBoundaryFunction = this::reportEvery10000Records; } @@ -352,4 +370,17 @@ public class DataCountsReporter extends AbstractComponent { diagnostics.resetCounts(); } + + private void setAcceptablePercentDateParseErrors(int acceptablePercentDateParseErrors) { + logger.info("Changing [{}] from [{}] to [{}]", MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING.getKey(), + this.acceptablePercentDateParseErrors, acceptablePercentDateParseErrors); + this.acceptablePercentDateParseErrors = acceptablePercentDateParseErrors; + } + + private void setAcceptablePercentOutOfOrderErrors(int acceptablePercentOutOfOrderErrors) { + logger.info("Changing [{}] from [{}] to [{}]", MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING.getKey(), + this.acceptablePercentOutOfOrderErrors, acceptablePercentOutOfOrderErrors); + this.acceptablePercentOutOfOrderErrors = acceptablePercentOutOfOrderErrors; + } + } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java index 0094eba97ce..4942200606d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java @@ -83,8 +83,16 @@ public class AutodetectBuilder { /** * The maximum number of anomaly records that will be written each bucket */ + @Deprecated public static final Setting MAX_ANOMALY_RECORDS_SETTING = Setting.intSetting("max.anomaly.records", DEFAULT_MAX_NUM_RECORDS, - Setting.Property.NodeScope); + Setting.Property.NodeScope, Setting.Property.Deprecated); + // Though this setting is dynamic, it is only set when a new job is opened. So, already runnin jobs will not get the updated value. + public static final Setting MAX_ANOMALY_RECORDS_SETTING_DYNAMIC = Setting.intSetting( + "xpack.ml.max_anomaly_records", + MAX_ANOMALY_RECORDS_SETTING, + 1, + Setting.Property.NodeScope, + Setting.Property.Dynamic); /** * Config setting storing the flag that disables model persistence @@ -244,9 +252,8 @@ public class AutodetectBuilder { return command; } - static String maxAnomalyRecordsArg(Settings settings) { - return "--maxAnomalyRecords=" + MAX_ANOMALY_RECORDS_SETTING.get(settings); + return "--maxAnomalyRecords=" + MAX_ANOMALY_RECORDS_SETTING_DYNAMIC.get(settings); } private static String getTimeFieldOrDefault(Job job) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index fa05c2e63ee..7e6d923bb51 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -9,6 +9,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; @@ -130,12 +131,13 @@ public class AutodetectProcessManager extends AbstractComponent { private final NamedXContentRegistry xContentRegistry; private final Auditor auditor; + private final ClusterService clusterService; public AutodetectProcessManager(Environment environment, Settings settings, Client client, ThreadPool threadPool, JobManager jobManager, JobResultsProvider jobResultsProvider, JobResultsPersister jobResultsPersister, JobDataCountsPersister jobDataCountsPersister, AutodetectProcessFactory autodetectProcessFactory, NormalizerFactory normalizerFactory, - NamedXContentRegistry xContentRegistry, Auditor auditor) { + NamedXContentRegistry xContentRegistry, Auditor auditor, ClusterService clusterService) { super(settings); this.environment = environment; this.client = client; @@ -150,6 +152,7 @@ public class AutodetectProcessManager extends AbstractComponent { this.jobDataCountsPersister = jobDataCountsPersister; this.auditor = auditor; this.nativeStorageProvider = new NativeStorageProvider(environment, MIN_DISK_SPACE_OFF_HEAP.get(settings)); + this.clusterService = clusterService; } public void onNodeStartup() { @@ -493,8 +496,11 @@ public class AutodetectProcessManager extends AbstractComponent { Job job = jobManager.getJobOrThrowIfUnknown(jobId); // A TP with no queue, so that we fail immediately if there are no threads available ExecutorService autoDetectExecutorService = threadPool.executor(MachineLearning.AUTODETECT_THREAD_POOL_NAME); - DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, autodetectParams.dataCounts(), - jobDataCountsPersister); + DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, + job, + autodetectParams.dataCounts(), + jobDataCountsPersister, + clusterService); ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobResultsProvider, new JobRenormalizedResultsPersister(job.getId(), settings, client), normalizerFactory); ExecutorService renormalizerExecutorService = threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java index 01ad0bec85a..06055476f76 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.job.process.autodetect; import org.apache.logging.log4j.Logger; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; @@ -40,12 +41,15 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory private final Environment env; private final Settings settings; private final NativeController nativeController; + private final ClusterService clusterService; - public NativeAutodetectProcessFactory(Environment env, Settings settings, NativeController nativeController, Client client) { + public NativeAutodetectProcessFactory(Environment env, Settings settings, NativeController nativeController, Client client, + ClusterService clusterService) { this.env = Objects.requireNonNull(env); this.settings = Objects.requireNonNull(settings); this.nativeController = Objects.requireNonNull(nativeController); this.client = client; + this.clusterService = clusterService; } @Override @@ -85,8 +89,15 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory private void createNativeProcess(Job job, AutodetectParams autodetectParams, ProcessPipes processPipes, List filesToDelete) { try { + + Settings updatedSettings = Settings.builder() + .put(settings) + .put(AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC.getKey(), + clusterService.getClusterSettings().get(AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC)) + .build(); + AutodetectBuilder autodetectBuilder = new AutodetectBuilder(job, filesToDelete, LOGGER, env, - settings, nativeController, processPipes) + updatedSettings, nativeController, processPipes) .referencedFilters(autodetectParams.filters()) .scheduledEvents(autodetectParams.scheduledEvents()); @@ -95,7 +106,6 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory if (autodetectParams.quantiles() != null) { autodetectBuilder.quantiles(autodetectParams.quantiles()); } - autodetectBuilder.build(); processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT); } catch (IOException e) { @@ -104,5 +114,6 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory throw ExceptionsHelper.serverError(msg, e); } } + } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/CountingInputStreamTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/CountingInputStreamTests.java index 45a5e57af5f..b867a6bbe1e 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/CountingInputStreamTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/CountingInputStreamTests.java @@ -5,18 +5,44 @@ */ package org.elasticsearch.xpack.ml.job.process; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESTestCase; +import org.junit.Before; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.util.HashSet; +import java.util.Set; + +import static org.elasticsearch.mock.orig.Mockito.when; +import static org.mockito.Mockito.mock; public class CountingInputStreamTests extends ESTestCase { + private ClusterService clusterService; + + @Before + public void setUpMocks() { + Settings settings = Settings.builder().put(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING.getKey(), 10) + .put(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING.getKey(), 10) + .build(); + Set> setOfSettings = new HashSet<>(); + setOfSettings.add(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING); + setOfSettings.add(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING); + ClusterSettings clusterSettings = new ClusterSettings(settings, setOfSettings); + + clusterService = mock(ClusterService.class); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + } + public void testRead_OneByteAtATime() throws IOException { - DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); + DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService); final String TEXT = "123"; InputStream source = new ByteArrayInputStream(TEXT.getBytes(StandardCharsets.UTF_8)); @@ -30,7 +56,7 @@ public class CountingInputStreamTests extends ESTestCase { public void testRead_WithBuffer() throws IOException { final String TEXT = "To the man who only has a hammer, everything he encounters begins to look like a nail."; - DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); + DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService); InputStream source = new ByteArrayInputStream(TEXT.getBytes(StandardCharsets.UTF_8)); @@ -44,7 +70,7 @@ public class CountingInputStreamTests extends ESTestCase { public void testRead_WithTinyBuffer() throws IOException { final String TEXT = "To the man who only has a hammer, everything he encounters begins to look like a nail."; - DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); + DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService); InputStream source = new ByteArrayInputStream(TEXT.getBytes(StandardCharsets.UTF_8)); @@ -57,7 +83,7 @@ public class CountingInputStreamTests extends ESTestCase { public void testRead_WithResets() throws IOException { - DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); + DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService); final String TEXT = "To the man who only has a hammer, everything he encounters begins to look like a nail."; InputStream source = new ByteArrayInputStream(TEXT.getBytes(StandardCharsets.UTF_8)); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java index d3afb732418..5f56db164a3 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java @@ -6,6 +6,9 @@ package org.elasticsearch.xpack.ml.job.process; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.env.Environment; @@ -22,11 +25,15 @@ import org.mockito.Mockito; import java.io.IOException; import java.util.Arrays; import java.util.Date; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.TimeUnit; +import static org.elasticsearch.mock.orig.Mockito.when; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -38,12 +45,13 @@ public class DataCountsReporterTests extends ESTestCase { private JobDataCountsPersister jobDataCountsPersister; private Settings settings; private TimeValue bucketSpan = TimeValue.timeValueSeconds(300); + private ClusterService clusterService; @Before public void setUpMocks() { settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()) - .put(DataCountsReporter.ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING.getKey(), MAX_PERCENT_DATE_PARSE_ERRORS) - .put(DataCountsReporter.ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING.getKey(), MAX_PERCENT_OUT_OF_ORDER_ERRORS) + .put(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING.getKey(), MAX_PERCENT_DATE_PARSE_ERRORS) + .put(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING.getKey(), MAX_PERCENT_OUT_OF_ORDER_ERRORS) .build(); AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "field").build())); @@ -51,6 +59,15 @@ public class DataCountsReporterTests extends ESTestCase { acBuilder.setLatency(TimeValue.ZERO); acBuilder.setDetectors(Arrays.asList(new Detector.Builder("metric", "field").build())); + + Set> setOfSettings = new HashSet<>(); + setOfSettings.add(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING); + setOfSettings.add(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING); + ClusterSettings clusterSettings = new ClusterSettings(settings, setOfSettings); + + clusterService = mock(ClusterService.class); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + Job.Builder builder = new Job.Builder("sr"); builder.setAnalysisConfig(acBuilder); builder.setDataDescription(new DataDescription.Builder()); @@ -61,14 +78,14 @@ public class DataCountsReporterTests extends ESTestCase { public void testSettingAcceptablePercentages() throws IOException { DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, new DataCounts(job.getId()), - jobDataCountsPersister); + jobDataCountsPersister, clusterService); assertEquals(dataCountsReporter.getAcceptablePercentDateParseErrors(), MAX_PERCENT_DATE_PARSE_ERRORS); assertEquals(dataCountsReporter.getAcceptablePercentOutOfOrderErrors(), MAX_PERCENT_OUT_OF_ORDER_ERRORS); } public void testSimpleConstructor() throws Exception { DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, new DataCounts(job.getId()), - jobDataCountsPersister); + jobDataCountsPersister, clusterService); DataCounts stats = dataCountsReporter.incrementalStats(); assertNotNull(stats); assertAllCountFieldsEqualZero(stats); @@ -79,7 +96,7 @@ public class DataCountsReporterTests extends ESTestCase { new Date(), new Date(), new Date(), new Date(), new Date()); DataCountsReporter dataCountsReporter = - new DataCountsReporter(settings, job, counts, jobDataCountsPersister); + new DataCountsReporter(settings, job, counts, jobDataCountsPersister, clusterService); DataCounts stats = dataCountsReporter.incrementalStats(); assertNotNull(stats); assertAllCountFieldsEqualZero(stats); @@ -97,7 +114,7 @@ public class DataCountsReporterTests extends ESTestCase { public void testResetIncrementalCounts() throws Exception { DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, new DataCounts(job.getId()), - jobDataCountsPersister); + jobDataCountsPersister, clusterService); DataCounts stats = dataCountsReporter.incrementalStats(); assertNotNull(stats); assertAllCountFieldsEqualZero(stats); @@ -150,7 +167,7 @@ public class DataCountsReporterTests extends ESTestCase { public void testReportLatestTimeIncrementalStats() throws IOException { DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, new DataCounts(job.getId()), - jobDataCountsPersister); + jobDataCountsPersister, clusterService); dataCountsReporter.startNewIncrementalCount(); dataCountsReporter.reportLatestTimeIncrementalStats(5001L); assertEquals(5001L, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime()); @@ -158,7 +175,7 @@ public class DataCountsReporterTests extends ESTestCase { public void testReportRecordsWritten() { DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, new DataCounts(job.getId()), - jobDataCountsPersister); + jobDataCountsPersister, clusterService); dataCountsReporter.setAnalysedFieldsPerRecord(3); dataCountsReporter.reportRecordWritten(5, 2000); @@ -182,7 +199,7 @@ public class DataCountsReporterTests extends ESTestCase { } public void testReportRecordsWritten_Given9999Records() { - DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); + DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService); dataCountsReporter.setAnalysedFieldsPerRecord(3); for (int i = 1; i <= 9999; i++) { @@ -199,7 +216,7 @@ public class DataCountsReporterTests extends ESTestCase { } public void testReportRecordsWritten_Given30000Records() { - DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); + DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService); dataCountsReporter.setAnalysedFieldsPerRecord(3); for (int i = 1; i <= 30001; i++) { @@ -216,7 +233,7 @@ public class DataCountsReporterTests extends ESTestCase { } public void testReportRecordsWritten_Given100_000Records() { - DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); + DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService); dataCountsReporter.setAnalysedFieldsPerRecord(3); for (int i = 1; i <= 100000; i++) { @@ -233,7 +250,7 @@ public class DataCountsReporterTests extends ESTestCase { } public void testReportRecordsWritten_Given1_000_000Records() { - DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); + DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService); dataCountsReporter.setAnalysedFieldsPerRecord(3); for (int i = 1; i <= 1_000_000; i++) { @@ -250,7 +267,7 @@ public class DataCountsReporterTests extends ESTestCase { } public void testReportRecordsWritten_Given2_000_000Records() { - DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); + DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService); dataCountsReporter.setAnalysedFieldsPerRecord(3); for (int i = 1; i <= 2_000_000; i++) { @@ -269,7 +286,7 @@ public class DataCountsReporterTests extends ESTestCase { public void testFinishReporting() { DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, new DataCounts(job.getId()), - jobDataCountsPersister); + jobDataCountsPersister, clusterService); dataCountsReporter.setAnalysedFieldsPerRecord(3); Date now = new Date(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/DummyDataCountsReporter.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/DummyDataCountsReporter.java index bcf41a994b9..6b4c68e1f30 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/DummyDataCountsReporter.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/DummyDataCountsReporter.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.job.process; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; @@ -26,9 +27,9 @@ class DummyDataCountsReporter extends DataCountsReporter { int logStatusCallCount = 0; - DummyDataCountsReporter() { + DummyDataCountsReporter(ClusterService clusterService) { super(Settings.EMPTY, createJob(), new DataCounts("DummyJobId"), - mock(JobDataCountsPersister.class)); + mock(JobDataCountsPersister.class), clusterService); } /** diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index 43cc909e392..dd67de41996 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -8,7 +8,10 @@ package org.elasticsearch.xpack.ml.job.process.autodetect; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; @@ -41,6 +44,7 @@ import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzerTests import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; +import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; @@ -106,6 +110,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { private JobDataCountsPersister jobDataCountsPersister; private NormalizerFactory normalizerFactory; private Auditor auditor; + private ClusterService clusterService; private DataCounts dataCounts = new DataCounts("foo"); private ModelSizeStats modelSizeStats = new ModelSizeStats.Builder("foo").build(); @@ -126,6 +131,15 @@ public class AutodetectProcessManagerTests extends ESTestCase { normalizerFactory = mock(NormalizerFactory.class); auditor = mock(Auditor.class); + + Set> setOfSettings = new HashSet<>(); + setOfSettings.add(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING); + setOfSettings.add(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING); + ClusterSettings clusterSettings = new ClusterSettings(settings, setOfSettings); + + clusterService = mock(ClusterService.class); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(createJobDetails("foo")); doAnswer(invocationOnMock -> { @SuppressWarnings("unchecked") @@ -228,7 +242,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { settings.put(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), 3); AutodetectProcessManager manager = spy(new AutodetectProcessManager(environment, settings.build(), client, threadPool, jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, - normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor)); + normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor, clusterService)); doReturn(executorService).when(manager).createAutodetectExecutorService(any()); doAnswer(invocationOnMock -> { @@ -583,7 +597,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { (j, autodetectParams, e, onProcessCrash) -> autodetectProcess; AutodetectProcessManager manager = new AutodetectProcessManager(environment, Settings.EMPTY, client, threadPool, jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, - normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor); + normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor, clusterService); JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("my_id"); @@ -656,7 +670,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { (j, autodetectParams, e, onProcessCrash) -> autodetectProcess; return new AutodetectProcessManager(environment, Settings.EMPTY, client, threadPool, jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, - normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor); + normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor, clusterService); } private AutodetectParams buildAutodetectParams() { @@ -682,7 +696,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { AutodetectProcessManager manager = new AutodetectProcessManager(environment, Settings.EMPTY, client, threadPool, jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory, - new NamedXContentRegistry(Collections.emptyList()), auditor); + new NamedXContentRegistry(Collections.emptyList()), auditor, clusterService); manager = spy(manager); doReturn(communicator).when(manager).create(any(), eq(buildAutodetectParams()), any()); return manager;