diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java index 3c95e31fa0..a01f8bdc1b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -87,6 +87,7 @@ import org.apache.activemq.thread.Scheduler; import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.util.Callback; +import org.apache.activemq.util.JMSExceptionSupport; import org.apache.activemq.util.LongSequenceGenerator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -172,21 +173,21 @@ import org.slf4j.LoggerFactory; * transactions directly, it is unlikely that many JMS clients will do this. * Support for JTA in the JMS API is targeted at systems vendors who will be * integrating the JMS API into their application server products. - * - * + * + * * @see javax.jms.Session * @see javax.jms.QueueSession * @see javax.jms.TopicSession * @see javax.jms.XASession */ public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher { - - /** - * Only acknowledge an individual message - using message.acknowledge() - * as opposed to CLIENT_ACKNOWLEDGE which - * acknowledges all messages consumed by a session at when acknowledge() - * is called - */ + + /** + * Only acknowledge an individual message - using message.acknowledge() + * as opposed to CLIENT_ACKNOWLEDGE which + * acknowledges all messages consumed by a session at when acknowledge() + * is called + */ public static final int INDIVIDUAL_ACKNOWLEDGE = 4; public static final int MAX_ACK_CONSTANT = INDIVIDUAL_ACKNOWLEDGE; @@ -229,7 +230,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /** * Construct the Session - * + * * @param connection * @param sessionId * @param acknowledgeMode n.b if transacted - the acknowledgeMode == @@ -253,7 +254,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta this.scheduler=connection.getScheduler(); this.connectionExecutor=connection.getExecutor(); this.executor = new ActiveMQSessionExecutor(this); - connection.addSession(this); + connection.addSession(this); if (connection.isStarted()) { start(); } @@ -266,7 +267,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /** * Sets the transaction context of the session. - * + * * @param transactionContext - provides the means to control a JMS * transaction. */ @@ -276,7 +277,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /** * Returns the transaction context of the session. - * + * * @return transactionContext - session's transaction context. */ public TransactionContext getTransactionContext() { @@ -285,7 +286,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /* * (non-Javadoc) - * + * * @see org.apache.activemq.management.StatsCapable#getStats() */ public StatsImpl getStats() { @@ -294,7 +295,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /** * Returns the session's statistics. - * + * * @return stats - session's statistics. */ public JMSSessionStatsImpl getSessionStats() { @@ -305,7 +306,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * Creates a BytesMessage object. A BytesMessage * object is used to send a message containing a stream of uninterpreted * bytes. - * + * * @return the an ActiveMQBytesMessage * @throws JMSException if the JMS provider fails to create this message due * to some internal error. @@ -321,7 +322,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * object is used to send a self-defining set of name-value pairs, where * names are String objects and values are primitive values * in the Java programming language. - * + * * @return an ActiveMQMapMessage * @throws JMSException if the JMS provider fails to create this message due * to some internal error. @@ -338,7 +339,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * Message object holds all the standard message header * information. It can be sent when a message containing only header * information is sufficient. - * + * * @return an ActiveMQMessage * @throws JMSException if the JMS provider fails to create this message due * to some internal error. @@ -353,7 +354,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * Creates an ObjectMessage object. An * ObjectMessage object is used to send a message that * contains a serializable Java object. - * + * * @return an ActiveMQObjectMessage * @throws JMSException if the JMS provider fails to create this message due * to some internal error. @@ -368,7 +369,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * Creates an initialized ObjectMessage object. An * ObjectMessage object is used to send a message that * contains a serializable Java object. - * + * * @param object the object to use to initialize this message * @return an ActiveMQObjectMessage * @throws JMSException if the JMS provider fails to create this message due @@ -385,7 +386,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * Creates a StreamMessage object. A * StreamMessage object is used to send a self-defining * stream of primitive values in the Java programming language. - * + * * @return an ActiveMQStreamMessage * @throws JMSException if the JMS provider fails to create this message due * to some internal error. @@ -400,7 +401,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * Creates a TextMessage object. A TextMessage * object is used to send a message containing a String * object. - * + * * @return an ActiveMQTextMessage * @throws JMSException if the JMS provider fails to create this message due * to some internal error. @@ -415,7 +416,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * Creates an initialized TextMessage object. A * TextMessage object is used to send a message containing a * String. - * + * * @param text the string used to initialize this message * @return an ActiveMQTextMessage * @throws JMSException if the JMS provider fails to create this message due @@ -432,7 +433,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * Creates an initialized BlobMessage object. A * BlobMessage object is used to send a message containing a * URL which points to some network addressible BLOB. - * + * * @param url the network addressable URL used to pass directly to the * consumer * @return a BlobMessage @@ -447,7 +448,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * Creates an initialized BlobMessage object. A * BlobMessage object is used to send a message containing a * URL which points to some network addressible BLOB. - * + * * @param url the network addressable URL used to pass directly to the * consumer * @param deletedByBroker indicates whether or not the resource is deleted @@ -471,7 +472,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * the File content. Before the message is sent the file * conent will be uploaded to the broker or some other remote repository * depending on the {@link #getBlobTransferPolicy()}. - * + * * @param file the file to be uploaded to some remote repo (or the broker) * depending on the strategy * @return a BlobMessage @@ -494,7 +495,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * the File content. Before the message is sent the file * conent will be uploaded to the broker or some other remote repository * depending on the {@link #getBlobTransferPolicy()}. - * + * * @param in the stream to be uploaded to some remote repo (or the broker) * depending on the strategy * @return a BlobMessage @@ -512,7 +513,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /** * Indicates whether the session is in transacted mode. - * + * * @return true if the session is in transacted mode * @throws JMSException if there is some internal error. */ @@ -525,7 +526,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * Returns the acknowledgement mode of the session. The acknowledgement mode * is set at the time that the session is created. If the session is * transacted, the acknowledgement mode is ignored. - * + * * @return If the session is not transacted, returns the current * acknowledgement mode for the session. If the session is * transacted, returns SESSION_TRANSACTED. @@ -541,7 +542,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /** * Commits all messages done in this transaction and releases any locks * currently held. - * + * * @throws JMSException if the JMS provider fails to commit the transaction * due to some internal error. * @throws TransactionRolledBackException if the transaction is rolled back @@ -563,7 +564,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /** * Rolls back any messages done in this transaction and releases any locks * currently held. - * + * * @throws JMSException if the JMS provider fails to roll back the * transaction due to some internal error. * @throws javax.jms.IllegalStateException if the method is not called by a @@ -604,7 +605,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * Invoking any other Session method on a closed session must * throw a JMSException.IllegalStateException. Closing a * closed session must not throw an exception. - * + * * @throws JMSException if the JMS provider fails to close the session due * to some internal error. */ @@ -643,13 +644,13 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta } void clearMessagesInProgress() { - executor.clearMessagesInProgress(); + executor.clearMessagesInProgress(); // we are called from inside the transport reconnection logic // which involves us clearing all the connections' consumers - // dispatch and delivered lists. So rather than trying to - // grab a mutex (which could be already owned by the message - // listener calling the send or an ack) we allow it to complete in - // a separate thread via the scheduler and notify us via + // dispatch and delivered lists. So rather than trying to + // grab a mutex (which could be already owned by the message + // listener calling the send or an ack) we allow it to complete in + // a separate thread via the scheduler and notify us via // connection.transportInterruptionProcessingComplete() // for (final ActiveMQMessageConsumer consumer : consumers) { @@ -714,7 +715,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /** * Check if the session is closed. It is used for ensuring that the session * is open before performing various operations. - * + * * @throws IllegalStateException if the Session is closed */ protected void checkClosed() throws IllegalStateException { @@ -749,7 +750,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * that had been previously delivered. Redelivered messages do not have to * be delivered in exactly their original delivery order. * - * + * * @throws JMSException if the JMS provider fails to stop and restart * message delivery due to some internal error. * @throws IllegalStateException if the method is called by a transacted @@ -771,7 +772,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /** * Returns the session's distinguished message listener (optional). - * + * * @return the message listener associated with this session * @throws JMSException if the JMS provider fails to get the message * listener due to an internal error. @@ -792,7 +793,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * messages are still supported. *

* This is an expert facility not used by regular JMS clients. - * + * * @param listener the message listener to associate with this session * @throws JMSException if the JMS provider fails to set the message * listener due to an internal error. @@ -812,7 +813,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /** * Optional operation, intended to be used only by Application Servers, not * by ordinary JMS clients. - * + * * @see javax.jms.ServerSession */ public void run() { @@ -876,7 +877,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta ack.setFirstMessageId(md.getMessage().getMessageId()); asyncSendPacket(ack); } else { - + MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1); ack.setFirstMessageId(md.getMessage().getMessageId()); asyncSendPacket(ack); @@ -916,7 +917,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * a destination. Since Queue and Topic both * inherit from Destination, they can be used in the * destination parameter to create a MessageProducer object. - * + * * @param destination the Destination to send to, or null if * this is a producer which does not have a specified * destination. @@ -942,7 +943,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * Since Queue and Topic both inherit from * Destination, they can be used in the destination * parameter to create a MessageConsumer. - * + * * @param destination the Destination to access. * @return the MessageConsumer * @throws JMSException if the session fails to create a consumer due to @@ -964,7 +965,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta *

* A client uses a MessageConsumer object to receive messages * that have been sent to a destination. - * + * * @param destination the Destination to access * @param messageSelector only messages with properties matching the message * selector expression are delivered. A value of null or an @@ -1047,7 +1048,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * inhibit the delivery of messages published by its own connection. The * default value for this attribute is False. The noLocal * value must be supported by destinations that are topics. - * + * * @param destination the Destination to access * @param messageSelector only messages with properties matching the message * selector expression are delivered. A value of null or an @@ -1139,7 +1140,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * initiated by the JMS API. The one exception is the creation of temporary * queues, which is accomplished with the createTemporaryQueue * method. - * + * * @param queueName the name of this Queue * @return a Queue with the given name * @throws JMSException if the session fails to create a queue due to some @@ -1167,7 +1168,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * initiated by the JMS API. The one exception is the creation of temporary * topics, which is accomplished with the createTemporaryTopic * method. - * + * * @param topicName the name of this Topic * @return a Topic with the given name * @throws JMSException if the session fails to create a topic due to some @@ -1185,7 +1186,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /** * Creates a QueueBrowser object to peek at the messages on * the specified queue. - * + * * @param queue the queue to access * @exception InvalidDestinationException if an invalid destination is * specified @@ -1216,7 +1217,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * The subscriber NoLocal attribute allows a subscriber to * inhibit the delivery of messages published by its own connection. The * default value for this attribute is false. - * + * * @param topic the non-temporary Topic to subscribe to * @param name the name used to identify this subscription * @return the TopicSubscriber @@ -1254,7 +1255,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * durable TopicSubscriber with the same name and a new topic * and/or message selector. Changing a durable subscriber is equivalent to * unsubscribing (deleting) the old one and creating a new one. - * + * * @param topic the non-temporary Topic to subscribe to * @param name the name used to identify this subscription * @param messageSelector only messages with properties matching the message @@ -1273,6 +1274,11 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException { checkClosed(); + if (isIndividualAcknowledge()) { + throw JMSExceptionSupport.create("Cannot create a durable consumer for a Session in "+ + "INDIVIDUAL_ACKNOWLEDGE mode.", null); + } + if (topic instanceof CustomDestination) { CustomDestination customDestination = (CustomDestination)topic; return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal); @@ -1289,7 +1295,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /** * Creates a QueueBrowser object to peek at the messages on * the specified queue. - * + * * @param queue the queue to access * @return the Queue Browser * @throws JMSException if the session fails to create a browser due to some @@ -1306,7 +1312,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /** * Creates a QueueBrowser object to peek at the messages on * the specified queue using a message selector. - * + * * @param queue the queue to access * @param messageSelector only messages with properties matching the message * selector expression are delivered. A value of null or an @@ -1328,7 +1334,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /** * Creates a TemporaryQueue object. Its lifetime will be that * of the Connection unless it is deleted earlier. - * + * * @return a temporary queue identity * @throws JMSException if the session fails to create a temporary queue due * to some internal error. @@ -1342,7 +1348,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /** * Creates a TemporaryTopic object. Its lifetime will be that * of the Connection unless it is deleted earlier. - * + * * @return a temporary topic identity * @throws JMSException if the session fails to create a temporary topic due * to some internal error. @@ -1356,7 +1362,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /** * Creates a QueueReceiver object to receive messages from * the specified queue. - * + * * @param queue the Queue to access * @return * @throws JMSException if the session fails to create a receiver due to @@ -1372,7 +1378,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /** * Creates a QueueReceiver object to receive messages from * the specified queue using a message selector. - * + * * @param queue the Queue to access * @param messageSelector only messages with properties matching the message * selector expression are delivered. A value of null or an @@ -1400,7 +1406,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /** * Creates a QueueSender object to send messages to the * specified queue. - * + * * @param queue the Queue to access, or null if this is an * unidentified producer * @return QueueSender @@ -1431,7 +1437,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * The subscriber NoLocal attribute allows a subscriber to * inhibit the delivery of messages published by its own connection. The * default value for this attribute is false. - * + * * @param topic the Topic to subscribe to * @return TopicSubscriber * @throws JMSException if the session fails to create a subscriber due to @@ -1462,7 +1468,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * The subscriber NoLocal attribute allows a subscriber to * inhibit the delivery of messages published by its own connection. The * default value for this attribute is false. - * + * * @param topic the Topic to subscribe to * @param messageSelector only messages with properties matching the message * selector expression are delivered. A value of null or an @@ -1496,7 +1502,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * on a topic. Each time a client creates a TopicPublisher on * a topic, it defines a new sequence of messages that have no ordering * relationship with the messages it has previously sent. - * + * * @param topic the Topic to publish to, or null if this is * an unidentified producer * @return TopicPublisher @@ -1526,7 +1532,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * TopicSubscriber for the subscription, or while a consumed * message is part of a pending transaction or has not been acknowledged in * the session. - * + * * @param name the name used to identify this subscription * @throws JMSException if the session fails to unsubscribe to the durable * subscription due to some internal error. @@ -1567,7 +1573,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * group, thereby acknowledging all messages consumed by the session.) *

* Messages that have been received but not acknowledged may be redelivered. - * + * * @throws JMSException if the JMS provider fails to acknowledge the * messages due to some internal error. * @throws javax.jms.IllegalStateException if this method is called on a @@ -1583,7 +1589,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /** * Add a message consumer. - * + * * @param consumer - message consumer. * @throws JMSException */ @@ -1597,7 +1603,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /** * Remove the message consumer. - * + * * @param consumer - consumer to be removed. * @throws JMSException */ @@ -1612,7 +1618,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /** * Adds a message producer. - * + * * @param producer - message producer to be added. * @throws JMSException */ @@ -1623,7 +1629,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /** * Removes a message producer. - * + * * @param producer - message producer to be removed. * @throws JMSException */ @@ -1634,7 +1640,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /** * Start this Session. - * + * * @throws JMSException */ protected void start() throws JMSException { @@ -1648,7 +1654,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /** * Stops this session. - * + * * @throws JMSException */ protected void stop() throws JMSException { @@ -1664,7 +1670,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /** * Returns the session id. - * + * * @return value - session id. */ protected SessionId getSessionId() { @@ -1687,7 +1693,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /** * Sends the message for dispatch by the broker. - * + * * @param producer - message producer. * @param destination - message destination. * @param message - message to be sent. @@ -1775,7 +1781,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /** * Send TransactionInfo to indicate transaction has started - * + * * @throws JMSException if some internal error occurs */ protected void doStartTransaction() throws JMSException { @@ -1786,7 +1792,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /** * Checks whether the session has unconsumed messages. - * + * * @return true - if there are unconsumed messages. */ public boolean hasUncomsumedMessages() { @@ -1795,7 +1801,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /** * Checks whether the session uses transactions. - * + * * @return true - if the session uses transactions. */ public boolean isTransacted() { @@ -1804,7 +1810,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /** * Checks whether the session used client acknowledgment. - * + * * @return true - if the session uses client acknowledgment. */ protected boolean isClientAcknowledge() { @@ -1813,7 +1819,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /** * Checks whether the session used auto acknowledgment. - * + * * @return true - if the session uses client acknowledgment. */ public boolean isAutoAcknowledge() { @@ -1822,20 +1828,20 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /** * Checks whether the session used dup ok acknowledgment. - * + * * @return true - if the session uses client acknowledgment. */ public boolean isDupsOkAcknowledge() { return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE; } - + public boolean isIndividualAcknowledge(){ - return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE; + return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE; } /** * Returns the message delivery listener. - * + * * @return deliveryListener - message delivery listener. */ public DeliveryListener getDeliveryListener() { @@ -1844,7 +1850,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /** * Sets the message delivery listener. - * + * * @param deliveryListener - message delivery listener. */ public void setDeliveryListener(DeliveryListener deliveryListener) { @@ -1853,7 +1859,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /** * Returns the SessionInfo bean. - * + * * @return info - SessionInfo bean. * @throws JMSException */ @@ -1864,7 +1870,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /** * Send the asynchronus command. - * + * * @param command - command to be executed. * @throws JMSException */ @@ -1874,7 +1880,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta /** * Send the synchronus command. - * + * * @param command - command to be executed. * @return Response * @throws JMSException @@ -2019,7 +2025,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta } return false; } - + /** * highest sequence id of the last message delivered by this session. * Passed to the broker in the close command, maintained by dispose() @@ -2028,11 +2034,11 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta public long getLastDeliveredSequenceId() { return lastDeliveredSequenceId; } - + protected void sendAck(MessageAck ack) throws JMSException { sendAck(ack,false); } - + protected void sendAck(MessageAck ack, boolean lazy) throws JMSException { if (lazy || connection.isSendAcksAsync() || getTransacted()) { asyncSendPacket(ack); @@ -2040,11 +2046,11 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta syncSendPacket(ack); } } - + protected Scheduler getScheduler() { return this.scheduler; } - + protected ThreadPoolExecutor getConnectionExecutor() { return this.connectionExecutor; } diff --git a/activemq-core/src/test/java/org/apache/activemq/JMSIndividualAckTest.java b/activemq-core/src/test/java/org/apache/activemq/JMSIndividualAckTest.java index 51d0fa1572..90a7dee759 100644 --- a/activemq-core/src/test/java/org/apache/activemq/JMSIndividualAckTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/JMSIndividualAckTest.java @@ -24,9 +24,10 @@ import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; +import javax.jms.Topic; /** - * + * */ public class JMSIndividualAckTest extends TestSupport { @@ -100,7 +101,7 @@ public class JMSIndividualAckTest extends TestSupport { Message msg = consumer.receive(1000); assertNotNull(msg); msg = consumer.receive(1000); - assertNotNull(msg); + assertNotNull(msg); msg = consumer.receive(1000); assertNotNull(msg); msg.acknowledge(); @@ -121,10 +122,10 @@ public class JMSIndividualAckTest extends TestSupport { assertNull(msg); session.close(); } - + /** * Tests if unacknowledged messages are being re-delivered when the consumer connects again. - * + * * @throws JMSException */ public void testUnAckedMessageAreNotConsumedOnSessionClose() throws JMSException { @@ -137,22 +138,39 @@ public class JMSIndividualAckTest extends TestSupport { // Consume the message... MessageConsumer consumer = session.createConsumer(queue); Message msg = consumer.receive(1000); - assertNotNull(msg); + assertNotNull(msg); // Don't ack the message. - + // Reset the session. This should cause the unacknowledged message to be re-delivered. session.close(); session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); - + // Attempt to Consume the message... consumer = session.createConsumer(queue); msg = consumer.receive(2000); - assertNotNull(msg); + assertNotNull(msg); msg.acknowledge(); - + session.close(); } + /** + * Tests that a durable consumer cannot be created for Individual Ack mode. + * + * @throws JMSException + */ + public void testCreateDurableConsumerFails() throws JMSException { + connection.start(); + Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); + Topic dest = session.createTopic(getName()); + + try { + session.createDurableSubscriber(dest, getName()); + fail("Should not be able to create duable subscriber."); + } catch(Exception e) { + } + } + protected String getQueueName() { return getClass().getName() + "." + getName(); }