From 30510a83204d032c369826e8a400bc32cf9b3111 Mon Sep 17 00:00:00 2001 From: kimchy Date: Wed, 9 Feb 2011 23:47:38 +0200 Subject: [PATCH] add forkjoin TP --- .../forkjoin/ForkJoinThreadPool.java | 93 +++++++++++++++++++ 1 file changed, 93 insertions(+) create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/forkjoin/ForkJoinThreadPool.java diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/forkjoin/ForkJoinThreadPool.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/forkjoin/ForkJoinThreadPool.java new file mode 100644 index 00000000000..06247e0da3f --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/forkjoin/ForkJoinThreadPool.java @@ -0,0 +1,93 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.threadpool.forkjoin; + +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.jsr166y.ForkJoinPool; +import org.elasticsearch.threadpool.support.AbstractThreadPool; + +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; + +import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*; +import static org.elasticsearch.common.unit.TimeValue.*; + +/** + * + */ +public class ForkjoinThreadPool extends AbstractThreadPool { + + final int parallelism; + final TimeValue keepAlive; + + final int scheduledSize; + + public ForkjoinThreadPool() { + this(EMPTY_SETTINGS); + } + + @Inject public ForkjoinThreadPool(Settings settings) { + super(settings); + this.parallelism = componentSettings.getAsInt("parallelism", Runtime.getRuntime().availableProcessors() * 5); + this.keepAlive = componentSettings.getAsTime("keep_alive", timeValueMinutes(5)); + this.scheduledSize = componentSettings.getAsInt("scheduled_size", 1); + logger.debug("Initializing {} thread pool with parallelism[{}], keep_alive[{}], scheduled_size[{}]", getType(), parallelism, keepAlive, scheduledSize); + scheduledExecutorService = Executors.newScheduledThreadPool(scheduledSize, EsExecutors.daemonThreadFactory(settings, "[sc]")); +// executorService = TransferThreadPoolExecutor.newScalingExecutor(min, max, keepAlive.nanos(), TimeUnit.NANOSECONDS, EsExecutors.daemonThreadFactory(settings, "[tp]")); + executorService = new ForkJoinPool(parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); + cached = EsExecutors.newCachedThreadPool(keepAlive, EsExecutors.daemonThreadFactory(settings, "[cached]")); + started = true; + } + + @Override public int getMinThreads() { + return parallelism; + } + + @Override public int getMaxThreads() { + return parallelism; + } + + @Override public int getSchedulerThreads() { + return scheduledSize; + } + + @Override public int getPoolSize() { + return -1; + } + + @Override public int getActiveCount() { + return -1; + } + + @Override public int getSchedulerPoolSize() { + return ((ThreadPoolExecutor) scheduledExecutorService).getPoolSize(); + } + + @Override public int getSchedulerActiveCount() { + return ((ThreadPoolExecutor) scheduledExecutorService).getActiveCount(); + } + + @Override public String getType() { + return "forkjoin"; + } +} \ No newline at end of file