Allow to control the number of processors sizes are based on

Sometimes, one wants to just control the number of processors our different size base calculations for thread pools and network workers are based on, and not use the reported available processor by the OS. Add processors setting, where it can be controlled.

closes #3643
This commit is contained in:
Shay Banon 2013-09-06 19:04:15 +02:00
parent c9a7bb26ba
commit 2767c081cd
5 changed files with 18 additions and 18 deletions

View File

@ -33,12 +33,12 @@ public class EsExecutors {
/** /**
* Returns the number of processors available but at most <tt>32</tt>. * Returns the number of processors available but at most <tt>32</tt>.
*/ */
public static int boundedNumberOfProcessors() { public static int boundedNumberOfProcessors(Settings settings) {
/* This relates to issues where machines with large number of cores /* This relates to issues where machines with large number of cores
* ie. >= 48 create too many threads and run into OOM see #3478 * ie. >= 48 create too many threads and run into OOM see #3478
* We just use an 32 core upper-bound here to not stress the system * We just use an 32 core upper-bound here to not stress the system
* too much with too many created threads */ * too much with too many created threads */
return Math.min(32, Runtime.getRuntime().availableProcessors()); return settings.getAsInt("processors", Math.min(32, Runtime.getRuntime().availableProcessors()));
} }
public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(ThreadFactory threadFactory) { public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(ThreadFactory threadFactory) {

View File

@ -127,7 +127,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
this.resetCookies = componentSettings.getAsBoolean("reset_cookies", settings.getAsBoolean("http.reset_cookies", false)); this.resetCookies = componentSettings.getAsBoolean("reset_cookies", settings.getAsBoolean("http.reset_cookies", false));
this.maxCumulationBufferCapacity = componentSettings.getAsBytesSize("max_cumulation_buffer_capacity", null); this.maxCumulationBufferCapacity = componentSettings.getAsBytesSize("max_cumulation_buffer_capacity", null);
this.maxCompositeBufferComponents = componentSettings.getAsInt("max_composite_buffer_components", -1); this.maxCompositeBufferComponents = componentSettings.getAsInt("max_composite_buffer_components", -1);
this.workerCount = componentSettings.getAsInt("worker_count", EsExecutors.boundedNumberOfProcessors() * 2); this.workerCount = componentSettings.getAsInt("worker_count", EsExecutors.boundedNumberOfProcessors(settings) * 2);
this.blockingServer = settings.getAsBoolean("http.blocking_server", settings.getAsBoolean(TCP_BLOCKING_SERVER, settings.getAsBoolean(TCP_BLOCKING, false))); this.blockingServer = settings.getAsBoolean("http.blocking_server", settings.getAsBoolean(TCP_BLOCKING_SERVER, settings.getAsBoolean(TCP_BLOCKING, false)));
this.port = componentSettings.get("port", settings.get("http.port", "9200-9300")); this.port = componentSettings.get("port", settings.get("http.port", "9200-9300"));
this.bindHost = componentSettings.get("bind_host", settings.get("http.bind_host", settings.get("http.host"))); this.bindHost = componentSettings.get("bind_host", settings.get("http.bind_host", settings.get("http.host")));

View File

@ -187,7 +187,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
this.similarityService = similarityService; this.similarityService = similarityService;
this.codecService = codecService; this.codecService = codecService;
this.compoundOnFlush = indexSettings.getAsBoolean(INDEX_COMPOUND_ON_FLUSH, this.compoundOnFlush); this.compoundOnFlush = indexSettings.getAsBoolean(INDEX_COMPOUND_ON_FLUSH, this.compoundOnFlush);
this.indexConcurrency = indexSettings.getAsInt(INDEX_INDEX_CONCURRENCY, Math.max(IndexWriterConfig.DEFAULT_MAX_THREAD_STATES, (int) (EsExecutors.boundedNumberOfProcessors() * 0.65))); this.indexConcurrency = indexSettings.getAsInt(INDEX_INDEX_CONCURRENCY, Math.max(IndexWriterConfig.DEFAULT_MAX_THREAD_STATES, (int) (EsExecutors.boundedNumberOfProcessors(indexSettings) * 0.65)));
this.versionMap = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); this.versionMap = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
this.dirtyLocks = new Object[indexConcurrency * 50]; // we multiply it to have enough... this.dirtyLocks = new Object[indexConcurrency * 50]; // we multiply it to have enough...
for (int i = 0; i < dirtyLocks.length; i++) { for (int i = 0; i < dirtyLocks.length; i++) {

View File

@ -99,7 +99,7 @@ public class ThreadPool extends AbstractComponent {
Map<String, Settings> groupSettings = settings.getGroups(THREADPOOL_GROUP); Map<String, Settings> groupSettings = settings.getGroups(THREADPOOL_GROUP);
int availableProcessors = EsExecutors.boundedNumberOfProcessors(); int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
int halfProcMaxAt5 = Math.min(((availableProcessors + 1) / 2), 5); int halfProcMaxAt5 = Math.min(((availableProcessors + 1) / 2), 5);
int halfProcMaxAt10 = Math.min(((availableProcessors + 1) / 2), 10); int halfProcMaxAt10 = Math.min(((availableProcessors + 1) / 2), 10);
defaultExecutorTypeSettings = ImmutableMap.<String, Settings>builder() defaultExecutorTypeSettings = ImmutableMap.<String, Settings>builder()
@ -296,7 +296,7 @@ public class ThreadPool extends AbstractComponent {
Executor executor = EsExecutors.newCached(keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory); Executor executor = EsExecutors.newCached(keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory);
return new ExecutorHolder(executor, new Info(name, type, -1, -1, keepAlive, null)); return new ExecutorHolder(executor, new Info(name, type, -1, -1, keepAlive, null));
} else if ("fixed".equals(type)) { } else if ("fixed".equals(type)) {
int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors()); int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors(settings));
SizeValue defaultQueueSize = defaultSettings.getAsSize("queue", defaultSettings.getAsSize("queue_size", null)); SizeValue defaultQueueSize = defaultSettings.getAsSize("queue", defaultSettings.getAsSize("queue_size", null));
if (previousExecutorHolder != null) { if (previousExecutorHolder != null) {
@ -327,7 +327,7 @@ public class ThreadPool extends AbstractComponent {
} else if ("scaling".equals(type)) { } else if ("scaling".equals(type)) {
TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5)); TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5));
int defaultMin = defaultSettings.getAsInt("min", 1); int defaultMin = defaultSettings.getAsInt("min", 1);
int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors()); int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors(settings));
if (previousExecutorHolder != null) { if (previousExecutorHolder != null) {
if ("scaling".equals(previousInfo.getType())) { if ("scaling".equals(previousInfo.getType())) {
TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive()); TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive());

View File

@ -152,8 +152,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
private volatile BoundTransportAddress boundAddress; private volatile BoundTransportAddress boundAddress;
private final KeyedLock<String> connectionLock = new KeyedLock<String >(); private final KeyedLock<String> connectionLock = new KeyedLock<String>();
// this lock is here to make sure we close this transport and disconnect all the client nodes // this lock is here to make sure we close this transport and disconnect all the client nodes
// connections while no connect operations is going on... (this might help with 100% CPU when stopping the transport?) // connections while no connect operations is going on... (this might help with 100% CPU when stopping the transport?)
private final ReadWriteLock globalLock = new ReentrantReadWriteLock(); private final ReadWriteLock globalLock = new ReentrantReadWriteLock();
@ -169,7 +169,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
System.setProperty("org.jboss.netty.epollBugWorkaround", "true"); System.setProperty("org.jboss.netty.epollBugWorkaround", "true");
} }
this.workerCount = componentSettings.getAsInt("worker_count", EsExecutors.boundedNumberOfProcessors() * 2); this.workerCount = componentSettings.getAsInt("worker_count", EsExecutors.boundedNumberOfProcessors(settings) * 2);
this.bossCount = componentSettings.getAsInt("boss_count", 1); this.bossCount = componentSettings.getAsInt("boss_count", 1);
this.blockingServer = settings.getAsBoolean("transport.tcp.blocking_server", settings.getAsBoolean(TCP_BLOCKING_SERVER, settings.getAsBoolean(TCP_BLOCKING, false))); this.blockingServer = settings.getAsBoolean("transport.tcp.blocking_server", settings.getAsBoolean(TCP_BLOCKING_SERVER, settings.getAsBoolean(TCP_BLOCKING, false)));
this.blockingClient = settings.getAsBoolean("transport.tcp.blocking_client", settings.getAsBoolean(TCP_BLOCKING_CLIENT, settings.getAsBoolean(TCP_BLOCKING, false))); this.blockingClient = settings.getAsBoolean("transport.tcp.blocking_client", settings.getAsBoolean(TCP_BLOCKING_CLIENT, settings.getAsBoolean(TCP_BLOCKING, false)));
@ -591,7 +591,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
NodeChannels nodeChannels = connectedNodes.get(node); NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null) { if (nodeChannels != null) {
return; return;
} }
connectionLock.acquire(node.id()); connectionLock.acquire(node.id());
try { try {
if (!lifecycle.started()) { if (!lifecycle.started()) {
@ -755,12 +755,12 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
if (nodeChannels != null) { if (nodeChannels != null) {
connectionLock.acquire(node.id()); connectionLock.acquire(node.id());
try { try {
try { try {
nodeChannels.close(); nodeChannels.close();
} finally { } finally {
logger.debug("disconnected from [{}]", node); logger.debug("disconnected from [{}]", node);
transportServiceAdapter.raiseNodeDisconnected(node); transportServiceAdapter.raiseNodeDisconnected(node);
} }
} finally { } finally {
connectionLock.release(node.id()); connectionLock.release(node.id());
} }
@ -774,7 +774,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
NodeChannels nodeChannels = connectedNodes.get(node); NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null && nodeChannels.hasChannel(channel)) { if (nodeChannels != null && nodeChannels.hasChannel(channel)) {
connectionLock.acquire(node.id()); connectionLock.acquire(node.id());
if (!nodeChannels.hasChannel(channel)){ //might have been removed in the meanwhile, safety check if (!nodeChannels.hasChannel(channel)) { //might have been removed in the meanwhile, safety check
assert !connectedNodes.containsKey(node); assert !connectedNodes.containsKey(node);
} else { } else {
try { try {