allow a MessageListener to be specified when creating a consumer; to avoid threading issues when creating lots of consumers after the connection has started

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@568644 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2007-08-22 15:09:44 +00:00
parent fe6fa04798
commit fba074227d
7 changed files with 177 additions and 97 deletions

View File

@ -388,6 +388,18 @@
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId>
<configuration>
<filesets>
<fileset>
<directory>${basedir}/activemq-data</directory>
</fileset>
</filesets>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>

View File

@ -136,12 +136,13 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
* @param noLocal
* @param browser
* @param dispatchAsync
* @param messageListener
* @throws JMSException
*/
public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest,
String name, String selector, int prefetch,
int maximumPendingMessageCount, boolean noLocal, boolean browser,
boolean dispatchAsync) throws JMSException {
String name, String selector, int prefetch,
int maximumPendingMessageCount, boolean noLocal, boolean browser,
boolean dispatchAsync, MessageListener messageListener) throws JMSException {
if (dest == null) {
throw new InvalidDestinationException("Don't understand null destinations");
} else if (dest.getPhysicalName() == null) {
@ -206,6 +207,10 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge()
&& !info.isBrowser();
this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
if (messageListener != null) {
setMessageListener(messageListener);
}
try {
this.session.addConsumer(this);
this.session.syncSendPacket(info);

View File

@ -96,7 +96,7 @@ public class ActiveMQQueueBrowser implements QueueBrowser, Enumeration {
browseDone.set(false);
ActiveMQPrefetchPolicy prefetchPolicy = session.connection.getPrefetchPolicy();
return new ActiveMQMessageConsumer(session, consumerId, destination, null, selector, prefetchPolicy.getQueueBrowserPrefetch(), prefetchPolicy
.getMaximumPendingMessageLimit(), false, true, dispatchAsync) {
.getMaximumPendingMessageLimit(), false, true, dispatchAsync, null) {
public void dispatch(MessageDispatch md) {
if (md.getMessage() == null) {
browseDone.set(true);

View File

@ -68,7 +68,7 @@ public class ActiveMQQueueReceiver extends ActiveMQMessageConsumer implements Qu
int maximumPendingMessageCount, boolean asyncDispatch)
throws JMSException {
super(theSession, consumerId, destination, null, selector, prefetch, maximumPendingMessageCount,
false, false, asyncDispatch);
false, false, asyncDispatch, null);
}
/**

View File

@ -16,6 +16,23 @@
*/
package org.apache.activemq;
import org.apache.activemq.blob.BlobTransferPolicy;
import org.apache.activemq.blob.BlobUploader;
import org.apache.activemq.command.*;
import org.apache.activemq.management.JMSSessionStatsImpl;
import org.apache.activemq.management.StatsCapable;
import org.apache.activemq.management.StatsImpl;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.*;
import javax.jms.IllegalStateException;
import javax.jms.Message;
import java.io.File;
import java.io.InputStream;
import java.io.Serializable;
@ -26,69 +43,6 @@ import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.activemq.blob.BlobTransferPolicy;
import org.apache.activemq.blob.BlobUploader;
import org.apache.activemq.command.ActiveMQBlobMessage;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQStreamMessage;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.management.JMSSessionStatsImpl;
import org.apache.activemq.management.StatsCapable;
import org.apache.activemq.management.StatsImpl;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* <P>
* A <CODE>Session</CODE> object is a single-threaded context for producing
@ -866,8 +820,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* @since 1.1
*/
public MessageConsumer createConsumer(Destination destination) throws JMSException {
checkClosed();
return createConsumer(destination, null);
return createConsumer(destination, (String) null);
}
/**
@ -894,38 +847,54 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* @since 1.1
*/
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
checkClosed();
if (destination instanceof CustomDestination) {
CustomDestination customDestination = (CustomDestination)destination;
return customDestination.createConsumer(this, messageSelector);
}
int prefetch = 0;
ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
if (destination instanceof Topic) {
prefetch = prefetchPolicy.getTopicPrefetch();
} else {
prefetch = prefetchPolicy.getQueuePrefetch();
}
return new ActiveMQMessageConsumer(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(destination), null, messageSelector, prefetch,
prefetchPolicy.getMaximumPendingMessageLimit(), false, false, asyncDispatch);
return createConsumer(destination, messageSelector, false);
}
/**
* @return
* Creates a <CODE>MessageConsumer</CODE> for the specified destination.
* Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
* <CODE>Destination</CODE>, they can be used in the destination
* parameter to create a <CODE>MessageConsumer</CODE>.
*
* @param destination the <CODE>Destination</CODE> to access.
* @param messageListener the listener to use for async consumption of messages
* @return the MessageConsumer
* @throws JMSException if the session fails to create a consumer due to
* some internal error.
* @throws InvalidDestinationException if an invalid destination is
* specified.
* @since 1.1
*/
protected ConsumerId getNextConsumerId() {
return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId());
public MessageConsumer createConsumer(Destination destination, MessageListener messageListener) throws JMSException {
return createConsumer(destination, null, messageListener);
}
/**
* @return
* Creates a <CODE>MessageConsumer</CODE> for the specified destination,
* using a message selector. Since <CODE> Queue</CODE> and
* <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
* can be used in the destination parameter to create a
* <CODE>MessageConsumer</CODE>.
* <P>
* A client uses a <CODE>MessageConsumer</CODE> object to receive messages
* that have been sent to a destination.
*
* @param destination the <CODE>Destination</CODE> to access
* @param messageSelector only messages with properties matching the message
* selector expression are delivered. A value of null or an
* empty string indicates that there is no message selector
* for the message consumer.
* @param messageListener the listener to use for async consumption of messages
* @return the MessageConsumer
* @throws JMSException if the session fails to create a MessageConsumer due
* to some internal error.
* @throws InvalidDestinationException if an invalid destination is
* specified.
* @throws InvalidSelectorException if the message selector is invalid.
* @since 1.1
*/
protected ProducerId getNextProducerId() {
return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId());
public MessageConsumer createConsumer(Destination destination, String messageSelector, MessageListener messageListener) throws JMSException {
return createConsumer(destination, messageSelector, false, messageListener);
}
/**
@ -965,6 +934,47 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* @since 1.1
*/
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
return createConsumer(destination, messageSelector, noLocal, null);
}
/**
* Creates <CODE>MessageConsumer</CODE> for the specified destination,
* using a message selector. This method can specify whether messages
* published by its own connection should be delivered to it, if the
* destination is a topic.
* <P>
* Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
* <CODE>Destination</CODE>, they can be used in the destination
* parameter to create a <CODE>MessageConsumer</CODE>.
* <P>
* A client uses a <CODE>MessageConsumer</CODE> object to receive messages
* that have been published to a destination.
* <P>
* In some cases, a connection may both publish and subscribe to a topic.
* The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
* inhibit the delivery of messages published by its own connection. The
* default value for this attribute is False. The <CODE>noLocal</CODE>
* value must be supported by destinations that are topics.
*
* @param destination the <CODE>Destination</CODE> to access
* @param messageSelector only messages with properties matching the message
* selector expression are delivered. A value of null or an
* empty string indicates that there is no message selector
* for the message consumer.
* @param noLocal - if true, and the destination is a topic, inhibits the
* delivery of messages published by its own connection. The
* behavior for <CODE>NoLocal</CODE> is not specified if
* the destination is a queue.
* @param messageListener the listener to use for async consumption of messages
* @return the MessageConsumer
* @throws JMSException if the session fails to create a MessageConsumer due
* to some internal error.
* @throws InvalidDestinationException if an invalid destination is
* specified.
* @throws InvalidSelectorException if the message selector is invalid.
* @since 1.1
*/
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException {
checkClosed();
if (destination instanceof CustomDestination) {
@ -973,8 +983,15 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
}
ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
return new ActiveMQMessageConsumer(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(destination), null, messageSelector, prefetchPolicy
.getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch);
int prefetch = 0;
if (destination instanceof Topic) {
prefetch = prefetchPolicy.getTopicPrefetch();
} else {
prefetch = prefetchPolicy.getQueuePrefetch();
}
ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination);
return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector,
prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch, messageListener);
}
/**
@ -1521,6 +1538,20 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
return info.getSessionId();
}
/**
* @return
*/
protected ConsumerId getNextConsumerId() {
return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId());
}
/**
* @return
*/
protected ProducerId getNextProducerId() {
return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId());
}
/**
* Sends the message for dispatch by the broker.
*

View File

@ -114,7 +114,7 @@ public class ActiveMQTopicSubscriber extends ActiveMQMessageConsumer implements
protected ActiveMQTopicSubscriber(ActiveMQSession theSession,
ConsumerId consumerId, ActiveMQDestination dest, String name, String selector, int prefetch, int maximumPendingMessageCount,
boolean noLocalValue, boolean browserValue, boolean asyncDispatch) throws JMSException {
super(theSession, consumerId, dest, name, selector, prefetch, maximumPendingMessageCount, noLocalValue, browserValue, asyncDispatch);
super(theSession, consumerId, dest, name, selector, prefetch, maximumPendingMessageCount, noLocalValue, browserValue, asyncDispatch, null);
}
/**

View File

@ -251,6 +251,38 @@ public class JMSConsumerTest extends JmsTestSupport {
assertEquals(4, counter.get());
}
public void initCombosForTestPassMessageListenerIntoCreateConsumer() {
addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)});
}
public void testPassMessageListenerIntoCreateConsumer() throws Exception {
final AtomicInteger counter = new AtomicInteger(0);
final CountDownLatch done = new CountDownLatch(1);
// Receive a message with the JMS API
connection.start();
ActiveMQSession session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = createDestination(session, destinationType);
MessageConsumer consumer = session.createConsumer(destination, new MessageListener() {
public void onMessage(Message m) {
counter.incrementAndGet();
if (counter.get() == 4) {
done.countDown();
}
}
});
// Send the messages
sendMessages(session, destination, 4);
assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
Thread.sleep(200);
// Make sure only 4 messages were delivered.
assertEquals(4, counter.get());
}
public void initCombosForTestMessageListenerUnackedWithPrefetch1StayInQueue() {
addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE),