From 18f97733267bc32ec18b5038ca618363daf2f4c2 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Wed, 23 Apr 2008 06:57:51 +0000 Subject: [PATCH] Fixes for https://issues.apache.org/activemq/browse/AMQ-1678 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@650763 13f79535-47bb-0310-9956-ffa450edef68 --- .../broker/region/BaseDestination.java | 4 +- .../apache/activemq/broker/region/Queue.java | 43 +++++++++++-------- .../apache/activemq/JmsQueueBrowserTest.java | 1 - .../apache/activemq/broker/jmx/MBeanTest.java | 27 ++++++------ 4 files changed, 39 insertions(+), 36 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 27ad5c7251..31e967c01d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -30,7 +30,7 @@ import org.apache.activemq.usage.SystemUsage; * @version $Revision: 1.12 $ */ public abstract class BaseDestination implements Destination { - + public static final int DEFAULT_PAGE_SIZE=100; protected final ActiveMQDestination destination; protected final Broker broker; protected final MessageStore store; @@ -40,7 +40,7 @@ public abstract class BaseDestination implements Destination { private int maxProducersToAudit=1024; private int maxAuditDepth=2048; private boolean enableAudit=true; - private int maxPageSize=100; + private int maxPageSize=DEFAULT_PAGE_SIZE; private boolean useCache=true; private int minimumMessageSize=1024; private boolean lazyDispatch=false; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index eaa8f232cd..d95f44f02b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -23,14 +23,14 @@ import java.util.Comparator; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.locks.ReentrantLock; - import javax.jms.InvalidSelectorException; import javax.jms.JMSException; - import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; @@ -69,6 +69,7 @@ import org.apache.activemq.util.BrokerSupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + /** * The Queue is a List of MessageEntry objects that are dispatched to matching * subscriptions. @@ -749,27 +750,27 @@ public class Queue extends BaseDestination implements Task { */ public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages) throws Exception { int movedCounter = 0; - int count = 0; + Set set = new CopyOnWriteArraySet(); ConnectionContext context = createConnectionContext(); - List list = null; do { pageInMessages(); synchronized (pagedInMessages) { - list = new ArrayList(pagedInMessages.values()); + set.addAll(pagedInMessages.values()); } + List list = new ArrayList(set); for (MessageReference ref : list) { IndirectMessageReference r = (IndirectMessageReference) ref; if (filter.evaluate(context, r)) { removeMessage(context, r); + set.remove(r); if (++movedCounter >= maximumMessages && maximumMessages > 0) { return movedCounter; } } - count++; } - } while (count < this.destinationStatistics.getMessages().getCount()); + } while (set.size() < this.destinationStatistics.getMessages().getCount()); return movedCounter; } @@ -808,16 +809,21 @@ public class Queue extends BaseDestination implements Task { public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception { int movedCounter = 0; int count = 0; - List list = null; + Set set = new CopyOnWriteArraySet(); do { + int oldMaxSize=getMaxPageSize(); + setMaxPageSize((int) this.destinationStatistics.getMessages().getCount()); pageInMessages(); + setMaxPageSize(oldMaxSize); synchronized (pagedInMessages) { - list = new ArrayList(pagedInMessages.values()); + set.addAll(pagedInMessages.values()); } + List list = new ArrayList(set); for (MessageReference ref : list) { IndirectMessageReference r = (IndirectMessageReference) ref; if (filter.evaluate(context, r)) { - r.incrementReferenceCount(); + + r.incrementReferenceCount(); try { Message m = r.getMessage(); BrokerSupport.resend(context, m, dest); @@ -865,14 +871,14 @@ public class Queue extends BaseDestination implements Task { */ public int moveMatchingMessagesTo(ConnectionContext context,MessageReferenceFilter filter, ActiveMQDestination dest,int maximumMessages) throws Exception { int movedCounter = 0; - int count = 0; - List list = null; + Set set = new CopyOnWriteArraySet(); do { pageInMessages(); synchronized (pagedInMessages) { - list = new ArrayList(pagedInMessages.values()); + set.addAll(pagedInMessages.values()); } - for (MessageReference ref : list) { + List list = new ArrayList(set); + for (MessageReference ref:list) { IndirectMessageReference r = (IndirectMessageReference) ref; if (filter.evaluate(context, r)) { // We should only move messages that can be locked. @@ -881,6 +887,7 @@ public class Queue extends BaseDestination implements Task { Message m = r.getMessage(); BrokerSupport.resend(context, m, dest); removeMessage(context, r); + set.remove(r); if (++movedCounter >= maximumMessages && maximumMessages > 0) { return movedCounter; @@ -889,9 +896,9 @@ public class Queue extends BaseDestination implements Task { r.decrementReferenceCount(); } } - count++; + } - } while (count < this.destinationStatistics.getMessages().getCount()); + } while (set.size() < this.destinationStatistics.getMessages().getCount()); return movedCounter; } @@ -1065,12 +1072,12 @@ public class Queue extends BaseDestination implements Task { } } } - + + private List doPageIn(boolean force) throws Exception { List result = null; dispatchLock.lock(); try{ - int toPageIn = (getMaxPageSize()+(int)destinationStatistics.getInflight().getCount()) - pagedInMessages.size(); if (isLazyDispatch()&& !force) { // Only page in the minimum number of messages which can be dispatched immediately. diff --git a/activemq-core/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java b/activemq-core/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java index 77dba50af8..cc78343329 100755 --- a/activemq-core/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java @@ -43,7 +43,6 @@ public class JmsQueueBrowserTest extends JmsTestSupport { * @throws Exception */ public void testReceiveBrowseReceive() throws Exception { - Session session = connection.createSession(false, 0); ActiveMQQueue destination = new ActiveMQQueue("TEST"); MessageProducer producer = session.createProducer(destination); diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java index 717b443e09..02cda75cbe 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java @@ -18,7 +18,6 @@ package org.apache.activemq.broker.jmx; import java.io.BufferedReader; import java.io.InputStreamReader; - import javax.jms.Connection; import javax.jms.Message; import javax.jms.MessageProducer; @@ -29,11 +28,10 @@ import javax.management.MalformedObjectNameException; import javax.management.ObjectName; import javax.management.openmbean.CompositeData; import javax.management.openmbean.TabularData; - import junit.textui.TestRunner; import org.apache.activemq.EmbeddedBrokerTestSupport; -import org.apache.activemq.advisory.TempDestDeleteTest; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.BaseDestination; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -56,7 +54,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { protected Connection connection; protected boolean transacted; protected int authMode = Session.AUTO_ACKNOWLEDGE; - protected int messageCount = 10; + protected static final int MESSAGE_COUNT = 2*BaseDestination.DEFAULT_PAGE_SIZE; /** * When you run this test case from the command line it will pause before @@ -93,8 +91,8 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost"); queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); - - assertTrue("Should have at least one message in the queue: " + queueViewMBeanName, queue.getQueueSize() > 0); + int movedSize = MESSAGE_COUNT-3; + assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize()); // now lets remove them by selector queue.removeMatchingMessages("counter > 2"); @@ -114,16 +112,14 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { long queueSize = queue.getQueueSize(); queue.copyMatchingMessagesTo("counter > 2", newDestination); - assertEquals("Should have same number of messages in the queue: " + queueViewMBeanName, queueSize, queueSize); + queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost"); queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); LOG.info("Queue: " + queueViewMBeanName + " now has: " + queue.getQueueSize() + " message(s)"); - - assertTrue("Should have at least one message in the queue: " + queueViewMBeanName, queue.getQueueSize() > 0); - + assertEquals("Expected messages in a queue: " + queueViewMBeanName, MESSAGE_COUNT-3, queue.getQueueSize()); // now lets remove them by selector queue.removeMatchingMessages("counter > 2"); @@ -165,20 +161,20 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { echo("Found tabular data: " + table); assertTrue("Table should not be empty!", table.size() > 0); - assertEquals("Queue size", 10, proxy.getQueueSize()); + assertEquals("Queue size", MESSAGE_COUNT, proxy.getQueueSize()); String messageID = messageIDs[0]; String newDestinationName = "queue://dummy.test.cheese"; echo("Attempting to copy: " + messageID + " to destination: " + newDestinationName); proxy.copyMessageTo(messageID, newDestinationName); - assertEquals("Queue size", 10, proxy.getQueueSize()); + assertEquals("Queue size", MESSAGE_COUNT, proxy.getQueueSize()); messageID = messageIDs[1]; echo("Attempting to remove: " + messageID); proxy.removeMessage(messageID); - assertEquals("Queue size", 9, proxy.getQueueSize()); + assertEquals("Queue size", MESSAGE_COUNT-1, proxy.getQueueSize()); echo("Worked!"); } @@ -296,8 +292,9 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { protected BrokerService createBroker() throws Exception { BrokerService answer = new BrokerService(); + answer.setDeleteAllMessagesOnStartup(true); answer.setUseJmx(true); - answer.setEnableStatistics(true); + //answer.setEnableStatistics(true); answer.setPersistent(false); answer.addConnector(bindAddress); return answer; @@ -309,7 +306,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { Session session = connection.createSession(transacted, authMode); destination = createDestination(); MessageProducer producer = session.createProducer(destination); - for (int i = 0; i < messageCount; i++) { + for (int i = 0; i < MESSAGE_COUNT; i++) { Message message = session.createTextMessage("Message: " + i); message.setIntProperty("counter", i); producer.send(message);