From d6f23388f5d98d64bc0270e0273124922f8e695c Mon Sep 17 00:00:00 2001 From: Sakthi Date: Sat, 9 Mar 2019 16:09:34 -0800 Subject: [PATCH] HBASE-21991: Fix MetaMetrics issues - [Race condition, Faulty remove logic], few improvements Signed-off-by: Xu Cang --- .../hbase/coprocessor/MetaTableMetrics.java | 118 ++++++++++-------- .../hadoop/hbase/util/LossyCounting.java | 11 +- .../coprocessor/TestMetaTableMetrics.java | 99 +++++++++++++++ .../hadoop/hbase/util/TestLossyCounting.java | 10 +- 4 files changed, 177 insertions(+), 61 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java index d08bae6762c..d542d2f2bcb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java @@ -50,8 +50,8 @@ public class MetaTableMetrics implements RegionCoprocessor { private ExampleRegionObserverMeta observer; private Map> requestsMap; - private RegionCoprocessorEnvironment regionCoprocessorEnv; - private LossyCounting clientMetricsLossyCounting; + private MetricRegistry registry; + private LossyCounting clientMetricsLossyCounting, regionMetricsLossyCounting; private boolean active = false; enum MetaTableOps { @@ -94,11 +94,11 @@ public class MetaTableMetrics implements RegionCoprocessor { if (!active || !isMetaTableOp(e)) { return; } - tableMetricRegisterAndMark(e, row); - clientMetricRegisterAndMark(e); - regionMetricRegisterAndMark(e, row); - opMetricRegisterAndMark(e, row); - opWithClientMetricRegisterAndMark(e, row); + tableMetricRegisterAndMark(row); + clientMetricRegisterAndMark(); + regionMetricRegisterAndMark(row); + opMetricRegisterAndMark(row); + opWithClientMetricRegisterAndMark(row); } private void markMeterIfPresent(String requestMeter) { @@ -106,19 +106,18 @@ public class MetaTableMetrics implements RegionCoprocessor { return; } - if (requestsMap.containsKey(requestMeter) && requestsMap.get(requestMeter).isPresent()) { - Meter metric = (Meter) requestsMap.get(requestMeter).get(); + Optional optionalMetric = requestsMap.get(requestMeter); + if (optionalMetric != null && optionalMetric.isPresent()) { + Meter metric = (Meter) optionalMetric.get(); metric.mark(); } } - private void registerMeterIfNotPresent(ObserverContext e, - String requestMeter) { + private void registerMeterIfNotPresent(String requestMeter) { if (requestMeter.isEmpty()) { return; } if (!requestsMap.containsKey(requestMeter)) { - MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer(); registry.meter(requestMeter); requestsMap.put(requestMeter, registry.get(requestMeter)); } @@ -129,32 +128,36 @@ public class MetaTableMetrics implements RegionCoprocessor { * By using lossy count to maintain meters, at most 7 / e meters will be kept (e is error rate) * e.g. when e is 0.02 by default, at most 350 Clients request metrics will be kept * also, all kept elements have frequency higher than e * N. (N is total count) - * @param e Region coprocessor environment * @param requestMeter meter to be registered * @param lossyCounting lossyCounting object for one type of meters. */ - private void registerLossyCountingMeterIfNotPresent( - ObserverContext e, - String requestMeter, LossyCounting lossyCounting) { + private void registerLossyCountingMeterIfNotPresent(String requestMeter, + LossyCounting lossyCounting) { + if (requestMeter.isEmpty()) { return; } - Set metersToBeRemoved = lossyCounting.addByOne(requestMeter); - if(!requestsMap.containsKey(requestMeter) && metersToBeRemoved.contains(requestMeter)){ - for(String meter: metersToBeRemoved) { - //cleanup requestsMap according swept data from lossy count; + synchronized (lossyCounting) { + Set metersToBeRemoved = lossyCounting.addByOne(requestMeter); + + boolean isNewMeter = !requestsMap.containsKey(requestMeter); + boolean requestMeterRemoved = metersToBeRemoved.contains(requestMeter); + if (isNewMeter) { + if (requestMeterRemoved) { + // if the new metric is swept off by lossyCounting then don't add in the map + metersToBeRemoved.remove(requestMeter); + } else { + // else register the new metric and add in the map + registry.meter(requestMeter); + requestsMap.put(requestMeter, registry.get(requestMeter)); + } + } + + for (String meter : metersToBeRemoved) { + //cleanup requestsMap according to the swept data from lossy count; requestsMap.remove(meter); - MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer(); registry.remove(meter); } - // newly added meter is swept by lossy counting cleanup. No need to put it into requestsMap. - return; - } - - if (!requestsMap.containsKey(requestMeter)) { - MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer(); - registry.meter(requestMeter); - requestsMap.put(requestMeter, registry.get(requestMeter)); } } @@ -191,49 +194,59 @@ public class MetaTableMetrics implements RegionCoprocessor { .equals(e.getEnvironment().getRegionInfo().getTable()); } - private void clientMetricRegisterAndMark(ObserverContext e) { + private void clientMetricRegisterAndMark() { // Mark client metric String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : ""; - + if (clientIP == null || clientIP.isEmpty()) { + return; + } String clientRequestMeter = clientRequestMeterName(clientIP); - registerLossyCountingMeterIfNotPresent(e, clientRequestMeter, clientMetricsLossyCounting); + registerLossyCountingMeterIfNotPresent(clientRequestMeter, clientMetricsLossyCounting); markMeterIfPresent(clientRequestMeter); } - private void tableMetricRegisterAndMark(ObserverContext e, - Row op) { + private void tableMetricRegisterAndMark(Row op) { // Mark table metric String tableName = getTableNameFromOp(op); + if (tableName == null || tableName.isEmpty()) { + return; + } String tableRequestMeter = tableMeterName(tableName); - registerAndMarkMeterIfNotPresent(e, tableRequestMeter); + registerAndMarkMeterIfNotPresent(tableRequestMeter); } - private void regionMetricRegisterAndMark(ObserverContext e, - Row op) { + private void regionMetricRegisterAndMark(Row op) { // Mark region metric String regionId = getRegionIdFromOp(op); + if (regionId == null || regionId.isEmpty()) { + return; + } String regionRequestMeter = regionMeterName(regionId); - registerAndMarkMeterIfNotPresent(e, regionRequestMeter); + registerLossyCountingMeterIfNotPresent(regionRequestMeter, regionMetricsLossyCounting); + markMeterIfPresent(regionRequestMeter); } - private void opMetricRegisterAndMark(ObserverContext e, - Row op) { + private void opMetricRegisterAndMark(Row op) { // Mark access type ["get", "put", "delete"] metric String opMeterName = opMeterName(op); - registerAndMarkMeterIfNotPresent(e, opMeterName); + if (opMeterName == null || opMeterName.isEmpty()) { + return; + } + registerAndMarkMeterIfNotPresent(opMeterName); } - private void opWithClientMetricRegisterAndMark(ObserverContext e, - Object op) { + private void opWithClientMetricRegisterAndMark(Object op) { // // Mark client + access type metric String opWithClientMeterName = opWithClientMeterName(op); - registerAndMarkMeterIfNotPresent(e, opWithClientMeterName); + if (opWithClientMeterName == null || opWithClientMeterName.isEmpty()) { + return; + } + registerAndMarkMeterIfNotPresent(opWithClientMeterName); } // Helper function to register and mark meter if not present - private void registerAndMarkMeterIfNotPresent(ObserverContext e, - String name) { - registerMeterIfNotPresent(e, name); + private void registerAndMarkMeterIfNotPresent(String name) { + registerMeterIfNotPresent(name); markMeterIfPresent(name); } @@ -291,12 +304,12 @@ public class MetaTableMetrics implements RegionCoprocessor { if (clientIP.isEmpty()) { return ""; } - return String.format("MetaTable_client_%s_request", clientIP); + return String.format("MetaTable_client_%s_lossy_request", clientIP); } private String regionMeterName(String regionId) { // Extract meter name containing the region ID - return String.format("MetaTable_region_%s_request", regionId); + return String.format("MetaTable_region_%s_lossy_request", regionId); } } @@ -312,9 +325,11 @@ public class MetaTableMetrics implements RegionCoprocessor { && ((RegionCoprocessorEnvironment) env).getRegionInfo().getTable() != null && ((RegionCoprocessorEnvironment) env).getRegionInfo().getTable() .equals(TableName.META_TABLE_NAME)) { - regionCoprocessorEnv = (RegionCoprocessorEnvironment) env; + RegionCoprocessorEnvironment regionCoprocessorEnv = (RegionCoprocessorEnvironment) env; + registry = regionCoprocessorEnv.getMetricRegistryForRegionServer(); requestsMap = new ConcurrentHashMap<>(); - clientMetricsLossyCounting = new LossyCounting(); + clientMetricsLossyCounting = new LossyCounting("clientMetaMetrics"); + regionMetricsLossyCounting = new LossyCounting("regionMetaMetrics"); // only be active mode when this region holds meta table. active = true; } @@ -324,7 +339,6 @@ public class MetaTableMetrics implements RegionCoprocessor { public void stop(CoprocessorEnvironment env) throws IOException { // since meta region can move around, clear stale metrics when stop. if (requestsMap != null) { - MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer(); for (String meterName : requestsMap.keySet()) { registry.remove(meterName); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java index 839bb90acf4..d9d84e567d6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java @@ -51,9 +51,11 @@ public class LossyCounting { private double errorRate; private Map data; private long totalDataCount; + private String name; - public LossyCounting(double errorRate) { + public LossyCounting(double errorRate, String name) { this.errorRate = errorRate; + this.name = name; if (errorRate < 0.0 || errorRate > 1.0) { throw new IllegalArgumentException(" Lossy Counting error rate should be within range [0,1]"); } @@ -64,8 +66,9 @@ public class LossyCounting { calculateCurrentTerm(); } - public LossyCounting() { - this(HBaseConfiguration.create().getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02)); + public LossyCounting(String name) { + this(HBaseConfiguration.create().getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02), + name); } public Set addByOne(String key) { @@ -93,7 +96,7 @@ public class LossyCounting { for(String key : dataToBeSwept) { data.remove(key); } - LOG.debug(String.format("Swept %d elements.", dataToBeSwept.size())); + LOG.trace(String.format("%s swept %d elements.", name, dataToBeSwept.size())); return dataToBeSwept; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java index bbbeb9e5273..82ce709bd36 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java @@ -13,6 +13,8 @@ package org.apache.hadoop.hbase.coprocessor; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; @@ -38,6 +40,7 @@ import org.apache.hadoop.hbase.JMXListener; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; @@ -73,6 +76,11 @@ public class TestMetaTableMetrics { private static Configuration conf = null; private static int connectorPort = 61120; + final byte[] cf = Bytes.toBytes("info"); + final byte[] col = Bytes.toBytes("any"); + byte[] tablename; + final int nthreads = 20; + @BeforeClass public static void setupBeforeClass() throws Exception { @@ -224,4 +232,95 @@ public class TestMetaTableMetrics { assertEquals(5L, putWithClientMetricsCount); } + @Test(timeout = 30000) + public void testConcurrentAccess() { + try { + tablename = Bytes.toBytes("hbase:meta"); + int numRows = 3000; + int numRowsInTableBefore = UTIL.countRows(TableName.valueOf(tablename)); + putData(numRows); + Thread.sleep(2000); + int numRowsInTableAfter = UTIL.countRows(TableName.valueOf(tablename)); + assertTrue(numRowsInTableAfter >= numRowsInTableBefore + numRows); + getData(numRows); + } catch (InterruptedException e) { + LOG.info("Caught InterruptedException while testConcurrentAccess: " + e.getMessage()); + fail(); + } catch (IOException e) { + LOG.info("Caught IOException while testConcurrentAccess: " + e.getMessage()); + fail(); + } + } + + public void putData(int nrows) throws InterruptedException { + LOG.info(String.format("Putting %d rows in hbase:meta", nrows)); + Thread[] threads = new Thread[nthreads]; + for (int i = 1; i <= nthreads; i++) { + threads[i - 1] = new PutThread(1, nrows); + } + startThreadsAndWaitToJoin(threads); + } + + public void getData(int nrows) throws InterruptedException { + LOG.info(String.format("Getting %d rows from hbase:meta", nrows)); + Thread[] threads = new Thread[nthreads]; + for (int i = 1; i <= nthreads; i++) { + threads[i - 1] = new GetThread(1, nrows); + } + startThreadsAndWaitToJoin(threads); + } + + private void startThreadsAndWaitToJoin(Thread[] threads) throws InterruptedException { + for (int i = 1; i <= nthreads; i++) { + threads[i - 1].start(); + } + for (int i = 1; i <= nthreads; i++) { + threads[i - 1].join(); + } + } + + class PutThread extends Thread { + int start; + int end; + + public PutThread(int start, int end) { + this.start = start; + this.end = end; + } + + @Override + public void run() { + try (Table table = UTIL.getConnection().getTable(TableName.valueOf(tablename))) { + for (int i = start; i <= end; i++) { + Put p = new Put(Bytes.toBytes(String.format("tableName,rowKey%d,region%d", i, i))); + p.addColumn(cf, col, Bytes.toBytes("Value" + i)); + table.put(p); + } + } catch (IOException e) { + LOG.info("Caught IOException while PutThread operation: " + e.getMessage()); + } + } + } + + class GetThread extends Thread { + int start; + int end; + + public GetThread(int start, int end) { + this.start = start; + this.end = end; + } + + @Override + public void run() { + try (Table table = UTIL.getConnection().getTable(TableName.valueOf(tablename))) { + for (int i = start; i <= end; i++) { + Get get = new Get(Bytes.toBytes(String.format("tableName,rowKey%d,region%d", i, i))); + table.get(get); + } + } catch (IOException e) { + LOG.info("Caught IOException while GetThread operation: " + e.getMessage()); + } + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java index 11758be7f5e..e4f19391ec8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java @@ -38,15 +38,15 @@ public class TestLossyCounting { @Test public void testBucketSize() { - LossyCounting lossyCounting = new LossyCounting(0.01); + LossyCounting lossyCounting = new LossyCounting(0.01, "testBucketSize"); assertEquals(100L, lossyCounting.getBucketSize()); - LossyCounting lossyCounting2 = new LossyCounting(); + LossyCounting lossyCounting2 = new LossyCounting("testBucketSize2"); assertEquals(50L, lossyCounting2.getBucketSize()); } @Test public void testAddByOne() { - LossyCounting lossyCounting = new LossyCounting(0.01); + LossyCounting lossyCounting = new LossyCounting(0.01, "testAddByOne"); for(int i = 0; i < 100; i++){ String key = "" + i; lossyCounting.addByOne(key); @@ -60,7 +60,7 @@ public class TestLossyCounting { @Test public void testSweep1() { - LossyCounting lossyCounting = new LossyCounting(0.01); + LossyCounting lossyCounting = new LossyCounting(0.01, "testSweep1"); for(int i = 0; i < 400; i++){ String key = "" + i; lossyCounting.addByOne(key); @@ -71,7 +71,7 @@ public class TestLossyCounting { @Test public void testSweep2() { - LossyCounting lossyCounting = new LossyCounting(0.1); + LossyCounting lossyCounting = new LossyCounting(0.1, "testSweep2"); for(int i = 0; i < 10; i++){ String key = "" + i; lossyCounting.addByOne(key);