mirror of https://github.com/apache/activemq.git
fix for https://issues.apache.org/jira/browse/AMQ-3486 with test case.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1166216 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
acde68e3ab
commit
251dc7bb23
|
@ -87,6 +87,7 @@ import org.apache.activemq.thread.Scheduler;
|
||||||
import org.apache.activemq.transaction.Synchronization;
|
import org.apache.activemq.transaction.Synchronization;
|
||||||
import org.apache.activemq.usage.MemoryUsage;
|
import org.apache.activemq.usage.MemoryUsage;
|
||||||
import org.apache.activemq.util.Callback;
|
import org.apache.activemq.util.Callback;
|
||||||
|
import org.apache.activemq.util.JMSExceptionSupport;
|
||||||
import org.apache.activemq.util.LongSequenceGenerator;
|
import org.apache.activemq.util.LongSequenceGenerator;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -172,21 +173,21 @@ import org.slf4j.LoggerFactory;
|
||||||
* transactions directly, it is unlikely that many JMS clients will do this.
|
* transactions directly, it is unlikely that many JMS clients will do this.
|
||||||
* Support for JTA in the JMS API is targeted at systems vendors who will be
|
* Support for JTA in the JMS API is targeted at systems vendors who will be
|
||||||
* integrating the JMS API into their application server products.
|
* integrating the JMS API into their application server products.
|
||||||
*
|
*
|
||||||
*
|
*
|
||||||
* @see javax.jms.Session
|
* @see javax.jms.Session
|
||||||
* @see javax.jms.QueueSession
|
* @see javax.jms.QueueSession
|
||||||
* @see javax.jms.TopicSession
|
* @see javax.jms.TopicSession
|
||||||
* @see javax.jms.XASession
|
* @see javax.jms.XASession
|
||||||
*/
|
*/
|
||||||
public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher {
|
public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Only acknowledge an individual message - using message.acknowledge()
|
* Only acknowledge an individual message - using message.acknowledge()
|
||||||
* as opposed to CLIENT_ACKNOWLEDGE which
|
* as opposed to CLIENT_ACKNOWLEDGE which
|
||||||
* acknowledges all messages consumed by a session at when acknowledge()
|
* acknowledges all messages consumed by a session at when acknowledge()
|
||||||
* is called
|
* is called
|
||||||
*/
|
*/
|
||||||
public static final int INDIVIDUAL_ACKNOWLEDGE = 4;
|
public static final int INDIVIDUAL_ACKNOWLEDGE = 4;
|
||||||
public static final int MAX_ACK_CONSTANT = INDIVIDUAL_ACKNOWLEDGE;
|
public static final int MAX_ACK_CONSTANT = INDIVIDUAL_ACKNOWLEDGE;
|
||||||
|
|
||||||
|
@ -229,7 +230,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Construct the Session
|
* Construct the Session
|
||||||
*
|
*
|
||||||
* @param connection
|
* @param connection
|
||||||
* @param sessionId
|
* @param sessionId
|
||||||
* @param acknowledgeMode n.b if transacted - the acknowledgeMode ==
|
* @param acknowledgeMode n.b if transacted - the acknowledgeMode ==
|
||||||
|
@ -253,7 +254,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
this.scheduler=connection.getScheduler();
|
this.scheduler=connection.getScheduler();
|
||||||
this.connectionExecutor=connection.getExecutor();
|
this.connectionExecutor=connection.getExecutor();
|
||||||
this.executor = new ActiveMQSessionExecutor(this);
|
this.executor = new ActiveMQSessionExecutor(this);
|
||||||
connection.addSession(this);
|
connection.addSession(this);
|
||||||
if (connection.isStarted()) {
|
if (connection.isStarted()) {
|
||||||
start();
|
start();
|
||||||
}
|
}
|
||||||
|
@ -266,7 +267,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets the transaction context of the session.
|
* Sets the transaction context of the session.
|
||||||
*
|
*
|
||||||
* @param transactionContext - provides the means to control a JMS
|
* @param transactionContext - provides the means to control a JMS
|
||||||
* transaction.
|
* transaction.
|
||||||
*/
|
*/
|
||||||
|
@ -276,7 +277,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the transaction context of the session.
|
* Returns the transaction context of the session.
|
||||||
*
|
*
|
||||||
* @return transactionContext - session's transaction context.
|
* @return transactionContext - session's transaction context.
|
||||||
*/
|
*/
|
||||||
public TransactionContext getTransactionContext() {
|
public TransactionContext getTransactionContext() {
|
||||||
|
@ -285,7 +286,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* (non-Javadoc)
|
* (non-Javadoc)
|
||||||
*
|
*
|
||||||
* @see org.apache.activemq.management.StatsCapable#getStats()
|
* @see org.apache.activemq.management.StatsCapable#getStats()
|
||||||
*/
|
*/
|
||||||
public StatsImpl getStats() {
|
public StatsImpl getStats() {
|
||||||
|
@ -294,7 +295,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the session's statistics.
|
* Returns the session's statistics.
|
||||||
*
|
*
|
||||||
* @return stats - session's statistics.
|
* @return stats - session's statistics.
|
||||||
*/
|
*/
|
||||||
public JMSSessionStatsImpl getSessionStats() {
|
public JMSSessionStatsImpl getSessionStats() {
|
||||||
|
@ -305,7 +306,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
* Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE>
|
* Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE>
|
||||||
* object is used to send a message containing a stream of uninterpreted
|
* object is used to send a message containing a stream of uninterpreted
|
||||||
* bytes.
|
* bytes.
|
||||||
*
|
*
|
||||||
* @return the an ActiveMQBytesMessage
|
* @return the an ActiveMQBytesMessage
|
||||||
* @throws JMSException if the JMS provider fails to create this message due
|
* @throws JMSException if the JMS provider fails to create this message due
|
||||||
* to some internal error.
|
* to some internal error.
|
||||||
|
@ -321,7 +322,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
* object is used to send a self-defining set of name-value pairs, where
|
* object is used to send a self-defining set of name-value pairs, where
|
||||||
* names are <CODE>String</CODE> objects and values are primitive values
|
* names are <CODE>String</CODE> objects and values are primitive values
|
||||||
* in the Java programming language.
|
* in the Java programming language.
|
||||||
*
|
*
|
||||||
* @return an ActiveMQMapMessage
|
* @return an ActiveMQMapMessage
|
||||||
* @throws JMSException if the JMS provider fails to create this message due
|
* @throws JMSException if the JMS provider fails to create this message due
|
||||||
* to some internal error.
|
* to some internal error.
|
||||||
|
@ -338,7 +339,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
* <CODE>Message</CODE> object holds all the standard message header
|
* <CODE>Message</CODE> object holds all the standard message header
|
||||||
* information. It can be sent when a message containing only header
|
* information. It can be sent when a message containing only header
|
||||||
* information is sufficient.
|
* information is sufficient.
|
||||||
*
|
*
|
||||||
* @return an ActiveMQMessage
|
* @return an ActiveMQMessage
|
||||||
* @throws JMSException if the JMS provider fails to create this message due
|
* @throws JMSException if the JMS provider fails to create this message due
|
||||||
* to some internal error.
|
* to some internal error.
|
||||||
|
@ -353,7 +354,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
* Creates an <CODE>ObjectMessage</CODE> object. An
|
* Creates an <CODE>ObjectMessage</CODE> object. An
|
||||||
* <CODE>ObjectMessage</CODE> object is used to send a message that
|
* <CODE>ObjectMessage</CODE> object is used to send a message that
|
||||||
* contains a serializable Java object.
|
* contains a serializable Java object.
|
||||||
*
|
*
|
||||||
* @return an ActiveMQObjectMessage
|
* @return an ActiveMQObjectMessage
|
||||||
* @throws JMSException if the JMS provider fails to create this message due
|
* @throws JMSException if the JMS provider fails to create this message due
|
||||||
* to some internal error.
|
* to some internal error.
|
||||||
|
@ -368,7 +369,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
* Creates an initialized <CODE>ObjectMessage</CODE> object. An
|
* Creates an initialized <CODE>ObjectMessage</CODE> object. An
|
||||||
* <CODE>ObjectMessage</CODE> object is used to send a message that
|
* <CODE>ObjectMessage</CODE> object is used to send a message that
|
||||||
* contains a serializable Java object.
|
* contains a serializable Java object.
|
||||||
*
|
*
|
||||||
* @param object the object to use to initialize this message
|
* @param object the object to use to initialize this message
|
||||||
* @return an ActiveMQObjectMessage
|
* @return an ActiveMQObjectMessage
|
||||||
* @throws JMSException if the JMS provider fails to create this message due
|
* @throws JMSException if the JMS provider fails to create this message due
|
||||||
|
@ -385,7 +386,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
* Creates a <CODE>StreamMessage</CODE> object. A
|
* Creates a <CODE>StreamMessage</CODE> object. A
|
||||||
* <CODE>StreamMessage</CODE> object is used to send a self-defining
|
* <CODE>StreamMessage</CODE> object is used to send a self-defining
|
||||||
* stream of primitive values in the Java programming language.
|
* stream of primitive values in the Java programming language.
|
||||||
*
|
*
|
||||||
* @return an ActiveMQStreamMessage
|
* @return an ActiveMQStreamMessage
|
||||||
* @throws JMSException if the JMS provider fails to create this message due
|
* @throws JMSException if the JMS provider fails to create this message due
|
||||||
* to some internal error.
|
* to some internal error.
|
||||||
|
@ -400,7 +401,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
* Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE>
|
* Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE>
|
||||||
* object is used to send a message containing a <CODE>String</CODE>
|
* object is used to send a message containing a <CODE>String</CODE>
|
||||||
* object.
|
* object.
|
||||||
*
|
*
|
||||||
* @return an ActiveMQTextMessage
|
* @return an ActiveMQTextMessage
|
||||||
* @throws JMSException if the JMS provider fails to create this message due
|
* @throws JMSException if the JMS provider fails to create this message due
|
||||||
* to some internal error.
|
* to some internal error.
|
||||||
|
@ -415,7 +416,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
* Creates an initialized <CODE>TextMessage</CODE> object. A
|
* Creates an initialized <CODE>TextMessage</CODE> object. A
|
||||||
* <CODE>TextMessage</CODE> object is used to send a message containing a
|
* <CODE>TextMessage</CODE> object is used to send a message containing a
|
||||||
* <CODE>String</CODE>.
|
* <CODE>String</CODE>.
|
||||||
*
|
*
|
||||||
* @param text the string used to initialize this message
|
* @param text the string used to initialize this message
|
||||||
* @return an ActiveMQTextMessage
|
* @return an ActiveMQTextMessage
|
||||||
* @throws JMSException if the JMS provider fails to create this message due
|
* @throws JMSException if the JMS provider fails to create this message due
|
||||||
|
@ -432,7 +433,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
* Creates an initialized <CODE>BlobMessage</CODE> object. A
|
* Creates an initialized <CODE>BlobMessage</CODE> object. A
|
||||||
* <CODE>BlobMessage</CODE> object is used to send a message containing a
|
* <CODE>BlobMessage</CODE> object is used to send a message containing a
|
||||||
* <CODE>URL</CODE> which points to some network addressible BLOB.
|
* <CODE>URL</CODE> which points to some network addressible BLOB.
|
||||||
*
|
*
|
||||||
* @param url the network addressable URL used to pass directly to the
|
* @param url the network addressable URL used to pass directly to the
|
||||||
* consumer
|
* consumer
|
||||||
* @return a BlobMessage
|
* @return a BlobMessage
|
||||||
|
@ -447,7 +448,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
* Creates an initialized <CODE>BlobMessage</CODE> object. A
|
* Creates an initialized <CODE>BlobMessage</CODE> object. A
|
||||||
* <CODE>BlobMessage</CODE> object is used to send a message containing a
|
* <CODE>BlobMessage</CODE> object is used to send a message containing a
|
||||||
* <CODE>URL</CODE> which points to some network addressible BLOB.
|
* <CODE>URL</CODE> which points to some network addressible BLOB.
|
||||||
*
|
*
|
||||||
* @param url the network addressable URL used to pass directly to the
|
* @param url the network addressable URL used to pass directly to the
|
||||||
* consumer
|
* consumer
|
||||||
* @param deletedByBroker indicates whether or not the resource is deleted
|
* @param deletedByBroker indicates whether or not the resource is deleted
|
||||||
|
@ -471,7 +472,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
* the <CODE>File</CODE> content. Before the message is sent the file
|
* the <CODE>File</CODE> content. Before the message is sent the file
|
||||||
* conent will be uploaded to the broker or some other remote repository
|
* conent will be uploaded to the broker or some other remote repository
|
||||||
* depending on the {@link #getBlobTransferPolicy()}.
|
* depending on the {@link #getBlobTransferPolicy()}.
|
||||||
*
|
*
|
||||||
* @param file the file to be uploaded to some remote repo (or the broker)
|
* @param file the file to be uploaded to some remote repo (or the broker)
|
||||||
* depending on the strategy
|
* depending on the strategy
|
||||||
* @return a BlobMessage
|
* @return a BlobMessage
|
||||||
|
@ -494,7 +495,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
* the <CODE>File</CODE> content. Before the message is sent the file
|
* the <CODE>File</CODE> content. Before the message is sent the file
|
||||||
* conent will be uploaded to the broker or some other remote repository
|
* conent will be uploaded to the broker or some other remote repository
|
||||||
* depending on the {@link #getBlobTransferPolicy()}.
|
* depending on the {@link #getBlobTransferPolicy()}.
|
||||||
*
|
*
|
||||||
* @param in the stream to be uploaded to some remote repo (or the broker)
|
* @param in the stream to be uploaded to some remote repo (or the broker)
|
||||||
* depending on the strategy
|
* depending on the strategy
|
||||||
* @return a BlobMessage
|
* @return a BlobMessage
|
||||||
|
@ -512,7 +513,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Indicates whether the session is in transacted mode.
|
* Indicates whether the session is in transacted mode.
|
||||||
*
|
*
|
||||||
* @return true if the session is in transacted mode
|
* @return true if the session is in transacted mode
|
||||||
* @throws JMSException if there is some internal error.
|
* @throws JMSException if there is some internal error.
|
||||||
*/
|
*/
|
||||||
|
@ -525,7 +526,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
* Returns the acknowledgement mode of the session. The acknowledgement mode
|
* Returns the acknowledgement mode of the session. The acknowledgement mode
|
||||||
* is set at the time that the session is created. If the session is
|
* is set at the time that the session is created. If the session is
|
||||||
* transacted, the acknowledgement mode is ignored.
|
* transacted, the acknowledgement mode is ignored.
|
||||||
*
|
*
|
||||||
* @return If the session is not transacted, returns the current
|
* @return If the session is not transacted, returns the current
|
||||||
* acknowledgement mode for the session. If the session is
|
* acknowledgement mode for the session. If the session is
|
||||||
* transacted, returns SESSION_TRANSACTED.
|
* transacted, returns SESSION_TRANSACTED.
|
||||||
|
@ -541,7 +542,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
/**
|
/**
|
||||||
* Commits all messages done in this transaction and releases any locks
|
* Commits all messages done in this transaction and releases any locks
|
||||||
* currently held.
|
* currently held.
|
||||||
*
|
*
|
||||||
* @throws JMSException if the JMS provider fails to commit the transaction
|
* @throws JMSException if the JMS provider fails to commit the transaction
|
||||||
* due to some internal error.
|
* due to some internal error.
|
||||||
* @throws TransactionRolledBackException if the transaction is rolled back
|
* @throws TransactionRolledBackException if the transaction is rolled back
|
||||||
|
@ -563,7 +564,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
/**
|
/**
|
||||||
* Rolls back any messages done in this transaction and releases any locks
|
* Rolls back any messages done in this transaction and releases any locks
|
||||||
* currently held.
|
* currently held.
|
||||||
*
|
*
|
||||||
* @throws JMSException if the JMS provider fails to roll back the
|
* @throws JMSException if the JMS provider fails to roll back the
|
||||||
* transaction due to some internal error.
|
* transaction due to some internal error.
|
||||||
* @throws javax.jms.IllegalStateException if the method is not called by a
|
* @throws javax.jms.IllegalStateException if the method is not called by a
|
||||||
|
@ -604,7 +605,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
* Invoking any other <CODE>Session</CODE> method on a closed session must
|
* Invoking any other <CODE>Session</CODE> method on a closed session must
|
||||||
* throw a <CODE> JMSException.IllegalStateException</CODE>. Closing a
|
* throw a <CODE> JMSException.IllegalStateException</CODE>. Closing a
|
||||||
* closed session must <I>not </I> throw an exception.
|
* closed session must <I>not </I> throw an exception.
|
||||||
*
|
*
|
||||||
* @throws JMSException if the JMS provider fails to close the session due
|
* @throws JMSException if the JMS provider fails to close the session due
|
||||||
* to some internal error.
|
* to some internal error.
|
||||||
*/
|
*/
|
||||||
|
@ -643,13 +644,13 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
}
|
}
|
||||||
|
|
||||||
void clearMessagesInProgress() {
|
void clearMessagesInProgress() {
|
||||||
executor.clearMessagesInProgress();
|
executor.clearMessagesInProgress();
|
||||||
// we are called from inside the transport reconnection logic
|
// we are called from inside the transport reconnection logic
|
||||||
// which involves us clearing all the connections' consumers
|
// which involves us clearing all the connections' consumers
|
||||||
// dispatch and delivered lists. So rather than trying to
|
// dispatch and delivered lists. So rather than trying to
|
||||||
// grab a mutex (which could be already owned by the message
|
// grab a mutex (which could be already owned by the message
|
||||||
// listener calling the send or an ack) we allow it to complete in
|
// listener calling the send or an ack) we allow it to complete in
|
||||||
// a separate thread via the scheduler and notify us via
|
// a separate thread via the scheduler and notify us via
|
||||||
// connection.transportInterruptionProcessingComplete()
|
// connection.transportInterruptionProcessingComplete()
|
||||||
//
|
//
|
||||||
for (final ActiveMQMessageConsumer consumer : consumers) {
|
for (final ActiveMQMessageConsumer consumer : consumers) {
|
||||||
|
@ -714,7 +715,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
/**
|
/**
|
||||||
* Check if the session is closed. It is used for ensuring that the session
|
* Check if the session is closed. It is used for ensuring that the session
|
||||||
* is open before performing various operations.
|
* is open before performing various operations.
|
||||||
*
|
*
|
||||||
* @throws IllegalStateException if the Session is closed
|
* @throws IllegalStateException if the Session is closed
|
||||||
*/
|
*/
|
||||||
protected void checkClosed() throws IllegalStateException {
|
protected void checkClosed() throws IllegalStateException {
|
||||||
|
@ -749,7 +750,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
* that had been previously delivered. Redelivered messages do not have to
|
* that had been previously delivered. Redelivered messages do not have to
|
||||||
* be delivered in exactly their original delivery order.
|
* be delivered in exactly their original delivery order.
|
||||||
* </UL>
|
* </UL>
|
||||||
*
|
*
|
||||||
* @throws JMSException if the JMS provider fails to stop and restart
|
* @throws JMSException if the JMS provider fails to stop and restart
|
||||||
* message delivery due to some internal error.
|
* message delivery due to some internal error.
|
||||||
* @throws IllegalStateException if the method is called by a transacted
|
* @throws IllegalStateException if the method is called by a transacted
|
||||||
|
@ -771,7 +772,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the session's distinguished message listener (optional).
|
* Returns the session's distinguished message listener (optional).
|
||||||
*
|
*
|
||||||
* @return the message listener associated with this session
|
* @return the message listener associated with this session
|
||||||
* @throws JMSException if the JMS provider fails to get the message
|
* @throws JMSException if the JMS provider fails to get the message
|
||||||
* listener due to an internal error.
|
* listener due to an internal error.
|
||||||
|
@ -792,7 +793,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
* messages are still supported.
|
* messages are still supported.
|
||||||
* <P>
|
* <P>
|
||||||
* This is an expert facility not used by regular JMS clients.
|
* This is an expert facility not used by regular JMS clients.
|
||||||
*
|
*
|
||||||
* @param listener the message listener to associate with this session
|
* @param listener the message listener to associate with this session
|
||||||
* @throws JMSException if the JMS provider fails to set the message
|
* @throws JMSException if the JMS provider fails to set the message
|
||||||
* listener due to an internal error.
|
* listener due to an internal error.
|
||||||
|
@ -812,7 +813,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
/**
|
/**
|
||||||
* Optional operation, intended to be used only by Application Servers, not
|
* Optional operation, intended to be used only by Application Servers, not
|
||||||
* by ordinary JMS clients.
|
* by ordinary JMS clients.
|
||||||
*
|
*
|
||||||
* @see javax.jms.ServerSession
|
* @see javax.jms.ServerSession
|
||||||
*/
|
*/
|
||||||
public void run() {
|
public void run() {
|
||||||
|
@ -876,7 +877,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
ack.setFirstMessageId(md.getMessage().getMessageId());
|
ack.setFirstMessageId(md.getMessage().getMessageId());
|
||||||
asyncSendPacket(ack);
|
asyncSendPacket(ack);
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
|
MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
|
||||||
ack.setFirstMessageId(md.getMessage().getMessageId());
|
ack.setFirstMessageId(md.getMessage().getMessageId());
|
||||||
asyncSendPacket(ack);
|
asyncSendPacket(ack);
|
||||||
|
@ -916,7 +917,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
* a destination. Since <CODE>Queue </CODE> and <CODE>Topic</CODE> both
|
* a destination. Since <CODE>Queue </CODE> and <CODE>Topic</CODE> both
|
||||||
* inherit from <CODE>Destination</CODE>, they can be used in the
|
* inherit from <CODE>Destination</CODE>, they can be used in the
|
||||||
* destination parameter to create a <CODE>MessageProducer</CODE> object.
|
* destination parameter to create a <CODE>MessageProducer</CODE> object.
|
||||||
*
|
*
|
||||||
* @param destination the <CODE>Destination</CODE> to send to, or null if
|
* @param destination the <CODE>Destination</CODE> to send to, or null if
|
||||||
* this is a producer which does not have a specified
|
* this is a producer which does not have a specified
|
||||||
* destination.
|
* destination.
|
||||||
|
@ -942,7 +943,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
* Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
|
* Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
|
||||||
* <CODE>Destination</CODE>, they can be used in the destination
|
* <CODE>Destination</CODE>, they can be used in the destination
|
||||||
* parameter to create a <CODE>MessageConsumer</CODE>.
|
* parameter to create a <CODE>MessageConsumer</CODE>.
|
||||||
*
|
*
|
||||||
* @param destination the <CODE>Destination</CODE> to access.
|
* @param destination the <CODE>Destination</CODE> to access.
|
||||||
* @return the MessageConsumer
|
* @return the MessageConsumer
|
||||||
* @throws JMSException if the session fails to create a consumer due to
|
* @throws JMSException if the session fails to create a consumer due to
|
||||||
|
@ -964,7 +965,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
* <P>
|
* <P>
|
||||||
* A client uses a <CODE>MessageConsumer</CODE> object to receive messages
|
* A client uses a <CODE>MessageConsumer</CODE> object to receive messages
|
||||||
* that have been sent to a destination.
|
* that have been sent to a destination.
|
||||||
*
|
*
|
||||||
* @param destination the <CODE>Destination</CODE> to access
|
* @param destination the <CODE>Destination</CODE> to access
|
||||||
* @param messageSelector only messages with properties matching the message
|
* @param messageSelector only messages with properties matching the message
|
||||||
* selector expression are delivered. A value of null or an
|
* selector expression are delivered. A value of null or an
|
||||||
|
@ -1047,7 +1048,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
* inhibit the delivery of messages published by its own connection. The
|
* inhibit the delivery of messages published by its own connection. The
|
||||||
* default value for this attribute is False. The <CODE>noLocal</CODE>
|
* default value for this attribute is False. The <CODE>noLocal</CODE>
|
||||||
* value must be supported by destinations that are topics.
|
* value must be supported by destinations that are topics.
|
||||||
*
|
*
|
||||||
* @param destination the <CODE>Destination</CODE> to access
|
* @param destination the <CODE>Destination</CODE> to access
|
||||||
* @param messageSelector only messages with properties matching the message
|
* @param messageSelector only messages with properties matching the message
|
||||||
* selector expression are delivered. A value of null or an
|
* selector expression are delivered. A value of null or an
|
||||||
|
@ -1139,7 +1140,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
* initiated by the JMS API. The one exception is the creation of temporary
|
* initiated by the JMS API. The one exception is the creation of temporary
|
||||||
* queues, which is accomplished with the <CODE>createTemporaryQueue</CODE>
|
* queues, which is accomplished with the <CODE>createTemporaryQueue</CODE>
|
||||||
* method.
|
* method.
|
||||||
*
|
*
|
||||||
* @param queueName the name of this <CODE>Queue</CODE>
|
* @param queueName the name of this <CODE>Queue</CODE>
|
||||||
* @return a <CODE>Queue</CODE> with the given name
|
* @return a <CODE>Queue</CODE> with the given name
|
||||||
* @throws JMSException if the session fails to create a queue due to some
|
* @throws JMSException if the session fails to create a queue due to some
|
||||||
|
@ -1167,7 +1168,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
* initiated by the JMS API. The one exception is the creation of temporary
|
* initiated by the JMS API. The one exception is the creation of temporary
|
||||||
* topics, which is accomplished with the <CODE>createTemporaryTopic</CODE>
|
* topics, which is accomplished with the <CODE>createTemporaryTopic</CODE>
|
||||||
* method.
|
* method.
|
||||||
*
|
*
|
||||||
* @param topicName the name of this <CODE>Topic</CODE>
|
* @param topicName the name of this <CODE>Topic</CODE>
|
||||||
* @return a <CODE>Topic</CODE> with the given name
|
* @return a <CODE>Topic</CODE> with the given name
|
||||||
* @throws JMSException if the session fails to create a topic due to some
|
* @throws JMSException if the session fails to create a topic due to some
|
||||||
|
@ -1185,7 +1186,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
/**
|
/**
|
||||||
* Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
|
* Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
|
||||||
* the specified queue.
|
* the specified queue.
|
||||||
*
|
*
|
||||||
* @param queue the <CODE>queue</CODE> to access
|
* @param queue the <CODE>queue</CODE> to access
|
||||||
* @exception InvalidDestinationException if an invalid destination is
|
* @exception InvalidDestinationException if an invalid destination is
|
||||||
* specified
|
* specified
|
||||||
|
@ -1216,7 +1217,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
* The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
|
* The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
|
||||||
* inhibit the delivery of messages published by its own connection. The
|
* inhibit the delivery of messages published by its own connection. The
|
||||||
* default value for this attribute is false.
|
* default value for this attribute is false.
|
||||||
*
|
*
|
||||||
* @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
|
* @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
|
||||||
* @param name the name used to identify this subscription
|
* @param name the name used to identify this subscription
|
||||||
* @return the TopicSubscriber
|
* @return the TopicSubscriber
|
||||||
|
@ -1254,7 +1255,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
* durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
|
* durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
|
||||||
* and/or message selector. Changing a durable subscriber is equivalent to
|
* and/or message selector. Changing a durable subscriber is equivalent to
|
||||||
* unsubscribing (deleting) the old one and creating a new one.
|
* unsubscribing (deleting) the old one and creating a new one.
|
||||||
*
|
*
|
||||||
* @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
|
* @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
|
||||||
* @param name the name used to identify this subscription
|
* @param name the name used to identify this subscription
|
||||||
* @param messageSelector only messages with properties matching the message
|
* @param messageSelector only messages with properties matching the message
|
||||||
|
@ -1273,6 +1274,11 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
|
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
|
||||||
checkClosed();
|
checkClosed();
|
||||||
|
|
||||||
|
if (isIndividualAcknowledge()) {
|
||||||
|
throw JMSExceptionSupport.create("Cannot create a durable consumer for a Session in "+
|
||||||
|
"INDIVIDUAL_ACKNOWLEDGE mode.", null);
|
||||||
|
}
|
||||||
|
|
||||||
if (topic instanceof CustomDestination) {
|
if (topic instanceof CustomDestination) {
|
||||||
CustomDestination customDestination = (CustomDestination)topic;
|
CustomDestination customDestination = (CustomDestination)topic;
|
||||||
return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal);
|
return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal);
|
||||||
|
@ -1289,7 +1295,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
/**
|
/**
|
||||||
* Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
|
* Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
|
||||||
* the specified queue.
|
* the specified queue.
|
||||||
*
|
*
|
||||||
* @param queue the <CODE>queue</CODE> to access
|
* @param queue the <CODE>queue</CODE> to access
|
||||||
* @return the Queue Browser
|
* @return the Queue Browser
|
||||||
* @throws JMSException if the session fails to create a browser due to some
|
* @throws JMSException if the session fails to create a browser due to some
|
||||||
|
@ -1306,7 +1312,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
/**
|
/**
|
||||||
* Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
|
* Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
|
||||||
* the specified queue using a message selector.
|
* the specified queue using a message selector.
|
||||||
*
|
*
|
||||||
* @param queue the <CODE>queue</CODE> to access
|
* @param queue the <CODE>queue</CODE> to access
|
||||||
* @param messageSelector only messages with properties matching the message
|
* @param messageSelector only messages with properties matching the message
|
||||||
* selector expression are delivered. A value of null or an
|
* selector expression are delivered. A value of null or an
|
||||||
|
@ -1328,7 +1334,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
/**
|
/**
|
||||||
* Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that
|
* Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that
|
||||||
* of the <CODE>Connection</CODE> unless it is deleted earlier.
|
* of the <CODE>Connection</CODE> unless it is deleted earlier.
|
||||||
*
|
*
|
||||||
* @return a temporary queue identity
|
* @return a temporary queue identity
|
||||||
* @throws JMSException if the session fails to create a temporary queue due
|
* @throws JMSException if the session fails to create a temporary queue due
|
||||||
* to some internal error.
|
* to some internal error.
|
||||||
|
@ -1342,7 +1348,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
/**
|
/**
|
||||||
* Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that
|
* Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that
|
||||||
* of the <CODE>Connection</CODE> unless it is deleted earlier.
|
* of the <CODE>Connection</CODE> unless it is deleted earlier.
|
||||||
*
|
*
|
||||||
* @return a temporary topic identity
|
* @return a temporary topic identity
|
||||||
* @throws JMSException if the session fails to create a temporary topic due
|
* @throws JMSException if the session fails to create a temporary topic due
|
||||||
* to some internal error.
|
* to some internal error.
|
||||||
|
@ -1356,7 +1362,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
/**
|
/**
|
||||||
* Creates a <CODE>QueueReceiver</CODE> object to receive messages from
|
* Creates a <CODE>QueueReceiver</CODE> object to receive messages from
|
||||||
* the specified queue.
|
* the specified queue.
|
||||||
*
|
*
|
||||||
* @param queue the <CODE>Queue</CODE> to access
|
* @param queue the <CODE>Queue</CODE> to access
|
||||||
* @return
|
* @return
|
||||||
* @throws JMSException if the session fails to create a receiver due to
|
* @throws JMSException if the session fails to create a receiver due to
|
||||||
|
@ -1372,7 +1378,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
/**
|
/**
|
||||||
* Creates a <CODE>QueueReceiver</CODE> object to receive messages from
|
* Creates a <CODE>QueueReceiver</CODE> object to receive messages from
|
||||||
* the specified queue using a message selector.
|
* the specified queue using a message selector.
|
||||||
*
|
*
|
||||||
* @param queue the <CODE>Queue</CODE> to access
|
* @param queue the <CODE>Queue</CODE> to access
|
||||||
* @param messageSelector only messages with properties matching the message
|
* @param messageSelector only messages with properties matching the message
|
||||||
* selector expression are delivered. A value of null or an
|
* selector expression are delivered. A value of null or an
|
||||||
|
@ -1400,7 +1406,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
/**
|
/**
|
||||||
* Creates a <CODE>QueueSender</CODE> object to send messages to the
|
* Creates a <CODE>QueueSender</CODE> object to send messages to the
|
||||||
* specified queue.
|
* specified queue.
|
||||||
*
|
*
|
||||||
* @param queue the <CODE>Queue</CODE> to access, or null if this is an
|
* @param queue the <CODE>Queue</CODE> to access, or null if this is an
|
||||||
* unidentified producer
|
* unidentified producer
|
||||||
* @return QueueSender
|
* @return QueueSender
|
||||||
|
@ -1431,7 +1437,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
* The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
|
* The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
|
||||||
* inhibit the delivery of messages published by its own connection. The
|
* inhibit the delivery of messages published by its own connection. The
|
||||||
* default value for this attribute is false.
|
* default value for this attribute is false.
|
||||||
*
|
*
|
||||||
* @param topic the <CODE>Topic</CODE> to subscribe to
|
* @param topic the <CODE>Topic</CODE> to subscribe to
|
||||||
* @return TopicSubscriber
|
* @return TopicSubscriber
|
||||||
* @throws JMSException if the session fails to create a subscriber due to
|
* @throws JMSException if the session fails to create a subscriber due to
|
||||||
|
@ -1462,7 +1468,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
* The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
|
* The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
|
||||||
* inhibit the delivery of messages published by its own connection. The
|
* inhibit the delivery of messages published by its own connection. The
|
||||||
* default value for this attribute is false.
|
* default value for this attribute is false.
|
||||||
*
|
*
|
||||||
* @param topic the <CODE>Topic</CODE> to subscribe to
|
* @param topic the <CODE>Topic</CODE> to subscribe to
|
||||||
* @param messageSelector only messages with properties matching the message
|
* @param messageSelector only messages with properties matching the message
|
||||||
* selector expression are delivered. A value of null or an
|
* selector expression are delivered. A value of null or an
|
||||||
|
@ -1496,7 +1502,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
* on a topic. Each time a client creates a <CODE>TopicPublisher</CODE> on
|
* on a topic. Each time a client creates a <CODE>TopicPublisher</CODE> on
|
||||||
* a topic, it defines a new sequence of messages that have no ordering
|
* a topic, it defines a new sequence of messages that have no ordering
|
||||||
* relationship with the messages it has previously sent.
|
* relationship with the messages it has previously sent.
|
||||||
*
|
*
|
||||||
* @param topic the <CODE>Topic</CODE> to publish to, or null if this is
|
* @param topic the <CODE>Topic</CODE> to publish to, or null if this is
|
||||||
* an unidentified producer
|
* an unidentified producer
|
||||||
* @return TopicPublisher
|
* @return TopicPublisher
|
||||||
|
@ -1526,7 +1532,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
* <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
|
* <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
|
||||||
* message is part of a pending transaction or has not been acknowledged in
|
* message is part of a pending transaction or has not been acknowledged in
|
||||||
* the session.
|
* the session.
|
||||||
*
|
*
|
||||||
* @param name the name used to identify this subscription
|
* @param name the name used to identify this subscription
|
||||||
* @throws JMSException if the session fails to unsubscribe to the durable
|
* @throws JMSException if the session fails to unsubscribe to the durable
|
||||||
* subscription due to some internal error.
|
* subscription due to some internal error.
|
||||||
|
@ -1567,7 +1573,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
* group, thereby acknowledging all messages consumed by the session.)
|
* group, thereby acknowledging all messages consumed by the session.)
|
||||||
* <P>
|
* <P>
|
||||||
* Messages that have been received but not acknowledged may be redelivered.
|
* Messages that have been received but not acknowledged may be redelivered.
|
||||||
*
|
*
|
||||||
* @throws JMSException if the JMS provider fails to acknowledge the
|
* @throws JMSException if the JMS provider fails to acknowledge the
|
||||||
* messages due to some internal error.
|
* messages due to some internal error.
|
||||||
* @throws javax.jms.IllegalStateException if this method is called on a
|
* @throws javax.jms.IllegalStateException if this method is called on a
|
||||||
|
@ -1583,7 +1589,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add a message consumer.
|
* Add a message consumer.
|
||||||
*
|
*
|
||||||
* @param consumer - message consumer.
|
* @param consumer - message consumer.
|
||||||
* @throws JMSException
|
* @throws JMSException
|
||||||
*/
|
*/
|
||||||
|
@ -1597,7 +1603,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove the message consumer.
|
* Remove the message consumer.
|
||||||
*
|
*
|
||||||
* @param consumer - consumer to be removed.
|
* @param consumer - consumer to be removed.
|
||||||
* @throws JMSException
|
* @throws JMSException
|
||||||
*/
|
*/
|
||||||
|
@ -1612,7 +1618,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Adds a message producer.
|
* Adds a message producer.
|
||||||
*
|
*
|
||||||
* @param producer - message producer to be added.
|
* @param producer - message producer to be added.
|
||||||
* @throws JMSException
|
* @throws JMSException
|
||||||
*/
|
*/
|
||||||
|
@ -1623,7 +1629,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Removes a message producer.
|
* Removes a message producer.
|
||||||
*
|
*
|
||||||
* @param producer - message producer to be removed.
|
* @param producer - message producer to be removed.
|
||||||
* @throws JMSException
|
* @throws JMSException
|
||||||
*/
|
*/
|
||||||
|
@ -1634,7 +1640,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Start this Session.
|
* Start this Session.
|
||||||
*
|
*
|
||||||
* @throws JMSException
|
* @throws JMSException
|
||||||
*/
|
*/
|
||||||
protected void start() throws JMSException {
|
protected void start() throws JMSException {
|
||||||
|
@ -1648,7 +1654,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stops this session.
|
* Stops this session.
|
||||||
*
|
*
|
||||||
* @throws JMSException
|
* @throws JMSException
|
||||||
*/
|
*/
|
||||||
protected void stop() throws JMSException {
|
protected void stop() throws JMSException {
|
||||||
|
@ -1664,7 +1670,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the session id.
|
* Returns the session id.
|
||||||
*
|
*
|
||||||
* @return value - session id.
|
* @return value - session id.
|
||||||
*/
|
*/
|
||||||
protected SessionId getSessionId() {
|
protected SessionId getSessionId() {
|
||||||
|
@ -1687,7 +1693,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sends the message for dispatch by the broker.
|
* Sends the message for dispatch by the broker.
|
||||||
*
|
*
|
||||||
* @param producer - message producer.
|
* @param producer - message producer.
|
||||||
* @param destination - message destination.
|
* @param destination - message destination.
|
||||||
* @param message - message to be sent.
|
* @param message - message to be sent.
|
||||||
|
@ -1775,7 +1781,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send TransactionInfo to indicate transaction has started
|
* Send TransactionInfo to indicate transaction has started
|
||||||
*
|
*
|
||||||
* @throws JMSException if some internal error occurs
|
* @throws JMSException if some internal error occurs
|
||||||
*/
|
*/
|
||||||
protected void doStartTransaction() throws JMSException {
|
protected void doStartTransaction() throws JMSException {
|
||||||
|
@ -1786,7 +1792,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Checks whether the session has unconsumed messages.
|
* Checks whether the session has unconsumed messages.
|
||||||
*
|
*
|
||||||
* @return true - if there are unconsumed messages.
|
* @return true - if there are unconsumed messages.
|
||||||
*/
|
*/
|
||||||
public boolean hasUncomsumedMessages() {
|
public boolean hasUncomsumedMessages() {
|
||||||
|
@ -1795,7 +1801,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Checks whether the session uses transactions.
|
* Checks whether the session uses transactions.
|
||||||
*
|
*
|
||||||
* @return true - if the session uses transactions.
|
* @return true - if the session uses transactions.
|
||||||
*/
|
*/
|
||||||
public boolean isTransacted() {
|
public boolean isTransacted() {
|
||||||
|
@ -1804,7 +1810,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Checks whether the session used client acknowledgment.
|
* Checks whether the session used client acknowledgment.
|
||||||
*
|
*
|
||||||
* @return true - if the session uses client acknowledgment.
|
* @return true - if the session uses client acknowledgment.
|
||||||
*/
|
*/
|
||||||
protected boolean isClientAcknowledge() {
|
protected boolean isClientAcknowledge() {
|
||||||
|
@ -1813,7 +1819,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Checks whether the session used auto acknowledgment.
|
* Checks whether the session used auto acknowledgment.
|
||||||
*
|
*
|
||||||
* @return true - if the session uses client acknowledgment.
|
* @return true - if the session uses client acknowledgment.
|
||||||
*/
|
*/
|
||||||
public boolean isAutoAcknowledge() {
|
public boolean isAutoAcknowledge() {
|
||||||
|
@ -1822,20 +1828,20 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Checks whether the session used dup ok acknowledgment.
|
* Checks whether the session used dup ok acknowledgment.
|
||||||
*
|
*
|
||||||
* @return true - if the session uses client acknowledgment.
|
* @return true - if the session uses client acknowledgment.
|
||||||
*/
|
*/
|
||||||
public boolean isDupsOkAcknowledge() {
|
public boolean isDupsOkAcknowledge() {
|
||||||
return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
|
return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isIndividualAcknowledge(){
|
public boolean isIndividualAcknowledge(){
|
||||||
return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE;
|
return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the message delivery listener.
|
* Returns the message delivery listener.
|
||||||
*
|
*
|
||||||
* @return deliveryListener - message delivery listener.
|
* @return deliveryListener - message delivery listener.
|
||||||
*/
|
*/
|
||||||
public DeliveryListener getDeliveryListener() {
|
public DeliveryListener getDeliveryListener() {
|
||||||
|
@ -1844,7 +1850,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets the message delivery listener.
|
* Sets the message delivery listener.
|
||||||
*
|
*
|
||||||
* @param deliveryListener - message delivery listener.
|
* @param deliveryListener - message delivery listener.
|
||||||
*/
|
*/
|
||||||
public void setDeliveryListener(DeliveryListener deliveryListener) {
|
public void setDeliveryListener(DeliveryListener deliveryListener) {
|
||||||
|
@ -1853,7 +1859,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the SessionInfo bean.
|
* Returns the SessionInfo bean.
|
||||||
*
|
*
|
||||||
* @return info - SessionInfo bean.
|
* @return info - SessionInfo bean.
|
||||||
* @throws JMSException
|
* @throws JMSException
|
||||||
*/
|
*/
|
||||||
|
@ -1864,7 +1870,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send the asynchronus command.
|
* Send the asynchronus command.
|
||||||
*
|
*
|
||||||
* @param command - command to be executed.
|
* @param command - command to be executed.
|
||||||
* @throws JMSException
|
* @throws JMSException
|
||||||
*/
|
*/
|
||||||
|
@ -1874,7 +1880,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send the synchronus command.
|
* Send the synchronus command.
|
||||||
*
|
*
|
||||||
* @param command - command to be executed.
|
* @param command - command to be executed.
|
||||||
* @return Response
|
* @return Response
|
||||||
* @throws JMSException
|
* @throws JMSException
|
||||||
|
@ -2019,7 +2025,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* highest sequence id of the last message delivered by this session.
|
* highest sequence id of the last message delivered by this session.
|
||||||
* Passed to the broker in the close command, maintained by dispose()
|
* Passed to the broker in the close command, maintained by dispose()
|
||||||
|
@ -2028,11 +2034,11 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
public long getLastDeliveredSequenceId() {
|
public long getLastDeliveredSequenceId() {
|
||||||
return lastDeliveredSequenceId;
|
return lastDeliveredSequenceId;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void sendAck(MessageAck ack) throws JMSException {
|
protected void sendAck(MessageAck ack) throws JMSException {
|
||||||
sendAck(ack,false);
|
sendAck(ack,false);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void sendAck(MessageAck ack, boolean lazy) throws JMSException {
|
protected void sendAck(MessageAck ack, boolean lazy) throws JMSException {
|
||||||
if (lazy || connection.isSendAcksAsync() || getTransacted()) {
|
if (lazy || connection.isSendAcksAsync() || getTransacted()) {
|
||||||
asyncSendPacket(ack);
|
asyncSendPacket(ack);
|
||||||
|
@ -2040,11 +2046,11 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
syncSendPacket(ack);
|
syncSendPacket(ack);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Scheduler getScheduler() {
|
protected Scheduler getScheduler() {
|
||||||
return this.scheduler;
|
return this.scheduler;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected ThreadPoolExecutor getConnectionExecutor() {
|
protected ThreadPoolExecutor getConnectionExecutor() {
|
||||||
return this.connectionExecutor;
|
return this.connectionExecutor;
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,9 +24,10 @@ import javax.jms.MessageProducer;
|
||||||
import javax.jms.Queue;
|
import javax.jms.Queue;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
import javax.jms.TextMessage;
|
import javax.jms.TextMessage;
|
||||||
|
import javax.jms.Topic;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public class JMSIndividualAckTest extends TestSupport {
|
public class JMSIndividualAckTest extends TestSupport {
|
||||||
|
|
||||||
|
@ -100,7 +101,7 @@ public class JMSIndividualAckTest extends TestSupport {
|
||||||
Message msg = consumer.receive(1000);
|
Message msg = consumer.receive(1000);
|
||||||
assertNotNull(msg);
|
assertNotNull(msg);
|
||||||
msg = consumer.receive(1000);
|
msg = consumer.receive(1000);
|
||||||
assertNotNull(msg);
|
assertNotNull(msg);
|
||||||
msg = consumer.receive(1000);
|
msg = consumer.receive(1000);
|
||||||
assertNotNull(msg);
|
assertNotNull(msg);
|
||||||
msg.acknowledge();
|
msg.acknowledge();
|
||||||
|
@ -121,10 +122,10 @@ public class JMSIndividualAckTest extends TestSupport {
|
||||||
assertNull(msg);
|
assertNull(msg);
|
||||||
session.close();
|
session.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests if unacknowledged messages are being re-delivered when the consumer connects again.
|
* Tests if unacknowledged messages are being re-delivered when the consumer connects again.
|
||||||
*
|
*
|
||||||
* @throws JMSException
|
* @throws JMSException
|
||||||
*/
|
*/
|
||||||
public void testUnAckedMessageAreNotConsumedOnSessionClose() throws JMSException {
|
public void testUnAckedMessageAreNotConsumedOnSessionClose() throws JMSException {
|
||||||
|
@ -137,22 +138,39 @@ public class JMSIndividualAckTest extends TestSupport {
|
||||||
// Consume the message...
|
// Consume the message...
|
||||||
MessageConsumer consumer = session.createConsumer(queue);
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
Message msg = consumer.receive(1000);
|
Message msg = consumer.receive(1000);
|
||||||
assertNotNull(msg);
|
assertNotNull(msg);
|
||||||
// Don't ack the message.
|
// Don't ack the message.
|
||||||
|
|
||||||
// Reset the session. This should cause the unacknowledged message to be re-delivered.
|
// Reset the session. This should cause the unacknowledged message to be re-delivered.
|
||||||
session.close();
|
session.close();
|
||||||
session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
|
session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
|
||||||
|
|
||||||
// Attempt to Consume the message...
|
// Attempt to Consume the message...
|
||||||
consumer = session.createConsumer(queue);
|
consumer = session.createConsumer(queue);
|
||||||
msg = consumer.receive(2000);
|
msg = consumer.receive(2000);
|
||||||
assertNotNull(msg);
|
assertNotNull(msg);
|
||||||
msg.acknowledge();
|
msg.acknowledge();
|
||||||
|
|
||||||
session.close();
|
session.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests that a durable consumer cannot be created for Individual Ack mode.
|
||||||
|
*
|
||||||
|
* @throws JMSException
|
||||||
|
*/
|
||||||
|
public void testCreateDurableConsumerFails() throws JMSException {
|
||||||
|
connection.start();
|
||||||
|
Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
|
||||||
|
Topic dest = session.createTopic(getName());
|
||||||
|
|
||||||
|
try {
|
||||||
|
session.createDurableSubscriber(dest, getName());
|
||||||
|
fail("Should not be able to create duable subscriber.");
|
||||||
|
} catch(Exception e) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
protected String getQueueName() {
|
protected String getQueueName() {
|
||||||
return getClass().getName() + "." + getName();
|
return getClass().getName() + "." + getName();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue