git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1354270 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-06-26 22:20:51 +00:00
parent 41c93667dc
commit 3813947879
2 changed files with 118 additions and 3 deletions

View File

@ -24,6 +24,7 @@ import java.util.Comparator;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -1208,7 +1209,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
*/ */
public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages) throws Exception { public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages) throws Exception {
int movedCounter = 0; int movedCounter = 0;
Set<MessageReference> set = new HashSet<MessageReference>(); Set<MessageReference> set = new LinkedHashSet<MessageReference>();
ConnectionContext context = createConnectionContext(); ConnectionContext context = createConnectionContext();
do { do {
doPageIn(true); doPageIn(true);
@ -1273,7 +1274,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
int maximumMessages) throws Exception { int maximumMessages) throws Exception {
int movedCounter = 0; int movedCounter = 0;
int count = 0; int count = 0;
Set<MessageReference> set = new HashSet<MessageReference>(); Set<MessageReference> set = new LinkedHashSet<MessageReference>();
do { do {
int oldMaxSize = getMaxPageSize(); int oldMaxSize = getMaxPageSize();
setMaxPageSize((int) this.destinationStatistics.getMessages().getCount()); setMaxPageSize((int) this.destinationStatistics.getMessages().getCount());
@ -1364,7 +1365,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter, public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter,
ActiveMQDestination dest, int maximumMessages) throws Exception { ActiveMQDestination dest, int maximumMessages) throws Exception {
int movedCounter = 0; int movedCounter = 0;
Set<QueueMessageReference> set = new HashSet<QueueMessageReference>(); Set<QueueMessageReference> set = new LinkedHashSet<QueueMessageReference>();
do { do {
doPageIn(true); doPageIn(true);
pagedInMessagesLock.readLock().lock(); pagedInMessagesLock.readLock().lock();

View File

@ -1078,6 +1078,120 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
assertTrue("Should find the connection's ManagedTransportConnection", found); assertTrue("Should find the connection's ManagedTransportConnection", found);
} }
public void testMoveMessagesToRetainOrder() throws Exception {
connection = connectionFactory.createConnection();
useConnection(connection);
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
String newDestination = getSecondDestinationString();
queue.moveMatchingMessagesTo("", newDestination);
queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
int movedSize = MESSAGE_COUNT;
assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize());
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(newDestination);
MessageConsumer consumer = session.createConsumer(destination);
int last = -1;
int current = -1;
Message message = null;
while ((message = consumer.receive(2000)) != null) {
if (message.propertyExists("counter")) {
current = message.getIntProperty("counter");
assertEquals(last, current - 1);
last = current;
}
}
// now lets remove them by selector
queue.removeMatchingMessages("");
assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
}
public void testCopyMessagesToRetainOrder() throws Exception {
connection = connectionFactory.createConnection();
useConnection(connection);
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
String newDestination = getSecondDestinationString();
queue.copyMatchingMessagesTo("", newDestination);
queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
int movedSize = MESSAGE_COUNT;
assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize());
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(newDestination);
MessageConsumer consumer = session.createConsumer(destination);
int last = -1;
int current = -1;
Message message = null;
while ((message = consumer.receive(2000)) != null) {
if (message.propertyExists("counter")) {
current = message.getIntProperty("counter");
assertEquals(last, current - 1);
last = current;
}
}
// now lets remove them by selector
queue.removeMatchingMessages("");
assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
}
public void testRemoveMatchingMessageRetainOrder() throws Exception {
connection = connectionFactory.createConnection();
useConnection(connection);
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
String queueName = getDestinationString();
queue.removeMatchingMessages("counter < 10");
int newSize = MESSAGE_COUNT - 10;
assertEquals("Unexpected number of messages ", newSize, queue.getQueueSize());
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(destination);
int last = 9;
int current = 0;
Message message = null;
while ((message = consumer.receive(2000)) != null) {
if (message.propertyExists("counter")) {
current = message.getIntProperty("counter");
assertEquals(last, current - 1);
last = current;
}
}
// now lets remove them by selector
queue.removeMatchingMessages("");
assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
}
public void testBrowseBytesMessages() throws Exception { public void testBrowseBytesMessages() throws Exception {
connection = connectionFactory.createConnection(); connection = connectionFactory.createConnection();
useConnectionWithByteMessage(connection); useConnectionWithByteMessage(connection);