diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java index d37b1a323c..f2ba3f6c2f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java @@ -20,6 +20,7 @@ package org.apache.activemq.broker.jmx; import javax.management.openmbean.CompositeData; import javax.management.openmbean.OpenDataException; +import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Queue; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; @@ -39,16 +40,56 @@ public class QueueView extends DestinationView implements QueueViewMBean{ return OpenTypeSupport.convert(rc); } - public boolean removeMessage(String messageId){ - return ((Queue) destination).removeMessage(messageId); - } - public void purge(){ ((Queue) destination).purge(); } - public boolean copyMessageTo(String messageId, String destinationName) throws Exception { - return ((Queue) destination).copyMessageTo(BrokerView.getConnectionContext(broker.getContextBroker()), messageId, ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE)); + public boolean removeMessage(String messageId) throws Exception{ + return ((Queue) destination).removeMessage(messageId); + } + + public int removeMatchingMessages(String selector) throws Exception { + return ((Queue) destination).removeMatchingMessages(selector); } + public int removeMatchingMessages(String selector, int maximumMessages) throws Exception { + return ((Queue) destination).removeMatchingMessages(selector, maximumMessages); + } + + public boolean copyMessageTo(String messageId, String destinationName) throws Exception { + ConnectionContext context = BrokerView.getConnectionContext(broker.getContextBroker()); + ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE); + return ((Queue) destination).copyMessageTo(context, messageId, toDestination); + } + + public int copyMatchingMessagesTo(String selector, String destinationName) throws Exception { + ConnectionContext context = BrokerView.getConnectionContext(broker.getContextBroker()); + ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE); + return ((Queue) destination).copyMatchingMessagesTo(context, selector, toDestination); + } + + public int copyMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception { + ConnectionContext context = BrokerView.getConnectionContext(broker.getContextBroker()); + ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE); + return ((Queue) destination).copyMatchingMessagesTo(context, selector, toDestination, maximumMessages); + } + + public boolean moveMessageTo(String messageId, String destinationName) throws Exception { + ConnectionContext context = BrokerView.getConnectionContext(broker.getContextBroker()); + ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE); + return ((Queue) destination).moveMessageTo(context, messageId, toDestination); + } + + public int moveMatchingMessagesTo(String selector, String destinationName) throws Exception { + ConnectionContext context = BrokerView.getConnectionContext(broker.getContextBroker()); + ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE); + return ((Queue) destination).moveMatchingMessagesTo(context, selector, toDestination); + } + + public int moveMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception { + ConnectionContext context = BrokerView.getConnectionContext(broker.getContextBroker()); + ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE); + return ((Queue) destination).moveMatchingMessagesTo(context, selector, toDestination, maximumMessages); + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java index a67a22df38..5b63b29f46 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java @@ -24,37 +24,92 @@ import javax.management.openmbean.OpenDataException; public interface QueueViewMBean extends DestinationViewMBean { /** - * Retrieve a message from the destination's queue. - * - * @param messageId the message id of the message to retreive - * @return A CompositeData object which is a JMX version of the messages - * @throws OpenDataException - */ + * Retrieve a message from the destination's queue. + * + * @param messageId + * the message id of the message to retrieve + * @return A CompositeData object which is a JMX version of the messages + * @throws OpenDataException + */ public CompositeData getMessage(String messageId) throws OpenDataException; /** - * Removes a message from the queue. If the message has allready been dispatched - * to another consumer, the message cannot be delted and this method will return - * false. + * Removes a message from the queue. If the message has already been + * dispatched to another consumer, the message cannot be deleted and this + * method will return false. * - * @param messageId - * @return true if the message was found and could be succesfully deleted. + * @param messageId + * @return true if the message was found and could be successfully deleted. + * @throws Exception */ - public boolean removeMessage(String messageId); - + public boolean removeMessage(String messageId) throws Exception; + /** - * Emptys out all the messages in the queue. + * Removes the messages matching the given selector + * + * @return the number of messages removed + */ + public int removeMatchingMessages(String selector) throws Exception; + + /** + * Removes the messages matching the given selector up to the maximum number of matched messages + * + * @return the number of messages removed + */ + public int removeMatchingMessages(String selector, int maximumMessages) throws Exception; + + + /** + * Removes all of the messages in the queue. */ public void purge(); /** - * Copys a given message to another destination. + * Copies a given message to another destination. * * @param messageId * @param destinationName - * @return true if the message was found and was successfuly copied to the other destination. + * @return true if the message was found and was successfully copied to the + * other destination. * @throws Exception */ public boolean copyMessageTo(String messageId, String destinationName) throws Exception; + /** + * Copies the messages matching the given selector + * + * @return the number of messages copied + */ + public int copyMatchingMessagesTo(String selector, String destinationName) throws Exception; + + /** + * Copies the messages matching the given selector up to the maximum number of matched messages + * + * @return the number of messages copied + */ + public int copyMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception; + + /** + * Moves the message to another destination. + * + * @param messageId + * @param destinationName + * @return true if the message was found and was successfully copied to the + * other destination. + * @throws Exception + */ + public boolean moveMessageTo(String messageId, String destinationName) throws Exception; + + /** + * Moves the messages matching the given selector + * + * @return the number of messages removed + */ + public int moveMatchingMessagesTo(String selector, String destinationName) throws Exception; + + /** + * Moves the messages matching the given selector up to the maximum number of matched messages + */ + public int moveMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception; + } \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReferenceFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReferenceFilter.java new file mode 100644 index 0000000000..33e9044551 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReferenceFilter.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region; + +import org.apache.activemq.broker.ConnectionContext; + +import javax.jms.JMSException; + +/** + * Represents a filter on message references + * + * @version $Revision$ + */ +public interface MessageReferenceFilter { + + public boolean evaluate(ConnectionContext context, MessageReference messageReference) throws JMSException; +} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index d19f7dfd3b..00efc92962 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -33,8 +33,10 @@ import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; +import org.apache.activemq.filter.BooleanExpression; import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.memory.UsageManager; +import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageStore; import org.apache.activemq.thread.TaskRunnerFactory; @@ -44,6 +46,9 @@ import org.apache.activemq.util.BrokerSupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; + import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; @@ -77,20 +82,21 @@ public class Queue implements Destination { protected int highestSubscriptionPriority; private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy(); private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory(); - - public Queue(ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store, - DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception { + + public Queue(ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store, DestinationStatistics parentStats, + TaskRunnerFactory taskFactory) throws Exception { this.destination = destination; this.usageManager = new UsageManager(memoryManager); this.usageManager.setLimit(Long.MAX_VALUE); this.store = store; - // Let the store know what usage manager we are using so that he can flush messages to disk + // Let the store know what usage manager we are using so that he can + // flush messages to disk // when usage gets high. - if( store!=null ) { + if (store != null) { store.setUsageManager(usageManager); } - + destinationStatistics.setParent(parentStats); this.log = LogFactory.getLog(getClass().getName() + "." + destination.getPhysicalName()); @@ -110,8 +116,8 @@ public class Queue implements Destination { public void recoverMessageReference(String messageReference) throws Exception { throw new RuntimeException("Should not be called."); } - - public void finished(){ + + public void finished() { } }); } @@ -164,13 +170,15 @@ public class Queue implements Destination { if (sub.matches(node, msgContext)) { sub.add(node); } - } catch (IOException e) { + } + catch (IOException e) { log.warn("Could not load message: " + e, e); } } } - } finally { + } + finally { msgContext.clear(); dispatchValve.turnOn(); } @@ -225,8 +233,9 @@ public class Queue implements Destination { } } } - - // now lets dispatch from the copy of the collection to avoid deadlocks + + // now lets dispatch from the copy of the collection to + // avoid deadlocks for (Iterator iter = messagesToDispatch.iterator(); iter.hasNext();) { IndirectMessageReference node = (IndirectMessageReference) iter.next(); node.incrementRedeliveryCounter(); @@ -239,7 +248,8 @@ public class Queue implements Destination { msgContext.clear(); } } - } finally { + } + finally { dispatchValve.turnOn(); } @@ -250,9 +260,10 @@ public class Queue implements Destination { if (context.isProducerFlowControl()) { if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) { throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached"); - } else { - usageManager.waitForSpace(); - } + } + else { + usageManager.waitForSpace(); + } } message.setRegionDestination(this); @@ -269,10 +280,12 @@ public class Queue implements Destination { dispatch(context, node, message); } }); - } else { + } + else { dispatch(context, node, message); } - } finally { + } + finally { node.decrementReferenceCount(); } } @@ -315,9 +328,10 @@ public class Queue implements Destination { public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException { if (store != null && node.isPersistent()) { - // the original ack may be a ranged ack, but we are trying to delete a specific + // the original ack may be a ranged ack, but we are trying to delete + // a specific // message store here so we need to convert to a non ranged ack. - if( ack.getMessageCount() > 0 ) { + if (ack.getMessageCount() > 0) { // Dup the ack MessageAck a = new MessageAck(); ack.copy(a); @@ -344,9 +358,8 @@ public class Queue implements Destination { synchronized (messages) { size = messages.size(); } - return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size() - + ", memory=" + usageManager.getPercentUsage() + "%, size=" + size + ", in flight groups=" - + messageGroupOwners; + return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size() + ", memory=" + usageManager.getPercentUsage() + + "%, size=" + size + ", in flight groups=" + messageGroupOwners; } public void start() throws Exception { @@ -364,7 +377,7 @@ public class Queue implements Destination { public String getDestination() { return destination.getPhysicalName(); } - + public UsageManager getUsageManager() { return usageManager; } @@ -443,8 +456,7 @@ public class Queue implements Destination { public void setMemoryLimit(long limit) { getUsageManager().setLimit(limit); } - - + // Implementation methods // ------------------------------------------------------------------------- private MessageReference createMessageReference(Message message) { @@ -472,7 +484,8 @@ public class Queue implements Destination { msgContext.setMessageReference(node); dispatchPolicy.dispatch(context, node, msgContext, consumers); - } finally { + } + finally { msgContext.clear(); dispatchValve.decrement(); } @@ -508,10 +521,12 @@ public class Queue implements Destination { if (m != null) { l.add(m); } - } finally { + } + finally { r.decrementReferenceCount(); } - } catch (IOException e) { + } + catch (IOException e) { } } } @@ -519,33 +534,6 @@ public class Queue implements Destination { return (Message[]) l.toArray(new Message[l.size()]); } - public boolean removeMessage(String messageId) { - synchronized (messages) { - ConnectionContext c = new ConnectionContext(); - for (Iterator iter = messages.iterator(); iter.hasNext();) { - try { - IndirectMessageReference r = (IndirectMessageReference) iter.next(); - if (messageId.equals(r.getMessageId().toString())) { - - // We should only delete messages that can be locked. - if( r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER) ) { - MessageAck ack = new MessageAck(); - ack.setAckType(MessageAck.STANDARD_ACK_TYPE); - ack.setDestination(destination); - ack.setMessageID(r.getMessageId()); - acknowledge(c, null, ack, r); - r.drop(); - dropEvent(); - return true; - } - } - } catch (IOException e) { - } - } - } - return false; - } - public Message getMessage(String messageId) { synchronized (messages) { for (Iterator iter = messages.iterator(); iter.hasNext();) { @@ -558,12 +546,14 @@ public class Queue implements Destination { if (m != null) { return m; } - } finally { + } + finally { r.decrementReferenceCount(); } break; } - } catch (IOException e) { + } + catch (IOException e) { } } } @@ -572,13 +562,13 @@ public class Queue implements Destination { public void purge() { synchronized (messages) { - ConnectionContext c = new ConnectionContext(); + ConnectionContext c = createConnectionContext(); for (Iterator iter = messages.iterator(); iter.hasNext();) { try { IndirectMessageReference r = (IndirectMessageReference) iter.next(); - + // We should only delete messages that can be locked. - if( r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER) ) { + if (r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER)) { MessageAck ack = new MessageAck(); ack.setAckType(MessageAck.STANDARD_ACK_TYPE); ack.setDestination(destination); @@ -587,7 +577,8 @@ public class Queue implements Destination { r.drop(); dropEvent(true); } - } catch (IOException e) { + } + catch (IOException e) { } } @@ -596,27 +587,207 @@ public class Queue implements Destination { gc(); } } + - public boolean copyMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest) throws Exception { + /** + * Removes the message matching the given messageId + */ + public boolean removeMessage(String messageId) throws Exception { + return removeMatchingMessages(createMessageIdFilter(messageId), 1) > 0; + } + + /** + * Removes the messages matching the given selector + * + * @return the number of messages removed + */ + public int removeMatchingMessages(String selector) throws Exception { + return removeMatchingMessages(selector, -1); + } + + /** + * Removes the messages matching the given selector up to the maximum number of matched messages + * + * @return the number of messages removed + */ + public int removeMatchingMessages(String selector, int maximumMessages) throws Exception { + return removeMatchingMessages(createSelectorFilter(selector), maximumMessages); + } + + /** + * Removes the messages matching the given filter up to the maximum number of matched messages + * + * @return the number of messages removed + */ + public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages) throws Exception { + int counter = 0; synchronized (messages) { + ConnectionContext c = createConnectionContext(); for (Iterator iter = messages.iterator(); iter.hasNext();) { - try { - MessageReference r = (MessageReference) iter.next(); - if (messageId.equals(r.getMessageId().toString())) { - r.incrementReferenceCount(); - try { - Message m = r.getMessage(); - BrokerSupport.resend(context, m, dest); - } finally { - r.decrementReferenceCount(); - } - return true; + IndirectMessageReference r = (IndirectMessageReference) iter.next(); + if (filter.evaluate(c, r)) { + // We should only delete messages that can be locked. + if (lockMessage(r)) { + removeMessage(c, r); + if (++counter >= maximumMessages && maximumMessages > 0) { + break; + } } - } catch (IOException e) { } } } - return false; + return counter; + } + + /** + * Copies the message matching the given messageId + */ + public boolean copyMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest) throws Exception { + return copyMatchingMessages(context, createMessageIdFilter(messageId), dest, 1) > 0; + } + + /** + * Copies the messages matching the given selector + * + * @return the number of messages copied + */ + public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest) throws Exception { + return copyMatchingMessagesTo(context, selector, dest, -1); + } + + /** + * Copies the messages matching the given selector up to the maximum number of matched messages + * + * @return the number of messages copied + */ + public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest, int maximumMessages) throws Exception { + return copyMatchingMessages(context, createSelectorFilter(selector), dest, maximumMessages); + } + + /** + * Copies the messages matching the given filter up to the maximum number of matched messages + * + * @return the number of messages copied + */ + public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception { + int counter = 0; + synchronized (messages) { + for (Iterator iter = messages.iterator(); iter.hasNext();) { + MessageReference r = (MessageReference) iter.next(); + if (filter.evaluate(context, r)) { + r.incrementReferenceCount(); + try { + Message m = r.getMessage(); + BrokerSupport.resend(context, m, dest); + if (++counter >= maximumMessages && maximumMessages > 0) { + break; + } + } + finally { + r.decrementReferenceCount(); + } + } + } + } + return counter; + } + + /** + * Moves the message matching the given messageId + */ + public boolean moveMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest) throws Exception { + return moveMatchingMessagesTo(context, createMessageIdFilter(messageId), dest, 1) > 0; + } + + /** + * Moves the messages matching the given selector + * + * @return the number of messages removed + */ + public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest) throws Exception { + return moveMatchingMessagesTo(context, selector, dest, -1); + } + + /** + * Moves the messages matching the given selector up to the maximum number of matched messages + */ + public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest, int maximumMessages) throws Exception { + return moveMatchingMessagesTo(context, createSelectorFilter(selector), dest, maximumMessages); + } + + /** + * Moves the messages matching the given filter up to the maximum number of matched messages + */ + public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception { + int counter = 0; + synchronized (messages) { + for (Iterator iter = messages.iterator(); iter.hasNext();) { + IndirectMessageReference r = (IndirectMessageReference) iter.next(); + if (filter.evaluate(context, r)) { + // We should only move messages that can be locked. + if (lockMessage(r)) { + r.incrementReferenceCount(); + try { + Message m = r.getMessage(); + BrokerSupport.resend(context, m, dest); + removeMessage(context, r); + if (++counter >= maximumMessages && maximumMessages > 0) { + break; + } + } + finally { + r.decrementReferenceCount(); + } + } + } + } + } + return counter; + } + + protected MessageReferenceFilter createMessageIdFilter(final String messageId) { + return new MessageReferenceFilter() { + public boolean evaluate(ConnectionContext context, MessageReference r) { + return messageId.equals(r.getMessageId().toString()); + } + }; + } + + protected MessageReferenceFilter createSelectorFilter(String selector) throws InvalidSelectorException { + final BooleanExpression selectorExpression = new SelectorParser().parse(selector); + + return new MessageReferenceFilter() { + public boolean evaluate(ConnectionContext context, MessageReference r) throws JMSException { + MessageEvaluationContext messageEvaluationContext = context.getMessageEvaluationContext(); + + messageEvaluationContext.setMessageReference(r); + if (messageEvaluationContext.getDestination() == null) { + messageEvaluationContext.setDestination(getActiveMQDestination()); + } + + return selectorExpression.matches(messageEvaluationContext); + } + }; + } + + protected void removeMessage(ConnectionContext c, IndirectMessageReference r) throws IOException { + MessageAck ack = new MessageAck(); + ack.setAckType(MessageAck.STANDARD_ACK_TYPE); + ack.setDestination(destination); + ack.setMessageID(r.getMessageId()); + acknowledge(c, null, ack, r); + r.drop(); + dropEvent(); + } + + protected boolean lockMessage(IndirectMessageReference r) { + return r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER); + } + + protected ConnectionContext createConnectionContext() { + ConnectionContext answer = new ConnectionContext(); + answer.getMessageEvaluationContext().setDestination(getActiveMQDestination()); + return answer; } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java index 28420977b4..4991f4f34f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java @@ -33,7 +33,6 @@ import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import java.io.IOException; -import java.util.Iterator; public class QueueSubscription extends PrefetchSubscription implements LockOwner { diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java index b392d5dd90..9adc5976e2 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java @@ -74,6 +74,58 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { assertQueueBrowseWorks(); assertCreateAndDestroyDurableSubscriptions(); } + + public void testMoveMessagesBySelector() throws Exception { + connection = connectionFactory.createConnection(); + useConnection(connection); + + ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); + + QueueViewMBean queue = (QueueViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + + String newDestination = "test.new.destination." + getClass() + "." + getName(); + queue.moveMatchingMessagesTo("counter > 2", newDestination ); + + queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost"); + + queue = (QueueViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + + assertTrue("Should have at least one message in the queue: " + queueViewMBeanName, queue.getQueueSize() > 0); + + // now lets remove them by selector + queue.removeMatchingMessages("counter > 2"); + + assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize()); + } + + public void testCopyMessagesBySelector() throws Exception { + connection = connectionFactory.createConnection(); + useConnection(connection); + + ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); + + QueueViewMBean queue = (QueueViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + + String newDestination = "test.new.destination." + getClass() + "." + getName(); + long queueSize = queue.getQueueSize(); + queue.copyMatchingMessagesTo("counter > 2", newDestination); + + assertEquals("Should have same number of messages in the queue: " + queueViewMBeanName, queueSize, queueSize); + + queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost"); + + queue = (QueueViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + + log.info("Queue: " + queueViewMBeanName + " now has: " + queue.getQueueSize() + " message(s)"); + + assertTrue("Should have at least one message in the queue: " + queueViewMBeanName, queue.getQueueSize() > 0); + + // now lets remove them by selector + queue.removeMatchingMessages("counter > 2"); + + assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize()); + } + protected void assertQueueBrowseWorks() throws Exception { Integer mbeancnt = mbeanServer.getMBeanCount(); @@ -205,6 +257,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { MessageProducer producer = session.createProducer(destination); for (int i = 0; i < messageCount; i++) { Message message = session.createTextMessage("Message: " + i); + message.setIntProperty("counter", i); producer.send(message); } Thread.sleep(1000);