- Implemented transpor inactivity monitoring for the tcp transport

- The properties set on the tcp transport can now be set on the transport sever, and the sever will configure the transports it creates with those properties.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@382503 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-03-02 20:25:19 +00:00
parent 3bdf883d04
commit 1229c23bc4
7 changed files with 305 additions and 38 deletions

View File

@ -18,9 +18,11 @@ package org.apache.activemq.transport;
import java.io.IOException;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.management.CountStatisticImpl;
import org.apache.activemq.thread.Scheduler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
@ -31,25 +33,26 @@ import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
*/
public class InactivityMonitor extends TransportFilter implements Runnable {
private final Log log = LogFactory.getLog(InactivityMonitor.class);
private final long maxInactivityDuration;
private final AtomicBoolean cancled = new AtomicBoolean(false);
private byte runIteration=0;
private byte readCheckIteration=0;
private long lastReadCount;
private long lastWriteCount;
private final CountStatisticImpl readCounter;
private final CountStatisticImpl writeCounter;
private final AtomicBoolean commandSent=new AtomicBoolean(true);
private final AtomicBoolean inSend=new AtomicBoolean(false);
public InactivityMonitor(Transport next, long maxInactivityDuration, CountStatisticImpl readCounter, CountStatisticImpl writeCounter ) {
private final AtomicBoolean commandReceived=new AtomicBoolean(true);
private final AtomicBoolean inReceive=new AtomicBoolean(false);
public InactivityMonitor(Transport next, long maxInactivityDuration) {
super(next);
this.maxInactivityDuration = maxInactivityDuration;
this.readCounter = readCounter;
this.writeCounter = writeCounter;
}
public void start() throws Exception {
next.start();
Scheduler.executePeriodically(this, maxInactivityDuration/5);
Scheduler.executePeriodically(this, maxInactivityDuration/2);
}
public void stop() throws Exception {
@ -60,33 +63,74 @@ public class InactivityMonitor extends TransportFilter implements Runnable {
}
public void run() {
switch(runIteration) {
switch(readCheckIteration) {
case 0:
writeCheck();
readCheckIteration++;
case 1:
case 2:
long wc = writeCounter.getCount();
if( wc==lastWriteCount ) {
readCheck();
writeCheck();
readCheckIteration=0;
break;
}
}
private void writeCheck() {
if( inSend.get() ) {
log.debug("A send is in progress");
return;
}
if( !commandSent.get() ) {
log.debug("No message sent since last write check, sending a KeepAliveInfo");
try {
oneway(new KeepAliveInfo());
next.oneway(new KeepAliveInfo());
} catch (IOException e) {
onException(e);
}
} else {
lastWriteCount = wc;
log.debug("Message sent since last write check, resetting flag");
}
break;
case 4:
long rc = readCounter.getCount();
if( rc == lastReadCount ) {
commandSent.set(false);
}
private void readCheck() {
if( inReceive.get() ) {
log.debug("A receive is in progress");
return;
}
if( !commandReceived.get() ) {
log.debug("No message received since last read check!");
onException(new InactivityIOException("Channel was inactive for too long."));
} else {
lastReadCount = rc;
log.debug("Message received since last read check, resetting flag");
}
commandReceived.set(false);
}
public void onCommand(Command command) {
inReceive.set(true);
try {
commandListener.onCommand(command);
} finally {
inReceive.set(false);
commandReceived.set(true);
}
}
runIteration++;
if(runIteration>=5)
runIteration=0;
public void oneway(Command command) throws IOException {
// Disable inactivity monitoring while processing a command.
inSend.set(true);
commandSent.set(true);
try {
next.oneway(command);
} finally {
inSend.set(false);
}
}
public void onException(IOException error) {

View File

@ -32,7 +32,7 @@ public class TransportLogger extends TransportFilter {
private final Log log;
public TransportLogger(Transport next) {
this( next, LogFactory.getLog(TransportLogger.class.getName()+"."+getNextId()));
this( next, LogFactory.getLog(TransportLogger.class.getName()+":"+getNextId()));
}
synchronized private static int getNextId() {
@ -51,6 +51,20 @@ public class TransportLogger extends TransportFilter {
next.oneway(command);
}
public void onCommand(Command command) {
if( log.isDebugEnabled() ) {
log.debug("RECEIVED: "+command);
}
commandListener.onCommand(command);
}
public void onException(IOException error) {
if( log.isDebugEnabled() ) {
log.debug("RECEIVED Exception: "+error, error);
}
commandListener.onException(error);
}
public String toString() {
return next.toString();
}

View File

@ -229,8 +229,8 @@ public class ActiveIOTransportFactory extends TransportFactory {
if( activeIOTransport.isTrace() ) {
transport = new TransportLogger(transport);
}
transport = new InactivityMonitor(transport, activeIOTransport.getMaxInactivityDuration(), activityMonitor.getReadCounter(), activityMonitor.getWriteCounter());
transport = new WireFormatNegotiator(transport,format, activeIOTransport.getMinmumWireFormatVersion());
transport = new InactivityMonitor(transport, activeIOTransport.getMaxInactivityDuration());
transport = new MutexTransport(transport);
transport = new ResponseCorrelator(transport);
return transport;
@ -275,8 +275,8 @@ public class ActiveIOTransportFactory extends TransportFactory {
if( activeIOTransport.isTrace() ) {
transport = new TransportLogger(transport);
}
transport = new InactivityMonitor(transport, activeIOTransport.getMaxInactivityDuration(), activityMonitor.getReadCounter(), activityMonitor.getWriteCounter());
transport = new WireFormatNegotiator(transport,format, activeIOTransport.getMinmumWireFormatVersion());
transport = new InactivityMonitor(transport, activeIOTransport.getMaxInactivityDuration());
return transport;
}

View File

@ -55,6 +55,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
private boolean trace;
private boolean useLocalHost = true;
private int minmumWireFormatVersion;
private long maxInactivityDuration = 30000;
/**
* Construct basic helpers
@ -275,4 +276,12 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
}
}
public long getMaxInactivityDuration() {
return maxInactivityDuration;
}
public void setMaxInactivityDuration(long maxInactivityDuration) {
this.maxInactivityDuration = maxInactivityDuration;
}
}

View File

@ -26,6 +26,7 @@ import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
import org.activeio.command.WireFormat;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.InactivityMonitor;
import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport;
@ -67,9 +68,12 @@ public class TcpTransportFactory extends TransportFactory {
// transport = new InactivityMonitor(transport,
// temp.getMaxInactivityDuration(), activityMonitor.getReadCounter(),
// activityMonitor.getWriteCounter());
if( format instanceof OpenWireFormat )
transport = new WireFormatNegotiator(transport, format, tcpTransport.getMinmumWireFormatVersion());
transport = new InactivityMonitor(transport, tcpTransport.getMaxInactivityDuration());
transport = new MutexTransport(transport);
transport = new ResponseCorrelator(transport);
return transport;
@ -87,6 +91,7 @@ public class TcpTransportFactory extends TransportFactory {
// temp.getMaxInactivityDuration(), activityMonitor.getReadCounter(),
// activityMonitor.getWriteCounter());
transport = new WireFormatNegotiator(transport, format, tcpTransport.getMinmumWireFormatVersion());
transport = new InactivityMonitor(transport, tcpTransport.getMaxInactivityDuration());
return transport;
}

View File

@ -48,6 +48,9 @@ public class TcpTransportServer extends TransportServerThreadSupport {
private int backlog = 5000;
private WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
private TcpTransportFactory transportFactory = new TcpTransportFactory();
private long maxInactivityDuration = 30000;
private int minmumWireFormatVersion;
private boolean trace;
/**
* Constructor
@ -101,6 +104,9 @@ public class TcpTransportServer extends TransportServerThreadSupport {
}
else {
HashMap options = new HashMap();
options.put("maxInactivityDuration", new Long(maxInactivityDuration));
options.put("minmumWireFormatVersion", new Integer(minmumWireFormatVersion));
options.put("trace", new Boolean(trace));
WireFormat format = wireFormatFactory.createWireFormat();
TcpTransport transport = new TcpTransport(format, socket);
Transport configuredTransport = transportFactory.configure(transport, format, options);
@ -181,4 +187,28 @@ public class TcpTransportServer extends TransportServerThreadSupport {
serverSocket.close();
}
}
public long getMaxInactivityDuration() {
return maxInactivityDuration;
}
public void setMaxInactivityDuration(long maxInactivityDuration) {
this.maxInactivityDuration = maxInactivityDuration;
}
public int getMinmumWireFormatVersion() {
return minmumWireFormatVersion;
}
public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
this.minmumWireFormatVersion = minmumWireFormatVersion;
}
public boolean isTrace() {
return trace;
}
public void setTrace(boolean trace) {
this.trace = trace;
}
}

View File

@ -0,0 +1,165 @@
package org.apache.activemq.transport.tcp;
import java.io.IOException;
import java.net.URI;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.command.Command;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.TransportServer;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
public class InactivityMonitorTest extends CombinationTestSupport implements TransportAcceptListener {
private TransportServer server;
private Transport clientTransport;
private Transport serverTransport;
private final AtomicInteger clientReceiveCount = new AtomicInteger(0);
private final AtomicInteger clientErrorCount = new AtomicInteger(0);
private final AtomicInteger serverReceiveCount = new AtomicInteger(0);
private final AtomicInteger serverErrorCount = new AtomicInteger(0);
private final AtomicBoolean ignoreClientError = new AtomicBoolean(false);
private final AtomicBoolean ignoreServerError = new AtomicBoolean(false);
public Runnable serverRunOnCommand;
public Runnable clientRunOnCommand;
public long clientInactivityLimit;
public long serverInactivityLimit;
protected void setUp() throws Exception {
super.setUp();
server = TransportFactory.bind("localhost", new URI("tcp://localhost:61616?maxInactivityDuration="+serverInactivityLimit));
server.setAcceptListener(this);
server.start();
clientTransport = TransportFactory.connect(new URI("tcp://localhost:61616?maxInactivityDuration="+clientInactivityLimit));
clientTransport.setTransportListener(new TransportListener() {
public void onCommand(Command command) {
clientReceiveCount.incrementAndGet();
if( clientRunOnCommand !=null ) {
clientRunOnCommand.run();
}
}
public void onException(IOException error) {
if( !ignoreClientError.get() ) {
System.out.println("Client transport error:");
error.printStackTrace();
clientErrorCount.incrementAndGet();
}
}
public void transportInterupted() {
}
public void transportResumed() {
}});
clientTransport.start();
}
protected void tearDown() throws Exception {
ignoreClientError.set(true);
ignoreServerError.set(true);
clientTransport.stop();
serverTransport.stop();
server.stop();
super.tearDown();
}
public void onAccept(Transport transport) {
try {
serverTransport = transport;
serverTransport.setTransportListener(new TransportListener() {
public void onCommand(Command command) {
serverReceiveCount.incrementAndGet();
if( serverRunOnCommand !=null ) {
serverRunOnCommand.run();
}
}
public void onException(IOException error) {
if( !ignoreClientError.get() ) {
System.out.println("Server transport error:");
error.printStackTrace();
serverErrorCount.incrementAndGet();
}
}
public void transportInterupted() {
}
public void transportResumed() {
}});
serverTransport.start();
} catch (Exception e) {
e.printStackTrace();
}
}
public void onAcceptError(Exception error) {
error.printStackTrace();
}
public void initCombosForTestClientHang() {
addCombinationValues("clientInactivityLimit", new Object[] { new Long(1000*60)});
addCombinationValues("serverInactivityLimit", new Object[] { new Long(1000)});
}
public void testClientHang() throws Exception {
assertEquals(0, serverErrorCount.get());
assertEquals(0, clientErrorCount.get());
// Server should consider the client timed out right away since the client is not hart beating fast enough.
Thread.sleep(3000);
assertEquals(0, clientErrorCount.get());
assertTrue(serverErrorCount.get()>0);
}
public void initCombosForTestNoClientHang() {
addCombinationValues("clientInactivityLimit", new Object[] { new Long(1000)});
addCombinationValues("serverInactivityLimit", new Object[] { new Long(1000)});
}
public void testNoClientHang() throws Exception {
assertEquals(0, serverErrorCount.get());
assertEquals(0, clientErrorCount.get());
Thread.sleep(4000);
assertEquals(0, clientErrorCount.get());
assertEquals(0, serverErrorCount.get());
}
/**
* Used to test when a operation blocks. This should
* not cause transport to get disconnected.
*/
public void initCombosForTestNoClientHangWithServerBlock() {
addCombinationValues("clientInactivityLimit", new Object[] { new Long(1000)});
addCombinationValues("serverInactivityLimit", new Object[] { new Long(1000)});
addCombinationValues("serverRunOnCommand", new Object[] { new Runnable() {
public void run() {
try {
System.out.println("Sleeping");
Thread.sleep(2000);
} catch (InterruptedException e) {
}
}
}});
}
public void testNoClientHangWithServerBlock() throws Exception {
assertEquals(0, serverErrorCount.get());
assertEquals(0, clientErrorCount.get());
Thread.sleep(4000);
assertEquals(0, clientErrorCount.get());
assertEquals(0, serverErrorCount.get());
}
}