Fixed and added test cases for the consumer start() stop() methods.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@358579 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2005-12-22 15:43:42 +00:00
parent 84077a3663
commit 18dd4cd867
2 changed files with 68 additions and 8 deletions

View File

@ -245,10 +245,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
* @return true if this is a durable topic subscriber
*/
public boolean isDurableSubscriber() {
// TODO Add ActiveMQTopicSubscriber
return false;
// return this instanceof ActiveMQTopicSubscriber && consumerName !=
// null && consumerName.length() > 0;
return info.getSubcriptionName()!=null && info.getDestination().isTopic();
}
/**
@ -671,8 +668,12 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
Scheduler.executeAfterDelay(new Runnable() {
public void run() {
if (started.get())
unconsumedMessages.start();
try {
if (started.get())
start();
} catch (JMSException e) {
session.connection.onAsyncException(e);
}
}
}, redeliveryDelay);
@ -695,7 +696,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
MessageListener listener = this.messageListener;
try {
if (!unconsumedMessages.isClosed()) {
if (listener != null) {
if (listener != null && started.get()) {
ActiveMQMessage message = createActiveMQMessage(md);
beforeMessageIsConsumed(md);
listener.onMessage(message);
@ -716,9 +717,19 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
return unconsumedMessages.size();
}
public void start() {
public void start() throws JMSException {
started.set(true);
unconsumedMessages.start();
MessageListener listener = this.messageListener;
if( listener!=null ) {
MessageDispatch md;
while( (md = unconsumedMessages.dequeueNoWait())!=null ) {
ActiveMQMessage message = createActiveMQMessage(md);
beforeMessageIsConsumed(md);
listener.onMessage(message);
afterMessageIsConsumed(md, false);
}
}
}
public void stop() {

View File

@ -59,6 +59,55 @@ public class JMSConsumerTest extends JmsTestSupport {
public byte destinationType;
public boolean durableConsumer;
public void initCombosForTestMessageListenerWithConsumerCanBeStopped() {
addCombinationValues("deliveryMode", new Object[] {
new Integer(DeliveryMode.NON_PERSISTENT),
new Integer(DeliveryMode.PERSISTENT) });
addCombinationValues("destinationType", new Object[] {
new Byte(ActiveMQDestination.QUEUE_TYPE),
new Byte(ActiveMQDestination.TOPIC_TYPE),
new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE),
new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE) });
}
public void testMessageListenerWithConsumerCanBeStopped() throws Throwable {
final AtomicInteger counter = new AtomicInteger(0);
final CountDownLatch done1 = new CountDownLatch(1);
final CountDownLatch done2 = new CountDownLatch(1);
// Receive a message with the JMS API
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = createDestination(session, destinationType);
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message m) {
counter.incrementAndGet();
if( counter.get()==1 )
done1.countDown();
if( counter.get()==2 )
done2.countDown();
}
});
// Send a first message to make sure that the consumer dispatcher is running
sendMessages(session, destination, 1);
assertTrue(done1.await(1, TimeUnit.SECONDS));
assertEquals(1, counter.get());
// Stop the consumer.
consumer.stop();
// Send a message, but should not get delivered.
sendMessages(session, destination, 1);
assertFalse(done2.await(1, TimeUnit.SECONDS));
assertEquals(1, counter.get());
// Start the consumer, and the message should now get delivered.
consumer.start();
assertTrue(done2.await(1, TimeUnit.SECONDS));
assertEquals(2, counter.get());
}
public void initCombosForTestMutiReceiveWithPrefetch1() {
addCombinationValues("deliveryMode", new Object[] {