NIO based transports weren't updating the receive counter in the TcpTransport which can lead to the inactivity monitor mistakenly shutting down the connection.  

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1401394 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-10-23 19:13:19 +00:00
parent 79127ae440
commit 78265ea211
6 changed files with 82 additions and 14 deletions

View File

@ -16,16 +16,6 @@
*/ */
package org.apache.activemq.transport.amqp; package org.apache.activemq.transport.amqp;
import org.apache.activemq.transport.nio.NIOOutputStream;
import org.apache.activemq.transport.nio.SelectorManager;
import org.apache.activemq.transport.nio.SelectorSelection;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.hawtbuf.DataByteArrayInputStream;
import javax.net.SocketFactory;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
@ -36,6 +26,16 @@ import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import javax.net.SocketFactory;
import org.apache.activemq.transport.nio.NIOOutputStream;
import org.apache.activemq.transport.nio.SelectorManager;
import org.apache.activemq.transport.nio.SelectorSelection;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
/** /**
* An implementation of the {@link org.apache.activemq.transport.Transport} interface for using AMQP over NIO * An implementation of the {@link org.apache.activemq.transport.Transport} interface for using AMQP over NIO
*/ */
@ -97,11 +97,12 @@ public class AmqpNioTransport extends TcpTransport {
break; break;
} }
receiveCounter += readSize;
inputBuffer.flip(); inputBuffer.flip();
doConsume(AmqpSupport.toBuffer(inputBuffer)); doConsume(AmqpSupport.toBuffer(inputBuffer));
// clear the buffer // clear the buffer
inputBuffer.clear(); inputBuffer.clear();
} }
} catch (IOException e) { } catch (IOException e) {
onException(e); onException(e);

View File

@ -178,8 +178,9 @@ public class NIOSSLTransport extends NIOTransport {
} }
int readCount = secureRead(plain); int readCount = secureRead(plain);
if (readCount == 0) if (readCount == 0) {
break; break;
}
// channel is closed, cleanup // channel is closed, cleanup
if (readCount == -1) { if (readCount == -1) {
@ -187,6 +188,8 @@ public class NIOSSLTransport extends NIOTransport {
selection.close(); selection.close();
break; break;
} }
receiveCounter += readCount;
} }
if (status == SSLEngineResult.Status.OK && handshakeStatus != SSLEngineResult.HandshakeStatus.NEED_UNWRAP) { if (status == SSLEngineResult.Status.OK && handshakeStatus != SSLEngineResult.HandshakeStatus.NEED_UNWRAP) {

View File

@ -39,6 +39,7 @@ public class StompCodec {
int contentLength = -1; int contentLength = -1;
int readLength = 0; int readLength = 0;
int previousByte = -1; int previousByte = -1;
boolean awaitingCommandStart = true;
String version = Stomp.DEFAULT_VERSION; String version = Stomp.DEFAULT_VERSION;
public StompCodec(TcpTransport transport) { public StompCodec(TcpTransport transport) {
@ -56,6 +57,14 @@ public class StompCodec {
} }
if (!processedHeaders) { if (!processedHeaders) {
// skip heart beat commands.
if (awaitingCommandStart && b == '\n') {
continue;
} else {
awaitingCommandStart = false; // non-newline indicates next frame.
}
currentCommand.write(b); currentCommand.write(b);
// end of headers section, parse action and header // end of headers section, parse action and header
if (b == '\n' && (previousByte == '\n' || currentCommand.endsWith(crlfcrlf))) { if (b == '\n' && (previousByte == '\n' || currentCommand.endsWith(crlfcrlf))) {
@ -74,6 +83,7 @@ public class StompCodec {
processedHeaders = true; processedHeaders = true;
currentCommand.reset(); currentCommand.reset();
} }
} else { } else {
if (contentLength == -1) { if (contentLength == -1) {
@ -102,6 +112,7 @@ public class StompCodec {
StompFrame frame = new StompFrame(action, headers, currentCommand.toByteArray()); StompFrame frame = new StompFrame(action, headers, currentCommand.toByteArray());
transport.doConsume(frame); transport.doConsume(frame);
processedHeaders = false; processedHeaders = false;
awaitingCommandStart = true;
currentCommand.reset(); currentCommand.reset();
contentLength = -1; contentLength = -1;
} }

View File

@ -97,11 +97,14 @@ public class StompNIOTransport extends TcpTransport {
selection.close(); selection.close();
break; break;
} }
// nothing more to read, break // nothing more to read, break
if (readSize == 0) { if (readSize == 0) {
break; break;
} }
receiveCounter += readSize;
inputBuffer.flip(); inputBuffer.flip();
ByteArrayInputStream input = new ByteArrayInputStream(inputBuffer.array()); ByteArrayInputStream input = new ByteArrayInputStream(inputBuffer.array());
@ -109,7 +112,6 @@ public class StompNIOTransport extends TcpTransport {
// clear the buffer // clear the buffer
inputBuffer.clear(); inputBuffer.clear();
} }
} catch (IOException e) { } catch (IOException e) {
onException(e); onException(e);

View File

@ -128,13 +128,13 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
protected int minmumWireFormatVersion; protected int minmumWireFormatVersion;
protected SocketFactory socketFactory; protected SocketFactory socketFactory;
protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<CountDownLatch>(); protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<CountDownLatch>();
protected volatile int receiveCounter;
private Map<String, Object> socketOptions; private Map<String, Object> socketOptions;
private int soLinger = Integer.MIN_VALUE; private int soLinger = Integer.MIN_VALUE;
private Boolean keepAlive; private Boolean keepAlive;
private Boolean tcpNoDelay; private Boolean tcpNoDelay;
private Thread runnerThread; private Thread runnerThread;
private volatile int receiveCounter;
/** /**
* Connect to a remote Node - e.g. a Broker * Connect to a remote Node - e.g. a Broker

View File

@ -23,6 +23,8 @@ import java.net.SocketTimeoutException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.jms.Connection; import javax.jms.Connection;
@ -290,6 +292,55 @@ public class Stomp11Test extends CombinationTestSupport {
assertTrue("Broker did close idle connection in time.", (endTime - startTime) >= 1000); assertTrue("Broker did close idle connection in time.", (endTime - startTime) >= 1000);
} }
public void testHeartbeatsKeepsConnectionOpen() throws Exception {
String connectFrame = "STOMP\n" +
"login:system\n" +
"passcode:manager\n" +
"accept-version:1.1\n" +
"heart-beat:2000,0\n" +
"host:localhost\n" +
"\n" + Stomp.NULL;
stompConnection.sendFrame(connectFrame);
String f = stompConnection.receiveFrame();
assertTrue(f.startsWith("CONNECTED"));
assertTrue(f.indexOf("version:1.1") >= 0);
assertTrue(f.indexOf("heart-beat:") >= 0);
assertTrue(f.indexOf("session:") >= 0);
LOG.debug("Broker sent: " + f);
String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
stompConnection.sendFrame(message);
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
service.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
LOG.info("Sending next KeepAlive");
stompConnection.keepAlive();
} catch (Exception e) {
}
}
}, 1, 1, TimeUnit.SECONDS);
TimeUnit.SECONDS.sleep(20);
String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
"id:12345\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
StompFrame stompFrame = stompConnection.receive();
assertTrue(stompFrame.getAction().equals("MESSAGE"));
service.shutdownNow();
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
public void testSendAfterMissingHeartbeat() throws Exception { public void testSendAfterMissingHeartbeat() throws Exception {
String connectFrame = "STOMP\n" + "login:system\n" + String connectFrame = "STOMP\n" + "login:system\n" +