ingest: remove ingest threadpool and use index threadpool instead.

To main concern with the dedicated ingest TP is that there are already many TPs and in the case with beefy nodes we would many more threads. In the case ingest isn't used the all these threads are just idle.
This commit is contained in:
Martijn van Groningen 2016-01-14 13:49:50 +01:00
parent 4d88da5ad5
commit 63ee2224f7
2 changed files with 2 additions and 7 deletions

View File

@ -42,7 +42,7 @@ public class PipelineExecutionService {
public void execute(IndexRequest request, Consumer<Throwable> failureHandler, Consumer<Boolean> completionHandler) {
Pipeline pipeline = getPipeline(request.pipeline());
threadPool.executor(ThreadPool.Names.INGEST).execute(() -> {
threadPool.executor(ThreadPool.Names.INDEX).execute(() -> {
try {
innerExecute(request, pipeline);
completionHandler.accept(true);
@ -54,7 +54,7 @@ public class PipelineExecutionService {
public void execute(Iterable<ActionRequest> actionRequests,
BiConsumer<IndexRequest, Throwable> itemFailureHandler, Consumer<Boolean> completionHandler) {
threadPool.executor(ThreadPool.Names.INGEST).execute(() -> {
threadPool.executor(ThreadPool.Names.INDEX).execute(() -> {
for (ActionRequest actionRequest : actionRequests) {
if ((actionRequest instanceof IndexRequest)) {
IndexRequest indexRequest = (IndexRequest) actionRequest;

View File

@ -88,7 +88,6 @@ public class ThreadPool extends AbstractComponent {
public static final String FORCE_MERGE = "force_merge";
public static final String FETCH_SHARD_STARTED = "fetch_shard_started";
public static final String FETCH_SHARD_STORE = "fetch_shard_store";
public static final String INGEST = "ingest"; //TODO(simonw): wow what is the reason for having yet another threadpool? I really think we should just use index for this.
}
public enum ThreadPoolType {
@ -147,7 +146,6 @@ public class ThreadPool extends AbstractComponent {
map.put(Names.FORCE_MERGE, ThreadPoolType.FIXED);
map.put(Names.FETCH_SHARD_STARTED, ThreadPoolType.SCALING);
map.put(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING);
map.put(Names.INGEST, ThreadPoolType.FIXED);
THREAD_POOL_TYPES = Collections.unmodifiableMap(map);
}
@ -237,9 +235,6 @@ public class ThreadPool extends AbstractComponent {
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FORCE_MERGE).size(1));
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FETCH_SHARD_STARTED).size(availableProcessors * 2).keepAlive("5m"));
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FETCH_SHARD_STORE).size(availableProcessors * 2).keepAlive("5m"));
if (IngestModule.isIngestEnabled(settings)) {
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.INGEST).size(availableProcessors).queueSize(200));
}
this.defaultExecutorTypeSettings = unmodifiableMap(defaultExecutorTypeSettings);