From caabb6cb3de9c937b471f874c3ddf422152d198d Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Wed, 3 Feb 2010 14:42:33 +0000 Subject: [PATCH] Fix for: https://issues.apache.org/activemq/browse/AMQ-2490 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@906071 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/stomp/FrameTranslator.java | 4 + .../activemq/transport/stomp/Stomp.java | 1 + .../activemq/transport/stomp/StompTest.java | 319 ++++++++++-------- 3 files changed, 178 insertions(+), 146 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java index 3a6c7d845e..300344df45 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java @@ -74,6 +74,10 @@ public interface FrameTranslator { headers.put(Stomp.Headers.Message.TYPE, message.getJMSType()); } + if (message.getUserID() != null) { + headers.put(Stomp.Headers.Message.USERID, message.getUserID()); + } + // now lets add all the message headers final Map properties = message.getProperties(); if (properties != null) { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java index 81185baf86..b603772657 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java @@ -76,6 +76,7 @@ public interface Stomp { String TIMESTAMP = "timestamp"; String TYPE = "type"; String SUBSCRIPTION = "subscription"; + String USERID = "JMSXUserID"; } public interface Subscribe { diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java index 74d6c977bd..1bcddd6d7c 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java @@ -58,53 +58,53 @@ public class StompTest extends CombinationTestSupport { private Connection connection; private Session session; private ActiveMQQueue queue; - private String xmlObject = "\n" + private String xmlObject = "\n" + " Dejan\n" - + " Belgrade\n" + + " Belgrade\n" + ""; - private String xmlMap = "\n" + private String xmlMap = "\n" + " \n" - + " name\n" + + " name\n" + " Dejan\n" - + " \n" - + " \n" + + " \n" + + " \n" + " city\n" - + " Belgrade\n" - + " \n" + + " Belgrade\n" + + " \n" + "\n"; - private String jsonObject = "{\"pojo\":{" + private String jsonObject = "{\"pojo\":{" + "\"name\":\"Dejan\"," - + "\"city\":\"Belgrade\"" + + "\"city\":\"Belgrade\"" + "}}"; - private String jsonMap = "{\"map\":{" + private String jsonMap = "{\"map\":{" + "\"entry\":[" + "{\"string\":[\"name\",\"Dejan\"]}," - + "{\"string\":[\"city\",\"Belgrade\"]}" - + "]" + + "{\"string\":[\"city\",\"Belgrade\"]}" + + "]" + "}}"; protected void setUp() throws Exception { // The order of the entries is different when using ibm jdk 5. if (System.getProperty("java.vendor").equals("IBM Corporation") && System.getProperty("java.version").startsWith("1.5")) { - xmlMap = "\n" - + " \n" - + " city\n" - + " Belgrade\n" - + " \n" + xmlMap = "\n" + " \n" - + " name\n" + + " city\n" + + " Belgrade\n" + + " \n" + + " \n" + + " name\n" + " Dejan\n" - + " \n" + + " \n" + "\n"; - jsonMap = "{\"map\":{" + jsonMap = "{\"map\":{" + "\"entry\":[" - + "{\"string\":[\"city\",\"Belgrade\"]}," + + "{\"string\":[\"city\",\"Belgrade\"]}," + "{\"string\":[\"name\",\"Dejan\"]}" - + "]" + + "]" + "}}"; } broker = BrokerFactory.createBroker(new URI(confUri)); @@ -133,14 +133,14 @@ public class StompTest extends CombinationTestSupport { } protected void tearDown() throws Exception { - try { - connection.close(); - stompDisconnect(); - } catch(Exception e) { - // Some tests explicitly disconnect from stomp so can ignore - } finally { - broker.stop(); - } + try { + connection.close(); + stompDisconnect(); + } catch(Exception e) { + // Some tests explicitly disconnect from stomp so can ignore + } finally { + broker.stop(); + } } private void stompDisconnect() throws IOException { @@ -470,9 +470,7 @@ public class StompTest extends CombinationTestSupport { LOG.info("Received frame: " + frame); fail("No message should have been received since subscription was removed"); } catch (SocketTimeoutException e) { - } - } public void testTransactionCommit() throws Exception { @@ -554,29 +552,28 @@ public class StompTest extends CombinationTestSupport { assertClients(1); } - + public void testConnectNotAuthenticatedWrongUser() throws Exception { String frame = "CONNECT\n" + "login: dejanb\n" + "passcode: manager\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); String f = stompConnection.receiveFrame(); - - assertTrue(f.startsWith("ERROR")); + + assertTrue(f.startsWith("ERROR")); assertClients(1); - } - + public void testConnectNotAuthenticatedWrongPassword() throws Exception { - + String frame = "CONNECT\n" + "login: system\n" + "passcode: dejanb\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); String f = stompConnection.receiveFrame(); - - assertTrue(f.startsWith("ERROR")); - assertClients(1); - } - + + assertTrue(f.startsWith("ERROR")); + assertClients(1); + } + public void testSendNotAuthorized() throws Exception { String frame = "CONNECT\n" + "login: guest\n" + "passcode: password\n\n" + Stomp.NULL; @@ -590,9 +587,8 @@ public class StompTest extends CombinationTestSupport { stompConnection.sendFrame(frame); String f = stompConnection.receiveFrame(); assertTrue(f.startsWith("ERROR")); - } - + public void testSubscribeNotAuthorized() throws Exception { String frame = "CONNECT\n" + "login: guest\n" + "passcode: password\n\n" + Stomp.NULL; @@ -607,8 +603,8 @@ public class StompTest extends CombinationTestSupport { String f = stompConnection.receiveFrame(); assertTrue(f.startsWith("ERROR")); - } - + } + public void testTransformationUnknownTranslator() throws Exception { MessageConsumer consumer = session.createConsumer(queue); @@ -624,9 +620,9 @@ public class StompTest extends CombinationTestSupport { TextMessage message = (TextMessage)consumer.receive(2500); assertNotNull(message); - assertEquals("Hello World", message.getText()); + assertEquals("Hello World", message.getText()); } - + public void testTransformationFailed() throws Exception { MessageConsumer consumer = session.createConsumer(queue); @@ -643,9 +639,9 @@ public class StompTest extends CombinationTestSupport { TextMessage message = (TextMessage)consumer.receive(2500); assertNotNull(message); assertNotNull(message.getStringProperty(Stomp.Headers.TRANSFORMATION_ERROR)); - assertEquals("Hello World", message.getText()); + assertEquals("Hello World", message.getText()); } - + public void testTransformationSendXMLObject() throws Exception { MessageConsumer consumer = session.createConsumer(queue); @@ -654,7 +650,7 @@ public class StompTest extends CombinationTestSupport { frame = stompConnection.receiveFrame(); assertTrue(frame.startsWith("CONNECTED")); - + frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + xmlObject + Stomp.NULL; stompConnection.sendFrame(frame); @@ -663,8 +659,8 @@ public class StompTest extends CombinationTestSupport { assertNotNull(message); SamplePojo object = (SamplePojo)message.getObject(); assertEquals("Dejan", object.getName()); - } - + } + public void testTransformationSendJSONObject() throws Exception { MessageConsumer consumer = session.createConsumer(queue); @@ -673,7 +669,7 @@ public class StompTest extends CombinationTestSupport { frame = stompConnection.receiveFrame(); assertTrue(frame.startsWith("CONNECTED")); - + frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_JSON + "\n\n" + jsonObject + Stomp.NULL; stompConnection.sendFrame(frame); @@ -683,13 +679,13 @@ public class StompTest extends CombinationTestSupport { SamplePojo object = (SamplePojo)message.getObject(); assertEquals("Dejan", object.getName()); } - + public void testTransformationSubscribeXML() throws Exception { - + MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade")); producer.send(message); - + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); @@ -698,7 +694,7 @@ public class StompTest extends CombinationTestSupport { frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); - + frame = stompConnection.receiveFrame(); assertTrue(frame.trim().endsWith(xmlObject)); @@ -706,12 +702,12 @@ public class StompTest extends CombinationTestSupport { frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); } - + public void testTransformationReceiveJSONObject() throws Exception { MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade")); producer.send(message); - + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); @@ -720,21 +716,21 @@ public class StompTest extends CombinationTestSupport { frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_JSON + "\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); - + frame = stompConnection.receiveFrame(); assertTrue(frame.trim().endsWith(jsonObject)); - + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); + stompConnection.sendFrame(frame); } - + public void testTransformationReceiveXMLObject() throws Exception { - + MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade")); producer.send(message); - + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); @@ -743,15 +739,15 @@ public class StompTest extends CombinationTestSupport { frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); - + frame = stompConnection.receiveFrame(); assertTrue(frame.trim().endsWith(xmlObject)); - + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); - } - + } + public void testTransformationNotOverrideSubscription() throws Exception { MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade")); @@ -866,7 +862,7 @@ public class StompTest extends CombinationTestSupport { message.setString("name", "Dejan"); message.setString("city", "Belgrade"); producer.send(message); - + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); @@ -875,81 +871,81 @@ public class StompTest extends CombinationTestSupport { frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto\n" + "transformation:" + Stomp.Transformations.JMS_MAP_JSON + "\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); - + frame = stompConnection.receiveFrame(); assertTrue(frame.trim().endsWith(jsonMap.trim())); - + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); + stompConnection.sendFrame(frame); } - + public void testDurableUnsub() throws Exception { - // get broker JMX view - + // get broker JMX view + String domain = "org.apache.activemq"; ObjectName brokerName = new ObjectName(domain + ":Type=Broker,BrokerName=localhost"); - - BrokerViewMBean view = (BrokerViewMBean)broker.getManagementContext().newProxyInstance(brokerName, BrokerViewMBean.class, true); - - // connect + + BrokerViewMBean view = (BrokerViewMBean)broker.getManagementContext().newProxyInstance(brokerName, BrokerViewMBean.class, true); + + // connect String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\nclient-id:test\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); frame = stompConnection.receiveFrame(); assertTrue(frame.startsWith("CONNECTED")); assertEquals(view.getDurableTopicSubscribers().length, 0); - + // subscribe frame = "SUBSCRIBE\n" + "destination:/topic/" + getQueueName() + "\n" + "ack:auto\nactivemq.subscriptionName:test\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); // wait a bit for MBean to get refreshed try { - Thread.sleep(400); + Thread.sleep(400); } catch (InterruptedException e){} - + assertEquals(view.getDurableTopicSubscribers().length, 1); // disconnect frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); try { - Thread.sleep(400); + Thread.sleep(400); } catch (InterruptedException e){} - + //reconnect stompConnect(); - // connect + // connect frame = "CONNECT\n" + "login: system\n" + "passcode: manager\nclient-id:test\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); + stompConnection.sendFrame(frame); frame = stompConnection.receiveFrame(); assertTrue(frame.startsWith("CONNECTED")); - + // unsubscribe frame = "UNSUBSCRIBE\n" + "destination:/topic/" + getQueueName() + "\n" + "ack:auto\nactivemq.subscriptionName:test\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - stompConnection.sendFrame(frame); + stompConnection.sendFrame(frame); try { - Thread.sleep(400); + Thread.sleep(400); } catch (InterruptedException e){} assertEquals(view.getDurableTopicSubscribers().length, 0); } - - public void testMessageIdHeader() throws Exception { + + public void testMessageIdHeader() throws Exception { stompConnection.connect("system", "manager"); - + stompConnection.begin("tx1"); stompConnection.send("/queue/" + getQueueName(), "msg", "tx1", null); stompConnection.commit("tx1"); - + stompConnection.subscribe("/queue/" + getQueueName()); StompFrame stompMessage = stompConnection.receive(); - assertNull(stompMessage.getHeaders().get("transaction")); + assertNull(stompMessage.getHeaders().get("transaction")); } - + public void testPrefetchSize() throws Exception { stompConnection.connect("system", "manager"); - + HashMap headers = new HashMap(); headers.put("activemq.prefetchSize", "1"); stompConnection.subscribe("/queue/" + getQueueName(), "client", headers); @@ -960,85 +956,80 @@ public class StompTest extends CombinationTestSupport { sendMessage("message 3"); sendMessage("message 4"); sendMessage("message 5"); - - StompFrame frame = stompConnection.receive(); assertEquals(frame.getBody(), "message 1"); - + stompConnection.begin("tx1"); stompConnection.ack(frame, "tx1"); StompFrame frame1 = stompConnection.receive(); assertEquals(frame1.getBody(), "message 2"); - + try { - StompFrame frame2 = stompConnection.receive(500); - if (frame2 != null) { - fail("Should not have received the second message"); - } + StompFrame frame2 = stompConnection.receive(500); + if (frame2 != null) { + fail("Should not have received the second message"); + } } catch (SocketTimeoutException soe) {} - + stompConnection.ack(frame1, "tx1"); Thread.sleep(1000); stompConnection.abort("tx1"); - + stompConnection.begin("tx2"); - + // Previously delivered message need to get re-acked... stompConnection.ack(frame, "tx2"); stompConnection.ack(frame1, "tx2"); - + StompFrame frame3 = stompConnection.receive(); assertEquals(frame3.getBody(), "message 3"); stompConnection.ack(frame3, "tx2"); - + StompFrame frame4 = stompConnection.receive(); assertEquals(frame4.getBody(), "message 4"); stompConnection.ack(frame4, "tx2"); - + stompConnection.commit("tx2"); - + stompConnection.begin("tx3"); StompFrame frame5 = stompConnection.receive(); assertEquals(frame5.getBody(), "message 5"); stompConnection.ack(frame5, "tx3"); stompConnection.commit("tx3"); - + stompDisconnect(); - } - + public void testTransactionsWithMultipleDestinations() throws Exception { - stompConnection.connect("system", "manager"); - + stompConnection.connect("system", "manager"); + HashMap headers = new HashMap(); headers.put("activemq.prefetchSize", "1"); headers.put("activemq.exclusive", "true"); - - stompConnection.subscribe("/queue/test1", "client", headers); - - stompConnection.begin("ID:tx1"); - - headers.clear(); - headers.put("receipt", "ID:msg1"); - stompConnection.send("/queue/test2", "test message", "ID:tx1", headers); - - stompConnection.commit("ID:tx1"); - - // make sure connection is active after commit - Thread.sleep(1000); - stompConnection.send("/queue/test1", "another message"); - - StompFrame frame = stompConnection.receive(500); - System.out.println(frame); - assertNotNull(frame); - - - stompConnection.disconnect(); + + stompConnection.subscribe("/queue/test1", "client", headers); + + stompConnection.begin("ID:tx1"); + + headers.clear(); + headers.put("receipt", "ID:msg1"); + stompConnection.send("/queue/test2", "test message", "ID:tx1", headers); + + stompConnection.commit("ID:tx1"); + + // make sure connection is active after commit + Thread.sleep(1000); + stompConnection.send("/queue/test1", "another message"); + + StompFrame frame = stompConnection.receive(500); + assertNotNull(frame); + + stompConnection.disconnect(); } - + public void testTempDestination() throws Exception { String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; @@ -1046,7 +1037,7 @@ public class StompTest extends CombinationTestSupport { frame = stompConnection.receiveFrame(); assertTrue(frame.startsWith("CONNECTED")); - + frame = "SUBSCRIBE\n" + "destination:/temp-queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); @@ -1056,7 +1047,45 @@ public class StompTest extends CombinationTestSupport { StompFrame message = stompConnection.receive(1000); assertEquals("Hello World", message.getBody()); } - + + public void testJMSXUserIDIsSetInMessage() throws Exception { + + MessageConsumer consumer = session.createConsumer(queue); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL; + + stompConnection.sendFrame(frame); + + TextMessage message = (TextMessage)consumer.receive(1000); + assertNotNull(message); + assertEquals("system", message.getStringProperty(Stomp.Headers.Message.USERID)); + + } + + public void testJMSXUserIDIsSetInStompMessage() throws Exception { + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL; + stompConnection.sendFrame(frame); + + StompFrame message = stompConnection.receive(1000); + assertEquals("system", message.getHeaders().get(Stomp.Headers.Message.USERID)); + } + protected void assertClients(int expected) throws Exception { org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients(); int actual = clients.length; @@ -1071,5 +1100,3 @@ public class StompTest extends CombinationTestSupport { Thread.sleep(2000); } } - -