From 89ac4d108abab7d6c14bf3b53f266e18aa6deed9 Mon Sep 17 00:00:00 2001 From: kimchy Date: Thu, 10 Feb 2011 00:05:08 +0200 Subject: [PATCH] make forkjoin an option in fixed thread pool --- .../threadpool/fixed/FixedThreadPool.java | 24 +++-- .../forkjoin/ForkJoinThreadPool.java | 93 ------------------- 2 files changed, 18 insertions(+), 99 deletions(-) delete mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/forkjoin/ForkJoinThreadPool.java diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/fixed/FixedThreadPool.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/fixed/FixedThreadPool.java index 274dcd8b6c7..a08546106fd 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/fixed/FixedThreadPool.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/fixed/FixedThreadPool.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.jsr166y.ForkJoinPool; import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue; import org.elasticsearch.threadpool.support.AbstractThreadPool; @@ -54,10 +55,15 @@ public class FixedThreadPool extends AbstractThreadPool { this.scheduledSize = componentSettings.getAsInt("scheduled_size", 1); logger.debug("Initializing {} thread pool with [{}] threads, keep_alive[{}], scheduled_size[{}]", getType(), size, keepAlive, scheduledSize); scheduledExecutorService = Executors.newScheduledThreadPool(scheduledSize, EsExecutors.daemonThreadFactory(settings, "[sc]")); - executorService = new ThreadPoolExecutor(size, size, - 0L, TimeUnit.MILLISECONDS, - new LinkedTransferQueue(), - EsExecutors.daemonThreadFactory(settings, "[tp]")); + String type = componentSettings.get("type"); + if ("forkjoin".equalsIgnoreCase(type)) { + executorService = new ForkJoinPool(size, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); + } else { + executorService = new ThreadPoolExecutor(size, size, + 0L, TimeUnit.MILLISECONDS, + new LinkedTransferQueue(), + EsExecutors.daemonThreadFactory(settings, "[tp]")); + } cached = EsExecutors.newCachedThreadPool(keepAlive, EsExecutors.daemonThreadFactory(settings, "[cached]")); started = true; @@ -76,11 +82,17 @@ public class FixedThreadPool extends AbstractThreadPool { } @Override public int getPoolSize() { - return ((ThreadPoolExecutor) executorService).getPoolSize(); + if (executorService instanceof ThreadPoolExecutor) { + return ((ThreadPoolExecutor) executorService).getPoolSize(); + } + return -1; } @Override public int getActiveCount() { - return ((ThreadPoolExecutor) executorService).getActiveCount(); + if (executorService instanceof ThreadPoolExecutor) { + return ((ThreadPoolExecutor) executorService).getActiveCount(); + } + return -1; } @Override public int getSchedulerPoolSize() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/forkjoin/ForkJoinThreadPool.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/forkjoin/ForkJoinThreadPool.java deleted file mode 100644 index 06247e0da3f..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/forkjoin/ForkJoinThreadPool.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.threadpool.forkjoin; - -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.concurrent.jsr166y.ForkJoinPool; -import org.elasticsearch.threadpool.support.AbstractThreadPool; - -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadPoolExecutor; - -import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*; -import static org.elasticsearch.common.unit.TimeValue.*; - -/** - * - */ -public class ForkjoinThreadPool extends AbstractThreadPool { - - final int parallelism; - final TimeValue keepAlive; - - final int scheduledSize; - - public ForkjoinThreadPool() { - this(EMPTY_SETTINGS); - } - - @Inject public ForkjoinThreadPool(Settings settings) { - super(settings); - this.parallelism = componentSettings.getAsInt("parallelism", Runtime.getRuntime().availableProcessors() * 5); - this.keepAlive = componentSettings.getAsTime("keep_alive", timeValueMinutes(5)); - this.scheduledSize = componentSettings.getAsInt("scheduled_size", 1); - logger.debug("Initializing {} thread pool with parallelism[{}], keep_alive[{}], scheduled_size[{}]", getType(), parallelism, keepAlive, scheduledSize); - scheduledExecutorService = Executors.newScheduledThreadPool(scheduledSize, EsExecutors.daemonThreadFactory(settings, "[sc]")); -// executorService = TransferThreadPoolExecutor.newScalingExecutor(min, max, keepAlive.nanos(), TimeUnit.NANOSECONDS, EsExecutors.daemonThreadFactory(settings, "[tp]")); - executorService = new ForkJoinPool(parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); - cached = EsExecutors.newCachedThreadPool(keepAlive, EsExecutors.daemonThreadFactory(settings, "[cached]")); - started = true; - } - - @Override public int getMinThreads() { - return parallelism; - } - - @Override public int getMaxThreads() { - return parallelism; - } - - @Override public int getSchedulerThreads() { - return scheduledSize; - } - - @Override public int getPoolSize() { - return -1; - } - - @Override public int getActiveCount() { - return -1; - } - - @Override public int getSchedulerPoolSize() { - return ((ThreadPoolExecutor) scheduledExecutorService).getPoolSize(); - } - - @Override public int getSchedulerActiveCount() { - return ((ThreadPoolExecutor) scheduledExecutorService).getActiveCount(); - } - - @Override public String getType() { - return "forkjoin"; - } -} \ No newline at end of file