From ef0c0e1f87d2875a960eb5fad6d336230b2bda08 Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Tue, 21 Mar 2006 16:12:31 +0000 Subject: [PATCH] http://jira.activemq.org/jira/browse/AMQ-643 The maxInactivityDuration is now negociated using the WireFormatInfo. This makes it easier to configure connections since client and server configs do not HAVE to match up excactly. git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@387566 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/command/WireFormatInfo.java | 11 ++ .../openwire/OpenWireFormatFactory.java | 12 +- .../activemq/transport/InactivityMonitor.java | 103 +++++++++++----- .../activeio/ActiveIOTransportFactory.java | 8 +- .../activemq/transport/tcp/TcpTransport.java | 12 -- .../transport/tcp/TcpTransportFactory.java | 11 +- .../transport/tcp/TcpTransportServer.java | 2 +- .../activemq/transport/udp/UdpTransport.java | 12 -- .../transport/udp/UdpTransportFactory.java | 12 +- .../transport/udp/UdpTransportServer.java | 5 +- .../transport/tcp/InactivityMonitorTest.java | 114 ++++++++++++++---- 11 files changed, 200 insertions(+), 102 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java b/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java index 449098c339..69986a0854 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java @@ -240,6 +240,17 @@ public class WireFormatInfo implements Command, MarshallAware { public void setTightEncodingEnabled(boolean tightEncodingEnabled) throws IOException { setProperty("TightEncodingEnabled", tightEncodingEnabled ? Boolean.TRUE : Boolean.FALSE); } + + /** + * @throws IOException + */ + public long getMaxInactivityDuration() throws IOException { + Long l = (Long) getProperty("MaxInactivityDuration"); + return l == null ? 0 : l.longValue(); + } + public void seMaxInactivityDuration(long maxInactivityDuration) throws IOException { + setProperty("MaxInactivityDuration", new Long(maxInactivityDuration)); + } public Response visit(CommandVisitor visitor) throws Exception { return visitor.processWireFormat(this); diff --git a/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java b/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java index 33183c2c7d..255afaadf5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java @@ -35,7 +35,8 @@ public class OpenWireFormatFactory implements WireFormatFactory { private boolean cacheEnabled=true; private boolean tightEncodingEnabled=true; private boolean sizePrefixDisabled=false; - + private long maxInactivityDuration=30*1000; + public WireFormat createWireFormat() { WireFormatInfo info = new WireFormatInfo(); info.setVersion(version); @@ -46,6 +47,7 @@ public class OpenWireFormatFactory implements WireFormatFactory { info.setTcpNoDelayEnabled(tcpNoDelayEnabled); info.setTightEncodingEnabled(tightEncodingEnabled); info.setSizePrefixDisabled(sizePrefixDisabled); + info.seMaxInactivityDuration(maxInactivityDuration); } catch (Exception e) { IllegalStateException ise = new IllegalStateException("Could not configure WireFormatInfo"); ise.initCause(e); @@ -104,4 +106,12 @@ public class OpenWireFormatFactory implements WireFormatFactory { public void setSizePrefixDisabled(boolean sizePrefixDisabled) { this.sizePrefixDisabled = sizePrefixDisabled; } + + public long getMaxInactivityDuration() { + return maxInactivityDuration; + } + + public void setMaxInactivityDuration(long maxInactivityDuration) { + this.maxInactivityDuration = maxInactivityDuration; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java index 965980206b..f1372e3ad5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java @@ -20,6 +20,7 @@ import java.io.IOException; import org.apache.activemq.command.Command; import org.apache.activemq.command.KeepAliveInfo; +import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.thread.Scheduler; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,12 +32,13 @@ import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; * * @version $Revision$ */ -public class InactivityMonitor extends TransportFilter implements Runnable { +public class InactivityMonitor extends TransportFilter { private final Log log = LogFactory.getLog(InactivityMonitor.class); - private final long maxInactivityDuration; - private byte readCheckIteration=0; + private WireFormatInfo localWireFormatInfo; + private WireFormatInfo remoteWireFormatInfo; + private boolean monitorStarted=false; private final AtomicBoolean commandSent=new AtomicBoolean(false); private final AtomicBoolean inSend=new AtomicBoolean(false); @@ -44,35 +46,29 @@ public class InactivityMonitor extends TransportFilter implements Runnable { private final AtomicBoolean commandReceived=new AtomicBoolean(true); private final AtomicBoolean inReceive=new AtomicBoolean(false); - public InactivityMonitor(Transport next, long maxInactivityDuration) { - super(next); - this.maxInactivityDuration = maxInactivityDuration; - } - - public void start() throws Exception { - next.start(); - Scheduler.executePeriodically(this, maxInactivityDuration/2); - } + private final Runnable readChecker = new Runnable() { + public void run() { + readCheck(); + } + }; + private final Runnable writeChecker = new Runnable() { + public void run() { + writeCheck(); + } + }; + + + public InactivityMonitor(Transport next) { + super(next); + } + public void stop() throws Exception { - Scheduler.cancel(this); + stopMonitorThreads(); next.stop(); } - - synchronized public void run() { - switch(readCheckIteration) { - case 0: - writeCheck(); - readCheckIteration++; - break; - case 1: - readCheck(); - writeCheck(); - readCheckIteration=0; - break; - } - } - + + private void writeCheck() { if( inSend.get() ) { log.debug("A send is in progress"); @@ -82,7 +78,7 @@ public class InactivityMonitor extends TransportFilter implements Runnable { if( !commandSent.get() ) { log.debug("No message sent since last write check, sending a KeepAliveInfo"); try { - next.oneway(new KeepAliveInfo()); + next.oneway(new KeepAliveInfo()); } catch (IOException e) { onException(e); } @@ -113,18 +109,35 @@ public class InactivityMonitor extends TransportFilter implements Runnable { public void onCommand(Command command) { inReceive.set(true); try { + if( command.isWireFormatInfo() ) { + synchronized( this ) { + remoteWireFormatInfo = (WireFormatInfo) command; + try { + startMonitorThreads(); + } catch (IOException e) { + onException(e); + } + } + } getTransportListener().onCommand(command); } finally { inReceive.set(false); commandReceived.set(true); } } + public void oneway(Command command) throws IOException { // Disable inactivity monitoring while processing a command. inSend.set(true); commandSent.set(true); try { + if( command.isWireFormatInfo() ) { + synchronized( this ) { + localWireFormatInfo = (WireFormatInfo) command; + startMonitorThreads(); + } + } next.oneway(command); } finally { inSend.set(false); @@ -132,7 +145,37 @@ public class InactivityMonitor extends TransportFilter implements Runnable { } public void onException(IOException error) { - Scheduler.cancel(this); + stopMonitorThreads(); getTransportListener().onException(error); } + + + synchronized private void startMonitorThreads() throws IOException { + if( monitorStarted ) + return; + if( localWireFormatInfo == null ) + return; + if( remoteWireFormatInfo == null ) + return; + + long l = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration()); + if( l > 0 ) { + Scheduler.executePeriodically(writeChecker, l/2); + Scheduler.executePeriodically(readChecker, l); + monitorStarted=true; + } + } + + /** + * + */ + synchronized private void stopMonitorThreads() { + if( monitorStarted ) { + Scheduler.cancel(readChecker); + Scheduler.cancel(writeChecker); + monitorStarted=false; + } + } + + } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransportFactory.java index 7848f3a553..6234fe89c2 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransportFactory.java @@ -230,11 +230,13 @@ public class ActiveIOTransportFactory extends TransportFactory { if( activeIOTransport.isTrace() ) { transport = new TransportLogger(transport); } + + transport = new InactivityMonitor(transport); + if( format instanceof OpenWireFormat ) { transport = new WireFormatNegotiator(transport, (OpenWireFormat) format, activeIOTransport.getMinmumWireFormatVersion()); } - transport = new InactivityMonitor(transport, activeIOTransport.getMaxInactivityDuration()); transport = new MutexTransport(transport); transport = new ResponseCorrelator(transport); return transport; @@ -279,10 +281,12 @@ public class ActiveIOTransportFactory extends TransportFactory { if( activeIOTransport.isTrace() ) { transport = new TransportLogger(transport); } + + transport = new InactivityMonitor(transport); + if( format instanceof OpenWireFormat ) { transport = new WireFormatNegotiator(transport, (OpenWireFormat) format, activeIOTransport.getMinmumWireFormatVersion()); } - transport = new InactivityMonitor(transport, activeIOTransport.getMaxInactivityDuration()); return transport; } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java index f3639fde41..a42011b611 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java @@ -56,7 +56,6 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S private boolean trace; private boolean useLocalHost = true; private int minmumWireFormatVersion; - private long maxInactivityDuration = 0; //30000; private InetSocketAddress socketAddress; @@ -206,17 +205,6 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S this.soTimeout = soTimeout; } - public long getMaxInactivityDuration() { - return maxInactivityDuration; - } - - /** - * Sets the maximum inactivity duration - */ - public void setMaxInactivityDuration(long maxInactivityDuration) { - this.maxInactivityDuration = maxInactivityDuration; - } - public int getConnectionTimeout() { return connectionTimeout; } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java index 1435864d0f..b3e7e35c2d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java @@ -64,15 +64,13 @@ public class TcpTransportFactory extends TransportFactory { transport = new TransportLogger(transport); } + transport = new InactivityMonitor(transport); + // Only need the OpenWireFormat if using openwire if( format instanceof OpenWireFormat ) { transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion()); } - if( tcpTransport.getMaxInactivityDuration() > 0 ) { - transport = new InactivityMonitor(transport, tcpTransport.getMaxInactivityDuration()); - } - transport = new MutexTransport(transport); transport = new ResponseCorrelator(transport); return transport; @@ -85,14 +83,13 @@ public class TcpTransportFactory extends TransportFactory { transport = new TransportLogger(transport); } + transport = new InactivityMonitor(transport); + // Only need the OpenWireFormat if using openwire if( format instanceof OpenWireFormat ) { transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion()); } - if( tcpTransport.getMaxInactivityDuration() > 0 ) { - transport = new InactivityMonitor(transport, tcpTransport.getMaxInactivityDuration()); - } return transport; } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java index d41b13f2bd..21b4db130c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java @@ -48,7 +48,7 @@ public class TcpTransportServer extends TransportServerThreadSupport { private int backlog = 5000; private WireFormatFactory wireFormatFactory = new OpenWireFormatFactory(); private TcpTransportFactory transportFactory = new TcpTransportFactory(); - private long maxInactivityDuration = 0; //30000; + private long maxInactivityDuration = 30000; private int minmumWireFormatVersion; private boolean trace; diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java index d5337bec0d..f6cf53926d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java @@ -57,7 +57,6 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S private ReplayStrategy replayStrategy = new ExceptionIfDroppedReplayStrategy(); private ReplayBuffer replayBuffer; private int datagramSize = 4 * 1024; - private long maxInactivityDuration = 0; // 30000; private SocketAddress targetAddress; private SocketAddress originalTargetAddress; private DatagramChannel channel; @@ -223,10 +222,6 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S this.trace = trace; } - public long getMaxInactivityDuration() { - return maxInactivityDuration; - } - public int getDatagramSize() { return datagramSize; } @@ -235,13 +230,6 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S this.datagramSize = datagramSize; } - /** - * Sets the maximum inactivity duration - */ - public void setMaxInactivityDuration(long maxInactivityDuration) { - this.maxInactivityDuration = maxInactivityDuration; - } - public boolean isUseLocalHost() { return useLocalHost; } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java index c7bf221e51..46575e15c0 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java @@ -81,14 +81,12 @@ public class UdpTransportFactory extends TransportFactory { transport = new TransportLogger(transport); } + transport = new InactivityMonitor(transport); + if (format instanceof OpenWireFormat) { transport = configureClientSideNegotiator(transport, format, udpTransport); } - if (udpTransport.getMaxInactivityDuration() > 0) { - transport = new InactivityMonitor(transport, udpTransport.getMaxInactivityDuration()); - } - return transport; } @@ -115,14 +113,12 @@ public class UdpTransportFactory extends TransportFactory { transport = new TransportLogger(transport); } + transport = new InactivityMonitor(transport); + if (!acceptServer && format instanceof OpenWireFormat) { transport = configureClientSideNegotiator(transport, format, udpTransport); } - if (udpTransport.getMaxInactivityDuration() > 0) { - transport = new InactivityMonitor(transport, udpTransport.getMaxInactivityDuration()); - } - // deal with fragmentation if (acceptServer) { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java index 8ba6869741..d00c6bdcae 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java @@ -134,10 +134,7 @@ public class UdpTransportServer extends TransportServerSupport { } protected Transport configureTransport(Transport transport) { - if (serverTransport.getMaxInactivityDuration() > 0) { - transport = new InactivityMonitor(transport, serverTransport.getMaxInactivityDuration()); - } - + transport = new InactivityMonitor(transport); getAcceptListener().onAccept(transport); return transport; } diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java index 9b7c596ced..c82722cf8b 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java @@ -1,10 +1,28 @@ +/* + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.activemq.transport.tcp; import java.io.IOException; import java.net.URI; +import java.net.URISyntaxException; import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.command.Command; +import org.apache.activemq.command.WireFormatInfo; +import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportAcceptListener; import org.apache.activemq.transport.TransportFactory; @@ -32,16 +50,17 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra public Runnable serverRunOnCommand; public Runnable clientRunOnCommand; - public long clientInactivityLimit; - public long serverInactivityLimit; - - protected void setUp() throws Exception { super.setUp(); - server = TransportFactory.bind("localhost", new URI("tcp://localhost:61616?trace=true&maxInactivityDuration="+serverInactivityLimit)); - server.setAcceptListener(this); - server.start(); - clientTransport = TransportFactory.connect(new URI("tcp://localhost:61616?trace=true&maxInactivityDuration="+clientInactivityLimit)); + startTransportServer(); + } + + /** + * @throws Exception + * @throws URISyntaxException + */ + private void startClient() throws Exception, URISyntaxException { + clientTransport = TransportFactory.connect(new URI("tcp://localhost:61616?trace=true&wireFormat.maxInactivityDuration=1000")); clientTransport.setTransportListener(new TransportListener() { public void onCommand(Command command) { clientReceiveCount.incrementAndGet(); @@ -62,18 +81,37 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra }}); clientTransport.start(); } + + /** + * @throws IOException + * @throws URISyntaxException + * @throws Exception + */ + private void startTransportServer() throws IOException, URISyntaxException, Exception { + server = TransportFactory.bind("localhost", new URI("tcp://localhost:61616?trace=true&wireFormat.maxInactivityDuration=1000")); + server.setAcceptListener(this); + server.start(); + } protected void tearDown() throws Exception { ignoreClientError.set(true); ignoreServerError.set(true); - clientTransport.stop(); - serverTransport.stop(); - server.stop(); + try { + if( clientTransport!=null ) + clientTransport.stop(); + if( serverTransport!=null ) + serverTransport.stop(); + if( server!=null ) + server.stop(); + } catch (Throwable e) { + e.printStackTrace(); + } super.tearDown(); } public void onAccept(Transport transport) { try { + System.out.println("["+getName()+"] Server Accepted a Connection"); serverTransport = transport; serverTransport.setTransportListener(new TransportListener() { public void onCommand(Command command) { @@ -103,12 +141,35 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra error.printStackTrace(); } - public void initCombosForTestClientHang() { - addCombinationValues("clientInactivityLimit", new Object[] { new Long(1000*60)}); - addCombinationValues("serverInactivityLimit", new Object[] { new Long(1000)}); - } public void testClientHang() throws Exception { + // + // Manually create a client transport so that it does not send KeepAlive packets. + // this should simulate a client hang. + clientTransport = new TcpTransport(new OpenWireFormat(), new URI("tcp://localhost:61616")); + clientTransport.setTransportListener(new TransportListener() { + public void onCommand(Command command) { + clientReceiveCount.incrementAndGet(); + if( clientRunOnCommand !=null ) { + clientRunOnCommand.run(); + } + } + public void onException(IOException error) { + if( !ignoreClientError.get() ) { + System.out.println("Client transport error:"); + error.printStackTrace(); + clientErrorCount.incrementAndGet(); + } + } + public void transportInterupted() { + } + public void transportResumed() { + }}); + clientTransport.start(); + WireFormatInfo info = new WireFormatInfo(); + info.seMaxInactivityDuration(1000); + clientTransport.oneway(info); + assertEquals(0, serverErrorCount.get()); assertEquals(0, clientErrorCount.get()); @@ -119,42 +180,45 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra assertTrue(serverErrorCount.get()>0); } - public void initCombosForTestNoClientHang() { - addCombinationValues("clientInactivityLimit", new Object[] { new Long(1000)}); - addCombinationValues("serverInactivityLimit", new Object[] { new Long(1000)}); - } public void testNoClientHang() throws Exception { + startClient(); assertEquals(0, serverErrorCount.get()); assertEquals(0, clientErrorCount.get()); Thread.sleep(4000); - if( clientErrorCount.get() > 0 ) - assertEquals(0, clientErrorCount.get()); - if( serverErrorCount.get() > 0 ) - assertEquals(0, serverErrorCount.get()); + assertEquals(0, clientErrorCount.get()); + assertEquals(0, serverErrorCount.get()); } /** * Used to test when a operation blocks. This should * not cause transport to get disconnected. + * @throws Exception + * @throws URISyntaxException */ - public void initCombosForTestNoClientHangWithServerBlock() { + public void initCombosForTestNoClientHangWithServerBlock() throws Exception { + + startClient(); + addCombinationValues("clientInactivityLimit", new Object[] { new Long(1000)}); addCombinationValues("serverInactivityLimit", new Object[] { new Long(1000)}); addCombinationValues("serverRunOnCommand", new Object[] { new Runnable() { public void run() { try { System.out.println("Sleeping"); - Thread.sleep(2000); + Thread.sleep(4000); } catch (InterruptedException e) { } } }}); } + public void testNoClientHangWithServerBlock() throws Exception { + startClient(); + assertEquals(0, serverErrorCount.get()); assertEquals(0, clientErrorCount.get());