add rejected metric to thread pool stats

This commit is contained in:
Shay Banon 2012-07-11 10:54:15 +02:00
parent 83323f2c88
commit de5068388e
6 changed files with 80 additions and 9 deletions

View File

@ -19,17 +19,24 @@
package org.elasticsearch.common.util.concurrent;
import java.util.concurrent.RejectedExecutionHandler;
import org.elasticsearch.common.metrics.CounterMetric;
import java.util.concurrent.ThreadPoolExecutor;
/**
*/
public class EsAbortPolicy implements RejectedExecutionHandler {
public class EsAbortPolicy implements XRejectedExecutionHandler {
public static final EsAbortPolicy INSTANCE = new EsAbortPolicy();
private final CounterMetric rejected = new CounterMetric();
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
rejected.inc();
throw new EsRejectedExecutionException("rejected execution of [" + r.getClass().getName() + "]");
}
@Override
public long rejected() {
return rejected.count();
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.common.util.concurrent;
import jsr166y.LinkedTransferQueue;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.settings.Settings;
import java.util.concurrent.*;
@ -154,7 +155,9 @@ public class EsExecutors {
* queue, waiting if necessary up to the specified wait time for space to become
* available.
*/
static class TimedBlockingPolicy implements RejectedExecutionHandler {
static class TimedBlockingPolicy implements XRejectedExecutionHandler {
private final CounterMetric rejected = new CounterMetric();
private final long waitTime;
/**
@ -167,11 +170,18 @@ public class EsExecutors {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
boolean successful = executor.getQueue().offer(r, waitTime, TimeUnit.MILLISECONDS);
if (!successful)
if (!successful) {
rejected.inc();
throw new EsRejectedExecutionException();
}
} catch (InterruptedException e) {
throw new EsRejectedExecutionException(e);
}
}
@Override
public long rejected() {
return rejected.count();
}
}
}

View File

@ -27,7 +27,7 @@ import java.util.concurrent.*;
public class EsThreadPoolExecutor extends ThreadPoolExecutor {
public EsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, EsAbortPolicy.INSTANCE);
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new EsAbortPolicy());
}
public EsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {

View File

@ -0,0 +1,32 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent;
import java.util.concurrent.RejectedExecutionHandler;
/**
*/
public interface XRejectedExecutionHandler extends RejectedExecutionHandler {
/**
* The number of rejected executions.
*/
long rejected();
}

View File

@ -38,6 +38,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsAbortPolicy;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.XRejectedExecutionHandler;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
@ -141,13 +142,18 @@ public class ThreadPool extends AbstractComponent {
int threads = -1;
int queue = -1;
int active = -1;
long rejected = -1;
if (holder.executor instanceof ThreadPoolExecutor) {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) holder.executor;
threads = threadPoolExecutor.getPoolSize();
queue = threadPoolExecutor.getQueue().size();
active = threadPoolExecutor.getActiveCount();
RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor.getRejectedExecutionHandler();
if (rejectedExecutionHandler instanceof XRejectedExecutionHandler) {
rejected = ((XRejectedExecutionHandler) rejectedExecutionHandler).rejected();
}
}
stats.add(new ThreadPoolStats.Stats(name, threads, queue, active));
stats.add(new ThreadPoolStats.Stats(name, threads, queue, active, rejected));
}
return new ThreadPoolStats(stats);
}
@ -234,7 +240,7 @@ public class ThreadPool extends AbstractComponent {
RejectedExecutionHandler rejectedExecutionHandler;
String rejectSetting = settings.get("reject_policy", defaultSettings.get("reject_policy", "abort"));
if ("abort".equals(rejectSetting)) {
rejectedExecutionHandler = EsAbortPolicy.INSTANCE;
rejectedExecutionHandler = new EsAbortPolicy();
} else if ("caller".equals(rejectSetting)) {
rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
} else {

View File

@ -41,16 +41,18 @@ public class ThreadPoolStats implements Streamable, ToXContent, Iterable<ThreadP
private int threads;
private int queue;
private int active;
private long rejected;
Stats() {
}
public Stats(String name, int threads, int queue, int active) {
public Stats(String name, int threads, int queue, int active, long rejected) {
this.name = name;
this.threads = threads;
this.queue = queue;
this.active = active;
this.rejected = rejected;
}
public String name() {
@ -85,12 +87,21 @@ public class ThreadPoolStats implements Streamable, ToXContent, Iterable<ThreadP
return this.active;
}
public long rejected() {
return rejected;
}
public long getRejected() {
return rejected;
}
@Override
public void readFrom(StreamInput in) throws IOException {
name = in.readUTF();
threads = in.readInt();
queue = in.readInt();
active = in.readInt();
rejected = in.readLong();
}
@Override
@ -99,6 +110,7 @@ public class ThreadPoolStats implements Streamable, ToXContent, Iterable<ThreadP
out.writeInt(threads);
out.writeInt(queue);
out.writeInt(active);
out.writeLong(rejected);
}
@Override
@ -113,6 +125,9 @@ public class ThreadPoolStats implements Streamable, ToXContent, Iterable<ThreadP
if (active != -1) {
builder.field(Fields.ACTIVE, active);
}
if (rejected != -1) {
builder.field(Fields.REJECTED, rejected);
}
builder.endObject();
return builder;
}
@ -163,6 +178,7 @@ public class ThreadPoolStats implements Streamable, ToXContent, Iterable<ThreadP
static final XContentBuilderString THREADS = new XContentBuilderString("threads");
static final XContentBuilderString QUEUE = new XContentBuilderString("queue");
static final XContentBuilderString ACTIVE = new XContentBuilderString("active");
static final XContentBuilderString REJECTED = new XContentBuilderString("rejected");
}
@Override