https://issues.apache.org/jira/browse/AMQ-5229 - implement ability to pause/resume dispatch of message to all consumers of a queue

This commit is contained in:
gtully 2015-02-23 21:41:10 +00:00
parent 1406d40ac3
commit 85b9c81a3f
5 changed files with 123 additions and 6 deletions

View File

@ -226,4 +226,22 @@ public class QueueView extends DestinationView implements QueueViewMBean {
Queue queue = (Queue) destination; Queue queue = (Queue) destination;
queue.getMessageGroupOwners().removeAll(); queue.getMessageGroupOwners().removeAll();
} }
@Override
public void pause() {
Queue queue = (Queue) destination;
queue.pauseDispatch();
}
@Override
public void resume() {
Queue queue = (Queue) destination;
queue.resumeDispatch();
}
@Override
public boolean isPaused() {
Queue queue = (Queue) destination;
return queue.isDispatchPaused();
}
} }

View File

@ -209,4 +209,15 @@ public interface QueueViewMBean extends DestinationViewMBean {
*/ */
@MBeanInfo("emove all the message groups - will rebalance all message groups across consumers") @MBeanInfo("emove all the message groups - will rebalance all message groups across consumers")
void removeAllMessageGroups(); void removeAllMessageGroups();
@MBeanInfo("pause dispatch to consumers")
void pause();
@MBeanInfo("resume dispatch to consumers if paused")
void resume();
@MBeanInfo("Dispatch to consumers is paused")
boolean isPaused();
} }

View File

@ -147,6 +147,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
private final Object iteratingMutex = new Object(); private final Object iteratingMutex = new Object();
class TimeoutMessage implements Delayed { class TimeoutMessage implements Delayed {
Message message; Message message;
@ -1649,6 +1651,18 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
} }
} }
public void pauseDispatch() {
dispatchSelector.pause();
}
public void resumeDispatch() {
dispatchSelector.resume();
}
public boolean isDispatchPaused() {
return dispatchSelector.isPaused();
}
protected MessageReferenceFilter createMessageIdFilter(final String messageId) { protected MessageReferenceFilter createMessageIdFilter(final String messageId) {
return new MessageReferenceFilter() { return new MessageReferenceFilter() {
@Override @Override

View File

@ -30,8 +30,8 @@ import org.slf4j.LoggerFactory;
public class QueueDispatchSelector extends SimpleDispatchSelector { public class QueueDispatchSelector extends SimpleDispatchSelector {
private static final Logger LOG = LoggerFactory.getLogger(QueueDispatchSelector.class); private static final Logger LOG = LoggerFactory.getLogger(QueueDispatchSelector.class);
private Subscription exclusiveConsumer; private Subscription exclusiveConsumer;
private boolean paused;
/** /**
* @param destination * @param destination
*/ */
@ -54,11 +54,22 @@ public class QueueDispatchSelector extends SimpleDispatchSelector {
public boolean canSelect(Subscription subscription, public boolean canSelect(Subscription subscription,
MessageReference m) throws Exception { MessageReference m) throws Exception {
boolean result = super.canDispatch(subscription, m); boolean result = !paused && super.canDispatch(subscription, m);
if (result && !subscription.isBrowser()) { if (result && !subscription.isBrowser()) {
result = exclusiveConsumer == null || exclusiveConsumer == subscription; result = exclusiveConsumer == null || exclusiveConsumer == subscription;
} }
return result; return result;
} }
public void pause() {
paused = true;
}
public void resume() {
paused = false;
}
public boolean isPaused() {
return paused;
}
} }

View File

@ -818,13 +818,13 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
return answer; return answer;
} }
protected void useConnection(Connection connection) throws Exception { protected void useConnection(Connection connection, int numToSend) throws Exception {
connection.setClientID(clientID); connection.setClientID(clientID);
connection.start(); connection.start();
Session session = connection.createSession(transacted, authMode); Session session = connection.createSession(transacted, authMode);
destination = createDestination(); destination = createDestination();
MessageProducer producer = session.createProducer(destination); MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < MESSAGE_COUNT; i++) { for (int i = 0; i < numToSend; i++) {
Message message = session.createTextMessage("Message: " + i); Message message = session.createTextMessage("Message: " + i);
message.setIntProperty("counter", i); message.setIntProperty("counter", i);
message.setJMSCorrelationID("MyCorrelationID"); message.setJMSCorrelationID("MyCorrelationID");
@ -836,6 +836,10 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
Thread.sleep(1000); Thread.sleep(1000);
} }
protected void useConnection(Connection connection) throws Exception {
useConnection(connection, MESSAGE_COUNT);
}
protected void useConnectionWithBlobMessage(Connection connection) throws Exception { protected void useConnectionWithBlobMessage(Connection connection) throws Exception {
connection.setClientID(clientID); connection.setClientID(clientID);
connection.start(); connection.start();
@ -1505,4 +1509,63 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
assertEquals(mbeans.size(), 1); assertEquals(mbeans.size(), 1);
sub.close(); sub.close();
} }
public void testQueuePauseResume() throws Exception {
connection = connectionFactory.createConnection();
final int numToSend = 20;
useConnection(connection, numToSend);
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString());
QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
CompositeData[] compdatalist = queue.browse();
int initialQueueSize = compdatalist.length;
assertEquals("expected", numToSend, initialQueueSize);
echo("Attempting to consume 5 bytes messages from: " + destination);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(destination);
for (int i=0; i<5; i++) {
assertNotNull("Message: " + i, consumer.receive(5000));
}
consumer.close();
session.close();
compdatalist = queue.browse();
assertEquals("expected", numToSend -5, compdatalist.length);
echo("pause");
queue.pause();
assertTrue("queue is paused", queue.isPaused());
// verify no consume while paused
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createConsumer(destination);
assertNull("cannot get message while paused", consumer.receive(2000));
consumer.close();
session.close();
connection.close();
// verify send while paused
connection = connectionFactory.createConnection();
useConnection(connection, numToSend);
// verify browse
compdatalist = queue.browse();
assertEquals("expected browse", (2*numToSend)-5, compdatalist.length);
assertEquals("expected message count", compdatalist.length, queue.getQueueSize());
echo("resume");
queue.resume();
assertFalse("queue is not paused", queue.isPaused());
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createConsumer(destination);
for (int i = 0; i < compdatalist.length; i++) {
assertNotNull("Message: " + i, consumer.receive(5000));
}
}
} }