resolve https://issues.apache.org/activemq/browse/AMQ-2468 - limit pagedInPendingDispatch to maxPageSize and bypass dispatch for jmx queue modifications like purge and remove matching messages so they are not limited by pending messages and can page through all messages. Resolve intermittent deadlock in AMQ2102Test. Note: sparse selectors may need to increase maxPageSize as ever increasing pagedInPendingDispatch was exceeding that limit in error

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@831258 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2009-10-30 10:51:41 +00:00
parent d2a9f7dba2
commit 89eecadd9d
2 changed files with 81 additions and 23 deletions

View File

@ -863,8 +863,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
public void purge() throws Exception {
ConnectionContext c = createConnectionContext();
List<MessageReference> list = null;
do {
pageInMessages();
do {
doPageIn(true);
synchronized (pagedInMessages) {
list = new ArrayList<MessageReference>(pagedInMessages.values());
}
@ -876,6 +876,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
} catch (IOException e) {
}
}
} while (!pagedInMessages.isEmpty() || this.destinationStatistics.getMessages().getCount() > 0);
gc();
this.destinationStatistics.getMessages().setCount(0);
@ -919,7 +920,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>();
ConnectionContext context = createConnectionContext();
do {
pageInMessages();
doPageIn(true);
synchronized (pagedInMessages) {
set.addAll(pagedInMessages.values());
}
@ -979,7 +980,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
do {
int oldMaxSize=getMaxPageSize();
setMaxPageSize((int) this.destinationStatistics.getMessages().getCount());
pageInMessages();
doPageIn(true);
setMaxPageSize(oldMaxSize);
synchronized (pagedInMessages) {
set.addAll(pagedInMessages.values());
@ -1170,7 +1171,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
}
// Perhaps we should page always into the pagedInPendingDispatch list is
// Perhaps we should page always into the pagedInPendingDispatch list if
// !messages.isEmpty(), and then if !pagedInPendingDispatch.isEmpty()
// then we do a dispatch.
if (pageInMoreMessages) {
@ -1215,6 +1216,11 @@ public class Queue extends BaseDestination implements Task, UsageListener {
protected void removeMessage(ConnectionContext c, QueueMessageReference r) throws IOException {
removeMessage(c, null, r);
synchronized(dispatchMutex) {
synchronized (pagedInPendingDispatch) {
pagedInPendingDispatch.remove(r);
}
}
}
protected void removeMessage(ConnectionContext c, Subscription subs,QueueMessageReference r) throws IOException {
@ -1349,12 +1355,11 @@ public class Queue extends BaseDestination implements Task, UsageListener {
+ ", pagedInMessages.size " + pagedInMessages.size());
}
if (isLazyDispatch()&& !force) {
if (isLazyDispatch() && !force) {
// Only page in the minimum number of messages which can be dispatched immediately.
toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
}
if ((force || !consumers.isEmpty()) && toPageIn > 0) {
if (toPageIn > 0 && (force || (!consumers.isEmpty() && pagedInPendingDispatch.size() < getMaxPageSize()))) {
int count = 0;
result = new ArrayList<QueueMessageReference>(toPageIn);
synchronized (messages) {
@ -1405,8 +1410,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
// dispatched before.
pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
}
// and now see if we can dispatch the new stuff.. and append to
// the pending
// and now see if we can dispatch the new stuff.. and append to the pending
// list anything that does not actually get dispatched.
if (list != null && !list.isEmpty()) {
if (pagedInPendingDispatch.isEmpty()) {
@ -1423,7 +1427,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
}
if (doWakeUp) {
wakeup();
// avoid lock order contention
asyncWakeup();
}
}
@ -1495,9 +1500,6 @@ public class Queue extends BaseDestination implements Task, UsageListener {
return rc;
}
private void pageInMessages() throws Exception {
pageInMessages(true);
}
protected void pageInMessages(boolean force) throws Exception {
doDispatch(doPageIn(force));

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.broker.region;
import java.io.File;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
@ -25,15 +27,25 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.MBeanServerInvocationHandler;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class QueuePurgeTest extends TestCase {
private static final Log LOG = LogFactory.getLog(QueuePurgeTest.class);
private final String MESSAGE_TEXT = new String(new byte[1024]);
BrokerService broker;
ConnectionFactory factory;
Connection connection;
@ -43,17 +55,23 @@ public class QueuePurgeTest extends TestCase {
protected void setUp() throws Exception {
broker = new BrokerService();
broker.setDataDirectory("target/activemq-data");
broker.setUseJmx(true);
broker.setPersistent(false);
broker.setDeleteAllMessagesOnStartup(true);
KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
persistenceAdapter.setDirectory(new File("target/activemq-data/kahadb/QueuePurgeTest"));
broker.setPersistenceAdapter(persistenceAdapter);
broker.addConnector("tcp://localhost:0");
broker.start();
factory = new ActiveMQConnectionFactory("vm://localhost");
factory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri().toString());
connection = factory.createConnection();
connection.start();
}
protected void tearDown() throws Exception {
consumer.close();
if (consumer != null) {
consumer.close();
}
session.close();
connection.stop();
connection.close();
@ -61,13 +79,48 @@ public class QueuePurgeTest extends TestCase {
}
public void testPurgeQueueWithActiveConsumer() throws Exception {
createProducerAndSendMessages();
createProducerAndSendMessages(10000);
QueueViewMBean proxy = getProxyToQueueViewMBean();
createConsumer();
proxy.purge();
assertEquals("Queue size is not zero, it's " + proxy.getQueueSize(), 0,
proxy.getQueueSize());
}
public void testPurgeLargeQueue() throws Exception {
applyBrokerSpoolingPolicy();
createProducerAndSendMessages(90000);
QueueViewMBean proxy = getProxyToQueueViewMBean();
LOG.info("purging..");
proxy.purge();
assertEquals("Queue size is not zero, it's " + proxy.getQueueSize(), 0,
proxy.getQueueSize());
}
private void applyBrokerSpoolingPolicy() {
PolicyMap policyMap = new PolicyMap();
PolicyEntry defaultEntry = new PolicyEntry();
defaultEntry.setProducerFlowControl(false);
PendingQueueMessageStoragePolicy pendingQueuePolicy = new FilePendingQueueMessageStoragePolicy();
defaultEntry.setPendingQueuePolicy(pendingQueuePolicy);
policyMap.setDefaultEntry(defaultEntry);
broker.setDestinationPolicy(policyMap);
}
public void testPurgeLargeQueueWithConsumer() throws Exception {
applyBrokerSpoolingPolicy();
createProducerAndSendMessages(90000);
QueueViewMBean proxy = getProxyToQueueViewMBean();
createConsumer();
long start = System.currentTimeMillis();
LOG.info("purging..");
proxy.purge();
LOG.info("purge done: " + (System.currentTimeMillis() - start) + "ms");
assertEquals("Queue size is not zero, it's " + proxy.getQueueSize(), 0,
proxy.getQueueSize());
}
private QueueViewMBean getProxyToQueueViewMBean()
throws MalformedObjectNameException, JMSException {
@ -80,12 +133,15 @@ public class QueuePurgeTest extends TestCase {
return proxy;
}
private void createProducerAndSendMessages() throws Exception {
private void createProducerAndSendMessages(int numToSend) throws Exception {
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
queue = session.createQueue("test1");
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 10000; i++) {
TextMessage message = session.createTextMessage("message " + i);
for (int i = 0; i < numToSend; i++) {
TextMessage message = session.createTextMessage(MESSAGE_TEXT + i);
if (i != 0 && i % 50000 == 0) {
LOG.info("sent: " + i);
}
producer.send(message);
}
producer.close();
@ -95,7 +151,7 @@ public class QueuePurgeTest extends TestCase {
consumer = session.createConsumer(queue);
// wait for buffer fill out
Thread.sleep(5 * 1000);
for (int i = 0; i < 100; ++i) {
for (int i = 0; i < 500; ++i) {
Message message = consumer.receive();
message.acknowledge();
}