Fix for AMQ-2511: Inactivity monitor does not time out stale connections.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@884267 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2009-11-25 20:56:28 +00:00
parent 48764becbd
commit 8978ac0772
21 changed files with 127 additions and 35 deletions

View File

@ -22,7 +22,6 @@ import java.io.IOException;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.command.CommandTypes; import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.DataStructure; import org.apache.activemq.command.DataStructure;
@ -63,8 +62,6 @@ public final class OpenWireFormat implements WireFormat {
private DataByteArrayInputStream bytesIn = new DataByteArrayInputStream(); private DataByteArrayInputStream bytesIn = new DataByteArrayInputStream();
private WireFormatInfo preferedWireFormatInfo; private WireFormatInfo preferedWireFormatInfo;
private AtomicBoolean receivingMessage = new AtomicBoolean(false);
public OpenWireFormat() { public OpenWireFormat() {
this(DEFAULT_VERSION); this(DEFAULT_VERSION);
} }
@ -353,7 +350,6 @@ public final class OpenWireFormat implements WireFormat {
public Object doUnmarshal(DataInput dis) throws IOException { public Object doUnmarshal(DataInput dis) throws IOException {
byte dataType = dis.readByte(); byte dataType = dis.readByte();
receivingMessage.set(true);
if (dataType != NULL_TYPE) { if (dataType != NULL_TYPE) {
DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF]; DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
if (dsm == null) { if (dsm == null) {
@ -367,10 +363,8 @@ public final class OpenWireFormat implements WireFormat {
} else { } else {
dsm.looseUnmarshal(this, data, dis); dsm.looseUnmarshal(this, data, dis);
} }
receivingMessage.set(false);
return data; return data;
} else { } else {
receivingMessage.set(false);
return null; return null;
} }
} }
@ -595,10 +589,6 @@ public final class OpenWireFormat implements WireFormat {
public WireFormatInfo getPreferedWireFormatInfo() { public WireFormatInfo getPreferedWireFormatInfo() {
return preferedWireFormatInfo; return preferedWireFormatInfo;
} }
public boolean inReceive() {
return receivingMessage.get();
}
public void renegotiateWireFormat(WireFormatInfo info) throws IOException { public void renegotiateWireFormat(WireFormatInfo info) throws IOException {

View File

@ -23,6 +23,7 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.command.KeepAliveInfo; import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.command.WireFormatInfo;
@ -56,6 +57,8 @@ public class InactivityMonitor extends TransportFilter {
private final AtomicBoolean commandReceived = new AtomicBoolean(true); private final AtomicBoolean commandReceived = new AtomicBoolean(true);
private final AtomicBoolean inReceive = new AtomicBoolean(false); private final AtomicBoolean inReceive = new AtomicBoolean(false);
private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
private SchedulerTimerTask writeCheckerTask; private SchedulerTimerTask writeCheckerTask;
private SchedulerTimerTask readCheckerTask; private SchedulerTimerTask readCheckerTask;
@ -153,7 +156,9 @@ public class InactivityMonitor extends TransportFilter {
} }
final void readCheck() { final void readCheck() {
if (inReceive.get() || wireFormat.inReceive()) { int currentCounter = next.getReceiveCounter();
int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
if (inReceive.get() || currentCounter!=previousCounter ) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("A receive is in progress"); LOG.trace("A receive is in progress");
} }

View File

@ -154,4 +154,12 @@ public interface Transport extends Service {
*/ */
void reconnect(URI uri) throws IOException; void reconnect(URI uri) throws IOException;
/**
* Returns a counter which gets incremented as data is read from the transport.
* It should only be used to determine if there is progress being made in reading the next command from the transport.
* The value may wrap into the negative numbers.
*
* @return a counter which gets incremented as data is read from the transport.
*/
int getReceiveCounter();
} }

View File

@ -137,4 +137,8 @@ public class TransportFilter implements TransportListener, Transport {
public void reconnect(URI uri) throws IOException { public void reconnect(URI uri) throws IOException {
next.reconnect(uri); next.reconnect(uri);
} }
public int getReceiveCounter() {
return next.getReceiveCounter();
}
} }

View File

@ -868,4 +868,12 @@ public class FailoverTransport implements CompositeTransport {
public void reconnect(URI uri) throws IOException { public void reconnect(URI uri) throws IOException {
add(new URI[] {uri}); add(new URI[] {uri});
} }
public int getReceiveCounter() {
Transport transport = connectedTransport.get();
if( transport == null ) {
return 0;
}
return transport.getReceiveCounter();
}
} }

View File

@ -586,4 +586,16 @@ public class FanoutTransport implements CompositeTransport {
public boolean isConnected() { public boolean isConnected() {
return connected; return connected;
} }
public int getReceiveCounter() {
int rc = 0;
synchronized (reconnectMutex) {
for (FanoutTransportHandler th : transports) {
if (th.transport != null) {
rc += th.transport.getReceiveCounter();
}
}
}
return rc;
}
} }

View File

@ -151,4 +151,8 @@ public class MockTransport extends DefaultTransportListener implements Transport
public void reconnect(URI uri) throws IOException { public void reconnect(URI uri) throws IOException {
getNext().reconnect(uri); getNext().reconnect(uri);
} }
public int getReceiveCounter() {
return getNext().getReceiveCounter();
}
} }

