mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-3885 - allow setting resjected task handler on thread pool executor
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1365943 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6b305ba498
commit
e3fcf71ca6
|
@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.RejectedExecutionHandler;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -203,6 +204,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
|||
private boolean nonBlockingRedelivery = false;
|
||||
|
||||
private int maxThreadPoolSize = DEFAULT_THREAD_POOL_SIZE;
|
||||
private RejectedExecutionHandler rejectedTaskHandler = null;
|
||||
|
||||
/**
|
||||
* Construct an <code>ActiveMQConnection</code>
|
||||
|
@ -985,9 +987,11 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
|||
}
|
||||
|
||||
public TaskRunnerFactory getSessionTaskRunner() {
|
||||
System.out.println(maxThreadPoolSize);
|
||||
synchronized (this) {
|
||||
if (sessionTaskRunner == null) {
|
||||
sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner(), maxThreadPoolSize);
|
||||
sessionTaskRunner.setRejectedTaskHandler(rejectedTaskHandler);
|
||||
}
|
||||
}
|
||||
return sessionTaskRunner;
|
||||
|
@ -2595,4 +2599,12 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
|||
this.queueOnlyConnection = true;
|
||||
return this;
|
||||
}
|
||||
|
||||
public RejectedExecutionHandler getRejectedTaskHandler() {
|
||||
return rejectedTaskHandler;
|
||||
}
|
||||
|
||||
public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
|
||||
this.rejectedTaskHandler = rejectedTaskHandler;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.util.HashMap;
|
|||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.RejectedExecutionHandler;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
|
||||
|
@ -131,6 +132,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
|
|||
private boolean nonBlockingRedelivery = false;
|
||||
private int maxThreadPoolSize = ActiveMQConnection.DEFAULT_THREAD_POOL_SIZE;
|
||||
private TaskRunnerFactory sessionTaskRunner;
|
||||
private RejectedExecutionHandler rejectedTaskHandler = null;
|
||||
|
||||
// /////////////////////////////////////////////
|
||||
//
|
||||
|
@ -344,6 +346,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
|
|||
connection.setNonBlockingRedelivery(isNonBlockingRedelivery());
|
||||
connection.setMaxThreadPoolSize(getMaxThreadPoolSize());
|
||||
connection.setSessionTaskRunner(getSessionTaskRunner());
|
||||
connection.setRejectedTaskHandler(getRejectedTaskHandler());
|
||||
if (transportListener != null) {
|
||||
connection.addTransportListener(transportListener);
|
||||
}
|
||||
|
@ -1116,4 +1119,12 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
|
|||
public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
|
||||
this.sessionTaskRunner = sessionTaskRunner;
|
||||
}
|
||||
|
||||
public RejectedExecutionHandler getRejectedTaskHandler() {
|
||||
return rejectedTaskHandler;
|
||||
}
|
||||
|
||||
public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
|
||||
this.rejectedTaskHandler = rejectedTaskHandler;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,7 @@ package org.apache.activemq.thread;
|
|||
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.RejectedExecutionHandler;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
@ -46,6 +46,7 @@ public class TaskRunnerFactory implements Executor {
|
|||
private boolean dedicatedTaskRunner;
|
||||
private AtomicBoolean initDone = new AtomicBoolean(false);
|
||||
private int maxThreadPoolSize = Integer.MAX_VALUE;
|
||||
private RejectedExecutionHandler rejectedTaskHandler = null;
|
||||
|
||||
public TaskRunnerFactory() {
|
||||
this("ActiveMQ Task", Thread.NORM_PRIORITY, true, 1000);
|
||||
|
@ -118,6 +119,9 @@ public class TaskRunnerFactory implements Executor {
|
|||
return thread;
|
||||
}
|
||||
});
|
||||
if (rejectedTaskHandler != null) {
|
||||
rc.setRejectedExecutionHandler(rejectedTaskHandler);
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
@ -176,4 +180,12 @@ public class TaskRunnerFactory implements Executor {
|
|||
public void setMaxThreadPoolSize(int maxThreadPoolSize) {
|
||||
this.maxThreadPoolSize = maxThreadPoolSize;
|
||||
}
|
||||
|
||||
public RejectedExecutionHandler getRejectedTaskHandler() {
|
||||
return rejectedTaskHandler;
|
||||
}
|
||||
|
||||
public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
|
||||
this.rejectedTaskHandler = rejectedTaskHandler;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue