Clarify available processors (#54907)

The use of available processors, the terminology, and the settings
around it have evolved over time. This commit cleans up some places in
the codes and in the docs to adjust to the current terminology.
This commit is contained in:
Jason Tedor 2020-04-10 08:38:00 -04:00
parent 51326432be
commit 9eeae59a83
No known key found for this signature in database
GPG Key ID: FA89F05560F16BC5
17 changed files with 105 additions and 99 deletions

View File

@ -133,9 +133,9 @@ system:
`os.allocated_processors`:: `os.allocated_processors`::
The number of processors actually used to calculate thread pool size. This The number of processors actually used to calculate thread pool size. This
number can be set with the `processors` setting of a node and defaults to number can be set with the <<node.processors, `node.processors`>>
the number of processors reported by the OS. In both cases this number will setting of a node and defaults to the number of processors reported by
never be larger than 32. the OS.
The `process` flag can be set to retrieve information that concern the current The `process` flag can be set to retrieve information that concern the current
running process: running process:

View File

@ -25,7 +25,7 @@ The merge scheduler supports the following _dynamic_ setting:
The maximum number of threads on a single shard that may be merging at once. The maximum number of threads on a single shard that may be merging at once.
Defaults to Defaults to
`Math.max(1, Math.min(4, Runtime.getRuntime().availableProcessors() / 2))` `Math.max(1, Math.min(4, <<node.processors, node.processors>> / 2))` which
which works well for a good solid-state-disk (SSD). If your index is on works well for a good solid-state-disk (SSD). If your index is on spinning
spinning platter drives instead, decrease this to 1. platter drives instead, decrease this to 1.

View File

@ -15,8 +15,8 @@ There are several thread pools, but the important ones include:
`search`:: `search`::
For count/search/suggest operations. Thread pool type is For count/search/suggest operations. Thread pool type is
`fixed_auto_queue_size` with a size of `fixed_auto_queue_size` with a size of `int((`<<node.processors,
`int((# of available_processors * 3) / 2) + 1`, and initial queue_size of `# of available_processors`>>` * 3) / 2) + 1`, and initial queue_size of
`1000`. `1000`.
[[search-throttled]]`search_throttled`:: [[search-throttled]]`search_throttled`::
@ -26,7 +26,7 @@ There are several thread pools, but the important ones include:
`get`:: `get`::
For get operations. Thread pool type is `fixed` For get operations. Thread pool type is `fixed`
with a size of `# of available processors`, with a size of <<node.processors, `# of allocated processors`>>,
queue_size of `1000`. queue_size of `1000`.
`analyze`:: `analyze`::
@ -35,40 +35,45 @@ There are several thread pools, but the important ones include:
`write`:: `write`::
For single-document index/delete/update and bulk requests. Thread pool type For single-document index/delete/update and bulk requests. Thread pool type
is `fixed` with a size of `# of available processors`, queue_size of `200`. is `fixed` with a size of <<node.processors, `# of allocated processors`>>,
The maximum size for this pool is `1 + # of available processors`. queue_size of `200`. The maximum size for this pool is `1 + `
<<node.processors, `# of allocated processors`>>.
`snapshot`:: `snapshot`::
For snapshot/restore operations. Thread pool type is `scaling` with a For snapshot/restore operations. Thread pool type is `scaling` with a
keep-alive of `5m` and a max of `min(5, (# of available processors)/2)`. keep-alive of `5m` and a max of `min(5, (`<<node.processors,
`# of allocated processors`>>`)/2)`.
`warmer`:: `warmer`::
For segment warm-up operations. Thread pool type is `scaling` with a For segment warm-up operations. Thread pool type is `scaling` with a
keep-alive of `5m` and a max of `min(5, (# of available processors)/2)`. keep-alive of `5m` and a max of `min(5, (`<<node.processors,
`# of allocated processors`>>`)/2)`.
`refresh`:: `refresh`::
For refresh operations. Thread pool type is `scaling` with a For refresh operations. Thread pool type is `scaling` with a
keep-alive of `5m` and a max of `min(10, (# of available processors)/2)`. keep-alive of `5m` and a max of `min(10, (`<<node.processors,
`# of allocated processors`>>`)/2)`.
`listener`:: `listener`::
Mainly for java client executing of action when listener threaded is set to Mainly for java client executing of action when listener threaded is set to
`true`. Thread pool type is `scaling` with a default max of `true`. Thread pool type is `scaling` with a default max of
`min(10, (# of available processors)/2)`. `min(10, (`<<node.processors, `# of allocated processors`>>`)/2)`.
`fetch_shard_started`:: `fetch_shard_started`::
For listing shard states. For listing shard states.
Thread pool type is `scaling` with keep-alive of `5m` and a default maximum Thread pool type is `scaling` with keep-alive of `5m` and a default maximum
size of `2 * # of available processors`. size of `2 * `<<node.processors, `# of allocated processors`>>.
`fetch_shard_store`:: `fetch_shard_store`::
For listing shard stores. For listing shard stores.
Thread pool type is `scaling` with keep-alive of `5m` and a default maximum Thread pool type is `scaling` with keep-alive of `5m` and a default maximum
size of `2 * # of available processors`. size of `2 * `<<node.processors, `# of allocated processors`>>.
`flush`:: `flush`::
For <<indices-flush,flush>>, <<indices-synced-flush-api,synced flush>>, and <<index-modules-translog, translog>> `fsync` operations. For <<indices-flush,flush>>, <<indices-synced-flush-api,synced flush>>, and
Thread pool type is `scaling` with a keep-alive of `5m` and a default <<index-modules-translog, translog>> `fsync` operations. Thread pool type is
maximum size of `min(5, (# of available processors)/2)`. `scaling` with a keep-alive of `5m` and a default maximum size of `min(5, (`
<<node.processors, `# of available processors`>>`) / 2)`.
`force_merge`:: `force_merge`::
For <<indices-forcemerge,force merge>> operations. For <<indices-forcemerge,force merge>> operations.
@ -189,13 +194,13 @@ thread_pool:
-------------------------------------------------- --------------------------------------------------
[float] [float]
[[processors]] [[node.processors]]
=== Processors setting === Allocated processors setting
The number of processors is automatically detected, and the thread pool The number of processors is automatically detected, and the thread pool settings
settings are automatically set based on it. In some cases it can be are automatically set based on it. In some cases it can be useful to override
useful to override the number of detected processors. This can be done the number of detected processors. This can be done by explicitly setting the
by explicitly setting the `node.processors` setting. `node.processors` setting.
[source,yaml] [source,yaml]
-------------------------------------------------- --------------------------------------------------

View File

@ -127,7 +127,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
}, s -> Setting.parseInt(s, 2, Integer.MAX_VALUE, SETTING_KEY_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS), Property.NodeScope); }, s -> Setting.parseInt(s, 2, Integer.MAX_VALUE, SETTING_KEY_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS), Property.NodeScope);
public static final Setting<Integer> SETTING_HTTP_WORKER_COUNT = new Setting<>("http.netty.worker_count", public static final Setting<Integer> SETTING_HTTP_WORKER_COUNT = new Setting<>("http.netty.worker_count",
(s) -> Integer.toString(EsExecutors.numberOfProcessors(s) * 2), (s) -> Integer.toString(EsExecutors.allocatedProcessors(s) * 2),
(s) -> Setting.parseInt(s, 1, "http.netty.worker_count"), Property.NodeScope); (s) -> Setting.parseInt(s, 1, "http.netty.worker_count"), Property.NodeScope);
public static final Setting<ByteSizeValue> SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE = public static final Setting<ByteSizeValue> SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE =

