Make certain ML node settings dynamic (#33565) (#33961)

* Make certain ML node settings dynamic (#33565)

* Changing to pull in updated settings and pass them to the constructor

* Adding a note that only newly opened jobs get the updated value
Benjamin Trent authored 2018-09-24 12:54:32 -07:00 (committed by GitHub)
parent 2795ef561f
commit 74d7be805a
9 changed files with 165 additions and 44 deletions

View File: MachineLearning.java

@@ -292,8 +292,11 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
                 MAX_MACHINE_MEMORY_PERCENT,
                 AutodetectBuilder.DONT_PERSIST_MODEL_STATE_SETTING,
                 AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING,
+                AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC,
                 DataCountsReporter.ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING,
+                DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING,
                 DataCountsReporter.ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING,
+                DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING,
                 AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE,
                 AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE,
                 AutodetectProcessManager.MIN_DISK_SPACE_OFF_HEAP));
@@ -379,7 +382,12 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
                 // This will only only happen when path.home is not set, which is disallowed in production
                 throw new ElasticsearchException("Failed to create native process controller for Machine Learning");
             }
-            autodetectProcessFactory = new NativeAutodetectProcessFactory(environment, settings, nativeController, client);
+            autodetectProcessFactory = new NativeAutodetectProcessFactory(
+                environment,
+                settings,
+                nativeController,
+                client,
+                clusterService);
             normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, settings, nativeController);
         } catch (IOException e) {
             // This also should not happen in production, as the MachineLearningFeatureSet should have
@@ -397,7 +405,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
                 threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME));
         AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(env, settings, client, threadPool,
                 jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
-                normalizerFactory, xContentRegistry, auditor);
+                normalizerFactory, xContentRegistry, auditor, clusterService);
         this.autodetectProcessManager.set(autodetectProcessManager);
         DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, jobResultsProvider, auditor, System::currentTimeMillis);
         DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder,
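
The hunk above registers the three new Property.Dynamic settings with the plugin so the cluster settings infrastructure accepts them. For illustration only (not part of this commit), once a node runs with this change an operator can adjust the limits at runtime through the cluster update settings API; a minimal sketch using the transport-level Client available inside the plugin, with arbitrary example values:

// Example sketch, not from the commit: update the new dynamic ML settings at runtime.
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;

public class UpdateMlDynamicSettingsExample {
    public static void raiseLimits(Client client) {
        // Equivalent to PUT _cluster/settings with a "transient" section; the example
        // values (1000 records, 30 percent) are arbitrary.
        client.admin().cluster().prepareUpdateSettings()
                .setTransientSettings(Settings.builder()
                        .put("xpack.ml.max_anomaly_records", 1000)
                        .put("xpack.ml.max_percent_date_errors", 30)
                        .put("xpack.ml.max_percent_out_of_order_errors", 30)
                        .build())
                .get();
    }
}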

View File: DataCountsReporter.java

@@ -6,6 +6,7 @@
 package org.elasticsearch.xpack.ml.job.process;

 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
@@ -42,15 +43,28 @@ public class DataCountsReporter extends AbstractComponent {
      * The max percentage of date parse errors allowed before
      * an exception is thrown.
      */
+    @Deprecated
     public static final Setting<Integer> ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING = Setting.intSetting("max.percent.date.errors", 25,
-            Property.NodeScope);
+            Property.NodeScope, Property.Deprecated);
+    public static final Setting<Integer> MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING = Setting.intSetting(
+        "xpack.ml.max_percent_date_errors",
+        ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING,
+        0,
+        Property.Dynamic,
+        Property.NodeScope);

     /**
      * The max percentage of out of order records allowed before
      * an exception is thrown.
      */
+    @Deprecated
     public static final Setting<Integer> ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING = Setting
-            .intSetting("max.percent.outoforder.errors", 25, Property.NodeScope);
+            .intSetting("max.percent.outoforder.errors", 25, Property.NodeScope, Property.Deprecated);
+    public static final Setting<Integer> MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING = Setting.intSetting(
+        "xpack.ml.max_percent_out_of_order_errors",
+        ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING,
+        0,
+        Property.Dynamic,
+        Property.NodeScope);

     private static final TimeValue PERSIST_INTERVAL = TimeValue.timeValueMillis(10_000L);
@@ -66,14 +80,15 @@ public class DataCountsReporter extends AbstractComponent {
     private long logEvery = 1;
     private long logCount = 0;

-    private final int acceptablePercentDateParseErrors;
-    private final int acceptablePercentOutOfOrderErrors;
+    private volatile int acceptablePercentDateParseErrors;
+    private volatile int acceptablePercentOutOfOrderErrors;

     private Function<Long, Boolean> reportingBoundaryFunction;

     private DataStreamDiagnostics diagnostics;

-    public DataCountsReporter(Settings settings, Job job, DataCounts counts, JobDataCountsPersister dataCountsPersister) {
+    public DataCountsReporter(Settings settings, Job job, DataCounts counts, JobDataCountsPersister dataCountsPersister,
+                              ClusterService clusterService) {
         super(settings);
@@ -84,9 +99,12 @@ public class DataCountsReporter extends AbstractComponent {
         incrementalRecordStats = new DataCounts(job.getId());
         diagnostics = new DataStreamDiagnostics(job, counts);

-        acceptablePercentDateParseErrors = ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING.get(settings);
-        acceptablePercentOutOfOrderErrors = ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING.get(settings);
+        acceptablePercentDateParseErrors = MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING.get(settings);
+        acceptablePercentOutOfOrderErrors = MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING.get(settings);
+        clusterService.getClusterSettings()
+            .addSettingsUpdateConsumer(MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING, this::setAcceptablePercentDateParseErrors);
+        clusterService.getClusterSettings()
+            .addSettingsUpdateConsumer(MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING, this::setAcceptablePercentOutOfOrderErrors);

         reportingBoundaryFunction = this::reportEvery10000Records;
     }
@@ -352,4 +370,17 @@ public class DataCountsReporter extends AbstractComponent {
         diagnostics.resetCounts();
     }

+    private void setAcceptablePercentDateParseErrors(int acceptablePercentDateParseErrors) {
+        logger.info("Changing [{}] from [{}] to [{}]", MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING.getKey(),
+            this.acceptablePercentDateParseErrors, acceptablePercentDateParseErrors);
+        this.acceptablePercentDateParseErrors = acceptablePercentDateParseErrors;
+    }
+
+    private void setAcceptablePercentOutOfOrderErrors(int acceptablePercentOutOfOrderErrors) {
+        logger.info("Changing [{}] from [{}] to [{}]", MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING.getKey(),
+            this.acceptablePercentOutOfOrderErrors, acceptablePercentOutOfOrderErrors);
+        this.acceptablePercentOutOfOrderErrors = acceptablePercentOutOfOrderErrors;
+    }
 }
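
The DataCountsReporter hunks combine three pieces: a new dynamic setting that falls back to the deprecated node-scope key, volatile fields so a concurrent update is visible to the reporting thread, and an update consumer registered on ClusterSettings so the value tracks cluster-level changes. A condensed sketch of that pattern, using a hypothetical holder class rather than the real reporter:

// Condensed, hypothetical sketch of the pattern above (ErrorThresholdHolder is not a real class).
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;

class ErrorThresholdHolder {
    @Deprecated
    static final Setting<Integer> OLD_SETTING = Setting.intSetting("max.percent.date.errors", 25,
            Property.NodeScope, Property.Deprecated);
    // New dynamic key; falls back to the deprecated setting when unset, minimum value 0.
    static final Setting<Integer> NEW_SETTING = Setting.intSetting("xpack.ml.max_percent_date_errors",
            OLD_SETTING, 0, Property.Dynamic, Property.NodeScope);

    private volatile int threshold;   // volatile: written from the cluster settings applier, read elsewhere

    ErrorThresholdHolder(Settings nodeSettings, ClusterService clusterService) {
        this.threshold = NEW_SETTING.get(nodeSettings);   // initial value, or the deprecated fallback
        clusterService.getClusterSettings()
                .addSettingsUpdateConsumer(NEW_SETTING, value -> this.threshold = value);
    }

    int threshold() {
        return threshold;
    }
}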

View File: AutodetectBuilder.java

@@ -83,8 +83,16 @@ public class AutodetectBuilder {
     /**
      * The maximum number of anomaly records that will be written each bucket
      */
+    @Deprecated
     public static final Setting<Integer> MAX_ANOMALY_RECORDS_SETTING = Setting.intSetting("max.anomaly.records", DEFAULT_MAX_NUM_RECORDS,
-            Setting.Property.NodeScope);
+            Setting.Property.NodeScope, Setting.Property.Deprecated);
+    // Though this setting is dynamic, it is only set when a new job is opened. So, already running jobs will not get the updated value.
+    public static final Setting<Integer> MAX_ANOMALY_RECORDS_SETTING_DYNAMIC = Setting.intSetting(
+        "xpack.ml.max_anomaly_records",
+        MAX_ANOMALY_RECORDS_SETTING,
+        1,
+        Setting.Property.NodeScope,
+        Setting.Property.Dynamic);

     /**
      * Config setting storing the flag that disables model persistence
@@ -244,9 +252,8 @@ public class AutodetectBuilder {
         return command;
     }

     static String maxAnomalyRecordsArg(Settings settings) {
-        return "--maxAnomalyRecords=" + MAX_ANOMALY_RECORDS_SETTING.get(settings);
+        return "--maxAnomalyRecords=" + MAX_ANOMALY_RECORDS_SETTING_DYNAMIC.get(settings);
     }

     private static String getTimeFieldOrDefault(Job job) {
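
Worth noting from the hunk above: because MAX_ANOMALY_RECORDS_SETTING_DYNAMIC is declared with the deprecated setting as its fallback, an existing max.anomaly.records value keeps working until the new xpack.ml.max_anomaly_records key is set, at which point the new key wins. A small illustration with local copies of the two settings (the real default, DEFAULT_MAX_NUM_RECORDS, is replaced by a placeholder here):

// Illustration only; OLD and NEW mirror the settings in the hunk above, with a placeholder default of 500.
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;

class MaxAnomalyRecordsFallbackExample {
    @Deprecated
    static final Setting<Integer> OLD = Setting.intSetting("max.anomaly.records", 500,
            Setting.Property.NodeScope, Setting.Property.Deprecated);
    static final Setting<Integer> NEW = Setting.intSetting("xpack.ml.max_anomaly_records", OLD, 1,
            Setting.Property.NodeScope, Setting.Property.Dynamic);

    static void demo() {
        Settings onlyOldKey = Settings.builder().put("max.anomaly.records", 750).build();
        int viaFallback = NEW.get(onlyOldKey);   // 750: the deprecated key is used as the fallback

        Settings bothKeys = Settings.builder()
                .put("max.anomaly.records", 750)
                .put("xpack.ml.max_anomaly_records", 2000)
                .build();
        int viaNewKey = NEW.get(bothKeys);       // 2000: the new key takes precedence when present
    }
}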

View File: AutodetectProcessManager.java

@@ -9,6 +9,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.CheckedConsumer;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.component.AbstractComponent;
@@ -130,12 +131,13 @@ public class AutodetectProcessManager extends AbstractComponent {
     private final NamedXContentRegistry xContentRegistry;

     private final Auditor auditor;
+    private final ClusterService clusterService;

     public AutodetectProcessManager(Environment environment, Settings settings, Client client, ThreadPool threadPool,
                                     JobManager jobManager, JobResultsProvider jobResultsProvider, JobResultsPersister jobResultsPersister,
                                     JobDataCountsPersister jobDataCountsPersister,
                                     AutodetectProcessFactory autodetectProcessFactory, NormalizerFactory normalizerFactory,
-                                    NamedXContentRegistry xContentRegistry, Auditor auditor) {
+                                    NamedXContentRegistry xContentRegistry, Auditor auditor, ClusterService clusterService) {
         super(settings);
         this.environment = environment;
         this.client = client;
@@ -150,6 +152,7 @@ public class AutodetectProcessManager extends AbstractComponent {
         this.jobDataCountsPersister = jobDataCountsPersister;
         this.auditor = auditor;
         this.nativeStorageProvider = new NativeStorageProvider(environment, MIN_DISK_SPACE_OFF_HEAP.get(settings));
+        this.clusterService = clusterService;
     }

     public void onNodeStartup() {
@@ -493,8 +496,11 @@ public class AutodetectProcessManager extends AbstractComponent {
         Job job = jobManager.getJobOrThrowIfUnknown(jobId);
         // A TP with no queue, so that we fail immediately if there are no threads available
         ExecutorService autoDetectExecutorService = threadPool.executor(MachineLearning.AUTODETECT_THREAD_POOL_NAME);
-        DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, autodetectParams.dataCounts(),
-                jobDataCountsPersister);
+        DataCountsReporter dataCountsReporter = new DataCountsReporter(settings,
+            job,
+            autodetectParams.dataCounts(),
+            jobDataCountsPersister,
+            clusterService);
         ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobResultsProvider,
                 new JobRenormalizedResultsPersister(job.getId(), settings, client), normalizerFactory);
         ExecutorService renormalizerExecutorService = threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME);

View File: NativeAutodetectProcessFactory.java

@@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.job.process.autodetect;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
@@ -40,12 +41,15 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
     private final Environment env;
     private final Settings settings;
     private final NativeController nativeController;
+    private final ClusterService clusterService;

-    public NativeAutodetectProcessFactory(Environment env, Settings settings, NativeController nativeController, Client client) {
+    public NativeAutodetectProcessFactory(Environment env, Settings settings, NativeController nativeController, Client client,
+                                          ClusterService clusterService) {
         this.env = Objects.requireNonNull(env);
         this.settings = Objects.requireNonNull(settings);
         this.nativeController = Objects.requireNonNull(nativeController);
         this.client = client;
+        this.clusterService = clusterService;
     }

     @Override
@@ -85,8 +89,15 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
     private void createNativeProcess(Job job, AutodetectParams autodetectParams, ProcessPipes processPipes,
                                      List<Path> filesToDelete) {
         try {
+            Settings updatedSettings = Settings.builder()
+                .put(settings)
+                .put(AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC.getKey(),
+                    clusterService.getClusterSettings().get(AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC))
+                .build();
             AutodetectBuilder autodetectBuilder = new AutodetectBuilder(job, filesToDelete, LOGGER, env,
-                    settings, nativeController, processPipes)
+                    updatedSettings, nativeController, processPipes)
                     .referencedFilters(autodetectParams.filters())
                     .scheduledEvents(autodetectParams.scheduledEvents());
@@ -95,7 +106,6 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
             if (autodetectParams.quantiles() != null) {
                 autodetectBuilder.quantiles(autodetectParams.quantiles());
             }
             autodetectBuilder.build();
             processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT);
         } catch (IOException e) {
@@ -104,5 +114,6 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
             throw ExceptionsHelper.serverError(msg, e);
         }
     }
 }
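
The factory change above is what the commit message's note refers to: the current value of the dynamic setting is read from ClusterSettings once, at process creation time, and baked into the Settings handed to AutodetectBuilder, so a later update only affects jobs opened after it. A generic sketch of that snapshot step (hypothetical helper, not code from the commit):

// Hypothetical helper mirroring the snapshot step above.
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;

class ProcessSettingsSnapshot {
    // Copy the node settings and overwrite the dynamic key with its current cluster-wide
    // value; the resulting Settings object is immutable, so the process keeps this value
    // for its whole lifetime even if the cluster setting changes later.
    static Settings withCurrentValue(Settings nodeSettings, ClusterService clusterService,
                                     Setting<Integer> dynamicSetting) {
        return Settings.builder()
                .put(nodeSettings)
                .put(dynamicSetting.getKey(), clusterService.getClusterSettings().get(dynamicSetting))
                .build();
    }
}

AutodetectBuilder.maxAnomalyRecordsArg then turns that snapshotted value into the --maxAnomalyRecords process argument, as shown in the AutodetectBuilder hunk earlier.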

View File: CountingInputStreamTests.java

@@ -5,18 +5,44 @@
  */
 package org.elasticsearch.xpack.ml.job.process;

+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.test.ESTestCase;
+import org.junit.Before;

 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.elasticsearch.mock.orig.Mockito.when;
+import static org.mockito.Mockito.mock;

 public class CountingInputStreamTests extends ESTestCase {

+    private ClusterService clusterService;
+
+    @Before
+    public void setUpMocks() {
+        Settings settings = Settings.builder().put(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING.getKey(), 10)
+            .put(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING.getKey(), 10)
+            .build();
+        Set<Setting<?>> setOfSettings = new HashSet<>();
+        setOfSettings.add(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING);
+        setOfSettings.add(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING);
+        ClusterSettings clusterSettings = new ClusterSettings(settings, setOfSettings);
+        clusterService = mock(ClusterService.class);
+        when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
+    }
+
     public void testRead_OneByteAtATime() throws IOException {
-        DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
+        DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService);

         final String TEXT = "123";
         InputStream source = new ByteArrayInputStream(TEXT.getBytes(StandardCharsets.UTF_8));
@@ -30,7 +56,7 @@ public class CountingInputStreamTests extends ESTestCase {
     public void testRead_WithBuffer() throws IOException {
         final String TEXT = "To the man who only has a hammer, everything he encounters begins to look like a nail.";

-        DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
+        DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService);
         InputStream source = new ByteArrayInputStream(TEXT.getBytes(StandardCharsets.UTF_8));
@@ -44,7 +70,7 @@ public class CountingInputStreamTests extends ESTestCase {
     public void testRead_WithTinyBuffer() throws IOException {
         final String TEXT = "To the man who only has a hammer, everything he encounters begins to look like a nail.";

-        DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
+        DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService);
         InputStream source = new ByteArrayInputStream(TEXT.getBytes(StandardCharsets.UTF_8));
@@ -57,7 +83,7 @@ public class CountingInputStreamTests extends ESTestCase {
     public void testRead_WithResets() throws IOException {
-        DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
+        DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService);
         final String TEXT = "To the man who only has a hammer, everything he encounters begins to look like a nail.";
         InputStream source = new ByteArrayInputStream(TEXT.getBytes(StandardCharsets.UTF_8));
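
The new @Before block exists because DataCountsReporter now calls clusterService.getClusterSettings().addSettingsUpdateConsumer(...) in its constructor, and registering a consumer for a setting that ClusterSettings does not know about fails. The mock therefore has to return a real ClusterSettings seeded with both settings. A condensed, hypothetical helper showing the same wiring (plain Mockito assumed):

// Hypothetical helper condensing the test wiring above.
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;

import java.util.Set;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class MockClusterServiceFactory {
    // A mocked ClusterService whose ClusterSettings genuinely knows the given settings,
    // so constructors can register their update consumers without an exception.
    static ClusterService withRegisteredSettings(Settings nodeSettings, Set<Setting<?>> registered) {
        ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, registered);
        ClusterService clusterService = mock(ClusterService.class);
        when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
        return clusterService;
    }
}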

View File: DataCountsReporterTests.java

@@ -6,6 +6,9 @@
 package org.elasticsearch.xpack.ml.job.process;

 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.env.Environment;
@@ -22,11 +25,15 @@ import org.mockito.Mockito;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Date;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;

+import static org.elasticsearch.mock.orig.Mockito.when;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -38,12 +45,13 @@ public class DataCountsReporterTests extends ESTestCase {
     private JobDataCountsPersister jobDataCountsPersister;
     private Settings settings;
     private TimeValue bucketSpan = TimeValue.timeValueSeconds(300);
+    private ClusterService clusterService;

     @Before
     public void setUpMocks() {
         settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
-                .put(DataCountsReporter.ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING.getKey(), MAX_PERCENT_DATE_PARSE_ERRORS)
-                .put(DataCountsReporter.ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING.getKey(), MAX_PERCENT_OUT_OF_ORDER_ERRORS)
+                .put(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING.getKey(), MAX_PERCENT_DATE_PARSE_ERRORS)
+                .put(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING.getKey(), MAX_PERCENT_OUT_OF_ORDER_ERRORS)
                 .build();

         AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "field").build()));
@@ -51,6 +59,15 @@ public class DataCountsReporterTests extends ESTestCase {
         acBuilder.setLatency(TimeValue.ZERO);
         acBuilder.setDetectors(Arrays.asList(new Detector.Builder("metric", "field").build()));

+        Set<Setting<?>> setOfSettings = new HashSet<>();
+        setOfSettings.add(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING);
+        setOfSettings.add(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING);
+        ClusterSettings clusterSettings = new ClusterSettings(settings, setOfSettings);
+        clusterService = mock(ClusterService.class);
+        when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
+
         Job.Builder builder = new Job.Builder("sr");
         builder.setAnalysisConfig(acBuilder);
         builder.setDataDescription(new DataDescription.Builder());
@@ -61,14 +78,14 @@ public class DataCountsReporterTests extends ESTestCase {
     public void testSettingAcceptablePercentages() throws IOException {
         DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, new DataCounts(job.getId()),
-                jobDataCountsPersister);
+                jobDataCountsPersister, clusterService);
         assertEquals(dataCountsReporter.getAcceptablePercentDateParseErrors(), MAX_PERCENT_DATE_PARSE_ERRORS);
         assertEquals(dataCountsReporter.getAcceptablePercentOutOfOrderErrors(), MAX_PERCENT_OUT_OF_ORDER_ERRORS);
     }

     public void testSimpleConstructor() throws Exception {
         DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, new DataCounts(job.getId()),
-                jobDataCountsPersister);
+                jobDataCountsPersister, clusterService);
         DataCounts stats = dataCountsReporter.incrementalStats();
         assertNotNull(stats);
         assertAllCountFieldsEqualZero(stats);
@@ -79,7 +96,7 @@ public class DataCountsReporterTests extends ESTestCase {
                 new Date(), new Date(), new Date(), new Date(), new Date());

         DataCountsReporter dataCountsReporter =
-                new DataCountsReporter(settings, job, counts, jobDataCountsPersister);
+                new DataCountsReporter(settings, job, counts, jobDataCountsPersister, clusterService);
         DataCounts stats = dataCountsReporter.incrementalStats();
         assertNotNull(stats);
         assertAllCountFieldsEqualZero(stats);
@@ -97,7 +114,7 @@ public class DataCountsReporterTests extends ESTestCase {
     public void testResetIncrementalCounts() throws Exception {
         DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, new DataCounts(job.getId()),
-                jobDataCountsPersister);
+                jobDataCountsPersister, clusterService);
         DataCounts stats = dataCountsReporter.incrementalStats();
         assertNotNull(stats);
         assertAllCountFieldsEqualZero(stats);
@@ -150,7 +167,7 @@ public class DataCountsReporterTests extends ESTestCase {
     public void testReportLatestTimeIncrementalStats() throws IOException {
         DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, new DataCounts(job.getId()),
-                jobDataCountsPersister);
+                jobDataCountsPersister, clusterService);
         dataCountsReporter.startNewIncrementalCount();
         dataCountsReporter.reportLatestTimeIncrementalStats(5001L);
         assertEquals(5001L, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
@@ -158,7 +175,7 @@ public class DataCountsReporterTests extends ESTestCase {
     public void testReportRecordsWritten() {
         DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, new DataCounts(job.getId()),
-                jobDataCountsPersister);
+                jobDataCountsPersister, clusterService);
         dataCountsReporter.setAnalysedFieldsPerRecord(3);

         dataCountsReporter.reportRecordWritten(5, 2000);
@@ -182,7 +199,7 @@ public class DataCountsReporterTests extends ESTestCase {
     }

     public void testReportRecordsWritten_Given9999Records() {
-        DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
+        DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService);
         dataCountsReporter.setAnalysedFieldsPerRecord(3);

         for (int i = 1; i <= 9999; i++) {
@@ -199,7 +216,7 @@ public class DataCountsReporterTests extends ESTestCase {
     }

     public void testReportRecordsWritten_Given30000Records() {
-        DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
+        DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService);
         dataCountsReporter.setAnalysedFieldsPerRecord(3);

         for (int i = 1; i <= 30001; i++) {
@@ -216,7 +233,7 @@ public class DataCountsReporterTests extends ESTestCase {
     }

     public void testReportRecordsWritten_Given100_000Records() {
-        DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
+        DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService);
         dataCountsReporter.setAnalysedFieldsPerRecord(3);

         for (int i = 1; i <= 100000; i++) {
@@ -233,7 +250,7 @@ public class DataCountsReporterTests extends ESTestCase {
     }

     public void testReportRecordsWritten_Given1_000_000Records() {
-        DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
+        DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService);
         dataCountsReporter.setAnalysedFieldsPerRecord(3);

         for (int i = 1; i <= 1_000_000; i++) {
@@ -250,7 +267,7 @@ public class DataCountsReporterTests extends ESTestCase {
     }

     public void testReportRecordsWritten_Given2_000_000Records() {
-        DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
+        DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService);
         dataCountsReporter.setAnalysedFieldsPerRecord(3);

         for (int i = 1; i <= 2_000_000; i++) {
@@ -269,7 +286,7 @@ public class DataCountsReporterTests extends ESTestCase {
     public void testFinishReporting() {
         DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, new DataCounts(job.getId()),
-                jobDataCountsPersister);
+                jobDataCountsPersister, clusterService);
         dataCountsReporter.setAnalysedFieldsPerRecord(3);

         Date now = new Date();
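
Not part of this commit, but a natural follow-up given the consumers registered in the constructor: a test in this class could push a new value through the mocked ClusterSettings and assert that an already constructed reporter picks it up, since ClusterSettings.applySettings invokes the registered update consumers. A sketch reusing the fixtures set up above (settings, job, jobDataCountsPersister, clusterService, MAX_PERCENT_DATE_PARSE_ERRORS):

    public void testAcceptablePercentDateParseErrorsIsDynamic() throws IOException {
        DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, new DataCounts(job.getId()),
                jobDataCountsPersister, clusterService);
        assertEquals(MAX_PERCENT_DATE_PARSE_ERRORS, dataCountsReporter.getAcceptablePercentDateParseErrors());

        // Simulate a cluster-level settings update; the consumer registered by the
        // constructor updates the volatile field on the live reporter.
        clusterService.getClusterSettings().applySettings(Settings.builder()
                .put(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING.getKey(), 5)
                .build());
        assertEquals(5, dataCountsReporter.getAcceptablePercentDateParseErrors());
    }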

View File: DummyDataCountsReporter.java

@@ -5,6 +5,7 @@
  */
 package org.elasticsearch.xpack.ml.job.process;

+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
@@ -26,9 +27,9 @@ class DummyDataCountsReporter extends DataCountsReporter {

     int logStatusCallCount = 0;

-    DummyDataCountsReporter() {
+    DummyDataCountsReporter(ClusterService clusterService) {
         super(Settings.EMPTY, createJob(), new DataCounts("DummyJobId"),
-                mock(JobDataCountsPersister.class));
+                mock(JobDataCountsPersister.class), clusterService);
     }

     /**

View File: AutodetectProcessManagerTests.java

@@ -8,7 +8,10 @@ package org.elasticsearch.xpack.ml.job.process.autodetect;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.CheckedConsumer;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
@@ -41,6 +44,7 @@ import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzerTests
 import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
 import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
+import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
@@ -106,6 +110,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
     private JobDataCountsPersister jobDataCountsPersister;
     private NormalizerFactory normalizerFactory;
     private Auditor auditor;
+    private ClusterService clusterService;

     private DataCounts dataCounts = new DataCounts("foo");
     private ModelSizeStats modelSizeStats = new ModelSizeStats.Builder("foo").build();
@@ -126,6 +131,15 @@ public class AutodetectProcessManagerTests extends ESTestCase {
         normalizerFactory = mock(NormalizerFactory.class);
         auditor = mock(Auditor.class);

+        Set<Setting<?>> setOfSettings = new HashSet<>();
+        setOfSettings.add(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING);
+        setOfSettings.add(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING);
+        ClusterSettings clusterSettings = new ClusterSettings(settings, setOfSettings);
+        clusterService = mock(ClusterService.class);
+        when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
+
         when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(createJobDetails("foo"));
         doAnswer(invocationOnMock -> {
             @SuppressWarnings("unchecked")
@@ -228,7 +242,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
         settings.put(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), 3);
         AutodetectProcessManager manager = spy(new AutodetectProcessManager(environment, settings.build(), client, threadPool,
                 jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
-                normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor));
+                normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor, clusterService));
         doReturn(executorService).when(manager).createAutodetectExecutorService(any());

         doAnswer(invocationOnMock -> {
@@ -583,7 +597,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
                 (j, autodetectParams, e, onProcessCrash) -> autodetectProcess;
         AutodetectProcessManager manager = new AutodetectProcessManager(environment, Settings.EMPTY,
                 client, threadPool, jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
-                normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor);
+                normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor, clusterService);

         JobTask jobTask = mock(JobTask.class);
         when(jobTask.getJobId()).thenReturn("my_id");
@@ -656,7 +670,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
                 (j, autodetectParams, e, onProcessCrash) -> autodetectProcess;
         return new AutodetectProcessManager(environment, Settings.EMPTY, client, threadPool, jobManager,
                 jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
-                normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor);
+                normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor, clusterService);
     }

     private AutodetectParams buildAutodetectParams() {
@@ -682,7 +696,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
         AutodetectProcessManager manager = new AutodetectProcessManager(environment, Settings.EMPTY,
                 client, threadPool, jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister,
                 autodetectProcessFactory, normalizerFactory,
-                new NamedXContentRegistry(Collections.emptyList()), auditor);
+                new NamedXContentRegistry(Collections.emptyList()), auditor, clusterService);
         manager = spy(manager);
         doReturn(communicator).when(manager).create(any(), eq(buildAutodetectParams()), any());
         return manager;