applied patch for AMQ-1146

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@546482 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2007-06-12 13:42:52 +00:00
parent 0eae2880cc
commit a6cb80ceb0
1 changed files with 78 additions and 70 deletions

View File

@ -27,14 +27,14 @@ import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* Used to make sure that commands are arriving periodically from the peer of the transport. * Used to make sure that commands are arriving periodically from the peer of the transport.
* *
* @version $Revision$ * @version $Revision$
*/ */
public class InactivityMonitor extends TransportFilter { public class InactivityMonitor extends TransportFilter {
private final Log log = LogFactory.getLog(InactivityMonitor.class); private final Log log = LogFactory.getLog(InactivityMonitor.class);
private WireFormatInfo localWireFormatInfo; private WireFormatInfo localWireFormatInfo;
private WireFormatInfo remoteWireFormatInfo; private WireFormatInfo remoteWireFormatInfo;
private final AtomicBoolean monitorStarted= new AtomicBoolean(false); private final AtomicBoolean monitorStarted= new AtomicBoolean(false);
@ -44,20 +44,20 @@ public class InactivityMonitor extends TransportFilter {
private final AtomicBoolean commandReceived=new AtomicBoolean(true); private final AtomicBoolean commandReceived=new AtomicBoolean(true);
private final AtomicBoolean inReceive=new AtomicBoolean(false); private final AtomicBoolean inReceive=new AtomicBoolean(false);
private final Runnable readChecker = new Runnable() { private final Runnable readChecker = new Runnable() {
public void run() { public void run() {
readCheck(); readCheck();
} }
}; };
private final Runnable writeChecker = new Runnable() { private final Runnable writeChecker = new Runnable() {
public void run() { public void run() {
writeCheck(); writeCheck();
} }
}; };
public InactivityMonitor(Transport next) { public InactivityMonitor(Transport next) {
super(next); super(next);
} }
@ -67,108 +67,116 @@ public class InactivityMonitor extends TransportFilter {
next.stop(); next.stop();
} }
private void writeCheck() { private void writeCheck() {
if( inSend.get() ) { synchronized(writeChecker) {
log.trace("A send is in progress"); if( inSend.get() ) {
return; log.trace("A send is in progress");
} return;
if( !commandSent.get() ) {
log.trace("No message sent since last write check, sending a KeepAliveInfo");
try {
next.oneway(new KeepAliveInfo());
} catch (IOException e) {
onException(e);
} }
} else {
log.trace("Message sent since last write check, resetting flag"); if( !commandSent.get() ) {
log.trace("No message sent since last write check, sending a KeepAliveInfo");
try {
next.oneway(new KeepAliveInfo());
} catch (IOException e) {
onException(e);
}
} else {
log.trace("Message sent since last write check, resetting flag");
}
commandSent.set(false);
} }
commandSent.set(false);
} }
private void readCheck() { private void readCheck() {
if( inReceive.get() ) { synchronized(readChecker) {
log.trace("A receive is in progress"); if( inReceive.get() ) {
return; log.trace("A receive is in progress");
return;
}
if( !commandReceived.get() ) {
log.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
onException(new InactivityIOException("Channel was inactive for too long."));
} else {
log.trace("Message received since last read check, resetting flag: ");
}
commandReceived.set(false);
} }
if( !commandReceived.get() ) {
log.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
onException(new InactivityIOException("Channel was inactive for too long."));
} else {
log.trace("Message received since last read check, resetting flag: ");
}
commandReceived.set(false);
} }
public void onCommand(Object command) { public void onCommand(Object command) {
inReceive.set(true); synchronized(readChecker) {
try { inReceive.set(true);
if( command.getClass() == WireFormatInfo.class ) { try {
synchronized( this ) { if( command.getClass() == WireFormatInfo.class ) {
remoteWireFormatInfo = (WireFormatInfo) command; synchronized( this ) {
try { remoteWireFormatInfo = (WireFormatInfo) command;
startMonitorThreads(); try {
} catch (IOException e) { startMonitorThreads();
onException(e); } catch (IOException e) {
onException(e);
}
} }
} }
transportListener.onCommand(command);
} finally {
inReceive.set(false);
commandReceived.set(true);
} }
transportListener.onCommand(command);
} finally {
inReceive.set(false);
commandReceived.set(true);
} }
} }
public void oneway(Object o) throws IOException { public void oneway(Object o) throws IOException {
// Disable inactivity monitoring while processing a command. synchronized(writeChecker) {
inSend.set(true); // Disable inactivity monitoring while processing a command.
commandSent.set(true); inSend.set(true);
try { commandSent.set(true);
if( o.getClass() == WireFormatInfo.class ) { try {
synchronized( this ) { if( o.getClass() == WireFormatInfo.class ) {
localWireFormatInfo = (WireFormatInfo) o; synchronized( this ) {
startMonitorThreads(); localWireFormatInfo = (WireFormatInfo) o;
startMonitorThreads();
}
} }
next.oneway(o);
} finally {
inSend.set(false);
} }
next.oneway(o);
} finally {
inSend.set(false);
} }
} }
public void onException(IOException error) { public void onException(IOException error) {
if( monitorStarted.get() ) { if( monitorStarted.get() ) {
stopMonitorThreads(); stopMonitorThreads();
} }
getTransportListener().onException(error); getTransportListener().onException(error);
} }
synchronized private void startMonitorThreads() throws IOException { synchronized private void startMonitorThreads() throws IOException {
if( monitorStarted.get() ) if( monitorStarted.get() )
return; return;
if( localWireFormatInfo == null ) if( localWireFormatInfo == null )
return; return;
if( remoteWireFormatInfo == null ) if( remoteWireFormatInfo == null )
return; return;
long l = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration()); long l = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
if( l > 0 ) { if( l > 0 ) {
monitorStarted.set(true); monitorStarted.set(true);
Scheduler.executePeriodically(writeChecker, l/2); Scheduler.executePeriodically(writeChecker, l/2);
Scheduler.executePeriodically(readChecker, l); Scheduler.executePeriodically(readChecker, l);
} }
} }
/** /**
* *
*/ */
synchronized private void stopMonitorThreads() { synchronized private void stopMonitorThreads() {
if( monitorStarted.compareAndSet(true, false) ) { if( monitorStarted.compareAndSet(true, false) ) {
@ -176,6 +184,6 @@ public class InactivityMonitor extends TransportFilter {
Scheduler.cancel(writeChecker); Scheduler.cancel(writeChecker);
} }
} }
} }