Deprecate the listener thread pool (#53266)

The listener thread pool is being removed from use in the server
codebase. This commit deprecates configuring the listener thread pool.
This commit is contained in:
Jason Tedor 2020-03-09 16:50:32 -04:00
parent 62c8ac9993
commit 1860c57147
No known key found for this signature in database
GPG Key ID: FA89F05560F16BC5
5 changed files with 75 additions and 9 deletions

View File

@ -65,3 +65,10 @@ for `gt` and `lte` boundaries, the same queries on `date_range` fields didn't
do this. The behavior is now the same for both field types like documented in do this. The behavior is now the same for both field types like documented in
<<range-query-date-math-rounding>>. <<range-query-date-math-rounding>>.
[float]
==== `thread_pool.listener.size` and `thread_pool.listener.queue_size` have been deprecated
The listener thread pool is no longer used internally by Elasticsearch.
Therefore, these settings have been deprecated. You can safely remove these
settings from the configuration of your nodes.

View File

@ -49,7 +49,26 @@ public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBui
* @param queueSize the size of the backing queue, -1 for unbounded * @param queueSize the size of the backing queue, -1 for unbounded
*/ */
FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize) { FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize) {
this(settings, name, size, queueSize, "thread_pool." + name); this(settings, name, size, queueSize, false);
}
/**
* Construct a fixed executor builder; the settings will have the key prefix "thread_pool." followed by the executor name.
*
* @param settings the node-level settings
* @param name the name of the executor
* @param size the fixed number of threads
* @param queueSize the size of the backing queue, -1 for unbounded
* @param deprecated whether or not the thread pool is deprecated
*/
FixedExecutorBuilder(
final Settings settings,
final String name,
final int size,
final int queueSize,
final boolean deprecated
) {
this(settings, name, size, queueSize, "thread_pool." + name, deprecated);
} }
/** /**
@ -62,16 +81,43 @@ public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBui
* @param prefix the prefix for the settings keys * @param prefix the prefix for the settings keys
*/ */
public FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize, final String prefix) { public FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize, final String prefix) {
this(settings, name, size, queueSize, prefix, false);
}
/**
* Construct a fixed executor builder.
*
* @param settings the node-level settings
* @param name the name of the executor
* @param size the fixed number of threads
* @param queueSize the size of the backing queue, -1 for unbounded
* @param prefix the prefix for the settings keys
* @param deprecated whether or not the thread pool is deprecated
*/
public FixedExecutorBuilder(
final Settings settings,
final String name,
final int size,
final int queueSize,
final String prefix,
final boolean deprecated
) {
super(name); super(name);
final String sizeKey = settingsKey(prefix, "size"); final String sizeKey = settingsKey(prefix, "size");
final Setting.Property[] properties;
if (deprecated) {
properties = new Setting.Property[]{Setting.Property.NodeScope, Setting.Property.Deprecated};
} else {
properties = new Setting.Property[]{Setting.Property.NodeScope};
}
this.sizeSetting = this.sizeSetting =
new Setting<>( new Setting<>(
sizeKey, sizeKey,
s -> Integer.toString(size), s -> Integer.toString(size),
s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey), s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey),
Setting.Property.NodeScope); properties);
final String queueSizeKey = settingsKey(prefix, "queue_size"); final String queueSizeKey = settingsKey(prefix, "queue_size");
this.queueSizeSetting = Setting.intSetting(queueSizeKey, queueSize, Setting.Property.NodeScope); this.queueSizeSetting = Setting.intSetting(queueSizeKey, queueSize, properties);
} }
@Override @Override

View File

@ -66,7 +66,7 @@ public class ThreadPool implements Scheduler {
public static class Names { public static class Names {
public static final String SAME = "same"; public static final String SAME = "same";
public static final String GENERIC = "generic"; public static final String GENERIC = "generic";
public static final String LISTENER = "listener"; @Deprecated public static final String LISTENER = "listener";
public static final String GET = "get"; public static final String GET = "get";
public static final String ANALYZE = "analyze"; public static final String ANALYZE = "analyze";
public static final String WRITE = "write"; public static final String WRITE = "write";
@ -181,7 +181,7 @@ public class ThreadPool implements Scheduler {
builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5))); builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5)));
// no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded // no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded
// the assumption here is that the listeners should be very lightweight on the listeners side // the assumption here is that the listeners should be very lightweight on the listeners side
builders.put(Names.LISTENER, new FixedExecutorBuilder(settings, Names.LISTENER, halfProcMaxAt10, -1)); builders.put(Names.LISTENER, new FixedExecutorBuilder(settings, Names.LISTENER, halfProcMaxAt10, -1, true));
builders.put(Names.FLUSH, new ScalingExecutorBuilder(Names.FLUSH, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); builders.put(Names.FLUSH, new ScalingExecutorBuilder(Names.FLUSH, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5))); builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)));
builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));

View File

@ -22,6 +22,7 @@ package org.elasticsearch.threadpool;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.threadpool.ThreadPool.Names;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -88,6 +89,10 @@ public class FixedThreadPoolTests extends ESThreadPoolTestCase {
} finally { } finally {
terminateThreadPoolIfNeeded(threadPool); terminateThreadPoolIfNeeded(threadPool);
} }
if (Names.LISTENER.equals(threadPoolName)) {
assertSettingDeprecationsAndWarnings(new String[]{"thread_pool.listener.queue_size", "thread_pool.listener.size"});
}
} }
} }

View File

@ -118,6 +118,10 @@ public class UpdateThreadPoolSettingsTests extends ESThreadPoolTestCase {
} finally { } finally {
terminateThreadPoolIfNeeded(threadPool); terminateThreadPoolIfNeeded(threadPool);
} }
if (Names.LISTENER.equals(threadPoolName)) {
assertSettingDeprecationsAndWarnings(new String[]{"thread_pool.listener.size"});
}
} }
public void testScalingExecutorType() throws InterruptedException { public void testScalingExecutorType() throws InterruptedException {
@ -173,6 +177,10 @@ public class UpdateThreadPoolSettingsTests extends ESThreadPoolTestCase {
} finally { } finally {
terminateThreadPoolIfNeeded(threadPool); terminateThreadPoolIfNeeded(threadPool);
} }
if (Names.LISTENER.equals(threadPoolName)) {
assertSettingDeprecationsAndWarnings(new String[]{"thread_pool.listener.queue_size"});
}
} }
public void testCustomThreadPool() throws Exception { public void testCustomThreadPool() throws Exception {