View File

@ -202,11 +202,4 @@ public class StompWireFormat implements WireFormat {
this.version = version; this.version = version;
} }
public boolean inReceive() {
//TODO implement the inactivity monitor
return false;
}
} }

View File

@ -43,7 +43,7 @@ public class TcpBufferedInputStream extends FilterInputStream {
internalBuffer = new byte[size]; internalBuffer = new byte[size];
} }
private void fill() throws IOException { protected void fill() throws IOException {
byte[] buffer = internalBuffer; byte[] buffer = internalBuffer;
count = 0; count = 0;
position = 0; position = 0;

View File

@ -118,6 +118,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
private Boolean keepAlive; private Boolean keepAlive;
private Boolean tcpNoDelay; private Boolean tcpNoDelay;
private Thread runnerThread; private Thread runnerThread;
private volatile int receiveCounter;
/** /**
* Connect to a remote Node - e.g. a Broker * Connect to a remote Node - e.g. a Broker
@ -504,7 +505,28 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
} }
protected void initializeStreams() throws Exception { protected void initializeStreams() throws Exception {
TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize); TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize) {
@Override
public int read() throws IOException {
receiveCounter++;
return super.read();
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
receiveCounter++;
return super.read(b, off, len);
}
@Override
public long skip(long n) throws IOException {
receiveCounter++;
return super.skip(n);
}
@Override
protected void fill() throws IOException {
receiveCounter++;
super.fill();
}
};
this.dataIn = new DataInputStream(buffIn); this.dataIn = new DataInputStream(buffIn);
buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize); buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
this.dataOut = new DataOutputStream(buffOut); this.dataOut = new DataOutputStream(buffOut);
@ -551,4 +573,9 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
} }
}); });
} }
public int getReceiveCounter() {
return receiveCounter;
}
} }

View File

@ -50,4 +50,7 @@ public interface CommandChannel extends Replayer, Service {
void setReplayAddress(SocketAddress address); void setReplayAddress(SocketAddress address);
void setReplayBuffer(ReplayBuffer replayBuffer); void setReplayBuffer(ReplayBuffer replayBuffer);
public int getReceiveCounter();
} }

View File

@ -54,6 +54,7 @@ public class CommandDatagramChannel extends CommandChannelSupport {
// writing // writing
private Object writeLock = new Object(); private Object writeLock = new Object();
private int defaultMarshalBufferSize = 64 * 1024; private int defaultMarshalBufferSize = 64 * 1024;
private volatile int receiveCounter;
public CommandDatagramChannel(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize, SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller, public CommandDatagramChannel(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize, SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller,
DatagramChannel channel, ByteBufferPool bufferPool) { DatagramChannel channel, ByteBufferPool bufferPool) {
@ -85,6 +86,8 @@ public class CommandDatagramChannel extends CommandChannelSupport {
if (readBuffer.limit() == 0) { if (readBuffer.limit() == 0) {
continue; continue;
} }
receiveCounter++;
from = headerMarshaller.createEndpoint(readBuffer, address); from = headerMarshaller.createEndpoint(readBuffer, address);
int remaining = readBuffer.remaining(); int remaining = readBuffer.remaining();
@ -252,4 +255,8 @@ public class CommandDatagramChannel extends CommandChannelSupport {
} }
} }
public int getReceiveCounter() {
return receiveCounter;
}
} }

View File

@ -48,6 +48,8 @@ public class CommandDatagramSocket extends CommandChannelSupport {
private Object readLock = new Object(); private Object readLock = new Object();
private Object writeLock = new Object(); private Object writeLock = new Object();
private volatile int receiveCounter;
public CommandDatagramSocket(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize, SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller, public CommandDatagramSocket(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize, SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller,
DatagramSocket channel) { DatagramSocket channel) {
super(transport, wireFormat, datagramSize, targetAddress, headerMarshaller); super(transport, wireFormat, datagramSize, targetAddress, headerMarshaller);
@ -70,8 +72,9 @@ public class CommandDatagramSocket extends CommandChannelSupport {
// TODO could use a DataInput implementation that talks direct // TODO could use a DataInput implementation that talks direct
// to the byte[] to avoid object allocation // to the byte[] to avoid object allocation
DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(datagram.getData())); receiveCounter++;
DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(datagram.getData(), 0, datagram.getLength()));
from = headerMarshaller.createEndpoint(datagram, dataIn); from = headerMarshaller.createEndpoint(datagram, dataIn);
answer = (Command)wireFormat.unmarshal(dataIn); answer = (Command)wireFormat.unmarshal(dataIn);
break; break;
@ -232,4 +235,8 @@ public class CommandDatagramSocket extends CommandChannelSupport {
protected ByteArrayOutputStream createByteArrayOutputStream() { protected ByteArrayOutputStream createByteArrayOutputStream() {
return new ByteArrayOutputStream(datagramSize); return new ByteArrayOutputStream(datagramSize);
} }
public int getReceiveCounter() {
return receiveCounter;
}
} }

View File

@ -462,4 +462,11 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
} }
return null; return null;
} }
public int getReceiveCounter() {
if (commandChannel == null) {
return 0;
}
return commandChannel.getReceiveCounter();
}
} }

View File

@ -60,6 +60,7 @@ public class VMTransport implements Transport, Task {
private final Object lazyInitMutext = new Object(); private final Object lazyInitMutext = new Object();
private final Valve enqueueValve = new Valve(true); private final Valve enqueueValve = new Valve(true);
private final AtomicBoolean stopping = new AtomicBoolean(); private final AtomicBoolean stopping = new AtomicBoolean();
private volatile int receiveCounter;
public VMTransport(URI location) { public VMTransport(URI location) {
this.location = location; this.location = location;
@ -110,6 +111,7 @@ public class VMTransport implements Transport, Task {
if( command == DISCONNECT ) { if( command == DISCONNECT ) {
transportListener.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.")); transportListener.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
} else { } else {
peer.receiveCounter++;
transportListener.onCommand(command); transportListener.onCommand(command);
} }
} }
@ -124,6 +126,7 @@ public class VMTransport implements Transport, Task {
if (messageQueue != null && !async) { if (messageQueue != null && !async) {
Object command; Object command;
while ((command = messageQueue.poll()) != null && !stopping.get() ) { while ((command = messageQueue.poll()) != null && !stopping.get() ) {
receiveCounter++;
transportListener.onCommand(command); transportListener.onCommand(command);
} }
} }
@ -341,4 +344,8 @@ public class VMTransport implements Transport, Task {
public void reconnect(URI uri) throws IOException { public void reconnect(URI uri) throws IOException {
throw new IOException("Not supported"); throw new IOException("Not supported");
} }
public int getReceiveCounter() {
return receiveCounter;
}
} }

View File

@ -75,11 +75,4 @@ public class ObjectStreamWireFormat implements WireFormat {
return 0; return 0;
} }
public boolean inReceive() {
// TODO implement the inactivity monitor
return false;
}
} }

View File

@ -61,9 +61,4 @@ public interface WireFormat {
*/ */
int getVersion(); int getVersion();
/**
* @return true if message is being received
*/
boolean inReceive();
} }

