Allow for durable topic consumers to use individual ack mode.
This commit is contained in:
Timothy Bish 2014-04-28 14:03:43 -04:00
parent d60022ec65
commit a88e19e7cd
3 changed files with 107 additions and 37 deletions

View File

@ -90,7 +90,6 @@ import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.LongSequenceGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -291,6 +290,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
*
* @see org.apache.activemq.management.StatsCapable#getStats()
*/
@Override
public StatsImpl getStats() {
return stats;
}
@ -313,6 +313,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* @throws JMSException if the JMS provider fails to create this message due
* to some internal error.
*/
@Override
public BytesMessage createBytesMessage() throws JMSException {
ActiveMQBytesMessage message = new ActiveMQBytesMessage();
configureMessage(message);
@ -329,6 +330,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* @throws JMSException if the JMS provider fails to create this message due
* to some internal error.
*/
@Override
public MapMessage createMapMessage() throws JMSException {
ActiveMQMapMessage message = new ActiveMQMapMessage();
configureMessage(message);
@ -346,6 +348,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* @throws JMSException if the JMS provider fails to create this message due
* to some internal error.
*/
@Override
public Message createMessage() throws JMSException {
ActiveMQMessage message = new ActiveMQMessage();
configureMessage(message);
@ -361,6 +364,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* @throws JMSException if the JMS provider fails to create this message due
* to some internal error.
*/
@Override
public ObjectMessage createObjectMessage() throws JMSException {
ActiveMQObjectMessage message = new ActiveMQObjectMessage();
configureMessage(message);
@ -377,6 +381,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* @throws JMSException if the JMS provider fails to create this message due
* to some internal error.
*/
@Override
public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
ActiveMQObjectMessage message = new ActiveMQObjectMessage();
configureMessage(message);
@ -393,6 +398,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* @throws JMSException if the JMS provider fails to create this message due
* to some internal error.
*/
@Override
public StreamMessage createStreamMessage() throws JMSException {
ActiveMQStreamMessage message = new ActiveMQStreamMessage();
configureMessage(message);
@ -408,6 +414,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* @throws JMSException if the JMS provider fails to create this message due
* to some internal error.
*/
@Override
public TextMessage createTextMessage() throws JMSException {
ActiveMQTextMessage message = new ActiveMQTextMessage();
configureMessage(message);
@ -424,6 +431,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* @throws JMSException if the JMS provider fails to create this message due
* to some internal error.
*/
@Override
public TextMessage createTextMessage(String text) throws JMSException {
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setText(text);
@ -519,6 +527,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* @return true if the session is in transacted mode
* @throws JMSException if there is some internal error.
*/
@Override
public boolean getTransacted() throws JMSException {
checkClosed();
return isTransacted();
@ -536,6 +545,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* @see javax.jms.Connection#createSession(boolean,int)
* @since 1.1 exception JMSException if there is some internal error.
*/
@Override
public int getAcknowledgeMode() throws JMSException {
checkClosed();
return this.acknowledgementMode;
@ -552,6 +562,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* @throws javax.jms.IllegalStateException if the method is not called by a
* transacted session.
*/
@Override
public void commit() throws JMSException {
checkClosed();
if (!getTransacted()) {
@ -572,6 +583,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* @throws javax.jms.IllegalStateException if the method is not called by a
* transacted session.
*/
@Override
public void rollback() throws JMSException {
checkClosed();
if (!getTransacted()) {
@ -611,6 +623,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* @throws JMSException if the JMS provider fails to close the session due
* to some internal error.
*/
@Override
public void close() throws JMSException {
if (!closed) {
if (getTransactionContext().isInXATransaction()) {
@ -787,6 +800,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* @throws IllegalStateException if the method is called by a transacted
* session.
*/
@Override
public void recover() throws JMSException {
checkClosed();
@ -811,6 +825,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* @see javax.jms.ServerSessionPool
* @see javax.jms.ServerSession
*/
@Override
public MessageListener getMessageListener() throws JMSException {
checkClosed();
return this.messageListener;
@ -837,6 +852,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* @see javax.jms.ServerSessionPool
* @see javax.jms.ServerSession
*/
@Override
public void setMessageListener(MessageListener listener) throws JMSException {
// only check for closed if we set a new listener, as we allow to clear
// the listener, such as when an application is shutting down, and is
@ -857,6 +873,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
*
* @see javax.jms.ServerSession
*/
@Override
public void run() {
MessageDispatch messageDispatch;
while ((messageDispatch = executor.dequeueNoWait()) != null) {
@ -885,6 +902,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
if (isClientAcknowledge()||isIndividualAcknowledge()) {
message.setAcknowledgeCallback(new Callback() {
@Override
public void execute() throws Exception {
}
});
@ -905,6 +923,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
getTransactionContext().addSynchronization(new Synchronization() {
final int clearRequestCount = (clearRequestsCounter.get() == Integer.MAX_VALUE ? clearRequestsCounter.incrementAndGet() : clearRequestsCounter.get());
@Override
public void beforeEnd() throws Exception {
// validate our consumer so we don't push stale acks that get ignored
if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
@ -961,6 +980,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
}
connection.getScheduler().executeAfterDelay(new Runnable() {
@Override
public void run() {
((ActiveMQDispatcher)md.getConsumer()).dispatch(md);
}
@ -1017,6 +1037,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* specified.
* @since 1.1
*/
@Override
public MessageProducer createProducer(Destination destination) throws JMSException {
checkClosed();
if (destination instanceof CustomDestination) {
@ -1041,6 +1062,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* specified.
* @since 1.1
*/
@Override
public MessageConsumer createConsumer(Destination destination) throws JMSException {
return createConsumer(destination, (String) null);
}
@ -1068,6 +1090,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* @throws InvalidSelectorException if the message selector is invalid.
* @since 1.1
*/
@Override
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
return createConsumer(destination, messageSelector, false);
}
@ -1155,6 +1178,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* @throws InvalidSelectorException if the message selector is invalid.
* @since 1.1
*/
@Override
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
return createConsumer(destination, messageSelector, noLocal, null);
}
@ -1236,6 +1260,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* internal error.
* @since 1.1
*/
@Override
public Queue createQueue(String queueName) throws JMSException {
checkClosed();
if (queueName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
@ -1264,6 +1289,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* internal error.
* @since 1.1
*/
@Override
public Topic createTopic(String topicName) throws JMSException {
checkClosed();
if (topicName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
@ -1315,6 +1341,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* @throws InvalidDestinationException if an invalid topic is specified.
* @since 1.1
*/
@Override
public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
checkClosed();
return createDurableSubscriber(topic, name, null, false);
@ -1360,6 +1387,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* @throws InvalidSelectorException if the message selector is invalid.
* @since 1.1
*/
@Override
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
checkClosed();
@ -1367,11 +1395,6 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
throw new InvalidDestinationException("Topic cannot be null");
}
if (isIndividualAcknowledge()) {
throw JMSExceptionSupport.create("Cannot create a durable consumer for a Session in "+
"INDIVIDUAL_ACKNOWLEDGE mode.", null);
}
if (topic instanceof CustomDestination) {
CustomDestination customDestination = (CustomDestination)topic;
return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal);
@ -1397,6 +1420,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* specified
* @since 1.1
*/
@Override
public QueueBrowser createBrowser(Queue queue) throws JMSException {
checkClosed();
return createBrowser(queue, null);
@ -1419,6 +1443,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* @throws InvalidSelectorException if the message selector is invalid.
* @since 1.1
*/
@Override
public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
checkClosed();
return new ActiveMQQueueBrowser(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, asyncDispatch);
@ -1433,6 +1458,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* to some internal error.
* @since 1.1
*/
@Override
public TemporaryQueue createTemporaryQueue() throws JMSException {
checkClosed();
return (TemporaryQueue)connection.createTempDestination(false);
@ -1447,6 +1473,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* to some internal error.
* @since 1.1
*/
@Override
public TemporaryTopic createTemporaryTopic() throws JMSException {
checkClosed();
return (TemporaryTopic)connection.createTempDestination(true);
@ -1463,6 +1490,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* @throws JMSException
* @throws InvalidDestinationException if an invalid queue is specified.
*/
@Override
public QueueReceiver createReceiver(Queue queue) throws JMSException {
checkClosed();
return createReceiver(queue, null);
@ -1483,6 +1511,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* @throws InvalidDestinationException if an invalid queue is specified.
* @throws InvalidSelectorException if the message selector is invalid.
*/
@Override
public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
checkClosed();
@ -1507,6 +1536,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* internal error.
* @throws InvalidDestinationException if an invalid queue is specified.
*/
@Override
public QueueSender createSender(Queue queue) throws JMSException {
checkClosed();
if (queue instanceof CustomDestination) {
@ -1537,6 +1567,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* some internal error.
* @throws InvalidDestinationException if an invalid topic is specified.
*/
@Override
public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
checkClosed();
return createSubscriber(topic, null, false);
@ -1575,6 +1606,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* @throws InvalidDestinationException if an invalid topic is specified.
* @throws InvalidSelectorException if the message selector is invalid.
*/
@Override
public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
checkClosed();
@ -1603,6 +1635,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* some internal error.
* @throws InvalidDestinationException if an invalid topic is specified.
*/
@Override
public TopicPublisher createPublisher(Topic topic) throws JMSException {
checkClosed();
@ -1633,11 +1666,13 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* specified.
* @since 1.1
*/
@Override
public void unsubscribe(String name) throws JMSException {
checkClosed();
connection.unsubscribe(name);
}
@Override
public void dispatch(MessageDispatch messageDispatch) {
try {
executor.execute(messageDispatch);

View File

@ -24,7 +24,6 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
/**
*
@ -33,6 +32,7 @@ public class JMSIndividualAckTest extends TestSupport {
private Connection connection;
@Override
protected void setUp() throws Exception {
super.setUp();
connection = createConnection();
@ -41,6 +41,7 @@ public class JMSIndividualAckTest extends TestSupport {
/**
* @see junit.framework.TestCase#tearDown()
*/
@Override
protected void tearDown() throws Exception {
if (connection != null) {
connection.close();
@ -154,25 +155,7 @@ public class JMSIndividualAckTest extends TestSupport {
session.close();
}
/**
* Tests that a durable consumer cannot be created for Individual Ack mode.
*
* @throws JMSException
*/
public void testCreateDurableConsumerFails() throws JMSException {
connection.start();
Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
Topic dest = session.createTopic(getName());
try {
session.createDurableSubscriber(dest, getName());
fail("Should not be able to create duable subscriber.");
} catch(Exception e) {
}
}
protected String getQueueName() {
return getClass().getName() + "." + getName();
}
}

View File

@ -29,12 +29,13 @@ import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.PersistenceAdapter;
/**
*
*
*/
public abstract class DurableSubscriptionTestSupport extends TestSupport {
@ -44,21 +45,25 @@ public abstract class DurableSubscriptionTestSupport extends TestSupport {
private MessageProducer producer;
private BrokerService broker;
@Override
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory("vm://durable-broker");
}
@Override
protected Connection createConnection() throws Exception {
Connection rc = super.createConnection();
rc.setClientID(getName());
return rc;
}
@Override
protected void setUp() throws Exception {
createBroker();
super.setUp();
}
@Override
protected void tearDown() throws Exception {
super.tearDown();
destroyBroker();
@ -104,7 +109,7 @@ public abstract class DurableSubscriptionTestSupport extends TestSupport {
}
protected abstract PersistenceAdapter createPersistenceAdapter() throws Exception;
public void testMessageExpire() throws Exception {
session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("TestTopic");
@ -117,12 +122,12 @@ public abstract class DurableSubscriptionTestSupport extends TestSupport {
// Make sure it works when the durable sub is active.
producer.send(session.createTextMessage("Msg:1"));
assertTextMessageEquals("Msg:1", consumer.receive(1000));
consumer.close();
producer.send(session.createTextMessage("Msg:2"));
producer.send(session.createTextMessage("Msg:3"));
consumer = session.createDurableSubscriber(topic, "sub1");
// Try to get the message.
@ -225,7 +230,7 @@ public abstract class DurableSubscriptionTestSupport extends TestSupport {
assertTextMessageEquals("Msg:2", consumer.receive(5000));
assertNull(consumer.receive(5000));
}
public void testDurableSubscriptionBrokerRestart() throws Exception {
// Create the durable sub.
@ -235,12 +240,12 @@ public abstract class DurableSubscriptionTestSupport extends TestSupport {
// Ensure that consumer will receive messages sent before it was created
Topic topic = session.createTopic("TestTopic?consumer.retroactive=true");
consumer = session.createDurableSubscriber(topic, "sub1");
producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
producer.send(session.createTextMessage("Msg:1"));
assertTextMessageEquals("Msg:1", consumer.receive(5000));
// Make sure cleanup kicks in
Thread.sleep(1000);
@ -428,8 +433,7 @@ public abstract class DurableSubscriptionTestSupport extends TestSupport {
consumer = session.createDurableSubscriber(topic, "sub1");
Message msg = consumer.receive(1000);
assertNotNull(msg);
assertEquals("Message 1", ((TextMessage)msg).getText());
assertEquals("Message 1", ((TextMessage) msg).getText());
}
public void testDurableSubWorksInNewConnection() throws Exception {
@ -459,8 +463,56 @@ public abstract class DurableSubscriptionTestSupport extends TestSupport {
consumer = session.createDurableSubscriber(topic, "sub1");
Message msg = consumer.receive(1000);
assertNotNull(msg);
assertEquals("Message 1", ((TextMessage)msg).getText());
assertEquals("Message 1", ((TextMessage) msg).getText());
}
public void testIndividualAckWithDurableSubs() throws Exception {
// Create the consumer.
connection.start();
Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
Topic topic = session.createTopic("topic-" + getName());
MessageConsumer consumer = session.createDurableSubscriber(topic, "sub1");
// Drain any messages that may allready be in the sub
while (consumer.receive(1000) != null) {
}
consumer.close();
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
producer.send(session.createTextMessage("Message 1"));
producer.send(session.createTextMessage("Message 2"));
producer.send(session.createTextMessage("Message 3"));
producer.close();
connection.close();
connection = createConnection();
connection.start();
session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
consumer = session.createDurableSubscriber(topic, "sub1");
Message message = null;
for (int i = 0; i < 3; ++i) {
message = consumer.receive(5000);
assertNotNull(message);
assertEquals("Message " + (i + 1), ((TextMessage) message).getText());
}
message.acknowledge();
connection.close();
connection = createConnection();
connection.start();
session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
consumer = session.createDurableSubscriber(topic, "sub1");
for (int i = 0; i < 2; ++i) {
message = consumer.receive(5000);
assertNotNull(message);
assertEquals("Message " + (i + 1), ((TextMessage) message).getText());
}
}
private MessageProducer createProducer(Session session, Destination queue) throws JMSException {
@ -476,7 +528,7 @@ public abstract class DurableSubscriptionTestSupport extends TestSupport {
private void assertTextMessageEquals(String string, Message message) throws JMSException {
assertNotNull("Message was null", message);
assertTrue("Message is not a TextMessage", message instanceof TextMessage);
assertEquals(string, ((TextMessage)message).getText());
assertEquals(string, ((TextMessage) message).getText());
}
}