diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java index 60ea82966e..111cef4e44 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java @@ -254,6 +254,7 @@ public class StompWireFormat implements WireFormat { stream.write(val); } } + result = new String(stream.toByteArray(), "UTF-8"); } return result; diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java index 966a1572d5..878d98e7b2 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java @@ -24,9 +24,17 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTextMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +48,9 @@ public class Stomp11Test extends CombinationTestSupport { private BrokerService broker; private StompConnection stompConnection = new StompConnection(); + private Connection connection; + private Session session; + private ActiveMQQueue queue; @Override protected void setUp() throws Exception { @@ -49,6 +60,13 @@ public class Stomp11Test extends CombinationTestSupport { broker.waitUntilStarted(); stompConnect(); + + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(jmsUri); + connection = cf.createConnection("system", "manager"); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + queue = new ActiveMQQueue(getQueueName()); + connection.start(); + } private void stompConnect() throws IOException, URISyntaxException, UnknownHostException { @@ -347,8 +365,8 @@ public class Stomp11Test extends CombinationTestSupport { "id:12345\n" + "ack:auto\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); - frame = stompConnection.receiveFrame(); - assertTrue(frame.startsWith("MESSAGE")); + StompFrame stompFrame = stompConnection.receive(); + assertTrue(stompFrame.getAction().equals("MESSAGE")); frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "id:12345\n\n" + Stomp.NULL; @@ -570,4 +588,65 @@ public class Stomp11Test extends CombinationTestSupport { String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); } + + + public void testSendMessageWithStandardHeadersEncoded() throws Exception { + + MessageConsumer consumer = session.createConsumer(queue); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n" + + "accept-version:1.1" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SEND\n" + "correlation-id:c1\\:\\n\\23\n" + "priority:3\n" + "type:t34:5\n" + "JMSXGroupID:abc\n" + "foo:a\\bc\n" + "bar:123\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + + Stomp.NULL; + + stompConnection.sendFrame(frame); + + TextMessage message = (TextMessage)consumer.receive(2500); + assertNotNull(message); + assertEquals("Hello World", message.getText()); + assertEquals("JMSCorrelationID", "c1\\:\n\\23", message.getJMSCorrelationID()); + assertEquals("getJMSType", "t34:5", message.getJMSType()); + assertEquals("getJMSPriority", 3, message.getJMSPriority()); + assertEquals("foo", "a\\bc", message.getStringProperty("foo")); + assertEquals("bar", "123", message.getStringProperty("bar")); + + assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID")); + ActiveMQTextMessage amqMessage = (ActiveMQTextMessage)message; + assertEquals("GroupID", "abc", amqMessage.getGroupID()); + } + + + public void testSubscribeWithMessageSentWithEncodedProperties() throws Exception { + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n" + "accept-version:1.1" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "id:12345\n" + "ack:auto\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + MessageProducer producer = session.createProducer(queue); + TextMessage message = session.createTextMessage("Hello World"); + message.setStringProperty("s", "\\value:"); + producer.send(message); + + frame = stompConnection.receiveFrame(); + assertTrue("" + frame, frame.startsWith("MESSAGE")); + + int start = frame.indexOf("\ns:") + 3; + final String expectedEncoded = "\\\\value\\c"; + final String headerVal = frame.substring(start, start + expectedEncoded.length()); + assertEquals("" + frame, expectedEncoded, headerVal); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + }