mirror of https://github.com/apache/activemq.git
https://issues.apache.org/activemq/browse/AMQ-2553 - browsing dlq over transacted session
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@894347 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
57843ea8a9
commit
155495485e
|
@ -113,6 +113,9 @@ public class ActiveMQQueueBrowser implements QueueBrowser, Enumeration {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
|
if (session.getTransacted()) {
|
||||||
|
session.commit();
|
||||||
|
}
|
||||||
consumer.close();
|
consumer.close();
|
||||||
consumer = null;
|
consumer = null;
|
||||||
} catch (JMSException e) {
|
} catch (JMSException e) {
|
||||||
|
|
|
@ -33,7 +33,7 @@ import org.apache.commons.logging.LogFactory;
|
||||||
public class DeadLetterTest extends DeadLetterTestSupport {
|
public class DeadLetterTest extends DeadLetterTestSupport {
|
||||||
private static final Log LOG = LogFactory.getLog(DeadLetterTest.class);
|
private static final Log LOG = LogFactory.getLog(DeadLetterTest.class);
|
||||||
|
|
||||||
private int rollbackCount;
|
protected int rollbackCount;
|
||||||
|
|
||||||
protected void doTest() throws Exception {
|
protected void doTest() throws Exception {
|
||||||
connection.start();
|
connection.start();
|
||||||
|
|
|
@ -23,6 +23,8 @@ import javax.jms.JMSException;
|
||||||
import javax.jms.Message;
|
import javax.jms.Message;
|
||||||
import javax.jms.MessageConsumer;
|
import javax.jms.MessageConsumer;
|
||||||
import javax.jms.MessageProducer;
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Queue;
|
||||||
|
import javax.jms.QueueBrowser;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
import javax.jms.TextMessage;
|
import javax.jms.TextMessage;
|
||||||
import javax.jms.Topic;
|
import javax.jms.Topic;
|
||||||
|
@ -51,6 +53,7 @@ public abstract class DeadLetterTestSupport extends TestSupport {
|
||||||
protected boolean durableSubscriber;
|
protected boolean durableSubscriber;
|
||||||
protected Destination dlqDestination;
|
protected Destination dlqDestination;
|
||||||
protected MessageConsumer dlqConsumer;
|
protected MessageConsumer dlqConsumer;
|
||||||
|
protected QueueBrowser dlqBrowser;
|
||||||
protected BrokerService broker;
|
protected BrokerService broker;
|
||||||
protected boolean transactedMode;
|
protected boolean transactedMode;
|
||||||
protected int acknowledgeMode = Session.CLIENT_ACKNOWLEDGE;
|
protected int acknowledgeMode = Session.CLIENT_ACKNOWLEDGE;
|
||||||
|
@ -109,6 +112,13 @@ public abstract class DeadLetterTestSupport extends TestSupport {
|
||||||
dlqConsumer = session.createConsumer(dlqDestination);
|
dlqConsumer = session.createConsumer(dlqDestination);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void makeDlqBrowser() throws JMSException {
|
||||||
|
dlqDestination = createDlqDestination();
|
||||||
|
|
||||||
|
LOG.info("Browsing dead letter on: " + dlqDestination);
|
||||||
|
dlqBrowser = session.createBrowser((Queue)dlqDestination);
|
||||||
|
}
|
||||||
|
|
||||||
protected void sendMessages() throws JMSException {
|
protected void sendMessages() throws JMSException {
|
||||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
producer = session.createProducer(getDestination());
|
producer = session.createProducer(getDestination());
|
||||||
|
|
|
@ -16,19 +16,29 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.broker.policy;
|
package org.apache.activemq.broker.policy;
|
||||||
|
|
||||||
import javax.jms.Destination;
|
import java.util.Enumeration;
|
||||||
|
|
||||||
|
import javax.jms.DeliveryMode;
|
||||||
|
import javax.jms.Destination;
|
||||||
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.Queue;
|
||||||
|
|
||||||
|
import org.apache.activemq.ActiveMQConnection;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
|
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
|
||||||
import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
|
import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||||
import org.apache.activemq.command.ActiveMQQueue;
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @version $Revision$
|
* @version $Revision$
|
||||||
*/
|
*/
|
||||||
public class IndividualDeadLetterTest extends DeadLetterTest {
|
public class IndividualDeadLetterTest extends DeadLetterTest {
|
||||||
|
private static final Log LOG = LogFactory.getLog(IndividualDeadLetterTest.class);
|
||||||
|
|
||||||
protected BrokerService createBroker() throws Exception {
|
protected BrokerService createBroker() throws Exception {
|
||||||
BrokerService broker = super.createBroker();
|
BrokerService broker = super.createBroker();
|
||||||
|
@ -50,4 +60,48 @@ public class IndividualDeadLetterTest extends DeadLetterTest {
|
||||||
String prefix = topic ? "ActiveMQ.DLQ.Topic." : "ActiveMQ.DLQ.Queue.";
|
String prefix = topic ? "ActiveMQ.DLQ.Topic." : "ActiveMQ.DLQ.Queue.";
|
||||||
return new ActiveMQQueue(prefix + getClass().getName() + "." + getName());
|
return new ActiveMQQueue(prefix + getClass().getName() + "." + getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testDLQBrowsing() throws Exception {
|
||||||
|
super.topic = false;
|
||||||
|
deliveryMode = DeliveryMode.PERSISTENT;
|
||||||
|
durableSubscriber = false;
|
||||||
|
messageCount = 1;
|
||||||
|
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
|
||||||
|
rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;
|
||||||
|
LOG.info("Will redeliver messages: " + rollbackCount + " times");
|
||||||
|
|
||||||
|
sendMessages();
|
||||||
|
|
||||||
|
// now lets receive and rollback N times
|
||||||
|
for (int i = 0; i < rollbackCount; i++) {
|
||||||
|
makeConsumer();
|
||||||
|
Message message = consumer.receive(5000);
|
||||||
|
assertNotNull("No message received: ", message);
|
||||||
|
|
||||||
|
session.rollback();
|
||||||
|
LOG.info("Rolled back: " + rollbackCount + " times");
|
||||||
|
consumer.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
makeDlqBrowser();
|
||||||
|
browseDlq();
|
||||||
|
dlqBrowser.close();
|
||||||
|
session.close();
|
||||||
|
Thread.sleep(1000);
|
||||||
|
session = connection.createSession(transactedMode, acknowledgeMode);
|
||||||
|
Queue testQueue = new ActiveMQQueue("ActiveMQ.DLQ.Queue.ActiveMQ.DLQ.Queue." + getClass().getName() + "." + getName());
|
||||||
|
MessageConsumer testConsumer = session.createConsumer(testQueue);
|
||||||
|
assertNull("The message shouldn't be sent to another DLQ", testConsumer.receive(1000));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void browseDlq() throws Exception {
|
||||||
|
Enumeration messages = dlqBrowser.getEnumeration();
|
||||||
|
while (messages.hasMoreElements()) {
|
||||||
|
LOG.info("Browsing: " + messages.nextElement());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,8 +29,8 @@ public class StompLoadTest extends TestCase {
|
||||||
final int msgCount = 10000;
|
final int msgCount = 10000;
|
||||||
final int producerCount = 5;
|
final int producerCount = 5;
|
||||||
final int consumerCount = 5;
|
final int consumerCount = 5;
|
||||||
final int testTime = 10 * 60 * 1000;
|
final int testTime = 30 * 60 * 1000;
|
||||||
final String bindAddress = "stomp://0.0.0.0:61613";
|
final String bindAddress = "stomp://0.0.0.0:61612";
|
||||||
|
|
||||||
public void testLoad() throws Exception {
|
public void testLoad() throws Exception {
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue