Introduce analyze thread pool (#29541)
We want to remove the index thread pool as it is no longer needed since single-document indexing requests are executed as bulk requests now. Analyze requests are also executed on the index thread pool though and they need a thread pool to execute on. The bulk thread does not seem like the right thread pool, let us keep that thread pool conceptually for bulk requests and free for bulk requests. None of the existing thread pools make sense for analyze requests either. The generic thread pool would be a terrible choice since it has an unbounded queue and that is a bad idea for user-facing APIs. This commit introduces a small by default (size=1, queue_size=16) thread pool for analyze requests.
This commit is contained in:
parent
d223bcf7ab
commit
faa7fe86c5
|
@ -14,6 +14,7 @@ Which looks like:
|
|||
|
||||
[source,txt]
|
||||
--------------------------------------------------
|
||||
node-0 analyze 0 0 0
|
||||
node-0 bulk 0 0 0
|
||||
node-0 fetch_shard_started 0 0 0
|
||||
node-0 fetch_shard_store 0 0 0
|
||||
|
@ -43,6 +44,7 @@ The second column is the thread pool name
|
|||
[source,txt]
|
||||
--------------------------------------------------
|
||||
name
|
||||
analyze
|
||||
bulk
|
||||
fetch_shard_started
|
||||
fetch_shard_store
|
||||
|
|
|
@ -30,6 +30,9 @@ There are several thread pools, but the important ones include:
|
|||
with a size of `# of available processors`,
|
||||
queue_size of `1000`.
|
||||
|
||||
`analyze`::
|
||||
For analyze requests. Thread pool type is `fixed` with a size of 1, queue size of 16.
|
||||
|
||||
`bulk`::
|
||||
For bulk operations. Thread pool type is `fixed`
|
||||
with a size of `# of available processors`,
|
||||
|
|
|
@ -85,7 +85,7 @@ public class TransportAnalyzeAction extends TransportSingleShardAction<AnalyzeRe
|
|||
public TransportAnalyzeAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
|
||||
IndicesService indicesService, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver, Environment environment) {
|
||||
super(settings, AnalyzeAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, AnalyzeRequest::new, ThreadPool.Names.INDEX);
|
||||
super(settings, AnalyzeAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, AnalyzeRequest::new, ThreadPool.Names.ANALYZE);
|
||||
this.indicesService = indicesService;
|
||||
this.environment = environment;
|
||||
}
|
||||
|
|
|
@ -68,6 +68,7 @@ public class ThreadPool extends AbstractComponent implements Scheduler, Closeabl
|
|||
public static final String GENERIC = "generic";
|
||||
public static final String LISTENER = "listener";
|
||||
public static final String GET = "get";
|
||||
public static final String ANALYZE = "analyze";
|
||||
public static final String INDEX = "index";
|
||||
public static final String BULK = "bulk";
|
||||
public static final String SEARCH = "search";
|
||||
|
@ -124,6 +125,7 @@ public class ThreadPool extends AbstractComponent implements Scheduler, Closeabl
|
|||
map.put(Names.GENERIC, ThreadPoolType.SCALING);
|
||||
map.put(Names.LISTENER, ThreadPoolType.FIXED);
|
||||
map.put(Names.GET, ThreadPoolType.FIXED);
|
||||
map.put(Names.ANALYZE, ThreadPoolType.FIXED);
|
||||
map.put(Names.INDEX, ThreadPoolType.FIXED);
|
||||
map.put(Names.BULK, ThreadPoolType.FIXED);
|
||||
map.put(Names.SEARCH, ThreadPoolType.FIXED_AUTO_QUEUE_SIZE);
|
||||
|
@ -173,6 +175,7 @@ public class ThreadPool extends AbstractComponent implements Scheduler, Closeabl
|
|||
builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200));
|
||||
builders.put(Names.BULK, new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 200)); // now that we reuse bulk for index/delete ops
|
||||
builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000));
|
||||
builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16));
|
||||
builders.put(Names.SEARCH, new AutoQueueAdjustingExecutorBuilder(settings,
|
||||
Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000, 1000, 1000, 2000));
|
||||
builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5)));
|
||||
|
|
Loading…
Reference in New Issue