View File

@ -83,7 +83,7 @@ public class Netty4Transport extends TcpTransport {
public static final Setting<Integer> WORKER_COUNT = public static final Setting<Integer> WORKER_COUNT =
new Setting<>("transport.netty.worker_count", new Setting<>("transport.netty.worker_count",
(s) -> Integer.toString(EsExecutors.numberOfProcessors(s) * 2), (s) -> Integer.toString(EsExecutors.allocatedProcessors(s) * 2),
(s) -> Setting.parseInt(s, 1, "transport.netty.worker_count"), Property.NodeScope); (s) -> Setting.parseInt(s, 1, "transport.netty.worker_count"), Property.NodeScope);
public static final Setting<ByteSizeValue> NETTY_RECEIVE_PREDICTOR_SIZE = Setting.byteSizeSetting( public static final Setting<ByteSizeValue> NETTY_RECEIVE_PREDICTOR_SIZE = Setting.byteSizeSetting(

View File

@ -57,7 +57,7 @@ public class NioTransportPlugin extends Plugin implements NetworkPlugin {
public static final Setting<Integer> NIO_WORKER_COUNT = public static final Setting<Integer> NIO_WORKER_COUNT =
new Setting<>("transport.nio.worker_count", new Setting<>("transport.nio.worker_count",
(s) -> Integer.toString(EsExecutors.numberOfProcessors(s) * 2), (s) -> Integer.toString(EsExecutors.allocatedProcessors(s) * 2),
(s) -> Setting.parseInt(s, 1, "transport.nio.worker_count"), Setting.Property.NodeScope); (s) -> Setting.parseInt(s, 1, "transport.nio.worker_count"), Setting.Property.NodeScope);
public static final Setting<Integer> NIO_HTTP_WORKER_COUNT = public static final Setting<Integer> NIO_HTTP_WORKER_COUNT =
intSetting("http.nio.worker_count", 0, 0, Setting.Property.NodeScope); intSetting("http.nio.worker_count", 0, 0, Setting.Property.NodeScope);

View File

@ -43,7 +43,7 @@ import java.util.function.LongSupplier;
public class TransportMultiSearchAction extends HandledTransportAction<MultiSearchRequest, MultiSearchResponse> { public class TransportMultiSearchAction extends HandledTransportAction<MultiSearchRequest, MultiSearchResponse> {
private final int availableProcessors; private final int allocatedProcessors;
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final ClusterService clusterService; private final ClusterService clusterService;
private final LongSupplier relativeTimeProvider; private final LongSupplier relativeTimeProvider;
@ -55,18 +55,18 @@ public class TransportMultiSearchAction extends HandledTransportAction<MultiSear
super(MultiSearchAction.NAME, transportService, actionFilters, (Writeable.Reader<MultiSearchRequest>) MultiSearchRequest::new); super(MultiSearchAction.NAME, transportService, actionFilters, (Writeable.Reader<MultiSearchRequest>) MultiSearchRequest::new);
this.threadPool = threadPool; this.threadPool = threadPool;
this.clusterService = clusterService; this.clusterService = clusterService;
this.availableProcessors = EsExecutors.numberOfProcessors(settings); this.allocatedProcessors = EsExecutors.allocatedProcessors(settings);
this.relativeTimeProvider = System::nanoTime; this.relativeTimeProvider = System::nanoTime;
this.client = client; this.client = client;
} }
TransportMultiSearchAction(ThreadPool threadPool, ActionFilters actionFilters, TransportService transportService, TransportMultiSearchAction(ThreadPool threadPool, ActionFilters actionFilters, TransportService transportService,
ClusterService clusterService, int availableProcessors, ClusterService clusterService, int allocatedProcessors,
LongSupplier relativeTimeProvider, NodeClient client) { LongSupplier relativeTimeProvider, NodeClient client) {
super(MultiSearchAction.NAME, transportService, actionFilters, (Writeable.Reader<MultiSearchRequest>) MultiSearchRequest::new); super(MultiSearchAction.NAME, transportService, actionFilters, (Writeable.Reader<MultiSearchRequest>) MultiSearchRequest::new);
this.threadPool = threadPool; this.threadPool = threadPool;
this.clusterService = clusterService; this.clusterService = clusterService;
this.availableProcessors = availableProcessors; this.allocatedProcessors = allocatedProcessors;
this.relativeTimeProvider = relativeTimeProvider; this.relativeTimeProvider = relativeTimeProvider;
this.client = client; this.client = client;
} }
@ -80,7 +80,7 @@ public class TransportMultiSearchAction extends HandledTransportAction<MultiSear
int maxConcurrentSearches = request.maxConcurrentSearchRequests(); int maxConcurrentSearches = request.maxConcurrentSearchRequests();
if (maxConcurrentSearches == MultiSearchRequest.MAX_CONCURRENT_SEARCH_REQUESTS_DEFAULT) { if (maxConcurrentSearches == MultiSearchRequest.MAX_CONCURRENT_SEARCH_REQUESTS_DEFAULT) {
maxConcurrentSearches = defaultMaxConcurrentSearches(availableProcessors, clusterState); maxConcurrentSearches = defaultMaxConcurrentSearches(allocatedProcessors, clusterState);
} }
Queue<SearchRequestSlot> searchRequestSlots = new ConcurrentLinkedQueue<>(); Queue<SearchRequestSlot> searchRequestSlots = new ConcurrentLinkedQueue<>();
@ -104,11 +104,10 @@ public class TransportMultiSearchAction extends HandledTransportAction<MultiSear
* that shard of the indices the search requests go to are more or less evenly distributed across all nodes in the cluster. But I think * that shard of the indices the search requests go to are more or less evenly distributed across all nodes in the cluster. But I think
* it is a good enough default for most cases, if not then the default should be overwritten in the request itself. * it is a good enough default for most cases, if not then the default should be overwritten in the request itself.
*/ */
static int defaultMaxConcurrentSearches(int availableProcessors, ClusterState state) { static int defaultMaxConcurrentSearches(final int allocatedProcessors, final ClusterState state) {
int numDateNodes = state.getNodes().getDataNodes().size(); int numDateNodes = state.getNodes().getDataNodes().size();
// availableProcessors will never be larger than 32, so max defaultMaxConcurrentSearches will never be larger than 49, // we bound the default concurrency to preserve some search thread pool capacity for other searches
// but we don't know about about other search requests that are being executed so lets cap at 10 per node final int defaultSearchThreadPoolSize = Math.min(ThreadPool.searchThreadPoolSize(allocatedProcessors), 10);
int defaultSearchThreadPoolSize = Math.min(ThreadPool.searchThreadPoolSize(availableProcessors), 10);
return Math.max(1, numDateNodes * defaultSearchThreadPoolSize); return Math.max(1, numDateNodes * defaultSearchThreadPoolSize);
} }

View File

@ -74,7 +74,7 @@ public class PageCacheRecycler {
public PageCacheRecycler(Settings settings) { public PageCacheRecycler(Settings settings) {
final Type type = TYPE_SETTING.get(settings); final Type type = TYPE_SETTING.get(settings);
final long limit = LIMIT_HEAP_SETTING.get(settings).getBytes(); final long limit = LIMIT_HEAP_SETTING.get(settings).getBytes();
final int availableProcessors = EsExecutors.numberOfProcessors(settings); final int allocatedProcessors = EsExecutors.allocatedProcessors(settings);
// We have a global amount of memory that we need to divide across data types. // We have a global amount of memory that we need to divide across data types.
// Since some types are more useful than other ones we give them different weights. // Since some types are more useful than other ones we give them different weights.
@ -98,7 +98,7 @@ public class PageCacheRecycler {
final int maxPageCount = (int) Math.min(Integer.MAX_VALUE, limit / PAGE_SIZE_IN_BYTES); final int maxPageCount = (int) Math.min(Integer.MAX_VALUE, limit / PAGE_SIZE_IN_BYTES);
final int maxBytePageCount = (int) (bytesWeight * maxPageCount / totalWeight); final int maxBytePageCount = (int) (bytesWeight * maxPageCount / totalWeight);
bytePage = build(type, maxBytePageCount, availableProcessors, new AbstractRecyclerC<byte[]>() { bytePage = build(type, maxBytePageCount, allocatedProcessors, new AbstractRecyclerC<byte[]>() {
@Override @Override
public byte[] newInstance() { public byte[] newInstance() {
return new byte[BYTE_PAGE_SIZE]; return new byte[BYTE_PAGE_SIZE];
@ -110,7 +110,7 @@ public class PageCacheRecycler {
}); });
final int maxIntPageCount = (int) (intsWeight * maxPageCount / totalWeight); final int maxIntPageCount = (int) (intsWeight * maxPageCount / totalWeight);
intPage = build(type, maxIntPageCount, availableProcessors, new AbstractRecyclerC<int[]>() { intPage = build(type, maxIntPageCount, allocatedProcessors, new AbstractRecyclerC<int[]>() {
@Override @Override
public int[] newInstance() { public int[] newInstance() {
return new int[INT_PAGE_SIZE]; return new int[INT_PAGE_SIZE];
@ -122,7 +122,7 @@ public class PageCacheRecycler {
}); });
final int maxLongPageCount = (int) (longsWeight * maxPageCount / totalWeight); final int maxLongPageCount = (int) (longsWeight * maxPageCount / totalWeight);
longPage = build(type, maxLongPageCount, availableProcessors, new AbstractRecyclerC<long[]>() { longPage = build(type, maxLongPageCount, allocatedProcessors, new AbstractRecyclerC<long[]>() {
@Override @Override
public long[] newInstance() { public long[] newInstance() {
return new long[LONG_PAGE_SIZE]; return new long[LONG_PAGE_SIZE];
@ -134,7 +134,7 @@ public class PageCacheRecycler {
}); });
final int maxObjectPageCount = (int) (objectsWeight * maxPageCount / totalWeight); final int maxObjectPageCount = (int) (objectsWeight * maxPageCount / totalWeight);
objectPage = build(type, maxObjectPageCount, availableProcessors, new AbstractRecyclerC<Object[]>() { objectPage = build(type, maxObjectPageCount, allocatedProcessors, new AbstractRecyclerC<Object[]>() {
@Override @Override
public Object[] newInstance() { public Object[] newInstance() {
return new Object[OBJECT_PAGE_SIZE]; return new Object[OBJECT_PAGE_SIZE];

View File

@ -62,7 +62,9 @@ public class EsExecutors {
Property.NodeScope); Property.NodeScope);
/** /**
* Setting to manually set the number of available processors. This setting is used to adjust thread pool sizes per node. * Setting to manually control the number of allocated processors. This setting is used to adjust thread pool sizes per node. The
* default value is {@link Runtime#availableProcessors()} but should be manually controlled if not all processors on the machine are
* available to Elasticsearch (e.g., because of CPU limits).
*/ */
public static final Setting<Integer> NODE_PROCESSORS_SETTING = new Setting<>( public static final Setting<Integer> NODE_PROCESSORS_SETTING = new Setting<>(
"node.processors", "node.processors",
@ -86,14 +88,13 @@ public class EsExecutors {
} }
/** /**
* Returns the number of available processors. Defaults to * Returns the number of allocated processors. Defaults to {@link Runtime#availableProcessors()} but can be overridden by passing a
* {@link Runtime#availableProcessors()} but can be overridden by passing a {@link Settings} * {@link Settings} instance with the key {@code node.processors} set to the desired value.
* instance with the key "processors" set to the desired value.
* *
* @param settings a {@link Settings} instance from which to derive the available processors * @param settings a {@link Settings} instance from which to derive the allocated processors
* @return the number of available processors * @return the number of allocated processors
*/ */
public static int numberOfProcessors(final Settings settings) { public static int allocatedProcessors(final Settings settings) {
return NODE_PROCESSORS_SETTING.get(settings); return NODE_PROCESSORS_SETTING.get(settings);
} }

View File

@ -22,6 +22,7 @@ package org.elasticsearch.index;
import org.apache.lucene.index.ConcurrentMergeScheduler; import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
/** /**
@ -36,7 +37,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
* <li> <code>index.merge.scheduler.max_thread_count</code>: * <li> <code>index.merge.scheduler.max_thread_count</code>:
* *
* The maximum number of threads that may be merging at once. Defaults to * The maximum number of threads that may be merging at once. Defaults to
* <code>Math.max(1, Math.min(4, Runtime.getRuntime().availableProcessors() / 2))</code> * <code>Math.max(1, Math.min(4, {@link EsExecutors#allocatedProcessors(Settings)} / 2))</code>
* which works well for a good solid-state-disk (SSD). If your index is on * which works well for a good solid-state-disk (SSD). If your index is on
* spinning platter drives instead, decrease this to 1. * spinning platter drives instead, decrease this to 1.
* *
@ -54,7 +55,7 @@ public final class MergeSchedulerConfig {
public static final Setting<Integer> MAX_THREAD_COUNT_SETTING = public static final Setting<Integer> MAX_THREAD_COUNT_SETTING =
new Setting<>("index.merge.scheduler.max_thread_count", new Setting<>("index.merge.scheduler.max_thread_count",
(s) -> Integer.toString(Math.max(1, Math.min(4, EsExecutors.numberOfProcessors(s) / 2))), (s) -> Integer.toString(Math.max(1, Math.min(4, EsExecutors.allocatedProcessors(s) / 2))),
(s) -> Setting.parseInt(s, 1, "index.merge.scheduler.max_thread_count"), Property.Dynamic, (s) -> Setting.parseInt(s, 1, "index.merge.scheduler.max_thread_count"), Property.Dynamic,
Property.IndexScope); Property.IndexScope);
public static final Setting<Integer> MAX_MERGE_COUNT_SETTING = public static final Setting<Integer> MAX_MERGE_COUNT_SETTING =

View File

@ -45,7 +45,7 @@ public class OsService {
public OsService(Settings settings) throws IOException { public OsService(Settings settings) throws IOException {
this.probe = OsProbe.getInstance(); this.probe = OsProbe.getInstance();
TimeValue refreshInterval = REFRESH_INTERVAL_SETTING.get(settings); TimeValue refreshInterval = REFRESH_INTERVAL_SETTING.get(settings);
this.info = probe.osInfo(refreshInterval.millis(), EsExecutors.numberOfProcessors(settings)); this.info = probe.osInfo(refreshInterval.millis(), EsExecutors.allocatedProcessors(settings));
this.osStatsCache = new OsStatsCache(refreshInterval, probe.osStats()); this.osStatsCache = new OsStatsCache(refreshInterval, probe.osStats());
logger.debug("using refresh_interval [{}]", refreshInterval); logger.debug("using refresh_interval [{}]", refreshInterval);
} }

View File

@ -49,7 +49,7 @@ public abstract class ExecutorBuilder<U extends ExecutorBuilder.ExecutorSettings
protected int applyHardSizeLimit(final Settings settings, final String name) { protected int applyHardSizeLimit(final Settings settings, final String name) {
if (name.equals("bulk") || name.equals(ThreadPool.Names.WRITE)) { if (name.equals("bulk") || name.equals(ThreadPool.Names.WRITE)) {
return 1 + EsExecutors.numberOfProcessors(settings); return 1 + EsExecutors.allocatedProcessors(settings);
} else { } else {
return Integer.MAX_VALUE; return Integer.MAX_VALUE;
} }

View File

@ -166,16 +166,16 @@ public class ThreadPool implements Scheduler {
assert Node.NODE_NAME_SETTING.exists(settings); assert Node.NODE_NAME_SETTING.exists(settings);
final Map<String, ExecutorBuilder> builders = new HashMap<>(); final Map<String, ExecutorBuilder> builders = new HashMap<>();
final int availableProcessors = EsExecutors.numberOfProcessors(settings); final int allocatedProcessors = EsExecutors.allocatedProcessors(settings);
final int halfProcMaxAt5 = halfNumberOfProcessorsMaxFive(availableProcessors); final int halfProcMaxAt5 = halfAllocatedProcessorsMaxFive(allocatedProcessors);
final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors); final int halfProcMaxAt10 = halfAllocatedProcessorsMaxTen(allocatedProcessors);
final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512); final int genericThreadPoolMax = boundedBy(4 * allocatedProcessors, 128, 512);
builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30))); builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, availableProcessors, 200)); builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, allocatedProcessors, 200));
builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000)); builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, allocatedProcessors, 1000));
builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16)); builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16));
builders.put(Names.SEARCH, new AutoQueueAdjustingExecutorBuilder(settings, builders.put(Names.SEARCH, new AutoQueueAdjustingExecutorBuilder(settings,
Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000, 1000, 1000, 2000)); Names.SEARCH, searchThreadPoolSize(allocatedProcessors), 1000, 1000, 1000, 2000));
builders.put(Names.SEARCH_THROTTLED, new AutoQueueAdjustingExecutorBuilder(settings, builders.put(Names.SEARCH_THROTTLED, new AutoQueueAdjustingExecutorBuilder(settings,
Names.SEARCH_THROTTLED, 1, 100, 100, 100, 200)); Names.SEARCH_THROTTLED, 1, 100, 100, 100, 200));
builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5))); builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5)));
@ -187,10 +187,10 @@ public class ThreadPool implements Scheduler {
builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
builders.put(Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); builders.put(Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
builders.put(Names.FETCH_SHARD_STARTED, builders.put(Names.FETCH_SHARD_STARTED,
new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5))); new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * allocatedProcessors, TimeValue.timeValueMinutes(5)));
builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1)); builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1));
builders.put(Names.FETCH_SHARD_STORE, builders.put(Names.FETCH_SHARD_STORE,
new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5))); new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * allocatedProcessors, TimeValue.timeValueMinutes(5)));
for (final ExecutorBuilder<?> builder : customBuilders) { for (final ExecutorBuilder<?> builder : customBuilders) {
if (builders.containsKey(builder.name())) { if (builders.containsKey(builder.name())) {
throw new IllegalArgumentException("builder with name [" + builder.name() + "] already exists"); throw new IllegalArgumentException("builder with name [" + builder.name() + "] already exists");
@ -436,20 +436,20 @@ public class ThreadPool implements Scheduler {
return Math.min(max, Math.max(min, value)); return Math.min(max, Math.max(min, value));
} }
static int halfNumberOfProcessorsMaxFive(int numberOfProcessors) { static int halfAllocatedProcessorsMaxFive(final int allocatedProcessors) {
return boundedBy((numberOfProcessors + 1) / 2, 1, 5); return boundedBy((allocatedProcessors + 1) / 2, 1, 5);
} }
static int halfNumberOfProcessorsMaxTen(int numberOfProcessors) { static int halfAllocatedProcessorsMaxTen(final int allocatedProcessors) {
return boundedBy((numberOfProcessors + 1) / 2, 1, 10); return boundedBy((allocatedProcessors + 1) / 2, 1, 10);
} }
static int twiceNumberOfProcessors(int numberOfProcessors) { static int twiceAllocatedProcessors(final int allocatedProcessors) {
return boundedBy(2 * numberOfProcessors, 2, Integer.MAX_VALUE); return boundedBy(2 * allocatedProcessors, 2, Integer.MAX_VALUE);
} }
public static int searchThreadPoolSize(int availableProcessors) { public static int searchThreadPoolSize(final int allocatedProcessors) {
return ((availableProcessors * 3) / 2) + 1; return ((allocatedProcessors * 3) / 2) + 1;
} }
class LoggingRunnable implements Runnable { class LoggingRunnable implements Runnable {

View File

@ -34,7 +34,7 @@ public class FixedThreadPoolTests extends ESThreadPoolTestCase {
final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.FIXED); final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.FIXED);
// some of the fixed thread pool are bound by the number of // some of the fixed thread pool are bound by the number of
// cores so we can not exceed that // cores so we can not exceed that
final int size = randomIntBetween(1, EsExecutors.numberOfProcessors(Settings.EMPTY)); final int size = randomIntBetween(1, EsExecutors.allocatedProcessors(Settings.EMPTY));
final int queueSize = randomIntBetween(1, 16); final int queueSize = randomIntBetween(1, 16);
final long rejections = randomIntBetween(1, 16); final long rejections = randomIntBetween(1, 16);

View File

@ -108,12 +108,12 @@ public class ScalingThreadPoolTests extends ESThreadPoolTestCase {
final Map<String, Function<Integer, Integer>> sizes = new HashMap<>(); final Map<String, Function<Integer, Integer>> sizes = new HashMap<>();
sizes.put(ThreadPool.Names.GENERIC, n -> ThreadPool.boundedBy(4 * n, 128, 512)); sizes.put(ThreadPool.Names.GENERIC, n -> ThreadPool.boundedBy(4 * n, 128, 512));
sizes.put(ThreadPool.Names.MANAGEMENT, n -> 5); sizes.put(ThreadPool.Names.MANAGEMENT, n -> 5);
sizes.put(ThreadPool.Names.FLUSH, ThreadPool::halfNumberOfProcessorsMaxFive); sizes.put(ThreadPool.Names.FLUSH, ThreadPool::halfAllocatedProcessorsMaxFive);
sizes.put(ThreadPool.Names.REFRESH, ThreadPool::halfNumberOfProcessorsMaxTen); sizes.put(ThreadPool.Names.REFRESH, ThreadPool::halfAllocatedProcessorsMaxTen);
sizes.put(ThreadPool.Names.WARMER, ThreadPool::halfNumberOfProcessorsMaxFive); sizes.put(ThreadPool.Names.WARMER, ThreadPool::halfAllocatedProcessorsMaxFive);
sizes.put(ThreadPool.Names.SNAPSHOT, ThreadPool::halfNumberOfProcessorsMaxFive); sizes.put(ThreadPool.Names.SNAPSHOT, ThreadPool::halfAllocatedProcessorsMaxFive);
sizes.put(ThreadPool.Names.FETCH_SHARD_STARTED, ThreadPool::twiceNumberOfProcessors); sizes.put(ThreadPool.Names.FETCH_SHARD_STARTED, ThreadPool::twiceAllocatedProcessors);
sizes.put(ThreadPool.Names.FETCH_SHARD_STORE, ThreadPool::twiceNumberOfProcessors); sizes.put(ThreadPool.Names.FETCH_SHARD_STORE, ThreadPool::twiceAllocatedProcessors);
return sizes.get(threadPoolName).apply(numberOfProcessors); return sizes.get(threadPoolName).apply(numberOfProcessors);
} }

View File

@ -61,7 +61,7 @@ public class UpdateThreadPoolSettingsTests extends ESThreadPoolTestCase {
} }
public void testWriteThreadPoolsMaxSize() throws InterruptedException { public void testWriteThreadPoolsMaxSize() throws InterruptedException {
final int maxSize = 1 + EsExecutors.numberOfProcessors(Settings.EMPTY); final int maxSize = 1 + EsExecutors.allocatedProcessors(Settings.EMPTY);
final int tooBig = randomIntBetween(1 + maxSize, Integer.MAX_VALUE); final int tooBig = randomIntBetween(1 + maxSize, Integer.MAX_VALUE);
// try to create a too big thread pool // try to create a too big thread pool
@ -88,7 +88,7 @@ public class UpdateThreadPoolSettingsTests extends ESThreadPoolTestCase {
private static int getExpectedThreadPoolSize(Settings settings, String name, int size) { private static int getExpectedThreadPoolSize(Settings settings, String name, int size) {
if (name.equals(ThreadPool.Names.WRITE)) { if (name.equals(ThreadPool.Names.WRITE)) {
return Math.min(size, EsExecutors.numberOfProcessors(settings)); return Math.min(size, EsExecutors.allocatedProcessors(settings));
} else { } else {
return size; return size;
} }
@ -192,7 +192,7 @@ public class UpdateThreadPoolSettingsTests extends ESThreadPoolTestCase {
new ScalingExecutorBuilder( new ScalingExecutorBuilder(
"my_pool1", "my_pool1",
1, 1,
EsExecutors.numberOfProcessors(Settings.EMPTY), EsExecutors.allocatedProcessors(Settings.EMPTY),
TimeValue.timeValueMinutes(1)); TimeValue.timeValueMinutes(1));
final FixedExecutorBuilder fixed = new FixedExecutorBuilder(Settings.EMPTY, "my_pool2", 1, 1); final FixedExecutorBuilder fixed = new FixedExecutorBuilder(Settings.EMPTY, "my_pool2", 1, 1);

View File

@ -537,12 +537,12 @@ public class Watcher extends Plugin implements SystemIndexPlugin, ScriptPlugin,
* @return A number between 5 and the number of processors * @return A number between 5 and the number of processors
*/ */
static int getWatcherThreadPoolSize(final Settings settings) { static int getWatcherThreadPoolSize(final Settings settings) {
return getWatcherThreadPoolSize(Node.NODE_DATA_SETTING.get(settings), EsExecutors.numberOfProcessors(settings)); return getWatcherThreadPoolSize(Node.NODE_DATA_SETTING.get(settings), EsExecutors.allocatedProcessors(settings));
} }
static int getWatcherThreadPoolSize(final boolean isDataNode, final int numberOfProcessors) { static int getWatcherThreadPoolSize(final boolean isDataNode, final int allocatedProcessors) {
if (isDataNode) { if (isDataNode) {
final long size = Math.max(Math.min(5 * numberOfProcessors, 50), numberOfProcessors); final long size = Math.max(Math.min(5 * allocatedProcessors, 50), allocatedProcessors);
return Math.toIntExact(size); return Math.toIntExact(size);
} else { } else {
return 1; return 1;