Add node.processors setting in favor of processors (#45885)
This commit namespaces the existing processors setting under the "node" namespace. In doing so, we deprecate the existing processors setting in favor of node.processors.
This commit is contained in:
parent
bdfd90560f
commit
de6b6fd338
|
@ -152,7 +152,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
|
|||
public Netty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool,
|
||||
NamedXContentRegistry xContentRegistry, Dispatcher dispatcher) {
|
||||
super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher);
|
||||
Netty4Utils.setAvailableProcessors(EsExecutors.PROCESSORS_SETTING.get(settings));
|
||||
Netty4Utils.setAvailableProcessors(EsExecutors.NODE_PROCESSORS_SETTING.get(settings));
|
||||
|
||||
this.maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings);
|
||||
this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings);
|
||||
|
|
|
@ -112,7 +112,7 @@ public class Netty4Transport extends TcpTransport {
|
|||
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
|
||||
CircuitBreakerService circuitBreakerService) {
|
||||
super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
|
||||
Netty4Utils.setAvailableProcessors(EsExecutors.PROCESSORS_SETTING.get(settings));
|
||||
Netty4Utils.setAvailableProcessors(EsExecutors.NODE_PROCESSORS_SETTING.get(settings));
|
||||
this.workerCount = WORKER_COUNT.get(settings);
|
||||
|
||||
// See AdaptiveReceiveBufferSizePredictor#DEFAULT_XXX for default values in netty..., we can use higher ones for us, even fixed one
|
||||
|
|
|
@ -447,6 +447,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
|||
Client.CLIENT_TYPE_SETTING_S,
|
||||
ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING,
|
||||
EsExecutors.PROCESSORS_SETTING,
|
||||
EsExecutors.NODE_PROCESSORS_SETTING,
|
||||
ThreadContext.DEFAULT_HEADERS_SETTING,
|
||||
Loggers.LOG_DEFAULT_LEVEL_SETTING,
|
||||
Loggers.LOG_LEVEL_SETTING,
|
||||
|
|
|
@ -44,6 +44,7 @@ import java.util.concurrent.ThreadFactory;
|
|||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class EsExecutors {
|
||||
|
@ -56,19 +57,33 @@ public class EsExecutors {
|
|||
public static final Setting<Integer> PROCESSORS_SETTING = new Setting<>(
|
||||
"processors",
|
||||
s -> Integer.toString(Runtime.getRuntime().availableProcessors()),
|
||||
s -> {
|
||||
final int value = Setting.parseInt(s, 1, "processors");
|
||||
processorsParser("processors"),
|
||||
Property.Deprecated,
|
||||
Property.NodeScope);
|
||||
|
||||
/**
|
||||
* Setting to manually set the number of available processors. This setting is used to adjust thread pool sizes per node.
|
||||
*/
|
||||
public static final Setting<Integer> NODE_PROCESSORS_SETTING = new Setting<>(
|
||||
"node.processors",
|
||||
PROCESSORS_SETTING,
|
||||
processorsParser("node.processors"),
|
||||
Property.NodeScope);
|
||||
|
||||
private static Function<String, Integer> processorsParser(final String name) {
|
||||
return s -> {
|
||||
final int value = Setting.parseInt(s, 1, name);
|
||||
final int availableProcessors = Runtime.getRuntime().availableProcessors();
|
||||
if (value > availableProcessors) {
|
||||
deprecationLogger.deprecatedAndMaybeLog(
|
||||
"processors",
|
||||
"setting processors to value [{}] which is more than available processors [{}] is deprecated",
|
||||
"setting [" + name + "] to value [{}] which is more than available processors [{}] is deprecated",
|
||||
value,
|
||||
availableProcessors);
|
||||
}
|
||||
return value;
|
||||
},
|
||||
Property.NodeScope);
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of available processors. Defaults to
|
||||
|
@ -79,7 +94,7 @@ public class EsExecutors {
|
|||
* @return the number of available processors
|
||||
*/
|
||||
public static int numberOfProcessors(final Settings settings) {
|
||||
return PROCESSORS_SETTING.get(settings);
|
||||
return NODE_PROCESSORS_SETTING.get(settings);
|
||||
}
|
||||
|
||||
public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(String name, ThreadFactory threadFactory,
|
||||
|
|
|
@ -203,7 +203,7 @@ public class ClusterStatsIT extends ESIntegTestCase {
|
|||
|
||||
public void testAllocatedProcessors() throws Exception {
|
||||
// start one node with 7 processors.
|
||||
internalCluster().startNode(Settings.builder().put(EsExecutors.PROCESSORS_SETTING.getKey(), 7).build());
|
||||
internalCluster().startNode(Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 7).build());
|
||||
waitForNodes(1);
|
||||
|
||||
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
|
||||
|
|
|
@ -19,10 +19,12 @@
|
|||
|
||||
package org.elasticsearch.common.util.concurrent;
|
||||
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.hamcrest.Matcher;
|
||||
|
||||
import java.util.Locale;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
@ -388,4 +390,32 @@ public class EsExecutorsTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testNodeProcessorsBound() {
|
||||
runProcessorsBoundTest(EsExecutors.NODE_PROCESSORS_SETTING);
|
||||
}
|
||||
|
||||
public void testProcessorsBound() {
|
||||
runProcessorsBoundTest(EsExecutors.PROCESSORS_SETTING);
|
||||
}
|
||||
|
||||
private void runProcessorsBoundTest(final Setting<Integer> processorsSetting) {
|
||||
final int available = Runtime.getRuntime().availableProcessors();
|
||||
final int processors = randomIntBetween(available + 1, Integer.MAX_VALUE);
|
||||
final Settings settings = Settings.builder().put(processorsSetting.getKey(), processors).build();
|
||||
processorsSetting.get(settings);
|
||||
final Setting<?>[] deprecatedSettings;
|
||||
if (processorsSetting.getProperties().contains(Setting.Property.Deprecated)) {
|
||||
deprecatedSettings = new Setting<?>[]{processorsSetting};
|
||||
} else {
|
||||
deprecatedSettings = new Setting<?>[0];
|
||||
}
|
||||
final String expectedWarning = String.format(
|
||||
Locale.ROOT,
|
||||
"setting [%s] to value [%d] which is more than available processors [%d] is deprecated",
|
||||
processorsSetting.getKey(),
|
||||
processors,
|
||||
available);
|
||||
assertSettingDeprecationsAndWarnings(deprecatedSettings, expectedWarning);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -31,7 +31,7 @@ import org.elasticsearch.common.logging.Loggers;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import static org.elasticsearch.common.util.concurrent.EsExecutors.PROCESSORS_SETTING;
|
||||
import static org.elasticsearch.common.util.concurrent.EsExecutors.NODE_PROCESSORS_SETTING;
|
||||
import static org.elasticsearch.index.IndexSettingsTests.newIndexMeta;
|
||||
import static org.elasticsearch.index.MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING;
|
||||
import static org.elasticsearch.index.MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING;
|
||||
|
@ -139,7 +139,7 @@ public class MergeSchedulerSettingsTests extends ESTestCase {
|
|||
builder.put(MAX_MERGE_COUNT_SETTING.getKey(), maxMergeCount);
|
||||
}
|
||||
if (numProc != -1) {
|
||||
builder.put(PROCESSORS_SETTING.getKey(), numProc);
|
||||
builder.put(NODE_PROCESSORS_SETTING.getKey(), numProc);
|
||||
}
|
||||
return newIndexMeta("index", builder.build());
|
||||
}
|
||||
|
|
|
@ -70,7 +70,7 @@ public class IndicesServiceCloseTests extends ESTestCase {
|
|||
.put(Environment.PATH_SHARED_DATA_SETTING.getKey(), createTempDir().getParent())
|
||||
.put(Node.NODE_NAME_SETTING.getKey(), nodeName)
|
||||
.put(ScriptService.SCRIPT_MAX_COMPILATIONS_RATE.getKey(), "1000/1m")
|
||||
.put(EsExecutors.PROCESSORS_SETTING.getKey(), 1) // limit the number of threads created
|
||||
.put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 1) // limit the number of threads created
|
||||
.put("transport.type", getTestTransportType())
|
||||
.put(Node.NODE_DATA_SETTING.getKey(), true)
|
||||
.put(NodeEnvironment.NODE_ID_SEED_SETTING.getKey(), random().nextLong())
|
||||
|
|
|
@ -114,8 +114,8 @@ public class SimpleNodesInfoIT extends ESIntegTestCase {
|
|||
|
||||
public void testAllocatedProcessors() throws Exception {
|
||||
List<String> nodesIds = internalCluster().startNodes(
|
||||
Settings.builder().put(EsExecutors.PROCESSORS_SETTING.getKey(), 3).build(),
|
||||
Settings.builder().put(EsExecutors.PROCESSORS_SETTING.getKey(), 6).build()
|
||||
Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 3).build(),
|
||||
Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 6).build()
|
||||
);
|
||||
|
||||
final String node_1 = nodesIds.get(0);
|
||||
|
|
|
@ -54,7 +54,7 @@ public class ScalingThreadPoolTests extends ESThreadPoolTestCase {
|
|||
if (randomBoolean()) {
|
||||
final int processors = randomIntBetween(1, 64);
|
||||
maxBasedOnNumberOfProcessors = expectedSize(threadPoolName, processors);
|
||||
builder.put("processors", processors);
|
||||
builder.put("node.processors", processors);
|
||||
processorsUsed = processors;
|
||||
} else {
|
||||
maxBasedOnNumberOfProcessors = expectedSize(threadPoolName, availableProcessors);
|
||||
|
@ -99,7 +99,7 @@ public class ScalingThreadPoolTests extends ESThreadPoolTestCase {
|
|||
});
|
||||
|
||||
if (processorsUsed > availableProcessors) {
|
||||
assertWarnings("setting processors to value [" + processorsUsed +
|
||||
assertWarnings("setting node.processors to value [" + processorsUsed +
|
||||
"] which is more than available processors [" + availableProcessors + "] is deprecated");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -197,7 +197,7 @@ public abstract class ESSingleNodeTestCase extends ESTestCase {
|
|||
.put(Environment.PATH_SHARED_DATA_SETTING.getKey(), createTempDir().getParent())
|
||||
.put(Node.NODE_NAME_SETTING.getKey(), nodeName)
|
||||
.put(ScriptService.SCRIPT_MAX_COMPILATIONS_RATE.getKey(), "1000/1m")
|
||||
.put(EsExecutors.PROCESSORS_SETTING.getKey(), 1) // limit the number of threads created
|
||||
.put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 1) // limit the number of threads created
|
||||
.put("transport.type", getTestTransportType())
|
||||
.put(TransportSettings.PORT.getKey(), ESTestCase.getPortRange())
|
||||
.put(Node.NODE_DATA_SETTING.getKey(), true)
|
||||
|
|
|
@ -470,7 +470,9 @@ public final class InternalTestCluster extends TestCluster {
|
|||
builder.put(SearchService.DEFAULT_KEEPALIVE_SETTING.getKey(), timeValueSeconds(100 + random.nextInt(5 * 60)).getStringRep());
|
||||
}
|
||||
|
||||
builder.put(EsExecutors.PROCESSORS_SETTING.getKey(), 1 + random.nextInt(Math.min(4, Runtime.getRuntime().availableProcessors())));
|
||||
builder.put(
|
||||
EsExecutors.NODE_PROCESSORS_SETTING.getKey(),
|
||||
1 + random.nextInt(Math.min(4, Runtime.getRuntime().availableProcessors())));
|
||||
if (random.nextBoolean()) {
|
||||
if (random.nextBoolean()) {
|
||||
builder.put("indices.fielddata.cache.size", 1 + random.nextInt(1000), ByteSizeUnit.MB);
|
||||
|
|
Loading…
Reference in New Issue