diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/Stomp.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/Stomp.java index 9731f7b1b4..62f77dfa92 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/Stomp.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/Stomp.java @@ -25,9 +25,11 @@ public interface Stomp { byte BREAK = '\n'; byte COLON = ':'; byte ESCAPE = '\\'; + byte CARRIAGE_RETURN = '\r'; byte[] ESCAPE_ESCAPE_SEQ = { 92, 92 }; byte[] COLON_ESCAPE_SEQ = { 92, 99 }; byte[] NEWLINE_ESCAPE_SEQ = { 92, 110 }; + byte[] CARRIAGE_ESCAPE_SEQ = { 92, 114 }; String COMMA = ","; String V1_0 = "1.0"; diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java index a172e67795..cfc4c2d8ab 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java @@ -316,6 +316,11 @@ public class StompWireFormat implements WireFormat { case Stomp.COLON: stream.write(Stomp.COLON_ESCAPE_SEQ); break; + case Stomp.CARRIAGE_RETURN: + if(stompVersion.equals(Stomp.V1_2)) { + stream.write(Stomp.CARRIAGE_ESCAPE_SEQ); + break; + } default: stream.write(val); } @@ -347,6 +352,11 @@ public class StompWireFormat implements WireFormat { case 92: decoded.write(Stomp.ESCAPE); break; + case 114: + if(stompVersion.equals(Stomp.V1_2)) { + decoded.write(Stomp.CARRIAGE_RETURN); + break; + } default: stream.unread(next); decoded.write(value); diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java index 8679684f8c..a2a284a987 100644 --- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java +++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java @@ -16,11 +16,6 @@ */ package org.apache.activemq.transport.stomp; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - import java.io.DataInputStream; import java.net.SocketTimeoutException; import java.util.concurrent.Executors; @@ -43,6 +38,8 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.junit.Assert.*; + public class Stomp11Test extends StompTestSupport { private static final Logger LOG = LoggerFactory.getLogger(Stomp11Test.class); @@ -1208,6 +1205,44 @@ public class Stomp11Test extends StompTestSupport { doTestAckMessagesInTransactionOutOfOrderWithTXClientAck("client-individual"); } + @Test(timeout = 60000) + public void testFrameHeaderEscapes() throws Exception { + String connectFrame = "STOMP\n" + + "login:system\n" + + "passcode:manager\n" + + "accept-version:1.1\n" + + "host:localhost\n" + + "\n" + Stomp.NULL; + stompConnection.sendFrame(connectFrame); + + String f = stompConnection.receiveFrame(); + LOG.debug("Broker sent: " + f); + + assertTrue(f.startsWith("CONNECTED")); + + String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + + "id:12345\n" + + "ack:auto\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + + "colon:\\c\n" + + "linefeed:\\n\n" + + "backslash:\\\\\n" + + "carriagereturn:\\r\n" + + "\n" + Stomp.NULL; + stompConnection.sendFrame(message); + + frame = stompConnection.receiveFrame(); + + LOG.debug("Broker sent: " + frame); + assertTrue(frame.startsWith("MESSAGE")); + assertTrue(frame.contains("colon:\\c\n")); + assertTrue(frame.contains("linefeed:\\n\n")); + assertTrue(frame.contains("backslash:\\\\\n")); + assertFalse(frame.contains("carriagereturn:\\r\n")); + } + public void doTestAckMessagesInTransactionOutOfOrderWithTXClientAck(String ackMode) throws Exception { MessageProducer producer = session.createProducer(queue); producer.send(session.createTextMessage("Message 1")); diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java index 3944c500af..073eef9ea8 100644 --- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java +++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java @@ -1066,6 +1066,44 @@ public class Stomp12Test extends StompTestSupport { doTestMixedAckNackWithMessageAckIds(true); } + @Test(timeout = 60000) + public void testFrameHeaderEscapes() throws Exception { + String connectFrame = "STOMP\n" + + "login:system\n" + + "passcode:manager\n" + + "accept-version:1.2\n" + + "host:localhost\n" + + "\n" + Stomp.NULL; + stompConnection.sendFrame(connectFrame); + + String f = stompConnection.receiveFrame(); + LOG.debug("Broker sent: " + f); + + assertTrue(f.startsWith("CONNECTED")); + + String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + + "id:12345\n" + + "ack:auto\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + + "colon:\\c\n" + + "linefeed:\\n\n" + + "backslash:\\\\\n" + + "carriagereturn:\\r\n" + + "\n" + Stomp.NULL; + stompConnection.sendFrame(message); + + frame = stompConnection.receiveFrame(); + + LOG.debug("Broker sent: " + frame); + assertTrue(frame.startsWith("MESSAGE")); + assertTrue(frame.contains("colon:\\c\n")); + assertTrue(frame.contains("linefeed:\\n\n")); + assertTrue(frame.contains("backslash:\\\\\n")); + assertTrue(frame.contains("carriagereturn:\\r\n")); + } + public void doTestMixedAckNackWithMessageAckIds(boolean individual) throws Exception { final int MESSAGE_COUNT = 20; diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java index 0404762a7c..2bc0d102b4 100644 --- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java +++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java @@ -2285,6 +2285,45 @@ public class StompTest extends StompTestSupport { doTestAckInTransaction(false); } + @Test(timeout = 60000) + public void testFrameHeaderEscapes() throws Exception { + String connectFrame = "STOMP\n" + + "login:system\n" + + "passcode:manager\n" + + "accept-version:1.0\n" + + "host:localhost\n" + + "\n" + Stomp.NULL; + stompConnection.sendFrame(connectFrame); + + String f = stompConnection.receiveFrame(); + LOG.debug("Broker sent: " + f); + + assertTrue(f.startsWith("CONNECTED")); + + String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + + "id:12345\n" + + "ack:auto\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + + "colon:\\c\n" + + "linefeed:\\n\n" + + "backslash:\\\\\n" + + "carriagereturn:\\r\n" + + "\n" + Stomp.NULL; + stompConnection.sendFrame(message); + + frame = stompConnection.receiveFrame(); + + LOG.debug("Broker sent: " + frame); + assertTrue(frame.startsWith("MESSAGE")); + assertFalse(frame.contains("colon:\\c\n")); + assertFalse(frame.contains("linefeed:\\n\n")); + assertFalse(frame.contains("backslash:\\\\\n")); + assertFalse(frame.contains("carriagereturn:\\\\r\n")); + + } + public void doTestAckInTransaction(boolean topic) throws Exception { String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;