added support for a simple message based command agent so that you can send management commands to the broker over JMS

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@469135 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-10-30 12:17:14 +00:00
parent d9ad0437ff
commit 49e10d36fc
5 changed files with 259 additions and 1 deletions

View File

@ -20,6 +20,8 @@ package org.apache.activemq.advisory;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import javax.jms.Destination;
public class AdvisorySupport {
public static final String ADVISORY_TOPIC_PREFIX = "ActiveMQ.Advisory.";
@ -38,6 +40,8 @@ public class AdvisorySupport {
public static final String EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"Expired.Queue.";
public static final String NO_TOPIC_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"NoConsumer.Topic.";
public static final String NO_QUEUE_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"NoConsumer.Queue.";
public static final String AGENT_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"Agent.";
public static final String ADIVSORY_MESSAGE_TYPE = "Advisory";
public static final ActiveMQTopic TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(TEMP_QUEUE_ADVISORY_TOPIC+","+TEMP_TOPIC_ADVISORY_TOPIC);
@ -167,4 +171,7 @@ public class AdvisorySupport {
}
}
public static Destination getAgentDestination(String brokerName) {
return new ActiveMQTopic(AGENT_TOPIC_PREFIX + brokerName);
}
}

View File

@ -51,6 +51,7 @@ import org.apache.activemq.broker.region.policy.VMPendingDurableSubscriberMessag
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualTopic;
import org.apache.activemq.broker.util.CommandAgent;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.kaha.Store;
@ -118,7 +119,7 @@ public class BrokerService implements Service, Serializable {
private List proxyConnectors = new CopyOnWriteArrayList();
private List registeredMBeanNames = new CopyOnWriteArrayList();
private List jmsConnectors = new CopyOnWriteArrayList();
private Service[] services;
private Service[] services = new Service[] { new CommandAgent() };
private MasterConnector masterConnector;
private String masterConnectorURI;
private transient Thread shutdownHook;

View File

@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.util;
import org.apache.activemq.Service;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.BrokerService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.*;
/**
* An agent which listens to commands on a JMS destination
*
* @org.apache.xbean.XBean
*
* @version $Revision: $
*/
public class CommandAgent implements Service, BrokerServiceAware {
private static final Log log = LogFactory.getLog(CommandAgent.class);
private String brokerUrl = "vm://localhost";
private ConnectionFactory connectionFactory;
private Connection connection;
private Destination commandDestination;
private CommandMessageListener listener;
private Session session;
private MessageConsumer consumer;
private String brokerName = "default";
public void start() throws Exception {
session = getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
listener = new CommandMessageListener(session);
Destination destination = getCommandDestination();
if (log.isDebugEnabled()) {
log.debug("Agent subscribing to control destination: " + destination);
}
consumer = session.createConsumer(destination);
consumer.setMessageListener(listener);
}
public void stop() throws Exception {
consumer.close();
consumer = null;
session.close();
session = null;
connection.close();
connection = null;
}
public void setBrokerService(BrokerService brokerService) {
String name = brokerService.getBrokerName();
if (name != null) {
brokerName = name;
}
}
public String getBrokerUrl() {
return brokerUrl;
}
public void setBrokerUrl(String brokerUrl) {
this.brokerUrl = brokerUrl;
}
public ConnectionFactory getConnectionFactory() {
if (connectionFactory == null) {
connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
}
return connectionFactory;
}
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
public Connection getConnection() throws JMSException {
if (connection == null) {
connection = createConnection();
}
return connection;
}
public void setConnection(Connection connection) {
this.connection = connection;
}
public Destination getCommandDestination() {
if (commandDestination == null) {
commandDestination = createCommandDestination();
}
return commandDestination;
}
public void setCommandDestination(Destination commandDestination) {
this.commandDestination = commandDestination;
}
protected Connection createConnection() throws JMSException {
return getConnectionFactory().createConnection();
}
protected Destination createCommandDestination() {
return AdvisorySupport.getAgentDestination(brokerName);
}
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.util;
import javax.jms.TextMessage;
import javax.jms.JMSException;
/**
* Represents a processor of text based commands
*
* @version $Revision: $
*/
public interface CommandHandler {
void processCommand(TextMessage request, TextMessage response) throws Exception;
}

View File

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.util;
import org.apache.activemq.util.FactoryFinder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.*;
import java.io.IOException;
/**
* @version $Revision: $
*/
public class CommandMessageListener implements MessageListener {
private static final Log log = LogFactory.getLog(CommandMessageListener.class);
private Session session;
private MessageProducer producer;
private CommandHandler handler;
public CommandMessageListener(Session session) {
this.session = session;
}
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage request = (TextMessage) message;
try {
Destination replyTo = message.getJMSReplyTo();
if (replyTo == null) {
log.warn("Ignored message as no JMSReplyTo set: " + message);
return;
}
Message response = processCommand(request);
addReplyHeaders(request, response);
}
catch (Exception e) {
log.error("Failed to process message due to: " + e + ". Message: " + message, e);
}
}
else {
log.warn("Ignoring invalid message: " + message);
}
}
protected void addReplyHeaders(TextMessage request, Message response) throws JMSException {
String correlationID = request.getJMSCorrelationID();
if (correlationID != null) {
response.setJMSCorrelationID(correlationID);
}
}
protected Message processCommand(TextMessage request) throws Exception {
TextMessage response = session.createTextMessage();
getHandler().processCommand(request, response);
return response;
}
public Session getSession() {
return session;
}
public MessageProducer getProducer() throws JMSException {
if (producer == null) {
producer = getSession().createProducer(null);
}
return producer;
}
public CommandHandler getHandler() throws IllegalAccessException, IOException, InstantiationException, ClassNotFoundException {
if (handler == null) {
handler = createHandler();
}
return handler;
}
private CommandHandler createHandler() throws IllegalAccessException, IOException, ClassNotFoundException, InstantiationException {
FactoryFinder factoryFinder = new FactoryFinder("META-INF/services/org/apache/activemq/broker/");
return (CommandHandler) factoryFinder.newInstance("agent");
}
}