View File

@ -29,6 +29,7 @@ import org.apache.activemq.util.ServiceStopper;
public class StubTransport extends TransportSupport { public class StubTransport extends TransportSupport {
private Queue<Object> queue = new ConcurrentLinkedQueue<Object>(); private Queue<Object> queue = new ConcurrentLinkedQueue<Object>();
private volatile int receiveCounter;
protected void doStop(ServiceStopper stopper) throws Exception { protected void doStop(ServiceStopper stopper) throws Exception {
} }
@ -37,6 +38,7 @@ public class StubTransport extends TransportSupport {
} }
public void oneway(Object command) throws IOException { public void oneway(Object command) throws IOException {
receiveCounter++;
queue.add(command); queue.add(command);
} }
@ -48,4 +50,8 @@ public class StubTransport extends TransportSupport {
return null; return null;
} }
public int getReceiveCounter() {
return receiveCounter;
}
} }

View File

@ -64,5 +64,9 @@ public class BlockingQueueTransport extends TransportSupport {
} }
protected void doStop(ServiceStopper stopper) throws Exception { protected void doStop(ServiceStopper stopper) throws Exception {
}
public int getReceiveCounter() {
return 0;
} }
} }

View File

@ -58,6 +58,7 @@ public class HttpClientTransport extends HttpTransportSupport {
private final String clientID = CLIENT_ID_GENERATOR.generateId(); private final String clientID = CLIENT_ID_GENERATOR.generateId();
private boolean trace; private boolean trace;
private GetMethod httpMethod; private GetMethod httpMethod;
private volatile int receiveCounter;
public HttpClientTransport(TextWireFormat wireFormat, URI remoteUrl) { public HttpClientTransport(TextWireFormat wireFormat, URI remoteUrl) {
super(wireFormat, remoteUrl); super(wireFormat, remoteUrl);
@ -135,6 +136,7 @@ public class HttpClientTransport extends HttpTransportSupport {
break; break;
} }
} else { } else {
receiveCounter++;
DataInputStream stream = new DataInputStream(httpMethod.getResponseBodyAsStream()); DataInputStream stream = new DataInputStream(httpMethod.getResponseBodyAsStream());
Object command = (Object)getTextWireFormat().unmarshal(stream); Object command = (Object)getTextWireFormat().unmarshal(stream);
if (command == null) { if (command == null) {
@ -221,4 +223,8 @@ public class HttpClientTransport extends HttpTransportSupport {
this.trace = trace; this.trace = trace;
} }
public int getReceiveCounter() {
return receiveCounter;
}
} }

View File

@ -47,7 +47,8 @@ public class HttpTransport extends HttpTransportSupport {
private HttpURLConnection receiveConnection; private HttpURLConnection receiveConnection;
private URL url; private URL url;
private String clientID; private String clientID;
private volatile int receiveCounter;
// private String sessionID; // private String sessionID;
public HttpTransport(TextWireFormat wireFormat, URI remoteUrl) throws MalformedURLException { public HttpTransport(TextWireFormat wireFormat, URI remoteUrl) throws MalformedURLException {
@ -102,6 +103,7 @@ public class HttpTransport extends HttpTransportSupport {
// checkSession(connection); // checkSession(connection);
// Create a String for the UTF content // Create a String for the UTF content
receiveCounter++;
InputStream is = connection.getInputStream(); InputStream is = connection.getInputStream();
ByteArrayOutputStream baos = new ByteArrayOutputStream(connection.getContentLength() > 0 ? connection.getContentLength() : 1024); ByteArrayOutputStream baos = new ByteArrayOutputStream(connection.getContentLength() > 0 ? connection.getContentLength() : 1024);
int c = 0; int c = 0;
@ -228,4 +230,8 @@ public class HttpTransport extends HttpTransportSupport {
} }
} }
public int getReceiveCounter() {
return receiveCounter;
}
} }