mirror of https://github.com/apache/activemq.git
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@737062 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a321204079
commit
977393d9e7
|
@ -70,7 +70,7 @@ public class QueueDispatchSelector extends SimpleDispatchSelector {
|
||||||
if (result) {
|
if (result) {
|
||||||
result = exclusiveConsumer == null
|
result = exclusiveConsumer == null
|
||||||
|| exclusiveConsumer == subscription;
|
|| exclusiveConsumer == subscription;
|
||||||
if (result && !subscription.isFull()) {
|
if (result) {
|
||||||
QueueMessageReference node = (QueueMessageReference) m;
|
QueueMessageReference node = (QueueMessageReference) m;
|
||||||
// Keep message groups together.
|
// Keep message groups together.
|
||||||
String groupId = node.getGroupID();
|
String groupId = node.getGroupID();
|
||||||
|
|
|
@ -38,7 +38,7 @@ import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test cases used to test the JMS message comsumer.
|
* Test cases used to test the JMS message consumer.
|
||||||
*
|
*
|
||||||
* @version $Revision$
|
* @version $Revision$
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -54,8 +54,7 @@ public class MessageGroupDelayedTest extends JmsTestSupport {
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
broker = createBroker();
|
broker = createBroker();
|
||||||
broker.start();
|
broker.start();
|
||||||
ActiveMQConnectionFactory connFactory = new ActiveMQConnectionFactory(connector.getConnectUri());
|
ActiveMQConnectionFactory connFactory = new ActiveMQConnectionFactory(connector.getConnectUri() + "?jms.prefetchPolicy.all=1");
|
||||||
//ActiveMQConnectionFactory connFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
|
|
||||||
connection = connFactory.createConnection();
|
connection = connFactory.createConnection();
|
||||||
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||||
destination = new ActiveMQQueue("test-queue2");
|
destination = new ActiveMQQueue("test-queue2");
|
||||||
|
|
|
@ -0,0 +1,163 @@
|
||||||
|
|
||||||
|
package org.apache.activemq.usecases;
|
||||||
|
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Queue;
|
||||||
|
import javax.jms.Session;
|
||||||
|
import javax.jms.TextMessage;
|
||||||
|
|
||||||
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import junit.framework.TestCase;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Test plan:
|
||||||
|
* Producer: publish messages into a queue, with three message groups
|
||||||
|
* Consumer1: created before any messages are created
|
||||||
|
* Consumer2: created after consumer1 has processed one message from each message group
|
||||||
|
*
|
||||||
|
* All three groups are handled by to consumer1, so consumer2 should not get any messages.
|
||||||
|
* See bug AMQ-2016: Message grouping fails when consumers are added
|
||||||
|
*/
|
||||||
|
public class MessageGroupNewConsumerTest extends TestCase {
|
||||||
|
private static final Log LOG = LogFactory.getLog(MessageGroupNewConsumerTest.class);
|
||||||
|
private Connection connection;
|
||||||
|
// Released after the messages are created
|
||||||
|
private CountDownLatch latchMessagesCreated = new CountDownLatch(1);
|
||||||
|
// Released after one message from each group is consumed
|
||||||
|
private CountDownLatch latchGroupsAcquired = new CountDownLatch(1);
|
||||||
|
|
||||||
|
private static final String[] groupNames = { "GrA", "GrB", "GrC" };
|
||||||
|
private int messagesSent, messagesRecvd1, messagesRecvd2;
|
||||||
|
// with the prefetch too high, this bug is not realized
|
||||||
|
private static final String connStr = "vm://localhost?broker.persistent=false&broker.useJmx=false&jms.prefetchPolicy.all=1";
|
||||||
|
|
||||||
|
public void testNewConsumer() throws JMSException, InterruptedException {
|
||||||
|
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connStr);
|
||||||
|
connection = factory.createConnection();
|
||||||
|
connection.start();
|
||||||
|
final String queueName = this.getClass().getSimpleName();
|
||||||
|
final Thread producerThread = new Thread() {
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
Queue queue = session.createQueue(queueName);
|
||||||
|
MessageProducer prod = session.createProducer(queue);
|
||||||
|
for (int i=0; i<10; i++) {
|
||||||
|
for(String group : groupNames) {
|
||||||
|
Message message = generateMessage(session, group, i+1);
|
||||||
|
prod.send(message);
|
||||||
|
session.commit();
|
||||||
|
messagesSent++;
|
||||||
|
}
|
||||||
|
LOG.info("Sent message seq "+ (i+1));
|
||||||
|
if (i==0) {
|
||||||
|
latchMessagesCreated.countDown();
|
||||||
|
}
|
||||||
|
if (i==2) {
|
||||||
|
LOG.info("Prod: Waiting for groups");
|
||||||
|
latchGroupsAcquired.await();
|
||||||
|
}
|
||||||
|
Thread.sleep(20);
|
||||||
|
}
|
||||||
|
LOG.info(messagesSent+" messages sent");
|
||||||
|
prod.close();
|
||||||
|
session.close();
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Producer failed", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
final Thread consumerThread1 = new Thread() {
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
Queue queue = session.createQueue(queueName);
|
||||||
|
MessageConsumer con1 = session.createConsumer(queue);
|
||||||
|
latchMessagesCreated.await();
|
||||||
|
while(true) {
|
||||||
|
Message message = con1.receive(1000);
|
||||||
|
if (message == null) break;
|
||||||
|
LOG.info("Con1 got message "+formatMessage(message));
|
||||||
|
session.commit();
|
||||||
|
messagesRecvd1++;
|
||||||
|
// since we get the messages in order, the first few messages will be one from each group
|
||||||
|
// after we get one from each group, start the other consumer
|
||||||
|
if (messagesRecvd1 == groupNames.length) {
|
||||||
|
LOG.info("All groups acquired");
|
||||||
|
latchGroupsAcquired.countDown();
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
Thread.sleep(50);
|
||||||
|
}
|
||||||
|
LOG.info(messagesRecvd1+" messages received by consumer1");
|
||||||
|
con1.close();
|
||||||
|
session.close();
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Consumer 1 failed", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
final Thread consumerThread2 = new Thread() {
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
latchGroupsAcquired.await();
|
||||||
|
while(consumerThread1.isAlive()) {
|
||||||
|
LOG.info("(re)starting consumer2");
|
||||||
|
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
Queue queue = session.createQueue(queueName);
|
||||||
|
MessageConsumer con2 = session.createConsumer(queue);
|
||||||
|
while(true) {
|
||||||
|
Message message = con2.receive(500);
|
||||||
|
if (message == null) break;
|
||||||
|
LOG.info("Con2 got message "+formatMessage(message));
|
||||||
|
session.commit();
|
||||||
|
messagesRecvd2++;
|
||||||
|
Thread.sleep(50);
|
||||||
|
}
|
||||||
|
con2.close();
|
||||||
|
session.close();
|
||||||
|
}
|
||||||
|
LOG.info(messagesRecvd2+" messages received by consumer2");
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Consumer 2 failed", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
consumerThread2.start();
|
||||||
|
consumerThread1.start();
|
||||||
|
producerThread.start();
|
||||||
|
// wait for threads to finish
|
||||||
|
producerThread.join();
|
||||||
|
consumerThread1.join();
|
||||||
|
consumerThread2.join();
|
||||||
|
connection.close();
|
||||||
|
// check results
|
||||||
|
assertEquals("consumer 2 should not get any messages", 0, messagesRecvd2);
|
||||||
|
assertEquals("consumer 1 should get all the messages", messagesSent, messagesRecvd1);
|
||||||
|
assertTrue("producer failed to send any messages", messagesSent > 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Message generateMessage(Session session, String groupId, int seq) throws JMSException {
|
||||||
|
TextMessage m = session.createTextMessage();
|
||||||
|
m.setJMSType("TEST_MESSAGE");
|
||||||
|
m.setStringProperty("JMSXGroupID", groupId);
|
||||||
|
m.setIntProperty("JMSXGroupSeq", seq);
|
||||||
|
m.setText("<?xml?><testMessage/>");
|
||||||
|
return m;
|
||||||
|
}
|
||||||
|
public String formatMessage(Message m) {
|
||||||
|
try {
|
||||||
|
return m.getStringProperty("JMSXGroupID")+"-"+m.getIntProperty("JMSXGroupSeq")+"-"+m.getBooleanProperty("JMSXGroupFirstForConsumer");
|
||||||
|
} catch (Exception e) {
|
||||||
|
return e.getClass().getSimpleName()+": "+e.getMessage();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue