From 15065b71c7decf00c0ce9f6ceb8fa75cdc18e5c1 Mon Sep 17 00:00:00 2001 From: Bosanac Dejan Date: Mon, 9 Feb 2009 12:49:42 +0000 Subject: [PATCH] fix for https://issues.apache.org/activemq/browse/AMQ-2088 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@742458 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/activemq/openwire/OpenWireFormat.java | 10 ++++++++++ .../apache/activemq/transport/InactivityMonitor.java | 10 +++++++--- .../activemq/transport/stomp/StompWireFormat.java | 7 +++++++ .../activemq/transport/tcp/SslTransportFactory.java | 2 +- .../activemq/transport/tcp/TcpTransportFactory.java | 2 +- .../activemq/transport/udp/UdpTransportFactory.java | 4 ++-- .../activemq/transport/udp/UdpTransportServer.java | 2 +- .../activemq/wireformat/ObjectStreamWireFormat.java | 7 +++++++ .../org/apache/activemq/wireformat/WireFormat.java | 5 +++++ 9 files changed, 41 insertions(+), 8 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java b/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java index c9030a0aa5..04b6633a5b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java +++ b/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.lang.reflect.Method; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.command.CommandTypes; import org.apache.activemq.command.DataStructure; @@ -61,6 +62,8 @@ public final class OpenWireFormat implements WireFormat { private DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream(); private DataByteArrayInputStream bytesIn = new DataByteArrayInputStream(); private WireFormatInfo preferedWireFormatInfo; + + private AtomicBoolean receivingMessage = new AtomicBoolean(false); public OpenWireFormat() { this(DEFAULT_VERSION); @@ -350,6 +353,7 @@ public final class OpenWireFormat implements WireFormat { public Object doUnmarshal(DataInput dis) throws IOException { byte dataType = dis.readByte(); + receivingMessage.set(true); if (dataType != NULL_TYPE) { DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF]; if (dsm == null) { @@ -363,8 +367,10 @@ public final class OpenWireFormat implements WireFormat { } else { dsm.looseUnmarshal(this, data, dis); } + receivingMessage.set(false); return data; } else { + receivingMessage.set(false); return null; } } @@ -589,6 +595,10 @@ public final class OpenWireFormat implements WireFormat { public WireFormatInfo getPreferedWireFormatInfo() { return preferedWireFormatInfo; } + + public boolean inReceive() { + return receivingMessage.get(); + } public void renegotiateWireFormat(WireFormatInfo info) throws IOException { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java index 27c2085353..7f14aba23e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.command.KeepAliveInfo; import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.thread.SchedulerTimerTask; +import org.apache.activemq.wireformat.WireFormat; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -62,6 +63,8 @@ public class InactivityMonitor extends TransportFilter { private long writeCheckTime; private long initialDelayTime; + private WireFormat wireFormat; + private final Runnable readChecker = new Runnable() { long lastRunTime; public void run() { @@ -104,8 +107,9 @@ public class InactivityMonitor extends TransportFilter { } }; - public InactivityMonitor(Transport next) { + public InactivityMonitor(Transport next, WireFormat wireFormat) { super(next); + this.wireFormat = wireFormat; } public void stop() throws Exception { @@ -114,7 +118,7 @@ public class InactivityMonitor extends TransportFilter { } final void writeCheck() { - if (inSend.get()) { + if (inSend.get()) { if (LOG.isTraceEnabled()) { LOG.trace("A send is in progress"); } @@ -149,7 +153,7 @@ public class InactivityMonitor extends TransportFilter { } final void readCheck() { - if (inReceive.get()) { + if (inReceive.get() || wireFormat.inReceive()) { if (LOG.isTraceEnabled()) { LOG.trace("A receive is in progress"); } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java index b58a3b132c..165e977cf7 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java @@ -202,4 +202,11 @@ public class StompWireFormat implements WireFormat { this.version = version; } + public boolean inReceive() { + //TODO implement the inactivity monitor + return false; + } + + + } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java index 32e1122564..d37708fd3d 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java @@ -108,7 +108,7 @@ public class SslTransportFactory extends TcpTransportFactory { } } - transport = new InactivityMonitor(transport); + transport = new InactivityMonitor(transport, format); // Only need the WireFormatNegotiator if using openwire if (format instanceof OpenWireFormat) { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java index 0f82b73ba9..9f38ce1d32 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java @@ -97,7 +97,7 @@ public class TcpTransportFactory extends TransportFactory { } if (isUseInactivityMonitor(transport)) { - transport = new InactivityMonitor(transport); + transport = new InactivityMonitor(transport, format); } // Only need the WireFormatNegotiator if using openwire diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java index de51ddfb63..23518cce97 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java @@ -91,7 +91,7 @@ public class UdpTransportFactory extends TransportFactory { } } - transport = new InactivityMonitor(transport); + transport = new InactivityMonitor(transport, format); if (format instanceof OpenWireFormat) { transport = configureClientSideNegotiator(transport, format, udpTransport); @@ -123,7 +123,7 @@ public class UdpTransportFactory extends TransportFactory { transport = TransportLoggerFactory.getInstance().createTransportLogger(transport); } - transport = new InactivityMonitor(transport); + transport = new InactivityMonitor(transport, format); if (!acceptServer && format instanceof OpenWireFormat) { transport = configureClientSideNegotiator(transport, format, udpTransport); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java index 8bd5291584..42662abd23 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java @@ -132,7 +132,7 @@ public class UdpTransportServer extends TransportServerSupport { } protected Transport configureTransport(Transport transport) { - transport = new InactivityMonitor(transport); + transport = new InactivityMonitor(transport, serverTransport.getWireFormat()); getAcceptListener().onAccept(transport); return transport; } diff --git a/activemq-core/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java b/activemq-core/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java index 5628ac4168..7879555fbe 100644 --- a/activemq-core/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java +++ b/activemq-core/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java @@ -75,4 +75,11 @@ public class ObjectStreamWireFormat implements WireFormat { return 0; } + public boolean inReceive() { + // TODO implement the inactivity monitor + return false; + } + + + } diff --git a/activemq-core/src/main/java/org/apache/activemq/wireformat/WireFormat.java b/activemq-core/src/main/java/org/apache/activemq/wireformat/WireFormat.java index 2375fcd66b..b5cefa88f8 100644 --- a/activemq-core/src/main/java/org/apache/activemq/wireformat/WireFormat.java +++ b/activemq-core/src/main/java/org/apache/activemq/wireformat/WireFormat.java @@ -61,4 +61,9 @@ public interface WireFormat { */ int getVersion(); + /** + * @return true if message is being received + */ + boolean inReceive(); + }