From 9b3764b7fa70e281d57a225e955e19681f0db168 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Mon, 19 Dec 2016 17:48:22 +0000 Subject: [PATCH] Max jobs per node cannot be dynamic due to threadpool (elastic/elasticsearch#578) The threadpool that supplies the threads used for job IO cannot be resized, so the number of jobs cannot be dynamic either Original commit: elastic/x-pack-elasticsearch@c584bf71471b29986c328e991ee319065694b962 --- .../xpack/prelert/PrelertPlugin.java | 2 +- .../job/manager/AutodetectProcessManager.java | 9 ++++---- .../job/metadata/JobLifeCycleService.java | 2 +- .../prelert/integration/TooManyJobsIT.java | 12 ++-------- .../AutodetectProcessManagerTests.java | 22 ++++--------------- 5 files changed, 12 insertions(+), 35 deletions(-) diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java index eb60f76c6ae..67f355e6781 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java @@ -189,7 +189,7 @@ public class PrelertPlugin extends Plugin implements ActionPlugin { AutodetectResultsParser autodetectResultsParser = new AutodetectResultsParser(settings, parseFieldMatcherSupplier); DataProcessor dataProcessor = new AutodetectProcessManager(settings, client, threadPool, jobManager, jobProvider, jobResultsPersister, jobRenormalizedResultsPersister, jobDataCountsPersister, autodetectResultsParser, - autodetectProcessFactory, normalizerFactory, clusterService.getClusterSettings()); + autodetectProcessFactory, normalizerFactory); ScheduledJobRunner scheduledJobRunner = new ScheduledJobRunner(threadPool, client, clusterService, jobProvider, // norelease: we will no longer need to pass the client here after we switch to a client based data extractor new HttpDataExtractorFactory(client), diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManager.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManager.java index 15579f2b452..40c80d109d4 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManager.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManager.java @@ -59,8 +59,9 @@ import java.util.function.Supplier; public class AutodetectProcessManager extends AbstractComponent implements DataProcessor { // TODO (norelease) default needs to be reconsidered + // Cannot be dynamic because the thread pool that is sized to match cannot be resized public static final Setting MAX_RUNNING_JOBS_PER_NODE = - Setting.intSetting("max_running_jobs", 10, 1, 128, Setting.Property.NodeScope, Setting.Property.Dynamic); + Setting.intSetting("max_running_jobs", 10, 1, 512, Setting.Property.NodeScope); private final Client client; private final ThreadPool threadPool; @@ -78,14 +79,13 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP private final ConcurrentMap autoDetectCommunicatorByJob; - private volatile int maxAllowedRunningJobs; + private final int maxAllowedRunningJobs; public AutodetectProcessManager(Settings settings, Client client, ThreadPool threadPool, JobManager jobManager, JobProvider jobProvider, JobResultsPersister jobResultsPersister, JobRenormalizedResultsPersister jobRenormalizedResultsPersister, JobDataCountsPersister jobDataCountsPersister, AutodetectResultsParser parser, - AutodetectProcessFactory autodetectProcessFactory, NormalizerFactory normalizerFactory, - ClusterSettings clusterSettings) { + AutodetectProcessFactory autodetectProcessFactory, NormalizerFactory normalizerFactory) { super(settings); this.client = client; this.threadPool = threadPool; @@ -103,7 +103,6 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP this.jobDataCountsPersister = jobDataCountsPersister; this.autoDetectCommunicatorByJob = new ConcurrentHashMap<>(); - clusterSettings.addSettingsUpdateConsumer(MAX_RUNNING_JOBS_PER_NODE, val -> maxAllowedRunningJobs = val); } @Override diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/JobLifeCycleService.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/JobLifeCycleService.java index ff785154d3b..815a5ab89d3 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/JobLifeCycleService.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/JobLifeCycleService.java @@ -85,7 +85,7 @@ public class JobLifeCycleService extends AbstractComponent implements ClusterSta try { dataProcessor.openJob(allocation.getJobId(), allocation.isIgnoreDowntime()); } catch (Exception e) { - logger.error("Failed to close job [" + allocation.getJobId() + "]", e); + logger.error("Failed to open job [" + allocation.getJobId() + "]", e); updateJobStatus(allocation.getJobId(), JobStatus.FAILED, "failed to open, " + e.getMessage()); } }); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/TooManyJobsIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/TooManyJobsIT.java index e086a15bcb8..dc701ef1757 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/TooManyJobsIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/TooManyJobsIT.java @@ -42,19 +42,11 @@ public class TooManyJobsIT extends ESIntegTestCase { @After public void clearPrelertMetadata() throws Exception { ScheduledJobsIT.clearPrelertMetadata(client()); - client().admin().cluster().prepareUpdateSettings() - .setPersistentSettings( - Settings.builder().put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), (String) null) - ).get(); } public void testCannotStartTooManyAnalyticalProcesses() throws Exception { - int maxRunningJobsPerNode = randomIntBetween(1, 16); - logger.info("Setting [{}] to [{}]", AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), maxRunningJobsPerNode); - client().admin().cluster().prepareUpdateSettings() - .setPersistentSettings(Settings.builder() - .put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), maxRunningJobsPerNode) - ).get(); + int maxRunningJobsPerNode = AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getDefault(Settings.EMPTY); + logger.info("[{}] is [{}]", AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), maxRunningJobsPerNode); for (int i = 1; i <= (maxRunningJobsPerNode + 1); i++) { Job.Builder job = createJob(Integer.toString(i)); PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build(true)); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManagerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManagerTests.java index ff09721fa54..2119096ee76 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManagerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManagerTests.java @@ -8,8 +8,6 @@ package org.elasticsearch.xpack.prelert.job.manager; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.client.Client; -import org.elasticsearch.common.settings.ClusterSettings; -import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.test.ESTestCase; @@ -116,7 +114,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { Client client = mock(Client.class); ThreadPool threadPool = mock(ThreadPool.class); - ThreadPool.Cancellable cancellable = mock(ThreadPool.Cancellable.class); + ThreadPool.Cancellable cancellable = mock(ThreadPool.Cancellable.class); when(threadPool.scheduleWithFixedDelay(any(), any(), any())).thenReturn(cancellable); ExecutorService executorService = mock(ExecutorService.class); doAnswer(invocation -> { @@ -138,13 +136,9 @@ public class AutodetectProcessManagerTests extends ESTestCase { AutodetectProcessFactory autodetectProcessFactory = (j, i, e) -> autodetectProcess; Settings.Builder settings = Settings.builder(); settings.put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), 3); - Set> settingSet = new HashSet<>(); - settingSet.addAll(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - settingSet.add(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE); - ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, settingSet); AutodetectProcessManager manager = new AutodetectProcessManager(settings.build(), client, threadPool, jobManager, jobProvider, jobResultsPersister, jobRenormalizedResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory, - normalizerFactory, clusterSettings); + normalizerFactory); manager.openJob("foo", false); manager.openJob("bar", false); @@ -283,16 +277,12 @@ public class AutodetectProcessManagerTests extends ESTestCase { when(jobManager.getJobOrThrowIfUnknown("my_id")).thenReturn(createJobDetails("my_id")); when(jobProvider.dataCounts("my_id")).thenReturn(new DataCounts("my_id")); - Set> settingSet = new HashSet<>(); - settingSet.addAll(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - settingSet.add(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE); - ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, settingSet); AutodetectResultsParser parser = mock(AutodetectResultsParser.class); AutodetectProcess autodetectProcess = mock(AutodetectProcess.class); AutodetectProcessFactory autodetectProcessFactory = (j, i, e) -> autodetectProcess; AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider, jobResultsPersister, jobRenormalizedResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory, - normalizerFactory, clusterSettings); + normalizerFactory); expectThrows(EsRejectedExecutionException.class, () -> manager.create("my_id", false)); verify(autodetectProcess, times(1)).close(); @@ -313,13 +303,9 @@ public class AutodetectProcessManagerTests extends ESTestCase { ThreadPool threadPool = mock(ThreadPool.class); AutodetectResultsParser parser = mock(AutodetectResultsParser.class); AutodetectProcessFactory autodetectProcessFactory = mock(AutodetectProcessFactory.class); - Set> settingSet = new HashSet<>(); - settingSet.addAll(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - settingSet.add(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE); - ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, settingSet); AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider, jobResultsPersister, jobRenormalizedResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory, - normalizerFactory, clusterSettings); + normalizerFactory); manager = spy(manager); doReturn(communicator).when(manager).create(any(), anyBoolean()); return manager;