HBASE-23802 Remove unnecessary Configuration instantiation in LossyAccounting (#1127)

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
Nick Dimiduk 2020-02-05 14:41:51 -08:00 committed by GitHub
parent b49ec58073
commit 5b4545de5e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 67 additions and 74 deletions

View File

@ -1,4 +1,4 @@
/** /*
* *
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
@ -20,12 +20,12 @@
package org.apache.hadoop.hbase.coprocessor; package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
@ -36,13 +36,12 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.metrics.MetricRegistry; import org.apache.hadoop.hbase.metrics.MetricRegistry;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LossyCounting; import org.apache.hadoop.hbase.util.LossyCounting;
import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
/** /**
* A coprocessor that collects metrics from meta table. * A coprocessor that collects metrics from meta table.
* <p> * <p>
@ -57,16 +56,16 @@ public class MetaTableMetrics implements RegionCoprocessor {
private ExampleRegionObserverMeta observer; private ExampleRegionObserverMeta observer;
private MetricRegistry registry; private MetricRegistry registry;
private LossyCounting clientMetricsLossyCounting, regionMetricsLossyCounting; private LossyCounting<String> clientMetricsLossyCounting, regionMetricsLossyCounting;
private boolean active = false; private boolean active = false;
private Set<String> metrics = new HashSet<String>(); private Set<String> metrics = new HashSet<>();
enum MetaTableOps { enum MetaTableOps {
GET, PUT, DELETE; GET, PUT, DELETE,
} }
private ImmutableMap<Class, MetaTableOps> opsNameMap = private ImmutableMap<Class<? extends Row>, MetaTableOps> opsNameMap =
ImmutableMap.<Class, MetaTableOps>builder() ImmutableMap.<Class<? extends Row>, MetaTableOps>builder()
.put(Put.class, MetaTableOps.PUT) .put(Put.class, MetaTableOps.PUT)
.put(Get.class, MetaTableOps.GET) .put(Get.class, MetaTableOps.GET)
.put(Delete.class, MetaTableOps.DELETE) .put(Delete.class, MetaTableOps.DELETE)
@ -93,7 +92,7 @@ public class MetaTableMetrics implements RegionCoprocessor {
@Override @Override
public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete,
WALEdit edit, Durability durability) throws IOException { WALEdit edit, Durability durability) {
registerAndMarkMetrics(e, delete); registerAndMarkMetrics(e, delete);
} }
@ -113,13 +112,12 @@ public class MetaTableMetrics implements RegionCoprocessor {
* @param op such as get, put or delete. * @param op such as get, put or delete.
*/ */
private String getTableNameFromOp(Row op) { private String getTableNameFromOp(Row op) {
String tableName = null; final String tableRowKey = Bytes.toString(op.getRow());
String tableRowKey = new String(((Row) op).getRow(), StandardCharsets.UTF_8); if (StringUtils.isEmpty(tableRowKey)) {
if (tableRowKey.isEmpty()) {
return null; return null;
} }
tableName = tableRowKey.split(",").length > 0 ? tableRowKey.split(",")[0] : null; final String[] splits = tableRowKey.split(",");
return tableName; return splits.length > 0 ? splits[0] : null;
} }
/** /**
@ -127,13 +125,12 @@ public class MetaTableMetrics implements RegionCoprocessor {
* @param op such as get, put or delete. * @param op such as get, put or delete.
*/ */
private String getRegionIdFromOp(Row op) { private String getRegionIdFromOp(Row op) {
String regionId = null; final String tableRowKey = Bytes.toString(op.getRow());
String tableRowKey = new String(((Row) op).getRow(), StandardCharsets.UTF_8); if (StringUtils.isEmpty(tableRowKey)) {
if (tableRowKey.isEmpty()) {
return null; return null;
} }
regionId = tableRowKey.split(",").length > 2 ? tableRowKey.split(",")[2] : null; final String[] splits = tableRowKey.split(",");
return regionId; return splits.length > 2 ? splits[2] : null;
} }
private boolean isMetaTableOp(ObserverContext<RegionCoprocessorEnvironment> e) { private boolean isMetaTableOp(ObserverContext<RegionCoprocessorEnvironment> e) {
@ -279,13 +276,13 @@ public class MetaTableMetrics implements RegionCoprocessor {
.equals(TableName.META_TABLE_NAME)) { .equals(TableName.META_TABLE_NAME)) {
RegionCoprocessorEnvironment regionCoprocessorEnv = (RegionCoprocessorEnvironment) env; RegionCoprocessorEnvironment regionCoprocessorEnv = (RegionCoprocessorEnvironment) env;
registry = regionCoprocessorEnv.getMetricRegistryForRegionServer(); registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
LossyCounting.LossyCountingListener listener = LossyCounting.LossyCountingListener<String> listener = key -> {
(LossyCounting.LossyCountingListener<String>) key -> {
registry.remove(key); registry.remove(key);
metrics.remove(key); metrics.remove(key);
}; };
clientMetricsLossyCounting = new LossyCounting<String>("clientMetaMetrics",listener); final Configuration conf = regionCoprocessorEnv.getConfiguration();
regionMetricsLossyCounting = new LossyCounting<String>("regionMetaMetrics",listener); clientMetricsLossyCounting = new LossyCounting<>("clientMetaMetrics", conf, listener);
regionMetricsLossyCounting = new LossyCounting<>("regionMetaMetrics", conf, listener);
// only be active mode when this region holds meta table. // only be active mode when this region holds meta table.
active = true; active = true;
} }

View File

@ -1,4 +1,4 @@
/** /*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.util.Optional; import java.util.Optional;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory; import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.RpcServer;
@ -37,13 +36,12 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{
private final UserProvider userProvider; private final UserProvider userProvider;
private final MetricsUserAggregateSource source; private final MetricsUserAggregateSource source;
private final LossyCounting userMetricLossyCounting; private final LossyCounting<MetricsUserSource> userMetricLossyCounting;
public MetricsUserAggregateImpl(Configuration conf) { public MetricsUserAggregateImpl(Configuration conf) {
source = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class) source = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
.getUserAggregate(); .getUserAggregate();
userMetricLossyCounting = new LossyCounting<MetricsUserSource>("userMetrics", userMetricLossyCounting = new LossyCounting<>("userMetrics", conf, source::deregister);
(LossyCounting.LossyCountingListener<MetricsUserSource>) key -> source.deregister(key));
this.userProvider = UserProvider.instantiate(conf); this.userProvider = UserProvider.instantiate(conf);
} }
@ -61,7 +59,7 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{
} catch (IOException ignore) { } catch (IOException ignore) {
} }
} }
return user.isPresent() ? user.get().getShortName() : null; return user.map(User::getShortName).orElse(null);
} }
@Override @Override
@ -82,10 +80,7 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{
private String getClient() { private String getClient() {
Optional<InetAddress> ipOptional = RpcServer.getRemoteAddress(); Optional<InetAddress> ipOptional = RpcServer.getRemoteAddress();
if (ipOptional.isPresent()) { return ipOptional.map(InetAddress::getHostName).orElse(null);
return ipOptional.get().getHostName();
}
return null;
} }
private void incrementClientReadMetrics(MetricsUserSource userSource) { private void incrementClientReadMetrics(MetricsUserSource userSource) {

View File

@ -1,4 +1,4 @@
/** /*
* *
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
@ -26,13 +26,11 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -46,26 +44,27 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
* Based on paper: * Based on paper:
* http://www.vldb.org/conf/2002/S10P03.pdf * http://www.vldb.org/conf/2002/S10P03.pdf
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class LossyCounting<T> { public class LossyCounting<T> {
private static final Logger LOG = LoggerFactory.getLogger(LossyCounting.class); private static final Logger LOG = LoggerFactory.getLogger(LossyCounting.class);
private final ExecutorService executor; private final ExecutorService executor;
private long bucketSize; private long bucketSize;
private int currentTerm; private int currentTerm;
private double errorRate;
private Map<T, Integer> data; private Map<T, Integer> data;
private long totalDataCount; private long totalDataCount;
private final String name; private final String name;
private LossyCountingListener listener; private LossyCountingListener<T> listener;
private static AtomicReference<Future> fut = new AtomicReference<>(null); private static AtomicReference<Future<?>> fut = new AtomicReference<>(null);
public interface LossyCountingListener<T> { public interface LossyCountingListener<T> {
void sweep(T key); void sweep(T key);
} }
public LossyCounting(double errorRate, String name, LossyCountingListener listener) { LossyCounting(String name, double errorRate) {
this.errorRate = errorRate; this(name, errorRate, null);
}
public LossyCounting(String name, double errorRate, LossyCountingListener<T> listener) {
this.name = name; this.name = name;
if (errorRate < 0.0 || errorRate > 1.0) { if (errorRate < 0.0 || errorRate > 1.0) {
throw new IllegalArgumentException(" Lossy Counting error rate should be within range [0,1]"); throw new IllegalArgumentException(" Lossy Counting error rate should be within range [0,1]");
@ -80,9 +79,12 @@ public class LossyCounting<T> {
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("lossy-count-%d").build()); new ThreadFactoryBuilder().setDaemon(true).setNameFormat("lossy-count-%d").build());
} }
public LossyCounting(String name, LossyCountingListener listener) { LossyCounting(String name, Configuration conf) {
this(HBaseConfiguration.create().getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02), this(name, conf, null);
name, listener); }
public LossyCounting(String name, Configuration conf, LossyCountingListener<T> listener) {
this(name, conf.getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02), listener);
} }
private void addByOne(T key) { private void addByOne(T key) {
@ -100,7 +102,7 @@ public class LossyCounting<T> {
if(totalDataCount % bucketSize == 0) { if(totalDataCount % bucketSize == 0) {
//sweep the entries at bucket boundaries //sweep the entries at bucket boundaries
//run Sweep //run Sweep
Future future = fut.get(); Future<?> future = fut.get();
if (future != null && !future.isDone()){ if (future != null && !future.isDone()){
return; return;
} }
@ -166,7 +168,7 @@ public class LossyCounting<T> {
} }
} }
@VisibleForTesting public Future getSweepFuture() { @VisibleForTesting public Future<?> getSweepFuture() {
return fut.get(); return fut.get();
} }
} }

View File

@ -1,4 +1,4 @@
/** /*
* *
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
@ -20,8 +20,10 @@
package org.apache.hadoop.hbase.util; package org.apache.hadoop.hbase.util;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.ClassRule; import org.junit.ClassRule;
@ -35,17 +37,19 @@ public class TestLossyCounting {
public static final HBaseClassTestRule CLASS_RULE = public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestLossyCounting.class); HBaseClassTestRule.forClass(TestLossyCounting.class);
private final Configuration conf = HBaseConfiguration.create();
@Test @Test
public void testBucketSize() { public void testBucketSize() {
LossyCounting lossyCounting = new LossyCounting(0.01, "testBucketSize", null); LossyCounting<?> lossyCounting = new LossyCounting<>("testBucketSize", 0.01);
assertEquals(100L, lossyCounting.getBucketSize()); assertEquals(100L, lossyCounting.getBucketSize());
LossyCounting lossyCounting2 = new LossyCounting("testBucketSize2", null); LossyCounting<?> lossyCounting2 = new LossyCounting<>("testBucketSize2", conf);
assertEquals(50L, lossyCounting2.getBucketSize()); assertEquals(50L, lossyCounting2.getBucketSize());
} }
@Test @Test
public void testAddByOne() { public void testAddByOne() {
LossyCounting lossyCounting = new LossyCounting(0.01, "testAddByOne", null); LossyCounting<String> lossyCounting = new LossyCounting<>("testAddByOne", 0.01);
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
String key = "" + i; String key = "" + i;
lossyCounting.add(key); lossyCounting.add(key);
@ -53,13 +57,13 @@ public class TestLossyCounting {
assertEquals(100L, lossyCounting.getDataSize()); assertEquals(100L, lossyCounting.getDataSize());
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
String key = "" + i; String key = "" + i;
assertEquals(true, lossyCounting.contains(key)); assertTrue(lossyCounting.contains(key));
} }
} }
@Test @Test
public void testSweep1() { public void testSweep1() throws Exception {
LossyCounting lossyCounting = new LossyCounting(0.01, "testSweep1", null); LossyCounting<String> lossyCounting = new LossyCounting<>("testSweep1", 0.01);
for(int i = 0; i < 400; i++){ for(int i = 0; i < 400; i++){
String key = "" + i; String key = "" + i;
lossyCounting.add(key); lossyCounting.add(key);
@ -72,21 +76,18 @@ public class TestLossyCounting {
assertEquals(lossyCounting.getBucketSize() - 1, lossyCounting.getDataSize()); assertEquals(lossyCounting.getBucketSize() - 1, lossyCounting.getDataSize());
} }
private void waitForSweep(LossyCounting<Object> lossyCounting) { private void waitForSweep(LossyCounting<?> lossyCounting) throws InterruptedException {
//wait for sweep thread to complete //wait for sweep thread to complete
int retry = 0; int retry = 0;
while (!lossyCounting.getSweepFuture().isDone() && retry < 10) { while (!lossyCounting.getSweepFuture().isDone() && retry < 10) {
try {
Thread.sleep(100); Thread.sleep(100);
} catch (InterruptedException e) {
}
retry++; retry++;
} }
} }
@Test @Test
public void testSweep2() { public void testSweep2() throws Exception {
LossyCounting lossyCounting = new LossyCounting(0.1, "testSweep2", null); LossyCounting<String> lossyCounting = new LossyCounting<>("testSweep2", 0.1);
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
String key = "" + i; String key = "" + i;
lossyCounting.add(key); lossyCounting.add(key);
@ -100,6 +101,4 @@ public class TestLossyCounting {
waitForSweep(lossyCounting); waitForSweep(lossyCounting);
assertEquals(1L, lossyCounting.getDataSize()); assertEquals(1L, lossyCounting.getDataSize());
} }
} }