git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@650763 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-04-23 06:57:51 +00:00
parent 6aa07faceb
commit 18f9773326
4 changed files with 39 additions and 36 deletions

View File

@ -30,7 +30,7 @@ import org.apache.activemq.usage.SystemUsage;
* @version $Revision: 1.12 $
*/
public abstract class BaseDestination implements Destination {
public static final int DEFAULT_PAGE_SIZE=100;
protected final ActiveMQDestination destination;
protected final Broker broker;
protected final MessageStore store;
@ -40,7 +40,7 @@ public abstract class BaseDestination implements Destination {
private int maxProducersToAudit=1024;
private int maxAuditDepth=2048;
private boolean enableAudit=true;
private int maxPageSize=100;
private int maxPageSize=DEFAULT_PAGE_SIZE;
private boolean useCache=true;
private int minimumMessageSize=1024;
private boolean lazyDispatch=false;

View File

@ -23,14 +23,14 @@ import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.locks.ReentrantLock;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
@ -69,6 +69,7 @@ import org.apache.activemq.util.BrokerSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* The Queue is a List of MessageEntry objects that are dispatched to matching
* subscriptions.
@ -749,27 +750,27 @@ public class Queue extends BaseDestination implements Task {
*/
public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages) throws Exception {
int movedCounter = 0;
int count = 0;
Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>();
ConnectionContext context = createConnectionContext();
List<MessageReference> list = null;
do {
pageInMessages();
synchronized (pagedInMessages) {
list = new ArrayList<MessageReference>(pagedInMessages.values());
set.addAll(pagedInMessages.values());
}
List <MessageReference>list = new ArrayList<MessageReference>(set);
for (MessageReference ref : list) {
IndirectMessageReference r = (IndirectMessageReference) ref;
if (filter.evaluate(context, r)) {
removeMessage(context, r);
set.remove(r);
if (++movedCounter >= maximumMessages
&& maximumMessages > 0) {
return movedCounter;
}
}
count++;
}
} while (count < this.destinationStatistics.getMessages().getCount());
} while (set.size() < this.destinationStatistics.getMessages().getCount());
return movedCounter;
}
@ -808,16 +809,21 @@ public class Queue extends BaseDestination implements Task {
public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception {
int movedCounter = 0;
int count = 0;
List<MessageReference> list = null;
Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>();
do {
int oldMaxSize=getMaxPageSize();
setMaxPageSize((int) this.destinationStatistics.getMessages().getCount());
pageInMessages();
setMaxPageSize(oldMaxSize);
synchronized (pagedInMessages) {
list = new ArrayList<MessageReference>(pagedInMessages.values());
set.addAll(pagedInMessages.values());
}
List <MessageReference>list = new ArrayList<MessageReference>(set);
for (MessageReference ref : list) {
IndirectMessageReference r = (IndirectMessageReference) ref;
if (filter.evaluate(context, r)) {
r.incrementReferenceCount();
r.incrementReferenceCount();
try {
Message m = r.getMessage();
BrokerSupport.resend(context, m, dest);
@ -865,14 +871,14 @@ public class Queue extends BaseDestination implements Task {
*/
public int moveMatchingMessagesTo(ConnectionContext context,MessageReferenceFilter filter, ActiveMQDestination dest,int maximumMessages) throws Exception {
int movedCounter = 0;
int count = 0;
List<MessageReference> list = null;
Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>();
do {
pageInMessages();
synchronized (pagedInMessages) {
list = new ArrayList<MessageReference>(pagedInMessages.values());
set.addAll(pagedInMessages.values());
}
for (MessageReference ref : list) {
List <MessageReference>list = new ArrayList<MessageReference>(set);
for (MessageReference ref:list) {
IndirectMessageReference r = (IndirectMessageReference) ref;
if (filter.evaluate(context, r)) {
// We should only move messages that can be locked.
@ -881,6 +887,7 @@ public class Queue extends BaseDestination implements Task {
Message m = r.getMessage();
BrokerSupport.resend(context, m, dest);
removeMessage(context, r);
set.remove(r);
if (++movedCounter >= maximumMessages
&& maximumMessages > 0) {
return movedCounter;
@ -889,9 +896,9 @@ public class Queue extends BaseDestination implements Task {
r.decrementReferenceCount();
}
}
count++;
}
} while (count < this.destinationStatistics.getMessages().getCount());
} while (set.size() < this.destinationStatistics.getMessages().getCount());
return movedCounter;
}
@ -1065,12 +1072,12 @@ public class Queue extends BaseDestination implements Task {
}
}
}
private List<QueueMessageReference> doPageIn(boolean force) throws Exception {
List<QueueMessageReference> result = null;
dispatchLock.lock();
try{
int toPageIn = (getMaxPageSize()+(int)destinationStatistics.getInflight().getCount()) - pagedInMessages.size();
if (isLazyDispatch()&& !force) {
// Only page in the minimum number of messages which can be dispatched immediately.

View File

@ -43,7 +43,6 @@ public class JmsQueueBrowserTest extends JmsTestSupport {
* @throws Exception
*/
public void testReceiveBrowseReceive() throws Exception {
Session session = connection.createSession(false, 0);
ActiveMQQueue destination = new ActiveMQQueue("TEST");
MessageProducer producer = session.createProducer(destination);

View File

@ -18,7 +18,6 @@ package org.apache.activemq.broker.jmx;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageProducer;
@ -29,11 +28,10 @@ import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
import junit.textui.TestRunner;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.advisory.TempDestDeleteTest;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -56,7 +54,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
protected Connection connection;
protected boolean transacted;
protected int authMode = Session.AUTO_ACKNOWLEDGE;
protected int messageCount = 10;
protected static final int MESSAGE_COUNT = 2*BaseDestination.DEFAULT_PAGE_SIZE;
/**
* When you run this test case from the command line it will pause before
@ -93,8 +91,8 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
assertTrue("Should have at least one message in the queue: " + queueViewMBeanName, queue.getQueueSize() > 0);
int movedSize = MESSAGE_COUNT-3;
assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize());
// now lets remove them by selector
queue.removeMatchingMessages("counter > 2");
@ -114,16 +112,14 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
long queueSize = queue.getQueueSize();
queue.copyMatchingMessagesTo("counter > 2", newDestination);
assertEquals("Should have same number of messages in the queue: " + queueViewMBeanName, queueSize, queueSize);
queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
LOG.info("Queue: " + queueViewMBeanName + " now has: " + queue.getQueueSize() + " message(s)");
assertTrue("Should have at least one message in the queue: " + queueViewMBeanName, queue.getQueueSize() > 0);
assertEquals("Expected messages in a queue: " + queueViewMBeanName, MESSAGE_COUNT-3, queue.getQueueSize());
// now lets remove them by selector
queue.removeMatchingMessages("counter > 2");
@ -165,20 +161,20 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
echo("Found tabular data: " + table);
assertTrue("Table should not be empty!", table.size() > 0);
assertEquals("Queue size", 10, proxy.getQueueSize());
assertEquals("Queue size", MESSAGE_COUNT, proxy.getQueueSize());
String messageID = messageIDs[0];
String newDestinationName = "queue://dummy.test.cheese";
echo("Attempting to copy: " + messageID + " to destination: " + newDestinationName);
proxy.copyMessageTo(messageID, newDestinationName);
assertEquals("Queue size", 10, proxy.getQueueSize());
assertEquals("Queue size", MESSAGE_COUNT, proxy.getQueueSize());
messageID = messageIDs[1];
echo("Attempting to remove: " + messageID);
proxy.removeMessage(messageID);
assertEquals("Queue size", 9, proxy.getQueueSize());
assertEquals("Queue size", MESSAGE_COUNT-1, proxy.getQueueSize());
echo("Worked!");
}
@ -296,8 +292,9 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
protected BrokerService createBroker() throws Exception {
BrokerService answer = new BrokerService();
answer.setDeleteAllMessagesOnStartup(true);
answer.setUseJmx(true);
answer.setEnableStatistics(true);
//answer.setEnableStatistics(true);
answer.setPersistent(false);
answer.addConnector(bindAddress);
return answer;
@ -309,7 +306,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
Session session = connection.createSession(transacted, authMode);
destination = createDestination();
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < messageCount; i++) {
for (int i = 0; i < MESSAGE_COUNT; i++) {
Message message = session.createTextMessage("Message: " + i);
message.setIntProperty("counter", i);
producer.send(message);