From 78265ea2110c689047845878d006d45cf41e268b Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Tue, 23 Oct 2012 19:13:19 +0000 Subject: [PATCH] fix and test for: https://issues.apache.org/jira/browse/AMQ-4106 NIO based transports weren't updating the receive counter in the TcpTransport which can lead to the inactivity monitor mistakenly shutting down the connection. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1401394 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/amqp/AmqpNioTransport.java | 23 +++++---- .../transport/nio/NIOSSLTransport.java | 5 +- .../activemq/transport/stomp/StompCodec.java | 11 ++++ .../transport/stomp/StompNIOTransport.java | 4 +- .../activemq/transport/tcp/TcpTransport.java | 2 +- .../activemq/transport/stomp/Stomp11Test.java | 51 +++++++++++++++++++ 6 files changed, 82 insertions(+), 14 deletions(-) diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java index 6f4550ff5f..d4ebafffca 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java @@ -16,16 +16,6 @@ */ package org.apache.activemq.transport.amqp; -import org.apache.activemq.transport.nio.NIOOutputStream; -import org.apache.activemq.transport.nio.SelectorManager; -import org.apache.activemq.transport.nio.SelectorSelection; -import org.apache.activemq.transport.tcp.TcpTransport; -import org.apache.activemq.util.IOExceptionSupport; -import org.apache.activemq.util.ServiceStopper; -import org.apache.activemq.wireformat.WireFormat; -import org.fusesource.hawtbuf.DataByteArrayInputStream; - -import javax.net.SocketFactory; import java.io.DataOutputStream; import java.io.EOFException; import java.io.IOException; @@ -36,6 +26,16 @@ import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; +import javax.net.SocketFactory; + +import org.apache.activemq.transport.nio.NIOOutputStream; +import org.apache.activemq.transport.nio.SelectorManager; +import org.apache.activemq.transport.nio.SelectorSelection; +import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.wireformat.WireFormat; + /** * An implementation of the {@link org.apache.activemq.transport.Transport} interface for using AMQP over NIO */ @@ -97,11 +97,12 @@ public class AmqpNioTransport extends TcpTransport { break; } + receiveCounter += readSize; + inputBuffer.flip(); doConsume(AmqpSupport.toBuffer(inputBuffer)); // clear the buffer inputBuffer.clear(); - } } catch (IOException e) { onException(e); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java index 913ebecaec..e6552894e9 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java @@ -178,8 +178,9 @@ public class NIOSSLTransport extends NIOTransport { } int readCount = secureRead(plain); - if (readCount == 0) + if (readCount == 0) { break; + } // channel is closed, cleanup if (readCount == -1) { @@ -187,6 +188,8 @@ public class NIOSSLTransport extends NIOTransport { selection.close(); break; } + + receiveCounter += readCount; } if (status == SSLEngineResult.Status.OK && handshakeStatus != SSLEngineResult.HandshakeStatus.NEED_UNWRAP) { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java index 8349ca3aab..f1e1b5bd40 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java @@ -39,6 +39,7 @@ public class StompCodec { int contentLength = -1; int readLength = 0; int previousByte = -1; + boolean awaitingCommandStart = true; String version = Stomp.DEFAULT_VERSION; public StompCodec(TcpTransport transport) { @@ -56,6 +57,14 @@ public class StompCodec { } if (!processedHeaders) { + + // skip heart beat commands. + if (awaitingCommandStart && b == '\n') { + continue; + } else { + awaitingCommandStart = false; // non-newline indicates next frame. + } + currentCommand.write(b); // end of headers section, parse action and header if (b == '\n' && (previousByte == '\n' || currentCommand.endsWith(crlfcrlf))) { @@ -74,6 +83,7 @@ public class StompCodec { processedHeaders = true; currentCommand.reset(); } + } else { if (contentLength == -1) { @@ -102,6 +112,7 @@ public class StompCodec { StompFrame frame = new StompFrame(action, headers, currentCommand.toByteArray()); transport.doConsume(frame); processedHeaders = false; + awaitingCommandStart = true; currentCommand.reset(); contentLength = -1; } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java index db66059926..2d760d0013 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java @@ -97,11 +97,14 @@ public class StompNIOTransport extends TcpTransport { selection.close(); break; } + // nothing more to read, break if (readSize == 0) { break; } + receiveCounter += readSize; + inputBuffer.flip(); ByteArrayInputStream input = new ByteArrayInputStream(inputBuffer.array()); @@ -109,7 +112,6 @@ public class StompNIOTransport extends TcpTransport { // clear the buffer inputBuffer.clear(); - } } catch (IOException e) { onException(e); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java index fe59b961c1..a0ff42d9ed 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java @@ -128,13 +128,13 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S protected int minmumWireFormatVersion; protected SocketFactory socketFactory; protected final AtomicReference stoppedLatch = new AtomicReference(); + protected volatile int receiveCounter; private Map socketOptions; private int soLinger = Integer.MIN_VALUE; private Boolean keepAlive; private Boolean tcpNoDelay; private Thread runnerThread; - private volatile int receiveCounter; /** * Connect to a remote Node - e.g. a Broker diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java index ccaa6c0c21..c620b2e140 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java @@ -23,6 +23,8 @@ import java.net.SocketTimeoutException; import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import javax.jms.Connection; @@ -290,6 +292,55 @@ public class Stomp11Test extends CombinationTestSupport { assertTrue("Broker did close idle connection in time.", (endTime - startTime) >= 1000); } + public void testHeartbeatsKeepsConnectionOpen() throws Exception { + + String connectFrame = "STOMP\n" + + "login:system\n" + + "passcode:manager\n" + + "accept-version:1.1\n" + + "heart-beat:2000,0\n" + + "host:localhost\n" + + "\n" + Stomp.NULL; + + stompConnection.sendFrame(connectFrame); + String f = stompConnection.receiveFrame(); + assertTrue(f.startsWith("CONNECTED")); + assertTrue(f.indexOf("version:1.1") >= 0); + assertTrue(f.indexOf("heart-beat:") >= 0); + assertTrue(f.indexOf("session:") >= 0); + LOG.debug("Broker sent: " + f); + + String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL; + stompConnection.sendFrame(message); + + ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); + + service.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + LOG.info("Sending next KeepAlive"); + stompConnection.keepAlive(); + } catch (Exception e) { + } + } + }, 1, 1, TimeUnit.SECONDS); + + TimeUnit.SECONDS.sleep(20); + + String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + + "id:12345\n" + "ack:auto\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + StompFrame stompFrame = stompConnection.receive(); + assertTrue(stompFrame.getAction().equals("MESSAGE")); + + service.shutdownNow(); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + public void testSendAfterMissingHeartbeat() throws Exception { String connectFrame = "STOMP\n" + "login:system\n" +