From 3bffaf7e4c7a845bac73770d76e0b726a9c39834 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Thu, 17 Jan 2013 20:40:23 +0000 Subject: [PATCH] rudimentary generic persistence adapter mbean view - help show data and transaction info - usefull when blocked on usage git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1434903 13f79535-47bb-0310-9956-ffa450edef68 --- .../broker/jmx/BrokerMBeanSupport.java | 9 +++ .../jmx/PersistenceAdapterViewMBean.java | 16 +++++ .../kahadb/KahaDBPersistenceAdapter.java | 24 +++++++ .../store/kahadb/MessageDatabase.java | 66 +++++++++++++++++++ 4 files changed, 115 insertions(+) create mode 100644 activemq-broker/src/main/java/org/apache/activemq/broker/jmx/PersistenceAdapterViewMBean.java diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerMBeanSupport.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerMBeanSupport.java index 26c41a64a2..194ea3cffc 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerMBeanSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerMBeanSupport.java @@ -118,6 +118,15 @@ public class BrokerMBeanSupport { return new ObjectName(objectNameStr); } + public static ObjectName createPersistenceAdapterName(String brokerObjectName, String name) throws MalformedObjectNameException { + String objectNameStr = brokerObjectName; + + objectNameStr += "," + "Service=PersistenceAdapter"; + objectNameStr += "," + "InstanceName=" + JMXSupport.encodeObjectNamePart(name); + + return new ObjectName(objectNameStr); + } + public static ObjectName createAbortSlowConsumerStrategyName(ObjectName brokerObjectName, AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException { return createAbortSlowConsumerStrategyName(brokerObjectName.toString(), strategy); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/PersistenceAdapterViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/PersistenceAdapterViewMBean.java new file mode 100644 index 0000000000..38b968319d --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/PersistenceAdapterViewMBean.java @@ -0,0 +1,16 @@ +package org.apache.activemq.broker.jmx; + +public interface PersistenceAdapterViewMBean { + + @MBeanInfo("Name of this persistence adapter.") + String getName(); + + @MBeanInfo("Current inflight local transactions.") + String getTransactions(); + + @MBeanInfo("Current data.") + String getData(); + + @MBeanInfo("Current size.") + long getSize(); +} diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java index 65756e73e2..f9bc455901 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java @@ -19,6 +19,8 @@ package org.apache.activemq.store.kahadb; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.LockableServiceSupport; +import org.apache.activemq.broker.jmx.AnnotatedMBean; +import org.apache.activemq.broker.jmx.PersistenceAdapterView; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; @@ -38,6 +40,10 @@ import org.apache.activemq.util.ServiceStopper; import java.io.File; import java.io.IOException; import java.util.Set; +import java.util.concurrent.Callable; + + +import static org.apache.activemq.broker.jmx.BrokerMBeanSupport.createPersistenceAdapterName; /** * An implementation of {@link PersistenceAdapter} designed for use with @@ -189,6 +195,24 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements */ public void doStart() throws Exception { this.letter.start(); + + if (brokerService != null && brokerService.isUseJmx()) { + PersistenceAdapterView view = new PersistenceAdapterView(this); + view.setInflightTransactionViewCallable(new Callable() { + @Override + public String call() throws Exception { + return letter.getTransactions(); + } + }); + view.setDataViewCallable(new Callable() { + @Override + public String call() throws Exception { + return letter.getJournal().getFileMap().keySet().toString(); + } + }); + AnnotatedMBean.registerMBean(brokerService.getManagementContext(), view, + createPersistenceAdapterName(brokerService.getBrokerObjectName().toString(), toString())); + } } /** diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index efb83fa064..a27c45ff4a 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -466,6 +466,72 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe return l; } + class TranInfo { + TransactionId id; + Location location; + + class opCount { + int add; + int remove; + } + HashMap destinationOpCount = new HashMap(); + + public void track(Operation operation) { + if (location == null ) { + location = operation.getLocation(); + } + KahaDestination destination; + boolean isAdd = false; + if (operation instanceof AddOpperation) { + AddOpperation add = (AddOpperation) operation; + destination = add.getCommand().getDestination(); + isAdd = true; + } else { + RemoveOpperation removeOpperation = (RemoveOpperation) operation; + destination = removeOpperation.getCommand().getDestination(); + } + opCount opCount = destinationOpCount.get(destination); + if (opCount == null) { + opCount = new opCount(); + destinationOpCount.put(destination, opCount); + } + if (isAdd) { + opCount.add++; + } else { + opCount.remove++; + } + } + + @Override + public String toString() { + StringBuffer buffer = new StringBuffer(); + buffer.append(location).append(";").append(id).append(";\n"); + for (Entry op : destinationOpCount.entrySet()) { + buffer.append(op.getKey()).append('+').append(op.getValue().add).append(',').append('-').append(op.getValue().remove).append(';'); + } + return buffer.toString(); + } + } + + @SuppressWarnings("rawtypes") + public String getTransactions() { + + ArrayList infos = new ArrayList(); + synchronized (inflightTransactions) { + if (!inflightTransactions.isEmpty()) { + for (Entry> entry : inflightTransactions.entrySet()) { + TranInfo info = new TranInfo(); + info.id = entry.getKey(); + for (Operation operation : entry.getValue()) { + info.track(operation); + } + infos.add(info); + } + } + } + return infos.toString(); + } + /** * Move all the messages that were in the journal into long term storage. We * just replay and do a checkpoint.