Return 503 when threadpool limit is reached, closes #2048.

This commit is contained in:
Shay Banon 2012-06-23 17:26:50 +02:00
parent efe85f322a
commit 3163499aef
5 changed files with 50 additions and 42 deletions

View File

@ -0,0 +1,35 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
/**
*/
public class EsAbortPolicy implements RejectedExecutionHandler {
public static final EsAbortPolicy INSTANCE = new EsAbortPolicy();
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
throw new EsRejectedExecutionException();
}
}

View File

@ -150,7 +150,7 @@ public class EsExecutors {
executor.getQueue().put(r);
} catch (InterruptedException e) {
//should never happen since we never wait
throw new RejectedExecutionException(e);
throw new EsRejectedExecutionException(e);
}
}
}
@ -174,10 +174,9 @@ public class EsExecutors {
try {
boolean successful = executor.getQueue().offer(r, waitTime, TimeUnit.MILLISECONDS);
if (!successful)
throw new RejectedExecutionException("Rejected execution after waiting "
+ waitTime + " ms for task [" + r.getClass() + "] to be executed.");
throw new EsRejectedExecutionException();
} catch (InterruptedException e) {
throw new RejectedExecutionException(e);
throw new EsRejectedExecutionException(e);
}
}
}

View File

@ -17,21 +17,25 @@
* under the License.
*/
package org.elasticsearch.threadpool;
package org.elasticsearch.common.util.concurrent;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.rest.RestStatus;
/**
*/
public class ThreadPoolRejectedException extends ElasticSearchException {
public class EsRejectedExecutionException extends ElasticSearchException {
public ThreadPoolRejectedException() {
super("rejected");
public EsRejectedExecutionException() {
super(null);
}
public EsRejectedExecutionException(Throwable e) {
super(null, e);
}
@Override
public RestStatus status() {
return RestStatus.FORBIDDEN;
return RestStatus.SERVICE_UNAVAILABLE;
}
}

View File

@ -26,16 +26,8 @@ import java.util.concurrent.*;
*/
public class EsThreadPoolExecutor extends ThreadPoolExecutor {
public EsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public EsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public EsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, EsAbortPolicy.INSTANCE);
}
public EsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {

View File

@ -35,6 +35,7 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.SizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsAbortPolicy;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.xcontent.ToXContent;
@ -233,7 +234,7 @@ public class ThreadPool extends AbstractComponent {
RejectedExecutionHandler rejectedExecutionHandler;
String rejectSetting = settings.get("reject_policy", defaultSettings.get("reject_policy", "abort"));
if ("abort".equals(rejectSetting)) {
rejectedExecutionHandler = new AbortPolicy();
rejectedExecutionHandler = EsAbortPolicy.INSTANCE;
} else if ("caller".equals(rejectSetting)) {
rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
} else {
@ -367,29 +368,6 @@ public class ThreadPool extends AbstractComponent {
}
}
/**
* A handler for rejected tasks that throws a
* <tt>RejectedExecutionException</tt>.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an <tt>AbortPolicy</tt>.
*/
public AbortPolicy() {
}
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always.
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new ThreadPoolRejectedException();
}
}
static class ExecutorHolder {
public final Executor executor;
public final Info info;