mirror of https://github.com/apache/activemq.git
adding some message group tests
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@737685 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
977393d9e7
commit
26c25ebe6d
|
@ -0,0 +1,165 @@
|
|||
package org.apache.activemq.broker.region.group;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import org.apache.activemq.CombinationTestSupport;
|
||||
import org.apache.activemq.JmsTestSupport;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
public class MessageGroupTest extends JmsTestSupport {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(CombinationTestSupport.class);
|
||||
|
||||
public void testGroupedMessagesDeliveredToOnlyOneConsumer() throws Exception {
|
||||
|
||||
ActiveMQDestination destination = new ActiveMQQueue("TEST");
|
||||
|
||||
// Setup a first connection
|
||||
connection.start();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer consumer1 = session.createConsumer(destination);
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
|
||||
// Send the messages.
|
||||
for (int i = 0; i < 4; i++) {
|
||||
TextMessage message = session.createTextMessage("message " + i);
|
||||
message.setStringProperty("JMSXGroupID", "TEST-GROUP");
|
||||
message.setIntProperty("JMSXGroupSeq", i + 1);
|
||||
LOG.info("sending message: " + message);
|
||||
producer.send(message);
|
||||
}
|
||||
|
||||
// All the messages should have been sent down connection 1.. just get
|
||||
// the first 3
|
||||
for (int i = 0; i < 3; i++) {
|
||||
TextMessage m1 = (TextMessage)consumer1.receive(500);
|
||||
assertNotNull("m1 is null for index: " + i, m1);
|
||||
assertEquals(m1.getIntProperty("JMSXGroupSeq"), i + 1);
|
||||
}
|
||||
|
||||
// Setup a second connection
|
||||
Connection connection1 = factory.createConnection(userName, password);
|
||||
connection1.start();
|
||||
Session session2 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer consumer2 = session2.createConsumer(destination);
|
||||
|
||||
// Close the first consumer.
|
||||
consumer1.close();
|
||||
|
||||
// The last messages should now go the the second consumer.
|
||||
for (int i = 0; i < 1; i++) {
|
||||
TextMessage m1 = (TextMessage)consumer2.receive(500);
|
||||
assertNotNull("m1 is null for index: " + i, m1);
|
||||
assertEquals(m1.getIntProperty("JMSXGroupSeq"), 4 + i);
|
||||
}
|
||||
|
||||
//assert that there are no other messages left for the consumer 2
|
||||
Message m = consumer2.receive(100);
|
||||
assertNull("consumer 2 has some messages left", m);
|
||||
}
|
||||
|
||||
public void testAddingConsumer() throws Exception {
|
||||
ActiveMQDestination destination = new ActiveMQQueue("TEST");
|
||||
|
||||
// Setup a first connection
|
||||
connection.start();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
//MessageConsumer consumer = session.createConsumer(destination);
|
||||
|
||||
TextMessage message = session.createTextMessage("message");
|
||||
message.setStringProperty("JMSXGroupID", "TEST-GROUP");
|
||||
|
||||
LOG.info("sending message: " + message);
|
||||
producer.send(message);
|
||||
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
|
||||
TextMessage msg = (TextMessage)consumer.receive();
|
||||
assertNotNull(msg);
|
||||
boolean first = msg.getBooleanProperty("JMSXGroupFirstForConsumer");
|
||||
assertTrue(first);
|
||||
}
|
||||
|
||||
public void testClosingMessageGroup() throws Exception {
|
||||
|
||||
ActiveMQDestination destination = new ActiveMQQueue("TEST");
|
||||
|
||||
// Setup a first connection
|
||||
connection.start();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer consumer1 = session.createConsumer(destination);
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
|
||||
// Send the messages.
|
||||
for (int i = 0; i < 4; i++) {
|
||||
TextMessage message = session.createTextMessage("message " + i);
|
||||
message.setStringProperty("JMSXGroupID", "TEST-GROUP");
|
||||
LOG.info("sending message: " + message);
|
||||
producer.send(message);
|
||||
}
|
||||
|
||||
|
||||
|
||||
// All the messages should have been sent down consumer1.. just get
|
||||
// the first 3
|
||||
for (int i = 0; i < 3; i++) {
|
||||
TextMessage m1 = (TextMessage)consumer1.receive(500);
|
||||
assertNotNull("m1 is null for index: " + i, m1);
|
||||
}
|
||||
|
||||
// Setup a second consumer
|
||||
Connection connection1 = factory.createConnection(userName, password);
|
||||
connection1.start();
|
||||
Session session2 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer consumer2 = session2.createConsumer(destination);
|
||||
|
||||
//assert that there are no messages for the consumer 2
|
||||
Message m = consumer2.receive(100);
|
||||
assertNull("consumer 2 has some messages", m);
|
||||
|
||||
// Close the group
|
||||
TextMessage message = session.createTextMessage("message " + 5);
|
||||
message.setStringProperty("JMSXGroupID", "TEST-GROUP");
|
||||
message.setIntProperty("JMSXGroupSeq", -1);
|
||||
LOG.info("sending message: " + message);
|
||||
producer.send(message);
|
||||
|
||||
//Send some more messages
|
||||
for (int i = 0; i < 4; i++) {
|
||||
message = session.createTextMessage("message " + i);
|
||||
message.setStringProperty("JMSXGroupID", "TEST-GROUP");
|
||||
LOG.info("sending message: " + message);
|
||||
producer.send(message);
|
||||
}
|
||||
|
||||
// Receive the fourth message
|
||||
TextMessage m1 = (TextMessage)consumer1.receive(500);
|
||||
assertNotNull("m1 is null for index: " + 4, m1);
|
||||
|
||||
// Receive the closing message
|
||||
m1 = (TextMessage)consumer1.receive(500);
|
||||
assertNotNull("m1 is null for index: " + 5, m1);
|
||||
|
||||
//assert that there are no messages for the consumer 1
|
||||
m = consumer1.receive(100);
|
||||
assertNull("consumer 1 has some messages left", m);
|
||||
|
||||
// The messages should now go to the second consumer.
|
||||
for (int i = 0; i < 4; i++) {
|
||||
m1 = (TextMessage)consumer2.receive(500);
|
||||
assertNotNull("m1 is null for index: " + i, m1);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue