mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-3404 - purge command with selector
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1149095 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9b32b71754
commit
7ea64b42f2
|
@ -92,6 +92,34 @@
|
|||
<artifactId>xbean-spring</artifactId>
|
||||
</dependency>
|
||||
|
||||
|
||||
<!-- needed for TestPurgeCommand, but not for compile. -->
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-context</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>log4j</groupId>
|
||||
<artifactId>log4j</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.derby</groupId>
|
||||
<artifactId>derby</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
|
@ -141,8 +169,26 @@
|
|||
<forkMode>pertest</forkMode>
|
||||
<childDelegation>false</childDelegation>
|
||||
<useFile>true</useFile>
|
||||
<systemProperties>
|
||||
<property>
|
||||
<name>org.apache.activemq.default.directory.prefix</name>
|
||||
<value>target/</value>
|
||||
</property>
|
||||
<!-- Uncomment the following if you want to configure custom logging (using src/test/resources/log4j.properties)
|
||||
while running mvn:test
|
||||
Note: if you want to see log messages on the console window remove
|
||||
"redirectTestOutputToFile" from the parent pom
|
||||
-->
|
||||
<!--
|
||||
<property>
|
||||
<name>log4j.configuration</name>
|
||||
<value>file:target/test-classes/log4j.properties</value>
|
||||
</property>
|
||||
-->
|
||||
</systemProperties>
|
||||
|
||||
<includes>
|
||||
<include>**/*Test.*</include>
|
||||
<include>**/*Test*</include>
|
||||
</includes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
|
|
@ -22,11 +22,13 @@ import java.util.List;
|
|||
import java.util.StringTokenizer;
|
||||
|
||||
import javax.management.MBeanServerConnection;
|
||||
import javax.management.MBeanServerInvocationHandler;
|
||||
import javax.management.ObjectInstance;
|
||||
import javax.management.ObjectName;
|
||||
import javax.management.openmbean.CompositeData;
|
||||
import javax.management.remote.JMXConnector;
|
||||
|
||||
import org.apache.activemq.broker.jmx.QueueViewMBean;
|
||||
import org.apache.activemq.console.util.JmxMBeansUtil;
|
||||
|
||||
public class PurgeCommand extends AbstractJmxCommand {
|
||||
|
@ -84,8 +86,13 @@ public class PurgeCommand extends AbstractJmxCommand {
|
|||
if (queryAddObjects.isEmpty()) {
|
||||
purgeQueue(queueName);
|
||||
} else {
|
||||
List messages = JmxMBeansUtil.createMessageQueryFilter(createJmxConnection(), queueName).query(queryAddObjects);
|
||||
purgeMessages(queueName, messages);
|
||||
QueueViewMBean proxy = (QueueViewMBean) MBeanServerInvocationHandler.newProxyInstance(createJmxConnection(), queueName, QueueViewMBean.class, true);
|
||||
int removed = 0;
|
||||
for (String remove : queryAddObjects) {
|
||||
removed = proxy.removeMatchingMessages(remove);
|
||||
context.printInfo("Removed: " + removed
|
||||
+ " messages for msgsel" + remove);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -106,25 +113,6 @@ public class PurgeCommand extends AbstractJmxCommand {
|
|||
createJmxConnection().invoke(queue, "purge", new Object[] {}, new String[] {});
|
||||
}
|
||||
|
||||
/**
|
||||
* Purge selected messages in the queue
|
||||
*
|
||||
* @param queue - ObjectName of the queue to purge the messages from
|
||||
* @param messages - List of messages to purge
|
||||
* @throws Exception
|
||||
*/
|
||||
public void purgeMessages(ObjectName queue, List messages) throws Exception {
|
||||
Object[] param = new Object[1];
|
||||
for (Iterator i = messages.iterator(); i.hasNext();) {
|
||||
CompositeData msg = (CompositeData)i.next();
|
||||
param[0] = "" + msg.get("JMSMessageID");
|
||||
context.printInfo("Removing message: " + param[0] + " from queue: " + queue.getKeyProperty("Destination"));
|
||||
createJmxConnection().invoke(queue, "removeMessage", param, new String[] {
|
||||
"java.lang.String"
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle the --msgsel, --xmsgsel.
|
||||
*
|
||||
|
|
|
@ -26,18 +26,35 @@
|
|||
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
|
||||
|
||||
<!-- Default configuration -->
|
||||
<broker useJmx="false" xmlns="http://activemq.apache.org/schema/core">
|
||||
<broker id="default" useJmx="false" xmlns="http://activemq.apache.org/schema/core">
|
||||
|
||||
<persistenceFactory>
|
||||
<journalPersistenceAdapterFactory journalLogFiles="2" dataDirectory="target/foo"/>
|
||||
</persistenceFactory>
|
||||
|
||||
<transportConnectors>
|
||||
<transportConnector uri="tcp://localhost:61616"/>
|
||||
</transportConnector>
|
||||
<transportConnector uri="tcp://localhost:61616" />
|
||||
</transportConnectors>
|
||||
|
||||
</broker>
|
||||
|
||||
<bean id="localbroker" class="org.apache.activemq.broker.BrokerService"
|
||||
init-method="start">
|
||||
<property name="brokerName" value = "localbroker"/>
|
||||
<property name="persistent" value = "false"/>
|
||||
<property name="transportConnectorURIs">
|
||||
<list>
|
||||
<value>tcp://localhost:61234</value>
|
||||
</list>
|
||||
</property>
|
||||
</bean>
|
||||
|
||||
<!-- JMS ConnectionFactory to use local broker -->
|
||||
<bean id="localFactory"
|
||||
class="org.apache.activemq.ActiveMQConnectionFactory">
|
||||
<property name="brokerURL" value="tcp://localhost:61234" />
|
||||
</bean>
|
||||
|
||||
<!-- Example of broker configuration that uses new logging options and dynamic management of logging
|
||||
<broker useJmx="true" xmlns="http://activemq.apache.org/schema/core" persistent="false" deleteAllMessagesOnStartup="true">
|
||||
|
||||
|
|
Loading…
Reference in New Issue