This commit is contained in:
Dejan Bosanac 2015-09-04 15:50:52 +02:00
parent c7b93d1232
commit 73f9131a62
10 changed files with 138 additions and 181 deletions

View File

@ -20,6 +20,7 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.ConnectException;
import java.util.List;
import org.apache.activemq.ActiveMQConnectionMetaData;
@ -159,4 +160,20 @@ public abstract class AbstractCommand implements Command {
}
}
}
protected void handleException(Throwable exception, String serviceUrl) {
Throwable cause = exception.getCause();
while (true) {
Throwable next = cause.getCause();
if (next == null) {
break;
}
cause = next;
}
if (cause instanceof ConnectException) {
context.printInfo("Broker not available at: " + serviceUrl);
} else {
context.printException(new RuntimeException("Failed to execute " + getName() + " task. Reason: " + exception));
}
}
}

View File

@ -20,6 +20,7 @@ import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Method;
import java.net.ConnectException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
@ -385,7 +386,10 @@ public abstract class AbstractJmxCommand extends AbstractCommand {
public void execute(List<String> tokens) throws Exception {
try {
super.execute(tokens);
} finally {
} catch (Throwable exception) {
handleException(exception, jmxServiceUrl.toString());
return;
}finally {
closeJmxConnection();
}
}

View File

@ -25,6 +25,7 @@ import java.util.StringTokenizer;
import javax.jms.Destination;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.console.util.AmqMessagesUtil;
@ -39,11 +40,11 @@ public class AmqBrowseCommand extends AbstractAmqCommand {
public static final String VIEW_GROUP_BODY = "body:";
protected String[] helpFile = new String[] {
"Task Usage: Main browse --amqurl <broker url> [browse-options] <destinations>",
"Task Usage: Main browse [browse-options] <destinations>",
"Description: Display selected destination's messages.",
"",
"Browse Options:",
" --amqurl <url> Set the broker URL to connect to.",
" --amqurl <url> Set the broker URL to connect to. Default tcp://localhost:61616",
" --msgsel <msgsel1,msglsel2> Add to the search list messages matched by the query similar to",
" the messages selector format.",
" --factory <className> Load className as the javax.jms.ConnectionFactory to use for creating connections.",
@ -116,8 +117,7 @@ public class AmqBrowseCommand extends AbstractAmqCommand {
// If no broker url specified
if (getBrokerUrl() == null) {
context.printException(new IllegalStateException("No broker url specified. Use the --amqurl option to specify a broker url."));
return;
setBrokerUrl(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
}
// Display the messages for each destination
@ -151,9 +151,8 @@ public class AmqBrowseCommand extends AbstractAmqCommand {
context.printMessage(AmqMessagesUtil.filterMessagesView(addMsgs, groupViews, queryViews));
}
} catch (Exception e) {
context.printException(new RuntimeException("Failed to execute browse task. Reason: " + e));
throw new Exception(e);
} catch (Exception exception) {
handleException(exception, getBrokerUrl().toString());
}
}

View File

@ -97,25 +97,20 @@ public class BrowseCommand extends AbstractJmxCommand {
* @throws Exception
*/
protected void runTask(List<String> tokens) throws Exception {
try {
// If there is no queue name specified, let's select all
if (tokens.isEmpty()) {
tokens.add("*");
}
// If there is no queue name specified, let's select all
if (tokens.isEmpty()) {
tokens.add("*");
}
// Iterate through the queue names
for (Iterator<String> i = tokens.iterator(); i.hasNext();) {
List queueList = JmxMBeansUtil.queryMBeans(createJmxConnection(), "Type=Queue,Destination=" + i.next() + ",*");
// Iterate through the queue names
for (Iterator<String> i = tokens.iterator(); i.hasNext(); ) {
List queueList = JmxMBeansUtil.queryMBeans(createJmxConnection(), "Type=Queue,Destination=" + i.next() + ",*");
// Iterate through the queue result
for (Iterator j = queueList.iterator(); j.hasNext();) {
List messages = JmxMBeansUtil.createMessageQueryFilter(createJmxConnection(), ((ObjectInstance)j.next()).getObjectName()).query(queryAddObjects);
context.printMessage(JmxMBeansUtil.filterMessagesView(messages, groupViews, queryViews));
}
// Iterate through the queue result
for (Iterator j = queueList.iterator(); j.hasNext(); ) {
List messages = JmxMBeansUtil.createMessageQueryFilter(createJmxConnection(), ((ObjectInstance) j.next()).getObjectName()).query(queryAddObjects);
context.printMessage(JmxMBeansUtil.filterMessagesView(messages, groupViews, queryViews));
}
} catch (Exception e) {
context.printException(new RuntimeException("Failed to execute browse task. Reason: " + e));
throw new Exception(e);
}
}

View File

@ -66,20 +66,12 @@ public class DstatCommand extends AbstractJmxCommand {
*/
@Override
protected void runTask(List<String> tokens) throws Exception {
try {
if (tokens.contains("topics")) {
displayTopicStats();
} else if (tokens.contains("queues")) {
displayQueueStats();
} else {
displayAllDestinations();
}
// Iterate through the queue names
} catch (Exception e) {
context.printException(new RuntimeException("Failed to execute dstat task. Reason: " + e.getMessage(), e));
throw new Exception(e);
if (tokens.contains("topics")) {
displayTopicStats();
} else if (tokens.contains("queues")) {
displayQueueStats();
} else {
displayAllDestinations();
}
}

View File

@ -55,14 +55,9 @@ public class ListCommand extends AbstractJmxCommand {
* @throws Exception
*/
protected void runTask(List tokens) throws Exception {
try {
Set<String> propsView = new HashSet<String>();
propsView.add("brokerName");
context.printMBean(JmxMBeansUtil.filterMBeansView(JmxMBeansUtil.getAllBrokers(createJmxConnection()), propsView));
} catch (Exception e) {
context.printException(new RuntimeException("Failed to execute list task. Reason: " + e));
throw new Exception(e);
}
Set<String> propsView = new HashSet<String>();
propsView.add("brokerName");
context.printMBean(JmxMBeansUtil.filterMBeansView(JmxMBeansUtil.getAllBrokers(createJmxConnection()), propsView));
}
/**

View File

@ -83,61 +83,56 @@ public class PurgeCommand extends AbstractJmxCommand {
*/
@Override
protected void runTask(List<String> tokens) throws Exception {
try {
// If there is no queue name specified, let's select all
if (tokens.isEmpty()) {
tokens.add("*");
}
// If there is no queue name specified, let's select all
if (tokens.isEmpty()) {
tokens.add("*");
}
// Iterate through the queue names
for (Iterator<String> i = tokens.iterator(); i.hasNext();) {
List queueList = JmxMBeansUtil.queryMBeans(createJmxConnection(), "type=Broker,brokerName=*,destinationType=Queue,destinationName=" + i.next());
// Iterate through the queue names
for (Iterator<String> i = tokens.iterator(); i.hasNext(); ) {
List queueList = JmxMBeansUtil.queryMBeans(createJmxConnection(), "type=Broker,brokerName=*,destinationType=Queue,destinationName=" + i.next());
for (Iterator j = queueList.iterator(); j.hasNext();) {
ObjectName queueName = ((ObjectInstance)j.next()).getObjectName();
if (queryAddObjects.isEmpty()) {
purgeQueue(queueName);
for (Iterator j = queueList.iterator(); j.hasNext(); ) {
ObjectName queueName = ((ObjectInstance) j.next()).getObjectName();
if (queryAddObjects.isEmpty()) {
purgeQueue(queueName);
} else {
QueueViewMBean proxy = MBeanServerInvocationHandler.
newProxyInstance(createJmxConnection(),
queueName,
QueueViewMBean.class,
true);
int removed = 0;
// AMQ-3404: We support two syntaxes for the message
// selector query:
// 1) AMQ specific:
// "JMSPriority>2,MyHeader='Foo'"
//
// 2) SQL-92 syntax:
// "(JMSPriority>2) AND (MyHeader='Foo')"
//
// If syntax style 1) is used, the comma separated
// criterias are broken into List<String> elements.
// We then need to construct the SQL-92 query out of
// this list.
String sqlQuery = null;
if (queryAddObjects.size() > 1) {
sqlQuery = convertToSQL92(queryAddObjects);
} else {
sqlQuery = queryAddObjects.get(0);
}
removed = proxy.removeMatchingMessages(sqlQuery);
context.printInfo("Removed: " + removed
+ " messages for message selector " + sqlQuery.toString());
QueueViewMBean proxy = MBeanServerInvocationHandler.
newProxyInstance(createJmxConnection(),
queueName,
QueueViewMBean.class,
true);
int removed = 0;
// AMQ-3404: We support two syntaxes for the message
// selector query:
// 1) AMQ specific:
// "JMSPriority>2,MyHeader='Foo'"
//
// 2) SQL-92 syntax:
// "(JMSPriority>2) AND (MyHeader='Foo')"
//
// If syntax style 1) is used, the comma separated
// criterias are broken into List<String> elements.
// We then need to construct the SQL-92 query out of
// this list.
String sqlQuery = null;
if (queryAddObjects.size() > 1) {
sqlQuery = convertToSQL92(queryAddObjects);
} else {
sqlQuery = queryAddObjects.get(0);
}
removed = proxy.removeMatchingMessages(sqlQuery);
context.printInfo("Removed: " + removed
+ " messages for message selector " + sqlQuery.toString());
if (resetStatistics) {
proxy.resetStatistics();
}
if (resetStatistics) {
proxy.resetStatistics();
}
}
}
} catch (Exception e) {
context.printException(new RuntimeException("Failed to execute purge task. Reason: " + e));
throw new Exception(e);
}
}

View File

@ -116,23 +116,18 @@ public class QueryCommand extends AbstractJmxCommand {
* @throws Exception
*/
protected void runTask(List<String> tokens) throws Exception {
try {
// Query for the mbeans to add
Map<Object,List> addMBeans = JmxMBeansUtil.queryMBeansAsMap(createJmxConnection(), queryAddObjects, queryViews);
// Query for the mbeans to sub
if (querySubObjects.size() > 0) {
Map<Object,List> subMBeans = JmxMBeansUtil.queryMBeansAsMap(createJmxConnection(), querySubObjects, queryViews);
addMBeans.keySet().removeAll(subMBeans.keySet());
}
// Query for the mbeans to add
Map<Object, List> addMBeans = JmxMBeansUtil.queryMBeansAsMap(createJmxConnection(), queryAddObjects, queryViews);
// Query for the mbeans to sub
if (querySubObjects.size() > 0) {
Map<Object, List> subMBeans = JmxMBeansUtil.queryMBeansAsMap(createJmxConnection(), querySubObjects, queryViews);
addMBeans.keySet().removeAll(subMBeans.keySet());
}
if (opAndParams.isEmpty()) {
context.printMBean(JmxMBeansUtil.filterMBeansView(new ArrayList(addMBeans.values()), queryViews));
} else {
context.print(doInvoke(addMBeans.keySet(), opAndParams));
}
} catch (Exception e) {
context.printException(new RuntimeException("Failed to execute query task. Reason: " + e));
throw new Exception(e);
if (opAndParams.isEmpty()) {
context.printMBean(JmxMBeansUtil.filterMBeansView(new ArrayList(addMBeans.values()), queryViews));
} else {
context.print(doInvoke(addMBeans.keySet(), opAndParams));
}
}

View File

@ -69,53 +69,48 @@ public class ShutdownCommand extends AbstractJmxCommand {
* @throws Exception
*/
protected void runTask(List brokerNames) throws Exception {
try {
Collection mbeans;
Collection mbeans;
// Stop all brokers
if (isStopAllBrokers) {
mbeans = JmxMBeansUtil.getAllBrokers(createJmxConnection());
brokerNames.clear();
} else if (brokerNames.isEmpty()) {
// Stop the default broker
mbeans = JmxMBeansUtil.getAllBrokers(createJmxConnection());
// If there is no broker to stop
if (mbeans.isEmpty()) {
context.printInfo("There are no brokers to stop.");
return;
// Stop all brokers
if (isStopAllBrokers) {
mbeans = JmxMBeansUtil.getAllBrokers(createJmxConnection());
brokerNames.clear();
} else if (brokerNames.isEmpty()) {
// Stop the default broker
mbeans = JmxMBeansUtil.getAllBrokers(createJmxConnection());
// If there is no broker to stop
if (mbeans.isEmpty()) {
context.printInfo("There are no brokers to stop.");
return;
// There should only be one broker to stop
} else if (mbeans.size() > 1) {
context.printInfo("There are multiple brokers to stop. Please select the broker(s) to stop or use --all to stop all brokers.");
return;
// There should only be one broker to stop
} else if (mbeans.size() > 1) {
context.printInfo("There are multiple brokers to stop. Please select the broker(s) to stop or use --all to stop all brokers.");
return;
// Get the first broker only
} else {
Object firstBroker = mbeans.iterator().next();
mbeans.clear();
mbeans.add(firstBroker);
}
// Get the first broker only
} else {
// Stop each specified broker
String brokerName;
mbeans = new HashSet();
while (!brokerNames.isEmpty()) {
brokerName = (String)brokerNames.remove(0);
Collection matchedBrokers = JmxMBeansUtil.getBrokersByName(createJmxConnection(), brokerName);
if (matchedBrokers.isEmpty()) {
context.printInfo(brokerName + " did not match any running brokers.");
} else {
mbeans.addAll(matchedBrokers);
}
Object firstBroker = mbeans.iterator().next();
mbeans.clear();
mbeans.add(firstBroker);
}
} else {
// Stop each specified broker
String brokerName;
mbeans = new HashSet();
while (!brokerNames.isEmpty()) {
brokerName = (String) brokerNames.remove(0);
Collection matchedBrokers = JmxMBeansUtil.getBrokersByName(createJmxConnection(), brokerName);
if (matchedBrokers.isEmpty()) {
context.printInfo(brokerName + " did not match any running brokers.");
} else {
mbeans.addAll(matchedBrokers);
}
}
// Stop all brokers in set
stopBrokers(createJmxConnection(), mbeans);
} catch (Exception e) {
context.printException(new RuntimeException("Failed to execute stop task. Reason: " + e));
throw new Exception(e);
}
// Stop all brokers in set
stopBrokers(createJmxConnection(), mbeans);
}
/**

View File

@ -657,37 +657,7 @@ case "$1" in
invoke_kill
exit $?
;;
decrypt|encrypt|create)
*)
invoke_task checknotforrunning
exit $?
;;
export)
invoke_task checkfornotrunning
exit $?
;;
query|bstat|dstat|purge)
# Only check for a running broker if "--jmxurl" is part of the COMMANDLINE_ARGS
if (echo "$COMMANDLINE_ARGS"|grep -q -- "--jmxurl");then
invoke_task checknotforrunning
RET="$?"
else
invoke_task checkforrunning
RET="$?"
fi
exit $RET
;;
browse)
# Only check for a running broker if "--amqurl" is part of the COMMANDLINE_ARGS
if (echo "$COMMANDLINE_ARGS"|grep -q -- "--amqurl");then
invoke_task checknotforrunning
RET="$?"
else
invoke_task checkforrunning
RET="$?"
fi
exit $RET
;;
*)
invoke_task checkforrunning
exit $?
esac