diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerPlugin.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerPlugin.java index fe67e3cdab..448fd6a390 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerPlugin.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerPlugin.java @@ -27,6 +27,6 @@ public interface BrokerPlugin { * Installs the plugin into the interceptor chain of the broker, returning the new * intercepted broker to use. */ - public Broker installPlugin(Broker broker); + public Broker installPlugin(Broker broker) throws Exception; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java index d27c461850..cbb4223645 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -917,7 +917,7 @@ public class BrokerService implements Service { * * @throws IOException */ - protected Broker addInterceptors(Broker broker) throws IOException { + protected Broker addInterceptors(Broker broker) throws Exception { broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore()); broker = new AdvisoryBroker(broker); broker = new CompositeDestinationBroker(broker); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/view/ConnectionDotFileInterceptor.java b/activemq-core/src/main/java/org/apache/activemq/broker/view/ConnectionDotFileInterceptor.java new file mode 100644 index 0000000000..be3acc9e57 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/view/ConnectionDotFileInterceptor.java @@ -0,0 +1,252 @@ +/* + * Copyright 2005-2006 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.view; + +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.Connection; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.Connector; +import org.apache.activemq.broker.jmx.BrokerViewMBean; +import org.apache.activemq.broker.jmx.ManagementContext; +import org.apache.activemq.broker.jmx.SubscriptionViewMBean; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.filter.DestinationMap; +import org.apache.activemq.filter.DestinationMapNode; + +import javax.management.MBeanServer; +import javax.management.MBeanServerFactory; +import javax.management.MBeanServerInvocationHandler; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import java.io.PrintWriter; +import java.util.*; + +/** + * + * @version $Revision: $ + */ +public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport { + + protected static final String ID_SEPARATOR = "_"; + + private final boolean redrawOnRemove; + private String domain = "org.apache.activemq"; + private BrokerViewMBean brokerView; + private MBeanServer mbeanServer; + + public ConnectionDotFileInterceptor(Broker next, String file, boolean redrawOnRemove) + throws MalformedObjectNameException { + super(next, file); + this.redrawOnRemove = redrawOnRemove; + + mbeanServer = new ManagementContext().getMBeanServer(); + ObjectName brokerName = new ObjectName(domain + ":Type=Broker,BrokerName=localhost"); + brokerView = (BrokerViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, + BrokerViewMBean.class, true); + } + + public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { + Subscription answer = super.addConsumer(context, info); + generateFile(); + return answer; + } + + public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { + super.addProducer(context, info); + generateFile(); + } + + public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { + super.removeConsumer(context, info); + if (redrawOnRemove) { + generateFile(); + } + } + + public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { + super.removeProducer(context, info); + if (redrawOnRemove) { + generateFile(); + } + } + + protected void generateFile(PrintWriter writer) throws Exception { + + writer.println("digraph \"ActiveMQ Connections\" {"); + writer.println(); + writer.println("node [style = \"rounded,filled\", fillcolor = yellow, fontname=\"Helvetica-Oblique\"];"); + writer.println(); + + writer.println("broker [fillcolor = deepskyblue, label=\"ActiveMQ Broker\\n" + brokerView.getBrokerId() + "\"];"); + writer.println(); + + Map clients = new HashMap(); + Map destinations = new HashMap(); + printSubscribers(writer, clients, destinations, "queue_", brokerView.getQueueSubscribers()); + writer.println(); + + printSubscribers(writer, clients, destinations, "topic_", brokerView.getTopicSubscribers()); + writer.println(); + + // lets print the broker links + for (Iterator iter = clients.keySet().iterator(); iter.hasNext();) { + String clientId = (String) iter.next(); + writer.print(clientId); + writer.println(" -> broker"); + } + writer.println(); + + writeLabels(writer, "green", "Client: ", clients); + writer.println(); + + writeLabels(writer, "red", "Queue: ", destinations); + writer.println("}"); + } + + protected void writeLabels(PrintWriter writer, String color, String prefix, Map map) { + for (Iterator iter = map.entrySet().iterator(); iter.hasNext();) { + Map.Entry entry = (Map.Entry) iter.next(); + String id = (String) entry.getKey(); + String label = (String) entry.getValue(); + + writer.print(id); + writer.print(" [ fillcolor = "); + writer.print(color); + writer.print(", label = \""); + writer.print(prefix); + writer.print(label); + writer.println("\"];"); + } + } + + protected void printSubscribers(PrintWriter writer, Map clients, Map destinations, String type, + ObjectName[] subscribers) { + for (int i = 0; i < subscribers.length; i++) { + ObjectName name = subscribers[i]; + SubscriptionViewMBean subscriber = (SubscriptionViewMBean) MBeanServerInvocationHandler.newProxyInstance( + mbeanServer, name, SubscriptionViewMBean.class, true); + + String clientId = subscriber.getClientId(); + String destination = subscriber.getDestinationName(); + String selector = subscriber.getSelector(); + + String safeClientId = asID(clientId); + clients.put(safeClientId, clientId); + + String safeDestinationId = type + asID(destination); + destinations.put(safeDestinationId, destination); + + // lets write out the links + + String subscriberId = safeClientId + "_" + subscriber.getSessionId() + "_" + subscriber.getSubcriptionId(); + + writer.print(safeClientId); + writer.print(" -> "); + writer.print(subscriberId); + writer.println(";"); + + writer.print(safeDestinationId); + writer.print(" -> "); + writer.print(subscriberId); + writer.println(";"); + + // now lets write out the label + writer.print(subscriberId); + writer.print(" [label = \""); + String label = "Subscription: " + subscriber.getSessionId() + "-" + subscriber.getSubcriptionId(); + if (selector != null && selector.length() > 0) { + label = label + "\\nSelector: " + selector; + } + writer.print(label); + writer.println("\"];"); + } + } + + /** + * Lets strip out any non supported characters + */ + protected String asID(String name) { + StringBuffer buffer = new StringBuffer(); + for (int i = 0, size = name.length(); i < size; i++) { + char ch = name.charAt(i); + if (Character.isLetterOrDigit(ch) || ch == '_') { + buffer.append(ch); + } + else { + buffer.append('_'); + } + } + return buffer.toString(); + } + + protected void printNodes(PrintWriter writer, DestinationMapNode node, String prefix) { + String path = getPath(node); + writer.print(" "); + writer.print(prefix); + writer.print(ID_SEPARATOR); + writer.print(path); + String label = path; + if (prefix.equals("topic")) { + label = "Topics"; + } + else if (prefix.equals("queue")) { + label = "Queues"; + } + writer.print("[ label = \""); + writer.print(label); + writer.println("\" ];"); + + Collection children = node.getChildren(); + for (Iterator iter = children.iterator(); iter.hasNext();) { + DestinationMapNode child = (DestinationMapNode) iter.next(); + printNodes(writer, child, prefix + ID_SEPARATOR + path); + } + } + + protected void printNodeLinks(PrintWriter writer, DestinationMapNode node, String prefix) { + String path = getPath(node); + Collection children = node.getChildren(); + for (Iterator iter = children.iterator(); iter.hasNext();) { + DestinationMapNode child = (DestinationMapNode) iter.next(); + + writer.print(" "); + writer.print(prefix); + writer.print(ID_SEPARATOR); + writer.print(path); + writer.print(" -> "); + writer.print(prefix); + writer.print(ID_SEPARATOR); + writer.print(path); + writer.print(ID_SEPARATOR); + writer.print(getPath(child)); + writer.println(";"); + + printNodeLinks(writer, child, prefix + ID_SEPARATOR + path); + } + } + + protected String getPath(DestinationMapNode node) { + String path = node.getPath(); + if (path.equals("*")) { + return "root"; + } + return path; + } +} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/view/ConnectionDotFilePlugin.java b/activemq-core/src/main/java/org/apache/activemq/broker/view/ConnectionDotFilePlugin.java new file mode 100644 index 0000000000..113ad7ba8c --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/view/ConnectionDotFilePlugin.java @@ -0,0 +1,46 @@ +/* + * Copyright 2005-2006 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.view; + +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerPlugin; + +/** + * A DOT file creator plugin which + * creates a DOT file showing the current connections + * + * @version $Revision: $ + */ +public class ConnectionDotFilePlugin implements BrokerPlugin { + private String file = "ActiveMQConnections.dot"; + private boolean redrawOnRemove; + + public Broker installPlugin(Broker broker) throws Exception { + return new ConnectionDotFileInterceptor(broker, file, redrawOnRemove); + } + + public String getFile() { + return file; + } + + /** + * Sets the destination file name to create the destination diagram + */ + public void setFile(String file) { + this.file = file; + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/view/DestinationDotFileInterceptor.java b/activemq-core/src/main/java/org/apache/activemq/broker/view/DestinationDotFileInterceptor.java index 63a08766f8..40f860be08 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/view/DestinationDotFileInterceptor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/view/DestinationDotFileInterceptor.java @@ -16,18 +16,12 @@ package org.apache.activemq.broker.view; import org.apache.activemq.broker.Broker; -import org.apache.activemq.broker.BrokerFilter; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; -import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.filter.DestinationMap; import org.apache.activemq.filter.DestinationMapNode; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import java.io.FileWriter; import java.io.PrintWriter; import java.util.Collection; import java.util.Iterator; @@ -39,11 +33,9 @@ import java.util.Iterator; public class DestinationDotFileInterceptor extends DotFileInterceptorSupport { protected static final String ID_SEPARATOR = "_"; - public DestinationDotFileInterceptor(Broker next, String file) { super(next, file); - } public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { diff --git a/activemq-core/src/test/java/org/apache/activemq/test/Main.java b/activemq-core/src/test/java/org/apache/activemq/test/Main.java index fb464f327a..7e24a2d4d4 100644 --- a/activemq-core/src/test/java/org/apache/activemq/test/Main.java +++ b/activemq-core/src/test/java/org/apache/activemq/test/Main.java @@ -16,14 +16,17 @@ */ package org.apache.activemq.test; -import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.view.DestinationDotFileInterceptor; +import org.apache.activemq.broker.view.ConnectionDotFilePlugin; import org.apache.activemq.broker.view.DestinationDotFilePlugin; +import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.demo.DefaultQueueSender; -import java.net.URI; +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.Session; /** * A helper class which can be handy for running a broker in your IDE from the @@ -42,18 +45,29 @@ public class Main { brokerURI = args[0]; } try { - //BrokerService broker = BrokerFactory.createBroker(new URI(brokerURI)); + // TODO - this seems to break interceptors for some reason + // BrokerService broker = BrokerFactory.createBroker(new + // URI(brokerURI)); BrokerService broker = new BrokerService(); broker.setPersistent(false); broker.setUseJmx(true); - broker.setPlugins(new BrokerPlugin[] { new DestinationDotFilePlugin() }); + broker.setPlugins(new BrokerPlugin[] { /*new DestinationDotFilePlugin(), */ new ConnectionDotFilePlugin() }); broker.addConnector("tcp://localhost:61616"); broker.addConnector("stomp://localhost:61613"); broker.start(); - + + // lets create a dummy couple of consumers + Connection connection = new ActiveMQConnectionFactory().createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer1 = session.createConsumer(new ActiveMQQueue("Orders.IBM")); + MessageConsumer consumer2 = session.createConsumer(new ActiveMQQueue("Orders.MSFT"), "price > 100"); + Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer3 = session2.createConsumer(new ActiveMQQueue("Orders.MSFT"), "price > 200"); + // lets publish some messages so that there is some stuff to browse - DefaultQueueSender.main(new String[] {"Prices.Equity.IBM"}); - DefaultQueueSender.main(new String[] {"Prices.Equity.MSFT"}); + DefaultQueueSender.main(new String[] { "Prices.Equity.IBM" }); + DefaultQueueSender.main(new String[] { "Prices.Equity.MSFT" }); } catch (Exception e) { System.out.println("Failed: " + e);