HBASE-23054 Remove synchronization block from MetaTableMetrics and fix LossyCounting algorithm

Ankit Singhal 2019-09-28 09:29:17 -07:00 committed by stack
parent ef79b40e6e
commit 836368189a
4 changed files with 110 additions and 131 deletions

MetaTableMetrics.java (org.apache.hadoop.hbase.coprocessor)

@@ -1,23 +1,31 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
- * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
- * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
- * for the specific language governing permissions and limitations under the License.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
 */
 package org.apache.hadoop.hbase.coprocessor;

 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.TableName;
@@ -27,12 +35,11 @@ import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.ipc.RpcServer;
-import org.apache.hadoop.hbase.metrics.Meter;
-import org.apache.hadoop.hbase.metrics.Metric;
 import org.apache.hadoop.hbase.metrics.MetricRegistry;
 import org.apache.hadoop.hbase.util.LossyCounting;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.yetus.audience.InterfaceAudience;

 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
@@ -49,10 +56,10 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
 public class MetaTableMetrics implements RegionCoprocessor {

   private ExampleRegionObserverMeta observer;
-  private Map<String, Optional<Metric>> requestsMap;
   private MetricRegistry registry;
   private LossyCounting clientMetricsLossyCounting, regionMetricsLossyCounting;
   private boolean active = false;
+  private Set<String> metrics = new HashSet<String>();

   enum MetaTableOps {
     GET, PUT, DELETE;
@@ -101,66 +108,6 @@ public class MetaTableMetrics implements RegionCoprocessor {
       opWithClientMetricRegisterAndMark(row);
     }

-  private void markMeterIfPresent(String requestMeter) {
-    if (requestMeter.isEmpty()) {
-      return;
-    }
-
-    Optional<Metric> optionalMetric = requestsMap.get(requestMeter);
-    if (optionalMetric != null && optionalMetric.isPresent()) {
-      Meter metric = (Meter) optionalMetric.get();
-      metric.mark();
-    }
-  }
-
-  private void registerMeterIfNotPresent(String requestMeter) {
-    if (requestMeter.isEmpty()) {
-      return;
-    }
-    if (!requestsMap.containsKey(requestMeter)) {
-      registry.meter(requestMeter);
-      requestsMap.put(requestMeter, registry.get(requestMeter));
-    }
-  }
-
-  /**
-   * Registers and counts lossyCount for Meters that are kept by lossy counting.
-   * By using lossy counting to maintain meters, at most 7 / e meters will be kept (e is the
-   * error rate), e.g. when e is 0.02 by default, at most 350 client request metrics will be
-   * kept. Also, all kept elements have frequency higher than e * N (N is the total count).
-   * @param requestMeter meter to be registered
-   * @param lossyCounting lossyCounting object for one type of meters.
-   */
-  private void registerLossyCountingMeterIfNotPresent(String requestMeter,
-      LossyCounting lossyCounting) {
-    if (requestMeter.isEmpty()) {
-      return;
-    }
-    synchronized (lossyCounting) {
-      Set<String> metersToBeRemoved = lossyCounting.addByOne(requestMeter);
-      boolean isNewMeter = !requestsMap.containsKey(requestMeter);
-      boolean requestMeterRemoved = metersToBeRemoved.contains(requestMeter);
-      if (isNewMeter) {
-        if (requestMeterRemoved) {
-          // if the new metric is swept off by lossyCounting then don't add it to the map
-          metersToBeRemoved.remove(requestMeter);
-        } else {
-          // else register the new metric and add it to the map
-          registry.meter(requestMeter);
-          requestsMap.put(requestMeter, registry.get(requestMeter));
-        }
-      }
-      for (String meter : metersToBeRemoved) {
-        // clean up requestsMap according to the data swept by lossy counting
-        requestsMap.remove(meter);
-        registry.remove(meter);
-      }
-    }
-  }
-
   /**
    * Get table name from Ops such as: get, put, delete.
    * @param op such as get, put or delete.
@@ -196,13 +143,13 @@ public class MetaTableMetrics implements RegionCoprocessor {
   private void clientMetricRegisterAndMark() {
     // Mark client metric
-    String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : "";
+    String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : null;
     if (clientIP == null || clientIP.isEmpty()) {
       return;
     }
     String clientRequestMeter = clientRequestMeterName(clientIP);
-    registerLossyCountingMeterIfNotPresent(clientRequestMeter, clientMetricsLossyCounting);
-    markMeterIfPresent(clientRequestMeter);
+    clientMetricsLossyCounting.add(clientRequestMeter);
+    registerAndMarkMeter(clientRequestMeter);
   }

   private void tableMetricRegisterAndMark(Row op) {
@@ -212,7 +159,7 @@
       return;
     }
     String tableRequestMeter = tableMeterName(tableName);
-    registerAndMarkMeterIfNotPresent(tableRequestMeter);
+    registerAndMarkMeter(tableRequestMeter);
   }

   private void regionMetricRegisterAndMark(Row op) {
@@ -222,8 +169,8 @@
       return;
     }
     String regionRequestMeter = regionMeterName(regionId);
-    registerLossyCountingMeterIfNotPresent(regionRequestMeter, regionMetricsLossyCounting);
-    markMeterIfPresent(regionRequestMeter);
+    regionMetricsLossyCounting.add(regionRequestMeter);
+    registerAndMarkMeter(regionRequestMeter);
   }

   private void opMetricRegisterAndMark(Row op) {
@@ -232,7 +179,7 @@
     if (opMeterName == null || opMeterName.isEmpty()) {
       return;
     }
-    registerAndMarkMeterIfNotPresent(opMeterName);
+    registerAndMarkMeter(opMeterName);
   }

   private void opWithClientMetricRegisterAndMark(Object op) {
@@ -241,13 +188,18 @@
     if (opWithClientMeterName == null || opWithClientMeterName.isEmpty()) {
       return;
     }
-    registerAndMarkMeterIfNotPresent(opWithClientMeterName);
+    registerAndMarkMeter(opWithClientMeterName);
   }

   // Helper function to register and mark meter if not present
-  private void registerAndMarkMeterIfNotPresent(String name) {
-    registerMeterIfNotPresent(name);
-    markMeterIfPresent(name);
+  private void registerAndMarkMeter(String requestMeter) {
+    if (requestMeter.isEmpty()) {
+      return;
+    }
+    if (!registry.get(requestMeter).isPresent()) {
+      metrics.add(requestMeter);
+    }
+    registry.meter(requestMeter).mark();
   }

   private String opWithClientMeterName(Object op) {
@@ -327,9 +279,14 @@
         .equals(TableName.META_TABLE_NAME)) {
       RegionCoprocessorEnvironment regionCoprocessorEnv = (RegionCoprocessorEnvironment) env;
       registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
-      requestsMap = new ConcurrentHashMap<>();
-      clientMetricsLossyCounting = new LossyCounting("clientMetaMetrics");
-      regionMetricsLossyCounting = new LossyCounting("regionMetaMetrics");
+      LossyCounting.LossyCountingListener listener = new LossyCounting.LossyCountingListener() {
+        @Override public void sweep(String key) {
+          registry.remove(key);
+          metrics.remove(key);
+        }
+      };
+      clientMetricsLossyCounting = new LossyCounting("clientMetaMetrics", listener);
+      regionMetricsLossyCounting = new LossyCounting("regionMetaMetrics", listener);
       // only be in active mode when this region holds the meta table.
       active = true;
     }
@@ -338,10 +295,8 @@
   @Override
   public void stop(CoprocessorEnvironment env) throws IOException {
     // since the meta region can move around, clear stale metrics on stop.
-    if (requestsMap != null) {
-      for (String meterName : requestsMap.keySet()) {
-        registry.remove(meterName);
-      }
+    for (String metric : metrics) {
+      registry.remove(metric);
     }
   }
 }
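The start()/stop() rewiring above is the core of the change: rather than calling a synchronized helper that returned the set of swept meters, the coprocessor now hands each LossyCounting a callback and lets evictions flow back through it, so no lock is needed around the add/sweep sequence. Below is a minimal sketch of the same pattern, assuming only the LossyCounting API introduced in this commit; the SweepListenerSketch class and the registered set are illustrative stand-ins for the coprocessor's metrics set and the MetricRegistry, not code from the commit:

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hbase.util.LossyCounting;

public class SweepListenerSketch {
  public static void main(String[] args) {
    // stand-in for the coprocessor's bookkeeping of registered meter names
    Set<String> registered = ConcurrentHashMap.newKeySet();

    // the listener fires inside sweep(), keeping external state in sync without
    // the synchronized block the old registerLossyCountingMeterIfNotPresent needed
    LossyCounting counting = new LossyCounting(0.02, "sketch", registered::remove);

    for (int i = 0; i < 1000; i++) {
      String meter = (i % 2 == 0) ? "hotClient" : "coldClient-" + i;
      registered.add(meter);
      counting.add(meter);
    }
    // "hotClient" recurs often enough to survive every sweep; most cold meters are evicted
    System.out.println(registered.contains("hotClient") + " / " + registered.size());
  }
}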

LossyCounting.java (org.apache.hadoop.hbase.util)

@@ -19,7 +19,6 @@
 package org.apache.hadoop.hbase.util;

-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -47,13 +46,18 @@
 public class LossyCounting {
   private static final Logger LOG = LoggerFactory.getLogger(LossyCounting.class);
   private long bucketSize;
-  private long currentTerm;
+  private int currentTerm;
   private double errorRate;
   private Map<String, Integer> data;
   private long totalDataCount;
   private String name;
+  private LossyCountingListener listener;

-  public LossyCounting(double errorRate, String name) {
+  public interface LossyCountingListener {
+    void sweep(String key);
+  }
+
+  public LossyCounting(double errorRate, String name, LossyCountingListener listener) {
     this.errorRate = errorRate;
     this.name = name;
     if (errorRate < 0.0 || errorRate > 1.0) {
@@ -63,48 +67,55 @@
     this.currentTerm = 1;
     this.totalDataCount = 0;
     this.data = new ConcurrentHashMap<>();
+    this.listener = listener;
     calculateCurrentTerm();
   }

-  public LossyCounting(String name) {
+  public LossyCounting(String name, LossyCountingListener listener) {
     this(HBaseConfiguration.create().getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02),
-        name);
+        name, listener);
   }

-  public Set<String> addByOne(String key) {
-    data.put(key, data.getOrDefault(key, 0) + 1);
+  private void addByOne(String key) {
+    // If the entry exists, increment its frequency by one. Otherwise, create a new entry
+    // starting at currentTerm so that it will not be pruned immediately.
+    data.put(key, data.getOrDefault(key, currentTerm != 0 ? currentTerm - 1 : 0) + 1);
+    // update totalDataCount and the current term
     totalDataCount++;
     calculateCurrentTerm();
-    Set<String> dataToBeSwept = new HashSet<>();
+  }
+
+  public void add(String key) {
+    addByOne(key);
     if (totalDataCount % bucketSize == 0) {
-      dataToBeSwept = sweep();
+      // sweep the entries at bucket boundaries
+      sweep();
     }
-    return dataToBeSwept;
   }

   /**
-   * sweep low frequency data
-   * @return Names of elements got swept
+   * Sweep low-frequency data.
    */
-  private Set<String> sweep() {
-    Set<String> dataToBeSwept = new HashSet<>();
+  private void sweep() {
     for (Map.Entry<String, Integer> entry : data.entrySet()) {
-      if (entry.getValue() + errorRate < currentTerm) {
-        dataToBeSwept.add(entry.getKey());
+      if (entry.getValue() < currentTerm) {
+        String metric = entry.getKey();
+        data.remove(metric);
+        if (listener != null) {
+          listener.sweep(metric);
+        }
       }
     }
-    for (String key : dataToBeSwept) {
-      data.remove(key);
-    }
-    LOG.trace(String.format("%s swept %d elements.", name, dataToBeSwept.size()));
-    return dataToBeSwept;
   }

   /**
    * Calculate and set the current term.
    */
   private void calculateCurrentTerm() {
-    this.currentTerm = (int) Math.ceil(1.0 * totalDataCount / bucketSize);
+    this.currentTerm = (int) Math.ceil(1.0 * totalDataCount / (double) bucketSize);
   }

   public long getBucketSize() {
@@ -119,6 +130,10 @@
     return data.containsKey(key);
   }

+  public Set<String> getElements() {
+    return data.keySet();
+  }
+
   public long getCurrentTerm() {
     return currentTerm;
   }
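With these pieces, add() now implements the textbook lossy counting loop: a bucket width of ceil(1 / errorRate), a sweep at every bucket boundary, and eviction only of keys whose count has fallen below the current term, which preserves every key whose true frequency exceeds errorRate * N. A small driver against the public API above (a sketch, not a test; it assumes contains(), getDataSize() and getCurrentTerm() are public accessors, as the surrounding diff suggests):

import org.apache.hadoop.hbase.util.LossyCounting;

public class LossyCountingDemo {
  public static void main(String[] args) {
    // errorRate 0.1 -> bucketSize = ceil(1 / 0.1) = 10, so a sweep runs every 10 adds
    LossyCounting lc = new LossyCounting(0.1, "demo", null);

    // one heavy hitter interleaved with unique low-frequency noise keys
    for (int i = 0; i < 200; i++) {
      lc.add(i % 2 == 0 ? "hot" : "noise-" + i);
    }

    System.out.println(lc.getCurrentTerm()); // ceil(200 / 10) = 20
    System.out.println(lc.contains("hot"));  // true: ~100 hits, far above errorRate * N = 20
    System.out.println(lc.getDataSize());    // small: stale noise keys were swept at bucket boundaries
  }
}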

(org.apache.hadoop.hbase.coprocessor — license header reflow only; likely TestMetaTableMetrics.java)

@@ -1,12 +1,20 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
- * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
- * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
- * for the specific language governing permissions and limitations under the License.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
 */
 package org.apache.hadoop.hbase.coprocessor;

TestLossyCounting.java

@@ -38,18 +38,18 @@ public class TestLossyCounting {

   @Test
   public void testBucketSize() {
-    LossyCounting lossyCounting = new LossyCounting(0.01, "testBucketSize");
+    LossyCounting lossyCounting = new LossyCounting(0.01, "testBucketSize", null);
     assertEquals(100L, lossyCounting.getBucketSize());
-    LossyCounting lossyCounting2 = new LossyCounting("testBucketSize2");
+    LossyCounting lossyCounting2 = new LossyCounting("testBucketSize2", null);
     assertEquals(50L, lossyCounting2.getBucketSize());
   }

   @Test
   public void testAddByOne() {
-    LossyCounting lossyCounting = new LossyCounting(0.01, "testAddByOne");
+    LossyCounting lossyCounting = new LossyCounting(0.01, "testAddByOne", null);
     for (int i = 0; i < 100; i++) {
       String key = "" + i;
-      lossyCounting.addByOne(key);
+      lossyCounting.add(key);
     }
     assertEquals(100L, lossyCounting.getDataSize());
     for (int i = 0; i < 100; i++) {
@@ -60,26 +60,27 @@ public class TestLossyCounting {

   @Test
   public void testSweep1() {
-    LossyCounting lossyCounting = new LossyCounting(0.01, "testSweep1");
+    LossyCounting lossyCounting = new LossyCounting(0.01, "testSweep1", null);
     for (int i = 0; i < 400; i++) {
       String key = "" + i;
-      lossyCounting.addByOne(key);
+      lossyCounting.add(key);
     }
     assertEquals(4L, lossyCounting.getCurrentTerm());
-    assertEquals(0L, lossyCounting.getDataSize());
+    // with adds equal to an exact multiple of the bucket size, only bucketSize - 1 keys survive
+    assertEquals(lossyCounting.getBucketSize() - 1, lossyCounting.getDataSize());
   }
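The new expected value deserves a quick derivation. With errorRate 0.01 the bucket size is 100, so the 400th add triggers a sweep with currentTerm = ceil(400 / 100) = 4. Because every new key is now seeded with currentTerm - 1, the keys from adds 302 through 400 enter with a count of 4 and pass the value < currentTerm check, while all older keys sit at 3 or below and are swept: exactly 99 = bucketSize - 1 survivors. Under the old seeding, every unique key held a raw count of 1, so this same sweep emptied the map entirely, which is what the previous assertEquals(0L, ...) encoded.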

   @Test
   public void testSweep2() {
-    LossyCounting lossyCounting = new LossyCounting(0.1, "testSweep2");
+    LossyCounting lossyCounting = new LossyCounting(0.1, "testSweep2", null);
     for (int i = 0; i < 10; i++) {
       String key = "" + i;
-      lossyCounting.addByOne(key);
+      lossyCounting.add(key);
     }
     assertEquals(10L, lossyCounting.getDataSize());
     for (int i = 0; i < 10; i++) {
       String key = "1";
-      lossyCounting.addByOne(key);
+      lossyCounting.add(key);
     }
     assertEquals(1L, lossyCounting.getDataSize());
   }
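testSweep2 follows the same arithmetic: errorRate 0.1 gives a bucket size of 10. The first sweep fires on the 10th add with currentTerm = ceil(10 / 10) = 1; every key holds a count of at least 1, so all 10 survive and getDataSize() is 10. The second loop adds the existing key "1" ten more times, and the sweep on the 20th add runs with currentTerm = 2: "1" has accumulated a count of 11 and stays, while the other nine keys still hold 1 < 2 and are swept, leaving a single entry.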