HBASE-21991: Fix MetaMetrics issues - [Race condition, Faulty remove logic], few improvements
Signed-off-by: Xu Cang <xucang@apache.org>
This commit is contained in:
parent
953e0f5f77
commit
1366f5cb6a
|
@ -35,12 +35,14 @@ import org.apache.hadoop.hbase.util.LossyCounting;
|
|||
|
||||
import com.google.common.base.Optional;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
|
||||
/**
|
||||
* A coprocessor that collects metrics from meta table.
|
||||
* <p>
|
||||
* These metrics will be available through the regular Hadoop metrics2 sinks (ganglia, opentsdb,
|
||||
* etc) as well as JMX output.
|
||||
* </p>
|
||||
*
|
||||
* @see MetaTableMetrics
|
||||
*/
|
||||
|
||||
|
@ -48,8 +50,8 @@ import com.google.common.collect.ImmutableMap;
|
|||
public class MetaTableMetrics extends BaseRegionObserver {
|
||||
|
||||
private Map<String, Optional<Metric>> requestsMap;
|
||||
private RegionCoprocessorEnvironment regionCoprocessorEnv;
|
||||
private LossyCounting clientMetricsLossyCounting;
|
||||
private MetricRegistry registry;
|
||||
private LossyCounting clientMetricsLossyCounting, regionMetricsLossyCounting;
|
||||
private boolean active = false;
|
||||
|
||||
enum MetaTableOps {
|
||||
|
@ -57,11 +59,8 @@ public class MetaTableMetrics extends BaseRegionObserver {
|
|||
}
|
||||
|
||||
private ImmutableMap<Class, MetaTableOps> opsNameMap =
|
||||
ImmutableMap.<Class, MetaTableOps>builder()
|
||||
.put(Put.class, MetaTableOps.PUT)
|
||||
.put(Get.class, MetaTableOps.GET)
|
||||
.put(Delete.class, MetaTableOps.DELETE)
|
||||
.build();
|
||||
ImmutableMap.<Class, MetaTableOps>builder().put(Put.class, MetaTableOps.PUT)
|
||||
.put(Get.class, MetaTableOps.GET).put(Delete.class, MetaTableOps.DELETE).build();
|
||||
|
||||
@Override
|
||||
public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results)
|
||||
|
@ -85,11 +84,11 @@ public class MetaTableMetrics extends BaseRegionObserver {
|
|||
if (!active || !isMetaTableOp(e)) {
|
||||
return;
|
||||
}
|
||||
tableMetricRegisterAndMark(e, row);
|
||||
clientMetricRegisterAndMark(e);
|
||||
regionMetricRegisterAndMark(e, row);
|
||||
opMetricRegisterAndMark(e, row);
|
||||
opWithClientMetricRegisterAndMark(e, row);
|
||||
tableMetricRegisterAndMark(row);
|
||||
clientMetricRegisterAndMark();
|
||||
regionMetricRegisterAndMark(row);
|
||||
opMetricRegisterAndMark(row);
|
||||
opWithClientMetricRegisterAndMark(row);
|
||||
}
|
||||
|
||||
private void markMeterIfPresent(String requestMeter) {
|
||||
|
@ -97,19 +96,18 @@ public class MetaTableMetrics extends BaseRegionObserver {
|
|||
return;
|
||||
}
|
||||
|
||||
if (requestsMap.containsKey(requestMeter) && requestsMap.get(requestMeter).isPresent()) {
|
||||
Meter metric = (Meter) requestsMap.get(requestMeter).get();
|
||||
Optional<Metric> optionalMetric = requestsMap.get(requestMeter);
|
||||
if (optionalMetric != null && optionalMetric.isPresent()) {
|
||||
Meter metric = (Meter) optionalMetric.get();
|
||||
metric.mark();
|
||||
}
|
||||
}
|
||||
|
||||
private void registerMeterIfNotPresent(ObserverContext<RegionCoprocessorEnvironment> e,
|
||||
String requestMeter) {
|
||||
private void registerMeterIfNotPresent(String requestMeter) {
|
||||
if (requestMeter.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
if (!requestsMap.containsKey(requestMeter)) {
|
||||
MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
|
||||
registry.meter(requestMeter);
|
||||
requestsMap.put(requestMeter, registry.get(requestMeter));
|
||||
}
|
||||
|
@ -119,38 +117,43 @@ public class MetaTableMetrics extends BaseRegionObserver {
|
|||
* Registers and counts lossyCount for Meters that kept by lossy counting.
|
||||
* By using lossy count to maintain meters, at most 7 / e meters will be kept (e is error rate)
|
||||
* e.g. when e is 0.02 by default, at most 350 Clients request metrics will be kept
|
||||
* also, all kept elements have frequency higher than e * N. (N is total count)
|
||||
* @param e Region coprocessor environment
|
||||
* @param requestMeter meter to be registered
|
||||
* also, all kept elements have frequency higher than e * N. (N is total count)
|
||||
*
|
||||
* @param requestMeter meter to be registered
|
||||
* @param lossyCounting lossyCounting object for one type of meters.
|
||||
*/
|
||||
private void registerLossyCountingMeterIfNotPresent(
|
||||
ObserverContext<RegionCoprocessorEnvironment> e,
|
||||
String requestMeter, LossyCounting lossyCounting) {
|
||||
private void registerLossyCountingMeterIfNotPresent(String requestMeter,
|
||||
LossyCounting lossyCounting) {
|
||||
if (requestMeter.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
Set<String> metersToBeRemoved = lossyCounting.addByOne(requestMeter);
|
||||
if(!requestsMap.containsKey(requestMeter) && metersToBeRemoved.contains(requestMeter)){
|
||||
for(String meter: metersToBeRemoved) {
|
||||
//cleanup requestsMap according swept data from lossy count;
|
||||
synchronized (lossyCounting) {
|
||||
Set<String> metersToBeRemoved = lossyCounting.addByOne(requestMeter);
|
||||
|
||||
boolean isNewMeter = !requestsMap.containsKey(requestMeter);
|
||||
boolean requestMeterRemoved = metersToBeRemoved.contains(requestMeter);
|
||||
if (isNewMeter) {
|
||||
if (requestMeterRemoved) {
|
||||
// if the new metric is swept off by lossyCounting then don't add in the map
|
||||
metersToBeRemoved.remove(requestMeter);
|
||||
} else {
|
||||
// else register the new metric and add in the map
|
||||
registry.meter(requestMeter);
|
||||
requestsMap.put(requestMeter, registry.get(requestMeter));
|
||||
}
|
||||
}
|
||||
|
||||
for (String meter : metersToBeRemoved) {
|
||||
//cleanup requestsMap according to the swept data from lossy count;
|
||||
requestsMap.remove(meter);
|
||||
MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
|
||||
registry.remove(meter);
|
||||
}
|
||||
// newly added meter is swept by lossy counting cleanup. No need to put it into requestsMap.
|
||||
return;
|
||||
}
|
||||
|
||||
if (!requestsMap.containsKey(requestMeter)) {
|
||||
MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
|
||||
registry.meter(requestMeter);
|
||||
requestsMap.put(requestMeter, registry.get(requestMeter));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get table name from Ops such as: get, put, delete.
|
||||
*
|
||||
* @param op such as get, put or delete.
|
||||
*/
|
||||
private String getTableNameFromOp(Row op) {
|
||||
|
@ -165,7 +168,8 @@ public class MetaTableMetrics extends BaseRegionObserver {
|
|||
|
||||
/**
|
||||
* Get regionId from Ops such as: get, put, delete.
|
||||
* @param op such as get, put or delete.
|
||||
*
|
||||
* @param op such as get, put or delete.
|
||||
*/
|
||||
private String getRegionIdFromOp(Row op) {
|
||||
String regionId = null;
|
||||
|
@ -181,47 +185,60 @@ public class MetaTableMetrics extends BaseRegionObserver {
|
|||
return TableName.META_TABLE_NAME.equals(e.getEnvironment().getRegionInfo().getTable());
|
||||
}
|
||||
|
||||
private void clientMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e) {
|
||||
private void clientMetricRegisterAndMark() {
|
||||
// Mark client metric
|
||||
String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : "";
|
||||
if (clientIP == null || clientIP.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
String clientRequestMeter = clientRequestMeterName(clientIP);
|
||||
registerLossyCountingMeterIfNotPresent(e, clientRequestMeter, clientMetricsLossyCounting);
|
||||
registerLossyCountingMeterIfNotPresent(clientRequestMeter, clientMetricsLossyCounting);
|
||||
markMeterIfPresent(clientRequestMeter);
|
||||
}
|
||||
|
||||
private void tableMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e, Row op) {
|
||||
private void tableMetricRegisterAndMark(Row op) {
|
||||
// Mark table metric
|
||||
String tableName = getTableNameFromOp(op);
|
||||
if (tableName == null || tableName.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
String tableRequestMeter = tableMeterName(tableName);
|
||||
registerAndMarkMeterIfNotPresent(e, tableRequestMeter);
|
||||
registerAndMarkMeterIfNotPresent(tableRequestMeter);
|
||||
}
|
||||
|
||||
private void regionMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e,
|
||||
Row op) {
|
||||
private void regionMetricRegisterAndMark(Row op) {
|
||||
// Mark region metric
|
||||
String regionId = getRegionIdFromOp(op);
|
||||
if (regionId == null || regionId.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
String regionRequestMeter = regionMeterName(regionId);
|
||||
registerAndMarkMeterIfNotPresent(e, regionRequestMeter);
|
||||
registerLossyCountingMeterIfNotPresent(regionRequestMeter, regionMetricsLossyCounting);
|
||||
markMeterIfPresent(regionRequestMeter);
|
||||
}
|
||||
|
||||
private void opMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e, Row op) {
|
||||
private void opMetricRegisterAndMark(Row op) {
|
||||
// Mark access type ["get", "put", "delete"] metric
|
||||
String opMeterName = opMeterName(op);
|
||||
registerAndMarkMeterIfNotPresent(e, opMeterName);
|
||||
if (opMeterName == null || opMeterName.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
registerAndMarkMeterIfNotPresent(opMeterName);
|
||||
}
|
||||
|
||||
private void opWithClientMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e,
|
||||
Object op) {
|
||||
// // Mark client + access type metric
|
||||
private void opWithClientMetricRegisterAndMark(Object op) {
|
||||
// Mark client + access type metric
|
||||
String opWithClientMeterName = opWithClientMeterName(op);
|
||||
registerAndMarkMeterIfNotPresent(e, opWithClientMeterName);
|
||||
if (opWithClientMeterName == null || opWithClientMeterName.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
registerAndMarkMeterIfNotPresent(opWithClientMeterName);
|
||||
}
|
||||
|
||||
// Helper function to register and mark meter if not present
|
||||
private void registerAndMarkMeterIfNotPresent(ObserverContext<RegionCoprocessorEnvironment> e,
|
||||
String name) {
|
||||
registerMeterIfNotPresent(e, name);
|
||||
private void registerAndMarkMeterIfNotPresent(String name) {
|
||||
registerMeterIfNotPresent(name);
|
||||
markMeterIfPresent(name);
|
||||
}
|
||||
|
||||
|
@ -278,12 +295,12 @@ public class MetaTableMetrics extends BaseRegionObserver {
|
|||
if (clientIP.isEmpty()) {
|
||||
return "";
|
||||
}
|
||||
return String.format("MetaTable_client_%s_request", clientIP);
|
||||
return String.format("MetaTable_client_%s_lossy_request", clientIP);
|
||||
}
|
||||
|
||||
private String regionMeterName(String regionId) {
|
||||
// Extract meter name containing the region ID
|
||||
return String.format("MetaTable_region_%s_request", regionId);
|
||||
return String.format("MetaTable_region_%s_lossy_request", regionId);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -291,10 +308,12 @@ public class MetaTableMetrics extends BaseRegionObserver {
|
|||
if (env instanceof RegionCoprocessorEnvironment
|
||||
&& ((RegionCoprocessorEnvironment) env).getRegionInfo().getTable() != null
|
||||
&& ((RegionCoprocessorEnvironment) env).getRegionInfo().getTable()
|
||||
.equals(TableName.META_TABLE_NAME)) {
|
||||
regionCoprocessorEnv = (RegionCoprocessorEnvironment) env;
|
||||
.equals(TableName.META_TABLE_NAME)) {
|
||||
RegionCoprocessorEnvironment regionCoprocessorEnv = (RegionCoprocessorEnvironment) env;
|
||||
registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
|
||||
requestsMap = new ConcurrentHashMap<>();
|
||||
clientMetricsLossyCounting = new LossyCounting();
|
||||
clientMetricsLossyCounting = new LossyCounting("clientMetaMetrics");
|
||||
regionMetricsLossyCounting = new LossyCounting("regionMetaMetrics");
|
||||
// only be active mode when this region holds meta table.
|
||||
active = true;
|
||||
}
|
||||
|
@ -304,7 +323,6 @@ public class MetaTableMetrics extends BaseRegionObserver {
|
|||
public void stop(CoprocessorEnvironment env) throws IOException {
|
||||
// since meta region can move around, clear stale metrics when stop.
|
||||
if (requestsMap != null) {
|
||||
MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
|
||||
for (String meterName : requestsMap.keySet()) {
|
||||
registry.remove(meterName);
|
||||
}
|
||||
|
|
|
@ -50,9 +50,11 @@ public class LossyCounting {
|
|||
private double errorRate;
|
||||
private Map<String, Integer> data;
|
||||
private long totalDataCount;
|
||||
private String name;
|
||||
|
||||
public LossyCounting(double errorRate) {
|
||||
public LossyCounting(double errorRate, String name) {
|
||||
this.errorRate = errorRate;
|
||||
this.name = name;
|
||||
if (errorRate < 0.0 || errorRate > 1.0) {
|
||||
throw new IllegalArgumentException(" Lossy Counting error rate should be within range [0,1]");
|
||||
}
|
||||
|
@ -63,8 +65,9 @@ public class LossyCounting {
|
|||
calculateCurrentTerm();
|
||||
}
|
||||
|
||||
public LossyCounting() {
|
||||
this(HBaseConfiguration.create().getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02));
|
||||
public LossyCounting(String name) {
|
||||
this(HBaseConfiguration.create().getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02),
|
||||
name);
|
||||
}
|
||||
|
||||
public Set<String> addByOne(String key) {
|
||||
|
@ -95,7 +98,7 @@ public class LossyCounting {
|
|||
for(String key : dataToBeSwept) {
|
||||
data.remove(key);
|
||||
}
|
||||
LOG.debug(String.format("Swept %d elements.", dataToBeSwept.size()));
|
||||
LOG.trace(String.format("%s swept %d elements.", name, dataToBeSwept.size()));
|
||||
return dataToBeSwept;
|
||||
}
|
||||
|
||||
|
|
|
@ -13,6 +13,8 @@ package org.apache.hadoop.hbase.coprocessor;
|
|||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -35,6 +37,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.JMXListener;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
|
||||
|
@ -63,6 +66,11 @@ public class TestMetaTableMetrics {
|
|||
private static Configuration conf = null;
|
||||
private static int connectorPort = 61120;
|
||||
|
||||
final byte[] cf = Bytes.toBytes("info");
|
||||
final byte[] col = Bytes.toBytes("any");
|
||||
byte[] tablename;
|
||||
final int nthreads = 20;
|
||||
|
||||
@BeforeClass
|
||||
public static void setupBeforeClass() throws Exception {
|
||||
|
||||
|
@ -220,4 +228,96 @@ public class TestMetaTableMetrics {
|
|||
assertEquals(5L, putWithClientMetricsCount);
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testConcurrentAccess() {
|
||||
try {
|
||||
tablename = Bytes.toBytes("hbase:meta");
|
||||
int numRows = 3000;
|
||||
int numRowsInTableBefore = UTIL.countRows(TableName.valueOf(tablename));
|
||||
putData(numRows);
|
||||
Thread.sleep(2000);
|
||||
int numRowsInTableAfter = UTIL.countRows(TableName.valueOf(tablename));
|
||||
assertTrue(numRowsInTableAfter >= numRowsInTableBefore + numRows);
|
||||
getData(numRows);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("Caught InterruptedException while testConcurrentAccess: " + e.getMessage());
|
||||
fail();
|
||||
} catch (IOException e) {
|
||||
LOG.info("Caught IOException while testConcurrentAccess: " + e.getMessage());
|
||||
fail();
|
||||
}
|
||||
}
|
||||
|
||||
public void putData(int nrows) throws InterruptedException {
|
||||
LOG.info(String.format("Putting %d rows in hbase:meta", nrows));
|
||||
Thread[] threads = new Thread[nthreads];
|
||||
for (int i = 1; i <= nthreads; i++) {
|
||||
threads[i - 1] = new PutThread(1, nrows);
|
||||
}
|
||||
startThreadsAndWaitToJoin(threads);
|
||||
}
|
||||
|
||||
public void getData(int nrows) throws InterruptedException {
|
||||
LOG.info(String.format("Getting %d rows from hbase:meta", nrows));
|
||||
Thread[] threads = new Thread[nthreads];
|
||||
for (int i = 1; i <= nthreads; i++) {
|
||||
threads[i - 1] = new GetThread(1, nrows);
|
||||
}
|
||||
startThreadsAndWaitToJoin(threads);
|
||||
}
|
||||
|
||||
private void startThreadsAndWaitToJoin(Thread[] threads) throws InterruptedException {
|
||||
for (int i = 1; i <= nthreads; i++) {
|
||||
threads[i - 1].start();
|
||||
}
|
||||
for (int i = 1; i <= nthreads; i++) {
|
||||
threads[i - 1].join();
|
||||
}
|
||||
}
|
||||
|
||||
class PutThread extends Thread {
|
||||
int start;
|
||||
int end;
|
||||
|
||||
public PutThread(int start, int end) {
|
||||
this.start = start;
|
||||
this.end = end;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try (Table table = UTIL.getConnection().getTable(TableName.valueOf(tablename))) {
|
||||
for (int i = start; i <= end; i++) {
|
||||
Put p = new Put(Bytes.toBytes(String.format("tableName,rowKey%d,region%d", i, i)));
|
||||
p.addColumn(cf, col, Bytes.toBytes("Value" + i));
|
||||
table.put(p);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.info("Caught IOException while PutThread operation: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class GetThread extends Thread {
|
||||
int start;
|
||||
int end;
|
||||
|
||||
public GetThread(int start, int end) {
|
||||
this.start = start;
|
||||
this.end = end;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try (Table table = UTIL.getConnection().getTable(TableName.valueOf(tablename))) {
|
||||
for (int i = start; i <= end; i++) {
|
||||
Get get = new Get(Bytes.toBytes(String.format("tableName,rowKey%d,region%d", i, i)));
|
||||
table.get(get);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.info("Caught IOException while GetThread operation: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -33,15 +33,15 @@ public class TestLossyCounting {
|
|||
|
||||
@Test
|
||||
public void testBucketSize() {
|
||||
LossyCounting lossyCounting = new LossyCounting(0.01);
|
||||
LossyCounting lossyCounting = new LossyCounting(0.01, "testBucketSize");
|
||||
assertEquals(100L, lossyCounting.getBucketSize());
|
||||
LossyCounting lossyCounting2 = new LossyCounting();
|
||||
LossyCounting lossyCounting2 = new LossyCounting("testBucketSize2");
|
||||
assertEquals(50L, lossyCounting2.getBucketSize());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddByOne() {
|
||||
LossyCounting lossyCounting = new LossyCounting(0.01);
|
||||
LossyCounting lossyCounting = new LossyCounting(0.01, "testAddByOne");
|
||||
for(int i = 0; i < 100; i++){
|
||||
String key = "" + i;
|
||||
lossyCounting.addByOne(key);
|
||||
|
@ -55,7 +55,7 @@ public class TestLossyCounting {
|
|||
|
||||
@Test
|
||||
public void testSweep1() {
|
||||
LossyCounting lossyCounting = new LossyCounting(0.01);
|
||||
LossyCounting lossyCounting = new LossyCounting(0.01, "testSweep1");
|
||||
for(int i = 0; i < 400; i++){
|
||||
String key = "" + i;
|
||||
lossyCounting.addByOne(key);
|
||||
|
@ -66,7 +66,7 @@ public class TestLossyCounting {
|
|||
|
||||
@Test
|
||||
public void testSweep2() {
|
||||
LossyCounting lossyCounting = new LossyCounting(0.1);
|
||||
LossyCounting lossyCounting = new LossyCounting(0.1, "testSweep2");
|
||||
for(int i = 0; i < 10; i++){
|
||||
String key = "" + i;
|
||||
lossyCounting.addByOne(key);
|
||||
|
|
Loading…
Reference in New Issue