diff --git a/docs/reference/cat.asciidoc b/docs/reference/cat.asciidoc index 3dff5abc52d..7a2262b7962 100644 --- a/docs/reference/cat.asciidoc +++ b/docs/reference/cat.asciidoc @@ -93,8 +93,8 @@ Responds with: // TESTRESPONSE[s/9300 27 sLBaIGK/\\d+ \\d+ .+/ _cat] You can also request multiple columns using simple wildcards like -`/_cat/thread_pool?h=ip,bulk.*` to get all headers (or aliases) starting -with `bulk.`. +`/_cat/thread_pool?h=ip,queue*` to get all headers (or aliases) starting +with `queue`. [float] [[numeric-formats]] diff --git a/docs/reference/cat/thread_pool.asciidoc b/docs/reference/cat/thread_pool.asciidoc index a5c71ac271b..306650feb95 100644 --- a/docs/reference/cat/thread_pool.asciidoc +++ b/docs/reference/cat/thread_pool.asciidoc @@ -15,7 +15,6 @@ Which looks like: [source,txt] -------------------------------------------------- node-0 analyze 0 0 0 -node-0 bulk 0 0 0 node-0 fetch_shard_started 0 0 0 node-0 fetch_shard_store 0 0 0 node-0 flush 0 0 0 @@ -28,6 +27,7 @@ node-0 refresh 0 0 0 node-0 search 0 0 0 node-0 snapshot 0 0 0 node-0 warmer 0 0 0 +node-0 write 0 0 0 -------------------------------------------------- // TESTRESPONSE[s/\d+/\\d+/ _cat] @@ -44,7 +44,6 @@ The second column is the thread pool name -------------------------------------------------- name analyze -bulk fetch_shard_started fetch_shard_store flush @@ -57,6 +56,7 @@ refresh search snapshot warmer +write -------------------------------------------------- diff --git a/docs/reference/modules/threadpool.asciidoc b/docs/reference/modules/threadpool.asciidoc index b85cda1aa36..515959e4ea5 100644 --- a/docs/reference/modules/threadpool.asciidoc +++ b/docs/reference/modules/threadpool.asciidoc @@ -27,11 +27,10 @@ There are several thread pools, but the important ones include: `analyze`:: For analyze requests. Thread pool type is `fixed` with a size of 1, queue size of 16. -`bulk`:: - For bulk operations. Thread pool type is `fixed` - with a size of `# of available processors`, - queue_size of `200`. The maximum size for this pool - is `1 + # of available processors`. +`write`:: + For single-document index/delete/update and bulk requests. Thread pool type + is `fixed` with a size of `# of available processors`, queue_size of `200`. + The maximum size for this pool is `1 + # of available processors`. `snapshot`:: For snapshot/restore operations. Thread pool type is `scaling` with a diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java index da0dbf2aae3..131c959af8a 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java @@ -158,10 +158,10 @@ public class RetryTests extends ESIntegTestCase { final Settings nodeSettings = Settings.builder() // use pools of size 1 so we can block them - .put("thread_pool.bulk.size", 1) + .put("thread_pool.write.size", 1) .put("thread_pool.search.size", 1) // use queues of size 1 because size 0 is broken and because search requests need the queue to function - .put("thread_pool.bulk.queue_size", 1) + .put("thread_pool.write.queue_size", 1) .put("thread_pool.search.queue_size", 1) .put("node.attr.color", "blue") .build(); @@ -203,7 +203,7 @@ public class RetryTests extends ESIntegTestCase { assertBusy(() -> assertThat(taskStatus(action).getSearchRetries(), greaterThan(0L))); logger.info("Blocking bulk and unblocking search so we start to get bulk rejections"); - CyclicBarrier bulkBlock = blockExecutor(ThreadPool.Names.BULK, node); + CyclicBarrier bulkBlock = blockExecutor(ThreadPool.Names.WRITE, node); initialSearchBlock.await(); logger.info("Waiting for bulk rejections"); diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.thread_pool/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.thread_pool/10_basic.yml index d7d33c15ec1..1ce8468cb51 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.thread_pool/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.thread_pool/10_basic.yml @@ -1,6 +1,5 @@ --- "Test cat thread_pool output": - - skip: version: " - 6.99.99" reason: this API was changed in a backwards-incompatible fashion in 7.0.0 so we need to skip in a mixed cluster @@ -33,29 +32,29 @@ - do: cat.thread_pool: - thread_pool_patterns: bulk,management,flush,generic,force_merge + thread_pool_patterns: write,management,flush,generic,force_merge h: id,name,active v: true - match: $body: | /^ id \s+ name \s+ active \n - (\S+\s+ bulk \s+ \d+ \n - \S+\s+ flush \s+ \d+ \n + (\S+\s+ flush \s+ \d+ \n \S+\s+ force_merge \s+ \d+ \n \S+\s+ generic \s+ \d+ \n - \S+\s+ management \s+ \d+ \n)+ $/ + \S+\s+ management \s+ \d+ \n + \S+\s+ write \s+ \d+ \n)+ $/ - do: cat.thread_pool: - thread_pool_patterns: bulk - h: id,name,type,active,pool_size,queue,queue_size,rejected,largest,completed,core,max,size,keep_alive + thread_pool_patterns: write + h: id,name,type,active,size,queue,queue_size,rejected,largest,completed,min,max,keep_alive v: true - match: $body: | - /^ id \s+ name \s+ type \s+ active \s+ pool_size \s+ queue \s+ queue_size \s+ rejected \s+ largest \s+ completed \s+ core \s+ max \s+ size \s+ keep_alive \n - (\S+ \s+ bulk \s+ fixed \s+ \d+ \s+ \d+ \s+ \d+ \s+ (-1|\d+) \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \d* \s+ \S* \n)+ $/ + /^ id \s+ name \s+ type \s+ active \s+ size \s+ queue \s+ queue_size \s+ rejected \s+ largest \s+ completed \s+ max \s+ keep_alive \n + (\S+ \s+ write \s+ fixed \s+ \d+ \s+ \d+ \s+ \d+ \s+ (-1|\d+) \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \S* \n)+ $/ - do: cat.thread_pool: @@ -71,11 +70,11 @@ - do: cat.thread_pool: - thread_pool_patterns: bulk,search + thread_pool_patterns: write,search size: "" - match: $body: | / #node_name name active queue rejected - ^ (\S+ \s+ bulk \s+ \d+ \s+ \d+ \s+ \d+ \n - \S+ \s+ search \s+ \d+ \s+ \d+ \s+ \d+ \n)+ $/ + ^ (\S+ \s+ search \s+ \d+ \s+ \d+ \s+ \d+ \n + \S+ \s+ write \s+ \d+ \s+ \d+ \s+ \d+ \n)+ $/ diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index f9b27a1e620..260c75692e1 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -83,7 +83,7 @@ public class TransportShardBulkAction extends TransportWriteAction actionRequests, BiConsumer itemFailureHandler, Consumer completionHandler) { - threadPool.executor(ThreadPool.Names.BULK).execute(new AbstractRunnable() { + threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { diff --git a/server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java b/server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java index 5404e7ac3de..3945042db50 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java @@ -48,7 +48,7 @@ public abstract class ExecutorBuilder { private final Setting sizeSetting; + private final Setting fallbackSizeSetting; private final Setting queueSizeSetting; + private final Setting fallbackQueueSizeSetting; /** * Construct a fixed executor builder; the settings will have the key prefix "thread_pool." followed by the executor name. @@ -52,6 +55,19 @@ public final class FixedExecutorBuilder extends ExecutorBuilder( - sizeKey, - s -> Integer.toString(size), - s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey), - Setting.Property.NodeScope); final String queueSizeKey = settingsKey(prefix, "queue_size"); - this.queueSizeSetting = Setting.intSetting(queueSizeKey, queueSize, Setting.Property.NodeScope); + if (fallbackName == null) { + assert fallbackPrefix == null; + final Setting.Property[] properties = {Setting.Property.NodeScope}; + this.sizeSetting = sizeSetting(settings, name, size, prefix, properties); + this.fallbackSizeSetting = null; + this.queueSizeSetting = queueSizeSetting(prefix, queueSize, properties); + this.fallbackQueueSizeSetting = null; + } else { + assert fallbackPrefix != null; + final Setting.Property[] properties = { Setting.Property.NodeScope }; + final Setting.Property[] fallbackProperties = { Setting.Property.NodeScope, Setting.Property.Deprecated }; + final Setting fallbackSizeSetting = sizeSetting(settings, fallbackName, size, fallbackPrefix, fallbackProperties); + this.sizeSetting = + new Setting<>( + new Setting.SimpleKey(sizeKey), + fallbackSizeSetting, + s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey), + properties); + this.fallbackSizeSetting = fallbackSizeSetting; + final Setting fallbackQueueSizeSetting = queueSizeSetting(fallbackPrefix, queueSize, fallbackProperties); + this.queueSizeSetting = + new Setting<>( + new Setting.SimpleKey(queueSizeKey), + fallbackQueueSizeSetting, + s -> Setting.parseInt(s, Integer.MIN_VALUE, queueSizeKey), + properties); + this.fallbackQueueSizeSetting = fallbackQueueSizeSetting; + } + } + + private Setting sizeSetting( + final Settings settings, final String name, final int size, final String prefix, final Setting.Property[] properties) { + final String sizeKey = settingsKey(prefix, "size"); + return new Setting<>( + sizeKey, + s -> Integer.toString(size), + s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey), + properties); + } + + private Setting queueSizeSetting(final String prefix, final int queueSize, final Setting.Property[] properties) { + return Setting.intSetting(settingsKey(prefix, "queue_size"), queueSize, properties); } @Override public List> getRegisteredSettings() { - return Arrays.asList(sizeSetting, queueSizeSetting); + if (fallbackSizeSetting == null && fallbackQueueSizeSetting == null) { + return Arrays.asList(sizeSetting, queueSizeSetting); + } else { + assert fallbackSizeSetting != null && fallbackQueueSizeSetting != null; + return Arrays.asList(sizeSetting, fallbackSizeSetting, queueSizeSetting, fallbackQueueSizeSetting); + } } @Override @@ -94,8 +170,14 @@ public final class FixedExecutorBuilder extends ExecutorBuilder stats = new ArrayList<>(); for (ExecutorHolder holder : executors.values()) { - String name = holder.info.getName(); + final String name = holder.info.getName(); // no need to have info on "same" thread pool if ("same".equals(name)) { continue; diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java index 1a07eac1adb..4b96f3d1754 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java @@ -54,8 +54,8 @@ public class BulkProcessorRetryIT extends ESIntegTestCase { // (see also ThreadedActionListener which is happily spawning threads even when we already got rejected) //.put("thread_pool.listener.queue_size", 1) .put("thread_pool.get.queue_size", 1) - // default is 50 - .put("thread_pool.bulk.queue_size", 30) + // default is 200 + .put("thread_pool.write.queue_size", 30) .build(); } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index 5cd411c71b8..bcd16386df3 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -124,7 +124,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { super(Settings.EMPTY, IndexAction.NAME, TransportBulkActionIngestTests.this.transportService, TransportBulkActionIngestTests.this.clusterService, null, null, null, new ActionFilters(Collections.emptySet()), null, - IndexRequest::new, IndexRequest::new, ThreadPool.Names.BULK, bulkAction, null); + IndexRequest::new, IndexRequest::new, ThreadPool.Names.WRITE, bulkAction, null); } @Override diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorTests.java index b6110a85ece..33917ceff68 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorTests.java @@ -35,8 +35,8 @@ public class EsThreadPoolExecutorTests extends ESSingleNodeTestCase { protected Settings nodeSettings() { return Settings.builder() .put("node.name", "es-thread-pool-executor-tests") - .put("thread_pool.bulk.size", 1) - .put("thread_pool.bulk.queue_size", 0) + .put("thread_pool.write.size", 1) + .put("thread_pool.write.queue_size", 0) .put("thread_pool.search.size", 1) .put("thread_pool.search.queue_size", 1) .build(); @@ -44,7 +44,7 @@ public class EsThreadPoolExecutorTests extends ESSingleNodeTestCase { public void testRejectedExecutionExceptionContainsNodeName() { // we test a fixed and an auto-queue executor but not scaling since it does not reject - runThreadPoolExecutorTest(1, ThreadPool.Names.BULK); + runThreadPoolExecutorTest(1, ThreadPool.Names.WRITE); runThreadPoolExecutorTest(2, ThreadPool.Names.SEARCH); } diff --git a/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 9c6fbe72985..16a73f0fa71 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -542,7 +542,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase listener.onFailure(e); } }, - ThreadPool.Names.BULK, request); + ThreadPool.Names.WRITE, request); } @Override diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java index 3d961d7f422..27d08b76c03 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java @@ -69,18 +69,18 @@ public class IndexShardOperationPermitsTests extends ESTestCase { @BeforeClass public static void setupThreadPool() { - int bulkThreadPoolSize = randomIntBetween(1, 2); - int bulkThreadPoolQueueSize = randomIntBetween(1, 2); + int writeThreadPoolSize = randomIntBetween(1, 2); + int writeThreadPoolQueueSize = randomIntBetween(1, 2); threadPool = new TestThreadPool("IndexShardOperationsLockTests", Settings.builder() - .put("thread_pool." + ThreadPool.Names.BULK + ".size", bulkThreadPoolSize) - .put("thread_pool." + ThreadPool.Names.BULK + ".queue_size", bulkThreadPoolQueueSize) + .put("thread_pool." + ThreadPool.Names.WRITE + ".size", writeThreadPoolSize) + .put("thread_pool." + ThreadPool.Names.WRITE + ".queue_size", writeThreadPoolQueueSize) .build()); - assertThat(threadPool.executor(ThreadPool.Names.BULK), instanceOf(EsThreadPoolExecutor.class)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.BULK)).getCorePoolSize(), equalTo(bulkThreadPoolSize)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.BULK)).getMaximumPoolSize(), equalTo(bulkThreadPoolSize)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.BULK)).getQueue().remainingCapacity(), - equalTo(bulkThreadPoolQueueSize)); + assertThat(threadPool.executor(ThreadPool.Names.WRITE), instanceOf(EsThreadPoolExecutor.class)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.WRITE)).getCorePoolSize(), equalTo(writeThreadPoolSize)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.WRITE)).getMaximumPoolSize(), equalTo(writeThreadPoolSize)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.WRITE)).getQueue().remainingCapacity(), + equalTo(writeThreadPoolQueueSize)); } @AfterClass @@ -110,8 +110,8 @@ public class IndexShardOperationPermitsTests extends ESTestCase { CountDownLatch latch = new CountDownLatch(numThreads / 4); boolean forceExecution = randomBoolean(); for (int i = 0; i < numThreads; i++) { - // the bulk thread pool uses a bounded size and can get rejections, see setupThreadPool - String threadPoolName = randomFrom(ThreadPool.Names.BULK, ThreadPool.Names.GENERIC); + // the write thread pool uses a bounded size and can get rejections, see setupThreadPool + String threadPoolName = randomFrom(ThreadPool.Names.WRITE, ThreadPool.Names.GENERIC); boolean failingListener = randomBoolean(); PlainActionFuture future = new PlainActionFuture() { @Override diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 59a35009988..9bec4d3cdbf 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -284,14 +284,14 @@ public class IndexShardTests extends IndexShardTestCase { closeShards(indexShard); assertThat(indexShard.getActiveOperationsCount(), equalTo(0)); try { - indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.BULK, ""); + indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.WRITE, ""); fail("we should not be able to increment anymore"); } catch (IndexShardClosedException e) { // expected } try { indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, null, - ThreadPool.Names.BULK, ""); + ThreadPool.Names.WRITE, ""); fail("we should not be able to increment anymore"); } catch (IndexShardClosedException e) { // expected @@ -302,7 +302,7 @@ public class IndexShardTests extends IndexShardTestCase { IndexShard indexShard = newShard(false); expectThrows(IndexShardNotStartedException.class, () -> indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm() + randomIntBetween(1, 100), - SequenceNumbers.UNASSIGNED_SEQ_NO, null, ThreadPool.Names.BULK, "")); + SequenceNumbers.UNASSIGNED_SEQ_NO, null, ThreadPool.Names.WRITE, "")); closeShards(indexShard); } @@ -342,7 +342,7 @@ public class IndexShardTests extends IndexShardTestCase { throw new RuntimeException(e); } }, - ThreadPool.Names.BULK, id); + ThreadPool.Names.WRITE, id); }); thread.start(); threads.add(thread); @@ -393,7 +393,7 @@ public class IndexShardTests extends IndexShardTestCase { throw new RuntimeException(e); } }, - ThreadPool.Names.BULK, id); + ThreadPool.Names.WRITE, id); }); thread.start(); delayedThreads.add(thread); @@ -589,7 +589,7 @@ public class IndexShardTests extends IndexShardTestCase { assertEquals(0, indexShard.getActiveOperationsCount()); if (indexShard.routingEntry().isRelocationTarget() == false) { try { - indexShard.acquireReplicaOperationPermit(primaryTerm, indexShard.getGlobalCheckpoint(), null, ThreadPool.Names.BULK, ""); + indexShard.acquireReplicaOperationPermit(primaryTerm, indexShard.getGlobalCheckpoint(), null, ThreadPool.Names.WRITE, ""); fail("shard shouldn't accept operations as replica"); } catch (IllegalStateException ignored) { @@ -608,14 +608,14 @@ public class IndexShardTests extends IndexShardTestCase { private Releasable acquirePrimaryOperationPermitBlockingly(IndexShard indexShard) throws ExecutionException, InterruptedException { PlainActionFuture fut = new PlainActionFuture<>(); - indexShard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.BULK, ""); + indexShard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.WRITE, ""); return fut.get(); } private Releasable acquireReplicaOperationPermitBlockingly(IndexShard indexShard, long opPrimaryTerm) throws ExecutionException, InterruptedException { PlainActionFuture fut = new PlainActionFuture<>(); - indexShard.acquireReplicaOperationPermit(opPrimaryTerm, indexShard.getGlobalCheckpoint(), fut, ThreadPool.Names.BULK, ""); + indexShard.acquireReplicaOperationPermit(opPrimaryTerm, indexShard.getGlobalCheckpoint(), fut, ThreadPool.Names.WRITE, ""); return fut.get(); } @@ -663,7 +663,7 @@ public class IndexShardTests extends IndexShardTestCase { if (shardRouting.primary() == false) { final IllegalStateException e = expectThrows(IllegalStateException.class, - () -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.BULK, "")); + () -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.WRITE, "")); assertThat(e, hasToString(containsString("shard " + shardRouting + " is not a primary"))); } @@ -700,7 +700,7 @@ public class IndexShardTests extends IndexShardTestCase { }; indexShard.acquireReplicaOperationPermit(primaryTerm - 1, SequenceNumbers.UNASSIGNED_SEQ_NO, onLockAcquired, - ThreadPool.Names.BULK, ""); + ThreadPool.Names.WRITE, ""); assertFalse(onResponse.get()); assertTrue(onFailure.get()); @@ -1020,7 +1020,7 @@ public class IndexShardTests extends IndexShardTestCase { latch.countDown(); } }, - ThreadPool.Names.BULK, ""); + ThreadPool.Names.WRITE, ""); }; final long firstIncrement = 1 + (randomBoolean() ? 0 : 1); @@ -1381,7 +1381,7 @@ public class IndexShardTests extends IndexShardTestCase { super.onResponse(releasable); } }; - shard.acquirePrimaryOperationPermit(onLockAcquired, ThreadPool.Names.BULK, "i_" + i); + shard.acquirePrimaryOperationPermit(onLockAcquired, ThreadPool.Names.WRITE, "i_" + i); onLockAcquiredActions.add(onLockAcquired); } diff --git a/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java b/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java index c71ccdfba8c..3bfcfdd3ab1 100644 --- a/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java +++ b/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java @@ -113,7 +113,7 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase { SyncedFlushService flushService = getInstanceFromNode(SyncedFlushService.class); final ShardId shardId = shard.shardId(); PlainActionFuture fut = new PlainActionFuture<>(); - shard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.BULK, ""); + shard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.WRITE, ""); try (Releasable operationLock = fut.get()) { SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener<>(); flushService.attemptSyncedFlush(shardId, listener); diff --git a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolSerializationTests.java b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolSerializationTests.java index 3830d10f69b..546017f807a 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolSerializationTests.java @@ -87,9 +87,9 @@ public class ThreadPoolSerializationTests extends ESTestCase { } public void testThatNegativeSettingAllowsToStart() throws InterruptedException { - Settings settings = Settings.builder().put("node.name", "bulk").put("thread_pool.bulk.queue_size", "-1").build(); + Settings settings = Settings.builder().put("node.name", "write").put("thread_pool.write.queue_size", "-1").build(); ThreadPool threadPool = new ThreadPool(settings); - assertThat(threadPool.info("bulk").getQueueSize(), is(nullValue())); + assertThat(threadPool.info("write").getQueueSize(), is(nullValue())); terminate(threadPool); } diff --git a/server/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java b/server/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java index 31142fe9e45..ea281f7d9ae 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java @@ -60,7 +60,8 @@ public class UpdateThreadPoolSettingsTests extends ESThreadPoolTestCase { } } - public void testBulkThreadPoolsMaxSize() { + public void testWriteThreadPoolsMaxSize() throws InterruptedException { + final String name = Names.WRITE; final int maxSize = 1 + EsExecutors.numberOfProcessors(Settings.EMPTY); final int tooBig = randomIntBetween(1 + maxSize, Integer.MAX_VALUE); @@ -73,7 +74,7 @@ public class UpdateThreadPoolSettingsTests extends ESThreadPoolTestCase { try { tp = new ThreadPool(Settings.builder() .put("node.name", "testIndexingThreadPoolsMaxSize") - .put("thread_pool." + Names.BULK + ".size", tooBig) + .put("thread_pool." + Names.WRITE + ".size", tooBig) .build()); } finally { terminateThreadPoolIfNeeded(tp); @@ -83,11 +84,11 @@ public class UpdateThreadPoolSettingsTests extends ESThreadPoolTestCase { assertThat( initial, hasToString(containsString( - "Failed to parse value [" + tooBig + "] for setting [thread_pool." + Names.BULK + ".size] must be "))); + "Failed to parse value [" + tooBig + "] for setting [thread_pool." + Names.WRITE + ".size] must be "))); } private static int getExpectedThreadPoolSize(Settings settings, String name, int size) { - if (name.equals(ThreadPool.Names.BULK)) { + if (name.equals(ThreadPool.Names.WRITE)) { return Math.min(size, EsExecutors.numberOfProcessors(settings)); } else { return size;