The maxInactivityDuration is now negociated using the WireFormatInfo.  This makes it easier to configure connections since client and server configs do not HAVE to match up excactly.


git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@387566 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-03-21 16:12:31 +00:00
parent a5ee130bea
commit ef0c0e1f87
11 changed files with 200 additions and 102 deletions

View File

@ -241,6 +241,17 @@ public class WireFormatInfo implements Command, MarshallAware {
setProperty("TightEncodingEnabled", tightEncodingEnabled ? Boolean.TRUE : Boolean.FALSE); setProperty("TightEncodingEnabled", tightEncodingEnabled ? Boolean.TRUE : Boolean.FALSE);
} }
/**
* @throws IOException
*/
public long getMaxInactivityDuration() throws IOException {
Long l = (Long) getProperty("MaxInactivityDuration");
return l == null ? 0 : l.longValue();
}
public void seMaxInactivityDuration(long maxInactivityDuration) throws IOException {
setProperty("MaxInactivityDuration", new Long(maxInactivityDuration));
}
public Response visit(CommandVisitor visitor) throws Exception { public Response visit(CommandVisitor visitor) throws Exception {
return visitor.processWireFormat(this); return visitor.processWireFormat(this);
} }

View File

@ -35,6 +35,7 @@ public class OpenWireFormatFactory implements WireFormatFactory {
private boolean cacheEnabled=true; private boolean cacheEnabled=true;
private boolean tightEncodingEnabled=true; private boolean tightEncodingEnabled=true;
private boolean sizePrefixDisabled=false; private boolean sizePrefixDisabled=false;
private long maxInactivityDuration=30*1000;
public WireFormat createWireFormat() { public WireFormat createWireFormat() {
WireFormatInfo info = new WireFormatInfo(); WireFormatInfo info = new WireFormatInfo();
@ -46,6 +47,7 @@ public class OpenWireFormatFactory implements WireFormatFactory {
info.setTcpNoDelayEnabled(tcpNoDelayEnabled); info.setTcpNoDelayEnabled(tcpNoDelayEnabled);
info.setTightEncodingEnabled(tightEncodingEnabled); info.setTightEncodingEnabled(tightEncodingEnabled);
info.setSizePrefixDisabled(sizePrefixDisabled); info.setSizePrefixDisabled(sizePrefixDisabled);
info.seMaxInactivityDuration(maxInactivityDuration);
} catch (Exception e) { } catch (Exception e) {
IllegalStateException ise = new IllegalStateException("Could not configure WireFormatInfo"); IllegalStateException ise = new IllegalStateException("Could not configure WireFormatInfo");
ise.initCause(e); ise.initCause(e);
@ -104,4 +106,12 @@ public class OpenWireFormatFactory implements WireFormatFactory {
public void setSizePrefixDisabled(boolean sizePrefixDisabled) { public void setSizePrefixDisabled(boolean sizePrefixDisabled) {
this.sizePrefixDisabled = sizePrefixDisabled; this.sizePrefixDisabled = sizePrefixDisabled;
} }
public long getMaxInactivityDuration() {
return maxInactivityDuration;
}
public void setMaxInactivityDuration(long maxInactivityDuration) {
this.maxInactivityDuration = maxInactivityDuration;
}
} }

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.command.KeepAliveInfo; import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.thread.Scheduler; import org.apache.activemq.thread.Scheduler;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -31,12 +32,13 @@ import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
* *
* @version $Revision$ * @version $Revision$
*/ */
public class InactivityMonitor extends TransportFilter implements Runnable { public class InactivityMonitor extends TransportFilter {
private final Log log = LogFactory.getLog(InactivityMonitor.class); private final Log log = LogFactory.getLog(InactivityMonitor.class);
private final long maxInactivityDuration; private WireFormatInfo localWireFormatInfo;
private byte readCheckIteration=0; private WireFormatInfo remoteWireFormatInfo;
private boolean monitorStarted=false;
private final AtomicBoolean commandSent=new AtomicBoolean(false); private final AtomicBoolean commandSent=new AtomicBoolean(false);
private final AtomicBoolean inSend=new AtomicBoolean(false); private final AtomicBoolean inSend=new AtomicBoolean(false);
@ -44,34 +46,28 @@ public class InactivityMonitor extends TransportFilter implements Runnable {
private final AtomicBoolean commandReceived=new AtomicBoolean(true); private final AtomicBoolean commandReceived=new AtomicBoolean(true);
private final AtomicBoolean inReceive=new AtomicBoolean(false); private final AtomicBoolean inReceive=new AtomicBoolean(false);
public InactivityMonitor(Transport next, long maxInactivityDuration) { private final Runnable readChecker = new Runnable() {
super(next); public void run() {
this.maxInactivityDuration = maxInactivityDuration; readCheck();
} }
};
public void start() throws Exception { private final Runnable writeChecker = new Runnable() {
next.start(); public void run() {
Scheduler.executePeriodically(this, maxInactivityDuration/2); writeCheck();
}
};
public InactivityMonitor(Transport next) {
super(next);
} }
public void stop() throws Exception { public void stop() throws Exception {
Scheduler.cancel(this); stopMonitorThreads();
next.stop(); next.stop();
} }
synchronized public void run() {
switch(readCheckIteration) {
case 0:
writeCheck();
readCheckIteration++;
break;
case 1:
readCheck();
writeCheck();
readCheckIteration=0;
break;
}
}
private void writeCheck() { private void writeCheck() {
if( inSend.get() ) { if( inSend.get() ) {
@ -113,6 +109,16 @@ public class InactivityMonitor extends TransportFilter implements Runnable {
public void onCommand(Command command) { public void onCommand(Command command) {
inReceive.set(true); inReceive.set(true);
try { try {
if( command.isWireFormatInfo() ) {
synchronized( this ) {
remoteWireFormatInfo = (WireFormatInfo) command;
try {
startMonitorThreads();
} catch (IOException e) {
onException(e);
}
}
}
getTransportListener().onCommand(command); getTransportListener().onCommand(command);
} finally { } finally {
inReceive.set(false); inReceive.set(false);
@ -120,11 +126,18 @@ public class InactivityMonitor extends TransportFilter implements Runnable {
} }
} }
public void oneway(Command command) throws IOException { public void oneway(Command command) throws IOException {
// Disable inactivity monitoring while processing a command. // Disable inactivity monitoring while processing a command.
inSend.set(true); inSend.set(true);
commandSent.set(true); commandSent.set(true);
try { try {
if( command.isWireFormatInfo() ) {
synchronized( this ) {
localWireFormatInfo = (WireFormatInfo) command;
startMonitorThreads();
}
}
next.oneway(command); next.oneway(command);
} finally { } finally {
inSend.set(false); inSend.set(false);
@ -132,7 +145,37 @@ public class InactivityMonitor extends TransportFilter implements Runnable {
} }
public void onException(IOException error) { public void onException(IOException error) {
Scheduler.cancel(this); stopMonitorThreads();
getTransportListener().onException(error); getTransportListener().onException(error);
} }
synchronized private void startMonitorThreads() throws IOException {
if( monitorStarted )
return;
if( localWireFormatInfo == null )
return;
if( remoteWireFormatInfo == null )
return;
long l = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
if( l > 0 ) {
Scheduler.executePeriodically(writeChecker, l/2);
Scheduler.executePeriodically(readChecker, l);
monitorStarted=true;
}
}
/**
*
*/
synchronized private void stopMonitorThreads() {
if( monitorStarted ) {
Scheduler.cancel(readChecker);
Scheduler.cancel(writeChecker);
monitorStarted=false;
}
}
} }

View File

@ -230,11 +230,13 @@ public class ActiveIOTransportFactory extends TransportFactory {
if( activeIOTransport.isTrace() ) { if( activeIOTransport.isTrace() ) {
transport = new TransportLogger(transport); transport = new TransportLogger(transport);
} }
transport = new InactivityMonitor(transport);
if( format instanceof OpenWireFormat ) { if( format instanceof OpenWireFormat ) {
transport = new WireFormatNegotiator(transport, (OpenWireFormat) format, activeIOTransport.getMinmumWireFormatVersion()); transport = new WireFormatNegotiator(transport, (OpenWireFormat) format, activeIOTransport.getMinmumWireFormatVersion());
} }
transport = new InactivityMonitor(transport, activeIOTransport.getMaxInactivityDuration());
transport = new MutexTransport(transport); transport = new MutexTransport(transport);
transport = new ResponseCorrelator(transport); transport = new ResponseCorrelator(transport);
return transport; return transport;
@ -279,10 +281,12 @@ public class ActiveIOTransportFactory extends TransportFactory {
if( activeIOTransport.isTrace() ) { if( activeIOTransport.isTrace() ) {
transport = new TransportLogger(transport); transport = new TransportLogger(transport);
} }
transport = new InactivityMonitor(transport);
if( format instanceof OpenWireFormat ) { if( format instanceof OpenWireFormat ) {
transport = new WireFormatNegotiator(transport, (OpenWireFormat) format, activeIOTransport.getMinmumWireFormatVersion()); transport = new WireFormatNegotiator(transport, (OpenWireFormat) format, activeIOTransport.getMinmumWireFormatVersion());
} }
transport = new InactivityMonitor(transport, activeIOTransport.getMaxInactivityDuration());
return transport; return transport;
} }

View File

@ -56,7 +56,6 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
private boolean trace; private boolean trace;
private boolean useLocalHost = true; private boolean useLocalHost = true;
private int minmumWireFormatVersion; private int minmumWireFormatVersion;
private long maxInactivityDuration = 0; //30000;
private InetSocketAddress socketAddress; private InetSocketAddress socketAddress;
@ -206,17 +205,6 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
this.soTimeout = soTimeout; this.soTimeout = soTimeout;
} }
public long getMaxInactivityDuration() {
return maxInactivityDuration;
}
/**
* Sets the maximum inactivity duration
*/
public void setMaxInactivityDuration(long maxInactivityDuration) {
this.maxInactivityDuration = maxInactivityDuration;
}
public int getConnectionTimeout() { public int getConnectionTimeout() {
return connectionTimeout; return connectionTimeout;
} }

View File

@ -64,15 +64,13 @@ public class TcpTransportFactory extends TransportFactory {
transport = new TransportLogger(transport); transport = new TransportLogger(transport);
} }
transport = new InactivityMonitor(transport);
// Only need the OpenWireFormat if using openwire // Only need the OpenWireFormat if using openwire
if( format instanceof OpenWireFormat ) { if( format instanceof OpenWireFormat ) {
transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion()); transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion());
} }
if( tcpTransport.getMaxInactivityDuration() > 0 ) {
transport = new InactivityMonitor(transport, tcpTransport.getMaxInactivityDuration());
}
transport = new MutexTransport(transport); transport = new MutexTransport(transport);
transport = new ResponseCorrelator(transport); transport = new ResponseCorrelator(transport);
return transport; return transport;
@ -85,14 +83,13 @@ public class TcpTransportFactory extends TransportFactory {
transport = new TransportLogger(transport); transport = new TransportLogger(transport);
} }
transport = new InactivityMonitor(transport);
// Only need the OpenWireFormat if using openwire // Only need the OpenWireFormat if using openwire
if( format instanceof OpenWireFormat ) { if( format instanceof OpenWireFormat ) {
transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion()); transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion());
} }
if( tcpTransport.getMaxInactivityDuration() > 0 ) {
transport = new InactivityMonitor(transport, tcpTransport.getMaxInactivityDuration());
}
return transport; return transport;
} }

View File

@ -48,7 +48,7 @@ public class TcpTransportServer extends TransportServerThreadSupport {
private int backlog = 5000; private int backlog = 5000;
private WireFormatFactory wireFormatFactory = new OpenWireFormatFactory(); private WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
private TcpTransportFactory transportFactory = new TcpTransportFactory(); private TcpTransportFactory transportFactory = new TcpTransportFactory();
private long maxInactivityDuration = 0; //30000; private long maxInactivityDuration = 30000;
private int minmumWireFormatVersion; private int minmumWireFormatVersion;
private boolean trace; private boolean trace;

View File

@ -57,7 +57,6 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
private ReplayStrategy replayStrategy = new ExceptionIfDroppedReplayStrategy(); private ReplayStrategy replayStrategy = new ExceptionIfDroppedReplayStrategy();
private ReplayBuffer replayBuffer; private ReplayBuffer replayBuffer;
private int datagramSize = 4 * 1024; private int datagramSize = 4 * 1024;
private long maxInactivityDuration = 0; // 30000;
private SocketAddress targetAddress; private SocketAddress targetAddress;
private SocketAddress originalTargetAddress; private SocketAddress originalTargetAddress;
private DatagramChannel channel; private DatagramChannel channel;
@ -223,10 +222,6 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
this.trace = trace; this.trace = trace;
} }
public long getMaxInactivityDuration() {
return maxInactivityDuration;
}
public int getDatagramSize() { public int getDatagramSize() {
return datagramSize; return datagramSize;
} }
@ -235,13 +230,6 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
this.datagramSize = datagramSize; this.datagramSize = datagramSize;
} }
/**
* Sets the maximum inactivity duration
*/
public void setMaxInactivityDuration(long maxInactivityDuration) {
this.maxInactivityDuration = maxInactivityDuration;
}
public boolean isUseLocalHost() { public boolean isUseLocalHost() {
return useLocalHost; return useLocalHost;
} }

