git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@906071 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2010-02-03 14:42:33 +00:00
parent 5170a8bba3
commit caabb6cb3d
3 changed files with 178 additions and 146 deletions

View File

@ -74,6 +74,10 @@ public interface FrameTranslator {
headers.put(Stomp.Headers.Message.TYPE, message.getJMSType()); headers.put(Stomp.Headers.Message.TYPE, message.getJMSType());
} }
if (message.getUserID() != null) {
headers.put(Stomp.Headers.Message.USERID, message.getUserID());
}
// now lets add all the message headers // now lets add all the message headers
final Map<String, Object> properties = message.getProperties(); final Map<String, Object> properties = message.getProperties();
if (properties != null) { if (properties != null) {

View File

@ -76,6 +76,7 @@ public interface Stomp {
String TIMESTAMP = "timestamp"; String TIMESTAMP = "timestamp";
String TYPE = "type"; String TYPE = "type";
String SUBSCRIPTION = "subscription"; String SUBSCRIPTION = "subscription";
String USERID = "JMSXUserID";
} }
public interface Subscribe { public interface Subscribe {

View File

@ -58,53 +58,53 @@ public class StompTest extends CombinationTestSupport {
private Connection connection; private Connection connection;
private Session session; private Session session;
private ActiveMQQueue queue; private ActiveMQQueue queue;
private String xmlObject = "<pojo>\n" private String xmlObject = "<pojo>\n"
+ " <name>Dejan</name>\n" + " <name>Dejan</name>\n"
+ " <city>Belgrade</city>\n" + " <city>Belgrade</city>\n"
+ "</pojo>"; + "</pojo>";
private String xmlMap = "<map>\n" private String xmlMap = "<map>\n"
+ " <entry>\n" + " <entry>\n"
+ " <string>name</string>\n" + " <string>name</string>\n"
+ " <string>Dejan</string>\n" + " <string>Dejan</string>\n"
+ " </entry>\n" + " </entry>\n"
+ " <entry>\n" + " <entry>\n"
+ " <string>city</string>\n" + " <string>city</string>\n"
+ " <string>Belgrade</string>\n" + " <string>Belgrade</string>\n"
+ " </entry>\n" + " </entry>\n"
+ "</map>\n"; + "</map>\n";
private String jsonObject = "{\"pojo\":{" private String jsonObject = "{\"pojo\":{"
+ "\"name\":\"Dejan\"," + "\"name\":\"Dejan\","
+ "\"city\":\"Belgrade\"" + "\"city\":\"Belgrade\""
+ "}}"; + "}}";
private String jsonMap = "{\"map\":{" private String jsonMap = "{\"map\":{"
+ "\"entry\":[" + "\"entry\":["
+ "{\"string\":[\"name\",\"Dejan\"]}," + "{\"string\":[\"name\",\"Dejan\"]},"
+ "{\"string\":[\"city\",\"Belgrade\"]}" + "{\"string\":[\"city\",\"Belgrade\"]}"
+ "]" + "]"
+ "}}"; + "}}";
protected void setUp() throws Exception { protected void setUp() throws Exception {
// The order of the entries is different when using ibm jdk 5. // The order of the entries is different when using ibm jdk 5.
if (System.getProperty("java.vendor").equals("IBM Corporation") if (System.getProperty("java.vendor").equals("IBM Corporation")
&& System.getProperty("java.version").startsWith("1.5")) { && System.getProperty("java.version").startsWith("1.5")) {
xmlMap = "<map>\n" xmlMap = "<map>\n"
+ " <entry>\n"
+ " <string>city</string>\n"
+ " <string>Belgrade</string>\n"
+ " </entry>\n"
+ " <entry>\n" + " <entry>\n"
+ " <string>name</string>\n" + " <string>city</string>\n"
+ " <string>Belgrade</string>\n"
+ " </entry>\n"
+ " <entry>\n"
+ " <string>name</string>\n"
+ " <string>Dejan</string>\n" + " <string>Dejan</string>\n"
+ " </entry>\n" + " </entry>\n"
+ "</map>\n"; + "</map>\n";
jsonMap = "{\"map\":{" jsonMap = "{\"map\":{"
+ "\"entry\":[" + "\"entry\":["
+ "{\"string\":[\"city\",\"Belgrade\"]}," + "{\"string\":[\"city\",\"Belgrade\"]},"
+ "{\"string\":[\"name\",\"Dejan\"]}" + "{\"string\":[\"name\",\"Dejan\"]}"
+ "]" + "]"
+ "}}"; + "}}";
} }
broker = BrokerFactory.createBroker(new URI(confUri)); broker = BrokerFactory.createBroker(new URI(confUri));
@ -133,14 +133,14 @@ public class StompTest extends CombinationTestSupport {
} }
protected void tearDown() throws Exception { protected void tearDown() throws Exception {
try { try {
connection.close(); connection.close();
stompDisconnect(); stompDisconnect();
} catch(Exception e) { } catch(Exception e) {
// Some tests explicitly disconnect from stomp so can ignore // Some tests explicitly disconnect from stomp so can ignore
} finally { } finally {
broker.stop(); broker.stop();
} }
} }
private void stompDisconnect() throws IOException { private void stompDisconnect() throws IOException {
@ -470,9 +470,7 @@ public class StompTest extends CombinationTestSupport {
LOG.info("Received frame: " + frame); LOG.info("Received frame: " + frame);
fail("No message should have been received since subscription was removed"); fail("No message should have been received since subscription was removed");
} catch (SocketTimeoutException e) { } catch (SocketTimeoutException e) {
} }
} }
public void testTransactionCommit() throws Exception { public void testTransactionCommit() throws Exception {
@ -554,29 +552,28 @@ public class StompTest extends CombinationTestSupport {
assertClients(1); assertClients(1);
} }
public void testConnectNotAuthenticatedWrongUser() throws Exception { public void testConnectNotAuthenticatedWrongUser() throws Exception {
String frame = "CONNECT\n" + "login: dejanb\n" + "passcode: manager\n\n" + Stomp.NULL; String frame = "CONNECT\n" + "login: dejanb\n" + "passcode: manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
String f = stompConnection.receiveFrame(); String f = stompConnection.receiveFrame();
assertTrue(f.startsWith("ERROR")); assertTrue(f.startsWith("ERROR"));
assertClients(1); assertClients(1);
} }
public void testConnectNotAuthenticatedWrongPassword() throws Exception { public void testConnectNotAuthenticatedWrongPassword() throws Exception {
String frame = "CONNECT\n" + "login: system\n" + "passcode: dejanb\n\n" + Stomp.NULL; String frame = "CONNECT\n" + "login: system\n" + "passcode: dejanb\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
String f = stompConnection.receiveFrame(); String f = stompConnection.receiveFrame();
assertTrue(f.startsWith("ERROR")); assertTrue(f.startsWith("ERROR"));
assertClients(1); assertClients(1);
} }
public void testSendNotAuthorized() throws Exception { public void testSendNotAuthorized() throws Exception {
String frame = "CONNECT\n" + "login: guest\n" + "passcode: password\n\n" + Stomp.NULL; String frame = "CONNECT\n" + "login: guest\n" + "passcode: password\n\n" + Stomp.NULL;
@ -590,9 +587,8 @@ public class StompTest extends CombinationTestSupport {
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
String f = stompConnection.receiveFrame(); String f = stompConnection.receiveFrame();
assertTrue(f.startsWith("ERROR")); assertTrue(f.startsWith("ERROR"));
} }
public void testSubscribeNotAuthorized() throws Exception { public void testSubscribeNotAuthorized() throws Exception {
String frame = "CONNECT\n" + "login: guest\n" + "passcode: password\n\n" + Stomp.NULL; String frame = "CONNECT\n" + "login: guest\n" + "passcode: password\n\n" + Stomp.NULL;
@ -607,8 +603,8 @@ public class StompTest extends CombinationTestSupport {
String f = stompConnection.receiveFrame(); String f = stompConnection.receiveFrame();
assertTrue(f.startsWith("ERROR")); assertTrue(f.startsWith("ERROR"));
} }
public void testTransformationUnknownTranslator() throws Exception { public void testTransformationUnknownTranslator() throws Exception {
MessageConsumer consumer = session.createConsumer(queue); MessageConsumer consumer = session.createConsumer(queue);
@ -624,9 +620,9 @@ public class StompTest extends CombinationTestSupport {
TextMessage message = (TextMessage)consumer.receive(2500); TextMessage message = (TextMessage)consumer.receive(2500);
assertNotNull(message); assertNotNull(message);
assertEquals("Hello World", message.getText()); assertEquals("Hello World", message.getText());
} }
public void testTransformationFailed() throws Exception { public void testTransformationFailed() throws Exception {
MessageConsumer consumer = session.createConsumer(queue); MessageConsumer consumer = session.createConsumer(queue);
@ -643,9 +639,9 @@ public class StompTest extends CombinationTestSupport {
TextMessage message = (TextMessage)consumer.receive(2500); TextMessage message = (TextMessage)consumer.receive(2500);
assertNotNull(message); assertNotNull(message);
assertNotNull(message.getStringProperty(Stomp.Headers.TRANSFORMATION_ERROR)); assertNotNull(message.getStringProperty(Stomp.Headers.TRANSFORMATION_ERROR));
assertEquals("Hello World", message.getText()); assertEquals("Hello World", message.getText());
} }
public void testTransformationSendXMLObject() throws Exception { public void testTransformationSendXMLObject() throws Exception {
MessageConsumer consumer = session.createConsumer(queue); MessageConsumer consumer = session.createConsumer(queue);
@ -654,7 +650,7 @@ public class StompTest extends CombinationTestSupport {
frame = stompConnection.receiveFrame(); frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED")); assertTrue(frame.startsWith("CONNECTED"));
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + xmlObject + Stomp.NULL; frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + xmlObject + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
@ -663,8 +659,8 @@ public class StompTest extends CombinationTestSupport {
assertNotNull(message); assertNotNull(message);
SamplePojo object = (SamplePojo)message.getObject(); SamplePojo object = (SamplePojo)message.getObject();
assertEquals("Dejan", object.getName()); assertEquals("Dejan", object.getName());
} }
public void testTransformationSendJSONObject() throws Exception { public void testTransformationSendJSONObject() throws Exception {
MessageConsumer consumer = session.createConsumer(queue); MessageConsumer consumer = session.createConsumer(queue);
@ -673,7 +669,7 @@ public class StompTest extends CombinationTestSupport {
frame = stompConnection.receiveFrame(); frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED")); assertTrue(frame.startsWith("CONNECTED"));
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_JSON + "\n\n" + jsonObject + Stomp.NULL; frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_JSON + "\n\n" + jsonObject + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
@ -683,13 +679,13 @@ public class StompTest extends CombinationTestSupport {
SamplePojo object = (SamplePojo)message.getObject(); SamplePojo object = (SamplePojo)message.getObject();
assertEquals("Dejan", object.getName()); assertEquals("Dejan", object.getName());
} }
public void testTransformationSubscribeXML() throws Exception { public void testTransformationSubscribeXML() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade")); ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
producer.send(message); producer.send(message);
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
@ -698,7 +694,7 @@ public class StompTest extends CombinationTestSupport {
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + Stomp.NULL; frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame(); frame = stompConnection.receiveFrame();
assertTrue(frame.trim().endsWith(xmlObject)); assertTrue(frame.trim().endsWith(xmlObject));
@ -706,12 +702,12 @@ public class StompTest extends CombinationTestSupport {
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
} }
public void testTransformationReceiveJSONObject() throws Exception { public void testTransformationReceiveJSONObject() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade")); ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
producer.send(message); producer.send(message);
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
@ -720,21 +716,21 @@ public class StompTest extends CombinationTestSupport {
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_JSON + "\n\n" + Stomp.NULL; frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_JSON + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame(); frame = stompConnection.receiveFrame();
assertTrue(frame.trim().endsWith(jsonObject)); assertTrue(frame.trim().endsWith(jsonObject));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
} }
public void testTransformationReceiveXMLObject() throws Exception { public void testTransformationReceiveXMLObject() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade")); ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
producer.send(message); producer.send(message);
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
@ -743,15 +739,15 @@ public class StompTest extends CombinationTestSupport {
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + Stomp.NULL; frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame(); frame = stompConnection.receiveFrame();
assertTrue(frame.trim().endsWith(xmlObject)); assertTrue(frame.trim().endsWith(xmlObject));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
} }
public void testTransformationNotOverrideSubscription() throws Exception { public void testTransformationNotOverrideSubscription() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName())); MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade")); ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
@ -866,7 +862,7 @@ public class StompTest extends CombinationTestSupport {
message.setString("name", "Dejan"); message.setString("name", "Dejan");
message.setString("city", "Belgrade"); message.setString("city", "Belgrade");
producer.send(message); producer.send(message);
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
@ -875,81 +871,81 @@ public class StompTest extends CombinationTestSupport {
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto\n" + "transformation:" + Stomp.Transformations.JMS_MAP_JSON + "\n\n" + Stomp.NULL; frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto\n" + "transformation:" + Stomp.Transformations.JMS_MAP_JSON + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame(); frame = stompConnection.receiveFrame();
assertTrue(frame.trim().endsWith(jsonMap.trim())); assertTrue(frame.trim().endsWith(jsonMap.trim()));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
} }
public void testDurableUnsub() throws Exception { public void testDurableUnsub() throws Exception {
// get broker JMX view // get broker JMX view
String domain = "org.apache.activemq"; String domain = "org.apache.activemq";
ObjectName brokerName = new ObjectName(domain + ":Type=Broker,BrokerName=localhost"); ObjectName brokerName = new ObjectName(domain + ":Type=Broker,BrokerName=localhost");
BrokerViewMBean view = (BrokerViewMBean)broker.getManagementContext().newProxyInstance(brokerName, BrokerViewMBean.class, true); BrokerViewMBean view = (BrokerViewMBean)broker.getManagementContext().newProxyInstance(brokerName, BrokerViewMBean.class, true);
// connect // connect
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\nclient-id:test\n\n" + Stomp.NULL; String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\nclient-id:test\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame(); frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED")); assertTrue(frame.startsWith("CONNECTED"));
assertEquals(view.getDurableTopicSubscribers().length, 0); assertEquals(view.getDurableTopicSubscribers().length, 0);
// subscribe // subscribe
frame = "SUBSCRIBE\n" + "destination:/topic/" + getQueueName() + "\n" + "ack:auto\nactivemq.subscriptionName:test\n\n" + Stomp.NULL; frame = "SUBSCRIBE\n" + "destination:/topic/" + getQueueName() + "\n" + "ack:auto\nactivemq.subscriptionName:test\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
// wait a bit for MBean to get refreshed // wait a bit for MBean to get refreshed
try { try {
Thread.sleep(400); Thread.sleep(400);
} catch (InterruptedException e){} } catch (InterruptedException e){}
assertEquals(view.getDurableTopicSubscribers().length, 1); assertEquals(view.getDurableTopicSubscribers().length, 1);
// disconnect // disconnect
frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL; frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
try { try {
Thread.sleep(400); Thread.sleep(400);
} catch (InterruptedException e){} } catch (InterruptedException e){}
//reconnect //reconnect
stompConnect(); stompConnect();
// connect // connect
frame = "CONNECT\n" + "login: system\n" + "passcode: manager\nclient-id:test\n\n" + Stomp.NULL; frame = "CONNECT\n" + "login: system\n" + "passcode: manager\nclient-id:test\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame(); frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED")); assertTrue(frame.startsWith("CONNECTED"));
// unsubscribe // unsubscribe
frame = "UNSUBSCRIBE\n" + "destination:/topic/" + getQueueName() + "\n" + "ack:auto\nactivemq.subscriptionName:test\n\n" + Stomp.NULL; frame = "UNSUBSCRIBE\n" + "destination:/topic/" + getQueueName() + "\n" + "ack:auto\nactivemq.subscriptionName:test\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
try { try {
Thread.sleep(400); Thread.sleep(400);
} catch (InterruptedException e){} } catch (InterruptedException e){}
assertEquals(view.getDurableTopicSubscribers().length, 0); assertEquals(view.getDurableTopicSubscribers().length, 0);
} }
public void testMessageIdHeader() throws Exception { public void testMessageIdHeader() throws Exception {
stompConnection.connect("system", "manager"); stompConnection.connect("system", "manager");
stompConnection.begin("tx1"); stompConnection.begin("tx1");
stompConnection.send("/queue/" + getQueueName(), "msg", "tx1", null); stompConnection.send("/queue/" + getQueueName(), "msg", "tx1", null);
stompConnection.commit("tx1"); stompConnection.commit("tx1");
stompConnection.subscribe("/queue/" + getQueueName()); stompConnection.subscribe("/queue/" + getQueueName());
StompFrame stompMessage = stompConnection.receive(); StompFrame stompMessage = stompConnection.receive();
assertNull(stompMessage.getHeaders().get("transaction")); assertNull(stompMessage.getHeaders().get("transaction"));
} }
public void testPrefetchSize() throws Exception { public void testPrefetchSize() throws Exception {
stompConnection.connect("system", "manager"); stompConnection.connect("system", "manager");
HashMap<String, String> headers = new HashMap<String, String>(); HashMap<String, String> headers = new HashMap<String, String>();
headers.put("activemq.prefetchSize", "1"); headers.put("activemq.prefetchSize", "1");
stompConnection.subscribe("/queue/" + getQueueName(), "client", headers); stompConnection.subscribe("/queue/" + getQueueName(), "client", headers);
@ -960,85 +956,80 @@ public class StompTest extends CombinationTestSupport {
sendMessage("message 3"); sendMessage("message 3");
sendMessage("message 4"); sendMessage("message 4");
sendMessage("message 5"); sendMessage("message 5");
StompFrame frame = stompConnection.receive(); StompFrame frame = stompConnection.receive();
assertEquals(frame.getBody(), "message 1"); assertEquals(frame.getBody(), "message 1");
stompConnection.begin("tx1"); stompConnection.begin("tx1");
stompConnection.ack(frame, "tx1"); stompConnection.ack(frame, "tx1");
StompFrame frame1 = stompConnection.receive(); StompFrame frame1 = stompConnection.receive();
assertEquals(frame1.getBody(), "message 2"); assertEquals(frame1.getBody(), "message 2");
try { try {
StompFrame frame2 = stompConnection.receive(500); StompFrame frame2 = stompConnection.receive(500);
if (frame2 != null) { if (frame2 != null) {
fail("Should not have received the second message"); fail("Should not have received the second message");
} }
} catch (SocketTimeoutException soe) {} } catch (SocketTimeoutException soe) {}
stompConnection.ack(frame1, "tx1"); stompConnection.ack(frame1, "tx1");
Thread.sleep(1000); Thread.sleep(1000);
stompConnection.abort("tx1"); stompConnection.abort("tx1");
stompConnection.begin("tx2"); stompConnection.begin("tx2");
// Previously delivered message need to get re-acked... // Previously delivered message need to get re-acked...
stompConnection.ack(frame, "tx2"); stompConnection.ack(frame, "tx2");
stompConnection.ack(frame1, "tx2"); stompConnection.ack(frame1, "tx2");
StompFrame frame3 = stompConnection.receive(); StompFrame frame3 = stompConnection.receive();
assertEquals(frame3.getBody(), "message 3"); assertEquals(frame3.getBody(), "message 3");
stompConnection.ack(frame3, "tx2"); stompConnection.ack(frame3, "tx2");
StompFrame frame4 = stompConnection.receive(); StompFrame frame4 = stompConnection.receive();
assertEquals(frame4.getBody(), "message 4"); assertEquals(frame4.getBody(), "message 4");
stompConnection.ack(frame4, "tx2"); stompConnection.ack(frame4, "tx2");
stompConnection.commit("tx2"); stompConnection.commit("tx2");
stompConnection.begin("tx3"); stompConnection.begin("tx3");
StompFrame frame5 = stompConnection.receive(); StompFrame frame5 = stompConnection.receive();
assertEquals(frame5.getBody(), "message 5"); assertEquals(frame5.getBody(), "message 5");
stompConnection.ack(frame5, "tx3"); stompConnection.ack(frame5, "tx3");
stompConnection.commit("tx3"); stompConnection.commit("tx3");
stompDisconnect(); stompDisconnect();
} }
public void testTransactionsWithMultipleDestinations() throws Exception { public void testTransactionsWithMultipleDestinations() throws Exception {
stompConnection.connect("system", "manager"); stompConnection.connect("system", "manager");
HashMap<String, String> headers = new HashMap<String, String>(); HashMap<String, String> headers = new HashMap<String, String>();
headers.put("activemq.prefetchSize", "1"); headers.put("activemq.prefetchSize", "1");
headers.put("activemq.exclusive", "true"); headers.put("activemq.exclusive", "true");
stompConnection.subscribe("/queue/test1", "client", headers); stompConnection.subscribe("/queue/test1", "client", headers);
stompConnection.begin("ID:tx1"); stompConnection.begin("ID:tx1");
headers.clear(); headers.clear();
headers.put("receipt", "ID:msg1"); headers.put("receipt", "ID:msg1");
stompConnection.send("/queue/test2", "test message", "ID:tx1", headers); stompConnection.send("/queue/test2", "test message", "ID:tx1", headers);
stompConnection.commit("ID:tx1"); stompConnection.commit("ID:tx1");
// make sure connection is active after commit // make sure connection is active after commit
Thread.sleep(1000); Thread.sleep(1000);
stompConnection.send("/queue/test1", "another message"); stompConnection.send("/queue/test1", "another message");
StompFrame frame = stompConnection.receive(500); StompFrame frame = stompConnection.receive(500);
System.out.println(frame); assertNotNull(frame);
assertNotNull(frame);
stompConnection.disconnect();
stompConnection.disconnect();
} }
public void testTempDestination() throws Exception { public void testTempDestination() throws Exception {
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
@ -1046,7 +1037,7 @@ public class StompTest extends CombinationTestSupport {
frame = stompConnection.receiveFrame(); frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED")); assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/temp-queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; frame = "SUBSCRIBE\n" + "destination:/temp-queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
@ -1056,7 +1047,45 @@ public class StompTest extends CombinationTestSupport {
StompFrame message = stompConnection.receive(1000); StompFrame message = stompConnection.receive(1000);
assertEquals("Hello World", message.getBody()); assertEquals("Hello World", message.getBody());
} }
public void testJMSXUserIDIsSetInMessage() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
stompConnection.sendFrame(frame);
TextMessage message = (TextMessage)consumer.receive(1000);
assertNotNull(message);
assertEquals("system", message.getStringProperty(Stomp.Headers.Message.USERID));
}
public void testJMSXUserIDIsSetInStompMessage() throws Exception {
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
stompConnection.sendFrame(frame);
StompFrame message = stompConnection.receive(1000);
assertEquals("system", message.getHeaders().get(Stomp.Headers.Message.USERID));
}
protected void assertClients(int expected) throws Exception { protected void assertClients(int expected) throws Exception {
org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients(); org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();
int actual = clients.length; int actual = clients.length;
@ -1071,5 +1100,3 @@ public class StompTest extends CombinationTestSupport {
Thread.sleep(2000); Thread.sleep(2000);
} }
} }