HBASE-21991: Fix MetaMetrics issues - [Race condition, Faulty remove logic], few improvements

Signed-off-by: Xu Cang <xucang@apache.org>
This commit is contained in:
Sakthi 2019-03-09 16:09:34 -08:00 committed by Xu Cang
parent 1f84dfc796
commit ef1ed87679
No known key found for this signature in database
GPG Key ID: 8E6C8FEDCA866394
4 changed files with 177 additions and 61 deletions

View File

@ -50,8 +50,8 @@ public class MetaTableMetrics implements RegionCoprocessor {
private ExampleRegionObserverMeta observer; private ExampleRegionObserverMeta observer;
private Map<String, Optional<Metric>> requestsMap; private Map<String, Optional<Metric>> requestsMap;
private RegionCoprocessorEnvironment regionCoprocessorEnv; private MetricRegistry registry;
private LossyCounting clientMetricsLossyCounting; private LossyCounting clientMetricsLossyCounting, regionMetricsLossyCounting;
private boolean active = false; private boolean active = false;
enum MetaTableOps { enum MetaTableOps {
@ -94,11 +94,11 @@ public class MetaTableMetrics implements RegionCoprocessor {
if (!active || !isMetaTableOp(e)) { if (!active || !isMetaTableOp(e)) {
return; return;
} }
tableMetricRegisterAndMark(e, row); tableMetricRegisterAndMark(row);
clientMetricRegisterAndMark(e); clientMetricRegisterAndMark();
regionMetricRegisterAndMark(e, row); regionMetricRegisterAndMark(row);
opMetricRegisterAndMark(e, row); opMetricRegisterAndMark(row);
opWithClientMetricRegisterAndMark(e, row); opWithClientMetricRegisterAndMark(row);
} }
private void markMeterIfPresent(String requestMeter) { private void markMeterIfPresent(String requestMeter) {
@ -106,19 +106,18 @@ public class MetaTableMetrics implements RegionCoprocessor {
return; return;
} }
if (requestsMap.containsKey(requestMeter) && requestsMap.get(requestMeter).isPresent()) { Optional<Metric> optionalMetric = requestsMap.get(requestMeter);
Meter metric = (Meter) requestsMap.get(requestMeter).get(); if (optionalMetric != null && optionalMetric.isPresent()) {
Meter metric = (Meter) optionalMetric.get();
metric.mark(); metric.mark();
} }
} }
private void registerMeterIfNotPresent(ObserverContext<RegionCoprocessorEnvironment> e, private void registerMeterIfNotPresent(String requestMeter) {
String requestMeter) {
if (requestMeter.isEmpty()) { if (requestMeter.isEmpty()) {
return; return;
} }
if (!requestsMap.containsKey(requestMeter)) { if (!requestsMap.containsKey(requestMeter)) {
MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
registry.meter(requestMeter); registry.meter(requestMeter);
requestsMap.put(requestMeter, registry.get(requestMeter)); requestsMap.put(requestMeter, registry.get(requestMeter));
} }
@ -129,32 +128,36 @@ public class MetaTableMetrics implements RegionCoprocessor {
* By using lossy count to maintain meters, at most 7 / e meters will be kept (e is error rate) * By using lossy count to maintain meters, at most 7 / e meters will be kept (e is error rate)
* e.g. when e is 0.02 by default, at most 350 Clients request metrics will be kept * e.g. when e is 0.02 by default, at most 350 Clients request metrics will be kept
* also, all kept elements have frequency higher than e * N. (N is total count) * also, all kept elements have frequency higher than e * N. (N is total count)
* @param e Region coprocessor environment
* @param requestMeter meter to be registered * @param requestMeter meter to be registered
* @param lossyCounting lossyCounting object for one type of meters. * @param lossyCounting lossyCounting object for one type of meters.
*/ */
private void registerLossyCountingMeterIfNotPresent( private void registerLossyCountingMeterIfNotPresent(String requestMeter,
ObserverContext<RegionCoprocessorEnvironment> e, LossyCounting lossyCounting) {
String requestMeter, LossyCounting lossyCounting) {
if (requestMeter.isEmpty()) { if (requestMeter.isEmpty()) {
return; return;
} }
Set<String> metersToBeRemoved = lossyCounting.addByOne(requestMeter); synchronized (lossyCounting) {
if(!requestsMap.containsKey(requestMeter) && metersToBeRemoved.contains(requestMeter)){ Set<String> metersToBeRemoved = lossyCounting.addByOne(requestMeter);
for(String meter: metersToBeRemoved) {
//cleanup requestsMap according swept data from lossy count; boolean isNewMeter = !requestsMap.containsKey(requestMeter);
boolean requestMeterRemoved = metersToBeRemoved.contains(requestMeter);
if (isNewMeter) {
if (requestMeterRemoved) {
// if the new metric is swept off by lossyCounting then don't add in the map
metersToBeRemoved.remove(requestMeter);
} else {
// else register the new metric and add in the map
registry.meter(requestMeter);
requestsMap.put(requestMeter, registry.get(requestMeter));
}
}
for (String meter : metersToBeRemoved) {
//cleanup requestsMap according to the swept data from lossy count;
requestsMap.remove(meter); requestsMap.remove(meter);
MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
registry.remove(meter); registry.remove(meter);
} }
// newly added meter is swept by lossy counting cleanup. No need to put it into requestsMap.
return;
}
if (!requestsMap.containsKey(requestMeter)) {
MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
registry.meter(requestMeter);
requestsMap.put(requestMeter, registry.get(requestMeter));
} }
} }
@ -191,49 +194,59 @@ public class MetaTableMetrics implements RegionCoprocessor {
.equals(e.getEnvironment().getRegionInfo().getTable()); .equals(e.getEnvironment().getRegionInfo().getTable());
} }
private void clientMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e) { private void clientMetricRegisterAndMark() {
// Mark client metric // Mark client metric
String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : ""; String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : "";
if (clientIP == null || clientIP.isEmpty()) {
return;
}
String clientRequestMeter = clientRequestMeterName(clientIP); String clientRequestMeter = clientRequestMeterName(clientIP);
registerLossyCountingMeterIfNotPresent(e, clientRequestMeter, clientMetricsLossyCounting); registerLossyCountingMeterIfNotPresent(clientRequestMeter, clientMetricsLossyCounting);
markMeterIfPresent(clientRequestMeter); markMeterIfPresent(clientRequestMeter);
} }
private void tableMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e, private void tableMetricRegisterAndMark(Row op) {
Row op) {
// Mark table metric // Mark table metric
String tableName = getTableNameFromOp(op); String tableName = getTableNameFromOp(op);
if (tableName == null || tableName.isEmpty()) {
return;
}
String tableRequestMeter = tableMeterName(tableName); String tableRequestMeter = tableMeterName(tableName);
registerAndMarkMeterIfNotPresent(e, tableRequestMeter); registerAndMarkMeterIfNotPresent(tableRequestMeter);
} }
private void regionMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e, private void regionMetricRegisterAndMark(Row op) {
Row op) {
// Mark region metric // Mark region metric
String regionId = getRegionIdFromOp(op); String regionId = getRegionIdFromOp(op);
if (regionId == null || regionId.isEmpty()) {
return;
}
String regionRequestMeter = regionMeterName(regionId); String regionRequestMeter = regionMeterName(regionId);
registerAndMarkMeterIfNotPresent(e, regionRequestMeter); registerLossyCountingMeterIfNotPresent(regionRequestMeter, regionMetricsLossyCounting);
markMeterIfPresent(regionRequestMeter);
} }
private void opMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e, private void opMetricRegisterAndMark(Row op) {
Row op) {
// Mark access type ["get", "put", "delete"] metric // Mark access type ["get", "put", "delete"] metric
String opMeterName = opMeterName(op); String opMeterName = opMeterName(op);
registerAndMarkMeterIfNotPresent(e, opMeterName); if (opMeterName == null || opMeterName.isEmpty()) {
return;
}
registerAndMarkMeterIfNotPresent(opMeterName);
} }
private void opWithClientMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e, private void opWithClientMetricRegisterAndMark(Object op) {
Object op) {
// // Mark client + access type metric // // Mark client + access type metric
String opWithClientMeterName = opWithClientMeterName(op); String opWithClientMeterName = opWithClientMeterName(op);
registerAndMarkMeterIfNotPresent(e, opWithClientMeterName); if (opWithClientMeterName == null || opWithClientMeterName.isEmpty()) {
return;
}
registerAndMarkMeterIfNotPresent(opWithClientMeterName);
} }
// Helper function to register and mark meter if not present // Helper function to register and mark meter if not present
private void registerAndMarkMeterIfNotPresent(ObserverContext<RegionCoprocessorEnvironment> e, private void registerAndMarkMeterIfNotPresent(String name) {
String name) { registerMeterIfNotPresent(name);
registerMeterIfNotPresent(e, name);
markMeterIfPresent(name); markMeterIfPresent(name);
} }
@ -291,12 +304,12 @@ public class MetaTableMetrics implements RegionCoprocessor {
if (clientIP.isEmpty()) { if (clientIP.isEmpty()) {
return ""; return "";
} }
return String.format("MetaTable_client_%s_request", clientIP); return String.format("MetaTable_client_%s_lossy_request", clientIP);
} }
private String regionMeterName(String regionId) { private String regionMeterName(String regionId) {
// Extract meter name containing the region ID // Extract meter name containing the region ID
return String.format("MetaTable_region_%s_request", regionId); return String.format("MetaTable_region_%s_lossy_request", regionId);
} }
} }
@ -312,9 +325,11 @@ public class MetaTableMetrics implements RegionCoprocessor {
&& ((RegionCoprocessorEnvironment) env).getRegionInfo().getTable() != null && ((RegionCoprocessorEnvironment) env).getRegionInfo().getTable() != null
&& ((RegionCoprocessorEnvironment) env).getRegionInfo().getTable() && ((RegionCoprocessorEnvironment) env).getRegionInfo().getTable()
.equals(TableName.META_TABLE_NAME)) { .equals(TableName.META_TABLE_NAME)) {
regionCoprocessorEnv = (RegionCoprocessorEnvironment) env; RegionCoprocessorEnvironment regionCoprocessorEnv = (RegionCoprocessorEnvironment) env;
registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
requestsMap = new ConcurrentHashMap<>(); requestsMap = new ConcurrentHashMap<>();
clientMetricsLossyCounting = new LossyCounting(); clientMetricsLossyCounting = new LossyCounting("clientMetaMetrics");
regionMetricsLossyCounting = new LossyCounting("regionMetaMetrics");
// only be active mode when this region holds meta table. // only be active mode when this region holds meta table.
active = true; active = true;
} }
@ -324,7 +339,6 @@ public class MetaTableMetrics implements RegionCoprocessor {
public void stop(CoprocessorEnvironment env) throws IOException { public void stop(CoprocessorEnvironment env) throws IOException {
// since meta region can move around, clear stale metrics when stop. // since meta region can move around, clear stale metrics when stop.
if (requestsMap != null) { if (requestsMap != null) {
MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
for (String meterName : requestsMap.keySet()) { for (String meterName : requestsMap.keySet()) {
registry.remove(meterName); registry.remove(meterName);
} }

View File

@ -51,9 +51,11 @@ public class LossyCounting {
private double errorRate; private double errorRate;
private Map<String, Integer> data; private Map<String, Integer> data;
private long totalDataCount; private long totalDataCount;
private String name;
public LossyCounting(double errorRate) { public LossyCounting(double errorRate, String name) {
this.errorRate = errorRate; this.errorRate = errorRate;
this.name = name;
if (errorRate < 0.0 || errorRate > 1.0) { if (errorRate < 0.0 || errorRate > 1.0) {
throw new IllegalArgumentException(" Lossy Counting error rate should be within range [0,1]"); throw new IllegalArgumentException(" Lossy Counting error rate should be within range [0,1]");
} }
@ -64,8 +66,9 @@ public class LossyCounting {
calculateCurrentTerm(); calculateCurrentTerm();
} }
public LossyCounting() { public LossyCounting(String name) {
this(HBaseConfiguration.create().getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02)); this(HBaseConfiguration.create().getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02),
name);
} }
public Set<String> addByOne(String key) { public Set<String> addByOne(String key) {
@ -93,7 +96,7 @@ public class LossyCounting {
for(String key : dataToBeSwept) { for(String key : dataToBeSwept) {
data.remove(key); data.remove(key);
} }
LOG.debug(String.format("Swept %d elements.", dataToBeSwept.size())); LOG.trace(String.format("%s swept %d elements.", name, dataToBeSwept.size()));
return dataToBeSwept; return dataToBeSwept;
} }

View File

@ -13,6 +13,8 @@ package org.apache.hadoop.hbase.coprocessor;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
@ -38,6 +40,7 @@ import org.apache.hadoop.hbase.JMXListener;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
@ -73,6 +76,11 @@ public class TestMetaTableMetrics {
private static Configuration conf = null; private static Configuration conf = null;
private static int connectorPort = 61120; private static int connectorPort = 61120;
final byte[] cf = Bytes.toBytes("info");
final byte[] col = Bytes.toBytes("any");
byte[] tablename;
final int nthreads = 20;
@BeforeClass @BeforeClass
public static void setupBeforeClass() throws Exception { public static void setupBeforeClass() throws Exception {
@ -224,4 +232,95 @@ public class TestMetaTableMetrics {
assertEquals(5L, putWithClientMetricsCount); assertEquals(5L, putWithClientMetricsCount);
} }
@Test(timeout = 30000)
public void testConcurrentAccess() {
try {
tablename = Bytes.toBytes("hbase:meta");
int numRows = 3000;
int numRowsInTableBefore = UTIL.countRows(TableName.valueOf(tablename));
putData(numRows);
Thread.sleep(2000);
int numRowsInTableAfter = UTIL.countRows(TableName.valueOf(tablename));
assertTrue(numRowsInTableAfter >= numRowsInTableBefore + numRows);
getData(numRows);
} catch (InterruptedException e) {
LOG.info("Caught InterruptedException while testConcurrentAccess: " + e.getMessage());
fail();
} catch (IOException e) {
LOG.info("Caught IOException while testConcurrentAccess: " + e.getMessage());
fail();
}
}
public void putData(int nrows) throws InterruptedException {
LOG.info(String.format("Putting %d rows in hbase:meta", nrows));
Thread[] threads = new Thread[nthreads];
for (int i = 1; i <= nthreads; i++) {
threads[i - 1] = new PutThread(1, nrows);
}
startThreadsAndWaitToJoin(threads);
}
public void getData(int nrows) throws InterruptedException {
LOG.info(String.format("Getting %d rows from hbase:meta", nrows));
Thread[] threads = new Thread[nthreads];
for (int i = 1; i <= nthreads; i++) {
threads[i - 1] = new GetThread(1, nrows);
}
startThreadsAndWaitToJoin(threads);
}
private void startThreadsAndWaitToJoin(Thread[] threads) throws InterruptedException {
for (int i = 1; i <= nthreads; i++) {
threads[i - 1].start();
}
for (int i = 1; i <= nthreads; i++) {
threads[i - 1].join();
}
}
class PutThread extends Thread {
int start;
int end;
public PutThread(int start, int end) {
this.start = start;
this.end = end;
}
@Override
public void run() {
try (Table table = UTIL.getConnection().getTable(TableName.valueOf(tablename))) {
for (int i = start; i <= end; i++) {
Put p = new Put(Bytes.toBytes(String.format("tableName,rowKey%d,region%d", i, i)));
p.addColumn(cf, col, Bytes.toBytes("Value" + i));
table.put(p);
}
} catch (IOException e) {
LOG.info("Caught IOException while PutThread operation: " + e.getMessage());
}
}
}
class GetThread extends Thread {
int start;
int end;
public GetThread(int start, int end) {
this.start = start;
this.end = end;
}
@Override
public void run() {
try (Table table = UTIL.getConnection().getTable(TableName.valueOf(tablename))) {
for (int i = start; i <= end; i++) {
Get get = new Get(Bytes.toBytes(String.format("tableName,rowKey%d,region%d", i, i)));
table.get(get);
}
} catch (IOException e) {
LOG.info("Caught IOException while GetThread operation: " + e.getMessage());
}
}
}
} }

View File

@ -38,15 +38,15 @@ public class TestLossyCounting {
@Test @Test
public void testBucketSize() { public void testBucketSize() {
LossyCounting lossyCounting = new LossyCounting(0.01); LossyCounting lossyCounting = new LossyCounting(0.01, "testBucketSize");
assertEquals(100L, lossyCounting.getBucketSize()); assertEquals(100L, lossyCounting.getBucketSize());
LossyCounting lossyCounting2 = new LossyCounting(); LossyCounting lossyCounting2 = new LossyCounting("testBucketSize2");
assertEquals(50L, lossyCounting2.getBucketSize()); assertEquals(50L, lossyCounting2.getBucketSize());
} }
@Test @Test
public void testAddByOne() { public void testAddByOne() {
LossyCounting lossyCounting = new LossyCounting(0.01); LossyCounting lossyCounting = new LossyCounting(0.01, "testAddByOne");
for(int i = 0; i < 100; i++){ for(int i = 0; i < 100; i++){
String key = "" + i; String key = "" + i;
lossyCounting.addByOne(key); lossyCounting.addByOne(key);
@ -60,7 +60,7 @@ public class TestLossyCounting {
@Test @Test
public void testSweep1() { public void testSweep1() {
LossyCounting lossyCounting = new LossyCounting(0.01); LossyCounting lossyCounting = new LossyCounting(0.01, "testSweep1");
for(int i = 0; i < 400; i++){ for(int i = 0; i < 400; i++){
String key = "" + i; String key = "" + i;
lossyCounting.addByOne(key); lossyCounting.addByOne(key);
@ -71,7 +71,7 @@ public class TestLossyCounting {
@Test @Test
public void testSweep2() { public void testSweep2() {
LossyCounting lossyCounting = new LossyCounting(0.1); LossyCounting lossyCounting = new LossyCounting(0.1, "testSweep2");
for(int i = 0; i < 10; i++){ for(int i = 0; i < 10; i++){
String key = "" + i; String key = "" + i;
lossyCounting.addByOne(key); lossyCounting.addByOne(key);