mirror of https://github.com/apache/activemq.git
added test case and support for selectors on Stomp subscriptions
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@377713 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c4c5895d33
commit
a603c40b64
|
@ -79,6 +79,7 @@ public interface Stomp {
|
||||||
String DESTINATION = "destination";
|
String DESTINATION = "destination";
|
||||||
String ACK_MODE = "ack";
|
String ACK_MODE = "ack";
|
||||||
String ID = "id";
|
String ID = "id";
|
||||||
|
String SELECTOR = "selector";
|
||||||
|
|
||||||
public interface AckModeValues {
|
public interface AckModeValues {
|
||||||
String AUTO = "auto";
|
String AUTO = "auto";
|
||||||
|
|
|
@ -16,14 +16,14 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.transport.stomp;
|
package org.apache.activemq.transport.stomp;
|
||||||
|
|
||||||
import java.io.DataInput;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Properties;
|
|
||||||
|
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.command.ConsumerInfo;
|
import org.apache.activemq.command.ConsumerInfo;
|
||||||
import org.apache.activemq.util.IntrospectionSupport;
|
import org.apache.activemq.util.IntrospectionSupport;
|
||||||
|
|
||||||
|
import java.io.DataInput;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Properties;
|
||||||
|
|
||||||
class Subscribe implements StompCommand {
|
class Subscribe implements StompCommand {
|
||||||
private HeaderParser headerParser = new HeaderParser();
|
private HeaderParser headerParser = new HeaderParser();
|
||||||
private StompWireFormat format;
|
private StompWireFormat format;
|
||||||
|
@ -43,6 +43,9 @@ class Subscribe implements StompCommand {
|
||||||
ci.setPrefetchSize(1000);
|
ci.setPrefetchSize(1000);
|
||||||
ci.setDispatchAsync(true);
|
ci.setDispatchAsync(true);
|
||||||
|
|
||||||
|
String selector = (String) headers.remove(Stomp.Headers.Subscribe.SELECTOR);
|
||||||
|
ci.setSelector(selector);
|
||||||
|
|
||||||
IntrospectionSupport.setProperties(ci, headers, "activemq.");
|
IntrospectionSupport.setProperties(ci, headers, "activemq.");
|
||||||
|
|
||||||
ci.setDestination(DestinationNamer.convert(destination));
|
ci.setDestination(DestinationNamer.convert(destination));
|
||||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.activemq.broker.TransportConnector;
|
||||||
import org.apache.activemq.command.ActiveMQQueue;
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.JMSException;
|
||||||
import javax.jms.MessageConsumer;
|
import javax.jms.MessageConsumer;
|
||||||
import javax.jms.MessageProducer;
|
import javax.jms.MessageProducer;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
|
@ -103,12 +104,16 @@ public class StompTest extends CombinationTestSupport {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void sendMessage(String msg) throws Exception {
|
|
||||||
|
|
||||||
|
public void sendMessage(String msg) throws Exception {
|
||||||
|
sendMessage(msg, "foo", "xyz");
|
||||||
|
}
|
||||||
|
|
||||||
|
public void sendMessage(String msg, String propertyName, String propertyValue) throws JMSException {
|
||||||
MessageProducer producer = session.createProducer(queue);
|
MessageProducer producer = session.createProducer(queue);
|
||||||
TextMessage message = session.createTextMessage(msg);
|
TextMessage message = session.createTextMessage(msg);
|
||||||
|
message.setStringProperty(propertyName, propertyValue);
|
||||||
producer.send(message);
|
producer.send(message);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testConnect() throws Exception {
|
public void testConnect() throws Exception {
|
||||||
|
@ -202,13 +207,46 @@ public class StompTest extends CombinationTestSupport {
|
||||||
frame = receiveFrame(10000);
|
frame = receiveFrame(10000);
|
||||||
assertTrue(frame.startsWith("MESSAGE"));
|
assertTrue(frame.startsWith("MESSAGE"));
|
||||||
|
|
||||||
|
frame =
|
||||||
|
"DISCONNECT\n" +
|
||||||
|
"\n\n"+
|
||||||
|
Stomp.NULL;
|
||||||
|
sendFrame(frame);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void testSubscribeWithAutoAckAndSelector() throws Exception {
|
||||||
|
|
||||||
|
String frame =
|
||||||
|
"CONNECT\n" +
|
||||||
|
"login: brianm\n" +
|
||||||
|
"passcode: wombats\n\n"+
|
||||||
|
Stomp.NULL;
|
||||||
|
sendFrame(frame);
|
||||||
|
|
||||||
|
frame = receiveFrame(100000);
|
||||||
|
assertTrue(frame.startsWith("CONNECTED"));
|
||||||
|
|
||||||
|
frame =
|
||||||
|
"SUBSCRIBE\n" +
|
||||||
|
"destination:/queue/" + getQueueName() + "\n" +
|
||||||
|
"selector: foo = 'zzz'\n" +
|
||||||
|
"ack:auto\n\n" +
|
||||||
|
Stomp.NULL;
|
||||||
|
sendFrame(frame);
|
||||||
|
|
||||||
|
sendMessage("Ignored message", "foo", "1234");
|
||||||
|
sendMessage("Real message", "foo", "zzz");
|
||||||
|
|
||||||
|
frame = receiveFrame(10000);
|
||||||
|
assertTrue(frame.startsWith("MESSAGE"));
|
||||||
|
assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real message") > 0);
|
||||||
|
|
||||||
frame =
|
frame =
|
||||||
"DISCONNECT\n" +
|
"DISCONNECT\n" +
|
||||||
"\n\n"+
|
"\n\n"+
|
||||||
Stomp.NULL;
|
Stomp.NULL;
|
||||||
sendFrame(frame);
|
sendFrame(frame);
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue