added a simple command agent that can be used to send messages to a destination (ActiveMQ.Agent) to interact with the broker via the activemq-console commands (list, query, browse etc)

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@475210 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-11-15 12:02:34 +00:00
parent f3105202ef
commit 940850910d
3 changed files with 106 additions and 28 deletions

View File

@ -40,10 +40,11 @@ public class AdvisorySupport {
public static final String EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"Expired.Queue.";
public static final String NO_TOPIC_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"NoConsumer.Topic.";
public static final String NO_QUEUE_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"NoConsumer.Queue.";
public static final String AGENT_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"Agent.";
public static final String AGENT_TOPIC = "ActiveMQ.Agent";
public static final String ADIVSORY_MESSAGE_TYPE = "Advisory";
public static final ActiveMQTopic TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(TEMP_QUEUE_ADVISORY_TOPIC+","+TEMP_TOPIC_ADVISORY_TOPIC);
private static final ActiveMQTopic AGENT_TOPIC_DESTINATION = new ActiveMQTopic(AGENT_TOPIC);
public static ActiveMQTopic getConnectionAdvisoryTopic() {
return CONNECTION_ADVISORY_TOPIC;
@ -171,7 +172,10 @@ public class AdvisorySupport {
}
}
public static Destination getAgentDestination(String brokerName) {
return new ActiveMQTopic(AGENT_TOPIC_PREFIX + brokerName);
/**
* Returns the agent topic which is used to send commands to the broker
*/
public static Destination getAgentDestination() {
return AGENT_TOPIC_DESTINATION;
}
}

View File

@ -16,24 +16,30 @@
*/
package org.apache.activemq.broker.util;
import org.apache.activemq.Service;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.FactoryBean;
import javax.jms.*;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
/**
* An agent which listens to commands on a JMS destination
*
* @version $Revision$
* @org.apache.xbean.XBean
*
* @version $Revision: $
*/
public class CommandAgent implements Service, BrokerServiceAware {
public class CommandAgent implements Service, InitializingBean, DisposableBean, FactoryBean {
private static final Log log = LogFactory.getLog(CommandAgent.class);
private String brokerUrl = "vm://localhost";
@ -43,7 +49,7 @@ public class CommandAgent implements Service, BrokerServiceAware {
private CommandMessageListener listener;
private Session session;
private MessageConsumer consumer;
private String brokerName = "default";
public void start() throws Exception {
session = getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -57,21 +63,63 @@ public class CommandAgent implements Service, BrokerServiceAware {
}
public void stop() throws Exception {
consumer.close();
consumer = null;
session.close();
session = null;
connection.close();
connection = null;
}
public void setBrokerService(BrokerService brokerService) {
String name = brokerService.getBrokerName();
if (name != null) {
brokerName = name;
ServiceStopper stopper = new ServiceStopper();
if (consumer != null) {
try {
consumer.close();
consumer = null;
}
catch (JMSException e) {
stopper.onException(this, e);
}
}
if (session != null) {
try {
session.close();
session = null;
}
catch (JMSException e) {
stopper.onException(this, e);
}
}
if (connection != null) {
try {
connection.close();
connection = null;
}
catch (JMSException e) {
stopper.onException(this, e);
}
}
stopper.throwFirstException();
}
// the following methods ensure that we are created on startup and the lifecycles respected
// TODO there must be a simpler way?
public void afterPropertiesSet() throws Exception {
start();
}
public void destroy() throws Exception {
stop();
}
public Object getObject() throws Exception {
return this;
}
public Class getObjectType() {
return getClass();
}
public boolean isSingleton() {
return true;
}
// Properties
//-------------------------------------------------------------------------
public String getBrokerUrl() {
return brokerUrl;
}
@ -94,6 +142,7 @@ public class CommandAgent implements Service, BrokerServiceAware {
public Connection getConnection() throws JMSException {
if (connection == null) {
connection = createConnection();
connection.start();
}
return connection;
}
@ -117,8 +166,7 @@ public class CommandAgent implements Service, BrokerServiceAware {
return getConnectionFactory().createConnection();
}
protected Destination createCommandDestination() {
return AdvisorySupport.getAgentDestination(brokerName);
return AdvisorySupport.getAgentDestination();
}
}

View File

@ -16,12 +16,21 @@
*/
package org.apache.activemq.broker.util;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.util.FactoryFinder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.*;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
/**
* @version $Revision: $
@ -38,6 +47,9 @@ public class CommandMessageListener implements MessageListener {
}
public void onMessage(Message message) {
if (log.isDebugEnabled()) {
log.debug("Received command: " + message);
}
if (message instanceof TextMessage) {
TextMessage request = (TextMessage) message;
try {
@ -48,7 +60,7 @@ public class CommandMessageListener implements MessageListener {
}
Message response = processCommand(request);
addReplyHeaders(request, response);
getProducer().send(replyTo, response);
}
catch (Exception e) {
log.error("Failed to process message due to: " + e + ". Message: " + message, e);
@ -66,12 +78,26 @@ public class CommandMessageListener implements MessageListener {
}
}
protected Message processCommand(TextMessage request) throws Exception {
/**
* Processes an incoming JMS message returning the response message
*/
public Message processCommand(TextMessage request) throws Exception {
TextMessage response = session.createTextMessage();
getHandler().processCommand(request, response);
return response;
}
/**
* Processes an incoming command from a console and returning the text to output
*/
public String processCommandText(String line) throws Exception {
TextMessage request = new ActiveMQTextMessage();
request.setText(line);
TextMessage response = new ActiveMQTextMessage();
getHandler().processCommand(request, response);
return response.getText();
}
public Session getSession() {
return session;
}