diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java index 8c18dc5b61..7f2fb92144 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java @@ -113,6 +113,9 @@ public class ActiveMQQueueBrowser implements QueueBrowser, Enumeration { return; } try { + if (session.getTransacted()) { + session.commit(); + } consumer.close(); consumer = null; } catch (JMSException e) { diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java index 403434f9bb..b4ba8b38c4 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java @@ -33,7 +33,7 @@ import org.apache.commons.logging.LogFactory; public class DeadLetterTest extends DeadLetterTestSupport { private static final Log LOG = LogFactory.getLog(DeadLetterTest.class); - private int rollbackCount; + protected int rollbackCount; protected void doTest() throws Exception { connection.start(); diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java index b2b543655a..f4479d34d4 100755 --- a/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java @@ -23,6 +23,8 @@ import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.QueueBrowser; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; @@ -51,6 +53,7 @@ public abstract class DeadLetterTestSupport extends TestSupport { protected boolean durableSubscriber; protected Destination dlqDestination; protected MessageConsumer dlqConsumer; + protected QueueBrowser dlqBrowser; protected BrokerService broker; protected boolean transactedMode; protected int acknowledgeMode = Session.CLIENT_ACKNOWLEDGE; @@ -108,6 +111,13 @@ public abstract class DeadLetterTestSupport extends TestSupport { LOG.info("Consuming from dead letter on: " + dlqDestination); dlqConsumer = session.createConsumer(dlqDestination); } + + protected void makeDlqBrowser() throws JMSException { + dlqDestination = createDlqDestination(); + + LOG.info("Browsing dead letter on: " + dlqDestination); + dlqBrowser = session.createBrowser((Queue)dlqDestination); + } protected void sendMessages() throws JMSException { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java index d7927b9c07..b378c219d2 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java @@ -16,19 +16,29 @@ */ package org.apache.activemq.broker.policy; -import javax.jms.Destination; +import java.util.Enumeration; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Queue; + +import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.policy.DeadLetterStrategy; import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQQueue; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * @version $Revision$ */ public class IndividualDeadLetterTest extends DeadLetterTest { + private static final Log LOG = LogFactory.getLog(IndividualDeadLetterTest.class); protected BrokerService createBroker() throws Exception { BrokerService broker = super.createBroker(); @@ -50,4 +60,48 @@ public class IndividualDeadLetterTest extends DeadLetterTest { String prefix = topic ? "ActiveMQ.DLQ.Topic." : "ActiveMQ.DLQ.Queue."; return new ActiveMQQueue(prefix + getClass().getName() + "." + getName()); } + + public void testDLQBrowsing() throws Exception { + super.topic = false; + deliveryMode = DeliveryMode.PERSISTENT; + durableSubscriber = false; + messageCount = 1; + + connection.start(); + + ActiveMQConnection amqConnection = (ActiveMQConnection) connection; + rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1; + LOG.info("Will redeliver messages: " + rollbackCount + " times"); + + sendMessages(); + + // now lets receive and rollback N times + for (int i = 0; i < rollbackCount; i++) { + makeConsumer(); + Message message = consumer.receive(5000); + assertNotNull("No message received: ", message); + + session.rollback(); + LOG.info("Rolled back: " + rollbackCount + " times"); + consumer.close(); + } + + makeDlqBrowser(); + browseDlq(); + dlqBrowser.close(); + session.close(); + Thread.sleep(1000); + session = connection.createSession(transactedMode, acknowledgeMode); + Queue testQueue = new ActiveMQQueue("ActiveMQ.DLQ.Queue.ActiveMQ.DLQ.Queue." + getClass().getName() + "." + getName()); + MessageConsumer testConsumer = session.createConsumer(testQueue); + assertNull("The message shouldn't be sent to another DLQ", testConsumer.receive(1000)); + + } + + protected void browseDlq() throws Exception { + Enumeration messages = dlqBrowser.getEnumeration(); + while (messages.hasMoreElements()) { + LOG.info("Browsing: " + messages.nextElement()); + } + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompLoadTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompLoadTest.java index 2b1605a148..be52c5f17a 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompLoadTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompLoadTest.java @@ -29,8 +29,8 @@ public class StompLoadTest extends TestCase { final int msgCount = 10000; final int producerCount = 5; final int consumerCount = 5; - final int testTime = 10 * 60 * 1000; - final String bindAddress = "stomp://0.0.0.0:61613"; + final int testTime = 30 * 60 * 1000; + final String bindAddress = "stomp://0.0.0.0:61612"; public void testLoad() throws Exception {