From ada9d8d361c3619fed181bc9a0ddca49b8de77ec Mon Sep 17 00:00:00 2001 From: Zhihong Yu Date: Wed, 28 Mar 2012 23:59:17 +0000 Subject: [PATCH] HBASE-5544 Add metrics to HRegion.processRow() (Scott Chen) git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1306648 13f79535-47bb-0310-9956-ffa450edef68 --- .../hbase/regionserver/BaseRowProcessor.java | 4 ++ .../hadoop/hbase/regionserver/HRegion.java | 46 +++++++++++++++++-- .../hbase/regionserver/RowProcessor.java | 7 ++- .../coprocessor/TestRowProcessorEndpoint.java | 20 ++++++-- 4 files changed, 68 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java b/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java index c407f98bde5..f5243744664 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java @@ -46,4 +46,8 @@ public abstract class BaseRowProcessor implements RowProcessor { return HConstants.DEFAULT_CLUSTER_ID; } + @Override + public String getName() { + return this.getClass().getSimpleName().toLowerCase(); + } } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 02d55d4761f..8ae60a34a3a 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -4264,6 +4264,9 @@ public class HRegion implements HeapSize { // , Writable{ public void processRowsWithLocks(RowProcessor processor, long timeout) throws IOException { + final long startNanoTime = System.nanoTime(); + String metricsName = "rowprocessor." + processor.getName(); + for (byte[] row : processor.getRowsToLock()) { checkRow(row, "processRowsWithLocks"); } @@ -4285,12 +4288,21 @@ public class HRegion implements HeapSize { // , Writable{ doProcessRowWithTimeout( processor, now, this, null, null, timeout); processor.postProcess(this, walEdit); + } catch (IOException e) { + long endNanoTime = System.nanoTime(); + HRegion.incrTimeVaryingMetric(metricsName + ".error.nano", + endNanoTime - startNanoTime); + throw e; } finally { closeRegionOperation(); } + final long endNanoTime = System.nanoTime(); + HRegion.incrTimeVaryingMetric(metricsName + ".nano", + endNanoTime - startNanoTime); return; } + long lockedNanoTime, processDoneNanoTime, unlockedNanoTime = 0; MultiVersionConsistencyControl.WriteEntry writeEntry = null; boolean locked = false; boolean walSyncSuccessful = false; @@ -4313,6 +4325,7 @@ public class HRegion implements HeapSize { // , Writable{ // 3. Region lock this.updatesLock.readLock().lock(); locked = true; + lockedNanoTime = System.nanoTime(); long now = EnvironmentEdgeManager.currentTimeMillis(); try { @@ -4320,6 +4333,7 @@ public class HRegion implements HeapSize { // , Writable{ // waledits doProcessRowWithTimeout( processor, now, this, mutations, walEdit, timeout); + processDoneNanoTime = System.nanoTime(); if (!mutations.isEmpty()) { // 5. Get a mvcc write number @@ -4344,6 +4358,8 @@ public class HRegion implements HeapSize { // , Writable{ this.updatesLock.readLock().unlock(); locked = false; } + unlockedNanoTime = System.nanoTime(); + // 9. Release row lock(s) if (acquiredLocks != null) { for (Integer lid : acquiredLocks) { @@ -4382,11 +4398,18 @@ public class HRegion implements HeapSize { // , Writable{ releaseRowLock(lid); } } + unlockedNanoTime = unlockedNanoTime == 0 ? + System.nanoTime() : unlockedNanoTime; } // 12. Run post-process hook processor.postProcess(this, walEdit); + } catch (IOException e) { + long endNanoTime = System.nanoTime(); + HRegion.incrTimeVaryingMetric(metricsName + ".error.nano", + endNanoTime - startNanoTime); + throw e; } finally { closeRegionOperation(); if (!mutations.isEmpty() && @@ -4394,6 +4417,22 @@ public class HRegion implements HeapSize { // , Writable{ requestFlush(); } } + // Populate all metrics + long endNanoTime = System.nanoTime(); + HRegion.incrTimeVaryingMetric(metricsName + ".nano", + endNanoTime - startNanoTime); + + HRegion.incrTimeVaryingMetric(metricsName + ".acquirelock.nano", + lockedNanoTime - startNanoTime); + + HRegion.incrTimeVaryingMetric(metricsName + ".process.nano", + processDoneNanoTime - lockedNanoTime); + + HRegion.incrTimeVaryingMetric(metricsName + ".occupylock.nano", + unlockedNanoTime - lockedNanoTime); + + HRegion.incrTimeVaryingMetric(metricsName + ".sync.nano", + endNanoTime - unlockedNanoTime); } private void doProcessRowWithTimeout(final RowProcessor processor, @@ -4795,8 +4834,9 @@ public class HRegion implements HeapSize { // , Writable{ // Request a cache flush. Do it outside update lock. requestFlush(); } - if(wrongLength){ - throw new IOException("Attempted to increment field that isn't 64 bits wide"); + if (wrongLength) { + throw new IOException( + "Attempted to increment field that isn't 64 bits wide"); } return result; } @@ -4812,7 +4852,7 @@ public class HRegion implements HeapSize { // , Writable{ throw new NoSuchColumnFamilyException("Column family " + Bytes.toString(family) + " does not exist in region " + this + " in table " + this.htableDescriptor); - } + } } public static final long FIXED_OVERHEAD = ClassSize.align( diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java b/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java index 15c67f26949..4be0cd310bc 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java @@ -64,7 +64,7 @@ public interface RowProcessor { /** * HRegion handles the locks and MVCC and invokes this method properly. - * + * * You should override this to create your own RowProcessor. * * If you are doing read-modify-write here, you should consider using @@ -103,4 +103,9 @@ public interface RowProcessor { */ UUID getClusterId(); + /** + * Human readable name of the processor + * @return The name of the processor + */ + String getName(); } diff --git a/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java b/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java index 1623f17ecaa..cfcb6d6e002 100644 --- a/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java +++ b/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java @@ -138,7 +138,7 @@ public class TestRowProcessorEndpoint { prepareTestData(); RowProcessorProtocol protocol = table.coprocessorProxy(RowProcessorProtocol.class, ROW); - RowProcessorEndpoint.FriendsOfFriendsProcessor processor = + RowProcessorEndpoint.FriendsOfFriendsProcessor processor = new RowProcessorEndpoint.FriendsOfFriendsProcessor(ROW, A); Set result = protocol.process(processor); @@ -176,7 +176,7 @@ public class TestRowProcessorEndpoint { private int incrementCounter(HTable table) throws Throwable { RowProcessorProtocol protocol = table.coprocessorProxy(RowProcessorProtocol.class, ROW); - RowProcessorEndpoint.IncrementCounterProcessor processor = + RowProcessorEndpoint.IncrementCounterProcessor processor = new RowProcessorEndpoint.IncrementCounterProcessor(ROW); int counterValue = protocol.process(processor); return counterValue; @@ -234,7 +234,7 @@ public class TestRowProcessorEndpoint { private void swapRows(HTable table) throws Throwable { RowProcessorProtocol protocol = table.coprocessorProxy(RowProcessorProtocol.class, ROW); - RowProcessorEndpoint.RowSwapProcessor processor = + RowProcessorEndpoint.RowSwapProcessor processor = new RowProcessorEndpoint.RowSwapProcessor(ROW, ROW2); protocol.process(processor); } @@ -244,7 +244,7 @@ public class TestRowProcessorEndpoint { prepareTestData(); RowProcessorProtocol protocol = table.coprocessorProxy(RowProcessorProtocol.class, ROW); - RowProcessorEndpoint.TimeoutProcessor processor = + RowProcessorEndpoint.TimeoutProcessor processor = new RowProcessorEndpoint.TimeoutProcessor(ROW); boolean exceptionCaught = false; try { @@ -510,13 +510,18 @@ public class TestRowProcessorEndpoint { Bytes.writeByteArray(out, row1); Bytes.writeByteArray(out, row2); } + + @Override + public String getName() { + return "swap"; + } } public static class TimeoutProcessor extends BaseRowProcessor implements Writable { byte[] row = new byte[0]; - + /** * Empty constructor for Writable */ @@ -556,6 +561,11 @@ public class TestRowProcessorEndpoint { public void write(DataOutput out) throws IOException { Bytes.writeByteArray(out, row); } + + @Override + public String getName() { + return "timeout"; + } } public static void doScan(