rudimentary generic persistence adapter mbean view - help show data and transaction info - usefull when blocked on usage

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1434903 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2013-01-17 20:40:23 +00:00
parent 1898869fbc
commit 3bffaf7e4c
4 changed files with 115 additions and 0 deletions

View File

@ -118,6 +118,15 @@ public class BrokerMBeanSupport {
return new ObjectName(objectNameStr);
}
public static ObjectName createPersistenceAdapterName(String brokerObjectName, String name) throws MalformedObjectNameException {
String objectNameStr = brokerObjectName;
objectNameStr += "," + "Service=PersistenceAdapter";
objectNameStr += "," + "InstanceName=" + JMXSupport.encodeObjectNamePart(name);
return new ObjectName(objectNameStr);
}
public static ObjectName createAbortSlowConsumerStrategyName(ObjectName brokerObjectName, AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException {
return createAbortSlowConsumerStrategyName(brokerObjectName.toString(), strategy);
}

View File

@ -0,0 +1,16 @@
package org.apache.activemq.broker.jmx;
public interface PersistenceAdapterViewMBean {
@MBeanInfo("Name of this persistence adapter.")
String getName();
@MBeanInfo("Current inflight local transactions.")
String getTransactions();
@MBeanInfo("Current data.")
String getData();
@MBeanInfo("Current size.")
long getSize();
}

View File

@ -19,6 +19,8 @@ package org.apache.activemq.store.kahadb;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.LockableServiceSupport;
import org.apache.activemq.broker.jmx.AnnotatedMBean;
import org.apache.activemq.broker.jmx.PersistenceAdapterView;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
@ -38,6 +40,10 @@ import org.apache.activemq.util.ServiceStopper;
import java.io.File;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.Callable;
import static org.apache.activemq.broker.jmx.BrokerMBeanSupport.createPersistenceAdapterName;
/**
* An implementation of {@link PersistenceAdapter} designed for use with
@ -189,6 +195,24 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
*/
public void doStart() throws Exception {
this.letter.start();
if (brokerService != null && brokerService.isUseJmx()) {
PersistenceAdapterView view = new PersistenceAdapterView(this);
view.setInflightTransactionViewCallable(new Callable<String>() {
@Override
public String call() throws Exception {
return letter.getTransactions();
}
});
view.setDataViewCallable(new Callable<String>() {
@Override
public String call() throws Exception {
return letter.getJournal().getFileMap().keySet().toString();
}
});
AnnotatedMBean.registerMBean(brokerService.getManagementContext(), view,
createPersistenceAdapterName(brokerService.getBrokerObjectName().toString(), toString()));
}
}
/**

View File

@ -466,6 +466,72 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
return l;
}
class TranInfo {
TransactionId id;
Location location;
class opCount {
int add;
int remove;
}
HashMap<KahaDestination, opCount> destinationOpCount = new HashMap<KahaDestination, opCount>();
public void track(Operation operation) {
if (location == null ) {
location = operation.getLocation();
}
KahaDestination destination;
boolean isAdd = false;
if (operation instanceof AddOpperation) {
AddOpperation add = (AddOpperation) operation;
destination = add.getCommand().getDestination();
isAdd = true;
} else {
RemoveOpperation removeOpperation = (RemoveOpperation) operation;
destination = removeOpperation.getCommand().getDestination();
}
opCount opCount = destinationOpCount.get(destination);
if (opCount == null) {
opCount = new opCount();
destinationOpCount.put(destination, opCount);
}
if (isAdd) {
opCount.add++;
} else {
opCount.remove++;
}
}
@Override
public String toString() {
StringBuffer buffer = new StringBuffer();
buffer.append(location).append(";").append(id).append(";\n");
for (Entry<KahaDestination, opCount> op : destinationOpCount.entrySet()) {
buffer.append(op.getKey()).append('+').append(op.getValue().add).append(',').append('-').append(op.getValue().remove).append(';');
}
return buffer.toString();
}
}
@SuppressWarnings("rawtypes")
public String getTransactions() {
ArrayList<TranInfo> infos = new ArrayList<TranInfo>();
synchronized (inflightTransactions) {
if (!inflightTransactions.isEmpty()) {
for (Entry<TransactionId, List<Operation>> entry : inflightTransactions.entrySet()) {
TranInfo info = new TranInfo();
info.id = entry.getKey();
for (Operation operation : entry.getValue()) {
info.track(operation);
}
infos.add(info);
}
}
}
return infos.toString();
}
/**
* Move all the messages that were in the journal into long term storage. We
* just replay and do a checkpoint.