diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/AmqBrowseCommand.java b/activemq-console/src/main/java/org/apache/activemq/console/command/AmqBrowseCommand.java new file mode 100644 index 0000000000..d6f081a03e --- /dev/null +++ b/activemq-console/src/main/java/org/apache/activemq/console/command/AmqBrowseCommand.java @@ -0,0 +1,240 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.console.command; + +import org.apache.activemq.console.util.AmqMessagesUtil; +import org.apache.activemq.console.formatter.GlobalWriter; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; + +import javax.jms.Destination; +import java.util.List; +import java.util.ArrayList; +import java.util.Set; +import java.util.HashSet; +import java.util.StringTokenizer; +import java.util.Iterator; + +public class AmqBrowseCommand extends AbstractAmqCommand { + public static final String QUEUE_PREFIX = "queue:"; + public static final String TOPIC_PREFIX = "topic:"; + + public static final String VIEW_GROUP_HEADER = "header:"; + public static final String VIEW_GROUP_CUSTOM = "custom:"; + public static final String VIEW_GROUP_BODY = "body:"; + + private final List queryAddObjects = new ArrayList(10); + private final List querySubObjects = new ArrayList(10); + private final Set groupViews = new HashSet(10); + private final Set queryViews = new HashSet(10); + + /** + * Execute the browse command, which allows you to browse the messages in a given JMS destination + * @param tokens - command arguments + * @throws Exception + */ + protected void runTask(List tokens) throws Exception { + try { + // If no destination specified + if (tokens.isEmpty()) { + GlobalWriter.printException(new IllegalArgumentException("No JMS destination specified.")); + return; + } + + // If no broker url specified + if (getBrokerUrl() == null) { + GlobalWriter.printException(new IllegalStateException("No broker url specified. Use the --amqurl option to specify a broker url.")); + return; + } + + // Display the messages for each destination + for (Iterator i=tokens.iterator(); i.hasNext();) { + String destName = (String)i.next(); + Destination dest; + + // If destination has been explicitly specified as a queue + if (destName.startsWith(QUEUE_PREFIX)) { + dest = new ActiveMQQueue(destName.substring(QUEUE_PREFIX.length())); + + // If destination has been explicitly specified as a topic + } else if (destName.startsWith(TOPIC_PREFIX)) { + dest = new ActiveMQTopic(destName.substring(TOPIC_PREFIX.length())); + + // By default destination is assumed to be a queue + } else { + dest = new ActiveMQQueue(destName); + } + + // Query for the messages to view + List addMsgs = AmqMessagesUtil.getMessages(getBrokerUrl(), dest, queryAddObjects); + + // Query for the messages to remove from view + if (querySubObjects.size() > 0) { + List subMsgs = AmqMessagesUtil.getMessages(getBrokerUrl(), dest, querySubObjects); + addMsgs.removeAll(subMsgs); + } + + // Display the messages + GlobalWriter.printMessage(AmqMessagesUtil.filterMessagesView(addMsgs, groupViews, queryViews)); + } + + } catch (Exception e) { + GlobalWriter.printException(new RuntimeException("Failed to execute browse task. Reason: " + e)); + throw new Exception(e); + } + } + + /** + * Handle the --msgsel, --xmsgsel, --view, -V options. + * @param token - option token to handle + * @param tokens - succeeding command arguments + * @throws Exception + */ + protected void handleOption(String token, List tokens) throws Exception { + + // If token is an additive message selector option + if (token.startsWith("--msgsel")) { + + // If no message selector is specified, or next token is a new option + if (tokens.isEmpty() || ((String)tokens.get(0)).startsWith("-")) { + GlobalWriter.printException(new IllegalArgumentException("Message selector not specified")); + return; + } + + StringTokenizer queryTokens = new StringTokenizer((String)tokens.remove(0), COMMAND_OPTION_DELIMETER); + while (queryTokens.hasMoreTokens()) { + queryAddObjects.add(queryTokens.nextToken()); + } + } + + // If token is a substractive message selector option + else if (token.startsWith("--xmsgsel")) { + + // If no message selector is specified, or next token is a new option + if (tokens.isEmpty() || ((String)tokens.get(0)).startsWith("-")) { + GlobalWriter.printException(new IllegalArgumentException("Message selector not specified")); + return; + } + + StringTokenizer queryTokens = new StringTokenizer((String)tokens.remove(0), COMMAND_OPTION_DELIMETER); + while (queryTokens.hasMoreTokens()) { + querySubObjects.add(queryTokens.nextToken()); + } + + } + + // If token is a view option + else if (token.startsWith("--view")) { + + // If no view specified, or next token is a new option + if (tokens.isEmpty() || ((String)tokens.get(0)).startsWith("-")) { + GlobalWriter.printException(new IllegalArgumentException("Attributes to view not specified")); + return; + } + + // Add the attributes to view + StringTokenizer viewTokens = new StringTokenizer((String)tokens.remove(0), COMMAND_OPTION_DELIMETER); + while (viewTokens.hasMoreTokens()) { + String viewToken = viewTokens.nextToken(); + + // If view is explicitly specified to belong to the JMS header + if (viewToken.equals(VIEW_GROUP_HEADER)) { + queryViews.add(AmqMessagesUtil.JMS_MESSAGE_HEADER_PREFIX + viewToken.substring(VIEW_GROUP_HEADER.length())); + + // If view is explicitly specified to belong to the JMS custom header + } else if (viewToken.equals(VIEW_GROUP_CUSTOM)) { + queryViews.add(AmqMessagesUtil.JMS_MESSAGE_CUSTOM_PREFIX + viewToken.substring(VIEW_GROUP_CUSTOM.length())); + + // If view is explicitly specified to belong to the JMS body + } else if (viewToken.equals(VIEW_GROUP_BODY)) { + queryViews.add(AmqMessagesUtil.JMS_MESSAGE_BODY_PREFIX + viewToken.substring(VIEW_GROUP_BODY.length())); + + // If no view explicitly specified, let's check the view for each group + } else { + queryViews.add(AmqMessagesUtil.JMS_MESSAGE_HEADER_PREFIX + viewToken); + queryViews.add(AmqMessagesUtil.JMS_MESSAGE_CUSTOM_PREFIX + viewToken); + queryViews.add(AmqMessagesUtil.JMS_MESSAGE_BODY_PREFIX + viewToken); + } + } + } + + // If token is a predefined group view option + else if (token.startsWith("-V")) { + String viewGroup = token.substring(2); + // If option is a header group view + if (viewGroup.equals("header")) { + groupViews.add(AmqMessagesUtil.JMS_MESSAGE_HEADER_PREFIX); + + // If option is a custom header group view + } else if (viewGroup.equals("custom")) { + groupViews.add(AmqMessagesUtil.JMS_MESSAGE_CUSTOM_PREFIX); + + // If option is a body group view + } else if (viewGroup.equals("body")) { + groupViews.add(AmqMessagesUtil.JMS_MESSAGE_BODY_PREFIX); + + // Unknown group view + } else { + GlobalWriter.printInfo("Unknown group view: " + viewGroup + ". Ignoring group view option."); + } + } + + // Let super class handle unknown option + else { + super.handleOption(token, tokens); + } + } + + /** + * Print the help messages for the browse command + */ + protected void printHelp() { + GlobalWriter.printHelp(helpFile); + } + + protected String[] helpFile = new String[] { + "Task Usage: Main browse --amqurl [browse-options] ", + "Description: Display selected destination's messages.", + "", + "Browse Options:", + " --amqurl Set the broker URL to connect to.", + " --msgsel Add to the search list messages matched by the query similar to", + " the messages selector format.", + " -V Predefined view that allows you to view the message header, custom", + " message header, or the message body.", + " --view ,,... Select the specific attribute of the message to view.", + " --version Display the version information.", + " -h,-?,--help Display the browse broker help information.", + "", + "Examples:", + " Main browse --amqurl tcp://localhost:61616 FOO.BAR", + " - Print the message header, custom message header, and message body of all messages in the", + " queue FOO.BAR", + "", + " Main browse --amqurl tcp://localhost:61616 -Vheader,body queue:FOO.BAR", + " - Print only the message header and message body of all messages in the queue FOO.BAR", + "", + " Main browse --amqurl tcp://localhost:61616 -Vheader --view custom:MyField queue:FOO.BAR", + " - Print the message header and the custom field 'MyField' of all messages in the queue FOO.BAR", + "", + " Main browse --amqurl tcp://localhost:61616 --msgsel JMSMessageID='*:10',JMSPriority>5 FOO.BAR", + " - Print all the message fields that has a JMSMessageID in the header field that matches the", + " wildcard *:10, and has a JMSPriority field > 5 in the queue FOO.BAR", + " * To use wildcard queries, the field must be a string and the query enclosed in ''", + "", + }; +} diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/BrowseCommand.java b/activemq-console/src/main/java/org/apache/activemq/console/command/BrowseCommand.java index 25607f49d2..9d1fbf5747 100644 --- a/activemq-console/src/main/java/org/apache/activemq/console/command/BrowseCommand.java +++ b/activemq-console/src/main/java/org/apache/activemq/console/command/BrowseCommand.java @@ -16,20 +16,19 @@ */ package org.apache.activemq.console.command; -import org.apache.activemq.console.util.AmqMessagesUtil; import org.apache.activemq.console.formatter.GlobalWriter; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.console.util.AmqMessagesUtil; +import org.apache.activemq.console.util.JmxMBeansUtil; -import javax.jms.Destination; +import javax.management.ObjectInstance; import java.util.List; +import java.util.StringTokenizer; import java.util.ArrayList; import java.util.Set; import java.util.HashSet; -import java.util.StringTokenizer; import java.util.Iterator; -public class BrowseCommand extends AbstractAmqCommand { +public class BrowseCommand extends AbstractJmxCommand { public static final String QUEUE_PREFIX = "queue:"; public static final String TOPIC_PREFIX = "topic:"; @@ -49,49 +48,21 @@ public class BrowseCommand extends AbstractAmqCommand { */ protected void runTask(List tokens) throws Exception { try { - // If no destination specified + // If there is no queue name specified, let's select all if (tokens.isEmpty()) { - GlobalWriter.printException(new IllegalArgumentException("No JMS destination specified.")); - return; + tokens.add("*"); } - // If no broker url specified - if (getBrokerUrl() == null) { - GlobalWriter.printException(new IllegalStateException("No broker url specified. Use the --amqurl option to specify a broker url.")); - return; - } - - // Display the messages for each destination + // Iterate through the queue names for (Iterator i=tokens.iterator(); i.hasNext();) { - String destName = (String)i.next(); - Destination dest; + List queueList = JmxMBeansUtil.queryMBeans(useJmxServiceUrl(), "Type=Queue,Destination=" + i.next() + ",*"); - // If destination has been explicitly specified as a queue - if (destName.startsWith(QUEUE_PREFIX)) { - dest = new ActiveMQQueue(destName.substring(QUEUE_PREFIX.length())); - - // If destination has been explicitly specified as a topic - } else if (destName.startsWith(TOPIC_PREFIX)) { - dest = new ActiveMQTopic(destName.substring(TOPIC_PREFIX.length())); - - // By default destination is assumed to be a queue - } else { - dest = new ActiveMQQueue(destName); + // Iterate through the queue result + for (Iterator j=queueList.iterator(); j.hasNext();) { + List messages = JmxMBeansUtil.createMessageQueryFilter(useJmxServiceUrl(), ((ObjectInstance)j.next()).getObjectName()).query(queryAddObjects); + GlobalWriter.printMessage(JmxMBeansUtil.filterMessagesView(messages, groupViews, queryViews)); } - - // Query for the messages to view - List addMsgs = AmqMessagesUtil.getMessages(getBrokerUrl(), dest, queryAddObjects); - - // Query for the messages to remove from view - if (querySubObjects.size() > 0) { - List subMsgs = AmqMessagesUtil.getMessages(getBrokerUrl(), dest, querySubObjects); - addMsgs.removeAll(subMsgs); - } - - // Display the messages - GlobalWriter.printMessage(AmqMessagesUtil.filterMessagesView(addMsgs, groupViews, queryViews)); } - } catch (Exception e) { GlobalWriter.printException(new RuntimeException("Failed to execute browse task. Reason: " + e)); throw new Exception(e); @@ -207,31 +178,31 @@ public class BrowseCommand extends AbstractAmqCommand { } protected String[] helpFile = new String[] { - "Task Usage: Main browse --amqurl [browse-options] ", + "Task Usage: Main browse [browse-options] ", "Description: Display selected destination's messages.", "", "Browse Options:", - " --amqurl Set the broker URL to connect to.", " --msgsel Add to the search list messages matched by the query similar to", " the messages selector format.", " -V Predefined view that allows you to view the message header, custom", " message header, or the message body.", " --view ,,... Select the specific attribute of the message to view.", + " --jmxurl Set the JMX URL to connect to.", " --version Display the version information.", " -h,-?,--help Display the browse broker help information.", "", "Examples:", - " Main browse --amqurl tcp://localhost:61616 FOO.BAR", + " Main browse FOO.BAR", " - Print the message header, custom message header, and message body of all messages in the", " queue FOO.BAR", "", - " Main browse --amqurl tcp://localhost:61616 -Vheader,body queue:FOO.BAR", + " Main browse -Vheader,body queue:FOO.BAR", " - Print only the message header and message body of all messages in the queue FOO.BAR", "", - " Main browse --amqurl tcp://localhost:61616 -Vheader --view custom:MyField queue:FOO.BAR", + " Main browse -Vheader --view custom:MyField queue:FOO.BAR", " - Print the message header and the custom field 'MyField' of all messages in the queue FOO.BAR", "", - " Main browse --amqurl tcp://localhost:61616 --msgsel JMSMessageID='*:10',JMSPriority>5 FOO.BAR", + " Main browse --msgsel JMSMessageID='*:10',JMSPriority>5 FOO.BAR", " - Print all the message fields that has a JMSMessageID in the header field that matches the", " wildcard *:10, and has a JMSPriority field > 5 in the queue FOO.BAR", " * To use wildcard queries, the field must be a string and the query enclosed in ''", diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/PurgeCommand.java b/activemq-console/src/main/java/org/apache/activemq/console/command/PurgeCommand.java new file mode 100644 index 0000000000..7528ac7438 --- /dev/null +++ b/activemq-console/src/main/java/org/apache/activemq/console/command/PurgeCommand.java @@ -0,0 +1,176 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.console.command; + +import org.apache.activemq.console.formatter.GlobalWriter; +import org.apache.activemq.console.util.JmxMBeansUtil; + +import javax.management.ObjectInstance; +import javax.management.ObjectName; +import javax.management.MBeanServerConnection; +import javax.management.openmbean.CompositeData; +import javax.management.remote.JMXConnector; +import java.util.List; +import java.util.StringTokenizer; +import java.util.ArrayList; +import java.util.Iterator; + +public class PurgeCommand extends AbstractJmxCommand { + + private final List queryAddObjects = new ArrayList(10); + private final List querySubObjects = new ArrayList(10); + + /** + * Execute the purge command, which allows you to purge the messages in a given JMS destination + * @param tokens - command arguments + * @throws Exception + */ + protected void runTask(List tokens) throws Exception { + try { + // If there is no queue name specified, let's select all + if (tokens.isEmpty()) { + tokens.add("*"); + } + + // Iterate through the queue names + for (Iterator i=tokens.iterator(); i.hasNext();) { + List queueList = JmxMBeansUtil.queryMBeans(useJmxServiceUrl(), "Type=Queue,Destination=" + i.next() + ",*"); + + for (Iterator j=queueList.iterator(); j.hasNext();) { + ObjectName queueName = ((ObjectInstance)j.next()).getObjectName(); + if (queryAddObjects.isEmpty()) { + purgeQueue(queueName); + } else { + List messages = JmxMBeansUtil.createMessageQueryFilter(useJmxServiceUrl(), queueName).query(queryAddObjects); + purgeMessages(queueName, messages); + } + } + } + } catch (Exception e) { + GlobalWriter.printException(new RuntimeException("Failed to execute purge task. Reason: " + e)); + throw new Exception(e); + } + } + + /** + * Purge all the messages in the queue + * @param queue - ObjectName of the queue to purge + * @throws Exception + */ + public void purgeQueue(ObjectName queue) throws Exception { + JMXConnector conn = createJmxConnector(); + MBeanServerConnection server = conn.getMBeanServerConnection(); + GlobalWriter.printInfo("Purging all messages in queue: " + queue.getKeyProperty("Destination")); + server.invoke(queue, "purge", new Object[] {}, new String[] {}); + conn.close(); + } + + /** + * Purge selected messages in the queue + * @param queue - ObjectName of the queue to purge the messages from + * @param messages - List of messages to purge + * @throws Exception + */ + public void purgeMessages(ObjectName queue, List messages) throws Exception { + JMXConnector conn = createJmxConnector(); + MBeanServerConnection server = conn.getMBeanServerConnection(); + + Object[] param = new Object[1]; + for (Iterator i=messages.iterator(); i.hasNext();) { + CompositeData msg = (CompositeData)i.next(); + param[0] = "" + msg.get("JMSMessageID"); + GlobalWriter.printInfo("Removing message: " + param[0] + " from queue: " + queue.getKeyProperty("Destination")); + server.invoke(queue, "removeMessage", param, new String[] {"java.lang.String"}); + } + + conn.close(); + } + + /** + * Handle the --msgsel, --xmsgsel. + * @param token - option token to handle + * @param tokens - succeeding command arguments + * @throws Exception + */ + protected void handleOption(String token, List tokens) throws Exception { + // If token is an additive message selector option + if (token.startsWith("--msgsel")) { + + // If no message selector is specified, or next token is a new option + if (tokens.isEmpty() || ((String)tokens.get(0)).startsWith("-")) { + GlobalWriter.printException(new IllegalArgumentException("Message selector not specified")); + return; + } + + StringTokenizer queryTokens = new StringTokenizer((String)tokens.remove(0), COMMAND_OPTION_DELIMETER); + while (queryTokens.hasMoreTokens()) { + queryAddObjects.add(queryTokens.nextToken()); + } + } + + // If token is a substractive message selector option + else if (token.startsWith("--xmsgsel")) { + + // If no message selector is specified, or next token is a new option + if (tokens.isEmpty() || ((String)tokens.get(0)).startsWith("-")) { + GlobalWriter.printException(new IllegalArgumentException("Message selector not specified")); + return; + } + + StringTokenizer queryTokens = new StringTokenizer((String)tokens.remove(0), COMMAND_OPTION_DELIMETER); + while (queryTokens.hasMoreTokens()) { + querySubObjects.add(queryTokens.nextToken()); + } + + } + + // Let super class handle unknown option + else { + super.handleOption(token, tokens); + } + } + + /** + * Print the help messages for the browse command + */ + protected void printHelp() { + GlobalWriter.printHelp(helpFile); + } + + protected String[] helpFile = new String[] { + "Task Usage: Main purge [browse-options] ", + "Description: Delete selected destination's messages that matches the message selector.", + "", + "Browse Options:", + " --msgsel Add to the search list messages matched by the query similar to", + " the messages selector format.", + " --jmxurl Set the JMX URL to connect to.", + " --version Display the version information.", + " -h,-?,--help Display the browse broker help information.", + "", + "Examples:", + " Main purge FOO.BAR", + " - Delete all the messages in queue FOO.BAR", + + " Main purge --msgsel JMSMessageID='*:10',JMSPriority>5 FOO.*", + " - Delete all the messages in the destinations that matches FOO.* and has a JMSMessageID in", + " the header field that matches the wildcard *:10, and has a JMSPriority field > 5 in the", + " queue FOO.BAR", + " * To use wildcard queries, the field must be a string and the query enclosed in ''", + "", + }; +} diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/ShellCommand.java b/activemq-console/src/main/java/org/apache/activemq/console/command/ShellCommand.java index 7c7f51d245..cdee5edc78 100644 --- a/activemq-console/src/main/java/org/apache/activemq/console/command/ShellCommand.java +++ b/activemq-console/src/main/java/org/apache/activemq/console/command/ShellCommand.java @@ -69,7 +69,9 @@ public class ShellCommand extends AbstractCommand { } else if (taskToken.equals("query")) { new QueryCommand().execute(tokens); } else if (taskToken.equals("browse")) { - new BrowseCommand().execute(tokens); + new AmqBrowseCommand().execute(tokens); + } else if (taskToken.equals("purge")) { + new PurgeCommand().execute(tokens); } else { // If not valid task, push back to list tokens.add(0, taskToken); diff --git a/activemq-console/src/main/java/org/apache/activemq/console/filter/AmqMessagesQueryFilter.java b/activemq-console/src/main/java/org/apache/activemq/console/filter/AmqMessagesQueryFilter.java new file mode 100644 index 0000000000..a9452dee19 --- /dev/null +++ b/activemq-console/src/main/java/org/apache/activemq/console/filter/AmqMessagesQueryFilter.java @@ -0,0 +1,152 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.console.filter; + +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.ActiveMQConnectionFactory; + +import javax.jms.Destination; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Session; +import javax.jms.QueueBrowser; +import java.net.URI; +import java.util.Collections; +import java.util.List; +import java.util.Iterator; + +public class AmqMessagesQueryFilter extends AbstractQueryFilter { + + private URI brokerUrl; + private Destination destination; + + /** + * Create a JMS message query filter + * @param brokerUrl - broker url to connect to + * @param destination - JMS destination to query + */ + public AmqMessagesQueryFilter(URI brokerUrl, Destination destination) { + super(null); + this.brokerUrl = brokerUrl; + this.destination = destination; + } + + /** + * Queries the specified destination using the message selector format query + * @param queries - message selector queries + * @return list messages that matches the selector + * @throws Exception + */ + public List query(List queries) throws Exception { + String selector = ""; + + // Convert to message selector + for (Iterator i=queries.iterator(); i.hasNext();) { + selector = selector + "(" + i.next().toString() + ") AND "; + } + + // Remove last AND + if (selector != "") { + selector = selector.substring(0, selector.length() - 5); + } + + if (destination instanceof ActiveMQQueue) { + return queryMessages((ActiveMQQueue)destination, selector); + } else { + return queryMessages((ActiveMQTopic)destination, selector); + } + } + + /** + * Query the messages of a queue destination using a queue browser + * @param queue - queue destination + * @param selector - message selector + * @return list of messages that matches the selector + * @throws Exception + */ + protected List queryMessages(ActiveMQQueue queue, String selector) throws Exception { + Connection conn = createConnection(getBrokerUrl()); + + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + QueueBrowser browser = sess.createBrowser(queue, selector); + + List messages = Collections.list(browser.getEnumeration()); + + conn.close(); + + return messages; + } + + /** + * Query the messages of a topic destination using a message consumer + * @param topic - topic destination + * @param selector - message selector + * @return list of messages that matches the selector + * @throws Exception + */ + protected List queryMessages(ActiveMQTopic topic, String selector) throws Exception { + // TODO: should we use a durable subscriber or a retroactive non-durable subscriber? + // TODO: if a durable subscriber is used, how do we manage it? subscribe/unsubscribe tasks? + return null; + } + + /** + * Create and start a JMS connection + * @param brokerUrl - broker url to connect to. + * @return JMS connection + * @throws JMSException + */ + protected Connection createConnection(URI brokerUrl) throws JMSException { + Connection conn = (new ActiveMQConnectionFactory(brokerUrl)).createConnection(); + conn.start(); + return conn; + } + + /** + * Get the broker url being used. + * @return broker url + */ + public URI getBrokerUrl() { + return brokerUrl; + } + + /** + * Set the broker url to use. + * @param brokerUrl - broker url + */ + public void setBrokerUrl(URI brokerUrl) { + this.brokerUrl = brokerUrl; + } + + /** + * Get the destination being used. + * @return - JMS destination + */ + public Destination getDestination() { + return destination; + } + + /** + * Set the destination to use. + * @param destination - JMS destination + */ + public void setDestination(Destination destination) { + this.destination = destination; + } + +} diff --git a/activemq-console/src/main/java/org/apache/activemq/console/filter/MBeansRegExQueryFilter.java b/activemq-console/src/main/java/org/apache/activemq/console/filter/MBeansRegExQueryFilter.java index bcb873348e..3e242841e9 100644 --- a/activemq-console/src/main/java/org/apache/activemq/console/filter/MBeansRegExQueryFilter.java +++ b/activemq-console/src/main/java/org/apache/activemq/console/filter/MBeansRegExQueryFilter.java @@ -83,7 +83,7 @@ public class MBeansRegExQueryFilter extends RegExQueryFilter { } /** - * Try to match the attribute list using teh regular expression map + * Try to match the attribute list using the regular expression map * @param data - attribute list to match * @param regex - regex map * @return true if the attribute list matches the regex map diff --git a/activemq-console/src/main/java/org/apache/activemq/console/filter/MapTransformFilter.java b/activemq-console/src/main/java/org/apache/activemq/console/filter/MapTransformFilter.java index 25075c7e32..6778d0d3eb 100644 --- a/activemq-console/src/main/java/org/apache/activemq/console/filter/MapTransformFilter.java +++ b/activemq-console/src/main/java/org/apache/activemq/console/filter/MapTransformFilter.java @@ -30,6 +30,7 @@ import javax.management.ObjectInstance; import javax.management.ObjectName; import javax.management.AttributeList; import javax.management.Attribute; +import javax.management.openmbean.CompositeDataSupport; import javax.jms.JMSException; import javax.jms.DeliveryMode; import java.util.Map; @@ -256,4 +257,59 @@ public class MapTransformFilter extends ResultTransformFilter { return props; } + + /** + * Transform an openMBean composite data to a Map + * @param data - composite data to transform + * @return map object + */ + protected Map transformToMap(CompositeDataSupport data) { + Properties props = new Properties(); + + String typeName = data.getCompositeType().getTypeName(); + + // Retrieve text message + if (typeName.equals(ActiveMQTextMessage.class.getName())) { + props.setProperty(AmqMessagesUtil.JMS_MESSAGE_BODY_PREFIX + "Text", data.get("Text").toString()); + + // Retrieve byte preview + } else if (typeName.equals(ActiveMQBytesMessage.class.getName())) { + props.setProperty(AmqMessagesUtil.JMS_MESSAGE_BODY_PREFIX + "BodyLength", data.get("BodyLength").toString()); + props.setProperty(AmqMessagesUtil.JMS_MESSAGE_BODY_PREFIX + "BodyPreview", new String((byte[])data.get("BodyPreview"))); + + // Expand content map + } else if (typeName.equals(ActiveMQMapMessage.class.getName())) { + Map contentMap = (Map)data.get("ContentMap"); + for (Iterator i=contentMap.keySet().iterator(); i.hasNext();) { + String key = (String)i.next(); + props.setProperty(AmqMessagesUtil.JMS_MESSAGE_BODY_PREFIX + key, contentMap.get(key).toString()); + } + + // Do nothing + } else if (typeName.equals(ActiveMQObjectMessage.class.getName()) || + typeName.equals(ActiveMQStreamMessage.class.getName()) || + typeName.equals(ActiveMQMessage.class.getName())) { + + // Unrecognized composite data. Throw exception. + } else { + throw new IllegalArgumentException("Unrecognized composite data to transform. composite type: " + typeName); + } + + // Process the JMS message header values + props.setProperty(AmqMessagesUtil.JMS_MESSAGE_HEADER_PREFIX + "JMSCorrelationID", "" + data.get("JMSCorrelationID")); + props.setProperty(AmqMessagesUtil.JMS_MESSAGE_HEADER_PREFIX + "JMSDestination", "" + data.get("JMSDestination")); + props.setProperty(AmqMessagesUtil.JMS_MESSAGE_HEADER_PREFIX + "JMSMessageID", "" + data.get("JMSMessageID")); + props.setProperty(AmqMessagesUtil.JMS_MESSAGE_HEADER_PREFIX + "JMSReplyTo", "" + data.get("JMSReplyTo")); + props.setProperty(AmqMessagesUtil.JMS_MESSAGE_HEADER_PREFIX + "JMSType", "" + data.get("JMSType")); + props.setProperty(AmqMessagesUtil.JMS_MESSAGE_HEADER_PREFIX + "JMSDeliveryMode", "" + data.get("JMSDeliveryMode")); + props.setProperty(AmqMessagesUtil.JMS_MESSAGE_HEADER_PREFIX + "JMSExpiration", "" + data.get("JMSExpiration")); + props.setProperty(AmqMessagesUtil.JMS_MESSAGE_HEADER_PREFIX + "JMSPriority", "" + data.get("JMSPriority")); + props.setProperty(AmqMessagesUtil.JMS_MESSAGE_HEADER_PREFIX + "JMSRedelivered", "" + data.get("JMSRedelivered")); + props.setProperty(AmqMessagesUtil.JMS_MESSAGE_HEADER_PREFIX + "JMSTimestamp", "" + data.get("JMSTimestamp")); + + // Process the JMS custom message properties + props.setProperty(AmqMessagesUtil.JMS_MESSAGE_CUSTOM_PREFIX + "Properties", "" + data.get("Properties")); + + return props; + } } diff --git a/activemq-console/src/main/java/org/apache/activemq/console/filter/MessagesQueryFilter.java b/activemq-console/src/main/java/org/apache/activemq/console/filter/MessagesQueryFilter.java index a888b061ea..eaf216ee6c 100644 --- a/activemq-console/src/main/java/org/apache/activemq/console/filter/MessagesQueryFilter.java +++ b/activemq-console/src/main/java/org/apache/activemq/console/filter/MessagesQueryFilter.java @@ -16,34 +16,32 @@ */ package org.apache.activemq.console.filter; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.ActiveMQConnectionFactory; - -import javax.jms.Destination; -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Session; -import javax.jms.QueueBrowser; -import java.net.URI; -import java.util.Collections; +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXServiceURL; +import javax.management.remote.JMXConnectorFactory; +import javax.management.MBeanServerConnection; +import javax.management.ObjectName; +import javax.management.openmbean.CompositeData; +import java.net.MalformedURLException; import java.util.List; import java.util.Iterator; +import java.util.Arrays; +import java.io.IOException; public class MessagesQueryFilter extends AbstractQueryFilter { - private URI brokerUrl; - private Destination destination; + private JMXServiceURL jmxServiceUrl; + private ObjectName destName; /** * Create a JMS message query filter - * @param brokerUrl - broker url to connect to - * @param destination - JMS destination to query + * @param jmxServiceUrl - JMX service URL to connect to + * @param destName - object name query to retrieve the destination */ - public MessagesQueryFilter(URI brokerUrl, Destination destination) { + public MessagesQueryFilter(JMXServiceURL jmxServiceUrl, ObjectName destName) { super(null); - this.brokerUrl = brokerUrl; - this.destination = destination; + this.jmxServiceUrl = jmxServiceUrl; + this.destName = destName; } /** @@ -65,88 +63,54 @@ public class MessagesQueryFilter extends AbstractQueryFilter { selector = selector.substring(0, selector.length() - 5); } - if (destination instanceof ActiveMQQueue) { - return queryMessages((ActiveMQQueue)destination, selector); - } else { - return queryMessages((ActiveMQTopic)destination, selector); - } + return queryMessages(selector); } /** - * Query the messages of a queue destination using a queue browser - * @param queue - queue destination + * Query the messages of a queue destination using JMX * @param selector - message selector * @return list of messages that matches the selector * @throws Exception */ - protected List queryMessages(ActiveMQQueue queue, String selector) throws Exception { - Connection conn = createConnection(getBrokerUrl()); + protected List queryMessages(String selector) throws Exception { + JMXConnector connector = createJmxConnector(); + MBeanServerConnection server = connector.getMBeanServerConnection(); + CompositeData[] messages = (CompositeData[])server.invoke(destName, "browse", new Object[] {}, new String[] {}); + connector.close(); - Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - QueueBrowser browser = sess.createBrowser(queue, selector); - - List messages = Collections.list(browser.getEnumeration()); - - conn.close(); - - return messages; + return Arrays.asList(messages); } /** - * Query the messages of a topic destination using a message consumer - * @param topic - topic destination - * @param selector - message selector - * @return list of messages that matches the selector - * @throws Exception + * Get the JMX service URL the query is connecting to. + * @return JMX service URL */ - protected List queryMessages(ActiveMQTopic topic, String selector) throws Exception { - // TODO: should we use a durable subscriber or a retroactive non-durable subscriber? - // TODO: if a durable subscriber is used, how do we manage it? subscribe/unsubscribe tasks? - return null; + public JMXServiceURL getJmxServiceUrl() { + return jmxServiceUrl; } /** - * Create and start a JMS connection - * @param brokerUrl - broker url to connect to. - * @return JMS connection - * @throws JMSException + * Sets the JMX service URL the query is going to connect to. + * @param jmxServiceUrl - new JMX service URL */ - protected Connection createConnection(URI brokerUrl) throws JMSException { - Connection conn = (new ActiveMQConnectionFactory(brokerUrl)).createConnection(); - conn.start(); - return conn; + public void setJmxServiceUrl(JMXServiceURL jmxServiceUrl) { + this.jmxServiceUrl = jmxServiceUrl; } /** - * Get the broker url being used. - * @return broker url + * Sets the JMX service URL the query is going to connect to. + * @param jmxServiceUrl - new JMX service URL */ - public URI getBrokerUrl() { - return brokerUrl; + public void setJmxServiceUrl(String jmxServiceUrl) throws MalformedURLException { + setJmxServiceUrl(new JMXServiceURL(jmxServiceUrl)); } /** - * Set the broker url to use. - * @param brokerUrl - broker url + * Creates a JMX connector + * @return JMX connector + * @throws java.io.IOException */ - public void setBrokerUrl(URI brokerUrl) { - this.brokerUrl = brokerUrl; + protected JMXConnector createJmxConnector() throws IOException { + return JMXConnectorFactory.connect(getJmxServiceUrl()); } - - /** - * Get the destination being used. - * @return - JMS destination - */ - public Destination getDestination() { - return destination; - } - - /** - * Set the destination to use. - * @param destination - JMS destination - */ - public void setDestination(Destination destination) { - this.destination = destination; - } - } diff --git a/activemq-console/src/main/java/org/apache/activemq/console/util/AmqMessagesUtil.java b/activemq-console/src/main/java/org/apache/activemq/console/util/AmqMessagesUtil.java index 5c54c9f08c..5bbcd49fe3 100644 --- a/activemq-console/src/main/java/org/apache/activemq/console/util/AmqMessagesUtil.java +++ b/activemq-console/src/main/java/org/apache/activemq/console/util/AmqMessagesUtil.java @@ -18,11 +18,11 @@ package org.apache.activemq.console.util; import org.apache.activemq.console.filter.QueryFilter; import org.apache.activemq.console.filter.WildcardToMsgSelectorTransformFilter; -import org.apache.activemq.console.filter.MessagesQueryFilter; import org.apache.activemq.console.filter.PropertiesViewFilter; import org.apache.activemq.console.filter.StubQueryFilter; import org.apache.activemq.console.filter.MapTransformFilter; import org.apache.activemq.console.filter.GroupPropertiesViewFilter; +import org.apache.activemq.console.filter.AmqMessagesQueryFilter; import javax.jms.Destination; import java.net.URI; @@ -58,7 +58,7 @@ public class AmqMessagesUtil { public static QueryFilter createMessageQueryFilter(URI brokerUrl, Destination dest) { return new WildcardToMsgSelectorTransformFilter( - new MessagesQueryFilter(brokerUrl, dest) + new AmqMessagesQueryFilter(brokerUrl, dest) ); } } diff --git a/activemq-console/src/main/java/org/apache/activemq/console/util/JmxMBeansUtil.java b/activemq-console/src/main/java/org/apache/activemq/console/util/JmxMBeansUtil.java index 2555c59f07..36af15aa97 100644 --- a/activemq-console/src/main/java/org/apache/activemq/console/util/JmxMBeansUtil.java +++ b/activemq-console/src/main/java/org/apache/activemq/console/util/JmxMBeansUtil.java @@ -16,16 +16,10 @@ */ package org.apache.activemq.console.util; -import org.apache.activemq.console.filter.QueryFilter; -import org.apache.activemq.console.filter.MBeansObjectNameQueryFilter; -import org.apache.activemq.console.filter.WildcardToRegExTransformFilter; -import org.apache.activemq.console.filter.MBeansRegExQueryFilter; -import org.apache.activemq.console.filter.MBeansAttributeQueryFilter; -import org.apache.activemq.console.filter.PropertiesViewFilter; -import org.apache.activemq.console.filter.StubQueryFilter; -import org.apache.activemq.console.filter.MapTransformFilter; +import org.apache.activemq.console.filter.*; import javax.management.remote.JMXServiceURL; +import javax.management.ObjectName; import java.util.Set; import java.util.List; import java.util.Iterator; @@ -113,4 +107,20 @@ public class JmxMBeansUtil { ) ); } + + public static QueryFilter createMessageQueryFilter(JMXServiceURL jmxUrl, ObjectName destName) { + return new WildcardToMsgSelectorTransformFilter( + new MessagesQueryFilter(jmxUrl, destName) + ); + } + + public static List filterMessagesView(List messages, Set groupViews, Set attributeViews) throws Exception { + return (new PropertiesViewFilter(attributeViews, + new GroupPropertiesViewFilter(groupViews, + new MapTransformFilter( + new StubQueryFilter(messages) + ) + ) + )).query(""); + } }