git-svn-id: https://svn.apache.org/repos/asf/activemq/branches/activemq-4.1@551880 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jonas B. Lim 2007-06-29 13:03:53 +00:00
parent eb9f1d02c7
commit 08b638da13
1 changed files with 60 additions and 52 deletions

View File

@ -70,77 +70,85 @@ public class InactivityMonitor extends TransportFilter {
private void writeCheck() {
if( inSend.get() ) {
log.trace("A send is in progress");
return;
}
if( !commandSent.get() ) {
log.trace("No message sent since last write check, sending a KeepAliveInfo");
try {
next.oneway(new KeepAliveInfo());
} catch (IOException e) {
onException(e);
synchronized(writeChecker) {
if( inSend.get() ) {
log.trace("A send is in progress");
return;
}
} else {
log.trace("Message sent since last write check, resetting flag");
if( !commandSent.get() ) {
log.trace("No message sent since last write check, sending a KeepAliveInfo");
try {
next.oneway(new KeepAliveInfo());
} catch (IOException e) {
onException(e);
}
} else {
log.trace("Message sent since last write check, resetting flag");
}
commandSent.set(false);
}
commandSent.set(false);
}
private void readCheck() {
if( inReceive.get() ) {
log.trace("A receive is in progress");
return;
synchronized(readChecker) {
if( inReceive.get() ) {
log.trace("A receive is in progress");
return;
}
if( !commandReceived.get() ) {
log.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
onException(new InactivityIOException("Channel was inactive for too long."));
} else {
log.trace("Message received since last read check, resetting flag: ");
}
commandReceived.set(false);
}
if( !commandReceived.get() ) {
log.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
onException(new InactivityIOException("Channel was inactive for too long."));
} else {
log.trace("Message received since last read check, resetting flag: ");
}
commandReceived.set(false);
}
public void onCommand(Object command) {
inReceive.set(true);
try {
if( command.getClass() == WireFormatInfo.class ) {
synchronized( this ) {
remoteWireFormatInfo = (WireFormatInfo) command;
try {
startMonitorThreads();
} catch (IOException e) {
onException(e);
synchronized(readChecker) {
inReceive.set(true);
try {
if( command.getClass() == WireFormatInfo.class ) {
synchronized( this ) {
remoteWireFormatInfo = (WireFormatInfo) command;
try {
startMonitorThreads();
} catch (IOException e) {
onException(e);
}
}
}
transportListener.onCommand(command);
} finally {
inReceive.set(false);
commandReceived.set(true);
}
transportListener.onCommand(command);
} finally {
inReceive.set(false);
commandReceived.set(true);
}
}
public void oneway(Object o) throws IOException {
// Disable inactivity monitoring while processing a command.
inSend.set(true);
commandSent.set(true);
try {
if( o.getClass() == WireFormatInfo.class ) {
synchronized( this ) {
localWireFormatInfo = (WireFormatInfo) o;
startMonitorThreads();
synchronized(writeChecker) {
// Disable inactivity monitoring while processing a command.
inSend.set(true);
commandSent.set(true);
try {
if( o.getClass() == WireFormatInfo.class ) {
synchronized( this ) {
localWireFormatInfo = (WireFormatInfo) o;
startMonitorThreads();
}
}
next.oneway(o);
} finally {
inSend.set(false);
}
next.oneway(o);
} finally {
inSend.set(false);
}
}