Max jobs per node cannot be dynamic due to threadpool (elastic/elasticsearch#578)

The threadpool that supplies the threads used for job IO cannot be
resized, so the number of jobs cannot be dynamic either

Original commit: elastic/x-pack-elasticsearch@c584bf7147
This commit is contained in:
David Roberts 2016-12-19 17:48:22 +00:00 committed by GitHub
parent f65125df55
commit 9b3764b7fa
5 changed files with 12 additions and 35 deletions

View File

@ -189,7 +189,7 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
AutodetectResultsParser autodetectResultsParser = new AutodetectResultsParser(settings, parseFieldMatcherSupplier); AutodetectResultsParser autodetectResultsParser = new AutodetectResultsParser(settings, parseFieldMatcherSupplier);
DataProcessor dataProcessor = new AutodetectProcessManager(settings, client, threadPool, jobManager, jobProvider, DataProcessor dataProcessor = new AutodetectProcessManager(settings, client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobRenormalizedResultsPersister, jobDataCountsPersister, autodetectResultsParser, jobResultsPersister, jobRenormalizedResultsPersister, jobDataCountsPersister, autodetectResultsParser,
autodetectProcessFactory, normalizerFactory, clusterService.getClusterSettings()); autodetectProcessFactory, normalizerFactory);
ScheduledJobRunner scheduledJobRunner = new ScheduledJobRunner(threadPool, client, clusterService, jobProvider, ScheduledJobRunner scheduledJobRunner = new ScheduledJobRunner(threadPool, client, clusterService, jobProvider,
// norelease: we will no longer need to pass the client here after we switch to a client based data extractor // norelease: we will no longer need to pass the client here after we switch to a client based data extractor
new HttpDataExtractorFactory(client), new HttpDataExtractorFactory(client),

View File

@ -59,8 +59,9 @@ import java.util.function.Supplier;
public class AutodetectProcessManager extends AbstractComponent implements DataProcessor { public class AutodetectProcessManager extends AbstractComponent implements DataProcessor {
// TODO (norelease) default needs to be reconsidered // TODO (norelease) default needs to be reconsidered
// Cannot be dynamic because the thread pool that is sized to match cannot be resized
public static final Setting<Integer> MAX_RUNNING_JOBS_PER_NODE = public static final Setting<Integer> MAX_RUNNING_JOBS_PER_NODE =
Setting.intSetting("max_running_jobs", 10, 1, 128, Setting.Property.NodeScope, Setting.Property.Dynamic); Setting.intSetting("max_running_jobs", 10, 1, 512, Setting.Property.NodeScope);
private final Client client; private final Client client;
private final ThreadPool threadPool; private final ThreadPool threadPool;
@ -78,14 +79,13 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
private final ConcurrentMap<String, AutodetectCommunicator> autoDetectCommunicatorByJob; private final ConcurrentMap<String, AutodetectCommunicator> autoDetectCommunicatorByJob;
private volatile int maxAllowedRunningJobs; private final int maxAllowedRunningJobs;
public AutodetectProcessManager(Settings settings, Client client, ThreadPool threadPool, JobManager jobManager, public AutodetectProcessManager(Settings settings, Client client, ThreadPool threadPool, JobManager jobManager,
JobProvider jobProvider, JobResultsPersister jobResultsPersister, JobProvider jobProvider, JobResultsPersister jobResultsPersister,
JobRenormalizedResultsPersister jobRenormalizedResultsPersister, JobRenormalizedResultsPersister jobRenormalizedResultsPersister,
JobDataCountsPersister jobDataCountsPersister, AutodetectResultsParser parser, JobDataCountsPersister jobDataCountsPersister, AutodetectResultsParser parser,
AutodetectProcessFactory autodetectProcessFactory, NormalizerFactory normalizerFactory, AutodetectProcessFactory autodetectProcessFactory, NormalizerFactory normalizerFactory) {
ClusterSettings clusterSettings) {
super(settings); super(settings);
this.client = client; this.client = client;
this.threadPool = threadPool; this.threadPool = threadPool;
@ -103,7 +103,6 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
this.jobDataCountsPersister = jobDataCountsPersister; this.jobDataCountsPersister = jobDataCountsPersister;
this.autoDetectCommunicatorByJob = new ConcurrentHashMap<>(); this.autoDetectCommunicatorByJob = new ConcurrentHashMap<>();
clusterSettings.addSettingsUpdateConsumer(MAX_RUNNING_JOBS_PER_NODE, val -> maxAllowedRunningJobs = val);
} }
@Override @Override

View File

@ -85,7 +85,7 @@ public class JobLifeCycleService extends AbstractComponent implements ClusterSta
try { try {
dataProcessor.openJob(allocation.getJobId(), allocation.isIgnoreDowntime()); dataProcessor.openJob(allocation.getJobId(), allocation.isIgnoreDowntime());
} catch (Exception e) { } catch (Exception e) {
logger.error("Failed to close job [" + allocation.getJobId() + "]", e); logger.error("Failed to open job [" + allocation.getJobId() + "]", e);
updateJobStatus(allocation.getJobId(), JobStatus.FAILED, "failed to open, " + e.getMessage()); updateJobStatus(allocation.getJobId(), JobStatus.FAILED, "failed to open, " + e.getMessage());
} }
}); });

View File

@ -42,19 +42,11 @@ public class TooManyJobsIT extends ESIntegTestCase {
@After @After
public void clearPrelertMetadata() throws Exception { public void clearPrelertMetadata() throws Exception {
ScheduledJobsIT.clearPrelertMetadata(client()); ScheduledJobsIT.clearPrelertMetadata(client());
client().admin().cluster().prepareUpdateSettings()
.setPersistentSettings(
Settings.builder().put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), (String) null)
).get();
} }
public void testCannotStartTooManyAnalyticalProcesses() throws Exception { public void testCannotStartTooManyAnalyticalProcesses() throws Exception {
int maxRunningJobsPerNode = randomIntBetween(1, 16); int maxRunningJobsPerNode = AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getDefault(Settings.EMPTY);
logger.info("Setting [{}] to [{}]", AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), maxRunningJobsPerNode); logger.info("[{}] is [{}]", AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), maxRunningJobsPerNode);
client().admin().cluster().prepareUpdateSettings()
.setPersistentSettings(Settings.builder()
.put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), maxRunningJobsPerNode)
).get();
for (int i = 1; i <= (maxRunningJobsPerNode + 1); i++) { for (int i = 1; i <= (maxRunningJobsPerNode + 1); i++) {
Job.Builder job = createJob(Integer.toString(i)); Job.Builder job = createJob(Integer.toString(i));
PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build(true)); PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build(true));

