mirror of https://github.com/apache/activemq.git
Added initial implementation for JMX queue browsing and purging. They're still unable to filter messages. I'm still not sure if we should use MessageSelector or filter them in the client. Currently, browsing still uses the old AMQ client implementation.
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@379400 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2bc09bb764
commit
35bfe5c1ae
|
@ -0,0 +1,240 @@
|
|||
/**
|
||||
*
|
||||
* Copyright 2005-2006 The Apache Software Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.console.command;
|
||||
|
||||
import org.apache.activemq.console.util.AmqMessagesUtil;
|
||||
import org.apache.activemq.console.formatter.GlobalWriter;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
|
||||
import javax.jms.Destination;
|
||||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Set;
|
||||
import java.util.HashSet;
|
||||
import java.util.StringTokenizer;
|
||||
import java.util.Iterator;
|
||||
|
||||
public class AmqBrowseCommand extends AbstractAmqCommand {
|
||||
public static final String QUEUE_PREFIX = "queue:";
|
||||
public static final String TOPIC_PREFIX = "topic:";
|
||||
|
||||
public static final String VIEW_GROUP_HEADER = "header:";
|
||||
public static final String VIEW_GROUP_CUSTOM = "custom:";
|
||||
public static final String VIEW_GROUP_BODY = "body:";
|
||||
|
||||
private final List queryAddObjects = new ArrayList(10);
|
||||
private final List querySubObjects = new ArrayList(10);
|
||||
private final Set groupViews = new HashSet(10);
|
||||
private final Set queryViews = new HashSet(10);
|
||||
|
||||
/**
|
||||
* Execute the browse command, which allows you to browse the messages in a given JMS destination
|
||||
* @param tokens - command arguments
|
||||
* @throws Exception
|
||||
*/
|
||||
protected void runTask(List tokens) throws Exception {
|
||||
try {
|
||||
// If no destination specified
|
||||
if (tokens.isEmpty()) {
|
||||
GlobalWriter.printException(new IllegalArgumentException("No JMS destination specified."));
|
||||
return;
|
||||
}
|
||||
|
||||
// If no broker url specified
|
||||
if (getBrokerUrl() == null) {
|
||||
GlobalWriter.printException(new IllegalStateException("No broker url specified. Use the --amqurl option to specify a broker url."));
|
||||
return;
|
||||
}
|
||||
|
||||
// Display the messages for each destination
|
||||
for (Iterator i=tokens.iterator(); i.hasNext();) {
|
||||
String destName = (String)i.next();
|
||||
Destination dest;
|
||||
|
||||
// If destination has been explicitly specified as a queue
|
||||
if (destName.startsWith(QUEUE_PREFIX)) {
|
||||
dest = new ActiveMQQueue(destName.substring(QUEUE_PREFIX.length()));
|
||||
|
||||
// If destination has been explicitly specified as a topic
|
||||
} else if (destName.startsWith(TOPIC_PREFIX)) {
|
||||
dest = new ActiveMQTopic(destName.substring(TOPIC_PREFIX.length()));
|
||||
|
||||
// By default destination is assumed to be a queue
|
||||
} else {
|
||||
dest = new ActiveMQQueue(destName);
|
||||
}
|
||||
|
||||
// Query for the messages to view
|
||||
List addMsgs = AmqMessagesUtil.getMessages(getBrokerUrl(), dest, queryAddObjects);
|
||||
|
||||
// Query for the messages to remove from view
|
||||
if (querySubObjects.size() > 0) {
|
||||
List subMsgs = AmqMessagesUtil.getMessages(getBrokerUrl(), dest, querySubObjects);
|
||||
addMsgs.removeAll(subMsgs);
|
||||
}
|
||||
|
||||
// Display the messages
|
||||
GlobalWriter.printMessage(AmqMessagesUtil.filterMessagesView(addMsgs, groupViews, queryViews));
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
GlobalWriter.printException(new RuntimeException("Failed to execute browse task. Reason: " + e));
|
||||
throw new Exception(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle the --msgsel, --xmsgsel, --view, -V options.
|
||||
* @param token - option token to handle
|
||||
* @param tokens - succeeding command arguments
|
||||
* @throws Exception
|
||||
*/
|
||||
protected void handleOption(String token, List tokens) throws Exception {
|
||||
|
||||
// If token is an additive message selector option
|
||||
if (token.startsWith("--msgsel")) {
|
||||
|
||||
// If no message selector is specified, or next token is a new option
|
||||
if (tokens.isEmpty() || ((String)tokens.get(0)).startsWith("-")) {
|
||||
GlobalWriter.printException(new IllegalArgumentException("Message selector not specified"));
|
||||
return;
|
||||
}
|
||||
|
||||
StringTokenizer queryTokens = new StringTokenizer((String)tokens.remove(0), COMMAND_OPTION_DELIMETER);
|
||||
while (queryTokens.hasMoreTokens()) {
|
||||
queryAddObjects.add(queryTokens.nextToken());
|
||||
}
|
||||
}
|
||||
|
||||
// If token is a substractive message selector option
|
||||
else if (token.startsWith("--xmsgsel")) {
|
||||
|
||||
// If no message selector is specified, or next token is a new option
|
||||
if (tokens.isEmpty() || ((String)tokens.get(0)).startsWith("-")) {
|
||||
GlobalWriter.printException(new IllegalArgumentException("Message selector not specified"));
|
||||
return;
|
||||
}
|
||||
|
||||
StringTokenizer queryTokens = new StringTokenizer((String)tokens.remove(0), COMMAND_OPTION_DELIMETER);
|
||||
while (queryTokens.hasMoreTokens()) {
|
||||
querySubObjects.add(queryTokens.nextToken());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// If token is a view option
|
||||
else if (token.startsWith("--view")) {
|
||||
|
||||
// If no view specified, or next token is a new option
|
||||
if (tokens.isEmpty() || ((String)tokens.get(0)).startsWith("-")) {
|
||||
GlobalWriter.printException(new IllegalArgumentException("Attributes to view not specified"));
|
||||
return;
|
||||
}
|
||||
|
||||
// Add the attributes to view
|
||||
StringTokenizer viewTokens = new StringTokenizer((String)tokens.remove(0), COMMAND_OPTION_DELIMETER);
|
||||
while (viewTokens.hasMoreTokens()) {
|
||||
String viewToken = viewTokens.nextToken();
|
||||
|
||||
// If view is explicitly specified to belong to the JMS header
|
||||
if (viewToken.equals(VIEW_GROUP_HEADER)) {
|
||||
queryViews.add(AmqMessagesUtil.JMS_MESSAGE_HEADER_PREFIX + viewToken.substring(VIEW_GROUP_HEADER.length()));
|
||||
|
||||
// If view is explicitly specified to belong to the JMS custom header
|
||||
} else if (viewToken.equals(VIEW_GROUP_CUSTOM)) {
|
||||
queryViews.add(AmqMessagesUtil.JMS_MESSAGE_CUSTOM_PREFIX + viewToken.substring(VIEW_GROUP_CUSTOM.length()));
|
||||
|
||||
// If view is explicitly specified to belong to the JMS body
|
||||
} else if (viewToken.equals(VIEW_GROUP_BODY)) {
|
||||
queryViews.add(AmqMessagesUtil.JMS_MESSAGE_BODY_PREFIX + viewToken.substring(VIEW_GROUP_BODY.length()));
|
||||
|
||||
// If no view explicitly specified, let's check the view for each group
|
||||
} else {
|
||||
queryViews.add(AmqMessagesUtil.JMS_MESSAGE_HEADER_PREFIX + viewToken);
|
||||
queryViews.add(AmqMessagesUtil.JMS_MESSAGE_CUSTOM_PREFIX + viewToken);
|
||||
queryViews.add(AmqMessagesUtil.JMS_MESSAGE_BODY_PREFIX + viewToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If token is a predefined group view option
|
||||
else if (token.startsWith("-V")) {
|
||||
String viewGroup = token.substring(2);
|
||||
// If option is a header group view
|
||||
if (viewGroup.equals("header")) {
|
||||
groupViews.add(AmqMessagesUtil.JMS_MESSAGE_HEADER_PREFIX);
|
||||
|
||||
// If option is a custom header group view
|
||||
} else if (viewGroup.equals("custom")) {
|
||||
groupViews.add(AmqMessagesUtil.JMS_MESSAGE_CUSTOM_PREFIX);
|
||||
|
||||
// If option is a body group view
|
||||
} else if (viewGroup.equals("body")) {
|
||||
groupViews.add(AmqMessagesUtil.JMS_MESSAGE_BODY_PREFIX);
|
||||
|
||||
// Unknown group view
|
||||
} else {
|
||||
GlobalWriter.printInfo("Unknown group view: " + viewGroup + ". Ignoring group view option.");
|
||||
}
|
||||
}
|
||||
|
||||
// Let super class handle unknown option
|
||||
else {
|
||||
super.handleOption(token, tokens);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Print the help messages for the browse command
|
||||
*/
|
||||
protected void printHelp() {
|
||||
GlobalWriter.printHelp(helpFile);
|
||||
}
|
||||
|
||||
protected String[] helpFile = new String[] {
|
||||
"Task Usage: Main browse --amqurl <broker url> [browse-options] <destinations>",
|
||||
"Description: Display selected destination's messages.",
|
||||
"",
|
||||
"Browse Options:",
|
||||
" --amqurl <url> Set the broker URL to connect to.",
|
||||
" --msgsel <msgsel1,msglsel2> Add to the search list messages matched by the query similar to",
|
||||
" the messages selector format.",
|
||||
" -V<header|custom|body> Predefined view that allows you to view the message header, custom",
|
||||
" message header, or the message body.",
|
||||
" --view <attr1>,<attr2>,... Select the specific attribute of the message to view.",
|
||||
" --version Display the version information.",
|
||||
" -h,-?,--help Display the browse broker help information.",
|
||||
"",
|
||||
"Examples:",
|
||||
" Main browse --amqurl tcp://localhost:61616 FOO.BAR",
|
||||
" - Print the message header, custom message header, and message body of all messages in the",
|
||||
" queue FOO.BAR",
|
||||
"",
|
||||
" Main browse --amqurl tcp://localhost:61616 -Vheader,body queue:FOO.BAR",
|
||||
" - Print only the message header and message body of all messages in the queue FOO.BAR",
|
||||
"",
|
||||
" Main browse --amqurl tcp://localhost:61616 -Vheader --view custom:MyField queue:FOO.BAR",
|
||||
" - Print the message header and the custom field 'MyField' of all messages in the queue FOO.BAR",
|
||||
"",
|
||||
" Main browse --amqurl tcp://localhost:61616 --msgsel JMSMessageID='*:10',JMSPriority>5 FOO.BAR",
|
||||
" - Print all the message fields that has a JMSMessageID in the header field that matches the",
|
||||
" wildcard *:10, and has a JMSPriority field > 5 in the queue FOO.BAR",
|
||||
" * To use wildcard queries, the field must be a string and the query enclosed in ''",
|
||||
"",
|
||||
};
|
||||
}
|
|
@ -16,20 +16,19 @@
|
|||
*/
|
||||
package org.apache.activemq.console.command;
|
||||
|
||||
import org.apache.activemq.console.util.AmqMessagesUtil;
|
||||
import org.apache.activemq.console.formatter.GlobalWriter;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.console.util.AmqMessagesUtil;
|
||||
import org.apache.activemq.console.util.JmxMBeansUtil;
|
||||
|
||||
import javax.jms.Destination;
|
||||
import javax.management.ObjectInstance;
|
||||
import java.util.List;
|
||||
import java.util.StringTokenizer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Set;
|
||||
import java.util.HashSet;
|
||||
import java.util.StringTokenizer;
|
||||
import java.util.Iterator;
|
||||
|
||||
public class BrowseCommand extends AbstractAmqCommand {
|
||||
public class BrowseCommand extends AbstractJmxCommand {
|
||||
public static final String QUEUE_PREFIX = "queue:";
|
||||
public static final String TOPIC_PREFIX = "topic:";
|
||||
|
||||
|
@ -49,49 +48,21 @@ public class BrowseCommand extends AbstractAmqCommand {
|
|||
*/
|
||||
protected void runTask(List tokens) throws Exception {
|
||||
try {
|
||||
// If no destination specified
|
||||
// If there is no queue name specified, let's select all
|
||||
if (tokens.isEmpty()) {
|
||||
GlobalWriter.printException(new IllegalArgumentException("No JMS destination specified."));
|
||||
return;
|
||||
tokens.add("*");
|
||||
}
|
||||
|
||||
// If no broker url specified
|
||||
if (getBrokerUrl() == null) {
|
||||
GlobalWriter.printException(new IllegalStateException("No broker url specified. Use the --amqurl option to specify a broker url."));
|
||||
return;
|
||||
}
|
||||
|
||||
// Display the messages for each destination
|
||||
// Iterate through the queue names
|
||||
for (Iterator i=tokens.iterator(); i.hasNext();) {
|
||||
String destName = (String)i.next();
|
||||
Destination dest;
|
||||
List queueList = JmxMBeansUtil.queryMBeans(useJmxServiceUrl(), "Type=Queue,Destination=" + i.next() + ",*");
|
||||
|
||||
// If destination has been explicitly specified as a queue
|
||||
if (destName.startsWith(QUEUE_PREFIX)) {
|
||||
dest = new ActiveMQQueue(destName.substring(QUEUE_PREFIX.length()));
|
||||
|
||||
// If destination has been explicitly specified as a topic
|
||||
} else if (destName.startsWith(TOPIC_PREFIX)) {
|
||||
dest = new ActiveMQTopic(destName.substring(TOPIC_PREFIX.length()));
|
||||
|
||||
// By default destination is assumed to be a queue
|
||||
} else {
|
||||
dest = new ActiveMQQueue(destName);
|
||||
// Iterate through the queue result
|
||||
for (Iterator j=queueList.iterator(); j.hasNext();) {
|
||||
List messages = JmxMBeansUtil.createMessageQueryFilter(useJmxServiceUrl(), ((ObjectInstance)j.next()).getObjectName()).query(queryAddObjects);
|
||||
GlobalWriter.printMessage(JmxMBeansUtil.filterMessagesView(messages, groupViews, queryViews));
|
||||
}
|
||||
|
||||
// Query for the messages to view
|
||||
List addMsgs = AmqMessagesUtil.getMessages(getBrokerUrl(), dest, queryAddObjects);
|
||||
|
||||
// Query for the messages to remove from view
|
||||
if (querySubObjects.size() > 0) {
|
||||
List subMsgs = AmqMessagesUtil.getMessages(getBrokerUrl(), dest, querySubObjects);
|
||||
addMsgs.removeAll(subMsgs);
|
||||
}
|
||||
|
||||
// Display the messages
|
||||
GlobalWriter.printMessage(AmqMessagesUtil.filterMessagesView(addMsgs, groupViews, queryViews));
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
GlobalWriter.printException(new RuntimeException("Failed to execute browse task. Reason: " + e));
|
||||
throw new Exception(e);
|
||||
|
@ -207,31 +178,31 @@ public class BrowseCommand extends AbstractAmqCommand {
|
|||
}
|
||||
|
||||
protected String[] helpFile = new String[] {
|
||||
"Task Usage: Main browse --amqurl <broker url> [browse-options] <destinations>",
|
||||
"Task Usage: Main browse [browse-options] <destinations>",
|
||||
"Description: Display selected destination's messages.",
|
||||
"",
|
||||
"Browse Options:",
|
||||
" --amqurl <url> Set the broker URL to connect to.",
|
||||
" --msgsel <msgsel1,msglsel2> Add to the search list messages matched by the query similar to",
|
||||
" the messages selector format.",
|
||||
" -V<header|custom|body> Predefined view that allows you to view the message header, custom",
|
||||
" message header, or the message body.",
|
||||
" --view <attr1>,<attr2>,... Select the specific attribute of the message to view.",
|
||||
" --jmxurl <url> Set the JMX URL to connect to.",
|
||||
" --version Display the version information.",
|
||||
" -h,-?,--help Display the browse broker help information.",
|
||||
"",
|
||||
"Examples:",
|
||||
" Main browse --amqurl tcp://localhost:61616 FOO.BAR",
|
||||
" Main browse FOO.BAR",
|
||||
" - Print the message header, custom message header, and message body of all messages in the",
|
||||
" queue FOO.BAR",
|
||||
"",
|
||||
" Main browse --amqurl tcp://localhost:61616 -Vheader,body queue:FOO.BAR",
|
||||
" Main browse -Vheader,body queue:FOO.BAR",
|
||||
" - Print only the message header and message body of all messages in the queue FOO.BAR",
|
||||
"",
|
||||
" Main browse --amqurl tcp://localhost:61616 -Vheader --view custom:MyField queue:FOO.BAR",
|
||||
" Main browse -Vheader --view custom:MyField queue:FOO.BAR",
|
||||
" - Print the message header and the custom field 'MyField' of all messages in the queue FOO.BAR",
|
||||
"",
|
||||
" Main browse --amqurl tcp://localhost:61616 --msgsel JMSMessageID='*:10',JMSPriority>5 FOO.BAR",
|
||||
" Main browse --msgsel JMSMessageID='*:10',JMSPriority>5 FOO.BAR",
|
||||
" - Print all the message fields that has a JMSMessageID in the header field that matches the",
|
||||
" wildcard *:10, and has a JMSPriority field > 5 in the queue FOO.BAR",
|
||||
" * To use wildcard queries, the field must be a string and the query enclosed in ''",
|
||||
|
|
|
@ -0,0 +1,176 @@
|
|||
/**
|
||||
*
|
||||
* Copyright 2005-2006 The Apache Software Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.console.command;
|
||||
|
||||
import org.apache.activemq.console.formatter.GlobalWriter;
|
||||
import org.apache.activemq.console.util.JmxMBeansUtil;
|
||||
|
||||
import javax.management.ObjectInstance;
|
||||
import javax.management.ObjectName;
|
||||
import javax.management.MBeanServerConnection;
|
||||
import javax.management.openmbean.CompositeData;
|
||||
import javax.management.remote.JMXConnector;
|
||||
import java.util.List;
|
||||
import java.util.StringTokenizer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
|
||||
public class PurgeCommand extends AbstractJmxCommand {
|
||||
|
||||
private final List queryAddObjects = new ArrayList(10);
|
||||
private final List querySubObjects = new ArrayList(10);
|
||||
|
||||
/**
|
||||
* Execute the purge command, which allows you to purge the messages in a given JMS destination
|
||||
* @param tokens - command arguments
|
||||
* @throws Exception
|
||||
*/
|
||||
protected void runTask(List tokens) throws Exception {
|
||||
try {
|
||||
// If there is no queue name specified, let's select all
|
||||
if (tokens.isEmpty()) {
|
||||
tokens.add("*");
|
||||
}
|
||||
|
||||
// Iterate through the queue names
|
||||
for (Iterator i=tokens.iterator(); i.hasNext();) {
|
||||
List queueList = JmxMBeansUtil.queryMBeans(useJmxServiceUrl(), "Type=Queue,Destination=" + i.next() + ",*");
|
||||
|
||||
for (Iterator j=queueList.iterator(); j.hasNext();) {
|
||||
ObjectName queueName = ((ObjectInstance)j.next()).getObjectName();
|
||||
if (queryAddObjects.isEmpty()) {
|
||||
purgeQueue(queueName);
|
||||
} else {
|
||||
List messages = JmxMBeansUtil.createMessageQueryFilter(useJmxServiceUrl(), queueName).query(queryAddObjects);
|
||||
purgeMessages(queueName, messages);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
GlobalWriter.printException(new RuntimeException("Failed to execute purge task. Reason: " + e));
|
||||
throw new Exception(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Purge all the messages in the queue
|
||||
* @param queue - ObjectName of the queue to purge
|
||||
* @throws Exception
|
||||
*/
|
||||
public void purgeQueue(ObjectName queue) throws Exception {
|
||||
JMXConnector conn = createJmxConnector();
|
||||
MBeanServerConnection server = conn.getMBeanServerConnection();
|
||||
GlobalWriter.printInfo("Purging all messages in queue: " + queue.getKeyProperty("Destination"));
|
||||
server.invoke(queue, "purge", new Object[] {}, new String[] {});
|
||||
conn.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Purge selected messages in the queue
|
||||
* @param queue - ObjectName of the queue to purge the messages from
|
||||
* @param messages - List of messages to purge
|
||||
* @throws Exception
|
||||
*/
|
||||
public void purgeMessages(ObjectName queue, List messages) throws Exception {
|
||||
JMXConnector conn = createJmxConnector();
|
||||
MBeanServerConnection server = conn.getMBeanServerConnection();
|
||||
|
||||
Object[] param = new Object[1];
|
||||
for (Iterator i=messages.iterator(); i.hasNext();) {
|
||||
CompositeData msg = (CompositeData)i.next();
|
||||
param[0] = "" + msg.get("JMSMessageID");
|
||||
GlobalWriter.printInfo("Removing message: " + param[0] + " from queue: " + queue.getKeyProperty("Destination"));
|
||||
server.invoke(queue, "removeMessage", param, new String[] {"java.lang.String"});
|
||||
}
|
||||
|
||||
conn.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle the --msgsel, --xmsgsel.
|
||||
* @param token - option token to handle
|
||||
* @param tokens - succeeding command arguments
|
||||
* @throws Exception
|
||||
*/
|
||||
protected void handleOption(String token, List tokens) throws Exception {
|
||||
// If token is an additive message selector option
|
||||
if (token.startsWith("--msgsel")) {
|
||||
|
||||
// If no message selector is specified, or next token is a new option
|
||||
if (tokens.isEmpty() || ((String)tokens.get(0)).startsWith("-")) {
|
||||
GlobalWriter.printException(new IllegalArgumentException("Message selector not specified"));
|
||||
return;
|
||||
}
|
||||
|
||||
StringTokenizer queryTokens = new StringTokenizer((String)tokens.remove(0), COMMAND_OPTION_DELIMETER);
|
||||
while (queryTokens.hasMoreTokens()) {
|
||||
queryAddObjects.add(queryTokens.nextToken());
|
||||
}
|
||||
}
|
||||
|
||||
// If token is a substractive message selector option
|
||||
else if (token.startsWith("--xmsgsel")) {
|
||||
|
||||
// If no message selector is specified, or next token is a new option
|
||||
if (tokens.isEmpty() || ((String)tokens.get(0)).startsWith("-")) {
|
||||
GlobalWriter.printException(new IllegalArgumentException("Message selector not specified"));
|
||||
return;
|
||||
}
|
||||
|
||||
StringTokenizer queryTokens = new StringTokenizer((String)tokens.remove(0), COMMAND_OPTION_DELIMETER);
|
||||
while (queryTokens.hasMoreTokens()) {
|
||||
querySubObjects.add(queryTokens.nextToken());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Let super class handle unknown option
|
||||
else {
|
||||
super.handleOption(token, tokens);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Print the help messages for the browse command
|
||||
*/
|
||||
protected void printHelp() {
|
||||
GlobalWriter.printHelp(helpFile);
|
||||
}
|
||||
|
||||
protected String[] helpFile = new String[] {
|
||||
"Task Usage: Main purge [browse-options] <destinations>",
|
||||
"Description: Delete selected destination's messages that matches the message selector.",
|
||||
"",
|
||||
"Browse Options:",
|
||||
" --msgsel <msgsel1,msglsel2> Add to the search list messages matched by the query similar to",
|
||||
" the messages selector format.",
|
||||
" --jmxurl <url> Set the JMX URL to connect to.",
|
||||
" --version Display the version information.",
|
||||
" -h,-?,--help Display the browse broker help information.",
|
||||
"",
|
||||
"Examples:",
|
||||
" Main purge FOO.BAR",
|
||||
" - Delete all the messages in queue FOO.BAR",
|
||||
|
||||
" Main purge --msgsel JMSMessageID='*:10',JMSPriority>5 FOO.*",
|
||||
" - Delete all the messages in the destinations that matches FOO.* and has a JMSMessageID in",
|
||||
" the header field that matches the wildcard *:10, and has a JMSPriority field > 5 in the",
|
||||
" queue FOO.BAR",
|
||||
" * To use wildcard queries, the field must be a string and the query enclosed in ''",
|
||||
"",
|
||||
};
|
||||
}
|
|
@ -69,7 +69,9 @@ public class ShellCommand extends AbstractCommand {
|
|||
} else if (taskToken.equals("query")) {
|
||||
new QueryCommand().execute(tokens);
|
||||
} else if (taskToken.equals("browse")) {
|
||||
new BrowseCommand().execute(tokens);
|
||||
new AmqBrowseCommand().execute(tokens);
|
||||
} else if (taskToken.equals("purge")) {
|
||||
new PurgeCommand().execute(tokens);
|
||||
} else {
|
||||
// If not valid task, push back to list
|
||||
tokens.add(0, taskToken);
|
||||
|
|
|
@ -0,0 +1,152 @@
|
|||
/**
|
||||
*
|
||||
* Copyright 2005-2006 The Apache Software Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.console.filter;
|
||||
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.QueueBrowser;
|
||||
import java.net.URI;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Iterator;
|
||||
|
||||
public class AmqMessagesQueryFilter extends AbstractQueryFilter {
|
||||
|
||||
private URI brokerUrl;
|
||||
private Destination destination;
|
||||
|
||||
/**
|
||||
* Create a JMS message query filter
|
||||
* @param brokerUrl - broker url to connect to
|
||||
* @param destination - JMS destination to query
|
||||
*/
|
||||
public AmqMessagesQueryFilter(URI brokerUrl, Destination destination) {
|
||||
super(null);
|
||||
this.brokerUrl = brokerUrl;
|
||||
this.destination = destination;
|
||||
}
|
||||
|
||||
/**
|
||||
* Queries the specified destination using the message selector format query
|
||||
* @param queries - message selector queries
|
||||
* @return list messages that matches the selector
|
||||
* @throws Exception
|
||||
*/
|
||||
public List query(List queries) throws Exception {
|
||||
String selector = "";
|
||||
|
||||
// Convert to message selector
|
||||
for (Iterator i=queries.iterator(); i.hasNext();) {
|
||||
selector = selector + "(" + i.next().toString() + ") AND ";
|
||||
}
|
||||
|
||||
// Remove last AND
|
||||
if (selector != "") {
|
||||
selector = selector.substring(0, selector.length() - 5);
|
||||
}
|
||||
|
||||
if (destination instanceof ActiveMQQueue) {
|
||||
return queryMessages((ActiveMQQueue)destination, selector);
|
||||
} else {
|
||||
return queryMessages((ActiveMQTopic)destination, selector);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Query the messages of a queue destination using a queue browser
|
||||
* @param queue - queue destination
|
||||
* @param selector - message selector
|
||||
* @return list of messages that matches the selector
|
||||
* @throws Exception
|
||||
*/
|
||||
protected List queryMessages(ActiveMQQueue queue, String selector) throws Exception {
|
||||
Connection conn = createConnection(getBrokerUrl());
|
||||
|
||||
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
QueueBrowser browser = sess.createBrowser(queue, selector);
|
||||
|
||||
List messages = Collections.list(browser.getEnumeration());
|
||||
|
||||
conn.close();
|
||||
|
||||
return messages;
|
||||
}
|
||||
|
||||
/**
|
||||
* Query the messages of a topic destination using a message consumer
|
||||
* @param topic - topic destination
|
||||
* @param selector - message selector
|
||||
* @return list of messages that matches the selector
|
||||
* @throws Exception
|
||||
*/
|
||||
protected List queryMessages(ActiveMQTopic topic, String selector) throws Exception {
|
||||
// TODO: should we use a durable subscriber or a retroactive non-durable subscriber?
|
||||
// TODO: if a durable subscriber is used, how do we manage it? subscribe/unsubscribe tasks?
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create and start a JMS connection
|
||||
* @param brokerUrl - broker url to connect to.
|
||||
* @return JMS connection
|
||||
* @throws JMSException
|
||||
*/
|
||||
protected Connection createConnection(URI brokerUrl) throws JMSException {
|
||||
Connection conn = (new ActiveMQConnectionFactory(brokerUrl)).createConnection();
|
||||
conn.start();
|
||||
return conn;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the broker url being used.
|
||||
* @return broker url
|
||||
*/
|
||||
public URI getBrokerUrl() {
|
||||
return brokerUrl;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the broker url to use.
|
||||
* @param brokerUrl - broker url
|
||||
*/
|
||||
public void setBrokerUrl(URI brokerUrl) {
|
||||
this.brokerUrl = brokerUrl;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the destination being used.
|
||||
* @return - JMS destination
|
||||
*/
|
||||
public Destination getDestination() {
|
||||
return destination;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the destination to use.
|
||||
* @param destination - JMS destination
|
||||
*/
|
||||
public void setDestination(Destination destination) {
|
||||
this.destination = destination;
|
||||
}
|
||||
|
||||
}
|
|
@ -83,7 +83,7 @@ public class MBeansRegExQueryFilter extends RegExQueryFilter {
|
|||
}
|
||||
|
||||
/**
|
||||
* Try to match the attribute list using teh regular expression map
|
||||
* Try to match the attribute list using the regular expression map
|
||||
* @param data - attribute list to match
|
||||
* @param regex - regex map
|
||||
* @return true if the attribute list matches the regex map
|
||||
|
|
|
@ -30,6 +30,7 @@ import javax.management.ObjectInstance;
|
|||
import javax.management.ObjectName;
|
||||
import javax.management.AttributeList;
|
||||
import javax.management.Attribute;
|
||||
import javax.management.openmbean.CompositeDataSupport;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.DeliveryMode;
|
||||
import java.util.Map;
|
||||
|
@ -256,4 +257,59 @@ public class MapTransformFilter extends ResultTransformFilter {
|
|||
|
||||
return props;
|
||||
}
|
||||
|
||||
/**
|
||||
* Transform an openMBean composite data to a Map
|
||||
* @param data - composite data to transform
|
||||
* @return map object
|
||||
*/
|
||||
protected Map transformToMap(CompositeDataSupport data) {
|
||||
Properties props = new Properties();
|
||||
|
||||
String typeName = data.getCompositeType().getTypeName();
|
||||
|
||||
// Retrieve text message
|
||||
if (typeName.equals(ActiveMQTextMessage.class.getName())) {
|
||||
props.setProperty(AmqMessagesUtil.JMS_MESSAGE_BODY_PREFIX + "Text", data.get("Text").toString());
|
||||
|
||||
// Retrieve byte preview
|
||||
} else if (typeName.equals(ActiveMQBytesMessage.class.getName())) {
|
||||
props.setProperty(AmqMessagesUtil.JMS_MESSAGE_BODY_PREFIX + "BodyLength", data.get("BodyLength").toString());
|
||||
props.setProperty(AmqMessagesUtil.JMS_MESSAGE_BODY_PREFIX + "BodyPreview", new String((byte[])data.get("BodyPreview")));
|
||||
|
||||
// Expand content map
|
||||
} else if (typeName.equals(ActiveMQMapMessage.class.getName())) {
|
||||
Map contentMap = (Map)data.get("ContentMap");
|
||||
for (Iterator i=contentMap.keySet().iterator(); i.hasNext();) {
|
||||
String key = (String)i.next();
|
||||
props.setProperty(AmqMessagesUtil.JMS_MESSAGE_BODY_PREFIX + key, contentMap.get(key).toString());
|
||||
}
|
||||
|
||||
// Do nothing
|
||||
} else if (typeName.equals(ActiveMQObjectMessage.class.getName()) ||
|
||||
typeName.equals(ActiveMQStreamMessage.class.getName()) ||
|
||||
typeName.equals(ActiveMQMessage.class.getName())) {
|
||||
|
||||
// Unrecognized composite data. Throw exception.
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unrecognized composite data to transform. composite type: " + typeName);
|
||||
}
|
||||
|
||||
// Process the JMS message header values
|
||||
props.setProperty(AmqMessagesUtil.JMS_MESSAGE_HEADER_PREFIX + "JMSCorrelationID", "" + data.get("JMSCorrelationID"));
|
||||
props.setProperty(AmqMessagesUtil.JMS_MESSAGE_HEADER_PREFIX + "JMSDestination", "" + data.get("JMSDestination"));
|
||||
props.setProperty(AmqMessagesUtil.JMS_MESSAGE_HEADER_PREFIX + "JMSMessageID", "" + data.get("JMSMessageID"));
|
||||
props.setProperty(AmqMessagesUtil.JMS_MESSAGE_HEADER_PREFIX + "JMSReplyTo", "" + data.get("JMSReplyTo"));
|
||||
props.setProperty(AmqMessagesUtil.JMS_MESSAGE_HEADER_PREFIX + "JMSType", "" + data.get("JMSType"));
|
||||
props.setProperty(AmqMessagesUtil.JMS_MESSAGE_HEADER_PREFIX + "JMSDeliveryMode", "" + data.get("JMSDeliveryMode"));
|
||||
props.setProperty(AmqMessagesUtil.JMS_MESSAGE_HEADER_PREFIX + "JMSExpiration", "" + data.get("JMSExpiration"));
|
||||
props.setProperty(AmqMessagesUtil.JMS_MESSAGE_HEADER_PREFIX + "JMSPriority", "" + data.get("JMSPriority"));
|
||||
props.setProperty(AmqMessagesUtil.JMS_MESSAGE_HEADER_PREFIX + "JMSRedelivered", "" + data.get("JMSRedelivered"));
|
||||
props.setProperty(AmqMessagesUtil.JMS_MESSAGE_HEADER_PREFIX + "JMSTimestamp", "" + data.get("JMSTimestamp"));
|
||||
|
||||
// Process the JMS custom message properties
|
||||
props.setProperty(AmqMessagesUtil.JMS_MESSAGE_CUSTOM_PREFIX + "Properties", "" + data.get("Properties"));
|
||||
|
||||
return props;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,34 +16,32 @@
|
|||
*/
|
||||
package org.apache.activemq.console.filter;
|
||||
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.QueueBrowser;
|
||||
import java.net.URI;
|
||||
import java.util.Collections;
|
||||
import javax.management.remote.JMXConnector;
|
||||
import javax.management.remote.JMXServiceURL;
|
||||
import javax.management.remote.JMXConnectorFactory;
|
||||
import javax.management.MBeanServerConnection;
|
||||
import javax.management.ObjectName;
|
||||
import javax.management.openmbean.CompositeData;
|
||||
import java.net.MalformedURLException;
|
||||
import java.util.List;
|
||||
import java.util.Iterator;
|
||||
import java.util.Arrays;
|
||||
import java.io.IOException;
|
||||
|
||||
public class MessagesQueryFilter extends AbstractQueryFilter {
|
||||
|
||||
private URI brokerUrl;
|
||||
private Destination destination;
|
||||
private JMXServiceURL jmxServiceUrl;
|
||||
private ObjectName destName;
|
||||
|
||||
/**
|
||||
* Create a JMS message query filter
|
||||
* @param brokerUrl - broker url to connect to
|
||||
* @param destination - JMS destination to query
|
||||
* @param jmxServiceUrl - JMX service URL to connect to
|
||||
* @param destName - object name query to retrieve the destination
|
||||
*/
|
||||
public MessagesQueryFilter(URI brokerUrl, Destination destination) {
|
||||
public MessagesQueryFilter(JMXServiceURL jmxServiceUrl, ObjectName destName) {
|
||||
super(null);
|
||||
this.brokerUrl = brokerUrl;
|
||||
this.destination = destination;
|
||||
this.jmxServiceUrl = jmxServiceUrl;
|
||||
this.destName = destName;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -65,88 +63,54 @@ public class MessagesQueryFilter extends AbstractQueryFilter {
|
|||
selector = selector.substring(0, selector.length() - 5);
|
||||
}
|
||||
|
||||
if (destination instanceof ActiveMQQueue) {
|
||||
return queryMessages((ActiveMQQueue)destination, selector);
|
||||
} else {
|
||||
return queryMessages((ActiveMQTopic)destination, selector);
|
||||
}
|
||||
return queryMessages(selector);
|
||||
}
|
||||
|
||||
/**
|
||||
* Query the messages of a queue destination using a queue browser
|
||||
* @param queue - queue destination
|
||||
* Query the messages of a queue destination using JMX
|
||||
* @param selector - message selector
|
||||
* @return list of messages that matches the selector
|
||||
* @throws Exception
|
||||
*/
|
||||
protected List queryMessages(ActiveMQQueue queue, String selector) throws Exception {
|
||||
Connection conn = createConnection(getBrokerUrl());
|
||||
protected List queryMessages(String selector) throws Exception {
|
||||
JMXConnector connector = createJmxConnector();
|
||||
MBeanServerConnection server = connector.getMBeanServerConnection();
|
||||
CompositeData[] messages = (CompositeData[])server.invoke(destName, "browse", new Object[] {}, new String[] {});
|
||||
connector.close();
|
||||
|
||||
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
QueueBrowser browser = sess.createBrowser(queue, selector);
|
||||
|
||||
List messages = Collections.list(browser.getEnumeration());
|
||||
|
||||
conn.close();
|
||||
|
||||
return messages;
|
||||
return Arrays.asList(messages);
|
||||
}
|
||||
|
||||
/**
|
||||
* Query the messages of a topic destination using a message consumer
|
||||
* @param topic - topic destination
|
||||
* @param selector - message selector
|
||||
* @return list of messages that matches the selector
|
||||
* @throws Exception
|
||||
* Get the JMX service URL the query is connecting to.
|
||||
* @return JMX service URL
|
||||
*/
|
||||
protected List queryMessages(ActiveMQTopic topic, String selector) throws Exception {
|
||||
// TODO: should we use a durable subscriber or a retroactive non-durable subscriber?
|
||||
// TODO: if a durable subscriber is used, how do we manage it? subscribe/unsubscribe tasks?
|
||||
return null;
|
||||
public JMXServiceURL getJmxServiceUrl() {
|
||||
return jmxServiceUrl;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create and start a JMS connection
|
||||
* @param brokerUrl - broker url to connect to.
|
||||
* @return JMS connection
|
||||
* @throws JMSException
|
||||
* Sets the JMX service URL the query is going to connect to.
|
||||
* @param jmxServiceUrl - new JMX service URL
|
||||
*/
|
||||
protected Connection createConnection(URI brokerUrl) throws JMSException {
|
||||
Connection conn = (new ActiveMQConnectionFactory(brokerUrl)).createConnection();
|
||||
conn.start();
|
||||
return conn;
|
||||
public void setJmxServiceUrl(JMXServiceURL jmxServiceUrl) {
|
||||
this.jmxServiceUrl = jmxServiceUrl;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the broker url being used.
|
||||
* @return broker url
|
||||
* Sets the JMX service URL the query is going to connect to.
|
||||
* @param jmxServiceUrl - new JMX service URL
|
||||
*/
|
||||
public URI getBrokerUrl() {
|
||||
return brokerUrl;
|
||||
public void setJmxServiceUrl(String jmxServiceUrl) throws MalformedURLException {
|
||||
setJmxServiceUrl(new JMXServiceURL(jmxServiceUrl));
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the broker url to use.
|
||||
* @param brokerUrl - broker url
|
||||
* Creates a JMX connector
|
||||
* @return JMX connector
|
||||
* @throws java.io.IOException
|
||||
*/
|
||||
public void setBrokerUrl(URI brokerUrl) {
|
||||
this.brokerUrl = brokerUrl;
|
||||
protected JMXConnector createJmxConnector() throws IOException {
|
||||
return JMXConnectorFactory.connect(getJmxServiceUrl());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the destination being used.
|
||||
* @return - JMS destination
|
||||
*/
|
||||
public Destination getDestination() {
|
||||
return destination;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the destination to use.
|
||||
* @param destination - JMS destination
|
||||
*/
|
||||
public void setDestination(Destination destination) {
|
||||
this.destination = destination;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -18,11 +18,11 @@ package org.apache.activemq.console.util;
|
|||
|
||||
import org.apache.activemq.console.filter.QueryFilter;
|
||||
import org.apache.activemq.console.filter.WildcardToMsgSelectorTransformFilter;
|
||||
import org.apache.activemq.console.filter.MessagesQueryFilter;
|
||||
import org.apache.activemq.console.filter.PropertiesViewFilter;
|
||||
import org.apache.activemq.console.filter.StubQueryFilter;
|
||||
import org.apache.activemq.console.filter.MapTransformFilter;
|
||||
import org.apache.activemq.console.filter.GroupPropertiesViewFilter;
|
||||
import org.apache.activemq.console.filter.AmqMessagesQueryFilter;
|
||||
|
||||
import javax.jms.Destination;
|
||||
import java.net.URI;
|
||||
|
@ -58,7 +58,7 @@ public class AmqMessagesUtil {
|
|||
|
||||
public static QueryFilter createMessageQueryFilter(URI brokerUrl, Destination dest) {
|
||||
return new WildcardToMsgSelectorTransformFilter(
|
||||
new MessagesQueryFilter(brokerUrl, dest)
|
||||
new AmqMessagesQueryFilter(brokerUrl, dest)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,16 +16,10 @@
|
|||
*/
|
||||
package org.apache.activemq.console.util;
|
||||
|
||||
import org.apache.activemq.console.filter.QueryFilter;
|
||||
import org.apache.activemq.console.filter.MBeansObjectNameQueryFilter;
|
||||
import org.apache.activemq.console.filter.WildcardToRegExTransformFilter;
|
||||
import org.apache.activemq.console.filter.MBeansRegExQueryFilter;
|
||||
import org.apache.activemq.console.filter.MBeansAttributeQueryFilter;
|
||||
import org.apache.activemq.console.filter.PropertiesViewFilter;
|
||||
import org.apache.activemq.console.filter.StubQueryFilter;
|
||||
import org.apache.activemq.console.filter.MapTransformFilter;
|
||||
import org.apache.activemq.console.filter.*;
|
||||
|
||||
import javax.management.remote.JMXServiceURL;
|
||||
import javax.management.ObjectName;
|
||||
import java.util.Set;
|
||||
import java.util.List;
|
||||
import java.util.Iterator;
|
||||
|
@ -113,4 +107,20 @@ public class JmxMBeansUtil {
|
|||
)
|
||||
);
|
||||
}
|
||||
|
||||
public static QueryFilter createMessageQueryFilter(JMXServiceURL jmxUrl, ObjectName destName) {
|
||||
return new WildcardToMsgSelectorTransformFilter(
|
||||
new MessagesQueryFilter(jmxUrl, destName)
|
||||
);
|
||||
}
|
||||
|
||||
public static List filterMessagesView(List messages, Set groupViews, Set attributeViews) throws Exception {
|
||||
return (new PropertiesViewFilter(attributeViews,
|
||||
new GroupPropertiesViewFilter(groupViews,
|
||||
new MapTransformFilter(
|
||||
new StubQueryFilter(messages)
|
||||
)
|
||||
)
|
||||
)).query("");
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue