mirror of https://github.com/apache/activemq.git
added test case and fix for the sending of Stomp messages with headers
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@377712 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f7ed407271
commit
c4c5895d33
|
@ -142,7 +142,7 @@ abstract public class Message extends BaseCommand implements MarshallAware, Mess
|
||||||
|
|
||||||
public void setProperties(Map properties) throws IOException {
|
public void setProperties(Map properties) throws IOException {
|
||||||
lazyCreateProperties();
|
lazyCreateProperties();
|
||||||
properties.putAll(properties);
|
this.properties.putAll(properties);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setProperty(String name, Object value) throws IOException {
|
public void setProperty(String name, Object value) throws IOException {
|
||||||
|
|
|
@ -16,27 +16,25 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.transport.stomp;
|
package org.apache.activemq.transport.stomp;
|
||||||
|
|
||||||
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.CombinationTestSupport;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.TransportConnector;
|
||||||
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Session;
|
||||||
|
import javax.jms.TextMessage;
|
||||||
|
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.net.Socket;
|
import java.net.Socket;
|
||||||
import java.net.URI;
|
|
||||||
import java.net.SocketTimeoutException;
|
import java.net.SocketTimeoutException;
|
||||||
|
import java.net.URI;
|
||||||
|
|
||||||
import javax.jms.*;
|
|
||||||
|
|
||||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
|
||||||
import org.apache.activemq.CombinationTestSupport;
|
|
||||||
import org.apache.activemq.broker.BrokerService;
|
|
||||||
import org.apache.activemq.broker.TransportConnector;
|
|
||||||
import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
|
|
||||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
|
||||||
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
|
|
||||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
|
||||||
import org.apache.activemq.command.ActiveMQQueue;
|
|
||||||
import org.apache.activemq.transport.stomp.Stomp;
|
|
||||||
|
|
||||||
public class StompTest extends CombinationTestSupport {
|
public class StompTest extends CombinationTestSupport {
|
||||||
|
|
||||||
|
@ -62,12 +60,16 @@ public class StompTest extends CombinationTestSupport {
|
||||||
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
|
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
|
||||||
connection = cf.createConnection();
|
connection = cf.createConnection();
|
||||||
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
|
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
|
||||||
queue = new ActiveMQQueue("TEST");
|
queue = new ActiveMQQueue(getQueueName());
|
||||||
connection.start();
|
connection.start();
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected String getQueueName() {
|
||||||
|
return getClass().getName() + "." + getName();
|
||||||
|
}
|
||||||
|
|
||||||
protected void tearDown() throws Exception {
|
protected void tearDown() throws Exception {
|
||||||
connection.close();
|
connection.close();
|
||||||
stompSocket.close();
|
stompSocket.close();
|
||||||
|
@ -123,6 +125,34 @@ public class StompTest extends CombinationTestSupport {
|
||||||
|
|
||||||
MessageConsumer consumer = session.createConsumer(queue);
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
|
||||||
|
String frame =
|
||||||
|
"CONNECT\n" +
|
||||||
|
"login: brianm\n" +
|
||||||
|
"passcode: wombats\n\n"+
|
||||||
|
Stomp.NULL;
|
||||||
|
sendFrame(frame);
|
||||||
|
|
||||||
|
frame = receiveFrame(10000);
|
||||||
|
assertTrue(frame.startsWith("CONNECTED"));
|
||||||
|
|
||||||
|
frame =
|
||||||
|
"SEND\n" +
|
||||||
|
"destination:/queue/" + getQueueName() + "\n\n" +
|
||||||
|
"Hello World" +
|
||||||
|
Stomp.NULL;
|
||||||
|
|
||||||
|
sendFrame(frame);
|
||||||
|
|
||||||
|
TextMessage message = (TextMessage) consumer.receive(1000);
|
||||||
|
assertNotNull(message);
|
||||||
|
assertEquals("Hello World", message.getText());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void testSendMessageWithHeaders() throws Exception {
|
||||||
|
|
||||||
|
MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'");
|
||||||
|
|
||||||
String frame =
|
String frame =
|
||||||
"CONNECT\n" +
|
"CONNECT\n" +
|
||||||
"login: brianm\n" +
|
"login: brianm\n" +
|
||||||
|
@ -135,17 +165,17 @@ public class StompTest extends CombinationTestSupport {
|
||||||
|
|
||||||
frame =
|
frame =
|
||||||
"SEND\n" +
|
"SEND\n" +
|
||||||
"destination:/queue/TEST\n\n" +
|
"foo:abc\n" +
|
||||||
|
"bar:123\n" +
|
||||||
|
"destination:/queue/" + getQueueName() + "\n\n" +
|
||||||
"Hello World" +
|
"Hello World" +
|
||||||
Stomp.NULL;
|
Stomp.NULL;
|
||||||
|
|
||||||
sendFrame(frame);
|
sendFrame(frame);
|
||||||
|
|
||||||
TextMessage message = (TextMessage) consumer.receive(1000);
|
TextMessage message = (TextMessage) consumer.receive(60000);
|
||||||
assertNotNull(message);
|
assertNotNull(message);
|
||||||
assertEquals("Hello World", message.getText());
|
assertEquals("Hello World", message.getText());
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testSubscribeWithAutoAck() throws Exception {
|
public void testSubscribeWithAutoAck() throws Exception {
|
||||||
|
@ -162,7 +192,7 @@ public class StompTest extends CombinationTestSupport {
|
||||||
|
|
||||||
frame =
|
frame =
|
||||||
"SUBSCRIBE\n" +
|
"SUBSCRIBE\n" +
|
||||||
"destination:/queue/TEST\n" +
|
"destination:/queue/" + getQueueName() + "\n" +
|
||||||
"ack:auto\n\n" +
|
"ack:auto\n\n" +
|
||||||
Stomp.NULL;
|
Stomp.NULL;
|
||||||
sendFrame(frame);
|
sendFrame(frame);
|
||||||
|
@ -197,7 +227,7 @@ public class StompTest extends CombinationTestSupport {
|
||||||
|
|
||||||
frame =
|
frame =
|
||||||
"SUBSCRIBE\n" +
|
"SUBSCRIBE\n" +
|
||||||
"destination:/queue/TEST\n" +
|
"destination:/queue/" + getQueueName() + "\n" +
|
||||||
"ack:client\n\n"+
|
"ack:client\n\n"+
|
||||||
Stomp.NULL;
|
Stomp.NULL;
|
||||||
|
|
||||||
|
@ -236,13 +266,13 @@ public class StompTest extends CombinationTestSupport {
|
||||||
|
|
||||||
frame =
|
frame =
|
||||||
"SUBSCRIBE\n" +
|
"SUBSCRIBE\n" +
|
||||||
"destination:/queue/TEST\n" +
|
"destination:/queue/" + getQueueName() + "\n" +
|
||||||
"ack:auto\n\n" +
|
"ack:auto\n\n" +
|
||||||
Stomp.NULL;
|
Stomp.NULL;
|
||||||
sendFrame(frame);
|
sendFrame(frame);
|
||||||
|
|
||||||
//send a message to our queue
|
//send a message to our queue
|
||||||
sendMessage(getName());
|
sendMessage("first message");
|
||||||
|
|
||||||
|
|
||||||
//receive message from socket
|
//receive message from socket
|
||||||
|
@ -252,17 +282,18 @@ public class StompTest extends CombinationTestSupport {
|
||||||
//remove suscription
|
//remove suscription
|
||||||
frame =
|
frame =
|
||||||
"UNSUBSCRIBE\n" +
|
"UNSUBSCRIBE\n" +
|
||||||
"destination:/queue/TEST\n" +
|
"destination:/queue/" + getQueueName() + "\n" +
|
||||||
"\n\n" +
|
"\n\n" +
|
||||||
Stomp.NULL;
|
Stomp.NULL;
|
||||||
sendFrame(frame);
|
sendFrame(frame);
|
||||||
|
|
||||||
//send a message to our queue
|
//send a message to our queue
|
||||||
sendMessage(getName());
|
sendMessage("second message");
|
||||||
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
frame = receiveFrame(1000);
|
frame = receiveFrame(1000);
|
||||||
|
System.out.println("Received frame: " + frame);
|
||||||
fail("No message should have been received since subscription was removed");
|
fail("No message should have been received since subscription was removed");
|
||||||
}catch (SocketTimeoutException e){
|
}catch (SocketTimeoutException e){
|
||||||
|
|
||||||
|
@ -291,7 +322,7 @@ public class StompTest extends CombinationTestSupport {
|
||||||
|
|
||||||
frame =
|
frame =
|
||||||
"SEND\n" +
|
"SEND\n" +
|
||||||
"destination:/queue/TEST\n" +
|
"destination:/queue/" + getQueueName() + "\n" +
|
||||||
"transaction: tx1\n" +
|
"transaction: tx1\n" +
|
||||||
"\n\n" +
|
"\n\n" +
|
||||||
"Hello World" +
|
"Hello World" +
|
||||||
|
@ -335,7 +366,7 @@ public class StompTest extends CombinationTestSupport {
|
||||||
|
|
||||||
frame =
|
frame =
|
||||||
"SEND\n" +
|
"SEND\n" +
|
||||||
"destination:/queue/TEST\n" +
|
"destination:/queue/" + getQueueName() + "\n" +
|
||||||
"transaction: tx1\n" +
|
"transaction: tx1\n" +
|
||||||
"\n\n" +
|
"\n\n" +
|
||||||
"first message" +
|
"first message" +
|
||||||
|
@ -352,7 +383,7 @@ public class StompTest extends CombinationTestSupport {
|
||||||
|
|
||||||
frame =
|
frame =
|
||||||
"SEND\n" +
|
"SEND\n" +
|
||||||
"destination:/queue/TEST\n" +
|
"destination:/queue/" + getQueueName() + "\n" +
|
||||||
"transaction: tx1\n" +
|
"transaction: tx1\n" +
|
||||||
"\n\n" +
|
"\n\n" +
|
||||||
"second message" +
|
"second message" +
|
||||||
|
|
Loading…
Reference in New Issue