AMQ-3404: corrected PurgeCommand to correctly deal with message selectors

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1215056 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Torsten Mielke 2011-12-16 08:18:57 +00:00
parent 68bcf0fb15
commit e282571099
4 changed files with 282 additions and 14 deletions

View File

@ -65,10 +65,12 @@ public class BrowseCommand extends AbstractJmxCommand {
" Main browse -Vheader --view custom:MyField queue:FOO.BAR",
" - Print the message header and the custom field 'MyField' of all messages in the queue FOO.BAR",
"",
" Main browse --msgsel JMSMessageID='*:10',JMSPriority>5 FOO.BAR",
" Main browse --msgsel \"JMSMessageID='*:10',JMSPriority>5\" FOO.BAR",
" - Print all the message fields that has a JMSMessageID in the header field that matches the",
" wildcard *:10, and has a JMSPriority field > 5 in the queue FOO.BAR",
" wildcard *:10, and has a JMSPriority field > 5 in the queue FOO.BAR.",
" SLQ92 syntax is also supported.",
" * To use wildcard queries, the field must be a string and the query enclosed in ''",
" Use double quotes \"\" around the entire message selector string.",
""
};

View File

@ -16,11 +16,14 @@
*/
package org.apache.activemq.console.command;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.StringTokenizer;
import javax.jms.Destination;
import javax.jms.Message;
import javax.management.MBeanServerConnection;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectInstance;
@ -29,6 +32,8 @@ import javax.management.openmbean.CompositeData;
import javax.management.remote.JMXConnector;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.console.util.AmqMessagesUtil;
import org.apache.activemq.console.util.JmxMBeansUtil;
public class PurgeCommand extends AbstractJmxCommand {
@ -52,12 +57,14 @@ public class PurgeCommand extends AbstractJmxCommand {
" Main purge FOO.BAR",
" - Delete all the messages in queue FOO.BAR",
" Main purge --msgsel JMSMessageID='*:10',JMSPriority>5 FOO.*",
" Main purge --msgsel \"JMSMessageID='*:10',JMSPriority>5\" FOO.*",
" - Delete all the messages in the destinations that matches FOO.* and has a JMSMessageID in",
" the header field that matches the wildcard *:10, and has a JMSPriority field > 5 in the",
" queue FOO.BAR",
" queue FOO.BAR.",
" SLQ92 syntax is also supported.",
" * To use wildcard queries, the field must be a string and the query enclosed in ''",
"",
" Use double quotes \"\" around the entire message selector string.",
""
};
private final List<String> queryAddObjects = new ArrayList<String>(10);
@ -86,13 +93,36 @@ public class PurgeCommand extends AbstractJmxCommand {
if (queryAddObjects.isEmpty()) {
purgeQueue(queueName);
} else {
QueueViewMBean proxy = (QueueViewMBean) MBeanServerInvocationHandler.newProxyInstance(createJmxConnection(), queueName, QueueViewMBean.class, true);
QueueViewMBean proxy = (QueueViewMBean) MBeanServerInvocationHandler.
newProxyInstance(createJmxConnection(),
queueName,
QueueViewMBean.class,
true);
int removed = 0;
for (String remove : queryAddObjects) {
removed = proxy.removeMatchingMessages(remove);
context.printInfo("Removed: " + removed
+ " messages for msgsel" + remove);
// AMQ-3404: We support two syntaxes for the message
// selector query:
// 1) AMQ specific:
// "JMSPriority>2,MyHeader='Foo'"
//
// 2) SQL-92 syntax:
// "(JMSPriority>2) AND (MyHeader='Foo')"
//
// If syntax style 1) is used, the comma separated
// criterias are broken into List<String> elements.
// We then need to construct the SQL-92 query out of
// this list.
String sqlQuery = null;
if (queryAddObjects.size() > 1) {
sqlQuery = convertToSQL92(queryAddObjects);
} else {
sqlQuery = queryAddObjects.get(0);
}
removed = proxy.removeMatchingMessages(sqlQuery);
context.printInfo("Removed: " + removed
+ " messages for message selector " + sqlQuery.toString());
}
}
}
@ -102,6 +132,7 @@ public class PurgeCommand extends AbstractJmxCommand {
}
}
/**
* Purge all the messages in the queue
*
@ -156,6 +187,34 @@ public class PurgeCommand extends AbstractJmxCommand {
}
}
/**
* Converts the message selector as provided on command line
* argument to activem-admin into an SQL-92 conform string.
* E.g.
* "JMSMessageID='*:10',JMSPriority>5"
* gets converted into
* "(JMSMessageID='%:10') AND (JMSPriority>5)"
*
* @param tokens - List of message selector query parameters
* @return SQL-92 string of that query.
*/
public String convertToSQL92(List<String> tokens) {
String selector = "";
// Convert to message selector
for (Iterator i = tokens.iterator(); i.hasNext(); ) {
selector = selector + "(" + i.next().toString() + ") AND ";
}
// Remove last AND and replace '*' with '%'
if (!selector.equals("")) {
selector = selector.substring(0, selector.length() - 5);
selector = selector.replace('*', '%');
}
return selector;
}
/**
* Print the help messages for the browse command
*/

