From c0d53996354a9db4a682af38604dae420b759ef9 Mon Sep 17 00:00:00 2001 From: Matt Pavlovich Date: Thu, 3 Jun 2021 23:19:54 -0500 Subject: [PATCH] [AMQ-8285] Add 'slow' metrics to PersistenceAdapterStatistics (#664) --- .../broker/jmx/PersistenceAdapterView.java | 3 ++ .../store/PersistenceAdapterStatistics.java | 46 +++++++++++++++++++ .../store/kahadb/MessageDatabase.java | 22 +++++---- 3 files changed, 62 insertions(+), 9 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/PersistenceAdapterView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/PersistenceAdapterView.java index beebd10229..be75a22980 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/PersistenceAdapterView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/PersistenceAdapterView.java @@ -93,6 +93,9 @@ public class PersistenceAdapterView implements PersistenceAdapterViewMBean { if (persistenceAdapterStatistics != null) { try { Map result = new HashMap(); + result.put("slowCleanupTime", getTimeStatisticAsMap(persistenceAdapterStatistics.getSlowCleanupTime())); + result.put("slowWriteTime", getTimeStatisticAsMap(persistenceAdapterStatistics.getSlowWriteTime())); + result.put("slowReadTime", getTimeStatisticAsMap(persistenceAdapterStatistics.getSlowReadTime())); result.put("writeTime", getTimeStatisticAsMap(persistenceAdapterStatistics.getWriteTime())); result.put("readTime", getTimeStatisticAsMap(persistenceAdapterStatistics.getReadTime())); return mapper.writeValueAsString(result); diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapterStatistics.java b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapterStatistics.java index b29982220c..23a112cacf 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapterStatistics.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapterStatistics.java @@ -20,16 +20,39 @@ import org.apache.activemq.management.StatsImpl; import org.apache.activemq.management.TimeStatisticImpl; public class PersistenceAdapterStatistics extends StatsImpl { + protected TimeStatisticImpl slowCleanupTime; + protected TimeStatisticImpl slowWriteTime; + protected TimeStatisticImpl slowReadTime; + protected TimeStatisticImpl writeTime; protected TimeStatisticImpl readTime; public PersistenceAdapterStatistics() { + slowCleanupTime = new TimeStatisticImpl("slowCleanupTime", "Slow time to cleanup data in the PersistentAdapter."); + slowWriteTime = new TimeStatisticImpl("slowWriteTime", "Slow time to write data to the PersistentAdapter."); + slowReadTime = new TimeStatisticImpl("slowReadTime", "Slow time to read data from the PersistentAdapter."); + addStatistic("slowCleanupTime", slowCleanupTime); + addStatistic("slowWriteTime", slowWriteTime); + addStatistic("slowReadTime", slowReadTime); + writeTime = new TimeStatisticImpl("writeTime", "Time to write data to the PersistentAdapter."); readTime = new TimeStatisticImpl("readTime", "Time to read data from the PersistentAdapter."); addStatistic("writeTime", writeTime); addStatistic("readTime", readTime); } + public void addSlowCleanupTime(final long time) { + slowCleanupTime.addTime(time); + } + + public void addSlowWriteTime(final long time) { + slowWriteTime.addTime(time); + } + + public void addSlowReadTime(final long time) { + slowReadTime.addTime(time); + } + public void addWriteTime(final long time) { writeTime.addTime(time); } @@ -41,9 +64,23 @@ public class PersistenceAdapterStatistics extends StatsImpl { @Override public void setEnabled(boolean enabled) { super.setEnabled(enabled); + slowCleanupTime.setEnabled(enabled); + slowWriteTime.setEnabled(enabled); + slowReadTime.setEnabled(enabled); writeTime.setEnabled(enabled); readTime.setEnabled(enabled); } + + public TimeStatisticImpl getSlowCleanupTime() { + return slowCleanupTime; + } + + public TimeStatisticImpl getSlowWriteTime() { + return slowWriteTime; + } + + public TimeStatisticImpl getSlowReadTime() { return slowReadTime; } + public TimeStatisticImpl getWriteTime() { return writeTime; @@ -56,6 +93,9 @@ public class PersistenceAdapterStatistics extends StatsImpl { if (isDoReset()) { writeTime.reset(); readTime.reset(); + slowCleanupTime.reset(); + slowWriteTime.reset(); + slowReadTime.reset(); } } @@ -63,9 +103,15 @@ public class PersistenceAdapterStatistics extends StatsImpl { if (parent != null) { writeTime.setParent(parent.writeTime); readTime.setParent(parent.readTime); + slowCleanupTime.setParent(parent.slowCleanupTime); + slowWriteTime.setParent(parent.slowWriteTime); + slowReadTime.setParent(parent.slowReadTime); } else { writeTime.setParent(null); readTime.setParent(null); + slowCleanupTime.setParent(null); + slowWriteTime.setParent(null); + slowReadTime.setParent(null); } } diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 448cb6a3f2..8a4e0ef393 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1102,11 +1102,12 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe this.indexLock.writeLock().unlock(); } checkpointUpdate(cleanup); - long end = System.currentTimeMillis(); - if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) { + long totalTimeMillis = System.currentTimeMillis() - start; + if (LOG_SLOW_ACCESS_TIME > 0 && totalTimeMillis > LOG_SLOW_ACCESS_TIME) { if (LOG.isInfoEnabled()) { - LOG.info("Slow KahaDB access: cleanup took " + (end - start)); + LOG.info("Slow KahaDB access: cleanup took " + totalTimeMillis); } + persistenceAdapterStatistics.addSlowCleanupTime(totalTimeMillis); } } @@ -1157,13 +1158,15 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe process(data, location, before); long end = System.currentTimeMillis(); - if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) { + long totalTimeMillis = end - start; + if (LOG_SLOW_ACCESS_TIME > 0 && totalTimeMillis > LOG_SLOW_ACCESS_TIME) { if (LOG.isInfoEnabled()) { LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms"); } + persistenceAdapterStatistics.addSlowWriteTime(totalTimeMillis); } - persistenceAdapterStatistics.addWriteTime(end - start); + persistenceAdapterStatistics.addWriteTime(totalTimeMillis); } finally { checkpointLock.readLock().unlock(); @@ -1191,14 +1194,15 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe public JournalCommand load(Location location) throws IOException { long start = System.currentTimeMillis(); ByteSequence data = journal.read(location); - long end = System.currentTimeMillis(); - if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) { + long totalTimeMillis = System.currentTimeMillis() - start; + if( LOG_SLOW_ACCESS_TIME>0 && totalTimeMillis > LOG_SLOW_ACCESS_TIME) { if (LOG.isInfoEnabled()) { - LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms"); + LOG.info("Slow KahaDB access: Journal read took: "+ totalTimeMillis +" ms"); } + persistenceAdapterStatistics.addSlowReadTime(totalTimeMillis); } - persistenceAdapterStatistics.addReadTime(end - start); + persistenceAdapterStatistics.addReadTime(totalTimeMillis); DataByteArrayInputStream is = new DataByteArrayInputStream(data); byte readByte = is.readByte();