View File

@ -8,8 +8,6 @@ package org.elasticsearch.xpack.prelert.job.manager;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
@ -116,7 +114,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
Client client = mock(Client.class); Client client = mock(Client.class);
ThreadPool threadPool = mock(ThreadPool.class); ThreadPool threadPool = mock(ThreadPool.class);
ThreadPool.Cancellable cancellable = mock(ThreadPool.Cancellable.class); ThreadPool.Cancellable cancellable = mock(ThreadPool.Cancellable.class);
when(threadPool.scheduleWithFixedDelay(any(), any(), any())).thenReturn(cancellable); when(threadPool.scheduleWithFixedDelay(any(), any(), any())).thenReturn(cancellable);
ExecutorService executorService = mock(ExecutorService.class); ExecutorService executorService = mock(ExecutorService.class);
doAnswer(invocation -> { doAnswer(invocation -> {
@ -138,13 +136,9 @@ public class AutodetectProcessManagerTests extends ESTestCase {
AutodetectProcessFactory autodetectProcessFactory = (j, i, e) -> autodetectProcess; AutodetectProcessFactory autodetectProcessFactory = (j, i, e) -> autodetectProcess;
Settings.Builder settings = Settings.builder(); Settings.Builder settings = Settings.builder();
settings.put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), 3); settings.put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), 3);
Set<Setting<?>> settingSet = new HashSet<>();
settingSet.addAll(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
settingSet.add(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE);
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, settingSet);
AutodetectProcessManager manager = new AutodetectProcessManager(settings.build(), client, threadPool, jobManager, jobProvider, AutodetectProcessManager manager = new AutodetectProcessManager(settings.build(), client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobRenormalizedResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory, jobResultsPersister, jobRenormalizedResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory,
normalizerFactory, clusterSettings); normalizerFactory);
manager.openJob("foo", false); manager.openJob("foo", false);
manager.openJob("bar", false); manager.openJob("bar", false);
@ -283,16 +277,12 @@ public class AutodetectProcessManagerTests extends ESTestCase {
when(jobManager.getJobOrThrowIfUnknown("my_id")).thenReturn(createJobDetails("my_id")); when(jobManager.getJobOrThrowIfUnknown("my_id")).thenReturn(createJobDetails("my_id"));
when(jobProvider.dataCounts("my_id")).thenReturn(new DataCounts("my_id")); when(jobProvider.dataCounts("my_id")).thenReturn(new DataCounts("my_id"));
Set<Setting<?>> settingSet = new HashSet<>();
settingSet.addAll(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
settingSet.add(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE);
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, settingSet);
AutodetectResultsParser parser = mock(AutodetectResultsParser.class); AutodetectResultsParser parser = mock(AutodetectResultsParser.class);
AutodetectProcess autodetectProcess = mock(AutodetectProcess.class); AutodetectProcess autodetectProcess = mock(AutodetectProcess.class);
AutodetectProcessFactory autodetectProcessFactory = (j, i, e) -> autodetectProcess; AutodetectProcessFactory autodetectProcessFactory = (j, i, e) -> autodetectProcess;
AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider, AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobRenormalizedResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory, jobResultsPersister, jobRenormalizedResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory,
normalizerFactory, clusterSettings); normalizerFactory);
expectThrows(EsRejectedExecutionException.class, () -> manager.create("my_id", false)); expectThrows(EsRejectedExecutionException.class, () -> manager.create("my_id", false));
verify(autodetectProcess, times(1)).close(); verify(autodetectProcess, times(1)).close();
@ -313,13 +303,9 @@ public class AutodetectProcessManagerTests extends ESTestCase {
ThreadPool threadPool = mock(ThreadPool.class); ThreadPool threadPool = mock(ThreadPool.class);
AutodetectResultsParser parser = mock(AutodetectResultsParser.class); AutodetectResultsParser parser = mock(AutodetectResultsParser.class);
AutodetectProcessFactory autodetectProcessFactory = mock(AutodetectProcessFactory.class); AutodetectProcessFactory autodetectProcessFactory = mock(AutodetectProcessFactory.class);
Set<Setting<?>> settingSet = new HashSet<>();
settingSet.addAll(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
settingSet.add(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE);
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, settingSet);
AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider, AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobRenormalizedResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory, jobResultsPersister, jobRenormalizedResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory,
normalizerFactory, clusterSettings); normalizerFactory);
manager = spy(manager); manager = spy(manager);
doReturn(communicator).when(manager).create(any(), anyBoolean()); doReturn(communicator).when(manager).create(any(), anyBoolean());
return manager; return manager;