diff --git a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java index 0de95c52bc..de5eaf2f3c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java @@ -40,10 +40,11 @@ public class AdvisorySupport { public static final String EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"Expired.Queue."; public static final String NO_TOPIC_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"NoConsumer.Topic."; public static final String NO_QUEUE_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"NoConsumer.Queue."; - public static final String AGENT_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"Agent."; + public static final String AGENT_TOPIC = "ActiveMQ.Agent"; public static final String ADIVSORY_MESSAGE_TYPE = "Advisory"; public static final ActiveMQTopic TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(TEMP_QUEUE_ADVISORY_TOPIC+","+TEMP_TOPIC_ADVISORY_TOPIC); + private static final ActiveMQTopic AGENT_TOPIC_DESTINATION = new ActiveMQTopic(AGENT_TOPIC); public static ActiveMQTopic getConnectionAdvisoryTopic() { return CONNECTION_ADVISORY_TOPIC; @@ -171,7 +172,10 @@ public class AdvisorySupport { } } - public static Destination getAgentDestination(String brokerName) { - return new ActiveMQTopic(AGENT_TOPIC_PREFIX + brokerName); + /** + * Returns the agent topic which is used to send commands to the broker + */ + public static Destination getAgentDestination() { + return AGENT_TOPIC_DESTINATION; } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandAgent.java b/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandAgent.java index 59bafb51b0..47bc6ca224 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandAgent.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandAgent.java @@ -16,24 +16,30 @@ */ package org.apache.activemq.broker.util; -import org.apache.activemq.Service; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.Service; import org.apache.activemq.advisory.AdvisorySupport; -import org.apache.activemq.broker.BrokerServiceAware; -import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.util.ServiceStopper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.FactoryBean; -import javax.jms.*; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Session; /** * An agent which listens to commands on a JMS destination * + * @version $Revision$ * @org.apache.xbean.XBean - * - * @version $Revision: $ */ -public class CommandAgent implements Service, BrokerServiceAware { +public class CommandAgent implements Service, InitializingBean, DisposableBean, FactoryBean { private static final Log log = LogFactory.getLog(CommandAgent.class); private String brokerUrl = "vm://localhost"; @@ -43,7 +49,7 @@ public class CommandAgent implements Service, BrokerServiceAware { private CommandMessageListener listener; private Session session; private MessageConsumer consumer; - private String brokerName = "default"; + public void start() throws Exception { session = getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -57,21 +63,63 @@ public class CommandAgent implements Service, BrokerServiceAware { } public void stop() throws Exception { - consumer.close(); - consumer = null; - session.close(); - session = null; - connection.close(); - connection = null; - } - - public void setBrokerService(BrokerService brokerService) { - String name = brokerService.getBrokerName(); - if (name != null) { - brokerName = name; + ServiceStopper stopper = new ServiceStopper(); + if (consumer != null) { + try { + consumer.close(); + consumer = null; + } + catch (JMSException e) { + stopper.onException(this, e); + } } + if (session != null) { + try { + session.close(); + session = null; + } + catch (JMSException e) { + stopper.onException(this, e); + } + } + if (connection != null) { + try { + connection.close(); + connection = null; + } + catch (JMSException e) { + stopper.onException(this, e); + } + } + stopper.throwFirstException(); } + // the following methods ensure that we are created on startup and the lifecycles respected + // TODO there must be a simpler way? + public void afterPropertiesSet() throws Exception { + start(); + } + + public void destroy() throws Exception { + stop(); + } + + public Object getObject() throws Exception { + return this; + } + + public Class getObjectType() { + return getClass(); + } + + public boolean isSingleton() { + return true; + } + + + + // Properties + //------------------------------------------------------------------------- public String getBrokerUrl() { return brokerUrl; } @@ -94,6 +142,7 @@ public class CommandAgent implements Service, BrokerServiceAware { public Connection getConnection() throws JMSException { if (connection == null) { connection = createConnection(); + connection.start(); } return connection; } @@ -117,8 +166,7 @@ public class CommandAgent implements Service, BrokerServiceAware { return getConnectionFactory().createConnection(); } - protected Destination createCommandDestination() { - return AdvisorySupport.getAgentDestination(brokerName); + return AdvisorySupport.getAgentDestination(); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandMessageListener.java b/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandMessageListener.java index bc79801112..3a6546019c 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandMessageListener.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandMessageListener.java @@ -16,12 +16,21 @@ */ package org.apache.activemq.broker.util; +import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.util.FactoryFinder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import javax.jms.*; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStreamReader; /** * @version $Revision: $ @@ -38,6 +47,9 @@ public class CommandMessageListener implements MessageListener { } public void onMessage(Message message) { + if (log.isDebugEnabled()) { + log.debug("Received command: " + message); + } if (message instanceof TextMessage) { TextMessage request = (TextMessage) message; try { @@ -48,7 +60,7 @@ public class CommandMessageListener implements MessageListener { } Message response = processCommand(request); addReplyHeaders(request, response); - + getProducer().send(replyTo, response); } catch (Exception e) { log.error("Failed to process message due to: " + e + ". Message: " + message, e); @@ -66,12 +78,26 @@ public class CommandMessageListener implements MessageListener { } } - protected Message processCommand(TextMessage request) throws Exception { + /** + * Processes an incoming JMS message returning the response message + */ + public Message processCommand(TextMessage request) throws Exception { TextMessage response = session.createTextMessage(); getHandler().processCommand(request, response); return response; } + /** + * Processes an incoming command from a console and returning the text to output + */ + public String processCommandText(String line) throws Exception { + TextMessage request = new ActiveMQTextMessage(); + request.setText(line); + TextMessage response = new ActiveMQTextMessage(); + getHandler().processCommand(request, response); + return response.getText(); + } + public Session getSession() { return session; }