added extra visualisation plugin; to render the broker, connections, sessions, consumers in a DOT graph

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@389519 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-03-28 15:27:55 +00:00
parent d754e76c50
commit d1a18f5787
6 changed files with 322 additions and 18 deletions

View File

@ -27,6 +27,6 @@ public interface BrokerPlugin {
* Installs the plugin into the interceptor chain of the broker, returning the new * Installs the plugin into the interceptor chain of the broker, returning the new
* intercepted broker to use. * intercepted broker to use.
*/ */
public Broker installPlugin(Broker broker); public Broker installPlugin(Broker broker) throws Exception;
} }

View File

@ -917,7 +917,7 @@ public class BrokerService implements Service {
* *
* @throws IOException * @throws IOException
*/ */
protected Broker addInterceptors(Broker broker) throws IOException { protected Broker addInterceptors(Broker broker) throws Exception {
broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore()); broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore());
broker = new AdvisoryBroker(broker); broker = new AdvisoryBroker(broker);
broker = new CompositeDestinationBroker(broker); broker = new CompositeDestinationBroker(broker);

View File

@ -0,0 +1,252 @@
/*
* Copyright 2005-2006 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.view;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.Connector;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.filter.DestinationMap;
import org.apache.activemq.filter.DestinationMapNode;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.MBeanServerInvocationHandler;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import java.io.PrintWriter;
import java.util.*;
/**
*
* @version $Revision: $
*/
public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport {
protected static final String ID_SEPARATOR = "_";
private final boolean redrawOnRemove;
private String domain = "org.apache.activemq";
private BrokerViewMBean brokerView;
private MBeanServer mbeanServer;
public ConnectionDotFileInterceptor(Broker next, String file, boolean redrawOnRemove)
throws MalformedObjectNameException {
super(next, file);
this.redrawOnRemove = redrawOnRemove;
mbeanServer = new ManagementContext().getMBeanServer();
ObjectName brokerName = new ObjectName(domain + ":Type=Broker,BrokerName=localhost");
brokerView = (BrokerViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName,
BrokerViewMBean.class, true);
}
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
Subscription answer = super.addConsumer(context, info);
generateFile();
return answer;
}
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
super.addProducer(context, info);
generateFile();
}
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
super.removeConsumer(context, info);
if (redrawOnRemove) {
generateFile();
}
}
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
super.removeProducer(context, info);
if (redrawOnRemove) {
generateFile();
}
}
protected void generateFile(PrintWriter writer) throws Exception {
writer.println("digraph \"ActiveMQ Connections\" {");
writer.println();
writer.println("node [style = \"rounded,filled\", fillcolor = yellow, fontname=\"Helvetica-Oblique\"];");
writer.println();
writer.println("broker [fillcolor = deepskyblue, label=\"ActiveMQ Broker\\n" + brokerView.getBrokerId() + "\"];");
writer.println();
Map clients = new HashMap();
Map destinations = new HashMap();
printSubscribers(writer, clients, destinations, "queue_", brokerView.getQueueSubscribers());
writer.println();
printSubscribers(writer, clients, destinations, "topic_", brokerView.getTopicSubscribers());
writer.println();
// lets print the broker links
for (Iterator iter = clients.keySet().iterator(); iter.hasNext();) {
String clientId = (String) iter.next();
writer.print(clientId);
writer.println(" -> broker");
}
writer.println();
writeLabels(writer, "green", "Client: ", clients);
writer.println();
writeLabels(writer, "red", "Queue: ", destinations);
writer.println("}");
}
protected void writeLabels(PrintWriter writer, String color, String prefix, Map map) {
for (Iterator iter = map.entrySet().iterator(); iter.hasNext();) {
Map.Entry entry = (Map.Entry) iter.next();
String id = (String) entry.getKey();
String label = (String) entry.getValue();
writer.print(id);
writer.print(" [ fillcolor = ");
writer.print(color);
writer.print(", label = \"");
writer.print(prefix);
writer.print(label);
writer.println("\"];");
}
}
protected void printSubscribers(PrintWriter writer, Map clients, Map destinations, String type,
ObjectName[] subscribers) {
for (int i = 0; i < subscribers.length; i++) {
ObjectName name = subscribers[i];
SubscriptionViewMBean subscriber = (SubscriptionViewMBean) MBeanServerInvocationHandler.newProxyInstance(
mbeanServer, name, SubscriptionViewMBean.class, true);
String clientId = subscriber.getClientId();
String destination = subscriber.getDestinationName();
String selector = subscriber.getSelector();
String safeClientId = asID(clientId);
clients.put(safeClientId, clientId);
String safeDestinationId = type + asID(destination);
destinations.put(safeDestinationId, destination);
// lets write out the links
String subscriberId = safeClientId + "_" + subscriber.getSessionId() + "_" + subscriber.getSubcriptionId();
writer.print(safeClientId);
writer.print(" -> ");
writer.print(subscriberId);
writer.println(";");
writer.print(safeDestinationId);
writer.print(" -> ");
writer.print(subscriberId);
writer.println(";");
// now lets write out the label
writer.print(subscriberId);
writer.print(" [label = \"");
String label = "Subscription: " + subscriber.getSessionId() + "-" + subscriber.getSubcriptionId();
if (selector != null && selector.length() > 0) {
label = label + "\\nSelector: " + selector;
}
writer.print(label);
writer.println("\"];");
}
}
/**
* Lets strip out any non supported characters
*/
protected String asID(String name) {
StringBuffer buffer = new StringBuffer();
for (int i = 0, size = name.length(); i < size; i++) {
char ch = name.charAt(i);
if (Character.isLetterOrDigit(ch) || ch == '_') {
buffer.append(ch);
}
else {
buffer.append('_');
}
}
return buffer.toString();
}
protected void printNodes(PrintWriter writer, DestinationMapNode node, String prefix) {
String path = getPath(node);
writer.print(" ");
writer.print(prefix);
writer.print(ID_SEPARATOR);
writer.print(path);
String label = path;
if (prefix.equals("topic")) {
label = "Topics";
}
else if (prefix.equals("queue")) {
label = "Queues";
}
writer.print("[ label = \"");
writer.print(label);
writer.println("\" ];");
Collection children = node.getChildren();
for (Iterator iter = children.iterator(); iter.hasNext();) {
DestinationMapNode child = (DestinationMapNode) iter.next();
printNodes(writer, child, prefix + ID_SEPARATOR + path);
}
}
protected void printNodeLinks(PrintWriter writer, DestinationMapNode node, String prefix) {
String path = getPath(node);
Collection children = node.getChildren();
for (Iterator iter = children.iterator(); iter.hasNext();) {
DestinationMapNode child = (DestinationMapNode) iter.next();
writer.print(" ");
writer.print(prefix);
writer.print(ID_SEPARATOR);
writer.print(path);
writer.print(" -> ");
writer.print(prefix);
writer.print(ID_SEPARATOR);
writer.print(path);
writer.print(ID_SEPARATOR);
writer.print(getPath(child));
writer.println(";");
printNodeLinks(writer, child, prefix + ID_SEPARATOR + path);
}
}
protected String getPath(DestinationMapNode node) {
String path = node.getPath();
if (path.equals("*")) {
return "root";
}
return path;
}
}

