mirror of https://github.com/apache/activemq.git
AMQ-4067: Prefix thread names with ActiveMQ
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1387989 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6f2ac637cd
commit
c6595066e9
|
@ -1181,7 +1181,7 @@ public class BrokerService implements Service {
|
||||||
|
|
||||||
public TaskRunnerFactory getTaskRunnerFactory() {
|
public TaskRunnerFactory getTaskRunnerFactory() {
|
||||||
if (this.taskRunnerFactory == null) {
|
if (this.taskRunnerFactory == null) {
|
||||||
this.taskRunnerFactory = new TaskRunnerFactory("BrokerService["+getBrokerName()+"] Task", getTaskRunnerPriority(), true, 1000,
|
this.taskRunnerFactory = new TaskRunnerFactory("ActiveMQ BrokerService["+getBrokerName()+"] Task", getTaskRunnerPriority(), true, 1000,
|
||||||
isDedicatedTaskRunner());
|
isDedicatedTaskRunner());
|
||||||
}
|
}
|
||||||
return this.taskRunnerFactory;
|
return this.taskRunnerFactory;
|
||||||
|
@ -2550,7 +2550,7 @@ public class BrokerService implements Service {
|
||||||
@Override
|
@Override
|
||||||
public Thread newThread(Runnable runnable) {
|
public Thread newThread(Runnable runnable) {
|
||||||
this.i++;
|
this.i++;
|
||||||
Thread thread = new Thread(runnable, "BrokerService.worker." + this.i);
|
Thread thread = new Thread(runnable, "ActiveMQ BrokerService.worker." + this.i);
|
||||||
thread.setDaemon(true);
|
thread.setDaemon(true);
|
||||||
thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
|
thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -94,6 +94,11 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
|
||||||
lastRunTime = now;
|
lastRunTime = now;
|
||||||
readCheck();
|
readCheck();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "ReadChecker";
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
private boolean allowReadCheck(long elapsed) {
|
private boolean allowReadCheck(long elapsed) {
|
||||||
|
@ -111,6 +116,11 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
|
||||||
lastRunTime = now;
|
lastRunTime = now;
|
||||||
writeCheck();
|
writeCheck();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "WriteChecker";
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
public AbstractInactivityMonitor(Transport next, WireFormat wireFormat) {
|
public AbstractInactivityMonitor(Transport next, WireFormat wireFormat) {
|
||||||
|
@ -142,6 +152,9 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
|
||||||
}
|
}
|
||||||
ASYNC_TASKS.execute(new Runnable() {
|
ASYNC_TASKS.execute(new Runnable() {
|
||||||
public void run() {
|
public void run() {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Running {}", this);
|
||||||
|
}
|
||||||
if (monitorStarted.get()) {
|
if (monitorStarted.get()) {
|
||||||
try {
|
try {
|
||||||
// If we can't get the lock it means another write beat us into the
|
// If we can't get the lock it means another write beat us into the
|
||||||
|
@ -159,6 +172,11 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "WriteCheck[" + getRemoteAddress() + "]";
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
|
@ -185,7 +203,15 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
|
||||||
}
|
}
|
||||||
ASYNC_TASKS.execute(new Runnable() {
|
ASYNC_TASKS.execute(new Runnable() {
|
||||||
public void run() {
|
public void run() {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Running {}", this);
|
||||||
|
}
|
||||||
onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: "+next.getRemoteAddress()));
|
onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: "+next.getRemoteAddress()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "ReadCheck[" + getRemoteAddress() + "]";
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
|
@ -332,8 +358,8 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
|
||||||
synchronized(AbstractInactivityMonitor.class) {
|
synchronized(AbstractInactivityMonitor.class) {
|
||||||
if( CHECKER_COUNTER == 0 ) {
|
if( CHECKER_COUNTER == 0 ) {
|
||||||
ASYNC_TASKS = createExecutor();
|
ASYNC_TASKS = createExecutor();
|
||||||
READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck",true);
|
READ_CHECK_TIMER = new Timer("ActiveMQ InactivityMonitor ReadCheckTimer",true);
|
||||||
WRITE_CHECK_TIMER = new Timer("InactivityMonitor WriteCheck",true);
|
WRITE_CHECK_TIMER = new Timer("ActiveMQ InactivityMonitor WriteCheckTimer",true);
|
||||||
}
|
}
|
||||||
CHECKER_COUNTER++;
|
CHECKER_COUNTER++;
|
||||||
if (readCheckTime > 0) {
|
if (readCheckTime > 0) {
|
||||||
|
@ -374,13 +400,15 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
|
||||||
|
|
||||||
private ThreadFactory factory = new ThreadFactory() {
|
private ThreadFactory factory = new ThreadFactory() {
|
||||||
public Thread newThread(Runnable runnable) {
|
public Thread newThread(Runnable runnable) {
|
||||||
Thread thread = new Thread(runnable, "InactivityMonitor Async Task: "+runnable);
|
Thread thread = new Thread(runnable, "ActiveMQ InactivityMonitor Worker");
|
||||||
thread.setDaemon(true);
|
thread.setDaemon(true);
|
||||||
return thread;
|
return thread;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
private ThreadPoolExecutor createExecutor() {
|
private ThreadPoolExecutor createExecutor() {
|
||||||
|
// TODO: This value of 10 seconds seems to low, see discussion at
|
||||||
|
// http://activemq.2283324.n4.nabble.com/InactivityMonitor-Creating-too-frequent-threads-tp4656752.html;cid=1348142445209-351
|
||||||
ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
|
ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
|
||||||
exec.allowCoreThreadTimeOut(true);
|
exec.allowCoreThreadTimeOut(true);
|
||||||
return exec;
|
return exec;
|
||||||
|
|
Loading…
Reference in New Issue