diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java index ed824f4a85..29236557bf 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java @@ -22,17 +22,19 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.net.URI; +import java.net.SocketTimeoutException; -import javax.jms.Connection; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; + +import javax.jms.*; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy; +import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.transport.stomp.Stomp; @@ -49,6 +51,7 @@ public class StompTest extends CombinationTestSupport { protected void setUp() throws Exception { broker = new BrokerService(); broker.setPersistent(false); + connector = broker.addConnector("stomp://localhost:0"); broker.start(); @@ -61,6 +64,8 @@ public class StompTest extends CombinationTestSupport { session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); queue = new ActiveMQQueue("TEST"); connection.start(); + + } protected void tearDown() throws Exception { @@ -95,9 +100,17 @@ public class StompTest extends CombinationTestSupport { } } } - + + public void sendMessage(String msg) throws Exception { + + MessageProducer producer = session.createProducer(queue); + TextMessage message = session.createTextMessage(msg); + producer.send(message); + + } + public void testConnect() throws Exception { - + String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "\n" + Stomp.NULL; sendFrame(connect_frame); @@ -119,46 +132,252 @@ public class StompTest extends CombinationTestSupport { frame = receiveFrame(10000); assertTrue(frame.startsWith("CONNECTED")); - - frame = - "SEND\n" + - "destination:/queue/TEST\n\n" + - "Hello World" + + + frame = + "SEND\n" + + "destination:/queue/TEST\n\n" + + "Hello World" + Stomp.NULL; + sendFrame(frame); - + TextMessage message = (TextMessage) consumer.receive(1000); assertNotNull(message); assertEquals("Hello World", message.getText()); - + + } - + public void testSubscribeWithAutoAck() throws Exception { - - String frame = - "CONNECT\n" + - "login: brianm\n" + + + String frame = + "CONNECT\n" + + "login: brianm\n" + "passcode: wombats\n\n"+ Stomp.NULL; sendFrame(frame); - - frame = receiveFrame(10000000); + + frame = receiveFrame(100000); assertTrue(frame.startsWith("CONNECTED")); - - frame = - "SUBSCRIBE\n" + + + frame = + "SUBSCRIBE\n" + "destination:/queue/TEST\n" + - "ack:auto\n\n" + + "ack:auto\n\n" + Stomp.NULL; sendFrame(frame); - MessageProducer producer = session.createProducer(queue); - TextMessage message = session.createTextMessage(getName()); - producer.send(message); - + sendMessage(getName()); + frame = receiveFrame(10000); assertTrue(frame.startsWith("MESSAGE")); - + + frame = + "DISCONNECT\n" + + "\n\n"+ + Stomp.NULL; + sendFrame(frame); + + } - + + + public void testSubscribeWithClientAck() throws Exception { + + String frame = + "CONNECT\n" + + "login: brianm\n" + + "passcode: wombats\n\n"+ + Stomp.NULL; + sendFrame(frame); + + frame = receiveFrame(10000); + assertTrue(frame.startsWith("CONNECTED")); + + + frame = + "SUBSCRIBE\n" + + "destination:/queue/TEST\n" + + "ack:client\n\n"+ + Stomp.NULL; + + + sendFrame(frame); + sendMessage(getName()); + frame = receiveFrame(10000); + assertTrue(frame.startsWith("MESSAGE")); + + frame = + "DISCONNECT\n" + + "\n\n"+ + Stomp.NULL; + sendFrame(frame); + + // message should be received since message was not acknowledged + MessageConsumer consumer = session.createConsumer(queue); + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertTrue(message.getJMSRedelivered()); + + + + } + + public void testUnsubscribe() throws Exception { + + String frame = + "CONNECT\n" + + "login: brianm\n" + + "passcode: wombats\n\n"+ + Stomp.NULL; + sendFrame(frame); + frame = receiveFrame(100000); + assertTrue(frame.startsWith("CONNECTED")); + + frame = + "SUBSCRIBE\n" + + "destination:/queue/TEST\n" + + "ack:auto\n\n" + + Stomp.NULL; + sendFrame(frame); + + //send a message to our queue + sendMessage(getName()); + + + //receive message from socket + frame = receiveFrame(10000); + assertTrue(frame.startsWith("MESSAGE")); + + //remove suscription + frame = + "UNSUBSCRIBE\n" + + "destination:/queue/TEST\n" + + "\n\n" + + Stomp.NULL; + sendFrame(frame); + + //send a message to our queue + sendMessage(getName()); + + + try { + frame = receiveFrame(1000); + fail("No message should have been received since subscription was removed"); + }catch (SocketTimeoutException e){ + + } + + } + + + public void testTransactionCommit() throws Exception { + String frame = + "CONNECT\n" + + "login: brianm\n" + + "passcode: wombats\n\n"+ + Stomp.NULL; + sendFrame(frame); + + String f = receiveFrame(1000); + assertTrue(f.startsWith("CONNECTED")); + + frame = + "BEGIN\n" + + "transaction: tx1\n" + + "\n\n" + + Stomp.NULL; + sendFrame(frame); + + frame = + "SEND\n" + + "destination:/queue/TEST\n" + + "transaction: tx1\n" + + "\n\n" + + "Hello World" + + Stomp.NULL; + sendFrame(frame); + + + frame = + "COMMIT\n" + + "transaction: tx1\n" + + "\n\n" + + Stomp.NULL; + sendFrame(frame); + + // This test case is currently failing + + MessageConsumer consumer = session.createConsumer(queue); + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + + + } + + public void testTransactionRollback() throws Exception { + String frame = + "CONNECT\n" + + "login: brianm\n" + + "passcode: wombats\n\n"+ + Stomp.NULL; + sendFrame(frame); + + String f = receiveFrame(1000); + assertTrue(f.startsWith("CONNECTED")); + + frame = + "BEGIN\n" + + "transaction: tx1\n" + + "\n\n" + + Stomp.NULL; + sendFrame(frame); + + frame = + "SEND\n" + + "destination:/queue/TEST\n" + + "transaction: tx1\n" + + "\n\n" + + "first message" + + Stomp.NULL; + sendFrame(frame); + + //rollback first message + frame = + "ABORT\n" + + "transaction: tx1\n" + + "\n\n" + + Stomp.NULL; + sendFrame(frame); + + frame = + "SEND\n" + + "destination:/queue/TEST\n" + + "transaction: tx1\n" + + "\n\n" + + "second message" + + Stomp.NULL; + sendFrame(frame); + + frame = + "COMMIT\n" + + "transaction: tx1\n" + + "\n\n" + + Stomp.NULL; + sendFrame(frame); + + // This test case is currently failing + + //only second msg should be received since first msg was rolled back + MessageConsumer consumer = session.createConsumer(queue); + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals("second message", message.getText()); + + + } + + + + }