HBASE-21800: RegionServer aborted due to NPE from MetaTableMetrics coprocessor
Have included code refactoring in MetaTableMetrics & LossyCounting
This commit is contained in:
parent
6f16836c20
commit
abaeeace00
|
@ -75,50 +75,40 @@ public class MetaTableMetrics implements RegionCoprocessor {
|
||||||
@Override
|
@Override
|
||||||
public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
|
public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
|
||||||
List<Cell> results) throws IOException {
|
List<Cell> results) throws IOException {
|
||||||
if (!active || !isMetaTableOp(e)) {
|
registerAndMarkMetrics(e, get);
|
||||||
return;
|
|
||||||
}
|
|
||||||
tableMetricRegisterAndMark(e, get);
|
|
||||||
clientMetricRegisterAndMark(e);
|
|
||||||
regionMetricRegisterAndMark(e, get);
|
|
||||||
opMetricRegisterAndMark(e, get);
|
|
||||||
opWithClientMetricRegisterAndMark(e, get);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
|
public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
|
||||||
Durability durability) throws IOException {
|
Durability durability) throws IOException {
|
||||||
if (!active || !isMetaTableOp(e)) {
|
registerAndMarkMetrics(e, put);
|
||||||
return;
|
|
||||||
}
|
|
||||||
tableMetricRegisterAndMark(e, put);
|
|
||||||
clientMetricRegisterAndMark(e);
|
|
||||||
regionMetricRegisterAndMark(e, put);
|
|
||||||
opMetricRegisterAndMark(e, put);
|
|
||||||
opWithClientMetricRegisterAndMark(e, put);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete,
|
public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete,
|
||||||
WALEdit edit, Durability durability) throws IOException {
|
WALEdit edit, Durability durability) throws IOException {
|
||||||
|
registerAndMarkMetrics(e, delete);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void registerAndMarkMetrics(ObserverContext<RegionCoprocessorEnvironment> e, Row row){
|
||||||
if (!active || !isMetaTableOp(e)) {
|
if (!active || !isMetaTableOp(e)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
tableMetricRegisterAndMark(e, delete);
|
tableMetricRegisterAndMark(e, row);
|
||||||
clientMetricRegisterAndMark(e);
|
clientMetricRegisterAndMark(e);
|
||||||
regionMetricRegisterAndMark(e, delete);
|
regionMetricRegisterAndMark(e, row);
|
||||||
opMetricRegisterAndMark(e, delete);
|
opMetricRegisterAndMark(e, row);
|
||||||
opWithClientMetricRegisterAndMark(e, delete);
|
opWithClientMetricRegisterAndMark(e, row);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void markMeterIfPresent(String requestMeter) {
|
private void markMeterIfPresent(String requestMeter) {
|
||||||
if (requestMeter.isEmpty()) {
|
if (requestMeter.isEmpty()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
Metric metric =
|
|
||||||
requestsMap.get(requestMeter).isPresent() ? requestsMap.get(requestMeter).get() : null;
|
if (requestsMap.containsKey(requestMeter) && requestsMap.get(requestMeter).isPresent()) {
|
||||||
if (metric != null) {
|
Meter metric = (Meter) requestsMap.get(requestMeter).get();
|
||||||
((Meter) metric).mark();
|
metric.mark();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -137,7 +127,7 @@ public class MetaTableMetrics implements RegionCoprocessor {
|
||||||
/**
|
/**
|
||||||
* Registers and counts lossyCount for Meters that kept by lossy counting.
|
* Registers and counts lossyCount for Meters that kept by lossy counting.
|
||||||
* By using lossy count to maintain meters, at most 7 / e meters will be kept (e is error rate)
|
* By using lossy count to maintain meters, at most 7 / e meters will be kept (e is error rate)
|
||||||
* e.g. when e is 0.02 by default, at most 50 Clients request metrics will be kept
|
* e.g. when e is 0.02 by default, at most 350 Clients request metrics will be kept
|
||||||
* also, all kept elements have frequency higher than e * N. (N is total count)
|
* also, all kept elements have frequency higher than e * N. (N is total count)
|
||||||
* @param e Region coprocessor environment
|
* @param e Region coprocessor environment
|
||||||
* @param requestMeter meter to be registered
|
* @param requestMeter meter to be registered
|
||||||
|
@ -202,6 +192,7 @@ public class MetaTableMetrics implements RegionCoprocessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void clientMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e) {
|
private void clientMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e) {
|
||||||
|
// Mark client metric
|
||||||
String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : "";
|
String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : "";
|
||||||
|
|
||||||
String clientRequestMeter = clientRequestMeterName(clientIP);
|
String clientRequestMeter = clientRequestMeterName(clientIP);
|
||||||
|
@ -211,37 +202,43 @@ public class MetaTableMetrics implements RegionCoprocessor {
|
||||||
|
|
||||||
private void tableMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e,
|
private void tableMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e,
|
||||||
Row op) {
|
Row op) {
|
||||||
// Mark the meta table meter whenever the coprocessor is called
|
// Mark table metric
|
||||||
String tableName = getTableNameFromOp(op);
|
String tableName = getTableNameFromOp(op);
|
||||||
String tableRequestMeter = tableMeterName(tableName);
|
String tableRequestMeter = tableMeterName(tableName);
|
||||||
registerMeterIfNotPresent(e, tableRequestMeter);
|
registerAndMarkMeterIfNotPresent(e, tableRequestMeter);
|
||||||
markMeterIfPresent(tableRequestMeter);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void regionMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e,
|
private void regionMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e,
|
||||||
Row op) {
|
Row op) {
|
||||||
// Mark the meta table meter whenever the coprocessor is called
|
// Mark region metric
|
||||||
String regionId = getRegionIdFromOp(op);
|
String regionId = getRegionIdFromOp(op);
|
||||||
String regionRequestMeter = regionMeterName(regionId);
|
String regionRequestMeter = regionMeterName(regionId);
|
||||||
registerMeterIfNotPresent(e, regionRequestMeter);
|
registerAndMarkMeterIfNotPresent(e, regionRequestMeter);
|
||||||
markMeterIfPresent(regionRequestMeter);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void opMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e,
|
private void opMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e,
|
||||||
Row op) {
|
Row op) {
|
||||||
|
// Mark access type ["get", "put", "delete"] metric
|
||||||
String opMeterName = opMeterName(op);
|
String opMeterName = opMeterName(op);
|
||||||
registerMeterIfNotPresent(e, opMeterName);
|
registerAndMarkMeterIfNotPresent(e, opMeterName);
|
||||||
markMeterIfPresent(opMeterName);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void opWithClientMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e,
|
private void opWithClientMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e,
|
||||||
Object op) {
|
Object op) {
|
||||||
|
// // Mark client + access type metric
|
||||||
String opWithClientMeterName = opWithClientMeterName(op);
|
String opWithClientMeterName = opWithClientMeterName(op);
|
||||||
registerMeterIfNotPresent(e, opWithClientMeterName);
|
registerAndMarkMeterIfNotPresent(e, opWithClientMeterName);
|
||||||
markMeterIfPresent(opWithClientMeterName);
|
}
|
||||||
|
|
||||||
|
// Helper function to register and mark meter if not present
|
||||||
|
private void registerAndMarkMeterIfNotPresent(ObserverContext<RegionCoprocessorEnvironment> e,
|
||||||
|
String name) {
|
||||||
|
registerMeterIfNotPresent(e, name);
|
||||||
|
markMeterIfPresent(name);
|
||||||
}
|
}
|
||||||
|
|
||||||
private String opWithClientMeterName(Object op) {
|
private String opWithClientMeterName(Object op) {
|
||||||
|
// Extract meter name containing the client IP
|
||||||
String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : "";
|
String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : "";
|
||||||
if (clientIP.isEmpty()) {
|
if (clientIP.isEmpty()) {
|
||||||
return "";
|
return "";
|
||||||
|
@ -265,6 +262,7 @@ public class MetaTableMetrics implements RegionCoprocessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
private String opMeterName(Object op) {
|
private String opMeterName(Object op) {
|
||||||
|
// Extract meter name containing the access type
|
||||||
MetaTableOps ops = opsNameMap.get(op.getClass());
|
MetaTableOps ops = opsNameMap.get(op.getClass());
|
||||||
String opMeterName = "";
|
String opMeterName = "";
|
||||||
switch (ops) {
|
switch (ops) {
|
||||||
|
@ -284,10 +282,12 @@ public class MetaTableMetrics implements RegionCoprocessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
private String tableMeterName(String tableName) {
|
private String tableMeterName(String tableName) {
|
||||||
|
// Extract meter name containing the table name
|
||||||
return String.format("MetaTable_table_%s_request", tableName);
|
return String.format("MetaTable_table_%s_request", tableName);
|
||||||
}
|
}
|
||||||
|
|
||||||
private String clientRequestMeterName(String clientIP) {
|
private String clientRequestMeterName(String clientIP) {
|
||||||
|
// Extract meter name containing the client IP
|
||||||
if (clientIP.isEmpty()) {
|
if (clientIP.isEmpty()) {
|
||||||
return "";
|
return "";
|
||||||
}
|
}
|
||||||
|
@ -295,6 +295,7 @@ public class MetaTableMetrics implements RegionCoprocessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
private String regionMeterName(String regionId) {
|
private String regionMeterName(String regionId) {
|
||||||
|
// Extract meter name containing the region ID
|
||||||
return String.format("MetaTable_region_%s_request", regionId);
|
return String.format("MetaTable_region_%s_request", regionId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -306,18 +307,16 @@ public class MetaTableMetrics implements RegionCoprocessor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void start(CoprocessorEnvironment env) throws IOException {
|
public void start(CoprocessorEnvironment env) throws IOException {
|
||||||
|
observer = new ExampleRegionObserverMeta();
|
||||||
if (env instanceof RegionCoprocessorEnvironment
|
if (env instanceof RegionCoprocessorEnvironment
|
||||||
&& ((RegionCoprocessorEnvironment) env).getRegionInfo().getTable() != null
|
&& ((RegionCoprocessorEnvironment) env).getRegionInfo().getTable() != null
|
||||||
&& ((RegionCoprocessorEnvironment) env).getRegionInfo().getTable()
|
&& ((RegionCoprocessorEnvironment) env).getRegionInfo().getTable()
|
||||||
.equals(TableName.META_TABLE_NAME)) {
|
.equals(TableName.META_TABLE_NAME)) {
|
||||||
regionCoprocessorEnv = (RegionCoprocessorEnvironment) env;
|
regionCoprocessorEnv = (RegionCoprocessorEnvironment) env;
|
||||||
observer = new ExampleRegionObserverMeta();
|
|
||||||
requestsMap = new ConcurrentHashMap<>();
|
requestsMap = new ConcurrentHashMap<>();
|
||||||
clientMetricsLossyCounting = new LossyCounting();
|
clientMetricsLossyCounting = new LossyCounting();
|
||||||
// only be active mode when this region holds meta table.
|
// only be active mode when this region holds meta table.
|
||||||
active = true;
|
active = true;
|
||||||
} else {
|
|
||||||
observer = new ExampleRegionObserverMeta();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -325,11 +324,10 @@ public class MetaTableMetrics implements RegionCoprocessor {
|
||||||
public void stop(CoprocessorEnvironment env) throws IOException {
|
public void stop(CoprocessorEnvironment env) throws IOException {
|
||||||
// since meta region can move around, clear stale metrics when stop.
|
// since meta region can move around, clear stale metrics when stop.
|
||||||
if (requestsMap != null) {
|
if (requestsMap != null) {
|
||||||
|
MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
|
||||||
for (String meterName : requestsMap.keySet()) {
|
for (String meterName : requestsMap.keySet()) {
|
||||||
MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
|
|
||||||
registry.remove(meterName);
|
registry.remove(meterName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,7 +24,6 @@ import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
@ -61,27 +60,16 @@ public class LossyCounting {
|
||||||
this.bucketSize = (long) Math.ceil(1 / errorRate);
|
this.bucketSize = (long) Math.ceil(1 / errorRate);
|
||||||
this.currentTerm = 1;
|
this.currentTerm = 1;
|
||||||
this.totalDataCount = 0;
|
this.totalDataCount = 0;
|
||||||
this.errorRate = errorRate;
|
|
||||||
this.data = new ConcurrentHashMap<>();
|
this.data = new ConcurrentHashMap<>();
|
||||||
calculateCurrentTerm();
|
calculateCurrentTerm();
|
||||||
}
|
}
|
||||||
|
|
||||||
public LossyCounting() {
|
public LossyCounting() {
|
||||||
Configuration conf = HBaseConfiguration.create();
|
this(HBaseConfiguration.create().getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02));
|
||||||
this.errorRate = conf.getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02);
|
|
||||||
this.bucketSize = (long) Math.ceil(1.0 / errorRate);
|
|
||||||
this.currentTerm = 1;
|
|
||||||
this.totalDataCount = 0;
|
|
||||||
this.data = new ConcurrentHashMap<>();
|
|
||||||
calculateCurrentTerm();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Set<String> addByOne(String key) {
|
public Set<String> addByOne(String key) {
|
||||||
if(data.containsKey(key)) {
|
data.put(key, data.getOrDefault(key, 0) + 1);
|
||||||
data.put(key, data.get(key) +1);
|
|
||||||
} else {
|
|
||||||
data.put(key, 1);
|
|
||||||
}
|
|
||||||
totalDataCount++;
|
totalDataCount++;
|
||||||
calculateCurrentTerm();
|
calculateCurrentTerm();
|
||||||
Set<String> dataToBeSwept = new HashSet<>();
|
Set<String> dataToBeSwept = new HashSet<>();
|
||||||
|
@ -105,7 +93,7 @@ public class LossyCounting {
|
||||||
for(String key : dataToBeSwept) {
|
for(String key : dataToBeSwept) {
|
||||||
data.remove(key);
|
data.remove(key);
|
||||||
}
|
}
|
||||||
LOG.debug(String.format("Swept %d of elements.", dataToBeSwept.size()));
|
LOG.debug(String.format("Swept %d elements.", dataToBeSwept.size()));
|
||||||
return dataToBeSwept;
|
return dataToBeSwept;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -116,7 +104,7 @@ public class LossyCounting {
|
||||||
this.currentTerm = (int) Math.ceil(1.0 * totalDataCount / bucketSize);
|
this.currentTerm = (int) Math.ceil(1.0 * totalDataCount / bucketSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getBuketSize(){
|
public long getBucketSize(){
|
||||||
return bucketSize;
|
return bucketSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -222,10 +222,6 @@ public class TestMetaTableMetrics {
|
||||||
jmxMetrics.stream().filter(metric -> metric.matches(putWithClientMetricNameRegex))
|
jmxMetrics.stream().filter(metric -> metric.matches(putWithClientMetricNameRegex))
|
||||||
.count();
|
.count();
|
||||||
assertEquals(5L, putWithClientMetricsCount);
|
assertEquals(5L, putWithClientMetricsCount);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,9 +39,9 @@ public class TestLossyCounting {
|
||||||
@Test
|
@Test
|
||||||
public void testBucketSize() {
|
public void testBucketSize() {
|
||||||
LossyCounting lossyCounting = new LossyCounting(0.01);
|
LossyCounting lossyCounting = new LossyCounting(0.01);
|
||||||
assertEquals(100L, lossyCounting.getBuketSize());
|
assertEquals(100L, lossyCounting.getBucketSize());
|
||||||
LossyCounting lossyCounting2 = new LossyCounting();
|
LossyCounting lossyCounting2 = new LossyCounting();
|
||||||
assertEquals(50L, lossyCounting2.getBuketSize());
|
assertEquals(50L, lossyCounting2.getBucketSize());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue