From eebf8f8f60ddc7a079f3fa3aa4f2a9b5b395e482 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Fri, 24 Feb 2012 02:09:33 +0200 Subject: [PATCH] add active thread pool count to thread pool stats --- .../common/util/concurrent/EsExecutors.java | 14 +++--- .../util/concurrent/EsThreadPoolExecutor.java | 44 +++++++++++++++++++ .../elasticsearch/threadpool/ThreadPool.java | 18 +++++--- .../threadpool/ThreadPoolStats.java | 18 +++++++- 4 files changed, 81 insertions(+), 13 deletions(-) create mode 100644 src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java diff --git a/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index 6c32b4832ca..599e44146ae 100644 --- a/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -29,21 +29,21 @@ import java.util.concurrent.*; */ public class EsExecutors { - public static ThreadPoolExecutor newScalingExecutorService(int min, int max, long keepAliveTime, TimeUnit unit, - ThreadFactory threadFactory) { + public static EsThreadPoolExecutor newScalingExecutorService(int min, int max, long keepAliveTime, TimeUnit unit, + ThreadFactory threadFactory) { ExecutorScalingQueue queue = new ExecutorScalingQueue(); // we force the execution, since we might run into concurrency issues in offer for ScalingBlockingQueue - ThreadPoolExecutor executor = new ThreadPoolExecutor(min, max, keepAliveTime, unit, queue, threadFactory, + EsThreadPoolExecutor executor = new EsThreadPoolExecutor(min, max, keepAliveTime, unit, queue, threadFactory, new ForceQueuePolicy()); queue.executor = executor; return executor; } - public static ThreadPoolExecutor newBlockingExecutorService(int min, int max, long keepAliveTime, TimeUnit unit, - ThreadFactory threadFactory, int capacity, - long waitTime, TimeUnit waitTimeUnit) { + public static EsThreadPoolExecutor newBlockingExecutorService(int min, int max, long keepAliveTime, TimeUnit unit, + ThreadFactory threadFactory, int capacity, + long waitTime, TimeUnit waitTimeUnit) { ExecutorBlockingQueue queue = new ExecutorBlockingQueue(capacity); - ThreadPoolExecutor executor = new ThreadPoolExecutor(min, max, keepAliveTime, unit, queue, threadFactory, + EsThreadPoolExecutor executor = new EsThreadPoolExecutor(min, max, keepAliveTime, unit, queue, threadFactory, new TimedBlockingPolicy(waitTimeUnit.toMillis(waitTime))); queue.executor = executor; return executor; diff --git a/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java b/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java new file mode 100644 index 00000000000..3ba3f387d90 --- /dev/null +++ b/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java @@ -0,0 +1,44 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.util.concurrent; + +import java.util.concurrent.*; + +/** + * An extension to thread pool executor, allowing (in the future) to add specific additional stats to it. + */ +public class EsThreadPoolExecutor extends ThreadPoolExecutor { + + public EsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); + } + + public EsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); + } + + public EsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); + } + + public EsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); + } +} diff --git a/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 6038dd8a89f..8bebb5287f8 100644 --- a/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -36,6 +36,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.SizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilderString; @@ -132,9 +133,16 @@ public class ThreadPool extends AbstractComponent { if ("same".equals(name)) { continue; } - int threads = ((ThreadPoolExecutor) holder.executor).getPoolSize(); - int queue = ((ThreadPoolExecutor) holder.executor).getQueue().size(); - stats.add(new ThreadPoolStats.Stats(name, threads, queue)); + int threads = -1; + int queue = -1; + int active = -1; + if (holder.executor instanceof ThreadPoolExecutor) { + ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) holder.executor; + threads = threadPoolExecutor.getPoolSize(); + queue = threadPoolExecutor.getQueue().size(); + active = threadPoolExecutor.getActiveCount(); + } + stats.add(new ThreadPoolStats.Stats(name, threads, queue, active)); } return new ThreadPoolStats(stats); } @@ -210,7 +218,7 @@ public class ThreadPool extends AbstractComponent { } else if ("cached".equals(type)) { TimeValue keepAlive = settings.getAsTime("keep_alive", defaultSettings.getAsTime("keep_alive", timeValueMinutes(5))); logger.debug("creating thread_pool [{}], type [{}], keep_alive [{}]", name, type, keepAlive); - Executor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, + Executor executor = new EsThreadPoolExecutor(0, Integer.MAX_VALUE, keepAlive.millis(), TimeUnit.MILLISECONDS, new SynchronousQueue(), threadFactory); @@ -228,7 +236,7 @@ public class ThreadPool extends AbstractComponent { throw new ElasticSearchIllegalArgumentException("reject_policy [" + rejectSetting + "] not valid for [" + name + "] thread pool"); } logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}], reject_policy [{}]", name, type, size, capacity, rejectSetting); - Executor executor = new ThreadPoolExecutor(size, size, + Executor executor = new EsThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, capacity == null ? new LinkedTransferQueue() : new ArrayBlockingQueue((int) capacity.singles()), threadFactory, rejectedExecutionHandler); diff --git a/src/main/java/org/elasticsearch/threadpool/ThreadPoolStats.java b/src/main/java/org/elasticsearch/threadpool/ThreadPoolStats.java index 564a0ba2847..d896dd6a0ea 100644 --- a/src/main/java/org/elasticsearch/threadpool/ThreadPoolStats.java +++ b/src/main/java/org/elasticsearch/threadpool/ThreadPoolStats.java @@ -40,15 +40,17 @@ public class ThreadPoolStats implements Streamable, ToXContent, Iterable