diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java index 0047dcfc87..1481456e53 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java @@ -18,9 +18,11 @@ package org.apache.activemq.transport; import java.io.IOException; +import org.apache.activemq.command.Command; import org.apache.activemq.command.KeepAliveInfo; -import org.apache.activemq.management.CountStatisticImpl; import org.apache.activemq.thread.Scheduler; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; @@ -31,25 +33,26 @@ import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; */ public class InactivityMonitor extends TransportFilter implements Runnable { + private final Log log = LogFactory.getLog(InactivityMonitor.class); + private final long maxInactivityDuration; private final AtomicBoolean cancled = new AtomicBoolean(false); - private byte runIteration=0; + private byte readCheckIteration=0; - private long lastReadCount; - private long lastWriteCount; - private final CountStatisticImpl readCounter; - private final CountStatisticImpl writeCounter; + private final AtomicBoolean commandSent=new AtomicBoolean(true); + private final AtomicBoolean inSend=new AtomicBoolean(false); + + private final AtomicBoolean commandReceived=new AtomicBoolean(true); + private final AtomicBoolean inReceive=new AtomicBoolean(false); - public InactivityMonitor(Transport next, long maxInactivityDuration, CountStatisticImpl readCounter, CountStatisticImpl writeCounter ) { + public InactivityMonitor(Transport next, long maxInactivityDuration) { super(next); this.maxInactivityDuration = maxInactivityDuration; - this.readCounter = readCounter; - this.writeCounter = writeCounter; } - + public void start() throws Exception { next.start(); - Scheduler.executePeriodically(this, maxInactivityDuration/5); + Scheduler.executePeriodically(this, maxInactivityDuration/2); } public void stop() throws Exception { @@ -60,33 +63,74 @@ public class InactivityMonitor extends TransportFilter implements Runnable { } public void run() { - - switch(runIteration) { + switch(readCheckIteration) { + case 0: + writeCheck(); + readCheckIteration++; case 1: - case 2: - long wc = writeCounter.getCount(); - if( wc==lastWriteCount ) { - try { - oneway(new KeepAliveInfo()); - } catch (IOException e) { - onException(e); - } - } else { - lastWriteCount = wc; - } + readCheck(); + writeCheck(); + readCheckIteration=0; break; - case 4: - long rc = readCounter.getCount(); - if( rc == lastReadCount ) { - onException(new InactivityIOException("Channel was inactive for too long.")); - } else { - lastReadCount = rc; - } + } + } + + private void writeCheck() { + if( inSend.get() ) { + log.debug("A send is in progress"); + return; } - runIteration++; - if(runIteration>=5) - runIteration=0; + if( !commandSent.get() ) { + log.debug("No message sent since last write check, sending a KeepAliveInfo"); + try { + next.oneway(new KeepAliveInfo()); + } catch (IOException e) { + onException(e); + } + } else { + log.debug("Message sent since last write check, resetting flag"); + } + + commandSent.set(false); + + } + + private void readCheck() { + if( inReceive.get() ) { + log.debug("A receive is in progress"); + return; + } + + if( !commandReceived.get() ) { + log.debug("No message received since last read check!"); + onException(new InactivityIOException("Channel was inactive for too long.")); + } else { + log.debug("Message received since last read check, resetting flag"); + } + + commandReceived.set(false); + } + + public void onCommand(Command command) { + inReceive.set(true); + try { + commandListener.onCommand(command); + } finally { + inReceive.set(false); + commandReceived.set(true); + } + } + + public void oneway(Command command) throws IOException { + // Disable inactivity monitoring while processing a command. + inSend.set(true); + commandSent.set(true); + try { + next.oneway(command); + } finally { + inSend.set(false); + } } public void onException(IOException error) { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/TransportLogger.java b/activemq-core/src/main/java/org/apache/activemq/transport/TransportLogger.java index 662b36d233..aac21596b2 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/TransportLogger.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/TransportLogger.java @@ -32,7 +32,7 @@ public class TransportLogger extends TransportFilter { private final Log log; public TransportLogger(Transport next) { - this( next, LogFactory.getLog(TransportLogger.class.getName()+"."+getNextId())); + this( next, LogFactory.getLog(TransportLogger.class.getName()+":"+getNextId())); } synchronized private static int getNextId() { @@ -51,6 +51,20 @@ public class TransportLogger extends TransportFilter { next.oneway(command); } + public void onCommand(Command command) { + if( log.isDebugEnabled() ) { + log.debug("RECEIVED: "+command); + } + commandListener.onCommand(command); + } + + public void onException(IOException error) { + if( log.isDebugEnabled() ) { + log.debug("RECEIVED Exception: "+error, error); + } + commandListener.onException(error); + } + public String toString() { return next.toString(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransportFactory.java index 101b67da43..f0f4da2480 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransportFactory.java @@ -229,8 +229,8 @@ public class ActiveIOTransportFactory extends TransportFactory { if( activeIOTransport.isTrace() ) { transport = new TransportLogger(transport); } - transport = new InactivityMonitor(transport, activeIOTransport.getMaxInactivityDuration(), activityMonitor.getReadCounter(), activityMonitor.getWriteCounter()); transport = new WireFormatNegotiator(transport,format, activeIOTransport.getMinmumWireFormatVersion()); + transport = new InactivityMonitor(transport, activeIOTransport.getMaxInactivityDuration()); transport = new MutexTransport(transport); transport = new ResponseCorrelator(transport); return transport; @@ -275,8 +275,8 @@ public class ActiveIOTransportFactory extends TransportFactory { if( activeIOTransport.isTrace() ) { transport = new TransportLogger(transport); } - transport = new InactivityMonitor(transport, activeIOTransport.getMaxInactivityDuration(), activityMonitor.getReadCounter(), activityMonitor.getWriteCounter()); transport = new WireFormatNegotiator(transport,format, activeIOTransport.getMinmumWireFormatVersion()); + transport = new InactivityMonitor(transport, activeIOTransport.getMaxInactivityDuration()); return transport; } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java index 48d2e2f459..a1b57a67fb 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java @@ -55,7 +55,8 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S private boolean trace; private boolean useLocalHost = true; private int minmumWireFormatVersion; - + private long maxInactivityDuration = 30000; + /** * Construct basic helpers * @@ -275,4 +276,12 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S } } + public long getMaxInactivityDuration() { + return maxInactivityDuration; + } + + public void setMaxInactivityDuration(long maxInactivityDuration) { + this.maxInactivityDuration = maxInactivityDuration; + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java index bc1d02d583..20843bfdd4 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java @@ -26,6 +26,7 @@ import javax.net.ServerSocketFactory; import javax.net.SocketFactory; import org.activeio.command.WireFormat; import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.transport.InactivityMonitor; import org.apache.activemq.transport.MutexTransport; import org.apache.activemq.transport.ResponseCorrelator; import org.apache.activemq.transport.Transport; @@ -67,9 +68,12 @@ public class TcpTransportFactory extends TransportFactory { // transport = new InactivityMonitor(transport, // temp.getMaxInactivityDuration(), activityMonitor.getReadCounter(), // activityMonitor.getWriteCounter()); + if( format instanceof OpenWireFormat ) transport = new WireFormatNegotiator(transport, format, tcpTransport.getMinmumWireFormatVersion()); + transport = new InactivityMonitor(transport, tcpTransport.getMaxInactivityDuration()); + transport = new MutexTransport(transport); transport = new ResponseCorrelator(transport); return transport; @@ -87,6 +91,7 @@ public class TcpTransportFactory extends TransportFactory { // temp.getMaxInactivityDuration(), activityMonitor.getReadCounter(), // activityMonitor.getWriteCounter()); transport = new WireFormatNegotiator(transport, format, tcpTransport.getMinmumWireFormatVersion()); + transport = new InactivityMonitor(transport, tcpTransport.getMaxInactivityDuration()); return transport; } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java index f7ded68f0f..3fd7380fda 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java @@ -48,6 +48,9 @@ public class TcpTransportServer extends TransportServerThreadSupport { private int backlog = 5000; private WireFormatFactory wireFormatFactory = new OpenWireFormatFactory(); private TcpTransportFactory transportFactory = new TcpTransportFactory(); + private long maxInactivityDuration = 30000; + private int minmumWireFormatVersion; + private boolean trace; /** * Constructor @@ -101,6 +104,9 @@ public class TcpTransportServer extends TransportServerThreadSupport { } else { HashMap options = new HashMap(); + options.put("maxInactivityDuration", new Long(maxInactivityDuration)); + options.put("minmumWireFormatVersion", new Integer(minmumWireFormatVersion)); + options.put("trace", new Boolean(trace)); WireFormat format = wireFormatFactory.createWireFormat(); TcpTransport transport = new TcpTransport(format, socket); Transport configuredTransport = transportFactory.configure(transport, format, options); @@ -181,4 +187,28 @@ public class TcpTransportServer extends TransportServerThreadSupport { serverSocket.close(); } } + + public long getMaxInactivityDuration() { + return maxInactivityDuration; + } + + public void setMaxInactivityDuration(long maxInactivityDuration) { + this.maxInactivityDuration = maxInactivityDuration; + } + + public int getMinmumWireFormatVersion() { + return minmumWireFormatVersion; + } + + public void setMinmumWireFormatVersion(int minmumWireFormatVersion) { + this.minmumWireFormatVersion = minmumWireFormatVersion; + } + + public boolean isTrace() { + return trace; + } + + public void setTrace(boolean trace) { + this.trace = trace; + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java new file mode 100644 index 0000000000..4ee0943e7b --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java @@ -0,0 +1,165 @@ +package org.apache.activemq.transport.tcp; + +import java.io.IOException; +import java.net.URI; + +import org.apache.activemq.CombinationTestSupport; +import org.apache.activemq.command.Command; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportAcceptListener; +import org.apache.activemq.transport.TransportFactory; +import org.apache.activemq.transport.TransportListener; +import org.apache.activemq.transport.TransportServer; + +import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; +import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger; + + +public class InactivityMonitorTest extends CombinationTestSupport implements TransportAcceptListener { + + private TransportServer server; + private Transport clientTransport; + private Transport serverTransport; + + private final AtomicInteger clientReceiveCount = new AtomicInteger(0); + private final AtomicInteger clientErrorCount = new AtomicInteger(0); + private final AtomicInteger serverReceiveCount = new AtomicInteger(0); + private final AtomicInteger serverErrorCount = new AtomicInteger(0); + + private final AtomicBoolean ignoreClientError = new AtomicBoolean(false); + private final AtomicBoolean ignoreServerError = new AtomicBoolean(false); + + public Runnable serverRunOnCommand; + public Runnable clientRunOnCommand; + + public long clientInactivityLimit; + public long serverInactivityLimit; + + + protected void setUp() throws Exception { + super.setUp(); + server = TransportFactory.bind("localhost", new URI("tcp://localhost:61616?maxInactivityDuration="+serverInactivityLimit)); + server.setAcceptListener(this); + server.start(); + clientTransport = TransportFactory.connect(new URI("tcp://localhost:61616?maxInactivityDuration="+clientInactivityLimit)); + clientTransport.setTransportListener(new TransportListener() { + public void onCommand(Command command) { + clientReceiveCount.incrementAndGet(); + if( clientRunOnCommand !=null ) { + clientRunOnCommand.run(); + } + } + public void onException(IOException error) { + if( !ignoreClientError.get() ) { + System.out.println("Client transport error:"); + error.printStackTrace(); + clientErrorCount.incrementAndGet(); + } + } + public void transportInterupted() { + } + public void transportResumed() { + }}); + clientTransport.start(); + } + + protected void tearDown() throws Exception { + ignoreClientError.set(true); + ignoreServerError.set(true); + clientTransport.stop(); + serverTransport.stop(); + server.stop(); + super.tearDown(); + } + + public void onAccept(Transport transport) { + try { + serverTransport = transport; + serverTransport.setTransportListener(new TransportListener() { + public void onCommand(Command command) { + serverReceiveCount.incrementAndGet(); + if( serverRunOnCommand !=null ) { + serverRunOnCommand.run(); + } + } + public void onException(IOException error) { + if( !ignoreClientError.get() ) { + System.out.println("Server transport error:"); + error.printStackTrace(); + serverErrorCount.incrementAndGet(); + } + } + public void transportInterupted() { + } + public void transportResumed() { + }}); + serverTransport.start(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public void onAcceptError(Exception error) { + error.printStackTrace(); + } + + public void initCombosForTestClientHang() { + addCombinationValues("clientInactivityLimit", new Object[] { new Long(1000*60)}); + addCombinationValues("serverInactivityLimit", new Object[] { new Long(1000)}); + } + public void testClientHang() throws Exception { + + assertEquals(0, serverErrorCount.get()); + assertEquals(0, clientErrorCount.get()); + + // Server should consider the client timed out right away since the client is not hart beating fast enough. + Thread.sleep(3000); + + assertEquals(0, clientErrorCount.get()); + assertTrue(serverErrorCount.get()>0); + } + + public void initCombosForTestNoClientHang() { + addCombinationValues("clientInactivityLimit", new Object[] { new Long(1000)}); + addCombinationValues("serverInactivityLimit", new Object[] { new Long(1000)}); + } + public void testNoClientHang() throws Exception { + + assertEquals(0, serverErrorCount.get()); + assertEquals(0, clientErrorCount.get()); + + Thread.sleep(4000); + + assertEquals(0, clientErrorCount.get()); + assertEquals(0, serverErrorCount.get()); + } + + /** + * Used to test when a operation blocks. This should + * not cause transport to get disconnected. + */ + public void initCombosForTestNoClientHangWithServerBlock() { + addCombinationValues("clientInactivityLimit", new Object[] { new Long(1000)}); + addCombinationValues("serverInactivityLimit", new Object[] { new Long(1000)}); + addCombinationValues("serverRunOnCommand", new Object[] { new Runnable() { + public void run() { + try { + System.out.println("Sleeping"); + Thread.sleep(2000); + } catch (InterruptedException e) { + } + } + }}); + } + public void testNoClientHangWithServerBlock() throws Exception { + + assertEquals(0, serverErrorCount.get()); + assertEquals(0, clientErrorCount.get()); + + Thread.sleep(4000); + + assertEquals(0, clientErrorCount.get()); + assertEquals(0, serverErrorCount.get()); + } + +}