View File

@ -81,14 +81,12 @@ public class UdpTransportFactory extends TransportFactory {
transport = new TransportLogger(transport); transport = new TransportLogger(transport);
} }
transport = new InactivityMonitor(transport);
if (format instanceof OpenWireFormat) { if (format instanceof OpenWireFormat) {
transport = configureClientSideNegotiator(transport, format, udpTransport); transport = configureClientSideNegotiator(transport, format, udpTransport);
} }
if (udpTransport.getMaxInactivityDuration() > 0) {
transport = new InactivityMonitor(transport, udpTransport.getMaxInactivityDuration());
}
return transport; return transport;
} }
@ -115,14 +113,12 @@ public class UdpTransportFactory extends TransportFactory {
transport = new TransportLogger(transport); transport = new TransportLogger(transport);
} }
transport = new InactivityMonitor(transport);
if (!acceptServer && format instanceof OpenWireFormat) { if (!acceptServer && format instanceof OpenWireFormat) {
transport = configureClientSideNegotiator(transport, format, udpTransport); transport = configureClientSideNegotiator(transport, format, udpTransport);
} }
if (udpTransport.getMaxInactivityDuration() > 0) {
transport = new InactivityMonitor(transport, udpTransport.getMaxInactivityDuration());
}
// deal with fragmentation // deal with fragmentation
if (acceptServer) { if (acceptServer) {

View File

@ -134,10 +134,7 @@ public class UdpTransportServer extends TransportServerSupport {
} }
protected Transport configureTransport(Transport transport) { protected Transport configureTransport(Transport transport) {
if (serverTransport.getMaxInactivityDuration() > 0) { transport = new InactivityMonitor(transport);
transport = new InactivityMonitor(transport, serverTransport.getMaxInactivityDuration());
}
getAcceptListener().onAccept(transport); getAcceptListener().onAccept(transport);
return transport; return transport;
} }

View File

@ -1,10 +1,28 @@
/*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.tcp; package org.apache.activemq.transport.tcp;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException;
import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener; import org.apache.activemq.transport.TransportAcceptListener;
import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportFactory;
@ -32,16 +50,17 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
public Runnable serverRunOnCommand; public Runnable serverRunOnCommand;
public Runnable clientRunOnCommand; public Runnable clientRunOnCommand;
public long clientInactivityLimit;
public long serverInactivityLimit;
protected void setUp() throws Exception { protected void setUp() throws Exception {
super.setUp(); super.setUp();
server = TransportFactory.bind("localhost", new URI("tcp://localhost:61616?trace=true&maxInactivityDuration="+serverInactivityLimit)); startTransportServer();
server.setAcceptListener(this); }
server.start();
clientTransport = TransportFactory.connect(new URI("tcp://localhost:61616?trace=true&maxInactivityDuration="+clientInactivityLimit)); /**
* @throws Exception
* @throws URISyntaxException
*/
private void startClient() throws Exception, URISyntaxException {
clientTransport = TransportFactory.connect(new URI("tcp://localhost:61616?trace=true&wireFormat.maxInactivityDuration=1000"));
clientTransport.setTransportListener(new TransportListener() { clientTransport.setTransportListener(new TransportListener() {
public void onCommand(Command command) { public void onCommand(Command command) {
clientReceiveCount.incrementAndGet(); clientReceiveCount.incrementAndGet();
@ -63,17 +82,36 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
clientTransport.start(); clientTransport.start();
} }
/**
* @throws IOException
* @throws URISyntaxException
* @throws Exception
*/
private void startTransportServer() throws IOException, URISyntaxException, Exception {
server = TransportFactory.bind("localhost", new URI("tcp://localhost:61616?trace=true&wireFormat.maxInactivityDuration=1000"));
server.setAcceptListener(this);
server.start();
}
protected void tearDown() throws Exception { protected void tearDown() throws Exception {
ignoreClientError.set(true); ignoreClientError.set(true);
ignoreServerError.set(true); ignoreServerError.set(true);
try {
if( clientTransport!=null )
clientTransport.stop(); clientTransport.stop();
if( serverTransport!=null )
serverTransport.stop(); serverTransport.stop();
if( server!=null )
server.stop(); server.stop();
} catch (Throwable e) {
e.printStackTrace();
}
super.tearDown(); super.tearDown();
} }
public void onAccept(Transport transport) { public void onAccept(Transport transport) {
try { try {
System.out.println("["+getName()+"] Server Accepted a Connection");
serverTransport = transport; serverTransport = transport;
serverTransport.setTransportListener(new TransportListener() { serverTransport.setTransportListener(new TransportListener() {
public void onCommand(Command command) { public void onCommand(Command command) {
@ -103,12 +141,35 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
error.printStackTrace(); error.printStackTrace();
} }
public void initCombosForTestClientHang() {
addCombinationValues("clientInactivityLimit", new Object[] { new Long(1000*60)});
addCombinationValues("serverInactivityLimit", new Object[] { new Long(1000)});
}
public void testClientHang() throws Exception { public void testClientHang() throws Exception {
//
// Manually create a client transport so that it does not send KeepAlive packets.
// this should simulate a client hang.
clientTransport = new TcpTransport(new OpenWireFormat(), new URI("tcp://localhost:61616"));
clientTransport.setTransportListener(new TransportListener() {
public void onCommand(Command command) {
clientReceiveCount.incrementAndGet();
if( clientRunOnCommand !=null ) {
clientRunOnCommand.run();
}
}
public void onException(IOException error) {
if( !ignoreClientError.get() ) {
System.out.println("Client transport error:");
error.printStackTrace();
clientErrorCount.incrementAndGet();
}
}
public void transportInterupted() {
}
public void transportResumed() {
}});
clientTransport.start();
WireFormatInfo info = new WireFormatInfo();
info.seMaxInactivityDuration(1000);
clientTransport.oneway(info);
assertEquals(0, serverErrorCount.get()); assertEquals(0, serverErrorCount.get());
assertEquals(0, clientErrorCount.get()); assertEquals(0, clientErrorCount.get());
@ -119,42 +180,45 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
assertTrue(serverErrorCount.get()>0); assertTrue(serverErrorCount.get()>0);
} }
public void initCombosForTestNoClientHang() {
addCombinationValues("clientInactivityLimit", new Object[] { new Long(1000)});
addCombinationValues("serverInactivityLimit", new Object[] { new Long(1000)});
}
public void testNoClientHang() throws Exception { public void testNoClientHang() throws Exception {
startClient();
assertEquals(0, serverErrorCount.get()); assertEquals(0, serverErrorCount.get());
assertEquals(0, clientErrorCount.get()); assertEquals(0, clientErrorCount.get());
Thread.sleep(4000); Thread.sleep(4000);
if( clientErrorCount.get() > 0 )
assertEquals(0, clientErrorCount.get()); assertEquals(0, clientErrorCount.get());
if( serverErrorCount.get() > 0 )
assertEquals(0, serverErrorCount.get()); assertEquals(0, serverErrorCount.get());
} }
/** /**
* Used to test when a operation blocks. This should * Used to test when a operation blocks. This should
* not cause transport to get disconnected. * not cause transport to get disconnected.
* @throws Exception
* @throws URISyntaxException
*/ */
public void initCombosForTestNoClientHangWithServerBlock() { public void initCombosForTestNoClientHangWithServerBlock() throws Exception {
startClient();
addCombinationValues("clientInactivityLimit", new Object[] { new Long(1000)}); addCombinationValues("clientInactivityLimit", new Object[] { new Long(1000)});
addCombinationValues("serverInactivityLimit", new Object[] { new Long(1000)}); addCombinationValues("serverInactivityLimit", new Object[] { new Long(1000)});
addCombinationValues("serverRunOnCommand", new Object[] { new Runnable() { addCombinationValues("serverRunOnCommand", new Object[] { new Runnable() {
public void run() { public void run() {
try { try {
System.out.println("Sleeping"); System.out.println("Sleeping");
Thread.sleep(2000); Thread.sleep(4000);
} catch (InterruptedException e) { } catch (InterruptedException e) {
} }
} }
}}); }});
} }
public void testNoClientHangWithServerBlock() throws Exception { public void testNoClientHangWithServerBlock() throws Exception {
startClient();
assertEquals(0, serverErrorCount.get()); assertEquals(0, serverErrorCount.get());
assertEquals(0, clientErrorCount.get()); assertEquals(0, clientErrorCount.get());