View File

@ -77,7 +77,7 @@ public class MessagesQueryFilter extends AbstractQueryFilter {
* @throws Exception
*/
protected List queryMessages(String selector) throws Exception {
CompositeData[] messages = (CompositeData[]) jmxConnection.invoke(destName, "browse", new Object[] {}, new String[] {});
CompositeData[] messages = (CompositeData[]) jmxConnection.invoke(destName, "browse", new Object[] {selector}, new String[] {});
return Arrays.asList(messages);
}

View File

@ -57,12 +57,31 @@ public class TestPurgeCommand extends TestCase {
protected static final int MESSAGE_COUNT = 10;
protected static final String PROPERTY_NAME = "XTestProperty";
protected static final String PROPERTY_VALUE = "1";
protected static final String PROPERTY_VALUE = "1:1";
// check for existence of property
protected static final String MSG_SEL_WITH_PROPERTY = PROPERTY_NAME
+ " is not null";
// check for non-existence of property
protected static final String MSG_SEL_WITHOUT_PROPERTY = PROPERTY_NAME
+ " is null";
// complex message selector query using XTestProperty and JMSPriority
protected static final String MSG_SEL_COMPLEX = PROPERTY_NAME + "='" +
"1:1" + "',JMSPriority>3";
// complex message selector query using XTestProperty AND JMSPriority
// but in SQL-92 syntax
protected static final String MSG_SEL_COMPLEX_SQL_AND = "(" + PROPERTY_NAME + "='" +
"1:1" + "') AND (JMSPriority>3)";
// complex message selector query using XTestProperty OR JMSPriority
// but in SQL-92 syntax
protected static final String MSG_SEL_COMPLEX_SQL_OR = "(" + PROPERTY_NAME + "='" +
"1:1" + "') OR (JMSPriority>3)";
protected static final String QUEUE_NAME = "org.apache.activemq.network.jms.QueueBridgeTest";
protected AbstractApplicationContext context;
@ -234,7 +253,7 @@ public class TestPurgeCommand extends TestCase {
}
}
public void testPurgeCommand() throws Exception {
public void testPurgeCommandSimpleSelector() throws Exception {
try {
PurgeCommand purgeCommand = new PurgeCommand();
CommandContext context = new CommandContext();
@ -258,4 +277,192 @@ public class TestPurgeCommand extends TestCase {
purgeAllMessages();
}
}
public void testPurgeCommandComplexSelector() throws Exception {
try {
PurgeCommand purgeCommand = new PurgeCommand();
CommandContext context = new CommandContext();
context.setFormatter(new CommandShellOutputFormatter(System.out));
purgeCommand.setCommandContext(context);
purgeCommand.setJmxUseLocal(true);
List<String> tokens = new ArrayList<String>();
tokens.add("--msgsel");
tokens.add(MSG_SEL_COMPLEX);
addMessages();
validateCounts(MESSAGE_COUNT, MESSAGE_COUNT, MESSAGE_COUNT * 2);
purgeCommand.execute(tokens);
QueueBrowser withPropertyBrowser = requestServerSession.createBrowser(
theQueue, MSG_SEL_COMPLEX);
QueueBrowser allBrowser = requestServerSession.createBrowser(theQueue);
int withCount = getMessageCount(withPropertyBrowser, "withProperty ");
int allCount = getMessageCount(allBrowser, "allMessages ");
withPropertyBrowser.close();
allBrowser.close();
assertEquals("Expected withCount to be " + "0" + " was "
+ withCount, 0, withCount);
assertEquals("Expected allCount to be " + MESSAGE_COUNT + " was "
+ allCount, MESSAGE_COUNT, allCount);
LOG.info("withCount = " + withCount + "\n allCount = " +
allCount + "\n = " + "\n");
} finally {
purgeAllMessages();
}
}
public void testPurgeCommandComplexSQLSelector_AND() throws Exception {
try {
String one = "ID:mac.fritz.box:1213242.3231.1:1:1:100";
String two = "\\*:100";
try {
if (one.matches(two))
LOG.info("String matches.");
else
LOG.info("string does not match.");
} catch (Exception ex) {
LOG.error(ex.getMessage());
}
PurgeCommand purgeCommand = new PurgeCommand();
CommandContext context = new CommandContext();
context.setFormatter(new CommandShellOutputFormatter(System.out));
purgeCommand.setCommandContext(context);
purgeCommand.setJmxUseLocal(true);
List<String> tokens = new ArrayList<String>();
tokens.add("--msgsel");
tokens.add(MSG_SEL_COMPLEX_SQL_AND);
addMessages();
validateCounts(MESSAGE_COUNT, MESSAGE_COUNT, MESSAGE_COUNT * 2);
purgeCommand.execute(tokens);
QueueBrowser withPropertyBrowser = requestServerSession.createBrowser(
theQueue, MSG_SEL_COMPLEX_SQL_AND);
QueueBrowser allBrowser = requestServerSession.createBrowser(theQueue);
int withCount = getMessageCount(withPropertyBrowser, "withProperty ");
int allCount = getMessageCount(allBrowser, "allMessages ");
withPropertyBrowser.close();
allBrowser.close();
assertEquals("Expected withCount to be " + "0" + " was "
+ withCount, 0, withCount);
assertEquals("Expected allCount to be " + MESSAGE_COUNT + " was "
+ allCount, MESSAGE_COUNT, allCount);
LOG.info("withCount = " + withCount + "\n allCount = " +
allCount + "\n = " + "\n");
} finally {
purgeAllMessages();
}
}
public void testPurgeCommandComplexSQLSelector_OR() throws Exception {
try {
PurgeCommand purgeCommand = new PurgeCommand();
CommandContext context = new CommandContext();
context.setFormatter(new CommandShellOutputFormatter(System.out));
purgeCommand.setCommandContext(context);
purgeCommand.setJmxUseLocal(true);
List<String> tokens = new ArrayList<String>();
tokens.add("--msgsel");
tokens.add(MSG_SEL_COMPLEX_SQL_OR);
addMessages();
validateCounts(MESSAGE_COUNT, MESSAGE_COUNT, MESSAGE_COUNT * 2);
purgeCommand.execute(tokens);
QueueBrowser withPropertyBrowser = requestServerSession.createBrowser(
theQueue, MSG_SEL_COMPLEX_SQL_OR);
QueueBrowser allBrowser = requestServerSession.createBrowser(theQueue);
int withCount = getMessageCount(withPropertyBrowser, "withProperty ");
int allCount = getMessageCount(allBrowser, "allMessages ");
withPropertyBrowser.close();
allBrowser.close();
assertEquals("Expected withCount to be 0 but was "
+ withCount, 0, withCount);
assertEquals("Expected allCount to be 0 but was "
+ allCount, 0, allCount);
LOG.info("withCount = " + withCount + "\n allCount = " +
allCount + "\n = " + "\n");
} finally {
purgeAllMessages();
}
}
public void testDummy() throws Exception {
try {
String one = "ID:mac.fritz.box:1213242.3231.1:1:1:100";
String two = "ID*:100";
try {
if (one.matches(two))
LOG.info("String matches.");
else
LOG.info("string does not match.");
} catch (Exception ex) {
LOG.error(ex.getMessage());
}
PurgeCommand purgeCommand = new PurgeCommand();
CommandContext context = new CommandContext();
context.setFormatter(new CommandShellOutputFormatter(System.out));
purgeCommand.setCommandContext(context);
purgeCommand.setJmxUseLocal(true);
List<String> tokens = new ArrayList<String>();
tokens.add("--msgsel");
tokens.add("(XTestProperty LIKE '1:*') AND (JMSPriority>3)");
addMessages();
purgeCommand.execute(tokens);
/*
QueueBrowser withPropertyBrowser = requestServerSession.createBrowser(
theQueue, MSG_SEL_COMPLEX_SQL_AND);
QueueBrowser allBrowser = requestServerSession.createBrowser(theQueue);
int withCount = getMessageCount(withPropertyBrowser, "withProperty ");
int allCount = getMessageCount(allBrowser, "allMessages ");
withPropertyBrowser.close();
allBrowser.close();
assertEquals("Expected withCount to be " + "0" + " was "
+ withCount, 0, withCount);
assertEquals("Expected allCount to be " + MESSAGE_COUNT + " was "
+ allCount, MESSAGE_COUNT, allCount);
LOG.info("withCount = " + withCount + "\n allCount = " +
allCount + "\n = " + "\n");
*/
} finally {
purgeAllMessages();
}
}
}