Signed-off-by: stack <stack@apache.org>
This commit is contained in:
parent
68912c6bac
commit
9e4aec56c3
|
@ -1,24 +1,29 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
|
||||
* agreements. See the NOTICE file distributed with this work for additional information regarding
|
||||
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
|
||||
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
|
||||
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
|
||||
* for the specific language governing permissions and limitations under the License.
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.coprocessor;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
|
@ -31,6 +36,7 @@ import org.apache.hadoop.hbase.client.Row;
|
|||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.metrics.MetricRegistry;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.LossyCounting;
|
||||
|
||||
/**
|
||||
|
@ -48,15 +54,18 @@ public class MetaTableMetrics extends BaseRegionObserver {
|
|||
private MetricRegistry registry;
|
||||
private LossyCounting<String> clientMetricsLossyCounting, regionMetricsLossyCounting;
|
||||
private boolean active = false;
|
||||
private Set<String> metrics = new HashSet<String>();
|
||||
private Set<String> metrics = new HashSet<>();
|
||||
|
||||
enum MetaTableOps {
|
||||
GET, PUT, DELETE;
|
||||
GET, PUT, DELETE,
|
||||
}
|
||||
|
||||
private ImmutableMap<Class<?>, MetaTableOps> opsNameMap =
|
||||
ImmutableMap.<Class<?>, MetaTableOps>builder().put(Put.class, MetaTableOps.PUT)
|
||||
.put(Get.class, MetaTableOps.GET).put(Delete.class, MetaTableOps.DELETE).build();
|
||||
private ImmutableMap<Class<? extends Row>, MetaTableOps> opsNameMap =
|
||||
ImmutableMap.<Class<? extends Row>, MetaTableOps>builder()
|
||||
.put(Put.class, MetaTableOps.PUT)
|
||||
.put(Get.class, MetaTableOps.GET)
|
||||
.put(Delete.class, MetaTableOps.DELETE)
|
||||
.build();
|
||||
|
||||
@Override
|
||||
public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results)
|
||||
|
@ -93,13 +102,12 @@ public class MetaTableMetrics extends BaseRegionObserver {
|
|||
* @param op such as get, put or delete.
|
||||
*/
|
||||
private String getTableNameFromOp(Row op) {
|
||||
String tableName = null;
|
||||
String tableRowKey = new String(((Row) op).getRow(), StandardCharsets.UTF_8);
|
||||
final String tableRowKey = Bytes.toString(op.getRow());
|
||||
if (tableRowKey.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
tableName = tableRowKey.split(",").length > 0 ? tableRowKey.split(",")[0] : null;
|
||||
return tableName;
|
||||
final String[] splits = tableRowKey.split(",");
|
||||
return splits.length > 0 ? splits[0] : null;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -108,13 +116,12 @@ public class MetaTableMetrics extends BaseRegionObserver {
|
|||
* @param op such as get, put or delete.
|
||||
*/
|
||||
private String getRegionIdFromOp(Row op) {
|
||||
String regionId = null;
|
||||
String tableRowKey = new String(((Row) op).getRow(), StandardCharsets.UTF_8);
|
||||
final String tableRowKey = Bytes.toString(op.getRow());
|
||||
if (tableRowKey.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
regionId = tableRowKey.split(",").length > 2 ? tableRowKey.split(",")[2] : null;
|
||||
return regionId;
|
||||
final String[] splits = tableRowKey.split(",");
|
||||
return splits.length > 2 ? splits[2] : null;
|
||||
}
|
||||
|
||||
private boolean isMetaTableOp(ObserverContext<RegionCoprocessorEnvironment> e) {
|
||||
|
@ -258,8 +265,9 @@ public class MetaTableMetrics extends BaseRegionObserver {
|
|||
metrics.remove(key);
|
||||
}
|
||||
};
|
||||
clientMetricsLossyCounting = new LossyCounting<String>("clientMetaMetrics",listener);
|
||||
regionMetricsLossyCounting = new LossyCounting<String>("regionMetaMetrics",listener);
|
||||
final Configuration conf = regionCoprocessorEnv.getConfiguration();
|
||||
clientMetricsLossyCounting = new LossyCounting<>("clientMetaMetrics", conf, listener);
|
||||
regionMetricsLossyCounting = new LossyCounting<>("regionMetaMetrics", conf, listener);
|
||||
// only be active mode when this region holds meta table.
|
||||
active = true;
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
|
@ -41,7 +40,7 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{
|
|||
public MetricsUserAggregateImpl(Configuration conf) {
|
||||
source = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
|
||||
.getUserAggregate();
|
||||
userMetricLossyCounting = new LossyCounting<MetricsUserSource>("userMetrics",
|
||||
userMetricLossyCounting = new LossyCounting<>("userMetrics", conf,
|
||||
new LossyCounting.LossyCountingListener<MetricsUserSource>() {
|
||||
@Override
|
||||
public void sweep(MetricsUserSource key) {
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
|
@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.util;
|
|||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
@ -29,8 +28,7 @@ import java.util.concurrent.ExecutorService;
|
|||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -46,7 +44,6 @@ import org.slf4j.LoggerFactory;
|
|||
* Based on paper:
|
||||
* http://www.vldb.org/conf/2002/S10P03.pdf
|
||||
*/
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class LossyCounting<T> {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(LossyCounting.class);
|
||||
|
@ -56,14 +53,18 @@ public class LossyCounting<T> {
|
|||
private Map<T, Integer> data;
|
||||
private long totalDataCount;
|
||||
private final String name;
|
||||
private LossyCountingListener listener;
|
||||
private static AtomicReference<Future> fut = new AtomicReference<>(null);
|
||||
private LossyCountingListener<T> listener;
|
||||
private static AtomicReference<Future<?>> fut = new AtomicReference<>(null);
|
||||
|
||||
public interface LossyCountingListener<T> {
|
||||
void sweep(T key);
|
||||
}
|
||||
|
||||
public LossyCounting(double errorRate, String name, LossyCountingListener listener) {
|
||||
LossyCounting(String name, double errorRate) {
|
||||
this(name, errorRate, null);
|
||||
}
|
||||
|
||||
public LossyCounting(String name, double errorRate, LossyCountingListener<T> listener) {
|
||||
if (errorRate < 0.0 || errorRate > 1.0) {
|
||||
throw new IllegalArgumentException(" Lossy Counting error rate should be within range [0,1]");
|
||||
}
|
||||
|
@ -78,9 +79,12 @@ public class LossyCounting<T> {
|
|||
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("lossy-count-%d").build());
|
||||
}
|
||||
|
||||
public LossyCounting(String name, LossyCountingListener listener) {
|
||||
this(HBaseConfiguration.create().getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02),
|
||||
name, listener);
|
||||
LossyCounting(String name, Configuration conf) {
|
||||
this(name, conf, null);
|
||||
}
|
||||
|
||||
public LossyCounting(String name, Configuration conf, LossyCountingListener<T> listener) {
|
||||
this(name, conf.getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02), listener);
|
||||
}
|
||||
|
||||
private void addByOne(T key) {
|
||||
|
@ -101,7 +105,7 @@ public class LossyCounting<T> {
|
|||
if(totalDataCount % bucketSize == 0) {
|
||||
//sweep the entries at bucket boundaries
|
||||
//run Sweep
|
||||
Future future = fut.get();
|
||||
Future<?> future = fut.get();
|
||||
if (future != null && !future.isDone()){
|
||||
return;
|
||||
}
|
||||
|
@ -166,7 +170,7 @@ public class LossyCounting<T> {
|
|||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting public Future getSweepFuture() {
|
||||
@VisibleForTesting public Future<?> getSweepFuture() {
|
||||
return fut.get();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
|
@ -20,7 +20,9 @@
|
|||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.junit.Test;
|
||||
|
@ -29,31 +31,33 @@ import org.junit.experimental.categories.Category;
|
|||
@Category({MiscTests.class, SmallTests.class})
|
||||
public class TestLossyCounting {
|
||||
|
||||
private final Configuration conf = HBaseConfiguration.create();
|
||||
|
||||
@Test
|
||||
public void testBucketSize() {
|
||||
LossyCounting lossyCounting = new LossyCounting(0.01, "testBucketSize", null);
|
||||
LossyCounting<?> lossyCounting = new LossyCounting<>("testBucketSize", 0.01);
|
||||
assertEquals(100L, lossyCounting.getBucketSize());
|
||||
LossyCounting lossyCounting2 = new LossyCounting("testBucketSize2", null);
|
||||
LossyCounting<?> lossyCounting2 = new LossyCounting<>("testBucketSize2", conf);
|
||||
assertEquals(50L, lossyCounting2.getBucketSize());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddByOne() {
|
||||
LossyCounting lossyCounting = new LossyCounting(0.01, "testAddByOne", null);
|
||||
for(int i = 0; i < 100; i++){
|
||||
LossyCounting<String> lossyCounting = new LossyCounting<>("testAddByOne", 0.01);
|
||||
for (int i = 0; i < 100; i++) {
|
||||
String key = "" + i;
|
||||
lossyCounting.add(key);
|
||||
}
|
||||
assertEquals(100L, lossyCounting.getDataSize());
|
||||
for(int i = 0; i < 100; i++){
|
||||
for (int i = 0; i < 100; i++) {
|
||||
String key = "" + i;
|
||||
assertEquals(true, lossyCounting.contains(key));
|
||||
assertTrue(lossyCounting.contains(key));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSweep1() {
|
||||
LossyCounting lossyCounting = new LossyCounting(0.01, "testSweep1", null);
|
||||
public void testSweep1() throws Exception {
|
||||
LossyCounting<String> lossyCounting = new LossyCounting<>("testSweep1", 0.01);
|
||||
for(int i = 0; i < 400; i++){
|
||||
String key = "" + i;
|
||||
lossyCounting.add(key);
|
||||
|
@ -66,22 +70,19 @@ public class TestLossyCounting {
|
|||
assertEquals(lossyCounting.getBucketSize() - 1, lossyCounting.getDataSize());
|
||||
}
|
||||
|
||||
private void waitForSweep(LossyCounting<Object> lossyCounting) {
|
||||
private void waitForSweep(LossyCounting<?> lossyCounting) throws InterruptedException {
|
||||
//wait for sweep thread to complete
|
||||
int retry = 0;
|
||||
while (!lossyCounting.getSweepFuture().isDone() && retry < 10) {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
Thread.sleep(100);
|
||||
retry++;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSweep2() {
|
||||
LossyCounting lossyCounting = new LossyCounting(0.1, "testSweep2", null);
|
||||
for(int i = 0; i < 10; i++){
|
||||
public void testSweep2() throws Exception {
|
||||
LossyCounting<String> lossyCounting = new LossyCounting<>("testSweep2", 0.1);
|
||||
for (int i = 0; i < 10; i++) {
|
||||
String key = "" + i;
|
||||
lossyCounting.add(key);
|
||||
}
|
||||
|
@ -94,5 +95,4 @@ public class TestLossyCounting {
|
|||
waitForSweep(lossyCounting);
|
||||
assertEquals(1L, lossyCounting.getDataSize());
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue