Remove bulk fallback for write thread pool (#29609)
The name of the bulk thread pool was renamed to "write" with "bulk" as a fallback name. This change was made in 6.x for BWC reasons yet in 7.0.0 we are removing this fallback. This commit removes this fallback for the write thread pool.
This commit is contained in:
parent
113d1d3eab
commit
5d767e449a
|
@ -9,6 +9,9 @@
|
|||
|
||||
=== Breaking Changes
|
||||
|
||||
<<write-thread-pool-fallback, Removed `thread_pool.bulk.*` settings and
|
||||
`es.thread_pool.write.use_bulk_as_display_name` system property>> ({pull}29609[#29609])
|
||||
|
||||
=== Breaking Java Changes
|
||||
|
||||
=== Deprecations
|
||||
|
|
|
@ -14,3 +14,14 @@
|
|||
executed on the bulk thread pool. As such, the indexing thread pool is no
|
||||
longer needed and has been removed. As such, the settings
|
||||
`thread_pool.index.size` and `thread_pool.index.queue_size` have been removed.
|
||||
|
||||
[[write-thread-pool-fallback]]
|
||||
==== Write thread pool fallback
|
||||
|
||||
* The bulk thread pool was replaced by the write thread pool in 6.3.0. However,
|
||||
for backwards compatibility reasons the name `bulk` was still usable as fallback
|
||||
settings `thread_pool.bulk.size` and `thread_pool.bulk.queue_size` for
|
||||
`thread_pool.write.size` and `thread_pool.write.queue_size`, respectively, and
|
||||
the system property `es.thread_pool.write.use_bulk_as_display_name` was
|
||||
available to keep the display output in APIs as `bulk` instead of `write`.
|
||||
These fallback settings and this system property have been removed.
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package org.elasticsearch.threadpool;
|
||||
|
||||
import org.elasticsearch.common.Booleans;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.SizeValue;
|
||||
|
@ -39,9 +38,7 @@ import java.util.concurrent.ThreadFactory;
|
|||
public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBuilder.FixedExecutorSettings> {
|
||||
|
||||
private final Setting<Integer> sizeSetting;
|
||||
private final Setting<Integer> fallbackSizeSetting;
|
||||
private final Setting<Integer> queueSizeSetting;
|
||||
private final Setting<Integer> fallbackQueueSizeSetting;
|
||||
|
||||
/**
|
||||
* Construct a fixed executor builder; the settings will have the key prefix "thread_pool." followed by the executor name.
|
||||
|
@ -55,19 +52,6 @@ public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBui
|
|||
this(settings, name, size, queueSize, "thread_pool." + name);
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct a fixed executor builder; the settings will have the key prefix "thread_pool." followed by the executor name.
|
||||
*
|
||||
* @param settings the node-level settings
|
||||
* @param name the name of the executor
|
||||
* @param fallbackName the fallback name of the executor (used for transitioning the name of a setting)
|
||||
* @param size the fixed number of threads
|
||||
* @param queueSize the size of the backing queue, -1 for unbounded
|
||||
*/
|
||||
FixedExecutorBuilder(final Settings settings, final String name, final String fallbackName, final int size, final int queueSize) {
|
||||
this(settings, name, fallbackName, size, queueSize, "thread_pool." + name, "thread_pool." + fallbackName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct a fixed executor builder.
|
||||
*
|
||||
|
@ -78,81 +62,21 @@ public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBui
|
|||
* @param prefix the prefix for the settings keys
|
||||
*/
|
||||
public FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize, final String prefix) {
|
||||
this(settings, name, null, size, queueSize, prefix, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct a fixed executor builder.
|
||||
*
|
||||
* @param settings the node-level settings
|
||||
* @param name the name of the executor
|
||||
* @param size the fixed number of threads
|
||||
* @param queueSize the size of the backing queue, -1 for unbounded
|
||||
* @param prefix the prefix for the settings keys
|
||||
*/
|
||||
private FixedExecutorBuilder(
|
||||
final Settings settings,
|
||||
final String name,
|
||||
final String fallbackName,
|
||||
final int size,
|
||||
final int queueSize,
|
||||
final String prefix,
|
||||
final String fallbackPrefix) {
|
||||
super(name);
|
||||
final String sizeKey = settingsKey(prefix, "size");
|
||||
this.sizeSetting =
|
||||
new Setting<>(
|
||||
sizeKey,
|
||||
s -> Integer.toString(size),
|
||||
s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey),
|
||||
Setting.Property.NodeScope);
|
||||
final String queueSizeKey = settingsKey(prefix, "queue_size");
|
||||
if (fallbackName == null) {
|
||||
assert fallbackPrefix == null;
|
||||
final Setting.Property[] properties = {Setting.Property.NodeScope};
|
||||
this.sizeSetting = sizeSetting(settings, name, size, prefix, properties);
|
||||
this.fallbackSizeSetting = null;
|
||||
this.queueSizeSetting = queueSizeSetting(prefix, queueSize, properties);
|
||||
this.fallbackQueueSizeSetting = null;
|
||||
} else {
|
||||
assert fallbackPrefix != null;
|
||||
final Setting.Property[] properties = { Setting.Property.NodeScope };
|
||||
final Setting.Property[] fallbackProperties = { Setting.Property.NodeScope, Setting.Property.Deprecated };
|
||||
final Setting<Integer> fallbackSizeSetting = sizeSetting(settings, fallbackName, size, fallbackPrefix, fallbackProperties);
|
||||
this.sizeSetting =
|
||||
new Setting<>(
|
||||
new Setting.SimpleKey(sizeKey),
|
||||
fallbackSizeSetting,
|
||||
s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey),
|
||||
properties);
|
||||
this.fallbackSizeSetting = fallbackSizeSetting;
|
||||
final Setting<Integer> fallbackQueueSizeSetting = queueSizeSetting(fallbackPrefix, queueSize, fallbackProperties);
|
||||
this.queueSizeSetting =
|
||||
new Setting<>(
|
||||
new Setting.SimpleKey(queueSizeKey),
|
||||
fallbackQueueSizeSetting,
|
||||
s -> Setting.parseInt(s, Integer.MIN_VALUE, queueSizeKey),
|
||||
properties);
|
||||
this.fallbackQueueSizeSetting = fallbackQueueSizeSetting;
|
||||
}
|
||||
}
|
||||
|
||||
private Setting<Integer> sizeSetting(
|
||||
final Settings settings, final String name, final int size, final String prefix, final Setting.Property[] properties) {
|
||||
final String sizeKey = settingsKey(prefix, "size");
|
||||
return new Setting<>(
|
||||
sizeKey,
|
||||
s -> Integer.toString(size),
|
||||
s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey),
|
||||
properties);
|
||||
}
|
||||
|
||||
private Setting<Integer> queueSizeSetting(final String prefix, final int queueSize, final Setting.Property[] properties) {
|
||||
return Setting.intSetting(settingsKey(prefix, "queue_size"), queueSize, properties);
|
||||
this.queueSizeSetting = Setting.intSetting(queueSizeKey, queueSize, Setting.Property.NodeScope);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Setting<?>> getRegisteredSettings() {
|
||||
if (fallbackSizeSetting == null && fallbackQueueSizeSetting == null) {
|
||||
return Arrays.asList(sizeSetting, queueSizeSetting);
|
||||
} else {
|
||||
assert fallbackSizeSetting != null && fallbackQueueSizeSetting != null;
|
||||
return Arrays.asList(sizeSetting, fallbackSizeSetting, queueSizeSetting, fallbackQueueSizeSetting);
|
||||
}
|
||||
return Arrays.asList(sizeSetting, queueSizeSetting);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -170,14 +94,8 @@ public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBui
|
|||
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name()));
|
||||
final ExecutorService executor =
|
||||
EsExecutors.newFixed(settings.nodeName + "/" + name(), size, queueSize, threadFactory, threadContext);
|
||||
final String name;
|
||||
if ("write".equals(name()) && Booleans.parseBoolean(System.getProperty("es.thread_pool.write.use_bulk_as_display_name", "false"))) {
|
||||
name = "bulk";
|
||||
} else {
|
||||
name = name();
|
||||
}
|
||||
final ThreadPool.Info info =
|
||||
new ThreadPool.Info(name, ThreadPool.ThreadPoolType.FIXED, size, size, null, queueSize < 0 ? null : new SizeValue(queueSize));
|
||||
new ThreadPool.Info(name(), ThreadPool.ThreadPoolType.FIXED, size, size, null, queueSize < 0 ? null : new SizeValue(queueSize));
|
||||
return new ThreadPool.ExecutorHolder(executor, info);
|
||||
}
|
||||
|
||||
|
|
|
@ -170,7 +170,7 @@ public class ThreadPool extends AbstractComponent implements Scheduler, Closeabl
|
|||
final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors);
|
||||
final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);
|
||||
builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
|
||||
builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, "bulk", availableProcessors, 200));
|
||||
builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, availableProcessors, 200));
|
||||
builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000));
|
||||
builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16));
|
||||
builders.put(Names.SEARCH, new AutoQueueAdjustingExecutorBuilder(settings,
|
||||
|
|
|
@ -45,8 +45,8 @@ public class RejectionActionIT extends ESIntegTestCase {
|
|||
.put(super.nodeSettings(nodeOrdinal))
|
||||
.put("thread_pool.search.size", 1)
|
||||
.put("thread_pool.search.queue_size", 1)
|
||||
.put("thread_pool.bulk.size", 1)
|
||||
.put("thread_pool.bulk.queue_size", 1)
|
||||
.put("thread_pool.write.size", 1)
|
||||
.put("thread_pool.write.queue_size", 1)
|
||||
.put("thread_pool.get.size", 1)
|
||||
.put("thread_pool.get.queue_size", 1)
|
||||
.build();
|
||||
|
|
Loading…
Reference in New Issue