diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/PersistenceAdapterView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/PersistenceAdapterView.java index 06b50d0020..beebd10229 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/PersistenceAdapterView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/PersistenceAdapterView.java @@ -16,16 +16,25 @@ */ package org.apache.activemq.broker.jmx; -import java.util.concurrent.Callable; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.activemq.management.TimeStatisticImpl; import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.PersistenceAdapterStatistics; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Callable; public class PersistenceAdapterView implements PersistenceAdapterViewMBean { + private final static ObjectMapper mapper = new ObjectMapper(); private final String name; private final PersistenceAdapter persistenceAdapter; private Callable inflightTransactionViewCallable; private Callable dataViewCallable; + private PersistenceAdapterStatistics persistenceAdapterStatistics; public PersistenceAdapterView(PersistenceAdapter adapter) { this.name = adapter.toString(); @@ -52,6 +61,22 @@ public class PersistenceAdapterView implements PersistenceAdapterViewMBean { return persistenceAdapter.size(); } + @Override + public String getStatistics() { + return serializePersistenceAdapterStatistics(); + } + + @Override + public String resetStatistics() { + final String result = serializePersistenceAdapterStatistics(); + + if (persistenceAdapterStatistics != null) { + persistenceAdapterStatistics.reset(); + } + + return result; + } + private String invoke(Callable callable) { String result = null; if (callable != null) { @@ -64,6 +89,36 @@ public class PersistenceAdapterView implements PersistenceAdapterViewMBean { return result; } + private String serializePersistenceAdapterStatistics() { + if (persistenceAdapterStatistics != null) { + try { + Map result = new HashMap(); + result.put("writeTime", getTimeStatisticAsMap(persistenceAdapterStatistics.getWriteTime())); + result.put("readTime", getTimeStatisticAsMap(persistenceAdapterStatistics.getReadTime())); + return mapper.writeValueAsString(result); + } catch (IOException e) { + return e.toString(); + } + } + + return null; + } + + private Map getTimeStatisticAsMap(final TimeStatisticImpl timeStatistic) { + Map result = new HashMap(); + + result.put("count", timeStatistic.getCount()); + result.put("maxTime", timeStatistic.getMaxTime()); + result.put("minTime", timeStatistic.getMinTime()); + result.put("totalTime", timeStatistic.getTotalTime()); + result.put("averageTime", timeStatistic.getAverageTime()); + result.put("averageTimeExMinMax", timeStatistic.getAverageTimeExcludingMinMax()); + result.put("averagePerSecond", timeStatistic.getAveragePerSecond()); + result.put("averagePerSecondExMinMax", timeStatistic.getAveragePerSecondExcludingMinMax()); + + return result; + } + public void setDataViewCallable(Callable dataViewCallable) { this.dataViewCallable = dataViewCallable; } @@ -71,4 +126,8 @@ public class PersistenceAdapterView implements PersistenceAdapterViewMBean { public void setInflightTransactionViewCallable(Callable inflightTransactionViewCallable) { this.inflightTransactionViewCallable = inflightTransactionViewCallable; } + + public void setPersistenceAdapterStatistics(PersistenceAdapterStatistics persistenceAdapterStatistics) { + this.persistenceAdapterStatistics = persistenceAdapterStatistics; + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/PersistenceAdapterViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/PersistenceAdapterViewMBean.java index b860e9cd68..3eee6eaadf 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/PersistenceAdapterViewMBean.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/PersistenceAdapterViewMBean.java @@ -29,4 +29,10 @@ public interface PersistenceAdapterViewMBean { @MBeanInfo("Current size.") long getSize(); + + @MBeanInfo("Statistics related to the PersistentAdapter.") + String getStatistics(); + + @MBeanInfo("Resets statistics.") + String resetStatistics(); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapterStatistics.java b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapterStatistics.java new file mode 100644 index 0000000000..0a21469635 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapterStatistics.java @@ -0,0 +1,54 @@ +package org.apache.activemq.store; + +import org.apache.activemq.management.StatsImpl; +import org.apache.activemq.management.TimeStatisticImpl; + +public class PersistenceAdapterStatistics extends StatsImpl { + protected TimeStatisticImpl writeTime; + protected TimeStatisticImpl readTime; + + public PersistenceAdapterStatistics() { + writeTime = new TimeStatisticImpl("writeTime", "Time to write data to the PersistentAdapter."); + readTime = new TimeStatisticImpl("readTime", "Time to read data from the PersistentAdapter."); + addStatistic("writeTime", writeTime); + addStatistic("readTime", readTime); + } + + public void addWriteTime(final long time) { + writeTime.addTime(time); + } + + public void addReadTime(final long time) { + readTime.addTime(time); + } + + public void setEnabled(boolean enabled) { + super.setEnabled(enabled); + writeTime.setEnabled(enabled); + readTime.setEnabled(enabled); + } + + public TimeStatisticImpl getWriteTime() { + return writeTime; + } + + public TimeStatisticImpl getReadTime() { return readTime; } + + public void reset() { + if (isDoReset()) { + writeTime.reset(); + readTime.reset(); + } + } + + public void setParent(PersistenceAdapterStatistics parent) { + if (parent != null) { + writeTime.setParent(parent.writeTime); + readTime.setParent(parent.readTime); + } else { + writeTime.setParent(null); + readTime.setParent(null); + } + + } +} diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java index 5d6e89699e..eb0d56d154 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java @@ -16,15 +16,6 @@ */ package org.apache.activemq.store.kahadb; -import static org.apache.activemq.broker.jmx.BrokerMBeanSupport.createPersistenceAdapterName; - -import java.io.File; -import java.io.IOException; -import java.util.Set; -import java.util.concurrent.Callable; - -import javax.management.ObjectName; - import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.LockableServiceSupport; @@ -44,6 +35,7 @@ import org.apache.activemq.store.JournaledStore; import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.NoLocalSubscriptionAware; import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.PersistenceAdapterStatistics; import org.apache.activemq.store.SharedFileLocker; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TransactionIdTransformer; @@ -56,6 +48,14 @@ import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStra import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.util.ServiceStopper; +import javax.management.ObjectName; +import java.io.File; +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.Callable; + +import static org.apache.activemq.broker.jmx.BrokerMBeanSupport.createPersistenceAdapterName; + /** * An implementation of {@link PersistenceAdapter} designed for use with * KahaDB - Embedded Lightweight Non-Relational Database @@ -245,6 +245,9 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements return letter.getJournal().getFileMap().keySet().toString(); } }); + + view.setPersistenceAdapterStatistics(letter.persistenceAdapterStatistics); + AnnotatedMBean.registerMBean(brokerService.getManagementContext(), view, createPersistenceAdapterName(brokerService.getBrokerObjectName().toString(), toString())); } @@ -404,6 +407,15 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements this.letter.setEnableIndexWriteAsync(enableIndexWriteAsync); } + /** + * Get the PersistenceAdapterStatistics + * + * @return the persistenceAdapterStatistics + */ + public PersistenceAdapterStatistics getPersistenceAdapterStatistics() { + return this.letter.getPersistenceAdapterStatistics(); + } + /** * Get the directory * diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 8d0693d922..9660afc145 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -70,6 +70,7 @@ import org.apache.activemq.protobuf.Buffer; import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStoreStatistics; import org.apache.activemq.store.MessageStoreSubscriptionStatistics; +import org.apache.activemq.store.PersistenceAdapterStatistics; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand; import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; @@ -249,6 +250,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe protected PageFile pageFile; protected Journal journal; protected Metadata metadata = new Metadata(); + protected final PersistenceAdapterStatistics persistenceAdapterStatistics = new PersistenceAdapterStatistics(); protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller(); @@ -1142,6 +1144,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms"); } } + + persistenceAdapterStatistics.addWriteTime(end - start); + } finally { checkpointLock.readLock().unlock(); } @@ -1174,6 +1179,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms"); } } + + persistenceAdapterStatistics.addReadTime(end - start); + DataByteArrayInputStream is = new DataByteArrayInputStream(data); byte readByte = is.readByte(); KahaEntryType type = KahaEntryType.valueOf(readByte); @@ -3651,6 +3659,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe return enableIndexPageCaching; } + public PersistenceAdapterStatistics getPersistenceAdapterStatistics() { + return this.persistenceAdapterStatistics; + } + // ///////////////////////////////////////////////////////////////// // Internal conversion methods. // /////////////////////////////////////////////////////////////////