From 1ba7cc216470ac16cd439aa0b35d8e6f9d6c7d8d Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Sun, 12 Nov 2017 10:32:58 +0800 Subject: [PATCH] HBASE-19035 Miss metrics when coprocessor use region scanner to read data --- .../hadoop/hbase/regionserver/HRegion.java | 17 +-- .../MetricsRegionServerWrapperImpl.java | 2 +- .../hbase/regionserver/RSRpcServices.java | 10 -- .../TestRegionServerReadRequestMetrics.java | 141 +++++++++++++++--- 4 files changed, 125 insertions(+), 45 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index a265a5544ad..197aa3ce808 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -292,8 +292,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi final LongAdder checkAndMutateChecksFailed = new LongAdder(); // Number of requests + // Count rows for scan final LongAdder readRequestsCount = new LongAdder(); final LongAdder filteredReadRequestsCount = new LongAdder(); + // Count rows for multi row mutations final LongAdder writeRequestsCount = new LongAdder(); // Number of requests blocked by memstore size. 
@@ -1229,14 +1231,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return readRequestsCount.sum(); } - /** - * Update the read request count for this region - * @param i increment - */ - public void updateReadRequestsCount(long i) { - readRequestsCount.add(i); - } - @Override public long getFilteredReadRequestsCount() { return filteredReadRequestsCount.sum(); @@ -6212,7 +6206,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi "or a lengthy garbage collection"); } startRegionOperation(Operation.SCAN); - readRequestsCount.increment(); try { return nextRaw(outResults, scannerContext); } finally { @@ -6244,6 +6237,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi outResults.addAll(tmpList); } + if (!outResults.isEmpty()) { + readRequestsCount.increment(); + } + // If the size limit was reached it means a partial Result is being returned. Returning a // partial Result means that we should not reset the filters; filters should only be reset in // between rows @@ -7165,7 +7162,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public void mutateRowsWithLocks(Collection mutations, Collection rowsToLock, long nonceGroup, long nonce) throws IOException { - writeRequestsCount.add(mutations.size()); MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock); processRowsWithLocks(proc, -1, nonceGroup, nonce); } @@ -7259,6 +7255,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // STEP 4. Let the processor scan the rows, generate mutations and add waledits doProcessRowWithTimeout(processor, now, this, mutations, walEdit, timeout); if (!mutations.isEmpty()) { + writeRequestsCount.add(mutations.size()); // STEP 5. 
Call the preBatchMutate hook processor.preBatchMutate(this, walEdit); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java index 515b1eae0ce..7d7833bcec9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java @@ -228,7 +228,7 @@ class MetricsRegionServerWrapperImpl @Override public long getTotalRowActionRequestCount() { - return regionServer.rpcServices.requestRowActionCount.sum(); + return readRequestsCount + writeRequestsCount; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 4b3fa505e34..68e7049cdbb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -264,10 +264,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // Count only once for requests with multiple actions like multi/caching-scan/replayBatch final LongAdder requestCount = new LongAdder(); - // Request counter. (Excludes requests that are not serviced by regions.) 
- // Count rows for requests with multiple actions like multi/caching-scan/replayBatch - final LongAdder requestRowActionCount = new LongAdder(); - // Request counter for rpc get final LongAdder rpcGetRequestCount = new LongAdder(); @@ -1104,7 +1100,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } requestCount.increment(); - requestRowActionCount.add(mutations.size()); if (!region.getRegionInfo().isMetaRegion()) { regionServer.cacheFlusher.reclaimMemStoreMemory(); } @@ -2380,7 +2375,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, try { checkOpen(); requestCount.increment(); - requestRowActionCount.increment(); rpcGetRequestCount.increment(); HRegion region = getRegion(request.getRegion()); @@ -2549,7 +2543,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, .getRegionActionCount()); ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements(); for (RegionAction regionAction : request.getRegionActionList()) { - this.requestRowActionCount.add(regionAction.getActionCount()); OperationQuota quota; HRegion region; regionActionResultBuilder.clear(); @@ -2684,7 +2677,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, try { checkOpen(); requestCount.increment(); - requestRowActionCount.increment(); rpcMutateRequestCount.increment(); HRegion region = getRegion(request.getRegion()); MutateResponse.Builder builder = MutateResponse.newBuilder(); @@ -3130,8 +3122,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, builder.setScanMetrics(metricBuilder.build()); } } - region.updateReadRequestsCount(numOfResults); - requestRowActionCount.add(numOfResults); long end = EnvironmentEdgeManager.currentTime(); long responseCellSize = context != null ? 
context.getResponseCellSize() : 0; region.getMetrics().updateScanTime(end - before); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerReadRequestMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerReadRequestMetrics.java index 7822bc7be15..7227183edc1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerReadRequestMetrics.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerReadRequestMetrics.java @@ -18,26 +18,33 @@ */ package org.apache.hadoop.hbase.regionserver; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.ClusterStatus.Option; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.RegionLoad; import org.apache.hadoop.hbase.ServerLoad; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import 
org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.RowFilter; @@ -54,13 +61,17 @@ import java.io.IOException; import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Optional; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; @Category(MediumTests.class) public class TestRegionServerReadRequestMetrics { + private static final Log LOG = LogFactory.getLog(TestRegionServerReadRequestMetrics.class); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static final TableName TABLE_NAME = TableName.valueOf("test"); private static final byte[] CF1 = "c1".getBytes(); @@ -83,7 +94,7 @@ public class TestRegionServerReadRequestMetrics { private static Admin admin; private static Collection serverNames; private static Table table; - private static List tableRegions; + private static RegionInfo regionInfo; private static Map requestsMap = new HashMap<>(); private static Map requestsMapPrev = new HashMap<>(); @@ -98,7 +109,9 @@ public class TestRegionServerReadRequestMetrics { serverNames = admin.getClusterStatus(EnumSet.of(Option.LIVE_SERVERS)).getServers(); table = createTable(); putData(); - tableRegions = admin.getTableRegions(TABLE_NAME); + List regions = admin.getRegions(TABLE_NAME); + assertEquals("Table " + TABLE_NAME + " should have 1 region", 1, regions.size()); + regionInfo = regions.get(0); for (Metric metric : Metric.values()) { requestsMap.put(metric, 0L); @@ -107,14 +120,11 @@ public class TestRegionServerReadRequestMetrics { } private static Table createTable() throws IOException { - HTableDescriptor 
td = new HTableDescriptor(TABLE_NAME); - HColumnDescriptor cd1 = new HColumnDescriptor(CF1); - td.addFamily(cd1); - HColumnDescriptor cd2 = new HColumnDescriptor(CF2); - cd2.setTimeToLive(TTL); - td.addFamily(cd2); - - admin.createTable(td); + TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME); + builder.addColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1)); + builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF2).setTimeToLive(TTL) + .build()); + admin.createTable(builder.build()); return TEST_UTIL.getConnection().getTable(TABLE_NAME); } @@ -159,18 +169,16 @@ public class TestRegionServerReadRequestMetrics { serverLoad = admin.getClusterStatus(EnumSet.of(Option.LIVE_SERVERS)).getLoad(serverName); Map regionsLoad = serverLoad.getRegionsLoad(); - for (HRegionInfo tableRegion : tableRegions) { - RegionLoad regionLoad = regionsLoad.get(tableRegion.getRegionName()); - if (regionLoad != null) { - regionLoadOuter = regionLoad; - for (Metric metric : Metric.values()) { - if (getReadRequest(serverLoad, regionLoad, metric) > requestsMapPrev.get(metric)) { - for (Metric metricInner : Metric.values()) { - requestsMap.put(metricInner, getReadRequest(serverLoad, regionLoad, metricInner)); - } - metricsUpdated = true; - break; + RegionLoad regionLoad = regionsLoad.get(regionInfo.getRegionName()); + if (regionLoad != null) { + regionLoadOuter = regionLoad; + for (Metric metric : Metric.values()) { + if (getReadRequest(serverLoad, regionLoad, metric) > requestsMapPrev.get(metric)) { + for (Metric metricInner : Metric.values()) { + requestsMap.put(metricInner, getReadRequest(serverLoad, regionLoad, metricInner)); } + metricsUpdated = true; + break; } } } @@ -397,5 +405,90 @@ public class TestRegionServerReadRequestMetrics { } } + @Test + public void testReadRequestsWithCoprocessor() throws Exception { + TableName tableName = TableName.valueOf("testReadRequestsWithCoprocessor"); + TableDescriptorBuilder builder = 
TableDescriptorBuilder.newBuilder(tableName); + builder.addColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1)); + builder.addCoprocessor(ScanRegionCoprocessor.class.getName()); + admin.createTable(builder.build()); + + try { + TEST_UTIL.waitTableAvailable(tableName); + List regionInfos = admin.getRegions(tableName); + assertEquals("Table " + tableName + " should have 1 region", 1, regionInfos.size()); + boolean success = true; + int i = 0; + for (; i < MAX_TRY; i++) { + try { + success = true; testReadRequests(regionInfos.get(0).getRegionName(), 3); + } catch (Throwable t) { + LOG.warn("Got exception when try " + i + " times", t); + Thread.sleep(SLEEP_MS); + success = false; + } + if (success) { + break; + } + } + if (i == MAX_TRY) { + fail("Failed to get right read requests metric after try " + i + " times"); + } + } finally { + admin.disableTable(tableName); + admin.deleteTable(tableName); + } + } + + private void testReadRequests(byte[] regionName, int expectedReadRequests) throws Exception { + for (ServerName serverName : serverNames) { + ServerLoad serverLoad = + admin.getClusterStatus(EnumSet.of(Option.LIVE_SERVERS)).getLoad(serverName); + Map regionsLoad = serverLoad.getRegionsLoad(); + RegionLoad regionLoad = regionsLoad.get(regionName); + if (regionLoad != null) { + LOG.debug("server read request is " + serverLoad.getReadRequestsCount() + + ", region read request is " + regionLoad.getReadRequestsCount()); + assertEquals(expectedReadRequests, serverLoad.getReadRequestsCount()); + assertEquals(expectedReadRequests, regionLoad.getReadRequestsCount()); + } + } + } + + public static class ScanRegionCoprocessor implements RegionCoprocessor, RegionObserver { + @Override + public Optional getRegionObserver() { + return Optional.of(this); + } + + @Override + public void postOpen(ObserverContext c) { + RegionCoprocessorEnvironment env = c.getEnvironment(); + Region region = env.getRegion(); + try { + putData(region); + RegionScanner scanner = region.getScanner(new Scan()); + List result = new LinkedList<>(); + while
(scanner.next(result)) { + result.clear(); + } + } catch (Exception e) { + LOG.warn("Got exception in coprocessor", e); + } + } + + private void putData(Region region) throws Exception { + Put put = new Put(ROW1); + put.addColumn(CF1, COL1, VAL1); + region.put(put); + put = new Put(ROW2); + put.addColumn(CF1, COL1, VAL1); + region.put(put); + put = new Put(ROW3); + put.addColumn(CF1, COL1, VAL1); + region.put(put); + } + } + private enum Metric {REGION_READ, SERVER_READ, FILTERED_REGION_READ, FILTERED_SERVER_READ} }