git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@742458 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2009-02-09 12:49:42 +00:00
parent 4dda0d8353
commit 15065b71c7
9 changed files with 41 additions and 8 deletions

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.command.CommandTypes; import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.DataStructure; import org.apache.activemq.command.DataStructure;
@ -61,6 +62,8 @@ public final class OpenWireFormat implements WireFormat {
private DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream(); private DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream();
private DataByteArrayInputStream bytesIn = new DataByteArrayInputStream(); private DataByteArrayInputStream bytesIn = new DataByteArrayInputStream();
private WireFormatInfo preferedWireFormatInfo; private WireFormatInfo preferedWireFormatInfo;
private AtomicBoolean receivingMessage = new AtomicBoolean(false);
public OpenWireFormat() { public OpenWireFormat() {
this(DEFAULT_VERSION); this(DEFAULT_VERSION);
@ -350,6 +353,7 @@ public final class OpenWireFormat implements WireFormat {
public Object doUnmarshal(DataInput dis) throws IOException { public Object doUnmarshal(DataInput dis) throws IOException {
byte dataType = dis.readByte(); byte dataType = dis.readByte();
receivingMessage.set(true);
if (dataType != NULL_TYPE) { if (dataType != NULL_TYPE) {
DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF]; DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
if (dsm == null) { if (dsm == null) {
@ -363,8 +367,10 @@ public final class OpenWireFormat implements WireFormat {
} else { } else {
dsm.looseUnmarshal(this, data, dis); dsm.looseUnmarshal(this, data, dis);
} }
receivingMessage.set(false);
return data; return data;
} else { } else {
receivingMessage.set(false);
return null; return null;
} }
} }
@ -589,6 +595,10 @@ public final class OpenWireFormat implements WireFormat {
public WireFormatInfo getPreferedWireFormatInfo() { public WireFormatInfo getPreferedWireFormatInfo() {
return preferedWireFormatInfo; return preferedWireFormatInfo;
} }
public boolean inReceive() {
return receivingMessage.get();
}
public void renegotiateWireFormat(WireFormatInfo info) throws IOException { public void renegotiateWireFormat(WireFormatInfo info) throws IOException {

View File

@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.command.KeepAliveInfo; import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.thread.SchedulerTimerTask; import org.apache.activemq.thread.SchedulerTimerTask;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -62,6 +63,8 @@ public class InactivityMonitor extends TransportFilter {
private long writeCheckTime; private long writeCheckTime;
private long initialDelayTime; private long initialDelayTime;
private WireFormat wireFormat;
private final Runnable readChecker = new Runnable() { private final Runnable readChecker = new Runnable() {
long lastRunTime; long lastRunTime;
public void run() { public void run() {
@ -104,8 +107,9 @@ public class InactivityMonitor extends TransportFilter {
} }
}; };
public InactivityMonitor(Transport next) { public InactivityMonitor(Transport next, WireFormat wireFormat) {
super(next); super(next);
this.wireFormat = wireFormat;
} }
public void stop() throws Exception { public void stop() throws Exception {
@ -114,7 +118,7 @@ public class InactivityMonitor extends TransportFilter {
} }
final void writeCheck() { final void writeCheck() {
if (inSend.get()) { if (inSend.get()) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("A send is in progress"); LOG.trace("A send is in progress");
} }
@ -149,7 +153,7 @@ public class InactivityMonitor extends TransportFilter {
} }
final void readCheck() { final void readCheck() {
if (inReceive.get()) { if (inReceive.get() || wireFormat.inReceive()) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("A receive is in progress"); LOG.trace("A receive is in progress");
} }

View File

@ -202,4 +202,11 @@ public class StompWireFormat implements WireFormat {
this.version = version; this.version = version;
} }
public boolean inReceive() {
//TODO implement the inactivity monitor
return false;
}
} }

View File

@ -108,7 +108,7 @@ public class SslTransportFactory extends TcpTransportFactory {
} }
} }
transport = new InactivityMonitor(transport); transport = new InactivityMonitor(transport, format);
// Only need the WireFormatNegotiator if using openwire // Only need the WireFormatNegotiator if using openwire
if (format instanceof OpenWireFormat) { if (format instanceof OpenWireFormat) {

View File

@ -97,7 +97,7 @@ public class TcpTransportFactory extends TransportFactory {
} }
if (isUseInactivityMonitor(transport)) { if (isUseInactivityMonitor(transport)) {
transport = new InactivityMonitor(transport); transport = new InactivityMonitor(transport, format);
} }
// Only need the WireFormatNegotiator if using openwire // Only need the WireFormatNegotiator if using openwire

View File

@ -91,7 +91,7 @@ public class UdpTransportFactory extends TransportFactory {
} }
} }
transport = new InactivityMonitor(transport); transport = new InactivityMonitor(transport, format);
if (format instanceof OpenWireFormat) { if (format instanceof OpenWireFormat) {
transport = configureClientSideNegotiator(transport, format, udpTransport); transport = configureClientSideNegotiator(transport, format, udpTransport);
@ -123,7 +123,7 @@ public class UdpTransportFactory extends TransportFactory {
transport = TransportLoggerFactory.getInstance().createTransportLogger(transport); transport = TransportLoggerFactory.getInstance().createTransportLogger(transport);
} }
transport = new InactivityMonitor(transport); transport = new InactivityMonitor(transport, format);
if (!acceptServer && format instanceof OpenWireFormat) { if (!acceptServer && format instanceof OpenWireFormat) {
transport = configureClientSideNegotiator(transport, format, udpTransport); transport = configureClientSideNegotiator(transport, format, udpTransport);

View File

@ -132,7 +132,7 @@ public class UdpTransportServer extends TransportServerSupport {
} }
protected Transport configureTransport(Transport transport) { protected Transport configureTransport(Transport transport) {
transport = new InactivityMonitor(transport); transport = new InactivityMonitor(transport, serverTransport.getWireFormat());
getAcceptListener().onAccept(transport); getAcceptListener().onAccept(transport);
return transport; return transport;
} }

View File

@ -75,4 +75,11 @@ public class ObjectStreamWireFormat implements WireFormat {
return 0; return 0;
} }
public boolean inReceive() {
// TODO implement the inactivity monitor
return false;
}
} }

View File

@ -61,4 +61,9 @@ public interface WireFormat {
*/ */
int getVersion(); int getVersion();
/**
* @return true if message is being received
*/
boolean inReceive();
} }