add active thread pool count to thread pool stats

This commit is contained in:
Shay Banon 2012-02-24 02:09:33 +02:00
parent 7ed68a5c30
commit eebf8f8f60
4 changed files with 81 additions and 13 deletions

View File

@ -29,21 +29,21 @@ import java.util.concurrent.*;
*/ */
public class EsExecutors { public class EsExecutors {
public static ThreadPoolExecutor newScalingExecutorService(int min, int max, long keepAliveTime, TimeUnit unit, public static EsThreadPoolExecutor newScalingExecutorService(int min, int max, long keepAliveTime, TimeUnit unit,
ThreadFactory threadFactory) { ThreadFactory threadFactory) {
ExecutorScalingQueue<Runnable> queue = new ExecutorScalingQueue<Runnable>(); ExecutorScalingQueue<Runnable> queue = new ExecutorScalingQueue<Runnable>();
// we force the execution, since we might run into concurrency issues in offer for ScalingBlockingQueue // we force the execution, since we might run into concurrency issues in offer for ScalingBlockingQueue
ThreadPoolExecutor executor = new ThreadPoolExecutor(min, max, keepAliveTime, unit, queue, threadFactory, EsThreadPoolExecutor executor = new EsThreadPoolExecutor(min, max, keepAliveTime, unit, queue, threadFactory,
new ForceQueuePolicy()); new ForceQueuePolicy());
queue.executor = executor; queue.executor = executor;
return executor; return executor;
} }
public static ThreadPoolExecutor newBlockingExecutorService(int min, int max, long keepAliveTime, TimeUnit unit, public static EsThreadPoolExecutor newBlockingExecutorService(int min, int max, long keepAliveTime, TimeUnit unit,
ThreadFactory threadFactory, int capacity, ThreadFactory threadFactory, int capacity,
long waitTime, TimeUnit waitTimeUnit) { long waitTime, TimeUnit waitTimeUnit) {
ExecutorBlockingQueue<Runnable> queue = new ExecutorBlockingQueue<Runnable>(capacity); ExecutorBlockingQueue<Runnable> queue = new ExecutorBlockingQueue<Runnable>(capacity);
ThreadPoolExecutor executor = new ThreadPoolExecutor(min, max, keepAliveTime, unit, queue, threadFactory, EsThreadPoolExecutor executor = new EsThreadPoolExecutor(min, max, keepAliveTime, unit, queue, threadFactory,
new TimedBlockingPolicy(waitTimeUnit.toMillis(waitTime))); new TimedBlockingPolicy(waitTimeUnit.toMillis(waitTime)));
queue.executor = executor; queue.executor = executor;
return executor; return executor;

View File

@ -0,0 +1,44 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent;
import java.util.concurrent.*;
/**
* An extension to thread pool executor, allowing (in the future) to add specific additional stats to it.
*/
public class EsThreadPoolExecutor extends ThreadPoolExecutor {
public EsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public EsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public EsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public EsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
}

View File

@ -36,6 +36,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.SizeValue; import org.elasticsearch.common.unit.SizeValue;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.common.xcontent.XContentBuilderString;
@ -132,9 +133,16 @@ public class ThreadPool extends AbstractComponent {
if ("same".equals(name)) { if ("same".equals(name)) {
continue; continue;
} }
int threads = ((ThreadPoolExecutor) holder.executor).getPoolSize(); int threads = -1;
int queue = ((ThreadPoolExecutor) holder.executor).getQueue().size(); int queue = -1;
stats.add(new ThreadPoolStats.Stats(name, threads, queue)); int active = -1;
if (holder.executor instanceof ThreadPoolExecutor) {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) holder.executor;
threads = threadPoolExecutor.getPoolSize();
queue = threadPoolExecutor.getQueue().size();
active = threadPoolExecutor.getActiveCount();
}
stats.add(new ThreadPoolStats.Stats(name, threads, queue, active));
} }
return new ThreadPoolStats(stats); return new ThreadPoolStats(stats);
} }
@ -210,7 +218,7 @@ public class ThreadPool extends AbstractComponent {
} else if ("cached".equals(type)) { } else if ("cached".equals(type)) {
TimeValue keepAlive = settings.getAsTime("keep_alive", defaultSettings.getAsTime("keep_alive", timeValueMinutes(5))); TimeValue keepAlive = settings.getAsTime("keep_alive", defaultSettings.getAsTime("keep_alive", timeValueMinutes(5)));
logger.debug("creating thread_pool [{}], type [{}], keep_alive [{}]", name, type, keepAlive); logger.debug("creating thread_pool [{}], type [{}], keep_alive [{}]", name, type, keepAlive);
Executor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, Executor executor = new EsThreadPoolExecutor(0, Integer.MAX_VALUE,
keepAlive.millis(), TimeUnit.MILLISECONDS, keepAlive.millis(), TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(), new SynchronousQueue<Runnable>(),
threadFactory); threadFactory);
@ -228,7 +236,7 @@ public class ThreadPool extends AbstractComponent {
throw new ElasticSearchIllegalArgumentException("reject_policy [" + rejectSetting + "] not valid for [" + name + "] thread pool"); throw new ElasticSearchIllegalArgumentException("reject_policy [" + rejectSetting + "] not valid for [" + name + "] thread pool");
} }
logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}], reject_policy [{}]", name, type, size, capacity, rejectSetting); logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}], reject_policy [{}]", name, type, size, capacity, rejectSetting);
Executor executor = new ThreadPoolExecutor(size, size, Executor executor = new EsThreadPoolExecutor(size, size,
0L, TimeUnit.MILLISECONDS, 0L, TimeUnit.MILLISECONDS,
capacity == null ? new LinkedTransferQueue<Runnable>() : new ArrayBlockingQueue<Runnable>((int) capacity.singles()), capacity == null ? new LinkedTransferQueue<Runnable>() : new ArrayBlockingQueue<Runnable>((int) capacity.singles()),
threadFactory, rejectedExecutionHandler); threadFactory, rejectedExecutionHandler);

