mirror of https://github.com/apache/activemq.git
handle the inactivity actions in an async thread so that the schedular thread does not get blocked.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@599993 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d83969f6a8
commit
1ba3532365
|
@ -74,13 +74,18 @@ public class InactivityMonitor extends TransportFilter {
|
||||||
|
|
||||||
if (!commandSent.get()) {
|
if (!commandSent.get()) {
|
||||||
LOG.trace("No message sent since last write check, sending a KeepAliveInfo");
|
LOG.trace("No message sent since last write check, sending a KeepAliveInfo");
|
||||||
try {
|
// TODO: use a thread pool for this..
|
||||||
synchronized (writeChecker) {
|
Thread thread = new Thread("ActiveMQ: Activity Generator: "+next.getRemoteAddress()) {
|
||||||
next.oneway(new KeepAliveInfo());
|
public void run() {
|
||||||
}
|
try {
|
||||||
} catch (IOException e) {
|
oneway(new KeepAliveInfo());
|
||||||
onException(e);
|
} catch (IOException e) {
|
||||||
}
|
onException(e);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
};
|
||||||
|
thread.setDaemon(true);
|
||||||
|
thread.start();
|
||||||
} else {
|
} else {
|
||||||
LOG.trace("Message sent since last write check, resetting flag");
|
LOG.trace("Message sent since last write check, resetting flag");
|
||||||
}
|
}
|
||||||
|
@ -96,9 +101,18 @@ public class InactivityMonitor extends TransportFilter {
|
||||||
|
|
||||||
if (!commandReceived.get()) {
|
if (!commandReceived.get()) {
|
||||||
LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
|
LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
|
||||||
synchronized (readChecker) {
|
|
||||||
onException(new InactivityIOException("Channel was inactive for too long."));
|
// TODO: use a thread pool for this..
|
||||||
}
|
Thread thread = new Thread("ActiveMQ: Inactivity Handler: "+next.getRemoteAddress()) {
|
||||||
|
public void run() {
|
||||||
|
synchronized (readChecker) {
|
||||||
|
onException(new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress()));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
};
|
||||||
|
thread.setDaemon(true);
|
||||||
|
thread.start();
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
LOG.trace("Message received since last read check, resetting flag: ");
|
LOG.trace("Message received since last read check, resetting flag: ");
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue