HBASE-19035 Miss metrics when coprocessor use region scanner to read data

This commit is contained in:
Guanghao Zhang 2017-11-12 10:32:58 +08:00
parent 3ee76f8573
commit bc8048cf6c
4 changed files with 125 additions and 45 deletions

View File

@ -293,8 +293,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
final LongAdder checkAndMutateChecksFailed = new LongAdder();
// Number of requests
// Count rows for scan
final LongAdder readRequestsCount = new LongAdder();
final LongAdder filteredReadRequestsCount = new LongAdder();
// Count rows for multi row mutations
final LongAdder writeRequestsCount = new LongAdder();
// Number of requests blocked by memstore size.
@ -1230,14 +1232,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return readRequestsCount.sum();
}
/**
* Update the read request count for this region
* @param i increment
*/
public void updateReadRequestsCount(long i) {
readRequestsCount.add(i);
}
@Override
public long getFilteredReadRequestsCount() {
return filteredReadRequestsCount.sum();
@ -6225,7 +6219,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
"or a lengthy garbage collection");
}
startRegionOperation(Operation.SCAN);
readRequestsCount.increment();
try {
return nextRaw(outResults, scannerContext);
} finally {
@ -6257,6 +6250,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
outResults.addAll(tmpList);
}
if (!outResults.isEmpty()) {
readRequestsCount.increment();
}
// If the size limit was reached it means a partial Result is being returned. Returning a
// partial Result means that we should not reset the filters; filters should only be reset in
// between rows
@ -7178,7 +7175,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public void mutateRowsWithLocks(Collection<Mutation> mutations,
Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException {
writeRequestsCount.add(mutations.size());
MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
processRowsWithLocks(proc, -1, nonceGroup, nonce);
}
@ -7272,6 +7268,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// STEP 4. Let the processor scan the rows, generate mutations and add waledits
doProcessRowWithTimeout(processor, now, this, mutations, walEdit, timeout);
if (!mutations.isEmpty()) {
writeRequestsCount.add(mutations.size());
// STEP 5. Call the preBatchMutate hook
processor.preBatchMutate(this, walEdit);

View File

@ -228,7 +228,7 @@ class MetricsRegionServerWrapperImpl
@Override
public long getTotalRowActionRequestCount() {
return regionServer.rpcServices.requestRowActionCount.sum();
return readRequestsCount + writeRequestsCount;
}
@Override

View File

@ -264,10 +264,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// Count only once for requests with multiple actions like multi/caching-scan/replayBatch
final LongAdder requestCount = new LongAdder();
// Request counter. (Excludes requests that are not serviced by regions.)
// Count rows for requests with multiple actions like multi/caching-scan/replayBatch
final LongAdder requestRowActionCount = new LongAdder();
// Request counter for rpc get
final LongAdder rpcGetRequestCount = new LongAdder();
@ -1104,7 +1100,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
requestCount.increment();
requestRowActionCount.add(mutations.size());
if (!region.getRegionInfo().isMetaRegion()) {
regionServer.cacheFlusher.reclaimMemStoreMemory();
}
@ -2380,7 +2375,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
try {
checkOpen();
requestCount.increment();
requestRowActionCount.increment();
rpcGetRequestCount.increment();
HRegion region = getRegion(request.getRegion());
@ -2549,7 +2543,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
.getRegionActionCount());
ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
for (RegionAction regionAction : request.getRegionActionList()) {
this.requestRowActionCount.add(regionAction.getActionCount());
OperationQuota quota;
HRegion region;
regionActionResultBuilder.clear();
@ -2684,7 +2677,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
try {
checkOpen();
requestCount.increment();
requestRowActionCount.increment();
rpcMutateRequestCount.increment();
HRegion region = getRegion(request.getRegion());
MutateResponse.Builder builder = MutateResponse.newBuilder();
@ -3130,8 +3122,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
builder.setScanMetrics(metricBuilder.build());
}
}
region.updateReadRequestsCount(numOfResults);
requestRowActionCount.add(numOfResults);
long end = EnvironmentEdgeManager.currentTime();
long responseCellSize = context != null ? context.getResponseCellSize() : 0;
region.getMetrics().updateScanTime(end - before);

View File

@ -18,26 +18,33 @@
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ClusterStatus.Option;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
@ -54,13 +61,17 @@ import java.io.IOException;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@Category(MediumTests.class)
public class TestRegionServerReadRequestMetrics {
private static final Log LOG = LogFactory.getLog(TestRegionServerReadRequestMetrics.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final TableName TABLE_NAME = TableName.valueOf("test");
private static final byte[] CF1 = "c1".getBytes();
@ -83,7 +94,7 @@ public class TestRegionServerReadRequestMetrics {
private static Admin admin;
private static Collection<ServerName> serverNames;
private static Table table;
private static List<HRegionInfo> tableRegions;
private static RegionInfo regionInfo;
private static Map<Metric, Long> requestsMap = new HashMap<>();
private static Map<Metric, Long> requestsMapPrev = new HashMap<>();
@ -98,7 +109,9 @@ public class TestRegionServerReadRequestMetrics {
serverNames = admin.getClusterStatus(EnumSet.of(Option.LIVE_SERVERS)).getServers();
table = createTable();
putData();
tableRegions = admin.getTableRegions(TABLE_NAME);
List<RegionInfo> regions = admin.getRegions(TABLE_NAME);
assertEquals("Table " + TABLE_NAME + " should have 1 region", 1, regions.size());
regionInfo = regions.get(0);
for (Metric metric : Metric.values()) {
requestsMap.put(metric, 0L);
@ -107,14 +120,11 @@ public class TestRegionServerReadRequestMetrics {
}
private static Table createTable() throws IOException {
HTableDescriptor td = new HTableDescriptor(TABLE_NAME);
HColumnDescriptor cd1 = new HColumnDescriptor(CF1);
td.addFamily(cd1);
HColumnDescriptor cd2 = new HColumnDescriptor(CF2);
cd2.setTimeToLive(TTL);
td.addFamily(cd2);
admin.createTable(td);
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME);
builder.addColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1));
builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF2).setTimeToLive(TTL)
.build());
admin.createTable(builder.build());
return TEST_UTIL.getConnection().getTable(TABLE_NAME);
}
@ -159,8 +169,7 @@ public class TestRegionServerReadRequestMetrics {
serverLoad = admin.getClusterStatus(EnumSet.of(Option.LIVE_SERVERS)).getLoad(serverName);
Map<byte[], RegionLoad> regionsLoad = serverLoad.getRegionsLoad();
for (HRegionInfo tableRegion : tableRegions) {
RegionLoad regionLoad = regionsLoad.get(tableRegion.getRegionName());
RegionLoad regionLoad = regionsLoad.get(regionInfo.getRegionName());
if (regionLoad != null) {
regionLoadOuter = regionLoad;
for (Metric metric : Metric.values()) {
@ -174,7 +183,6 @@ public class TestRegionServerReadRequestMetrics {
}
}
}
}
if (metricsUpdated) {
break;
}
@ -397,5 +405,90 @@ public class TestRegionServerReadRequestMetrics {
}
}
@Test
public void testReadRequestsWithCoprocessor() throws Exception {
TableName tableName = TableName.valueOf("testReadRequestsWithCoprocessor");
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
builder.addColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1));
builder.addCoprocessor(ScanRegionCoprocessor.class.getName());
admin.createTable(builder.build());
try {
TEST_UTIL.waitTableAvailable(tableName);
List<RegionInfo> regionInfos = admin.getRegions(tableName);
assertEquals("Table " + TABLE_NAME + " should have 1 region", 1, regionInfos.size());
boolean success = true;
int i = 0;
for (; i < MAX_TRY; i++) {
try {
testReadRequests(regionInfos.get(0).getRegionName(), 3);
} catch (Throwable t) {
LOG.warn("Got exception when try " + i + " times", t);
Thread.sleep(SLEEP_MS);
success = false;
}
if (success) {
break;
}
}
if (i == MAX_TRY) {
fail("Failed to get right read requests metric after try " + i + " times");
}
} finally {
admin.disableTable(tableName);
admin.deleteTable(tableName);
}
}
private void testReadRequests(byte[] regionName, int expectedReadRequests) throws Exception {
for (ServerName serverName : serverNames) {
ServerLoad serverLoad =
admin.getClusterStatus(EnumSet.of(Option.LIVE_SERVERS)).getLoad(serverName);
Map<byte[], RegionLoad> regionsLoad = serverLoad.getRegionsLoad();
RegionLoad regionLoad = regionsLoad.get(regionName);
if (regionLoad != null) {
LOG.debug("server read request is " + serverLoad.getReadRequestsCount()
+ ", region read request is " + regionLoad.getReadRequestsCount());
assertEquals(3, serverLoad.getReadRequestsCount());
assertEquals(3, regionLoad.getReadRequestsCount());
}
}
}
public static class ScanRegionCoprocessor implements RegionCoprocessor, RegionObserver {
@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(this);
}
@Override
public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
RegionCoprocessorEnvironment env = c.getEnvironment();
Region region = env.getRegion();
try {
putData(region);
RegionScanner scanner = region.getScanner(new Scan());
List<Cell> result = new LinkedList<>();
while (scanner.next(result)) {
result.clear();
}
} catch (Exception e) {
LOG.warn("Got exception in coprocessor", e);
}
}
private void putData(Region region) throws Exception {
Put put = new Put(ROW1);
put.addColumn(CF1, COL1, VAL1);
region.put(put);
put = new Put(ROW2);
put.addColumn(CF1, COL1, VAL1);
region.put(put);
put = new Put(ROW3);
put.addColumn(CF1, COL1, VAL1);
region.put(put);
}
}
private enum Metric {REGION_READ, SERVER_READ, FILTERED_REGION_READ, FILTERED_SERVER_READ}
}