Rename the bulk thread pool to write thread pool (#29593)
This commit renames the bulk thread pool to the write thread pool, to better reflect that the underlying thread pool executes every document write request (single-document index/delete/update requests as well as bulk requests). With this change we add fallback settings under thread_pool.bulk.*, which remain supported until 7.0.0. We also add a system property that keeps the thread pool's display name as "bulk" for users who rely on the old name, so the rename does not break them.
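As a rough illustration of what the rename means for node settings (the class name, node name, and sizes below are made-up example values, not part of this commit), the pool is now sized through the thread_pool.write.* keys, while the deprecated thread_pool.bulk.* keys keep working as fallbacks until 7.0.0:

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.concurrent.TimeUnit;

public class WritePoolSettingsExample {
    public static void main(String[] args) throws Exception {
        // Example values only: the new keys for the renamed pool. The deprecated
        // thread_pool.bulk.* keys are still honored as fallbacks until 7.0.0.
        final Settings settings = Settings.builder()
                .put("node.name", "example-node")
                .put("thread_pool.write.size", 4)
                .put("thread_pool.write.queue_size", 500)
                .build();
        final ThreadPool threadPool = new ThreadPool(settings);
        try {
            // The executor is registered and looked up under the new name.
            threadPool.executor(ThreadPool.Names.WRITE).execute(() -> {});
        } finally {
            ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
        }
    }
}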
This commit is contained in:
parent fa1052017c
commit c12c2a6cc9
@@ -93,8 +93,8 @@ Responds with:
 // TESTRESPONSE[s/9300 27 sLBaIGK/\\d+ \\d+ .+/ _cat]
 
 You can also request multiple columns using simple wildcards like
-`/_cat/thread_pool?h=ip,bulk.*` to get all headers (or aliases) starting
-with `bulk.`.
+`/_cat/thread_pool?h=ip,queue*` to get all headers (or aliases) starting
+with `queue`.
 
 [float]
 [[numeric-formats]]
@@ -15,7 +15,6 @@ Which looks like:
 [source,txt]
 --------------------------------------------------
 node-0 analyze 0 0 0
-node-0 bulk 0 0 0
 node-0 fetch_shard_started 0 0 0
 node-0 fetch_shard_store 0 0 0
 node-0 flush 0 0 0
@@ -28,6 +27,7 @@ node-0 refresh 0 0 0
 node-0 search 0 0 0
 node-0 snapshot 0 0 0
 node-0 warmer 0 0 0
+node-0 write 0 0 0
 --------------------------------------------------
 // TESTRESPONSE[s/\d+/\\d+/ _cat]
 
@@ -44,7 +44,6 @@ The second column is the thread pool name
 --------------------------------------------------
 name
 analyze
-bulk
 fetch_shard_started
 fetch_shard_store
 flush
@@ -57,6 +56,7 @@ refresh
 search
 snapshot
 warmer
+write
 --------------------------------------------------
 
 
@@ -27,11 +27,10 @@ There are several thread pools, but the important ones include:
 `analyze`::
     For analyze requests. Thread pool type is `fixed` with a size of 1, queue size of 16.
 
-`bulk`::
-    For bulk operations. Thread pool type is `fixed`
-    with a size of `# of available processors`,
-    queue_size of `200`. The maximum size for this pool
-    is `1 + # of available processors`.
+`write`::
+    For single-document index/delete/update and bulk requests. Thread pool type
+    is `fixed` with a size of `# of available processors`, queue_size of `200`.
+    The maximum size for this pool is `1 + # of available processors`.
 
 `snapshot`::
     For snapshot/restore operations. Thread pool type is `scaling` with a
@@ -158,10 +158,10 @@ public class RetryTests extends ESIntegTestCase {
 
         final Settings nodeSettings = Settings.builder()
             // use pools of size 1 so we can block them
-            .put("thread_pool.bulk.size", 1)
+            .put("thread_pool.write.size", 1)
             .put("thread_pool.search.size", 1)
             // use queues of size 1 because size 0 is broken and because search requests need the queue to function
-            .put("thread_pool.bulk.queue_size", 1)
+            .put("thread_pool.write.queue_size", 1)
             .put("thread_pool.search.queue_size", 1)
             .put("node.attr.color", "blue")
             .build();
@@ -203,7 +203,7 @@ public class RetryTests extends ESIntegTestCase {
         assertBusy(() -> assertThat(taskStatus(action).getSearchRetries(), greaterThan(0L)));
 
         logger.info("Blocking bulk and unblocking search so we start to get bulk rejections");
-        CyclicBarrier bulkBlock = blockExecutor(ThreadPool.Names.BULK, node);
+        CyclicBarrier bulkBlock = blockExecutor(ThreadPool.Names.WRITE, node);
         initialSearchBlock.await();
 
         logger.info("Waiting for bulk rejections");
@@ -1,6 +1,5 @@
 ---
 "Test cat thread_pool output":
 
   - skip:
       version: " - 6.99.99"
       reason: this API was changed in a backwards-incompatible fashion in 7.0.0 so we need to skip in a mixed cluster
@@ -33,29 +32,29 @@
 
   - do:
       cat.thread_pool:
-        thread_pool_patterns: bulk,management,flush,generic,force_merge
+        thread_pool_patterns: write,management,flush,generic,force_merge
         h: id,name,active
         v: true
 
   - match:
       $body: |
               /^ id \s+ name \s+ active \n
-               (\S+\s+ bulk \s+ \d+ \n
-                \S+\s+ flush \s+ \d+ \n
+               (\S+\s+ flush \s+ \d+ \n
                 \S+\s+ force_merge \s+ \d+ \n
                 \S+\s+ generic \s+ \d+ \n
-                \S+\s+ management \s+ \d+ \n)+ $/
+                \S+\s+ management \s+ \d+ \n
+                \S+\s+ write \s+ \d+ \n)+ $/
 
   - do:
       cat.thread_pool:
-        thread_pool_patterns: bulk
-        h: id,name,type,active,pool_size,queue,queue_size,rejected,largest,completed,core,max,size,keep_alive
+        thread_pool_patterns: write
+        h: id,name,type,active,size,queue,queue_size,rejected,largest,completed,min,max,keep_alive
         v: true
 
   - match:
       $body: |
-            /^ id \s+ name \s+ type \s+ active \s+ pool_size \s+ queue \s+ queue_size \s+ rejected \s+ largest \s+ completed \s+ core \s+ max \s+ size \s+ keep_alive \n
-             (\S+ \s+ bulk \s+ fixed \s+ \d+ \s+ \d+ \s+ \d+ \s+ (-1|\d+) \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \d* \s+ \S* \n)+ $/
+            /^ id \s+ name \s+ type \s+ active \s+ size \s+ queue \s+ queue_size \s+ rejected \s+ largest \s+ completed \s+ max \s+ keep_alive \n
+             (\S+ \s+ write \s+ fixed \s+ \d+ \s+ \d+ \s+ \d+ \s+ (-1|\d+) \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \S* \n)+ $/
 
   - do:
       cat.thread_pool:
@@ -71,11 +70,11 @@
 
   - do:
       cat.thread_pool:
-        thread_pool_patterns: bulk,search
+        thread_pool_patterns: write,search
         size: ""
 
   - match:
       $body: |
            / #node_name name active queue rejected
-            ^ (\S+ \s+ bulk \s+ \d+ \s+ \d+ \s+ \d+ \n
-               \S+ \s+ search \s+ \d+ \s+ \d+ \s+ \d+ \n)+ $/
+            ^ (\S+ \s+ search \s+ \d+ \s+ \d+ \s+ \d+ \n
+               \S+ \s+ write \s+ \d+ \s+ \d+ \s+ \d+ \n)+ $/
@@ -83,7 +83,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
                                    MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters,
                                    IndexNameExpressionResolver indexNameExpressionResolver) {
         super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
-            indexNameExpressionResolver, BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.BULK);
+            indexNameExpressionResolver, BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE);
         this.updateHelper = updateHelper;
         this.mappingUpdatedAction = mappingUpdatedAction;
     }
@@ -46,7 +46,7 @@ public class TransportDeleteAction extends TransportSingleItemBulkWriteAction<De
                                  ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
                                  TransportBulkAction bulkAction, TransportShardBulkAction shardBulkAction) {
         super(settings, DeleteAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
-            actionFilters, indexNameExpressionResolver, DeleteRequest::new, DeleteRequest::new, ThreadPool.Names.BULK,
+            actionFilters, indexNameExpressionResolver, DeleteRequest::new, DeleteRequest::new, ThreadPool.Names.WRITE,
             bulkAction, shardBulkAction);
     }
 
@@ -54,7 +54,7 @@ public class TransportIndexAction extends TransportSingleItemBulkWriteAction<Ind
                                 ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
                                 TransportBulkAction bulkAction, TransportShardBulkAction shardBulkAction) {
         super(settings, IndexAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
-            actionFilters, indexNameExpressionResolver, IndexRequest::new, IndexRequest::new, ThreadPool.Names.BULK,
+            actionFilters, indexNameExpressionResolver, IndexRequest::new, IndexRequest::new, ThreadPool.Names.WRITE,
             bulkAction, shardBulkAction);
     }
 
@@ -60,7 +60,7 @@ public class TransportResyncReplicationAction extends TransportWriteAction<Resyn
                                              ShardStateAction shardStateAction, ActionFilters actionFilters,
                                              IndexNameExpressionResolver indexNameExpressionResolver) {
         super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
-            indexNameExpressionResolver, ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.BULK);
+            indexNameExpressionResolver, ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE);
     }
 
     @Override
@@ -86,7 +86,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
 
     @Override
     protected String executor() {
-        return ThreadPool.Names.BULK;
+        return ThreadPool.Names.WRITE;
     }
 
     @Override
@@ -56,7 +56,7 @@ public class PipelineExecutionService implements ClusterStateApplier {
     public void executeBulkRequest(Iterable<DocWriteRequest> actionRequests,
                                    BiConsumer<IndexRequest, Exception> itemFailureHandler,
                                    Consumer<Exception> completionHandler) {
-        threadPool.executor(ThreadPool.Names.BULK).execute(new AbstractRunnable() {
+        threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() {
 
             @Override
             public void onFailure(Exception e) {
@@ -48,7 +48,7 @@ public abstract class ExecutorBuilder<U extends ExecutorBuilder.ExecutorSettings
     }
 
     protected int applyHardSizeLimit(final Settings settings, final String name) {
-        if (name.equals(ThreadPool.Names.BULK)) {
+        if (name.equals("bulk") || name.equals(ThreadPool.Names.WRITE)) {
             return 1 + EsExecutors.numberOfProcessors(settings);
         } else {
             return Integer.MAX_VALUE;
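The guard above keeps the hard upper bound on the pool size in place for both the new name and the legacy "bulk" settings. As a hedged sketch of that bound (mirroring the expectation asserted in UpdateThreadPoolSettingsTests further down; the class and node names here are illustrative), a node with N available processors rejects any thread_pool.write.size greater than 1 + N:

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.threadpool.ThreadPool;

public class WritePoolSizeCapExample {
    public static void main(String[] args) {
        // applyHardSizeLimit caps the write pool at 1 + # of available processors.
        final int maxSize = 1 + EsExecutors.numberOfProcessors(Settings.EMPTY);
        final Settings tooBig = Settings.builder()
                .put("node.name", "example-node")
                .put("thread_pool.write.size", maxSize + 1) // one above the cap
                .build();
        try {
            new ThreadPool(tooBig); // expected to fail setting validation
        } catch (final IllegalArgumentException e) {
            // message along the lines of:
            // "Failed to parse value [...] for setting [thread_pool.write.size] must be ..."
            System.out.println(e.getMessage());
        }
    }
}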
@@ -19,6 +19,7 @@
 
 package org.elasticsearch.threadpool;
 
+import org.elasticsearch.common.Booleans;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.SizeValue;
@@ -38,7 +39,9 @@ import java.util.concurrent.ThreadFactory;
 public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBuilder.FixedExecutorSettings> {
 
     private final Setting<Integer> sizeSetting;
+    private final Setting<Integer> fallbackSizeSetting;
     private final Setting<Integer> queueSizeSetting;
+    private final Setting<Integer> fallbackQueueSizeSetting;
 
     /**
      * Construct a fixed executor builder; the settings will have the key prefix "thread_pool." followed by the executor name.
@@ -52,6 +55,19 @@ public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBui
         this(settings, name, size, queueSize, "thread_pool." + name);
     }
 
+    /**
+     * Construct a fixed executor builder; the settings will have the key prefix "thread_pool." followed by the executor name.
+     *
+     * @param settings     the node-level settings
+     * @param name         the name of the executor
+     * @param fallbackName the fallback name of the executor (used for transitioning the name of a setting)
+     * @param size         the fixed number of threads
+     * @param queueSize    the size of the backing queue, -1 for unbounded
+     */
+    FixedExecutorBuilder(final Settings settings, final String name, final String fallbackName, final int size, final int queueSize) {
+        this(settings, name, fallbackName, size, queueSize, "thread_pool." + name, "thread_pool." + fallbackName);
+    }
+
     /**
      * Construct a fixed executor builder.
      *
@@ -62,21 +78,81 @@ public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBui
      * @param prefix    the prefix for the settings keys
      */
     public FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize, final String prefix) {
+        this(settings, name, null, size, queueSize, prefix, null);
+    }
+
+    /**
+     * Construct a fixed executor builder.
+     *
+     * @param settings  the node-level settings
+     * @param name      the name of the executor
+     * @param size      the fixed number of threads
+     * @param queueSize the size of the backing queue, -1 for unbounded
+     * @param prefix    the prefix for the settings keys
+     */
+    private FixedExecutorBuilder(
+            final Settings settings,
+            final String name,
+            final String fallbackName,
+            final int size,
+            final int queueSize,
+            final String prefix,
+            final String fallbackPrefix) {
         super(name);
         final String sizeKey = settingsKey(prefix, "size");
-        this.sizeSetting =
-            new Setting<>(
-                sizeKey,
-                s -> Integer.toString(size),
-                s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey),
-                Setting.Property.NodeScope);
         final String queueSizeKey = settingsKey(prefix, "queue_size");
-        this.queueSizeSetting = Setting.intSetting(queueSizeKey, queueSize, Setting.Property.NodeScope);
+        if (fallbackName == null) {
+            assert fallbackPrefix == null;
+            final Setting.Property[] properties = {Setting.Property.NodeScope};
+            this.sizeSetting = sizeSetting(settings, name, size, prefix, properties);
+            this.fallbackSizeSetting = null;
+            this.queueSizeSetting = queueSizeSetting(prefix, queueSize, properties);
+            this.fallbackQueueSizeSetting = null;
+        } else {
+            assert fallbackPrefix != null;
+            final Setting.Property[] properties = { Setting.Property.NodeScope };
+            final Setting.Property[] fallbackProperties = { Setting.Property.NodeScope, Setting.Property.Deprecated };
+            final Setting<Integer> fallbackSizeSetting = sizeSetting(settings, fallbackName, size, fallbackPrefix, fallbackProperties);
+            this.sizeSetting =
+                new Setting<>(
+                    new Setting.SimpleKey(sizeKey),
+                    fallbackSizeSetting,
+                    s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey),
+                    properties);
+            this.fallbackSizeSetting = fallbackSizeSetting;
+            final Setting<Integer> fallbackQueueSizeSetting = queueSizeSetting(fallbackPrefix, queueSize, fallbackProperties);
+            this.queueSizeSetting =
+                new Setting<>(
+                    new Setting.SimpleKey(queueSizeKey),
+                    fallbackQueueSizeSetting,
+                    s -> Setting.parseInt(s, Integer.MIN_VALUE, queueSizeKey),
+                    properties);
+            this.fallbackQueueSizeSetting = fallbackQueueSizeSetting;
+        }
+    }
+
+    private Setting<Integer> sizeSetting(
+            final Settings settings, final String name, final int size, final String prefix, final Setting.Property[] properties) {
+        final String sizeKey = settingsKey(prefix, "size");
+        return new Setting<>(
+            sizeKey,
+            s -> Integer.toString(size),
+            s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey),
+            properties);
+    }
+
+    private Setting<Integer> queueSizeSetting(final String prefix, final int queueSize, final Setting.Property[] properties) {
+        return Setting.intSetting(settingsKey(prefix, "queue_size"), queueSize, properties);
     }
 
     @Override
     public List<Setting<?>> getRegisteredSettings() {
-        return Arrays.asList(sizeSetting, queueSizeSetting);
+        if (fallbackSizeSetting == null && fallbackQueueSizeSetting == null) {
+            return Arrays.asList(sizeSetting, queueSizeSetting);
+        } else {
+            assert fallbackSizeSetting != null && fallbackQueueSizeSetting != null;
+            return Arrays.asList(sizeSetting, fallbackSizeSetting, queueSizeSetting, fallbackQueueSizeSetting);
        }
     }
 
     @Override
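The constructor above is what wires the new thread_pool.&lt;name&gt;.* keys to the deprecated thread_pool.bulk.* keys. A minimal sketch of that fallback-setting pattern in isolation (the key names and class name are chosen for illustration; the Setting constructor is the same one used above): when only the legacy key is configured, the new setting resolves to its value, and use of the legacy key is reported as deprecated.

import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;

public class FallbackSettingExample {
    public static void main(String[] args) {
        // Legacy key, registered as deprecated so that using it emits a warning.
        final Setting<Integer> legacyQueueSize = Setting.intSetting(
                "thread_pool.bulk.queue_size", 200,
                Setting.Property.NodeScope, Setting.Property.Deprecated);

        // New key that falls back to the legacy setting when it is not set explicitly.
        final Setting<Integer> queueSize = new Setting<>(
                new Setting.SimpleKey("thread_pool.write.queue_size"),
                legacyQueueSize,
                s -> Setting.parseInt(s, Integer.MIN_VALUE, "thread_pool.write.queue_size"),
                Setting.Property.NodeScope);

        // Only the legacy key is configured; the new setting still resolves to 30.
        final Settings nodeSettings = Settings.builder()
                .put("thread_pool.bulk.queue_size", 30)
                .build();
        System.out.println(queueSize.get(nodeSettings)); // prints 30
    }
}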
@@ -94,8 +170,14 @@ public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBui
         final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name()));
         final ExecutorService executor =
             EsExecutors.newFixed(settings.nodeName + "/" + name(), size, queueSize, threadFactory, threadContext);
+        final String name;
+        if ("write".equals(name()) && Booleans.parseBoolean(System.getProperty("es.thread_pool.write.use_bulk_as_display_name", "false"))) {
+            name = "bulk";
+        } else {
+            name = name();
+        }
         final ThreadPool.Info info =
-            new ThreadPool.Info(name(), ThreadPool.ThreadPoolType.FIXED, size, size, null, queueSize < 0 ? null : new SizeValue(queueSize));
+            new ThreadPool.Info(name, ThreadPool.ThreadPoolType.FIXED, size, size, null, queueSize < 0 ? null : new SizeValue(queueSize));
         return new ThreadPool.ExecutorHolder(executor, info);
     }
 
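The build method above is also where the opt-in compatibility name is applied. A hedged sketch of the effect (the system property name comes from the change above; the class and node names are illustrative): when es.thread_pool.write.use_bulk_as_display_name=true is set, thread pool info and stats, and therefore _cat/thread_pool, keep reporting the pool as "bulk" while the executor and its settings use "write".

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolStats;

import java.util.concurrent.TimeUnit;

public class BulkDisplayNameExample {
    public static void main(String[] args) throws Exception {
        // Normally passed as a JVM option; it must be set before the pool is built.
        System.setProperty("es.thread_pool.write.use_bulk_as_display_name", "true");

        final ThreadPool threadPool =
                new ThreadPool(Settings.builder().put("node.name", "example-node").build());
        try {
            // One of the reported names should now be "bulk" rather than "write".
            for (final ThreadPoolStats.Stats stats : threadPool.stats()) {
                System.out.println(stats.getName());
            }
        } finally {
            ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
        }
    }
}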
@@ -69,7 +69,7 @@ public class ThreadPool extends AbstractComponent implements Scheduler, Closeabl
         public static final String LISTENER = "listener";
         public static final String GET = "get";
         public static final String ANALYZE = "analyze";
-        public static final String BULK = "bulk";
+        public static final String WRITE = "write";
         public static final String SEARCH = "search";
         public static final String MANAGEMENT = "management";
         public static final String FLUSH = "flush";
@@ -125,7 +125,7 @@ public class ThreadPool extends AbstractComponent implements Scheduler, Closeabl
         map.put(Names.LISTENER, ThreadPoolType.FIXED);
         map.put(Names.GET, ThreadPoolType.FIXED);
         map.put(Names.ANALYZE, ThreadPoolType.FIXED);
-        map.put(Names.BULK, ThreadPoolType.FIXED);
+        map.put(Names.WRITE, ThreadPoolType.FIXED);
         map.put(Names.SEARCH, ThreadPoolType.FIXED_AUTO_QUEUE_SIZE);
         map.put(Names.MANAGEMENT, ThreadPoolType.SCALING);
         map.put(Names.FLUSH, ThreadPoolType.SCALING);
@@ -170,7 +170,7 @@ public class ThreadPool extends AbstractComponent implements Scheduler, Closeabl
         final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors);
         final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);
         builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
-        builders.put(Names.BULK, new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 200)); // now that we reuse bulk for index/delete ops
+        builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, "bulk", availableProcessors, 200));
         builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000));
         builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16));
         builders.put(Names.SEARCH, new AutoQueueAdjustingExecutorBuilder(settings,
@@ -264,7 +264,7 @@ public class ThreadPool extends AbstractComponent implements Scheduler, Closeabl
     public ThreadPoolStats stats() {
         List<ThreadPoolStats.Stats> stats = new ArrayList<>();
         for (ExecutorHolder holder : executors.values()) {
-            String name = holder.info.getName();
+            final String name = holder.info.getName();
             // no need to have info on "same" thread pool
             if ("same".equals(name)) {
                 continue;
@@ -54,8 +54,8 @@ public class BulkProcessorRetryIT extends ESIntegTestCase {
                 // (see also ThreadedActionListener which is happily spawning threads even when we already got rejected)
                 //.put("thread_pool.listener.queue_size", 1)
                 .put("thread_pool.get.queue_size", 1)
-                // default is 50
-                .put("thread_pool.bulk.queue_size", 30)
+                // default is 200
+                .put("thread_pool.write.queue_size", 30)
                 .build();
     }
 
@@ -124,7 +124,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
             super(Settings.EMPTY, IndexAction.NAME, TransportBulkActionIngestTests.this.transportService,
                     TransportBulkActionIngestTests.this.clusterService,
                     null, null, null, new ActionFilters(Collections.emptySet()), null,
-                    IndexRequest::new, IndexRequest::new, ThreadPool.Names.BULK, bulkAction, null);
+                    IndexRequest::new, IndexRequest::new, ThreadPool.Names.WRITE, bulkAction, null);
         }
 
         @Override
@@ -35,8 +35,8 @@ public class EsThreadPoolExecutorTests extends ESSingleNodeTestCase {
     protected Settings nodeSettings() {
         return Settings.builder()
                 .put("node.name", "es-thread-pool-executor-tests")
-                .put("thread_pool.bulk.size", 1)
-                .put("thread_pool.bulk.queue_size", 0)
+                .put("thread_pool.write.size", 1)
+                .put("thread_pool.write.queue_size", 0)
                 .put("thread_pool.search.size", 1)
                 .put("thread_pool.search.queue_size", 1)
                 .build();
@@ -44,7 +44,7 @@ public class EsThreadPoolExecutorTests extends ESSingleNodeTestCase {
 
     public void testRejectedExecutionExceptionContainsNodeName() {
         // we test a fixed and an auto-queue executor but not scaling since it does not reject
-        runThreadPoolExecutorTest(1, ThreadPool.Names.BULK);
+        runThreadPoolExecutorTest(1, ThreadPool.Names.WRITE);
         runThreadPoolExecutorTest(2, ThreadPool.Names.SEARCH);
 
     }
@@ -542,7 +542,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
                         listener.onFailure(e);
                     }
                 },
-                ThreadPool.Names.BULK, request);
+                ThreadPool.Names.WRITE, request);
         }
 
         @Override
@@ -69,18 +69,18 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
 
     @BeforeClass
     public static void setupThreadPool() {
-        int bulkThreadPoolSize = randomIntBetween(1, 2);
-        int bulkThreadPoolQueueSize = randomIntBetween(1, 2);
+        int writeThreadPoolSize = randomIntBetween(1, 2);
+        int writeThreadPoolQueueSize = randomIntBetween(1, 2);
         threadPool = new TestThreadPool("IndexShardOperationsLockTests",
                 Settings.builder()
-                        .put("thread_pool." + ThreadPool.Names.BULK + ".size", bulkThreadPoolSize)
-                        .put("thread_pool." + ThreadPool.Names.BULK + ".queue_size", bulkThreadPoolQueueSize)
+                        .put("thread_pool." + ThreadPool.Names.WRITE + ".size", writeThreadPoolSize)
+                        .put("thread_pool." + ThreadPool.Names.WRITE + ".queue_size", writeThreadPoolQueueSize)
                         .build());
-        assertThat(threadPool.executor(ThreadPool.Names.BULK), instanceOf(EsThreadPoolExecutor.class));
-        assertThat(((EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.BULK)).getCorePoolSize(), equalTo(bulkThreadPoolSize));
-        assertThat(((EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.BULK)).getMaximumPoolSize(), equalTo(bulkThreadPoolSize));
-        assertThat(((EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.BULK)).getQueue().remainingCapacity(),
-                equalTo(bulkThreadPoolQueueSize));
+        assertThat(threadPool.executor(ThreadPool.Names.WRITE), instanceOf(EsThreadPoolExecutor.class));
+        assertThat(((EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.WRITE)).getCorePoolSize(), equalTo(writeThreadPoolSize));
+        assertThat(((EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.WRITE)).getMaximumPoolSize(), equalTo(writeThreadPoolSize));
+        assertThat(((EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.WRITE)).getQueue().remainingCapacity(),
+                equalTo(writeThreadPoolQueueSize));
     }
 
     @AfterClass
@@ -110,8 +110,8 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
         CountDownLatch latch = new CountDownLatch(numThreads / 4);
         boolean forceExecution = randomBoolean();
         for (int i = 0; i < numThreads; i++) {
-            // the bulk thread pool uses a bounded size and can get rejections, see setupThreadPool
-            String threadPoolName = randomFrom(ThreadPool.Names.BULK, ThreadPool.Names.GENERIC);
+            // the write thread pool uses a bounded size and can get rejections, see setupThreadPool
+            String threadPoolName = randomFrom(ThreadPool.Names.WRITE, ThreadPool.Names.GENERIC);
             boolean failingListener = randomBoolean();
             PlainActionFuture<Releasable> future = new PlainActionFuture<Releasable>() {
                 @Override
@@ -284,14 +284,14 @@ public class IndexShardTests extends IndexShardTestCase {
         closeShards(indexShard);
         assertThat(indexShard.getActiveOperationsCount(), equalTo(0));
         try {
-            indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.BULK, "");
+            indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.WRITE, "");
             fail("we should not be able to increment anymore");
         } catch (IndexShardClosedException e) {
             // expected
         }
         try {
             indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, null,
-                ThreadPool.Names.BULK, "");
+                ThreadPool.Names.WRITE, "");
             fail("we should not be able to increment anymore");
         } catch (IndexShardClosedException e) {
             // expected
@@ -302,7 +302,7 @@ public class IndexShardTests extends IndexShardTestCase {
         IndexShard indexShard = newShard(false);
         expectThrows(IndexShardNotStartedException.class, () ->
             indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm() + randomIntBetween(1, 100),
-                SequenceNumbers.UNASSIGNED_SEQ_NO, null, ThreadPool.Names.BULK, ""));
+                SequenceNumbers.UNASSIGNED_SEQ_NO, null, ThreadPool.Names.WRITE, ""));
         closeShards(indexShard);
     }
 
@@ -342,7 +342,7 @@ public class IndexShardTests extends IndexShardTestCase {
                         throw new RuntimeException(e);
                     }
                 },
-                ThreadPool.Names.BULK, id);
+                ThreadPool.Names.WRITE, id);
             });
             thread.start();
             threads.add(thread);
@@ -393,7 +393,7 @@ public class IndexShardTests extends IndexShardTestCase {
                         throw new RuntimeException(e);
                     }
                 },
-                ThreadPool.Names.BULK, id);
+                ThreadPool.Names.WRITE, id);
             });
             thread.start();
             delayedThreads.add(thread);
@@ -589,7 +589,7 @@ public class IndexShardTests extends IndexShardTestCase {
         assertEquals(0, indexShard.getActiveOperationsCount());
         if (indexShard.routingEntry().isRelocationTarget() == false) {
             try {
-                indexShard.acquireReplicaOperationPermit(primaryTerm, indexShard.getGlobalCheckpoint(), null, ThreadPool.Names.BULK, "");
+                indexShard.acquireReplicaOperationPermit(primaryTerm, indexShard.getGlobalCheckpoint(), null, ThreadPool.Names.WRITE, "");
                 fail("shard shouldn't accept operations as replica");
             } catch (IllegalStateException ignored) {
 
@@ -608,14 +608,14 @@ public class IndexShardTests extends IndexShardTestCase {
 
     private Releasable acquirePrimaryOperationPermitBlockingly(IndexShard indexShard) throws ExecutionException, InterruptedException {
         PlainActionFuture<Releasable> fut = new PlainActionFuture<>();
-        indexShard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.BULK, "");
+        indexShard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.WRITE, "");
         return fut.get();
     }
 
     private Releasable acquireReplicaOperationPermitBlockingly(IndexShard indexShard, long opPrimaryTerm)
         throws ExecutionException, InterruptedException {
         PlainActionFuture<Releasable> fut = new PlainActionFuture<>();
-        indexShard.acquireReplicaOperationPermit(opPrimaryTerm, indexShard.getGlobalCheckpoint(), fut, ThreadPool.Names.BULK, "");
+        indexShard.acquireReplicaOperationPermit(opPrimaryTerm, indexShard.getGlobalCheckpoint(), fut, ThreadPool.Names.WRITE, "");
         return fut.get();
     }
 
@@ -663,7 +663,7 @@ public class IndexShardTests extends IndexShardTestCase {
         if (shardRouting.primary() == false) {
             final IllegalStateException e =
                 expectThrows(IllegalStateException.class,
-                    () -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.BULK, ""));
+                    () -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.WRITE, ""));
             assertThat(e, hasToString(containsString("shard " + shardRouting + " is not a primary")));
         }
 
@@ -700,7 +700,7 @@ public class IndexShardTests extends IndexShardTestCase {
         };
 
         indexShard.acquireReplicaOperationPermit(primaryTerm - 1, SequenceNumbers.UNASSIGNED_SEQ_NO, onLockAcquired,
-            ThreadPool.Names.BULK, "");
+            ThreadPool.Names.WRITE, "");
 
         assertFalse(onResponse.get());
         assertTrue(onFailure.get());
@@ -1020,7 +1020,7 @@ public class IndexShardTests extends IndexShardTestCase {
                     latch.countDown();
                 }
             },
-            ThreadPool.Names.BULK, "");
+            ThreadPool.Names.WRITE, "");
         };
 
         final long firstIncrement = 1 + (randomBoolean() ? 0 : 1);
@@ -1381,7 +1381,7 @@ public class IndexShardTests extends IndexShardTestCase {
                     super.onResponse(releasable);
                 }
             };
-            shard.acquirePrimaryOperationPermit(onLockAcquired, ThreadPool.Names.BULK, "i_" + i);
+            shard.acquirePrimaryOperationPermit(onLockAcquired, ThreadPool.Names.WRITE, "i_" + i);
             onLockAcquiredActions.add(onLockAcquired);
         }
 
@@ -113,7 +113,7 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase {
         SyncedFlushService flushService = getInstanceFromNode(SyncedFlushService.class);
         final ShardId shardId = shard.shardId();
         PlainActionFuture<Releasable> fut = new PlainActionFuture<>();
-        shard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.BULK, "");
+        shard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.WRITE, "");
         try (Releasable operationLock = fut.get()) {
             SyncedFlushUtil.LatchedListener<ShardsSyncedFlushResult> listener = new SyncedFlushUtil.LatchedListener<>();
             flushService.attemptSyncedFlush(shardId, listener);
@@ -87,9 +87,9 @@ public class ThreadPoolSerializationTests extends ESTestCase {
     }
 
     public void testThatNegativeSettingAllowsToStart() throws InterruptedException {
-        Settings settings = Settings.builder().put("node.name", "bulk").put("thread_pool.bulk.queue_size", "-1").build();
+        Settings settings = Settings.builder().put("node.name", "write").put("thread_pool.write.queue_size", "-1").build();
         ThreadPool threadPool = new ThreadPool(settings);
-        assertThat(threadPool.info("bulk").getQueueSize(), is(nullValue()));
+        assertThat(threadPool.info("write").getQueueSize(), is(nullValue()));
         terminate(threadPool);
     }
 
@@ -60,7 +60,8 @@ public class UpdateThreadPoolSettingsTests extends ESThreadPoolTestCase {
         }
     }
 
-    public void testBulkThreadPoolsMaxSize() {
+    public void testWriteThreadPoolsMaxSize() throws InterruptedException {
+        final String name = Names.WRITE;
         final int maxSize = 1 + EsExecutors.numberOfProcessors(Settings.EMPTY);
         final int tooBig = randomIntBetween(1 + maxSize, Integer.MAX_VALUE);
 
@@ -73,7 +74,7 @@ public class UpdateThreadPoolSettingsTests extends ESThreadPoolTestCase {
         try {
             tp = new ThreadPool(Settings.builder()
                 .put("node.name", "testIndexingThreadPoolsMaxSize")
-                .put("thread_pool." + Names.BULK + ".size", tooBig)
+                .put("thread_pool." + Names.WRITE + ".size", tooBig)
                 .build());
         } finally {
             terminateThreadPoolIfNeeded(tp);
@@ -83,11 +84,11 @@ public class UpdateThreadPoolSettingsTests extends ESThreadPoolTestCase {
         assertThat(
             initial,
             hasToString(containsString(
-                "Failed to parse value [" + tooBig + "] for setting [thread_pool." + Names.BULK + ".size] must be ")));
+                "Failed to parse value [" + tooBig + "] for setting [thread_pool." + Names.WRITE + ".size] must be ")));
     }
 
     private static int getExpectedThreadPoolSize(Settings settings, String name, int size) {
-        if (name.equals(ThreadPool.Names.BULK)) {
+        if (name.equals(ThreadPool.Names.WRITE)) {
             return Math.min(size, EsExecutors.numberOfProcessors(settings));
         } else {
             return size;