HBASE-19035 Miss metrics when coprocessor use region scanner to read data
This commit is contained in:
parent
c87189d418
commit
1ba7cc2164
|
@ -292,8 +292,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
final LongAdder checkAndMutateChecksFailed = new LongAdder();
|
||||
|
||||
// Number of requests
|
||||
// Count rows for scan
|
||||
final LongAdder readRequestsCount = new LongAdder();
|
||||
final LongAdder filteredReadRequestsCount = new LongAdder();
|
||||
// Count rows for multi row mutations
|
||||
final LongAdder writeRequestsCount = new LongAdder();
|
||||
|
||||
// Number of requests blocked by memstore size.
|
||||
|
@ -1229,14 +1231,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
return readRequestsCount.sum();
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the read request count for this region
|
||||
* @param i increment
|
||||
*/
|
||||
public void updateReadRequestsCount(long i) {
|
||||
readRequestsCount.add(i);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getFilteredReadRequestsCount() {
|
||||
return filteredReadRequestsCount.sum();
|
||||
|
@ -6212,7 +6206,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
"or a lengthy garbage collection");
|
||||
}
|
||||
startRegionOperation(Operation.SCAN);
|
||||
readRequestsCount.increment();
|
||||
try {
|
||||
return nextRaw(outResults, scannerContext);
|
||||
} finally {
|
||||
|
@ -6244,6 +6237,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
outResults.addAll(tmpList);
|
||||
}
|
||||
|
||||
if (!outResults.isEmpty()) {
|
||||
readRequestsCount.increment();
|
||||
}
|
||||
|
||||
// If the size limit was reached it means a partial Result is being returned. Returning a
|
||||
// partial Result means that we should not reset the filters; filters should only be reset in
|
||||
// between rows
|
||||
|
@ -7165,7 +7162,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
@Override
|
||||
public void mutateRowsWithLocks(Collection<Mutation> mutations,
|
||||
Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException {
|
||||
writeRequestsCount.add(mutations.size());
|
||||
MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
|
||||
processRowsWithLocks(proc, -1, nonceGroup, nonce);
|
||||
}
|
||||
|
@ -7259,6 +7255,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
// STEP 4. Let the processor scan the rows, generate mutations and add waledits
|
||||
doProcessRowWithTimeout(processor, now, this, mutations, walEdit, timeout);
|
||||
if (!mutations.isEmpty()) {
|
||||
writeRequestsCount.add(mutations.size());
|
||||
// STEP 5. Call the preBatchMutate hook
|
||||
processor.preBatchMutate(this, walEdit);
|
||||
|
||||
|
|
|
@ -228,7 +228,7 @@ class MetricsRegionServerWrapperImpl
|
|||
|
||||
@Override
|
||||
public long getTotalRowActionRequestCount() {
|
||||
return regionServer.rpcServices.requestRowActionCount.sum();
|
||||
return readRequestsCount + writeRequestsCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -264,10 +264,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
// Count only once for requests with multiple actions like multi/caching-scan/replayBatch
|
||||
final LongAdder requestCount = new LongAdder();
|
||||
|
||||
// Request counter. (Excludes requests that are not serviced by regions.)
|
||||
// Count rows for requests with multiple actions like multi/caching-scan/replayBatch
|
||||
final LongAdder requestRowActionCount = new LongAdder();
|
||||
|
||||
// Request counter for rpc get
|
||||
final LongAdder rpcGetRequestCount = new LongAdder();
|
||||
|
||||
|
@ -1104,7 +1100,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
}
|
||||
}
|
||||
requestCount.increment();
|
||||
requestRowActionCount.add(mutations.size());
|
||||
if (!region.getRegionInfo().isMetaRegion()) {
|
||||
regionServer.cacheFlusher.reclaimMemStoreMemory();
|
||||
}
|
||||
|
@ -2380,7 +2375,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
try {
|
||||
checkOpen();
|
||||
requestCount.increment();
|
||||
requestRowActionCount.increment();
|
||||
rpcGetRequestCount.increment();
|
||||
HRegion region = getRegion(request.getRegion());
|
||||
|
||||
|
@ -2549,7 +2543,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
.getRegionActionCount());
|
||||
ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
|
||||
for (RegionAction regionAction : request.getRegionActionList()) {
|
||||
this.requestRowActionCount.add(regionAction.getActionCount());
|
||||
OperationQuota quota;
|
||||
HRegion region;
|
||||
regionActionResultBuilder.clear();
|
||||
|
@ -2684,7 +2677,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
try {
|
||||
checkOpen();
|
||||
requestCount.increment();
|
||||
requestRowActionCount.increment();
|
||||
rpcMutateRequestCount.increment();
|
||||
HRegion region = getRegion(request.getRegion());
|
||||
MutateResponse.Builder builder = MutateResponse.newBuilder();
|
||||
|
@ -3130,8 +3122,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
builder.setScanMetrics(metricBuilder.build());
|
||||
}
|
||||
}
|
||||
region.updateReadRequestsCount(numOfResults);
|
||||
requestRowActionCount.add(numOfResults);
|
||||
long end = EnvironmentEdgeManager.currentTime();
|
||||
long responseCellSize = context != null ? context.getResponseCellSize() : 0;
|
||||
region.getMetrics().updateScanTime(end - before);
|
||||
|
|
|
@ -18,26 +18,33 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.ClusterStatus.Option;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.RegionLoad;
|
||||
import org.apache.hadoop.hbase.ServerLoad;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.Append;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Increment;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.RowMutations;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
|
||||
import org.apache.hadoop.hbase.filter.BinaryComparator;
|
||||
import org.apache.hadoop.hbase.filter.CompareFilter;
|
||||
import org.apache.hadoop.hbase.filter.RowFilter;
|
||||
|
@ -54,13 +61,17 @@ import java.io.IOException;
|
|||
import java.util.Collection;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
@Category(MediumTests.class)
|
||||
public class TestRegionServerReadRequestMetrics {
|
||||
private static final Log LOG = LogFactory.getLog(TestRegionServerReadRequestMetrics.class);
|
||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private static final TableName TABLE_NAME = TableName.valueOf("test");
|
||||
private static final byte[] CF1 = "c1".getBytes();
|
||||
|
@ -83,7 +94,7 @@ public class TestRegionServerReadRequestMetrics {
|
|||
private static Admin admin;
|
||||
private static Collection<ServerName> serverNames;
|
||||
private static Table table;
|
||||
private static List<HRegionInfo> tableRegions;
|
||||
private static RegionInfo regionInfo;
|
||||
|
||||
private static Map<Metric, Long> requestsMap = new HashMap<>();
|
||||
private static Map<Metric, Long> requestsMapPrev = new HashMap<>();
|
||||
|
@ -98,7 +109,9 @@ public class TestRegionServerReadRequestMetrics {
|
|||
serverNames = admin.getClusterStatus(EnumSet.of(Option.LIVE_SERVERS)).getServers();
|
||||
table = createTable();
|
||||
putData();
|
||||
tableRegions = admin.getTableRegions(TABLE_NAME);
|
||||
List<RegionInfo> regions = admin.getRegions(TABLE_NAME);
|
||||
assertEquals("Table " + TABLE_NAME + " should have 1 region", 1, regions.size());
|
||||
regionInfo = regions.get(0);
|
||||
|
||||
for (Metric metric : Metric.values()) {
|
||||
requestsMap.put(metric, 0L);
|
||||
|
@ -107,14 +120,11 @@ public class TestRegionServerReadRequestMetrics {
|
|||
}
|
||||
|
||||
private static Table createTable() throws IOException {
|
||||
HTableDescriptor td = new HTableDescriptor(TABLE_NAME);
|
||||
HColumnDescriptor cd1 = new HColumnDescriptor(CF1);
|
||||
td.addFamily(cd1);
|
||||
HColumnDescriptor cd2 = new HColumnDescriptor(CF2);
|
||||
cd2.setTimeToLive(TTL);
|
||||
td.addFamily(cd2);
|
||||
|
||||
admin.createTable(td);
|
||||
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME);
|
||||
builder.addColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1));
|
||||
builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF2).setTimeToLive(TTL)
|
||||
.build());
|
||||
admin.createTable(builder.build());
|
||||
return TEST_UTIL.getConnection().getTable(TABLE_NAME);
|
||||
}
|
||||
|
||||
|
@ -159,18 +169,16 @@ public class TestRegionServerReadRequestMetrics {
|
|||
serverLoad = admin.getClusterStatus(EnumSet.of(Option.LIVE_SERVERS)).getLoad(serverName);
|
||||
|
||||
Map<byte[], RegionLoad> regionsLoad = serverLoad.getRegionsLoad();
|
||||
for (HRegionInfo tableRegion : tableRegions) {
|
||||
RegionLoad regionLoad = regionsLoad.get(tableRegion.getRegionName());
|
||||
if (regionLoad != null) {
|
||||
regionLoadOuter = regionLoad;
|
||||
for (Metric metric : Metric.values()) {
|
||||
if (getReadRequest(serverLoad, regionLoad, metric) > requestsMapPrev.get(metric)) {
|
||||
for (Metric metricInner : Metric.values()) {
|
||||
requestsMap.put(metricInner, getReadRequest(serverLoad, regionLoad, metricInner));
|
||||
}
|
||||
metricsUpdated = true;
|
||||
break;
|
||||
RegionLoad regionLoad = regionsLoad.get(regionInfo.getRegionName());
|
||||
if (regionLoad != null) {
|
||||
regionLoadOuter = regionLoad;
|
||||
for (Metric metric : Metric.values()) {
|
||||
if (getReadRequest(serverLoad, regionLoad, metric) > requestsMapPrev.get(metric)) {
|
||||
for (Metric metricInner : Metric.values()) {
|
||||
requestsMap.put(metricInner, getReadRequest(serverLoad, regionLoad, metricInner));
|
||||
}
|
||||
metricsUpdated = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -397,5 +405,90 @@ public class TestRegionServerReadRequestMetrics {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadRequestsWithCoprocessor() throws Exception {
|
||||
TableName tableName = TableName.valueOf("testReadRequestsWithCoprocessor");
|
||||
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
|
||||
builder.addColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1));
|
||||
builder.addCoprocessor(ScanRegionCoprocessor.class.getName());
|
||||
admin.createTable(builder.build());
|
||||
|
||||
try {
|
||||
TEST_UTIL.waitTableAvailable(tableName);
|
||||
List<RegionInfo> regionInfos = admin.getRegions(tableName);
|
||||
assertEquals("Table " + TABLE_NAME + " should have 1 region", 1, regionInfos.size());
|
||||
boolean success = true;
|
||||
int i = 0;
|
||||
for (; i < MAX_TRY; i++) {
|
||||
try {
|
||||
testReadRequests(regionInfos.get(0).getRegionName(), 3);
|
||||
} catch (Throwable t) {
|
||||
LOG.warn("Got exception when try " + i + " times", t);
|
||||
Thread.sleep(SLEEP_MS);
|
||||
success = false;
|
||||
}
|
||||
if (success) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (i == MAX_TRY) {
|
||||
fail("Failed to get right read requests metric after try " + i + " times");
|
||||
}
|
||||
} finally {
|
||||
admin.disableTable(tableName);
|
||||
admin.deleteTable(tableName);
|
||||
}
|
||||
}
|
||||
|
||||
private void testReadRequests(byte[] regionName, int expectedReadRequests) throws Exception {
|
||||
for (ServerName serverName : serverNames) {
|
||||
ServerLoad serverLoad =
|
||||
admin.getClusterStatus(EnumSet.of(Option.LIVE_SERVERS)).getLoad(serverName);
|
||||
Map<byte[], RegionLoad> regionsLoad = serverLoad.getRegionsLoad();
|
||||
RegionLoad regionLoad = regionsLoad.get(regionName);
|
||||
if (regionLoad != null) {
|
||||
LOG.debug("server read request is " + serverLoad.getReadRequestsCount()
|
||||
+ ", region read request is " + regionLoad.getReadRequestsCount());
|
||||
assertEquals(3, serverLoad.getReadRequestsCount());
|
||||
assertEquals(3, regionLoad.getReadRequestsCount());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class ScanRegionCoprocessor implements RegionCoprocessor, RegionObserver {
|
||||
@Override
|
||||
public Optional<RegionObserver> getRegionObserver() {
|
||||
return Optional.of(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
|
||||
RegionCoprocessorEnvironment env = c.getEnvironment();
|
||||
Region region = env.getRegion();
|
||||
try {
|
||||
putData(region);
|
||||
RegionScanner scanner = region.getScanner(new Scan());
|
||||
List<Cell> result = new LinkedList<>();
|
||||
while (scanner.next(result)) {
|
||||
result.clear();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Got exception in coprocessor", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void putData(Region region) throws Exception {
|
||||
Put put = new Put(ROW1);
|
||||
put.addColumn(CF1, COL1, VAL1);
|
||||
region.put(put);
|
||||
put = new Put(ROW2);
|
||||
put.addColumn(CF1, COL1, VAL1);
|
||||
region.put(put);
|
||||
put = new Put(ROW3);
|
||||
put.addColumn(CF1, COL1, VAL1);
|
||||
region.put(put);
|
||||
}
|
||||
}
|
||||
|
||||
private enum Metric {REGION_READ, SERVER_READ, FILTERED_REGION_READ, FILTERED_SERVER_READ}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue