mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-25 17:38:44 +00:00
Register thread pool settings
This commit refactors the handling of thread pool settings so that the individual settings can be registered rather than registering the top level group. With this refactoring, individual plugins must now register their own settings for custom thread pools that they need, but a dedicated API is provided for this in the thread pool module. This commit also renames the prefix on the thread pool settings from "threadpool" to "thread_pool". This enables a hard break on the settings so that: - some of the settings can be given more sensible names (e.g., the max number of threads in a scaling thread pool is now named "max" instead of "size") - change the soft limit on the number of threads in the bulk and indexing thread pools to a hard limit - the settings names for custom plugins for thread pools can be prefixed (e.g., "xpack.watcher.thread_pool.size") - remove dynamic thread pool settings Relates #18674
This commit is contained in:
parent
3dfe2bbcb6
commit
da74323141
@ -190,7 +190,6 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
||||
RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
|
||||
RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING,
|
||||
RecoverySettings.INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING,
|
||||
ThreadPool.THREADPOOL_GROUP_SETTING,
|
||||
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
|
||||
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
|
||||
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING,
|
||||
@ -419,6 +418,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
||||
ResourceWatcherService.RELOAD_INTERVAL_HIGH,
|
||||
ResourceWatcherService.RELOAD_INTERVAL_MEDIUM,
|
||||
ResourceWatcherService.RELOAD_INTERVAL_LOW,
|
||||
SearchModule.INDICES_MAX_CLAUSE_COUNT_SETTING
|
||||
SearchModule.INDICES_MAX_CLAUSE_COUNT_SETTING,
|
||||
ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING
|
||||
)));
|
||||
}
|
||||
|
@ -83,13 +83,16 @@ public class EsExecutors {
|
||||
}
|
||||
|
||||
public static String threadName(Settings settings, String namePrefix) {
|
||||
String name = settings.get("node.name");
|
||||
if (name == null) {
|
||||
name = "elasticsearch";
|
||||
String nodeName = settings.get("node.name");
|
||||
if (nodeName == null) {
|
||||
return threadName("", namePrefix);
|
||||
} else {
|
||||
name = "elasticsearch[" + name + "]";
|
||||
return threadName(nodeName, namePrefix);
|
||||
}
|
||||
return name + "[" + namePrefix + "]";
|
||||
}
|
||||
|
||||
public static String threadName(final String nodeName, final String namePrefix) {
|
||||
return "elasticsearch" + (nodeName.isEmpty() ? "" : "[") + nodeName + (nodeName.isEmpty() ? "" : "]") + "[" + namePrefix + "]";
|
||||
}
|
||||
|
||||
public static ThreadFactory daemonThreadFactory(Settings settings, String namePrefix) {
|
||||
|
@ -98,6 +98,7 @@ import org.elasticsearch.search.SearchService;
|
||||
import org.elasticsearch.snapshots.SnapshotShardsService;
|
||||
import org.elasticsearch.snapshots.SnapshotsService;
|
||||
import org.elasticsearch.tasks.TaskResultsService;
|
||||
import org.elasticsearch.threadpool.ExecutorBuilder;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPoolModule;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
@ -210,11 +211,12 @@ public class Node implements Closeable {
|
||||
throw new IllegalStateException("Failed to created node environment", ex);
|
||||
}
|
||||
final NetworkService networkService = new NetworkService(settings);
|
||||
final ThreadPool threadPool = new ThreadPool(settings);
|
||||
final List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings);
|
||||
final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
|
||||
|
||||
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
|
||||
boolean success = false;
|
||||
try {
|
||||
final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool);
|
||||
ModulesBuilder modules = new ModulesBuilder();
|
||||
modules.add(new Version.Module(version));
|
||||
modules.add(new CircuitBreakerModule(settings));
|
||||
@ -222,6 +224,7 @@ public class Node implements Closeable {
|
||||
for (Module pluginModule : pluginsService.nodeModules()) {
|
||||
modules.add(pluginModule);
|
||||
}
|
||||
final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool);
|
||||
modules.add(new PluginsModule(pluginsService));
|
||||
SettingsModule settingsModule = new SettingsModule(this.settings);
|
||||
modules.add(settingsModule);
|
||||
@ -232,7 +235,8 @@ public class Node implements Closeable {
|
||||
modules.add(scriptModule);
|
||||
modules.add(new NodeEnvironmentModule(nodeEnvironment));
|
||||
modules.add(new ClusterNameModule(this.settings));
|
||||
modules.add(new ThreadPoolModule(threadPool));
|
||||
final ThreadPoolModule threadPoolModule = new ThreadPoolModule(threadPool);
|
||||
modules.add(threadPoolModule);
|
||||
modules.add(new DiscoveryModule(this.settings));
|
||||
modules.add(new ClusterModule(this.settings));
|
||||
modules.add(new IndicesModule());
|
||||
@ -246,11 +250,14 @@ public class Node implements Closeable {
|
||||
modules.add(new AnalysisModule(environment));
|
||||
|
||||
pluginsService.processModules(modules);
|
||||
|
||||
scriptModule.prepareSettings(settingsModule);
|
||||
|
||||
threadPoolModule.prepareSettings(settingsModule);
|
||||
|
||||
injector = modules.createInjector();
|
||||
|
||||
client = injector.getInstance(Client.class);
|
||||
threadPool.setClusterSettings(injector.getInstance(ClusterSettings.class));
|
||||
success = true;
|
||||
} catch (IOException ex) {
|
||||
throw new ElasticsearchException("failed to bind service", ex);
|
||||
|
@ -23,9 +23,12 @@ import org.elasticsearch.common.component.LifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Module;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.IndexModule;
|
||||
import org.elasticsearch.threadpool.ExecutorBuilder;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* An extension point allowing to plug in custom functionality.
|
||||
@ -80,4 +83,15 @@ public abstract class Plugin {
|
||||
*/
|
||||
@Deprecated
|
||||
public final void onModule(IndexModule indexModule) {}
|
||||
|
||||
/**
|
||||
* Provides the list of this plugin's custom thread pools, empty if
|
||||
* none.
|
||||
*
|
||||
* @param settings the current settings
|
||||
* @return executors builders for this plugin's custom thread pools
|
||||
*/
|
||||
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
|
@ -40,6 +40,7 @@ import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.IndexModule;
|
||||
import org.elasticsearch.threadpool.ExecutorBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
@ -261,6 +262,14 @@ public class PluginsService extends AbstractComponent {
|
||||
return modules;
|
||||
}
|
||||
|
||||
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
|
||||
final ArrayList<ExecutorBuilder<?>> builders = new ArrayList<>();
|
||||
for (final Tuple<PluginInfo, Plugin> plugin : plugins) {
|
||||
builders.addAll(plugin.v2().getExecutorBuilders(settings));
|
||||
}
|
||||
return builders;
|
||||
}
|
||||
|
||||
public Collection<Class<? extends LifecycleComponent>> nodeServices() {
|
||||
List<Class<? extends LifecycleComponent>> services = new ArrayList<>();
|
||||
for (Tuple<PluginInfo, Plugin> plugin : plugins) {
|
||||
|
@ -0,0 +1,91 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.threadpool;
|
||||
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Base class for executor builders.
|
||||
*
|
||||
* @param <U> the underlying type of the executor settings
|
||||
*/
|
||||
public abstract class ExecutorBuilder<U extends ExecutorBuilder.ExecutorSettings> {
|
||||
|
||||
private final String name;
|
||||
|
||||
public ExecutorBuilder(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
protected String name() {
|
||||
return name;
|
||||
}
|
||||
|
||||
protected static String settingsKey(final String prefix, final String key) {
|
||||
return String.join(".", prefix, key);
|
||||
}
|
||||
|
||||
/**
|
||||
* The list of settings this builder will register.
|
||||
*
|
||||
* @return the list of registered settings
|
||||
*/
|
||||
abstract List<Setting<?>> getRegisteredSettings();
|
||||
|
||||
/**
|
||||
* Return an executor settings object from the node-level settings.
|
||||
*
|
||||
* @param settings the node-level settings
|
||||
* @return the executor settings object
|
||||
*/
|
||||
abstract U getSettings(Settings settings);
|
||||
|
||||
/**
|
||||
* Builds the executor with the specified executor settings.
|
||||
*
|
||||
* @param settings the executor settings
|
||||
* @param threadContext the current thread context
|
||||
* @return a new executor built from the specified executor settings
|
||||
*/
|
||||
abstract ThreadPool.ExecutorHolder build(U settings, ThreadContext threadContext);
|
||||
|
||||
/**
|
||||
* Format the thread pool info object for this executor.
|
||||
*
|
||||
* @param info the thread pool info object to format
|
||||
* @return a formatted thread pool info (useful for logging)
|
||||
*/
|
||||
abstract String formatInfo(ThreadPool.Info info);
|
||||
|
||||
static abstract class ExecutorSettings {
|
||||
|
||||
protected final String nodeName;
|
||||
|
||||
public ExecutorSettings(String nodeName) {
|
||||
this.nodeName = nodeName;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,135 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.threadpool;
|
||||
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.SizeValue;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.node.Node;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
|
||||
/**
|
||||
* A builder for fixed executors.
|
||||
*/
|
||||
public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBuilder.FixedExecutorSettings> {
|
||||
|
||||
private final Setting<Integer> sizeSetting;
|
||||
private final Setting<Integer> queueSizeSetting;
|
||||
|
||||
/**
|
||||
* Construct a fixed executor builder; the settings will have the
|
||||
* key prefix "thread_pool." followed by the executor name.
|
||||
*
|
||||
* @param settings the node-level settings
|
||||
* @param name the name of the executor
|
||||
* @param size the fixed number of threads
|
||||
* @param queueSize the size of the backing queue, -1 for unbounded
|
||||
*/
|
||||
FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize) {
|
||||
this(settings, name, size, queueSize, "thread_pool." + name);
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct a fixed executor builder.
|
||||
*
|
||||
* @param settings the node-level settings
|
||||
* @param name the name of the executor
|
||||
* @param size the fixed number of threads
|
||||
* @param queueSize the size of the backing queue, -1 for unbounded
|
||||
* @param prefix the prefix for the settings keys
|
||||
*/
|
||||
public FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize, final String prefix) {
|
||||
super(name);
|
||||
final String sizeKey = settingsKey(prefix, "size");
|
||||
this.sizeSetting =
|
||||
new Setting<>(
|
||||
sizeKey,
|
||||
s -> Integer.toString(size),
|
||||
s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey),
|
||||
Setting.Property.NodeScope);
|
||||
final String queueSizeKey = settingsKey(prefix, "queue_size");
|
||||
this.queueSizeSetting =
|
||||
Setting.intSetting(queueSizeKey, queueSize, Setting.Property.NodeScope);
|
||||
}
|
||||
|
||||
private int applyHardSizeLimit(final Settings settings, final String name) {
|
||||
if (name.equals(ThreadPool.Names.BULK) || name.equals(ThreadPool.Names.INDEX)) {
|
||||
return 1 + EsExecutors.boundedNumberOfProcessors(settings);
|
||||
} else {
|
||||
return Integer.MAX_VALUE;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
List<Setting<?>> getRegisteredSettings() {
|
||||
return Arrays.asList(sizeSetting, queueSizeSetting);
|
||||
}
|
||||
|
||||
@Override
|
||||
FixedExecutorSettings getSettings(Settings settings) {
|
||||
final String nodeName = Node.NODE_NAME_SETTING.get(settings);
|
||||
final int size = sizeSetting.get(settings);
|
||||
final int queueSize = queueSizeSetting.get(settings);
|
||||
return new FixedExecutorSettings(nodeName, size, queueSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
ThreadPool.ExecutorHolder build(final FixedExecutorSettings settings, final ThreadContext threadContext) {
|
||||
int size = settings.size;
|
||||
int queueSize = settings.queueSize;
|
||||
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name()));
|
||||
Executor executor = EsExecutors.newFixed(name(), size, queueSize, threadFactory, threadContext);
|
||||
final ThreadPool.Info info =
|
||||
new ThreadPool.Info(name(), ThreadPool.ThreadPoolType.FIXED, size, size, null, queueSize < 0 ? null : new SizeValue(queueSize));
|
||||
return new ThreadPool.ExecutorHolder(executor, info);
|
||||
}
|
||||
|
||||
@Override
|
||||
String formatInfo(ThreadPool.Info info) {
|
||||
return String.format(
|
||||
Locale.ROOT,
|
||||
"name [%s], size [%d], queue size [%s]",
|
||||
info.getName(),
|
||||
info.getMax(),
|
||||
info.getQueueSize() == null ? "unbounded" : info.getQueueSize());
|
||||
}
|
||||
|
||||
static class FixedExecutorSettings extends ExecutorBuilder.ExecutorSettings {
|
||||
|
||||
private final int size;
|
||||
private final int queueSize;
|
||||
|
||||
public FixedExecutorSettings(final String nodeName, final int size, final int queueSize) {
|
||||
super(nodeName);
|
||||
this.size = size;
|
||||
this.queueSize = queueSize;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,129 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.threadpool;
|
||||
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.node.Node;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* A builder for scaling executors.
|
||||
*/
|
||||
public final class ScalingExecutorBuilder extends ExecutorBuilder<ScalingExecutorBuilder.ScalingExecutorSettings> {
|
||||
|
||||
private final Setting<Integer> coreSetting;
|
||||
private final Setting<Integer> maxSetting;
|
||||
private final Setting<TimeValue> keepAliveSetting;
|
||||
|
||||
/**
|
||||
* Construct a scaling executor builder; the settings will have the
|
||||
* key prefix "thread_pool." followed by the executor name.
|
||||
*
|
||||
* @param name the name of the executor
|
||||
* @param core the minimum number of threads in the pool
|
||||
* @param max the maximum number of threads in the pool
|
||||
* @param keepAlive the time that spare threads above {@code core}
|
||||
* threads will be kept alive
|
||||
*/
|
||||
public ScalingExecutorBuilder(final String name, final int core, final int max, final TimeValue keepAlive) {
|
||||
this(name, core, max, keepAlive, "thread_pool." + name);
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct a scaling executor builder; the settings will have the
|
||||
* specified key prefix.
|
||||
*
|
||||
* @param name the name of the executor
|
||||
* @param core the minimum number of threads in the pool
|
||||
* @param max the maximum number of threads in the pool
|
||||
* @param keepAlive the time that spare threads above {@code core}
|
||||
* threads will be kept alive
|
||||
* @param prefix the prefix for the settings keys
|
||||
*/
|
||||
public ScalingExecutorBuilder(final String name, final int core, final int max, final TimeValue keepAlive, final String prefix) {
|
||||
super(name);
|
||||
this.coreSetting =
|
||||
Setting.intSetting(settingsKey(prefix, "core"), core, Setting.Property.NodeScope);
|
||||
this.maxSetting = Setting.intSetting(settingsKey(prefix, "max"), max, Setting.Property.NodeScope);
|
||||
this.keepAliveSetting =
|
||||
Setting.timeSetting(settingsKey(prefix, "keep_alive"), keepAlive, Setting.Property.NodeScope);
|
||||
}
|
||||
|
||||
@Override
|
||||
List<Setting<?>> getRegisteredSettings() {
|
||||
return Arrays.asList(coreSetting, maxSetting, keepAliveSetting);
|
||||
}
|
||||
|
||||
@Override
|
||||
ScalingExecutorSettings getSettings(Settings settings) {
|
||||
final String nodeName = Node.NODE_NAME_SETTING.get(settings);
|
||||
final int coreThreads = coreSetting.get(settings);
|
||||
final int maxThreads = maxSetting.get(settings);
|
||||
final TimeValue keepAlive = keepAliveSetting.get(settings);
|
||||
return new ScalingExecutorSettings(nodeName, coreThreads, maxThreads, keepAlive);
|
||||
}
|
||||
|
||||
ThreadPool.ExecutorHolder build(final ScalingExecutorSettings settings, final ThreadContext threadContext) {
|
||||
TimeValue keepAlive = settings.keepAlive;
|
||||
int core = settings.core;
|
||||
int max = settings.max;
|
||||
final ThreadPool.Info info = new ThreadPool.Info(name(), ThreadPool.ThreadPoolType.SCALING, core, max, keepAlive, null);
|
||||
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name()));
|
||||
final Executor executor =
|
||||
EsExecutors.newScaling(name(), core, max, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory, threadContext);
|
||||
return new ThreadPool.ExecutorHolder(executor, info);
|
||||
}
|
||||
|
||||
@Override
|
||||
String formatInfo(ThreadPool.Info info) {
|
||||
return String.format(
|
||||
Locale.ROOT,
|
||||
"name [%s], core [%d], max [%d], keep alive [%s]",
|
||||
info.getName(),
|
||||
info.getMin(),
|
||||
info.getMax(),
|
||||
info.getKeepAlive());
|
||||
}
|
||||
|
||||
static class ScalingExecutorSettings extends ExecutorBuilder.ExecutorSettings {
|
||||
|
||||
private final int core;
|
||||
private final int max;
|
||||
private final TimeValue keepAlive;
|
||||
|
||||
public ScalingExecutorSettings(final String nodeName, final int core, final int max, final TimeValue keepAlive) {
|
||||
super(nodeName);
|
||||
this.core = core;
|
||||
this.max = max;
|
||||
this.keepAlive = keepAlive;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -26,11 +26,8 @@ import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.settings.SettingsException;
|
||||
import org.elasticsearch.common.unit.SizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.EsAbortPolicy;
|
||||
@ -45,31 +42,22 @@ import org.elasticsearch.node.Node;
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.RejectedExecutionHandler;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static java.util.Collections.unmodifiableMap;
|
||||
import static org.elasticsearch.common.unit.SizeValue.parseSizeValue;
|
||||
import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class ThreadPool extends AbstractComponent implements Closeable {
|
||||
|
||||
public static class Names {
|
||||
@ -146,164 +134,85 @@ public class ThreadPool extends AbstractComponent implements Closeable {
|
||||
THREAD_POOL_TYPES = Collections.unmodifiableMap(map);
|
||||
}
|
||||
|
||||
private static void add(Map<String, Settings> executorSettings, ExecutorSettingsBuilder builder) {
|
||||
Settings settings = builder.build();
|
||||
String name = settings.get("name");
|
||||
executorSettings.put(name, settings);
|
||||
}
|
||||
|
||||
private static abstract class ExecutorSettingsBuilder<T extends ExecutorSettingsBuilder<T>> {
|
||||
|
||||
private final Settings.Builder builder;
|
||||
|
||||
protected ExecutorSettingsBuilder(String name, ThreadPoolType threadPoolType) {
|
||||
if (THREAD_POOL_TYPES.get(name) != threadPoolType) {
|
||||
throw new IllegalArgumentException("thread pool [" + name + "] must be of type [" + threadPoolType + "]");
|
||||
}
|
||||
builder = Settings.builder();
|
||||
builder.put("name", name);
|
||||
builder.put("type", threadPoolType.getType());
|
||||
}
|
||||
|
||||
public T keepAlive(String keepAlive) {
|
||||
return add("keep_alive", keepAlive);
|
||||
}
|
||||
|
||||
public T queueSize(int queueSize) {
|
||||
return add("queue_size", queueSize);
|
||||
}
|
||||
|
||||
protected T add(String setting, int value) {
|
||||
return add(setting, Integer.toString(value));
|
||||
}
|
||||
|
||||
|
||||
protected T add(String setting, String value) {
|
||||
builder.put(setting, value);
|
||||
@SuppressWarnings("unchecked") final T executor = (T)this;
|
||||
return executor;
|
||||
}
|
||||
|
||||
public final Settings build() { return builder.build(); }
|
||||
|
||||
}
|
||||
|
||||
private static class FixedExecutorSettingsBuilder extends ExecutorSettingsBuilder<FixedExecutorSettingsBuilder> {
|
||||
|
||||
public FixedExecutorSettingsBuilder(String name) {
|
||||
super(name, ThreadPoolType.FIXED);
|
||||
}
|
||||
|
||||
public FixedExecutorSettingsBuilder size(int size) {
|
||||
return add("size", Integer.toString(size));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static class ScalingExecutorSettingsBuilder extends ExecutorSettingsBuilder<ScalingExecutorSettingsBuilder> {
|
||||
|
||||
public ScalingExecutorSettingsBuilder(String name) {
|
||||
super(name, ThreadPoolType.SCALING);
|
||||
}
|
||||
|
||||
public ScalingExecutorSettingsBuilder min(int min) {
|
||||
return add("min", min);
|
||||
}
|
||||
|
||||
|
||||
public ScalingExecutorSettingsBuilder size(int size) {
|
||||
return add("size", size);
|
||||
}
|
||||
}
|
||||
|
||||
public static final Setting<Settings> THREADPOOL_GROUP_SETTING =
|
||||
Setting.groupSetting("threadpool.", Property.Dynamic, Property.NodeScope);
|
||||
|
||||
private volatile Map<String, ExecutorHolder> executors;
|
||||
|
||||
private final Map<String, Settings> defaultExecutorTypeSettings;
|
||||
|
||||
private final Queue<ExecutorHolder> retiredExecutors = new ConcurrentLinkedQueue<>();
|
||||
private Map<String, ExecutorHolder> executors = new HashMap<>();
|
||||
|
||||
private final ScheduledThreadPoolExecutor scheduler;
|
||||
|
||||
private final EstimatedTimeThread estimatedTimeThread;
|
||||
|
||||
private final AtomicBoolean settingsListenerIsSet = new AtomicBoolean(false);
|
||||
|
||||
static final Executor DIRECT_EXECUTOR = command -> command.run();
|
||||
|
||||
private final ThreadContext threadContext;
|
||||
|
||||
public ThreadPool(String name) {
|
||||
this(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), name).build());
|
||||
private final Map<String, ExecutorBuilder> builders;
|
||||
|
||||
public Collection<ExecutorBuilder> builders() {
|
||||
return Collections.unmodifiableCollection(builders.values());
|
||||
}
|
||||
|
||||
public ThreadPool(Settings settings) {
|
||||
public static Setting<TimeValue> ESTIMATED_TIME_INTERVAL_SETTING =
|
||||
Setting.timeSetting("thread_pool.estimated_time_interval", TimeValue.timeValueMillis(200), Setting.Property.NodeScope);
|
||||
|
||||
public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBuilders) {
|
||||
super(settings);
|
||||
|
||||
assert Node.NODE_NAME_SETTING.exists(settings) : "ThreadPool's settings should contain a name";
|
||||
threadContext = new ThreadContext(settings);
|
||||
Map<String, Settings> groupSettings = THREADPOOL_GROUP_SETTING.get(settings).getAsGroups();
|
||||
validate(groupSettings);
|
||||
assert Node.NODE_NAME_SETTING.exists(settings);
|
||||
|
||||
int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
|
||||
int halfProcMaxAt5 = halfNumberOfProcessorsMaxFive(availableProcessors);
|
||||
int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors);
|
||||
Map<String, Settings> defaultExecutorTypeSettings = new HashMap<>();
|
||||
int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);
|
||||
add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.GENERIC).min(4).size(genericThreadPoolMax).keepAlive("30s"));
|
||||
add(defaultExecutorTypeSettings, new FixedExecutorSettingsBuilder(Names.INDEX).size(availableProcessors).queueSize(200));
|
||||
add(defaultExecutorTypeSettings, new FixedExecutorSettingsBuilder(Names.BULK).size(availableProcessors).queueSize(50));
|
||||
add(defaultExecutorTypeSettings, new FixedExecutorSettingsBuilder(Names.GET).size(availableProcessors).queueSize(1000));
|
||||
add(defaultExecutorTypeSettings, new FixedExecutorSettingsBuilder(Names.SEARCH).size(((availableProcessors * 3) / 2) + 1).queueSize(1000));
|
||||
add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.MANAGEMENT).min(1).size(5).keepAlive("5m"));
|
||||
final Map<String, ExecutorBuilder> builders = new HashMap<>();
|
||||
final int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
|
||||
final int halfProcMaxAt5 = halfNumberOfProcessorsMaxFive(availableProcessors);
|
||||
final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors);
|
||||
final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);
|
||||
builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
|
||||
builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200));
|
||||
builders.put(Names.BULK, new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 50));
|
||||
builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000));
|
||||
builders.put(Names.SEARCH, new FixedExecutorBuilder(settings, Names.SEARCH, ((availableProcessors * 3) / 2) + 1, 1000));
|
||||
builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5)));
|
||||
// no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded
|
||||
// the assumption here is that the listeners should be very lightweight on the listeners side
|
||||
add(defaultExecutorTypeSettings, new FixedExecutorSettingsBuilder(Names.LISTENER).size(halfProcMaxAt10));
|
||||
add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.FLUSH).min(1).size(halfProcMaxAt5).keepAlive("5m"));
|
||||
add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.REFRESH).min(1).size(halfProcMaxAt10).keepAlive("5m"));
|
||||
add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.WARMER).min(1).size(halfProcMaxAt5).keepAlive("5m"));
|
||||
add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.SNAPSHOT).min(1).size(halfProcMaxAt5).keepAlive("5m"));
|
||||
add(defaultExecutorTypeSettings, new FixedExecutorSettingsBuilder(Names.FORCE_MERGE).size(1));
|
||||
add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.FETCH_SHARD_STARTED).min(1).size(availableProcessors * 2).keepAlive("5m"));
|
||||
add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.FETCH_SHARD_STORE).min(1).size(availableProcessors * 2).keepAlive("5m"));
|
||||
|
||||
this.defaultExecutorTypeSettings = unmodifiableMap(defaultExecutorTypeSettings);
|
||||
|
||||
Map<String, ExecutorHolder> executors = new HashMap<>();
|
||||
for (Map.Entry<String, Settings> executor : defaultExecutorTypeSettings.entrySet()) {
|
||||
executors.put(executor.getKey(), build(executor.getKey(), groupSettings.get(executor.getKey()), executor.getValue()));
|
||||
}
|
||||
|
||||
// Building custom thread pools
|
||||
for (Map.Entry<String, Settings> entry : groupSettings.entrySet()) {
|
||||
if (executors.containsKey(entry.getKey())) {
|
||||
continue;
|
||||
builders.put(Names.LISTENER, new FixedExecutorBuilder(settings, Names.LISTENER, halfProcMaxAt10, -1));
|
||||
builders.put(Names.FLUSH, new ScalingExecutorBuilder(Names.FLUSH, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
|
||||
builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)));
|
||||
builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
|
||||
builders.put(Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
|
||||
builders.put(Names.FETCH_SHARD_STARTED, new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));
|
||||
builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1));
|
||||
builders.put(Names.FETCH_SHARD_STORE, new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));
|
||||
for (final ExecutorBuilder<?> builder : customBuilders) {
|
||||
if (builders.containsKey(builder.name())) {
|
||||
throw new IllegalArgumentException("builder with name [" + builder.name() + "] already exists");
|
||||
}
|
||||
executors.put(entry.getKey(), build(entry.getKey(), entry.getValue(), Settings.EMPTY));
|
||||
builders.put(builder.name(), builder);
|
||||
}
|
||||
this.builders = Collections.unmodifiableMap(builders);
|
||||
|
||||
threadContext = new ThreadContext(settings);
|
||||
|
||||
final Map<String, ExecutorHolder> executors = new HashMap<>();
|
||||
for (@SuppressWarnings("unchecked") final Map.Entry<String, ExecutorBuilder> entry : builders.entrySet()) {
|
||||
final ExecutorBuilder.ExecutorSettings executorSettings = entry.getValue().getSettings(settings);
|
||||
final ExecutorHolder executorHolder = entry.getValue().build(executorSettings, threadContext);
|
||||
if (executors.containsKey(executorHolder.info.getName())) {
|
||||
throw new IllegalStateException("duplicate executors with name [" + executorHolder.info.getName() + "] registered");
|
||||
}
|
||||
logger.debug("created thread pool: " + entry.getValue().formatInfo(executorHolder.info));
|
||||
executors.put(entry.getKey(), executorHolder);
|
||||
}
|
||||
|
||||
executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, ThreadPoolType.DIRECT)));
|
||||
this.executors = unmodifiableMap(executors);
|
||||
|
||||
this.scheduler = new ScheduledThreadPoolExecutor(1, EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy());
|
||||
this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
|
||||
this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
|
||||
this.scheduler.setRemoveOnCancelPolicy(true);
|
||||
|
||||
TimeValue estimatedTimeInterval = settings.getAsTime("threadpool.estimated_time_interval", TimeValue.timeValueMillis(200));
|
||||
TimeValue estimatedTimeInterval = ESTIMATED_TIME_INTERVAL_SETTING.get(settings);
|
||||
this.estimatedTimeThread = new EstimatedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis());
|
||||
this.estimatedTimeThread.start();
|
||||
}
|
||||
|
||||
public void setClusterSettings(ClusterSettings clusterSettings) {
|
||||
if(settingsListenerIsSet.compareAndSet(false, true)) {
|
||||
clusterSettings.addSettingsUpdateConsumer(THREADPOOL_GROUP_SETTING, this::updateSettings, (s) -> validate(s.getAsGroups()));
|
||||
} else {
|
||||
throw new IllegalStateException("the node settings listener was set more then once");
|
||||
}
|
||||
}
|
||||
|
||||
public long estimatedTimeInMillis() {
|
||||
return estimatedTimeThread.estimatedTimeInMillis();
|
||||
}
|
||||
@ -440,12 +349,6 @@ public class ThreadPool extends AbstractComponent implements Closeable {
|
||||
((ThreadPoolExecutor) executor.executor()).shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
ExecutorHolder holder;
|
||||
while ((holder = retiredExecutors.poll()) != null) {
|
||||
ThreadPoolExecutor executor = (ThreadPoolExecutor) holder.executor();
|
||||
executor.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
|
||||
@ -456,142 +359,10 @@ public class ThreadPool extends AbstractComponent implements Closeable {
|
||||
}
|
||||
}
|
||||
|
||||
ExecutorHolder holder;
|
||||
while ((holder = retiredExecutors.poll()) != null) {
|
||||
ThreadPoolExecutor executor = (ThreadPoolExecutor) holder.executor();
|
||||
result &= executor.awaitTermination(timeout, unit);
|
||||
}
|
||||
|
||||
estimatedTimeThread.join(unit.toMillis(timeout));
|
||||
return result;
|
||||
}
|
||||
|
||||
private ExecutorHolder build(String name, @Nullable Settings settings, Settings defaultSettings) {
|
||||
return rebuild(name, null, settings, defaultSettings);
|
||||
}
|
||||
|
||||
private ExecutorHolder rebuild(String name, ExecutorHolder previousExecutorHolder, @Nullable Settings settings, Settings defaultSettings) {
|
||||
if (Names.SAME.equals(name)) {
|
||||
// Don't allow to change the "same" thread executor
|
||||
return previousExecutorHolder;
|
||||
}
|
||||
if (settings == null) {
|
||||
settings = Settings.Builder.EMPTY_SETTINGS;
|
||||
}
|
||||
Info previousInfo = previousExecutorHolder != null ? previousExecutorHolder.info : null;
|
||||
String type = settings.get("type", previousInfo != null ? previousInfo.getThreadPoolType().getType() : defaultSettings.get("type"));
|
||||
ThreadPoolType threadPoolType = ThreadPoolType.fromType(type);
|
||||
ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(this.settings, name);
|
||||
if (ThreadPoolType.DIRECT == threadPoolType) {
|
||||
if (previousExecutorHolder != null) {
|
||||
logger.debug("updating thread pool [{}], type [{}]", name, type);
|
||||
} else {
|
||||
logger.debug("creating thread pool [{}], type [{}]", name, type);
|
||||
}
|
||||
return new ExecutorHolder(DIRECT_EXECUTOR, new Info(name, threadPoolType));
|
||||
} else if (ThreadPoolType.FIXED == threadPoolType) {
|
||||
int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors(settings));
|
||||
SizeValue defaultQueueSize = getAsSizeOrUnbounded(defaultSettings, "queue", getAsSizeOrUnbounded(defaultSettings, "queue_size", null));
|
||||
|
||||
if (previousExecutorHolder != null) {
|
||||
assert previousInfo != null;
|
||||
if (ThreadPoolType.FIXED == previousInfo.getThreadPoolType()) {
|
||||
SizeValue updatedQueueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", previousInfo.getQueueSize())));
|
||||
if (Objects.equals(previousInfo.getQueueSize(), updatedQueueSize)) {
|
||||
int updatedSize = applyHardSizeLimit(name, settings.getAsInt("size", previousInfo.getMax()));
|
||||
if (previousInfo.getMax() != updatedSize) {
|
||||
logger.debug("updating thread pool [{}], type [{}], size [{}], queue_size [{}]", name, type, updatedSize, updatedQueueSize);
|
||||
// if you think this code is crazy: that's because it is!
|
||||
if (updatedSize > previousInfo.getMax()) {
|
||||
((EsThreadPoolExecutor) previousExecutorHolder.executor()).setMaximumPoolSize(updatedSize);
|
||||
((EsThreadPoolExecutor) previousExecutorHolder.executor()).setCorePoolSize(updatedSize);
|
||||
} else {
|
||||
((EsThreadPoolExecutor) previousExecutorHolder.executor()).setCorePoolSize(updatedSize);
|
||||
((EsThreadPoolExecutor) previousExecutorHolder.executor()).setMaximumPoolSize(updatedSize);
|
||||
}
|
||||
return new ExecutorHolder(previousExecutorHolder.executor(), new Info(name, threadPoolType, updatedSize, updatedSize, null, updatedQueueSize));
|
||||
}
|
||||
return previousExecutorHolder;
|
||||
}
|
||||
}
|
||||
if (previousInfo.getMax() >= 0) {
|
||||
defaultSize = previousInfo.getMax();
|
||||
}
|
||||
defaultQueueSize = previousInfo.getQueueSize();
|
||||
}
|
||||
|
||||
int size = applyHardSizeLimit(name, settings.getAsInt("size", defaultSize));
|
||||
SizeValue queueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", defaultQueueSize)));
|
||||
logger.debug("creating thread pool [{}], type [{}], size [{}], queue_size [{}]", name, type, size, queueSize);
|
||||
Executor executor = EsExecutors.newFixed(name, size, queueSize == null ? -1 : (int) queueSize.singles(), threadFactory, threadContext);
|
||||
return new ExecutorHolder(executor, new Info(name, threadPoolType, size, size, null, queueSize));
|
||||
} else if (ThreadPoolType.SCALING == threadPoolType) {
|
||||
TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5));
|
||||
int defaultMin = defaultSettings.getAsInt("min", 1);
|
||||
int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors(settings));
|
||||
final Integer queueSize = settings.getAsInt("queue_size", defaultSettings.getAsInt("queue_size", null));
|
||||
if (queueSize != null) {
|
||||
throw new IllegalArgumentException("thread pool [" + name + "] of type scaling can not have its queue re-sized but was [" + queueSize + "]");
|
||||
}
|
||||
if (previousExecutorHolder != null) {
|
||||
if (ThreadPoolType.SCALING == previousInfo.getThreadPoolType()) {
|
||||
TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive());
|
||||
int updatedMin = settings.getAsInt("min", previousInfo.getMin());
|
||||
int updatedSize = settings.getAsInt("max", settings.getAsInt("size", previousInfo.getMax()));
|
||||
if (!previousInfo.getKeepAlive().equals(updatedKeepAlive) || previousInfo.getMin() != updatedMin || previousInfo.getMax() != updatedSize) {
|
||||
logger.debug("updating thread pool [{}], type [{}], keep_alive [{}]", name, type, updatedKeepAlive);
|
||||
if (!previousInfo.getKeepAlive().equals(updatedKeepAlive)) {
|
||||
((EsThreadPoolExecutor) previousExecutorHolder.executor()).setKeepAliveTime(updatedKeepAlive.millis(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
if (previousInfo.getMin() != updatedMin) {
|
||||
((EsThreadPoolExecutor) previousExecutorHolder.executor()).setCorePoolSize(updatedMin);
|
||||
}
|
||||
if (previousInfo.getMax() != updatedSize) {
|
||||
((EsThreadPoolExecutor) previousExecutorHolder.executor()).setMaximumPoolSize(updatedSize);
|
||||
}
|
||||
return new ExecutorHolder(previousExecutorHolder.executor(), new Info(name, threadPoolType, updatedMin, updatedSize, updatedKeepAlive, null));
|
||||
}
|
||||
return previousExecutorHolder;
|
||||
}
|
||||
if (previousInfo.getKeepAlive() != null) {
|
||||
defaultKeepAlive = previousInfo.getKeepAlive();
|
||||
}
|
||||
if (previousInfo.getMin() >= 0) {
|
||||
defaultMin = previousInfo.getMin();
|
||||
}
|
||||
if (previousInfo.getMax() >= 0) {
|
||||
defaultSize = previousInfo.getMax();
|
||||
}
|
||||
}
|
||||
TimeValue keepAlive = settings.getAsTime("keep_alive", defaultKeepAlive);
|
||||
int min = settings.getAsInt("min", defaultMin);
|
||||
int size = settings.getAsInt("max", settings.getAsInt("size", defaultSize));
|
||||
if (previousExecutorHolder != null) {
|
||||
logger.debug("updating thread pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive);
|
||||
} else {
|
||||
logger.debug("creating thread pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive);
|
||||
}
|
||||
Executor executor = EsExecutors.newScaling(name, min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory, threadContext);
|
||||
return new ExecutorHolder(executor, new Info(name, threadPoolType, min, size, keepAlive, null));
|
||||
}
|
||||
throw new IllegalArgumentException("No type found [" + type + "], for [" + name + "]");
|
||||
}
|
||||
|
||||
private int applyHardSizeLimit(String name, int size) {
|
||||
int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
|
||||
if ((name.equals(Names.BULK) || name.equals(Names.INDEX)) && size > availableProcessors) {
|
||||
// We use a hard max size for the indexing pools, because if too many threads enter Lucene's IndexWriter, it means
|
||||
// too many segments written, too frequently, too much merging, etc:
|
||||
// TODO: I would love to be loud here (throw an exception if you ask for a too-big size), but I think this is dangerous
|
||||
// because on upgrade this setting could be in cluster state and hard for the user to correct?
|
||||
logger.warn("requested thread pool size [{}] for [{}] is too large; setting to maximum [{}] instead",
|
||||
size, name, availableProcessors);
|
||||
size = availableProcessors;
|
||||
}
|
||||
|
||||
return size;
|
||||
}
|
||||
|
||||
/**
|
||||
* Constrains a value between minimum and maximum values
|
||||
* (inclusive).
|
||||
@ -618,92 +389,6 @@ public class ThreadPool extends AbstractComponent implements Closeable {
|
||||
return boundedBy(2 * numberOfProcessors, 2, Integer.MAX_VALUE);
|
||||
}
|
||||
|
||||
private void updateSettings(Settings settings) {
|
||||
Map<String, Settings> groupSettings = settings.getAsGroups();
|
||||
if (groupSettings.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (Map.Entry<String, Settings> executor : defaultExecutorTypeSettings.entrySet()) {
|
||||
Settings updatedSettings = groupSettings.get(executor.getKey());
|
||||
if (updatedSettings == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
ExecutorHolder oldExecutorHolder = executors.get(executor.getKey());
|
||||
ExecutorHolder newExecutorHolder = rebuild(executor.getKey(), oldExecutorHolder, updatedSettings, executor.getValue());
|
||||
if (!oldExecutorHolder.equals(newExecutorHolder)) {
|
||||
Map<String, ExecutorHolder> newExecutors = new HashMap<>(executors);
|
||||
newExecutors.put(executor.getKey(), newExecutorHolder);
|
||||
executors = unmodifiableMap(newExecutors);
|
||||
if (!oldExecutorHolder.executor().equals(newExecutorHolder.executor()) && oldExecutorHolder.executor() instanceof EsThreadPoolExecutor) {
|
||||
retiredExecutors.add(oldExecutorHolder);
|
||||
((EsThreadPoolExecutor) oldExecutorHolder.executor()).shutdown(new ExecutorShutdownListener(oldExecutorHolder));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Building custom thread pools
|
||||
for (Map.Entry<String, Settings> entry : groupSettings.entrySet()) {
|
||||
if (defaultExecutorTypeSettings.containsKey(entry.getKey())) {
|
||||
continue;
|
||||
}
|
||||
|
||||
ExecutorHolder oldExecutorHolder = executors.get(entry.getKey());
|
||||
ExecutorHolder newExecutorHolder = rebuild(entry.getKey(), oldExecutorHolder, entry.getValue(), Settings.EMPTY);
|
||||
// Can't introduce new thread pools at runtime, because The oldExecutorHolder variable will be null in the
|
||||
// case the settings contains a thread pool not defined in the initial settings in the constructor. The if
|
||||
// statement will then fail and so this prevents the addition of new thread groups at runtime, which is desired.
|
||||
if (!newExecutorHolder.equals(oldExecutorHolder)) {
|
||||
Map<String, ExecutorHolder> newExecutors = new HashMap<>(executors);
|
||||
newExecutors.put(entry.getKey(), newExecutorHolder);
|
||||
executors = unmodifiableMap(newExecutors);
|
||||
if (!oldExecutorHolder.executor().equals(newExecutorHolder.executor()) && oldExecutorHolder.executor() instanceof EsThreadPoolExecutor) {
|
||||
retiredExecutors.add(oldExecutorHolder);
|
||||
((EsThreadPoolExecutor) oldExecutorHolder.executor()).shutdown(new ExecutorShutdownListener(oldExecutorHolder));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void validate(Map<String, Settings> groupSettings) {
|
||||
for (String key : groupSettings.keySet()) {
|
||||
if (!THREAD_POOL_TYPES.containsKey(key)) {
|
||||
continue;
|
||||
}
|
||||
String type = groupSettings.get(key).get("type");
|
||||
ThreadPoolType correctThreadPoolType = THREAD_POOL_TYPES.get(key);
|
||||
// TODO: the type equality check can be removed after #3760/#6732 are addressed
|
||||
if (type != null && !correctThreadPoolType.getType().equals(type)) {
|
||||
throw new IllegalArgumentException("setting " + THREADPOOL_GROUP_SETTING.getKey() + key + ".type to " + type + " is not permitted; must be " + correctThreadPoolType.getType());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A thread pool size can also be unbounded and is represented by -1, which is not supported by SizeValue (which only supports positive numbers)
|
||||
*/
|
||||
private SizeValue getAsSizeOrUnbounded(Settings settings, String setting, SizeValue defaultValue) throws SettingsException {
|
||||
if ("-1".equals(settings.get(setting))) {
|
||||
return null;
|
||||
}
|
||||
return parseSizeValue(settings.get(setting), defaultValue);
|
||||
}
|
||||
|
||||
class ExecutorShutdownListener implements EsThreadPoolExecutor.ShutdownListener {
|
||||
|
||||
private ExecutorHolder holder;
|
||||
|
||||
public ExecutorShutdownListener(ExecutorHolder holder) {
|
||||
this.holder = holder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTerminated() {
|
||||
retiredExecutors.remove(holder);
|
||||
}
|
||||
}
|
||||
|
||||
class LoggingRunnable implements Runnable {
|
||||
|
||||
private final Runnable runnable;
|
||||
|
@ -20,20 +20,25 @@
|
||||
package org.elasticsearch.threadpool;
|
||||
|
||||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
import org.elasticsearch.common.settings.SettingsModule;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class ThreadPoolModule extends AbstractModule {
|
||||
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
public ThreadPoolModule(ThreadPool threadPool) {
|
||||
public ThreadPoolModule(final ThreadPool threadPool) {
|
||||
this.threadPool = threadPool;
|
||||
}
|
||||
|
||||
public void prepareSettings(SettingsModule settingsModule) {
|
||||
for (final ExecutorBuilder<?> builder : threadPool.builders()) {
|
||||
builder.getRegisteredSettings().forEach(settingsModule::registerSetting);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void configure() {
|
||||
bind(ThreadPool.class).toInstance(threadPool);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -45,12 +45,12 @@ public class RejectionActionIT extends ESIntegTestCase {
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
return Settings.builder()
|
||||
.put(super.nodeSettings(nodeOrdinal))
|
||||
.put("threadpool.search.size", 1)
|
||||
.put("threadpool.search.queue_size", 1)
|
||||
.put("threadpool.index.size", 1)
|
||||
.put("threadpool.index.queue_size", 1)
|
||||
.put("threadpool.get.size", 1)
|
||||
.put("threadpool.get.queue_size", 1)
|
||||
.put("thread_pool.search.size", 1)
|
||||
.put("thread_pool.search.queue_size", 1)
|
||||
.put("thread_pool.index.size", 1)
|
||||
.put("thread_pool.index.queue_size", 1)
|
||||
.put("thread_pool.get.size", 1)
|
||||
.put("thread_pool.get.queue_size", 1)
|
||||
.build();
|
||||
}
|
||||
|
||||
|
@ -42,6 +42,7 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.tasks.TaskManager;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.tasks.MockTaskManager;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.transport.local.LocalTransport;
|
||||
@ -72,7 +73,7 @@ public abstract class TaskManagerTestCase extends ESTestCase {
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() {
|
||||
threadPool = new ThreadPool(TransportTasksActionTests.class.getSimpleName());
|
||||
threadPool = new TestThreadPool(TransportTasksActionTests.class.getSimpleName());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
@ -52,10 +52,10 @@ public class BulkProcessorRetryIT extends ESIntegTestCase {
|
||||
.put(super.nodeSettings(nodeOrdinal))
|
||||
// don't mess with this one! It's quite sensitive to a low queue size
|
||||
// (see also ThreadedActionListener which is happily spawning threads even when we already got rejected)
|
||||
//.put("threadpool.listener.queue_size", 1)
|
||||
.put("threadpool.get.queue_size", 1)
|
||||
//.put("thread_pool.listener.queue_size", 1)
|
||||
.put("thread_pool.get.queue_size", 1)
|
||||
// default is 50
|
||||
.put("threadpool.bulk.queue_size", 30)
|
||||
.put("thread_pool.bulk.queue_size", 30)
|
||||
.build();
|
||||
}
|
||||
|
||||
|
@ -38,6 +38,7 @@ import org.elasticsearch.common.util.concurrent.AtomicArray;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.transport.CapturingTransport;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.junit.After;
|
||||
@ -63,7 +64,7 @@ public class TransportBulkActionTookTests extends ESTestCase {
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() {
|
||||
threadPool = new ThreadPool("TransportBulkActionTookTests");
|
||||
threadPool = new TestThreadPool("TransportBulkActionTookTests");
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
@ -21,6 +21,7 @@ package org.elasticsearch.action.support;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.Transports;
|
||||
|
||||
@ -31,7 +32,7 @@ import java.util.concurrent.atomic.AtomicReference;
|
||||
public class ListenableActionFutureTests extends ESTestCase {
|
||||
|
||||
public void testListenerIsCallableFromNetworkThreads() throws Throwable {
|
||||
ThreadPool threadPool = new ThreadPool("testListenerIsCallableFromNetworkThreads");
|
||||
ThreadPool threadPool = new TestThreadPool("testListenerIsCallableFromNetworkThreads");
|
||||
try {
|
||||
final PlainListenableActionFuture<Object> future = new PlainListenableActionFuture<>(threadPool);
|
||||
final CountDownLatch listenerCalled = new CountDownLatch(1);
|
||||
|
@ -55,6 +55,7 @@ import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.transport.CapturingTransport;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
import org.elasticsearch.transport.TransportResponse;
|
||||
@ -182,7 +183,7 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
|
||||
|
||||
@BeforeClass
|
||||
public static void startThreadPool() {
|
||||
THREAD_POOL = new ThreadPool(TransportBroadcastByNodeActionTests.class.getSimpleName());
|
||||
THREAD_POOL = new TestThreadPool(TransportBroadcastByNodeActionTests.class.getSimpleName());
|
||||
}
|
||||
|
||||
@Before
|
||||
|
@ -45,6 +45,7 @@ import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.transport.CapturingTransport;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
@ -76,7 +77,7 @@ public class TransportMasterNodeActionTests extends ESTestCase {
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() {
|
||||
threadPool = new ThreadPool("TransportMasterNodeActionTests");
|
||||
threadPool = new TestThreadPool("TransportMasterNodeActionTests");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -35,6 +35,7 @@ import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.DummyTransportAddress;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.transport.CapturingTransport;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.junit.After;
|
||||
@ -162,7 +163,7 @@ public class TransportNodesActionTests extends ESTestCase {
|
||||
|
||||
@BeforeClass
|
||||
public static void startThreadPool() {
|
||||
THREAD_POOL = new ThreadPool(TransportBroadcastByNodeActionTests.class.getSimpleName());
|
||||
THREAD_POOL = new TestThreadPool(TransportBroadcastByNodeActionTests.class.getSimpleName());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
@ -44,6 +44,7 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.transport.local.LocalTransport;
|
||||
@ -80,7 +81,7 @@ public class BroadcastReplicationTests extends ESTestCase {
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() {
|
||||
threadPool = new ThreadPool("BroadcastReplicationTests");
|
||||
threadPool = new TestThreadPool("BroadcastReplicationTests");
|
||||
circuitBreakerService = new NoneCircuitBreakerService();
|
||||
}
|
||||
|
||||
|
@ -57,6 +57,7 @@ import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.test.ESAllocationTestCase;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.transport.CapturingTransport;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
@ -109,7 +110,7 @@ public class TransportReplicationActionTests extends ESTestCase {
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() {
|
||||
threadPool = new ThreadPool("ShardReplicationTests");
|
||||
threadPool = new TestThreadPool("ShardReplicationTests");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -43,6 +43,7 @@ import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.transport.CapturingTransport;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
@ -133,7 +134,7 @@ public class TransportInstanceSingleOperationActionTests extends ESTestCase {
|
||||
|
||||
@BeforeClass
|
||||
public static void startThreadPool() {
|
||||
THREAD_POOL = new ThreadPool(TransportInstanceSingleOperationActionTests.class.getSimpleName());
|
||||
THREAD_POOL = new TestThreadPool(TransportInstanceSingleOperationActionTests.class.getSimpleName());
|
||||
}
|
||||
|
||||
@Before
|
||||
|
@ -28,6 +28,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.LocalTransportAddress;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.BaseTransportResponseHandler;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
@ -63,7 +64,7 @@ public class TransportClientNodesServiceTests extends ESTestCase {
|
||||
|
||||
TestIteration() {
|
||||
ClusterName clusterName = new ClusterName("test");
|
||||
threadPool = new ThreadPool("transport-client-nodes-service-tests");
|
||||
threadPool = new TestThreadPool("transport-client-nodes-service-tests");
|
||||
transport = new FailAndRetryMockTransport<TestResponse>(random(), clusterName) {
|
||||
@Override
|
||||
public List<String> getLocalAddresses() {
|
||||
|
@ -38,6 +38,7 @@ import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.discovery.Discovery;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.transport.CapturingTransport;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.NodeDisconnectedException;
|
||||
import org.elasticsearch.transport.NodeNotConnectedException;
|
||||
@ -97,7 +98,7 @@ public class ShardStateActionTests extends ESTestCase {
|
||||
|
||||
@BeforeClass
|
||||
public static void startThreadPool() {
|
||||
THREAD_POOL = new ThreadPool("ShardStateActionTest");
|
||||
THREAD_POOL = new TestThreadPool("ShardStateActionTest");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -41,6 +41,7 @@ import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.gateway.NoopGatewayAllocator;
|
||||
import org.elasticsearch.test.transport.CapturingTransport;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.junit.After;
|
||||
@ -72,7 +73,7 @@ public class ClusterStateHealthTests extends ESTestCase {
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() {
|
||||
threadPool = new ThreadPool("ClusterStateHealthTests");
|
||||
threadPool = new TestThreadPool("ClusterStateHealthTests");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -37,6 +37,7 @@ import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.gateway.GatewayAllocator;
|
||||
import org.elasticsearch.test.ESAllocationTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
@ -73,7 +74,7 @@ public class DelayedAllocationServiceTests extends ESAllocationTestCase {
|
||||
|
||||
@Before
|
||||
public void createDelayedAllocationService() {
|
||||
threadPool = new ThreadPool(getTestName());
|
||||
threadPool = new TestThreadPool(getTestName());
|
||||
clusterService = mock(ClusterService.class);
|
||||
allocationService = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator());
|
||||
delayedAllocationService = new TestDelayAllocationService(Settings.EMPTY, threadPool, clusterService, allocationService);
|
||||
|
@ -42,6 +42,7 @@ import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.MockLogAppender;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
@ -76,7 +77,7 @@ public class ClusterServiceTests extends ESTestCase {
|
||||
|
||||
@BeforeClass
|
||||
public static void createThreadPool() {
|
||||
threadPool = new ThreadPool(ClusterServiceTests.class.getName());
|
||||
threadPool = new TestThreadPool(ClusterServiceTests.class.getName());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
@ -22,6 +22,7 @@ import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.ArrayList;
|
||||
@ -244,7 +245,7 @@ public class PrioritizedExecutorsTests extends ESTestCase {
|
||||
}
|
||||
|
||||
public void testTimeoutCleanup() throws Exception {
|
||||
ThreadPool threadPool = new ThreadPool("test");
|
||||
ThreadPool threadPool = new TestThreadPool("test");
|
||||
final ScheduledThreadPoolExecutor timer = (ScheduledThreadPoolExecutor) threadPool.scheduler();
|
||||
final AtomicBoolean timeoutCalled = new AtomicBoolean();
|
||||
PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()), holder);
|
||||
|
@ -39,6 +39,7 @@ import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportConnectionListener;
|
||||
import org.elasticsearch.transport.TransportRequestOptions;
|
||||
@ -85,7 +86,7 @@ public class ZenFaultDetectionTests extends ESTestCase {
|
||||
.put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), new ByteSizeValue(0))
|
||||
.build();
|
||||
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
||||
threadPool = new ThreadPool(getClass().getName());
|
||||
threadPool = new TestThreadPool(getClass().getName());
|
||||
clusterServiceA = createClusterService(threadPool);
|
||||
clusterServiceB = createClusterService(threadPool);
|
||||
circuitBreakerService = new HierarchyCircuitBreakerService(settings, clusterSettings);
|
||||
|
@ -43,6 +43,7 @@ import org.elasticsearch.discovery.zen.elect.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.membership.MembershipAction;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
@ -81,7 +82,7 @@ public class NodeJoinControllerTests extends ESTestCase {
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() {
|
||||
threadPool = new ThreadPool("ShardReplicationTests");
|
||||
threadPool = new TestThreadPool("ShardReplicationTests");
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
@ -37,6 +37,7 @@ import org.elasticsearch.discovery.zen.ping.ZenPing;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportConnectionListener;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
@ -60,7 +61,7 @@ public class UnicastZenPingIT extends ESTestCase {
|
||||
int endPort = startPort + 10;
|
||||
settings = Settings.builder().put(settings).put(TransportSettings.PORT.getKey(), startPort + "-" + endPort).build();
|
||||
|
||||
ThreadPool threadPool = new ThreadPool(getClass().getName());
|
||||
ThreadPool threadPool = new TestThreadPool(getClass().getName());
|
||||
ClusterName test = new ClusterName("test");
|
||||
ClusterName mismatch = new ClusterName("mismatch");
|
||||
NetworkService networkService = new NetworkService(settings);
|
||||
|
@ -47,6 +47,7 @@ import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.BytesTransportRequest;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
@ -217,7 +218,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
threadPool = new ThreadPool(getClass().getName());
|
||||
threadPool = new TestThreadPool(getClass().getName());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -27,6 +27,7 @@ import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.transport.DummyTransportAddress;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
@ -60,7 +61,7 @@ public class AsyncShardFetchTests extends ESTestCase {
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
this.threadPool = new ThreadPool(getTestName());
|
||||
this.threadPool = new TestThreadPool(getTestName());
|
||||
this.test = new TestFetch(threadPool);
|
||||
}
|
||||
|
||||
|
@ -30,6 +30,7 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.rest.RestResponse;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
@ -70,7 +71,7 @@ public class NettyHttpChannelTests extends ESTestCase {
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
networkService = new NetworkService(Settings.EMPTY);
|
||||
threadPool = new ThreadPool("test");
|
||||
threadPool = new TestThreadPool("test");
|
||||
bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
|
||||
}
|
||||
|
||||
|
@ -29,6 +29,7 @@ import org.elasticsearch.http.netty.pipelining.OrderedDownstreamChannelEvent;
|
||||
import org.elasticsearch.http.netty.pipelining.OrderedUpstreamMessageEvent;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
@ -73,7 +74,7 @@ public class NettyHttpServerPipeliningTests extends ESTestCase {
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
networkService = new NetworkService(Settings.EMPTY);
|
||||
threadPool = new ThreadPool("test");
|
||||
threadPool = new TestThreadPool("test");
|
||||
bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
|
||||
}
|
||||
|
||||
|
@ -26,6 +26,7 @@ import org.elasticsearch.common.util.MockBigArrays;
|
||||
import org.elasticsearch.http.netty.cors.CorsConfig;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.jboss.netty.handler.codec.http.HttpMethod;
|
||||
import org.junit.After;
|
||||
@ -54,7 +55,7 @@ public class NettyHttpServerTransportTests extends ESTestCase {
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
networkService = new NetworkService(Settings.EMPTY);
|
||||
threadPool = new ThreadPool("test");
|
||||
threadPool = new TestThreadPool("test");
|
||||
bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
|
||||
}
|
||||
|
||||
|
@ -76,6 +76,7 @@ import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.IndexSettingsModule;
|
||||
import org.elasticsearch.test.TestSearchContext;
|
||||
import org.elasticsearch.test.engine.MockEngineFactory;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.watcher.ResourceWatcherService;
|
||||
|
||||
@ -110,7 +111,7 @@ public class IndexModuleTests extends ESTestCase {
|
||||
|
||||
static NodeServicesProvider newNodeServiceProvider(Settings settings, Environment environment, Client client, ScriptEngineService... scriptEngineServices) throws IOException {
|
||||
// TODO this can be used in other place too - lets first refactor the IndicesQueriesRegistry
|
||||
ThreadPool threadPool = new ThreadPool("test");
|
||||
ThreadPool threadPool = new TestThreadPool("test");
|
||||
CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService();
|
||||
BigArrays bigArrays = new BigArrays(settings, circuitBreakerService);
|
||||
Set<ScriptEngineService> scriptEngines = Collections.emptySet();
|
||||
|
@ -98,6 +98,7 @@ import org.elasticsearch.indices.mapper.MapperRegistry;
|
||||
import org.elasticsearch.test.DummyShardLock;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.IndexSettingsModule;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.hamcrest.MatcherAssert;
|
||||
import org.junit.After;
|
||||
@ -173,7 +174,7 @@ public class InternalEngineTests extends ESTestCase {
|
||||
.put(IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD,
|
||||
between(10, 10 * IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD.get(Settings.EMPTY)))
|
||||
.build()); // TODO randomize more settings
|
||||
threadPool = new ThreadPool(getClass().getName());
|
||||
threadPool = new TestThreadPool(getClass().getName());
|
||||
store = createStore();
|
||||
storeReplica = createStore();
|
||||
Lucene.cleanLuceneIndex(store.directory());
|
||||
|
@ -67,6 +67,7 @@ import org.elasticsearch.index.translog.TranslogConfig;
|
||||
import org.elasticsearch.test.DummyShardLock;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.IndexSettingsModule;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.hamcrest.MatcherAssert;
|
||||
import org.junit.After;
|
||||
@ -126,7 +127,7 @@ public class ShadowEngineTests extends ESTestCase {
|
||||
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
|
||||
.build()); // TODO randomize more settings
|
||||
|
||||
threadPool = new ThreadPool(getClass().getName());
|
||||
threadPool = new TestThreadPool(getClass().getName());
|
||||
dirPath = createTempDir();
|
||||
store = createStore(dirPath);
|
||||
storeReplica = createStore(dirPath);
|
||||
|
@ -55,6 +55,7 @@ import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||
import org.elasticsearch.test.IndexSettingsModule;
|
||||
import org.elasticsearch.test.InternalSettingsPlugin;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.Arrays;
|
||||
@ -177,7 +178,7 @@ public class IndexFieldDataServiceTests extends ESSingleNodeTestCase {
|
||||
}
|
||||
|
||||
private void doTestRequireDocValues(MappedFieldType ft) {
|
||||
ThreadPool threadPool = new ThreadPool("random_threadpool_name");
|
||||
ThreadPool threadPool = new TestThreadPool("random_threadpool_name");
|
||||
try {
|
||||
IndicesFieldDataCache cache = new IndicesFieldDataCache(Settings.EMPTY, null);
|
||||
IndexFieldDataService ifds = new IndexFieldDataService(IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), cache, null, null);
|
||||
|
@ -35,6 +35,7 @@ import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.transport.local.LocalTransport;
|
||||
@ -64,7 +65,7 @@ public class DynamicMappingDisabledTests extends ESSingleNodeTestCase {
|
||||
|
||||
@BeforeClass
|
||||
public static void createThreadPool() {
|
||||
THREAD_POOL = new ThreadPool("DynamicMappingDisabledTests");
|
||||
THREAD_POOL = new TestThreadPool("DynamicMappingDisabledTests");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -35,6 +35,7 @@ import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.LocalTransportAddress;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.junit.After;
|
||||
@ -74,7 +75,7 @@ public class IndicesStoreTests extends ESTestCase {
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() {
|
||||
threadPool = new ThreadPool("ShardReplicationTests");
|
||||
threadPool = new TestThreadPool("ShardReplicationTests");
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
@ -22,6 +22,7 @@ package org.elasticsearch.monitor.jvm;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.AbstractMap;
|
||||
@ -128,7 +129,7 @@ public class JvmGcMonitorServiceSettingsTests extends ESTestCase {
|
||||
assert constructionShouldFail == (asserts == null);
|
||||
ThreadPool threadPool = null;
|
||||
try {
|
||||
threadPool = new ThreadPool(JvmGcMonitorServiceSettingsTests.class.getCanonicalName()) {
|
||||
threadPool = new TestThreadPool(JvmGcMonitorServiceSettingsTests.class.getCanonicalName()) {
|
||||
@Override
|
||||
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, TimeValue interval) {
|
||||
return scheduler.apply(command, interval);
|
||||
|
@ -55,9 +55,10 @@ public class NativeScriptTests extends ESTestCase {
|
||||
ScriptModule scriptModule = new ScriptModule();
|
||||
scriptModule.prepareSettings(settingsModule);
|
||||
scriptModule.registerScript("my", MyNativeScriptFactory.class);
|
||||
final ThreadPool threadPool = new ThreadPool(settings);
|
||||
Injector injector = new ModulesBuilder().add(
|
||||
new EnvironmentModule(new Environment(settings)),
|
||||
new ThreadPoolModule(new ThreadPool(settings)),
|
||||
new ThreadPoolModule(threadPool),
|
||||
new SettingsModule(settings),
|
||||
scriptModule).createInjector();
|
||||
|
||||
|
@ -36,8 +36,8 @@ public class SearchWithRejectionsIT extends ESIntegTestCase {
|
||||
@Override
|
||||
public Settings nodeSettings(int nodeOrdinal) {
|
||||
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
|
||||
.put("threadpool.search.size", 1)
|
||||
.put("threadpool.search.queue_size", 1)
|
||||
.put("thread_pool.search.size", 1)
|
||||
.put("thread_pool.search.queue_size", 1)
|
||||
.build();
|
||||
}
|
||||
|
||||
|
@ -19,7 +19,6 @@
|
||||
|
||||
package org.elasticsearch.threadpool;
|
||||
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
@ -42,8 +41,8 @@ public class FixedThreadPoolTests extends ESThreadPoolTestCase {
|
||||
final Settings nodeSettings =
|
||||
Settings.builder()
|
||||
.put("node.name", "testRejectedExecutionCounter")
|
||||
.put("threadpool." + threadPoolName + ".size", size)
|
||||
.put("threadpool." + threadPoolName + ".queue_size", queueSize)
|
||||
.put("thread_pool." + threadPoolName + ".size", size)
|
||||
.put("thread_pool." + threadPoolName + ".queue_size", queueSize)
|
||||
.build();
|
||||
try {
|
||||
threadPool = new ThreadPool(nodeSettings);
|
||||
@ -86,18 +85,6 @@ public class FixedThreadPoolTests extends ESThreadPoolTestCase {
|
||||
|
||||
assertThat(counter, equalTo(rejections));
|
||||
assertThat(stats(threadPool, threadPoolName).getRejected(), equalTo(rejections));
|
||||
|
||||
// the rejected execution count resets to zero when the
|
||||
// queue is resized
|
||||
final ClusterSettings clusterSettings =
|
||||
new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
||||
threadPool.setClusterSettings(clusterSettings);
|
||||
clusterSettings.applySettings(
|
||||
Settings.builder()
|
||||
.put("threadpool." + threadPoolName + ".queue_size", queueSize + 1)
|
||||
.build());
|
||||
assertThat(stats(threadPool, threadPoolName).getRejected(), equalTo(0L));
|
||||
|
||||
} finally {
|
||||
terminateThreadPoolIfNeeded(threadPool);
|
||||
}
|
||||
|
@ -40,35 +40,35 @@ public class ScalingThreadPoolTests extends ESThreadPoolTestCase {
|
||||
final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.SCALING);
|
||||
final Settings.Builder builder = Settings.builder();
|
||||
|
||||
final int min;
|
||||
final int core;
|
||||
if (randomBoolean()) {
|
||||
min = randomIntBetween(0, 8);
|
||||
builder.put("threadpool." + threadPoolName + ".min", min);
|
||||
core = randomIntBetween(0, 8);
|
||||
builder.put("thread_pool." + threadPoolName + ".core", core);
|
||||
} else {
|
||||
min = "generic".equals(threadPoolName) ? 4 : 1; // the defaults
|
||||
core = "generic".equals(threadPoolName) ? 4 : 1; // the defaults
|
||||
}
|
||||
|
||||
final int sizeBasedOnNumberOfProcessors;
|
||||
final int maxBasedOnNumberOfProcessors;
|
||||
if (randomBoolean()) {
|
||||
final int processors = randomIntBetween(1, 64);
|
||||
sizeBasedOnNumberOfProcessors = expectedSize(threadPoolName, processors);
|
||||
maxBasedOnNumberOfProcessors = expectedSize(threadPoolName, processors);
|
||||
builder.put("processors", processors);
|
||||
} else {
|
||||
sizeBasedOnNumberOfProcessors = expectedSize(threadPoolName, Math.min(32, Runtime.getRuntime().availableProcessors()));
|
||||
maxBasedOnNumberOfProcessors = expectedSize(threadPoolName, Math.min(32, Runtime.getRuntime().availableProcessors()));
|
||||
}
|
||||
|
||||
final int expectedSize;
|
||||
if (sizeBasedOnNumberOfProcessors < min || randomBoolean()) {
|
||||
expectedSize = randomIntBetween(Math.max(1, min), 16);
|
||||
builder.put("threadpool." + threadPoolName + ".size", expectedSize);
|
||||
final int expectedMax;
|
||||
if (maxBasedOnNumberOfProcessors < core || randomBoolean()) {
|
||||
expectedMax = randomIntBetween(Math.max(1, core), 16);
|
||||
builder.put("thread_pool." + threadPoolName + ".max", expectedMax);
|
||||
} else {
|
||||
expectedSize = sizeBasedOnNumberOfProcessors;
|
||||
expectedMax = maxBasedOnNumberOfProcessors;
|
||||
}
|
||||
|
||||
final long keepAlive;
|
||||
if (randomBoolean()) {
|
||||
keepAlive = randomIntBetween(1, 300);
|
||||
builder.put("threadpool." + threadPoolName + ".keep_alive", keepAlive + "s");
|
||||
builder.put("thread_pool." + threadPoolName + ".keep_alive", keepAlive + "s");
|
||||
} else {
|
||||
keepAlive = "generic".equals(threadPoolName) ? 30 : 300; // the defaults
|
||||
}
|
||||
@ -88,10 +88,10 @@ public class ScalingThreadPoolTests extends ESThreadPoolTestCase {
|
||||
assertNull(info.getQueueSize());
|
||||
assertThat(esThreadPoolExecutor.getQueue().remainingCapacity(), equalTo(Integer.MAX_VALUE));
|
||||
|
||||
assertThat(info.getMin(), equalTo(min));
|
||||
assertThat(esThreadPoolExecutor.getCorePoolSize(), equalTo(min));
|
||||
assertThat(info.getMax(), equalTo(expectedSize));
|
||||
assertThat(esThreadPoolExecutor.getMaximumPoolSize(), equalTo(expectedSize));
|
||||
assertThat(info.getMin(), equalTo(core));
|
||||
assertThat(esThreadPoolExecutor.getCorePoolSize(), equalTo(core));
|
||||
assertThat(info.getMax(), equalTo(expectedMax));
|
||||
assertThat(esThreadPoolExecutor.getMaximumPoolSize(), equalTo(expectedMax));
|
||||
});
|
||||
}
|
||||
|
||||
@ -113,23 +113,10 @@ public class ScalingThreadPoolTests extends ESThreadPoolTestCase {
|
||||
return sizes.get(threadPoolName).size(numberOfProcessors);
|
||||
}
|
||||
|
||||
public void testValidDynamicKeepAlive() throws InterruptedException {
|
||||
final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.SCALING);
|
||||
runScalingThreadPoolTest(Settings.EMPTY, (clusterSettings, threadPool) -> {
|
||||
final Executor beforeExecutor = threadPool.executor(threadPoolName);
|
||||
final long seconds = randomIntBetween(1, 300);
|
||||
clusterSettings.applySettings(settings("threadpool." + threadPoolName + ".keep_alive", seconds + "s"));
|
||||
final Executor afterExecutor = threadPool.executor(threadPoolName);
|
||||
assertSame(beforeExecutor, afterExecutor);
|
||||
final ThreadPool.Info info = info(threadPool, threadPoolName);
|
||||
assertThat(info.getKeepAlive().seconds(), equalTo(seconds));
|
||||
});
|
||||
}
|
||||
|
||||
public void testScalingThreadPoolIsBounded() throws InterruptedException {
|
||||
final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.SCALING);
|
||||
final int size = randomIntBetween(32, 512);
|
||||
final Settings settings = Settings.builder().put("threadpool." + threadPoolName + ".size", size).build();
|
||||
final Settings settings = Settings.builder().put("thread_pool." + threadPoolName + ".max", size).build();
|
||||
runScalingThreadPoolTest(settings, (clusterSettings, threadPool) -> {
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final int numberOfTasks = 2 * size;
|
||||
@ -161,8 +148,8 @@ public class ScalingThreadPoolTests extends ESThreadPoolTestCase {
|
||||
final int min = "generic".equals(threadPoolName) ? 4 : 1;
|
||||
final Settings settings =
|
||||
Settings.builder()
|
||||
.put("threadpool." + threadPoolName + ".size", 128)
|
||||
.put("threadpool." + threadPoolName + ".keep_alive", "1ms")
|
||||
.put("thread_pool." + threadPoolName + ".max", 128)
|
||||
.put("thread_pool." + threadPoolName + ".keep_alive", "1ms")
|
||||
.build();
|
||||
runScalingThreadPoolTest(settings, ((clusterSettings, threadPool) -> {
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
@ -197,40 +184,6 @@ public class ScalingThreadPoolTests extends ESThreadPoolTestCase {
|
||||
}));
|
||||
}
|
||||
|
||||
public void testDynamicThreadPoolSize() throws InterruptedException {
|
||||
final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.SCALING);
|
||||
runScalingThreadPoolTest(Settings.EMPTY, (clusterSettings, threadPool) -> {
|
||||
final Executor beforeExecutor = threadPool.executor(threadPoolName);
|
||||
int expectedMin = "generic".equals(threadPoolName) ? 4 : 1;
|
||||
final int size = randomIntBetween(expectedMin, Integer.MAX_VALUE);
|
||||
clusterSettings.applySettings(settings("threadpool." + threadPoolName + ".size", size));
|
||||
final Executor afterExecutor = threadPool.executor(threadPoolName);
|
||||
assertSame(beforeExecutor, afterExecutor);
|
||||
final ThreadPool.Info info = info(threadPool, threadPoolName);
|
||||
assertThat(info.getMin(), equalTo(expectedMin));
|
||||
assertThat(info.getMax(), equalTo(size));
|
||||
|
||||
assertThat(afterExecutor, instanceOf(EsThreadPoolExecutor.class));
|
||||
final EsThreadPoolExecutor executor = (EsThreadPoolExecutor)afterExecutor;
|
||||
assertThat(executor.getCorePoolSize(), equalTo(expectedMin));
|
||||
assertThat(executor.getMaximumPoolSize(), equalTo(size));
|
||||
});
|
||||
}
|
||||
|
||||
public void testResizingScalingThreadPoolQueue() throws InterruptedException {
|
||||
final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.SCALING);
|
||||
runScalingThreadPoolTest(Settings.EMPTY, (clusterSettings, threadPool) -> {
|
||||
final int size = randomIntBetween(1, Integer.MAX_VALUE);
|
||||
final IllegalArgumentException e = expectThrows(
|
||||
IllegalArgumentException.class,
|
||||
() -> clusterSettings.applySettings(settings("threadpool." + threadPoolName + ".queue_size", size)));
|
||||
assertThat(e, hasToString(
|
||||
"java.lang.IllegalArgumentException: thread pool [" + threadPoolName +
|
||||
"] of type scaling can not have its queue re-sized but was [" +
|
||||
size + "]"));
|
||||
});
|
||||
}
|
||||
|
||||
public void runScalingThreadPoolTest(
|
||||
final Settings settings,
|
||||
final BiConsumer<ClusterSettings, ThreadPool> consumer) throws InterruptedException {
|
||||
@ -240,7 +193,6 @@ public class ScalingThreadPoolTests extends ESThreadPoolTestCase {
|
||||
final Settings nodeSettings = Settings.builder().put(settings).put("node.name", test).build();
|
||||
threadPool = new ThreadPool(nodeSettings);
|
||||
final ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
||||
threadPool.setClusterSettings(clusterSettings);
|
||||
consumer.accept(clusterSettings, threadPool);
|
||||
} finally {
|
||||
terminateThreadPoolIfNeeded(threadPool);
|
||||
|
@ -125,63 +125,6 @@ public class SimpleThreadPoolIT extends ESIntegTestCase {
|
||||
}
|
||||
}
|
||||
|
||||
public void testUpdatingThreadPoolSettings() throws Exception {
|
||||
internalCluster().startNodesAsync(2).get();
|
||||
ThreadPool threadPool = internalCluster().getDataNodeInstance(ThreadPool.class);
|
||||
// Check that settings are changed
|
||||
assertThat(((ThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getQueue().remainingCapacity(), equalTo(1000));
|
||||
client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put("threadpool.search.queue_size", 2000).build()).execute().actionGet();
|
||||
assertThat(((ThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getQueue().remainingCapacity(), equalTo(2000));
|
||||
|
||||
// Make sure that threads continue executing when executor is replaced
|
||||
final CyclicBarrier barrier = new CyclicBarrier(2);
|
||||
Executor oldExecutor = threadPool.executor(Names.SEARCH);
|
||||
threadPool.executor(Names.SEARCH).execute(() -> {
|
||||
try {
|
||||
barrier.await();
|
||||
} catch (InterruptedException ex) {
|
||||
Thread.currentThread().interrupt();
|
||||
} catch (BrokenBarrierException ex) {
|
||||
//
|
||||
}
|
||||
});
|
||||
client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put("threadpool.search.queue_size", 1000).build()).execute().actionGet();
|
||||
assertThat(threadPool.executor(Names.SEARCH), not(sameInstance(oldExecutor)));
|
||||
assertThat(((ThreadPoolExecutor) oldExecutor).isShutdown(), equalTo(true));
|
||||
assertThat(((ThreadPoolExecutor) oldExecutor).isTerminating(), equalTo(true));
|
||||
assertThat(((ThreadPoolExecutor) oldExecutor).isTerminated(), equalTo(false));
|
||||
barrier.await(10, TimeUnit.SECONDS);
|
||||
|
||||
// Make sure that new thread executor is functional
|
||||
threadPool.executor(Names.SEARCH).execute(() -> {
|
||||
try {
|
||||
barrier.await();
|
||||
} catch (InterruptedException ex) {
|
||||
Thread.currentThread().interrupt();
|
||||
} catch (BrokenBarrierException ex) {
|
||||
//
|
||||
}
|
||||
}
|
||||
);
|
||||
client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put("threadpool.search.queue_size", 500)).execute().actionGet();
|
||||
barrier.await(10, TimeUnit.SECONDS);
|
||||
|
||||
// Check that node info is correct
|
||||
NodesInfoResponse nodesInfoResponse = client().admin().cluster().prepareNodesInfo().all().execute().actionGet();
|
||||
assertEquals(2, nodesInfoResponse.getNodes().size());
|
||||
for (NodeInfo nodeInfo : nodesInfoResponse.getNodes()) {
|
||||
boolean found = false;
|
||||
for (ThreadPool.Info info : nodeInfo.getThreadPool()) {
|
||||
if (info.getName().equals(Names.SEARCH)) {
|
||||
assertEquals(info.getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED);
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
assertThat(found, equalTo(true));
|
||||
}
|
||||
}
|
||||
|
||||
public void testThreadPoolLeakingThreadsWithTribeNode() {
|
||||
Settings settings = Settings.builder()
|
||||
.put("node.name", "thread_pool_leaking_threads_tribe_node")
|
||||
|
@ -96,7 +96,7 @@ public class ThreadPoolSerializationTests extends ESTestCase {
|
||||
}
|
||||
|
||||
public void testThatNegativeSettingAllowsToStart() throws InterruptedException {
|
||||
Settings settings = Settings.builder().put("node.name", "index").put("threadpool.index.queue_size", "-1").build();
|
||||
Settings settings = Settings.builder().put("node.name", "index").put("thread_pool.index.queue_size", "-1").build();
|
||||
ThreadPool threadPool = new ThreadPool(settings);
|
||||
assertThat(threadPool.info("index").getQueueSize(), is(nullValue()));
|
||||
terminate(threadPool);
|
||||
|
@ -19,32 +19,24 @@
|
||||
|
||||
package org.elasticsearch.threadpool;
|
||||
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
|
||||
import org.elasticsearch.threadpool.ThreadPool.Names;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasToString;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class UpdateThreadPoolSettingsTests extends ESThreadPoolTestCase {
|
||||
|
||||
public void testCorrectThreadPoolTypePermittedInSettings() throws InterruptedException {
|
||||
@ -53,12 +45,12 @@ public class UpdateThreadPoolSettingsTests extends ESThreadPoolTestCase {
|
||||
ThreadPool threadPool = null;
|
||||
try {
|
||||
threadPool = new ThreadPool(Settings.builder()
|
||||
.put("node.name", "testCorrectThreadPoolTypePermittedInSettings")
|
||||
.put("threadpool." + threadPoolName + ".type", correctThreadPoolType.getType())
|
||||
.build());
|
||||
.put("node.name", "testCorrectThreadPoolTypePermittedInSettings")
|
||||
.put("thread_pool." + threadPoolName + ".type", correctThreadPoolType.getType())
|
||||
.build());
|
||||
ThreadPool.Info info = info(threadPool, threadPoolName);
|
||||
if (ThreadPool.Names.SAME.equals(threadPoolName)) {
|
||||
assertNull(info); // we don't report on the "same" threadpool
|
||||
assertNull(info); // we don't report on the "same" thread pool
|
||||
} else {
|
||||
// otherwise check we have the expected type
|
||||
assertEquals(info.getThreadPoolType(), correctThreadPoolType);
|
||||
@ -68,97 +60,31 @@ public class UpdateThreadPoolSettingsTests extends ESThreadPoolTestCase {
|
||||
}
|
||||
}
|
||||
|
||||
public void testThreadPoolCanNotOverrideThreadPoolType() throws InterruptedException {
|
||||
String threadPoolName = randomThreadPoolName();
|
||||
ThreadPool.ThreadPoolType incorrectThreadPoolType = randomIncorrectThreadPoolType(threadPoolName);
|
||||
ThreadPool.ThreadPoolType correctThreadPoolType = ThreadPool.THREAD_POOL_TYPES.get(threadPoolName);
|
||||
ThreadPool threadPool = null;
|
||||
try {
|
||||
threadPool = new ThreadPool(
|
||||
Settings.builder()
|
||||
.put("node.name", "testThreadPoolCanNotOverrideThreadPoolType")
|
||||
.put("threadpool." + threadPoolName + ".type", incorrectThreadPoolType.getType())
|
||||
.build());
|
||||
terminate(threadPool);
|
||||
fail("expected IllegalArgumentException");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(
|
||||
e.getMessage(),
|
||||
is("setting threadpool." + threadPoolName + ".type to " + incorrectThreadPoolType.getType() + " is not permitted; must be " + correctThreadPoolType.getType()));
|
||||
} finally {
|
||||
terminateThreadPoolIfNeeded(threadPool);
|
||||
}
|
||||
}
|
||||
|
||||
public void testIndexingThreadPoolsMaxSize() throws InterruptedException {
|
||||
String threadPoolName = randomThreadPoolName();
|
||||
for (String name : new String[] {ThreadPool.Names.BULK, ThreadPool.Names.INDEX}) {
|
||||
ThreadPool threadPool = null;
|
||||
try {
|
||||
final String name = randomFrom(Names.BULK, Names.INDEX);
|
||||
final int maxSize = 1 + EsExecutors.boundedNumberOfProcessors(Settings.EMPTY);
|
||||
final int tooBig = randomIntBetween(1 + maxSize, Integer.MAX_VALUE);
|
||||
|
||||
int maxSize = EsExecutors.boundedNumberOfProcessors(Settings.EMPTY);
|
||||
// try to create a too big thread pool
|
||||
final IllegalArgumentException initial =
|
||||
expectThrows(
|
||||
IllegalArgumentException.class,
|
||||
() -> {
|
||||
ThreadPool tp = null;
|
||||
try {
|
||||
tp = new ThreadPool(Settings.builder()
|
||||
.put("node.name", "testIndexingThreadPoolsMaxSize")
|
||||
.put("thread_pool." + name + ".size", tooBig)
|
||||
.build());
|
||||
} finally {
|
||||
terminateThreadPoolIfNeeded(tp);
|
||||
}
|
||||
});
|
||||
|
||||
// try to create a too-big (maxSize+1) thread pool
|
||||
threadPool = new ThreadPool(Settings.builder()
|
||||
.put("node.name", "testIndexingThreadPoolsMaxSize")
|
||||
.put("threadpool." + name + ".size", maxSize+1)
|
||||
.build());
|
||||
|
||||
// confirm it clipped us at the maxSize:
|
||||
assertEquals(maxSize, ((ThreadPoolExecutor) threadPool.executor(name)).getMaximumPoolSize());
|
||||
|
||||
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
||||
threadPool.setClusterSettings(clusterSettings);
|
||||
|
||||
// update it to a tiny size:
|
||||
clusterSettings.applySettings(
|
||||
Settings.builder()
|
||||
.put("threadpool." + name + ".size", 1)
|
||||
.build()
|
||||
);
|
||||
|
||||
// confirm it worked:
|
||||
assertEquals(1, ((ThreadPoolExecutor) threadPool.executor(name)).getMaximumPoolSize());
|
||||
|
||||
// try to update to too-big size:
|
||||
clusterSettings.applySettings(
|
||||
Settings.builder()
|
||||
.put("threadpool." + name + ".size", maxSize+1)
|
||||
.build()
|
||||
);
|
||||
|
||||
// confirm it clipped us at the maxSize:
|
||||
assertEquals(maxSize, ((ThreadPoolExecutor) threadPool.executor(name)).getMaximumPoolSize());
|
||||
} finally {
|
||||
terminateThreadPoolIfNeeded(threadPool);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testUpdateSettingsCanNotChangeThreadPoolType() throws InterruptedException {
|
||||
String threadPoolName = randomThreadPoolName();
|
||||
ThreadPool.ThreadPoolType invalidThreadPoolType = randomIncorrectThreadPoolType(threadPoolName);
|
||||
ThreadPool.ThreadPoolType validThreadPoolType = ThreadPool.THREAD_POOL_TYPES.get(threadPoolName);
|
||||
ThreadPool threadPool = null;
|
||||
try {
|
||||
threadPool = new ThreadPool(Settings.builder().put("node.name", "testUpdateSettingsCanNotChangeThreadPoolType").build());
|
||||
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
||||
threadPool.setClusterSettings(clusterSettings);
|
||||
|
||||
clusterSettings.applySettings(
|
||||
Settings.builder()
|
||||
.put("threadpool." + threadPoolName + ".type", invalidThreadPoolType.getType())
|
||||
.build()
|
||||
);
|
||||
fail("expected IllegalArgumentException");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertEquals("illegal value can't update [threadpool.] from [{}] to [{" + threadPoolName + ".type=" + invalidThreadPoolType.getType() + "}]", e.getMessage());
|
||||
assertThat(
|
||||
e.getCause().getMessage(),
|
||||
is("setting threadpool." + threadPoolName + ".type to " + invalidThreadPoolType.getType() + " is not permitted; must be " + validThreadPoolType.getType()));
|
||||
} finally {
|
||||
terminateThreadPoolIfNeeded(threadPool);
|
||||
}
|
||||
assertThat(
|
||||
initial,
|
||||
hasToString(containsString(
|
||||
"Failed to parse value [" + tooBig + "] for setting [thread_pool." + name + ".size] must be ")));
|
||||
}
|
||||
|
||||
private static int getExpectedThreadPoolSize(Settings settings, String name, int size) {
|
||||
@ -174,17 +100,14 @@ public class UpdateThreadPoolSettingsTests extends ESThreadPoolTestCase {
|
||||
ThreadPool threadPool = null;
|
||||
|
||||
try {
|
||||
int expectedSize = getExpectedThreadPoolSize(Settings.EMPTY, threadPoolName, 15);
|
||||
Settings nodeSettings = Settings.builder()
|
||||
.put("node.name", "testFixedExecutorType").build();
|
||||
.put("node.name", "testFixedExecutorType")
|
||||
.put("thread_pool." + threadPoolName + ".size", expectedSize)
|
||||
.build();
|
||||
threadPool = new ThreadPool(nodeSettings);
|
||||
ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
||||
threadPool.setClusterSettings(clusterSettings);
|
||||
assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class));
|
||||
Settings settings = clusterSettings.applySettings(Settings.builder()
|
||||
.put("threadpool." + threadPoolName + ".size", "15")
|
||||
.build());
|
||||
|
||||
int expectedSize = getExpectedThreadPoolSize(nodeSettings, threadPoolName, 15);
|
||||
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED);
|
||||
assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class));
|
||||
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(expectedSize));
|
||||
@ -193,37 +116,6 @@ public class UpdateThreadPoolSettingsTests extends ESThreadPoolTestCase {
|
||||
assertThat(info(threadPool, threadPoolName).getMax(), equalTo(expectedSize));
|
||||
// keep alive does not apply to fixed thread pools
|
||||
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(0L));
|
||||
|
||||
// Put old type back
|
||||
settings = clusterSettings.applySettings(Settings.EMPTY);
|
||||
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED);
|
||||
// Make sure keep alive value is not used
|
||||
assertThat(info(threadPool, threadPoolName).getKeepAlive(), nullValue());
|
||||
// Make sure keep pool size value were reused
|
||||
assertThat(info(threadPool, threadPoolName).getMin(), equalTo(expectedSize));
|
||||
assertThat(info(threadPool, threadPoolName).getMax(), equalTo(expectedSize));
|
||||
assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class));
|
||||
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(expectedSize));
|
||||
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(expectedSize));
|
||||
|
||||
// Change size
|
||||
Executor oldExecutor = threadPool.executor(threadPoolName);
|
||||
settings = clusterSettings.applySettings(Settings.builder().put(settings).put("threadpool." + threadPoolName + ".size", "10").build());
|
||||
|
||||
expectedSize = getExpectedThreadPoolSize(nodeSettings, threadPoolName, 10);
|
||||
|
||||
// Make sure size values changed
|
||||
assertThat(info(threadPool, threadPoolName).getMax(), equalTo(expectedSize));
|
||||
assertThat(info(threadPool, threadPoolName).getMin(), equalTo(expectedSize));
|
||||
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(expectedSize));
|
||||
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(expectedSize));
|
||||
// Make sure executor didn't change
|
||||
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED);
|
||||
assertThat(threadPool.executor(threadPoolName), sameInstance(oldExecutor));
|
||||
|
||||
// Change queue capacity
|
||||
clusterSettings.applySettings(Settings.builder().put(settings).put("threadpool." + threadPoolName + ".queue", "500")
|
||||
.build());
|
||||
} finally {
|
||||
terminateThreadPoolIfNeeded(threadPool);
|
||||
}
|
||||
@ -234,11 +126,10 @@ public class UpdateThreadPoolSettingsTests extends ESThreadPoolTestCase {
|
||||
ThreadPool threadPool = null;
|
||||
try {
|
||||
Settings nodeSettings = Settings.builder()
|
||||
.put("threadpool." + threadPoolName + ".size", 10)
|
||||
.put("node.name", "testScalingExecutorType").build();
|
||||
.put("thread_pool." + threadPoolName + ".max", 10)
|
||||
.put("node.name", "testScalingExecutorType")
|
||||
.build();
|
||||
threadPool = new ThreadPool(nodeSettings);
|
||||
ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
||||
threadPool.setClusterSettings(clusterSettings);
|
||||
final int expectedMinimum = "generic".equals(threadPoolName) ? 4 : 1;
|
||||
assertThat(info(threadPool, threadPoolName).getMin(), equalTo(expectedMinimum));
|
||||
assertThat(info(threadPool, threadPoolName).getMax(), equalTo(10));
|
||||
@ -246,24 +137,6 @@ public class UpdateThreadPoolSettingsTests extends ESThreadPoolTestCase {
|
||||
assertThat(info(threadPool, threadPoolName).getKeepAlive().seconds(), equalTo(expectedKeepAlive));
|
||||
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.SCALING);
|
||||
assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class));
|
||||
|
||||
// Change settings that doesn't require pool replacement
|
||||
Executor oldExecutor = threadPool.executor(threadPoolName);
|
||||
clusterSettings.applySettings(Settings.builder()
|
||||
.put("threadpool." + threadPoolName + ".keep_alive", "10m")
|
||||
.put("threadpool." + threadPoolName + ".min", "2")
|
||||
.put("threadpool." + threadPoolName + ".size", "15")
|
||||
.build());
|
||||
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.SCALING);
|
||||
assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class));
|
||||
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(2));
|
||||
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(15));
|
||||
assertThat(info(threadPool, threadPoolName).getMin(), equalTo(2));
|
||||
assertThat(info(threadPool, threadPoolName).getMax(), equalTo(15));
|
||||
// Make sure keep alive value changed
|
||||
assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(10L));
|
||||
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L));
|
||||
assertThat(threadPool.executor(threadPoolName), sameInstance(oldExecutor));
|
||||
} finally {
|
||||
terminateThreadPoolIfNeeded(threadPool);
|
||||
}
|
||||
@ -274,17 +147,18 @@ public class UpdateThreadPoolSettingsTests extends ESThreadPoolTestCase {
|
||||
ThreadPool threadPool = null;
|
||||
try {
|
||||
Settings nodeSettings = Settings.builder()
|
||||
.put("threadpool." + threadPoolName + ".queue_size", 1000)
|
||||
.put("node.name", "testShutdownNowInterrupts").build();
|
||||
.put("thread_pool." + threadPoolName + ".queue_size", 1000)
|
||||
.put("node.name", "testShutdownNowInterrupts")
|
||||
.build();
|
||||
threadPool = new ThreadPool(nodeSettings);
|
||||
ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
||||
threadPool.setClusterSettings(clusterSettings);
|
||||
assertEquals(info(threadPool, threadPoolName).getQueueSize().getSingles(), 1000L);
|
||||
|
||||
final CountDownLatch shutDownLatch = new CountDownLatch(1);
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
ThreadPoolExecutor oldExecutor = (ThreadPoolExecutor) threadPool.executor(threadPoolName);
|
||||
threadPool.executor(threadPoolName).execute(() -> {
|
||||
try {
|
||||
shutDownLatch.countDown();
|
||||
new CountDownLatch(1).await();
|
||||
} catch (InterruptedException ex) {
|
||||
latch.countDown();
|
||||
@ -292,13 +166,11 @@ public class UpdateThreadPoolSettingsTests extends ESThreadPoolTestCase {
|
||||
}
|
||||
}
|
||||
);
|
||||
clusterSettings.applySettings(Settings.builder().put("threadpool." + threadPoolName + ".queue_size", 2000).build());
|
||||
assertThat(threadPool.executor(threadPoolName), not(sameInstance(oldExecutor)));
|
||||
shutDownLatch.await();
|
||||
threadPool.shutdownNow();
|
||||
latch.await(3, TimeUnit.SECONDS); // if this throws then ThreadPool#shutdownNow did not interrupt
|
||||
assertThat(oldExecutor.isShutdown(), equalTo(true));
|
||||
assertThat(oldExecutor.isTerminating(), equalTo(true));
|
||||
assertThat(oldExecutor.isTerminated(), equalTo(false));
|
||||
threadPool.shutdownNow(); // should interrupt the thread
|
||||
latch.await(3, TimeUnit.SECONDS); // If this throws then ThreadPool#shutdownNow didn't interrupt
|
||||
assertThat(oldExecutor.isTerminating() || oldExecutor.isTerminated(), equalTo(true));
|
||||
} finally {
|
||||
terminateThreadPoolIfNeeded(threadPool);
|
||||
}
|
||||
@ -307,18 +179,19 @@ public class UpdateThreadPoolSettingsTests extends ESThreadPoolTestCase {
|
||||
public void testCustomThreadPool() throws Exception {
|
||||
ThreadPool threadPool = null;
|
||||
try {
|
||||
Settings nodeSettings = Settings.builder()
|
||||
.put("threadpool.my_pool1.type", "scaling")
|
||||
.put("threadpool.my_pool1.min", 1)
|
||||
.put("threadpool.my_pool1.size", EsExecutors.boundedNumberOfProcessors(Settings.EMPTY))
|
||||
.put("threadpool.my_pool1.keep_alive", "1m")
|
||||
.put("threadpool.my_pool2.type", "fixed")
|
||||
.put("threadpool.my_pool2.size", "1")
|
||||
.put("threadpool.my_pool2.queue_size", "1")
|
||||
.put("node.name", "testCustomThreadPool").build();
|
||||
threadPool = new ThreadPool(nodeSettings);
|
||||
ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
||||
threadPool.setClusterSettings(clusterSettings);
|
||||
|
||||
|
||||
final ScalingExecutorBuilder scaling =
|
||||
new ScalingExecutorBuilder(
|
||||
"my_pool1",
|
||||
1,
|
||||
EsExecutors.boundedNumberOfProcessors(Settings.EMPTY),
|
||||
TimeValue.timeValueMinutes(1));
|
||||
|
||||
final FixedExecutorBuilder fixed = new FixedExecutorBuilder(Settings.EMPTY, "my_pool2", 1, 1);
|
||||
|
||||
threadPool = new ThreadPool(Settings.builder().put("node.name", "testCustomThreadPool").build(), scaling, fixed);
|
||||
|
||||
ThreadPoolInfo groups = threadPool.info();
|
||||
boolean foundPool1 = false;
|
||||
boolean foundPool2 = false;
|
||||
@ -345,39 +218,6 @@ public class UpdateThreadPoolSettingsTests extends ESThreadPoolTestCase {
|
||||
}
|
||||
assertThat(foundPool1, is(true));
|
||||
assertThat(foundPool2, is(true));
|
||||
|
||||
// Updating my_pool2
|
||||
Settings settings = Settings.builder()
|
||||
.put("threadpool.my_pool2.size", "10")
|
||||
.build();
|
||||
clusterSettings.applySettings(settings);
|
||||
|
||||
groups = threadPool.info();
|
||||
foundPool1 = false;
|
||||
foundPool2 = false;
|
||||
outer:
|
||||
for (ThreadPool.Info info : groups) {
|
||||
if ("my_pool1".equals(info.getName())) {
|
||||
foundPool1 = true;
|
||||
assertEquals(info.getThreadPoolType(), ThreadPool.ThreadPoolType.SCALING);
|
||||
} else if ("my_pool2".equals(info.getName())) {
|
||||
foundPool2 = true;
|
||||
assertThat(info.getMax(), equalTo(10));
|
||||
assertThat(info.getMin(), equalTo(10));
|
||||
assertThat(info.getQueueSize().singles(), equalTo(1L));
|
||||
assertEquals(info.getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED);
|
||||
} else {
|
||||
for (Field field : Names.class.getFields()) {
|
||||
if (info.getName().equalsIgnoreCase(field.getName())) {
|
||||
// This is ok it is a default thread pool
|
||||
continue outer;
|
||||
}
|
||||
}
|
||||
fail("Unexpected pool name: " + info.getName());
|
||||
}
|
||||
}
|
||||
assertThat(foundPool1, is(true));
|
||||
assertThat(foundPool2, is(true));
|
||||
} finally {
|
||||
terminateThreadPoolIfNeeded(threadPool);
|
||||
}
|
||||
@ -388,11 +228,4 @@ public class UpdateThreadPoolSettingsTests extends ESThreadPoolTestCase {
|
||||
return randomFrom(threadPoolNames.toArray(new String[threadPoolNames.size()]));
|
||||
}
|
||||
|
||||
private ThreadPool.ThreadPoolType randomIncorrectThreadPoolType(String threadPoolName) {
|
||||
Set<ThreadPool.ThreadPoolType> set = new HashSet<>();
|
||||
set.addAll(Arrays.asList(ThreadPool.ThreadPoolType.values()));
|
||||
set.remove(ThreadPool.THREAD_POOL_TYPES.get(threadPoolName));
|
||||
return randomFrom(set.toArray(new ThreadPool.ThreadPoolType[set.size()]));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -31,6 +31,7 @@ import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
@ -71,7 +72,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
threadPool = new ThreadPool(getClass().getName());
|
||||
threadPool = new TestThreadPool(getClass().getName());
|
||||
serviceA = build(
|
||||
Settings.builder()
|
||||
.put("name", "TS_A")
|
||||
|
@ -64,7 +64,6 @@ public class NettySizeHeaderFrameDecoderTests extends ESTestCase {
|
||||
@Before
|
||||
public void startThreadPool() {
|
||||
threadPool = new ThreadPool(settings);
|
||||
threadPool.setClusterSettings(new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
|
||||
NetworkService networkService = new NetworkService(settings);
|
||||
BigArrays bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
|
||||
nettyTransport = new NettyTransport(settings, threadPool, networkService, bigArrays, Version.CURRENT, new NamedWriteableRegistry(),
|
||||
|
@ -30,6 +30,7 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.netty.NettyTransport;
|
||||
import org.junit.After;
|
||||
@ -51,7 +52,7 @@ public class NettyTransportServiceHandshakeTests extends ESTestCase {
|
||||
|
||||
@BeforeClass
|
||||
public static void startThreadPool() {
|
||||
threadPool = new ThreadPool(NettyTransportServiceHandshakeTests.class.getSimpleName());
|
||||
threadPool = new TestThreadPool(NettyTransportServiceHandshakeTests.class.getSimpleName());
|
||||
}
|
||||
|
||||
private List<TransportService> transportServices = new ArrayList<>();
|
||||
|
@ -30,6 +30,7 @@ import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.BaseTransportResponseHandler;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
@ -52,7 +53,7 @@ import static org.hamcrest.Matchers.greaterThan;
|
||||
*/
|
||||
public class NettyScheduledPingTests extends ESTestCase {
|
||||
public void testScheduledPing() throws Exception {
|
||||
ThreadPool threadPool = new ThreadPool(getClass().getName());
|
||||
ThreadPool threadPool = new TestThreadPool(getClass().getName());
|
||||
|
||||
Settings settings = Settings.builder()
|
||||
.put(NettyTransport.PING_SCHEDULE.getKey(), "5ms")
|
||||
|
@ -28,6 +28,7 @@ import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.MockBigArrays;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.transport.TransportSettings;
|
||||
@ -56,7 +57,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
|
||||
.put("transport.profiles.client1.port", 0)
|
||||
.build();
|
||||
|
||||
ThreadPool threadPool = new ThreadPool("tst");
|
||||
ThreadPool threadPool = new TestThreadPool("tst");
|
||||
try (NettyTransport transport = startNettyTransport(settings, threadPool)) {
|
||||
assertEquals(1, transport.profileBoundAddresses().size());
|
||||
assertEquals(1, transport.boundAddress().boundAddresses().length);
|
||||
@ -72,7 +73,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
|
||||
.put("transport.profiles.client1.port", 0)
|
||||
.build();
|
||||
|
||||
ThreadPool threadPool = new ThreadPool("tst");
|
||||
ThreadPool threadPool = new TestThreadPool("tst");
|
||||
try (NettyTransport transport = startNettyTransport(settings, threadPool)) {
|
||||
assertEquals(1, transport.profileBoundAddresses().size());
|
||||
assertEquals(1, transport.boundAddress().boundAddresses().length);
|
||||
@ -89,7 +90,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
|
||||
.put("transport.profiles.client1.whatever", "foo")
|
||||
.build();
|
||||
|
||||
ThreadPool threadPool = new ThreadPool("tst");
|
||||
ThreadPool threadPool = new TestThreadPool("tst");
|
||||
try (NettyTransport transport = startNettyTransport(settings, threadPool)) {
|
||||
assertEquals(0, transport.profileBoundAddresses().size());
|
||||
assertEquals(1, transport.boundAddress().boundAddresses().length);
|
||||
@ -105,7 +106,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
|
||||
.put("transport.profiles.default.port", 0)
|
||||
.build();
|
||||
|
||||
ThreadPool threadPool = new ThreadPool("tst");
|
||||
ThreadPool threadPool = new TestThreadPool("tst");
|
||||
try (NettyTransport transport = startNettyTransport(settings, threadPool)) {
|
||||
assertEquals(0, transport.profileBoundAddresses().size());
|
||||
assertEquals(1, transport.boundAddress().boundAddresses().length);
|
||||
@ -123,7 +124,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
|
||||
.put("transport.profiles..port", 23) // will not actually bind to this
|
||||
.build();
|
||||
|
||||
ThreadPool threadPool = new ThreadPool("tst");
|
||||
ThreadPool threadPool = new TestThreadPool("tst");
|
||||
try (NettyTransport transport = startNettyTransport(settings, threadPool)) {
|
||||
assertEquals(0, transport.profileBoundAddresses().size());
|
||||
assertEquals(1, transport.boundAddress().boundAddresses().length);
|
||||
|
@ -21,6 +21,7 @@ package org.elasticsearch.watcher;
|
||||
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
|
||||
@ -33,7 +34,7 @@ import static org.hamcrest.Matchers.notNullValue;
|
||||
*/
|
||||
public class ResourceWatcherServiceTests extends ESTestCase {
|
||||
public void testSettings() throws Exception {
|
||||
ThreadPool threadPool = new ThreadPool("test");
|
||||
ThreadPool threadPool = new TestThreadPool("test");
|
||||
|
||||
// checking the defaults
|
||||
Settings settings = Settings.builder().build();
|
||||
@ -65,7 +66,7 @@ public class ResourceWatcherServiceTests extends ESTestCase {
|
||||
}
|
||||
|
||||
public void testHandle() throws Exception {
|
||||
ThreadPool threadPool = new ThreadPool("test");
|
||||
ThreadPool threadPool = new TestThreadPool("test");
|
||||
Settings settings = Settings.builder().build();
|
||||
ResourceWatcherService service = new ResourceWatcherService(settings, threadPool);
|
||||
ResourceWatcher watcher = new ResourceWatcher() {
|
||||
|
@ -79,6 +79,19 @@ on the thread pool type, `keep_alive`, `queue_size`, etc.).
|
||||
The `suggest` threadpool has been removed, now suggest requests use the
|
||||
`search` threadpool.
|
||||
|
||||
The prefix on all thread pool settings has been changed from
|
||||
`threadpool` to `thread_pool`.
|
||||
|
||||
The minimum size setting for a scaling thread pool has been changed
|
||||
from `min` to `core`.
|
||||
|
||||
The maximum size setting for a scaling thread pool has been changed
|
||||
from `size` to `max`.
|
||||
|
||||
The queue size setting for a fixed thread pool must be `queue_size`
|
||||
(all other variants that were previously supported are no longer
|
||||
supported).
|
||||
|
||||
==== Analysis settings
|
||||
|
||||
The `index.analysis.analyzer.default_index` analyzer is not supported anymore.
|
||||
|
@ -17,7 +17,7 @@ There are several thread pools, but the important ones include:
|
||||
For index/delete operations. Thread pool type is `fixed`
|
||||
with a size of `# of available processors`,
|
||||
queue_size of `200`. The maximum size for this pool
|
||||
is `# of available processors`.
|
||||
is `1 + # of available processors`.
|
||||
|
||||
`search`::
|
||||
For count/search/suggest operations. Thread pool type is `fixed`
|
||||
@ -33,7 +33,7 @@ There are several thread pools, but the important ones include:
|
||||
For bulk operations. Thread pool type is `fixed`
|
||||
with a size of `# of available processors`,
|
||||
queue_size of `50`. The maximum size for this pool
|
||||
is `# of available processors`.
|
||||
is `1 + # of available processors`.
|
||||
|
||||
`percolate`::
|
||||
For percolate operations. Thread pool type is `fixed`
|
||||
@ -42,26 +42,26 @@ There are several thread pools, but the important ones include:
|
||||
|
||||
`snapshot`::
|
||||
For snapshot/restore operations. Thread pool type is `scaling` with a
|
||||
keep-alive of `5m` and a size of `min(5, (# of available processors)/2)`.
|
||||
keep-alive of `5m` and a max of `min(5, (# of available processors)/2)`.
|
||||
|
||||
`warmer`::
|
||||
For segment warm-up operations. Thread pool type is `scaling` with a
|
||||
keep-alive of `5m` and a size of `min(5, (# of available processors)/2)`.
|
||||
keep-alive of `5m` and a max of `min(5, (# of available processors)/2)`.
|
||||
|
||||
`refresh`::
|
||||
For refresh operations. Thread pool type is `scaling` with a
|
||||
keep-alive of `5m` and a size of `min(10, (# of available processors)/2)`.
|
||||
keep-alive of `5m` and a max of `min(10, (# of available processors)/2)`.
|
||||
|
||||
`listener`::
|
||||
Mainly for java client executing of action when listener threaded is set to true.
|
||||
Thread pool type is `scaling` with a default size of `min(10, (# of available processors)/2)`.
|
||||
Thread pool type is `scaling` with a default max of `min(10, (# of available processors)/2)`.
|
||||
|
||||
Changing a specific thread pool can be done by setting its type-specific parameters; for example, changing the `index`
|
||||
thread pool to have more threads:
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
threadpool:
|
||||
thread_pool:
|
||||
index:
|
||||
size: 30
|
||||
--------------------------------------------------
|
||||
@ -91,7 +91,7 @@ full, it will abort the request.
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
threadpool:
|
||||
thread_pool:
|
||||
index:
|
||||
size: 30
|
||||
queue_size: 1000
|
||||
@ -102,17 +102,17 @@ threadpool:
|
||||
|
||||
The `scaling` thread pool holds a dynamic number of threads. This
|
||||
number is proportional to the workload and varies between the value of
|
||||
the `min` and `size` parameters.
|
||||
the `core` and `max` parameters.
|
||||
|
||||
The `keep_alive` parameter determines how long a thread should be kept
|
||||
around in the thread pool without it doing any work.
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
threadpool:
|
||||
thread_pool:
|
||||
warmer:
|
||||
min: 1
|
||||
size: 8
|
||||
core: 1
|
||||
max: 8
|
||||
keep_alive: 2m
|
||||
--------------------------------------------------
|
||||
|
||||
|
@ -108,10 +108,11 @@ public class TemplateQueryParserTests extends ESTestCase {
|
||||
// TODO: make this use a mock engine instead of mustache and it will no longer be messy!
|
||||
scriptModule.addScriptEngine(new ScriptEngineRegistry.ScriptEngineRegistration(MustacheScriptEngineService.class, MustacheScriptEngineService.NAME, true));
|
||||
settingsModule.registerSetting(InternalSettingsPlugin.VERSION_CREATED);
|
||||
final ThreadPool threadPool = new ThreadPool(settings);
|
||||
injector = new ModulesBuilder().add(
|
||||
new EnvironmentModule(new Environment(settings)),
|
||||
settingsModule,
|
||||
new ThreadPoolModule(new ThreadPool(settings)),
|
||||
new ThreadPoolModule(threadPool),
|
||||
new SearchModule(settings, new NamedWriteableRegistry()) {
|
||||
@Override
|
||||
protected void configureSearch() {
|
||||
|
@ -22,6 +22,7 @@ package org.elasticsearch.index.reindex;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
@ -35,7 +36,7 @@ public abstract class AbstractAsyncBulkIndexByScrollActionTestCase<
|
||||
|
||||
@Before
|
||||
public void setupForTest() {
|
||||
threadPool = new ThreadPool(getTestName());
|
||||
threadPool = new TestThreadPool(getTestName());
|
||||
task = new BulkByScrollTask(1, "test", "test", "test", TaskId.EMPTY_TASK_ID, 0);
|
||||
}
|
||||
|
||||
|
@ -72,6 +72,7 @@ import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.tasks.TaskManager;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.client.NoOpClient;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
@ -126,7 +127,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
||||
@Before
|
||||
public void setupForTest() {
|
||||
client = new MyMockClient(new NoOpClient(getTestName()));
|
||||
threadPool = new ThreadPool(getTestName());
|
||||
threadPool = new TestThreadPool(getTestName());
|
||||
firstSearchRequest = new SearchRequest();
|
||||
testRequest = new DummyAbstractBulkByScrollRequest(firstSearchRequest);
|
||||
listener = new PlainActionFuture<>();
|
||||
@ -311,7 +312,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
||||
public void testThreadPoolRejectionsAbortRequest() throws Exception {
|
||||
TimeValue expectedDelay = parseTimeValue(randomPositiveTimeValue(), "test");
|
||||
threadPool.shutdown();
|
||||
threadPool = new ThreadPool(getTestName()) {
|
||||
threadPool = new TestThreadPool(getTestName()) {
|
||||
@Override
|
||||
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
|
||||
assertEquals(expectedDelay, delay); // While we're here we can check that the sleep made it through
|
||||
@ -444,7 +445,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
||||
AtomicReference<TimeValue> capturedDelay = new AtomicReference<>();
|
||||
AtomicReference<Runnable> capturedCommand = new AtomicReference<>();
|
||||
threadPool.shutdown();
|
||||
threadPool = new ThreadPool(getTestName()) {
|
||||
threadPool = new TestThreadPool(getTestName()) {
|
||||
@Override
|
||||
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
|
||||
capturedDelay.set(delay);
|
||||
@ -612,7 +613,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
||||
* is a delay.
|
||||
*/
|
||||
threadPool.shutdown();
|
||||
threadPool = new ThreadPool(getTestName()) {
|
||||
threadPool = new TestThreadPool(getTestName()) {
|
||||
@Override
|
||||
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
|
||||
/*
|
||||
|
@ -26,6 +26,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.junit.Before;
|
||||
|
||||
@ -168,7 +169,7 @@ public class BulkByScrollTaskTests extends ESTestCase {
|
||||
task.rethrottle(originalRequestsPerSecond);
|
||||
TimeValue maxDelay = timeValueSeconds(between(1, 5));
|
||||
assertThat(maxDelay.nanos(), greaterThanOrEqualTo(0L));
|
||||
ThreadPool threadPool = new ThreadPool(getTestName()) {
|
||||
ThreadPool threadPool = new TestThreadPool(getTestName()) {
|
||||
@Override
|
||||
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
|
||||
assertThat(delay.nanos(), both(greaterThanOrEqualTo(0L)).and(lessThanOrEqualTo(maxDelay.nanos())));
|
||||
@ -220,7 +221,7 @@ public class BulkByScrollTaskTests extends ESTestCase {
|
||||
|
||||
public void testDelayNeverNegative() throws IOException {
|
||||
// Thread pool that returns a ScheduledFuture that claims to have a negative delay
|
||||
ThreadPool threadPool = new ThreadPool("test") {
|
||||
ThreadPool threadPool = new TestThreadPool("test") {
|
||||
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
|
||||
return new ScheduledFuture<Void>() {
|
||||
@Override
|
||||
|
@ -63,11 +63,11 @@ public class RetryTests extends ESSingleNodeTestCase {
|
||||
protected Settings nodeSettings() {
|
||||
Settings.Builder settings = Settings.builder().put(super.nodeSettings());
|
||||
// Use pools of size 1 so we can block them
|
||||
settings.put("threadpool.bulk.size", 1);
|
||||
settings.put("threadpool.search.size", 1);
|
||||
settings.put("thread_pool.bulk.size", 1);
|
||||
settings.put("thread_pool.search.size", 1);
|
||||
// Use queues of size 1 because size 0 is broken and because search requests need the queue to function
|
||||
settings.put("threadpool.bulk.queue_size", 1);
|
||||
settings.put("threadpool.search.queue_size", 1);
|
||||
settings.put("thread_pool.bulk.queue_size", 1);
|
||||
settings.put("thread_pool.search.queue_size", 1);
|
||||
return settings.build();
|
||||
}
|
||||
|
||||
|
@ -30,6 +30,7 @@ import org.elasticsearch.common.transport.LocalTransportAddress;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.junit.AfterClass;
|
||||
@ -53,7 +54,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
|
||||
|
||||
@BeforeClass
|
||||
public static void createThreadPool() {
|
||||
threadPool = new ThreadPool(Ec2DiscoveryTests.class.getName());
|
||||
threadPool = new TestThreadPool(Ec2DiscoveryTests.class.getName());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
@ -27,6 +27,7 @@ import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
@ -72,7 +73,7 @@ public class GceDiscoveryTests extends ESTestCase {
|
||||
|
||||
@BeforeClass
|
||||
public static void createThreadPool() {
|
||||
threadPool = new ThreadPool(GceDiscoveryTests.class.getName());
|
||||
threadPool = new TestThreadPool(GceDiscoveryTests.class.getName());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
@ -28,7 +28,6 @@ import com.carrotsearch.randomizedtesting.generators.RandomInts;
|
||||
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
|
||||
import com.carrotsearch.randomizedtesting.generators.RandomStrings;
|
||||
import com.carrotsearch.randomizedtesting.rules.TestRuleAdapter;
|
||||
|
||||
import org.apache.lucene.uninverting.UninvertingReader;
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
|
||||
|
@ -27,6 +27,7 @@ import org.elasticsearch.action.ActionRequestBuilder;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.client.support.AbstractClient;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@ -34,7 +35,7 @@ import java.util.concurrent.TimeUnit;
|
||||
public class NoOpClient extends AbstractClient {
|
||||
|
||||
public NoOpClient(String testName) {
|
||||
super(Settings.EMPTY, new ThreadPool(testName));
|
||||
super(Settings.EMPTY, new TestThreadPool(testName));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -0,0 +1,31 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.threadpool;
|
||||
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.node.Node;
|
||||
|
||||
public class TestThreadPool extends ThreadPool {
|
||||
|
||||
public TestThreadPool(String name) {
|
||||
super(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), name).build());
|
||||
}
|
||||
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user