diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java index f9f6d675a11..e1eb0941e3a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java @@ -1,4 +1,4 @@ -/** +/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -20,12 +20,12 @@ package org.apache.hadoop.hbase.coprocessor; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; - +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.TableName; @@ -36,13 +36,12 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.metrics.MetricRegistry; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.LossyCounting; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.yetus.audience.InterfaceAudience; - import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; - /** * A coprocessor that collects metrics from meta table. *

@@ -57,16 +56,16 @@ public class MetaTableMetrics implements RegionCoprocessor { private ExampleRegionObserverMeta observer; private MetricRegistry registry; - private LossyCounting clientMetricsLossyCounting, regionMetricsLossyCounting; + private LossyCounting clientMetricsLossyCounting, regionMetricsLossyCounting; private boolean active = false; - private Set metrics = new HashSet(); + private Set metrics = new HashSet<>(); enum MetaTableOps { - GET, PUT, DELETE; + GET, PUT, DELETE, } - private ImmutableMap opsNameMap = - ImmutableMap.builder() + private ImmutableMap, MetaTableOps> opsNameMap = + ImmutableMap., MetaTableOps>builder() .put(Put.class, MetaTableOps.PUT) .put(Get.class, MetaTableOps.GET) .put(Delete.class, MetaTableOps.DELETE) @@ -93,7 +92,7 @@ public class MetaTableMetrics implements RegionCoprocessor { @Override public void preDelete(ObserverContext e, Delete delete, - WALEdit edit, Durability durability) throws IOException { + WALEdit edit, Durability durability) { registerAndMarkMetrics(e, delete); } @@ -113,13 +112,12 @@ public class MetaTableMetrics implements RegionCoprocessor { * @param op such as get, put or delete. */ private String getTableNameFromOp(Row op) { - String tableName = null; - String tableRowKey = new String(((Row) op).getRow(), StandardCharsets.UTF_8); - if (tableRowKey.isEmpty()) { + final String tableRowKey = Bytes.toString(op.getRow()); + if (StringUtils.isEmpty(tableRowKey)) { return null; } - tableName = tableRowKey.split(",").length > 0 ? tableRowKey.split(",")[0] : null; - return tableName; + final String[] splits = tableRowKey.split(","); + return splits.length > 0 ? splits[0] : null; } /** @@ -127,13 +125,12 @@ public class MetaTableMetrics implements RegionCoprocessor { * @param op such as get, put or delete. */ private String getRegionIdFromOp(Row op) { - String regionId = null; - String tableRowKey = new String(((Row) op).getRow(), StandardCharsets.UTF_8); - if (tableRowKey.isEmpty()) { + final String tableRowKey = Bytes.toString(op.getRow()); + if (StringUtils.isEmpty(tableRowKey)) { return null; } - regionId = tableRowKey.split(",").length > 2 ? tableRowKey.split(",")[2] : null; - return regionId; + final String[] splits = tableRowKey.split(","); + return splits.length > 2 ? splits[2] : null; } private boolean isMetaTableOp(ObserverContext e) { @@ -279,13 +276,13 @@ public class MetaTableMetrics implements RegionCoprocessor { .equals(TableName.META_TABLE_NAME)) { RegionCoprocessorEnvironment regionCoprocessorEnv = (RegionCoprocessorEnvironment) env; registry = regionCoprocessorEnv.getMetricRegistryForRegionServer(); - LossyCounting.LossyCountingListener listener = - (LossyCounting.LossyCountingListener) key -> { - registry.remove(key); - metrics.remove(key); - }; - clientMetricsLossyCounting = new LossyCounting("clientMetaMetrics",listener); - regionMetricsLossyCounting = new LossyCounting("regionMetaMetrics",listener); + LossyCounting.LossyCountingListener listener = key -> { + registry.remove(key); + metrics.remove(key); + }; + final Configuration conf = regionCoprocessorEnv.getConfiguration(); + clientMetricsLossyCounting = new LossyCounting<>("clientMetaMetrics", conf, listener); + regionMetricsLossyCounting = new LossyCounting<>("regionMetaMetrics", conf, listener); // only be active mode when this region holds meta table. active = true; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateImpl.java index b457c75affe..46bad6649f8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateImpl.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.net.InetAddress; import java.util.Optional; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CompatibilitySingletonFactory; import org.apache.hadoop.hbase.ipc.RpcServer; @@ -37,13 +36,12 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{ private final UserProvider userProvider; private final MetricsUserAggregateSource source; - private final LossyCounting userMetricLossyCounting; + private final LossyCounting userMetricLossyCounting; public MetricsUserAggregateImpl(Configuration conf) { source = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class) .getUserAggregate(); - userMetricLossyCounting = new LossyCounting("userMetrics", - (LossyCounting.LossyCountingListener) key -> source.deregister(key)); + userMetricLossyCounting = new LossyCounting<>("userMetrics", conf, source::deregister); this.userProvider = UserProvider.instantiate(conf); } @@ -61,7 +59,7 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{ } catch (IOException ignore) { } } - return user.isPresent() ? user.get().getShortName() : null; + return user.map(User::getShortName).orElse(null); } @Override @@ -82,10 +80,7 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{ private String getClient() { Optional ipOptional = RpcServer.getRemoteAddress(); - if (ipOptional.isPresent()) { - return ipOptional.get().getHostName(); - } - return null; + return ipOptional.map(InetAddress::getHostName).orElse(null); } private void incrementClientReadMetrics(MetricsUserSource userSource) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java index be9bf42b6cf..9d7cb566c65 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java @@ -1,4 +1,4 @@ -/** +/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -26,13 +26,11 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; - -import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -46,26 +44,27 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto * Based on paper: * http://www.vldb.org/conf/2002/S10P03.pdf */ - @InterfaceAudience.Private public class LossyCounting { private static final Logger LOG = LoggerFactory.getLogger(LossyCounting.class); private final ExecutorService executor; private long bucketSize; private int currentTerm; - private double errorRate; private Map data; private long totalDataCount; private final String name; - private LossyCountingListener listener; - private static AtomicReference fut = new AtomicReference<>(null); + private LossyCountingListener listener; + private static AtomicReference> fut = new AtomicReference<>(null); public interface LossyCountingListener { void sweep(T key); } - public LossyCounting(double errorRate, String name, LossyCountingListener listener) { - this.errorRate = errorRate; + LossyCounting(String name, double errorRate) { + this(name, errorRate, null); + } + + public LossyCounting(String name, double errorRate, LossyCountingListener listener) { this.name = name; if (errorRate < 0.0 || errorRate > 1.0) { throw new IllegalArgumentException(" Lossy Counting error rate should be within range [0,1]"); @@ -80,9 +79,12 @@ public class LossyCounting { new ThreadFactoryBuilder().setDaemon(true).setNameFormat("lossy-count-%d").build()); } - public LossyCounting(String name, LossyCountingListener listener) { - this(HBaseConfiguration.create().getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02), - name, listener); + LossyCounting(String name, Configuration conf) { + this(name, conf, null); + } + + public LossyCounting(String name, Configuration conf, LossyCountingListener listener) { + this(name, conf.getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02), listener); } private void addByOne(T key) { @@ -100,7 +102,7 @@ public class LossyCounting { if(totalDataCount % bucketSize == 0) { //sweep the entries at bucket boundaries //run Sweep - Future future = fut.get(); + Future future = fut.get(); if (future != null && !future.isDone()){ return; } @@ -166,7 +168,7 @@ public class LossyCounting { } } - @VisibleForTesting public Future getSweepFuture() { + @VisibleForTesting public Future getSweepFuture() { return fut.get(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java index 050d2e5bc1e..b6c0ddf0f12 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java @@ -1,4 +1,4 @@ -/** +/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -20,8 +20,10 @@ package org.apache.hadoop.hbase.util; import static org.junit.Assert.assertEquals; - +import static org.junit.Assert.assertTrue; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.junit.ClassRule; @@ -35,31 +37,33 @@ public class TestLossyCounting { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestLossyCounting.class); + private final Configuration conf = HBaseConfiguration.create(); + @Test public void testBucketSize() { - LossyCounting lossyCounting = new LossyCounting(0.01, "testBucketSize", null); + LossyCounting lossyCounting = new LossyCounting<>("testBucketSize", 0.01); assertEquals(100L, lossyCounting.getBucketSize()); - LossyCounting lossyCounting2 = new LossyCounting("testBucketSize2", null); + LossyCounting lossyCounting2 = new LossyCounting<>("testBucketSize2", conf); assertEquals(50L, lossyCounting2.getBucketSize()); } @Test public void testAddByOne() { - LossyCounting lossyCounting = new LossyCounting(0.01, "testAddByOne", null); - for(int i = 0; i < 100; i++){ + LossyCounting lossyCounting = new LossyCounting<>("testAddByOne", 0.01); + for (int i = 0; i < 100; i++) { String key = "" + i; lossyCounting.add(key); } assertEquals(100L, lossyCounting.getDataSize()); - for(int i = 0; i < 100; i++){ + for (int i = 0; i < 100; i++) { String key = "" + i; - assertEquals(true, lossyCounting.contains(key)); + assertTrue(lossyCounting.contains(key)); } } @Test - public void testSweep1() { - LossyCounting lossyCounting = new LossyCounting(0.01, "testSweep1", null); + public void testSweep1() throws Exception { + LossyCounting lossyCounting = new LossyCounting<>("testSweep1", 0.01); for(int i = 0; i < 400; i++){ String key = "" + i; lossyCounting.add(key); @@ -72,22 +76,19 @@ public class TestLossyCounting { assertEquals(lossyCounting.getBucketSize() - 1, lossyCounting.getDataSize()); } - private void waitForSweep(LossyCounting lossyCounting) { + private void waitForSweep(LossyCounting lossyCounting) throws InterruptedException { //wait for sweep thread to complete int retry = 0; while (!lossyCounting.getSweepFuture().isDone() && retry < 10) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - } + Thread.sleep(100); retry++; } } @Test - public void testSweep2() { - LossyCounting lossyCounting = new LossyCounting(0.1, "testSweep2", null); - for(int i = 0; i < 10; i++){ + public void testSweep2() throws Exception { + LossyCounting lossyCounting = new LossyCounting<>("testSweep2", 0.1); + for (int i = 0; i < 10; i++) { String key = "" + i; lossyCounting.add(key); } @@ -100,6 +101,4 @@ public class TestLossyCounting { waitForSweep(lossyCounting); assertEquals(1L, lossyCounting.getDataSize()); } - - -} \ No newline at end of file +}