From a603c40b6427407a99b9ec190ed5c6b8afb932a9 Mon Sep 17 00:00:00 2001 From: James Strachan Date: Tue, 14 Feb 2006 12:55:35 +0000 Subject: [PATCH] added test case and support for selectors on Stomp subscriptions git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@377713 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/transport/stomp/Stomp.java | 1 + .../activemq/transport/stomp/Subscribe.java | 11 +++-- .../activemq/transport/stomp/StompTest.java | 48 +++++++++++++++++-- 3 files changed, 51 insertions(+), 9 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java index 67ab2dae91..1130cd137a 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java @@ -79,6 +79,7 @@ public interface Stomp { String DESTINATION = "destination"; String ACK_MODE = "ack"; String ID = "id"; + String SELECTOR = "selector"; public interface AckModeValues { String AUTO = "auto"; diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscribe.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscribe.java index 646e5b1a3d..e14ccf71c8 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscribe.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscribe.java @@ -16,14 +16,14 @@ */ package org.apache.activemq.transport.stomp; -import java.io.DataInput; -import java.io.IOException; -import java.util.Properties; - import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.util.IntrospectionSupport; +import java.io.DataInput; +import java.io.IOException; +import java.util.Properties; + class Subscribe implements StompCommand { private HeaderParser headerParser = new HeaderParser(); private StompWireFormat format; @@ -43,6 +43,9 @@ class Subscribe implements StompCommand { ci.setPrefetchSize(1000); ci.setDispatchAsync(true); + String selector = (String) headers.remove(Stomp.Headers.Subscribe.SELECTOR); + ci.setSelector(selector); + IntrospectionSupport.setProperties(ci, headers, "activemq."); ci.setDestination(DestinationNamer.convert(destination)); diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java index cd23afd716..9a24bddfac 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java @@ -23,6 +23,7 @@ import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.command.ActiveMQQueue; import javax.jms.Connection; +import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; @@ -103,12 +104,16 @@ public class StompTest extends CombinationTestSupport { } } + public void sendMessage(String msg) throws Exception { + sendMessage(msg, "foo", "xyz"); + } + public void sendMessage(String msg, String propertyName, String propertyValue) throws JMSException { MessageProducer producer = session.createProducer(queue); TextMessage message = session.createTextMessage(msg); + message.setStringProperty(propertyName, propertyValue); producer.send(message); - } public void testConnect() throws Exception { @@ -177,8 +182,40 @@ public class StompTest extends CombinationTestSupport { assertNotNull(message); assertEquals("Hello World", message.getText()); } - + public void testSubscribeWithAutoAck() throws Exception { + + String frame = + "CONNECT\n" + + "login: brianm\n" + + "passcode: wombats\n\n"+ + Stomp.NULL; + sendFrame(frame); + + frame = receiveFrame(100000); + assertTrue(frame.startsWith("CONNECTED")); + + frame = + "SUBSCRIBE\n" + + "destination:/queue/" + getQueueName() + "\n" + + "ack:auto\n\n" + + Stomp.NULL; + sendFrame(frame); + + sendMessage(getName()); + + frame = receiveFrame(10000); + assertTrue(frame.startsWith("MESSAGE")); + + frame = + "DISCONNECT\n" + + "\n\n"+ + Stomp.NULL; + sendFrame(frame); + } + + + public void testSubscribeWithAutoAckAndSelector() throws Exception { String frame = "CONNECT\n" + @@ -193,22 +230,23 @@ public class StompTest extends CombinationTestSupport { frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + + "selector: foo = 'zzz'\n" + "ack:auto\n\n" + Stomp.NULL; sendFrame(frame); - sendMessage(getName()); + sendMessage("Ignored message", "foo", "1234"); + sendMessage("Real message", "foo", "zzz"); frame = receiveFrame(10000); assertTrue(frame.startsWith("MESSAGE")); + assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real message") > 0); frame = "DISCONNECT\n" + "\n\n"+ Stomp.NULL; sendFrame(frame); - - }