View File

@ -0,0 +1,46 @@
/*
* Copyright 2005-2006 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.view;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerPlugin;
/**
* A <a href="http://www.graphviz.org/">DOT</a> file creator plugin which
* creates a DOT file showing the current connections
*
* @version $Revision: $
*/
public class ConnectionDotFilePlugin implements BrokerPlugin {
private String file = "ActiveMQConnections.dot";
private boolean redrawOnRemove;
public Broker installPlugin(Broker broker) throws Exception {
return new ConnectionDotFileInterceptor(broker, file, redrawOnRemove);
}
public String getFile() {
return file;
}
/**
* Sets the destination file name to create the destination diagram
*/
public void setFile(String file) {
this.file = file;
}
}

View File

@ -16,18 +16,12 @@
package org.apache.activemq.broker.view; package org.apache.activemq.broker.view;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.filter.DestinationMap; import org.apache.activemq.filter.DestinationMap;
import org.apache.activemq.filter.DestinationMapNode; import org.apache.activemq.filter.DestinationMapNode;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.FileWriter;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
@ -39,11 +33,9 @@ import java.util.Iterator;
public class DestinationDotFileInterceptor extends DotFileInterceptorSupport { public class DestinationDotFileInterceptor extends DotFileInterceptorSupport {
protected static final String ID_SEPARATOR = "_"; protected static final String ID_SEPARATOR = "_";
public DestinationDotFileInterceptor(Broker next, String file) { public DestinationDotFileInterceptor(Broker next, String file) {
super(next, file); super(next, file);
} }
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {

View File

@ -16,14 +16,17 @@
*/ */
package org.apache.activemq.test; package org.apache.activemq.test;
import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.view.DestinationDotFileInterceptor; import org.apache.activemq.broker.view.ConnectionDotFilePlugin;
import org.apache.activemq.broker.view.DestinationDotFilePlugin; import org.apache.activemq.broker.view.DestinationDotFilePlugin;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.demo.DefaultQueueSender; import org.apache.activemq.demo.DefaultQueueSender;
import java.net.URI; import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.Session;
/** /**
* A helper class which can be handy for running a broker in your IDE from the * A helper class which can be handy for running a broker in your IDE from the
@ -42,18 +45,29 @@ public class Main {
brokerURI = args[0]; brokerURI = args[0];
} }
try { try {
//BrokerService broker = BrokerFactory.createBroker(new URI(brokerURI)); // TODO - this seems to break interceptors for some reason
// BrokerService broker = BrokerFactory.createBroker(new
// URI(brokerURI));
BrokerService broker = new BrokerService(); BrokerService broker = new BrokerService();
broker.setPersistent(false); broker.setPersistent(false);
broker.setUseJmx(true); broker.setUseJmx(true);
broker.setPlugins(new BrokerPlugin[] { new DestinationDotFilePlugin() }); broker.setPlugins(new BrokerPlugin[] { /*new DestinationDotFilePlugin(), */ new ConnectionDotFilePlugin() });
broker.addConnector("tcp://localhost:61616"); broker.addConnector("tcp://localhost:61616");
broker.addConnector("stomp://localhost:61613"); broker.addConnector("stomp://localhost:61613");
broker.start(); broker.start();
// lets create a dummy couple of consumers
Connection connection = new ActiveMQConnectionFactory().createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer1 = session.createConsumer(new ActiveMQQueue("Orders.IBM"));
MessageConsumer consumer2 = session.createConsumer(new ActiveMQQueue("Orders.MSFT"), "price > 100");
Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer3 = session2.createConsumer(new ActiveMQQueue("Orders.MSFT"), "price > 200");
// lets publish some messages so that there is some stuff to browse // lets publish some messages so that there is some stuff to browse
DefaultQueueSender.main(new String[] {"Prices.Equity.IBM"}); DefaultQueueSender.main(new String[] { "Prices.Equity.IBM" });
DefaultQueueSender.main(new String[] {"Prices.Equity.MSFT"}); DefaultQueueSender.main(new String[] { "Prices.Equity.MSFT" });
} }
catch (Exception e) { catch (Exception e) {
System.out.println("Failed: " + e); System.out.println("Failed: " + e);