View File

@ -40,15 +40,17 @@ public class ThreadPoolStats implements Streamable, ToXContent, Iterable<ThreadP
private String name; private String name;
private int threads; private int threads;
private int queue; private int queue;
private int active;
Stats() { Stats() {
} }
public Stats(String name, int threads, int queue) { public Stats(String name, int threads, int queue, int active) {
this.name = name; this.name = name;
this.threads = threads; this.threads = threads;
this.queue = queue; this.queue = queue;
this.active = active;
} }
public String name() { public String name() {
@ -75,11 +77,20 @@ public class ThreadPoolStats implements Streamable, ToXContent, Iterable<ThreadP
return this.queue; return this.queue;
} }
public int active() {
return this.active;
}
public int getActive() {
return this.active;
}
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
name = in.readUTF(); name = in.readUTF();
threads = in.readInt(); threads = in.readInt();
queue = in.readInt(); queue = in.readInt();
active = in.readInt();
} }
@Override @Override
@ -87,6 +98,7 @@ public class ThreadPoolStats implements Streamable, ToXContent, Iterable<ThreadP
out.writeUTF(name); out.writeUTF(name);
out.writeInt(threads); out.writeInt(threads);
out.writeInt(queue); out.writeInt(queue);
out.writeInt(active);
} }
@Override @Override
@ -98,6 +110,9 @@ public class ThreadPoolStats implements Streamable, ToXContent, Iterable<ThreadP
if (queue != -1) { if (queue != -1) {
builder.field(Fields.QUEUE, queue); builder.field(Fields.QUEUE, queue);
} }
if (active != -1) {
builder.field(Fields.ACTIVE, active);
}
builder.endObject(); builder.endObject();
return builder; return builder;
} }
@ -147,6 +162,7 @@ public class ThreadPoolStats implements Streamable, ToXContent, Iterable<ThreadP
static final XContentBuilderString THREAD_POOL = new XContentBuilderString("thread_pool"); static final XContentBuilderString THREAD_POOL = new XContentBuilderString("thread_pool");
static final XContentBuilderString THREADS = new XContentBuilderString("threads"); static final XContentBuilderString THREADS = new XContentBuilderString("threads");
static final XContentBuilderString QUEUE = new XContentBuilderString("queue"); static final XContentBuilderString QUEUE = new XContentBuilderString("queue");
static final XContentBuilderString ACTIVE = new XContentBuilderString("active");
} }
@Override @Override