diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java index 76d82a3a04..076b4fc950 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java @@ -226,4 +226,22 @@ public class QueueView extends DestinationView implements QueueViewMBean { Queue queue = (Queue) destination; queue.getMessageGroupOwners().removeAll(); } + + @Override + public void pause() { + Queue queue = (Queue) destination; + queue.pauseDispatch(); + } + + @Override + public void resume() { + Queue queue = (Queue) destination; + queue.resumeDispatch(); + } + + @Override + public boolean isPaused() { + Queue queue = (Queue) destination; + return queue.isDispatchPaused(); + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java index 3f99162a90..27ef61c9f0 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java @@ -209,4 +209,15 @@ public interface QueueViewMBean extends DestinationViewMBean { */ @MBeanInfo("emove all the message groups - will rebalance all message groups across consumers") void removeAllMessageGroups(); + + @MBeanInfo("pause dispatch to consumers") + void pause(); + + @MBeanInfo("resume dispatch to consumers if paused") + void resume(); + + @MBeanInfo("Dispatch to consumers is paused") + boolean isPaused(); + + } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index c0a237f5fa..67b9119cb5 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -147,6 +147,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index private final Object iteratingMutex = new Object(); + + class TimeoutMessage implements Delayed { Message message; @@ -1649,6 +1651,18 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index } } + public void pauseDispatch() { + dispatchSelector.pause(); + } + + public void resumeDispatch() { + dispatchSelector.resume(); + } + + public boolean isDispatchPaused() { + return dispatchSelector.isPaused(); + } + protected MessageReferenceFilter createMessageIdFilter(final String messageId) { return new MessageReferenceFilter() { @Override diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java index c73d9601e4..56f6076a33 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java @@ -30,8 +30,8 @@ import org.slf4j.LoggerFactory; public class QueueDispatchSelector extends SimpleDispatchSelector { private static final Logger LOG = LoggerFactory.getLogger(QueueDispatchSelector.class); private Subscription exclusiveConsumer; - - + private boolean paused; + /** * @param destination */ @@ -54,11 +54,22 @@ public class QueueDispatchSelector extends SimpleDispatchSelector { public boolean canSelect(Subscription subscription, MessageReference m) throws Exception { - boolean result = super.canDispatch(subscription, m); + boolean result = !paused && super.canDispatch(subscription, m); if (result && !subscription.isBrowser()) { result = exclusiveConsumer == null || exclusiveConsumer == subscription; } return result; } - + + public void pause() { + paused = true; + } + + public void resume() { + paused = false; + } + + public boolean isPaused() { + return paused; + } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java index e2b0c51696..95b7ff0f9c 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java @@ -818,13 +818,13 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { return answer; } - protected void useConnection(Connection connection) throws Exception { + protected void useConnection(Connection connection, int numToSend) throws Exception { connection.setClientID(clientID); connection.start(); Session session = connection.createSession(transacted, authMode); destination = createDestination(); MessageProducer producer = session.createProducer(destination); - for (int i = 0; i < MESSAGE_COUNT; i++) { + for (int i = 0; i < numToSend; i++) { Message message = session.createTextMessage("Message: " + i); message.setIntProperty("counter", i); message.setJMSCorrelationID("MyCorrelationID"); @@ -836,6 +836,10 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { Thread.sleep(1000); } + protected void useConnection(Connection connection) throws Exception { + useConnection(connection, MESSAGE_COUNT); + } + protected void useConnectionWithBlobMessage(Connection connection) throws Exception { connection.setClientID(clientID); connection.start(); @@ -1505,4 +1509,63 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { assertEquals(mbeans.size(), 1); sub.close(); } + + public void testQueuePauseResume() throws Exception { + connection = connectionFactory.createConnection(); + final int numToSend = 20; + useConnection(connection, numToSend); + ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString()); + + QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + + CompositeData[] compdatalist = queue.browse(); + int initialQueueSize = compdatalist.length; + assertEquals("expected", numToSend, initialQueueSize); + + + echo("Attempting to consume 5 bytes messages from: " + destination); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(destination); + for (int i=0; i<5; i++) { + assertNotNull("Message: " + i, consumer.receive(5000)); + } + consumer.close(); + session.close(); + + compdatalist = queue.browse(); + assertEquals("expected", numToSend -5, compdatalist.length); + + echo("pause"); + queue.pause(); + + assertTrue("queue is paused", queue.isPaused()); + + // verify no consume while paused + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + consumer = session.createConsumer(destination); + assertNull("cannot get message while paused", consumer.receive(2000)); + consumer.close(); + session.close(); + connection.close(); + + // verify send while paused + connection = connectionFactory.createConnection(); + useConnection(connection, numToSend); + + // verify browse + compdatalist = queue.browse(); + assertEquals("expected browse", (2*numToSend)-5, compdatalist.length); + assertEquals("expected message count", compdatalist.length, queue.getQueueSize()); + + echo("resume"); + queue.resume(); + + assertFalse("queue is not paused", queue.isPaused()); + + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + consumer = session.createConsumer(destination); + for (int i = 0; i < compdatalist.length; i++) { + assertNotNull("Message: " + i, consumer.receive(5000)); + } + } }