HBASE-19722 Meta query statistics metrics source
Signed-off-by: Andrew Purtell <apurtell@apache.org>
(cherry picked from commit 58ccd3dc7e
)
This commit is contained in:
parent
47099d1b17
commit
d89e5270aa
|
@ -1402,6 +1402,8 @@ public final class HConstants {
|
||||||
public static final String DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME =
|
public static final String DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME =
|
||||||
"hbase-failsafe-{snapshot.name}-{restore.timestamp}";
|
"hbase-failsafe-{snapshot.name}-{restore.timestamp}";
|
||||||
|
|
||||||
|
public static final String DEFAULT_LOSSY_COUNTING_ERROR_RATE =
|
||||||
|
"hbase.util.default.lossycounting.errorrate";
|
||||||
public static final String NOT_IMPLEMENTED = "Not implemented";
|
public static final String NOT_IMPLEMENTED = "Not implemented";
|
||||||
|
|
||||||
private HConstants() {
|
private HConstants() {
|
||||||
|
|
|
@ -0,0 +1,335 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
|
||||||
|
* agreements. See the NOTICE file distributed with this work for additional information regarding
|
||||||
|
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||||
|
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
|
||||||
|
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
|
||||||
|
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
|
||||||
|
* for the specific language governing permissions and limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.coprocessor;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.Delete;
|
||||||
|
import org.apache.hadoop.hbase.client.Durability;
|
||||||
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.client.Row;
|
||||||
|
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||||
|
import org.apache.hadoop.hbase.metrics.Meter;
|
||||||
|
import org.apache.hadoop.hbase.metrics.Metric;
|
||||||
|
import org.apache.hadoop.hbase.metrics.MetricRegistry;
|
||||||
|
import org.apache.hadoop.hbase.util.LossyCounting;
|
||||||
|
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A coprocessor that collects metrics from meta table.
|
||||||
|
* <p>
|
||||||
|
* These metrics will be available through the regular Hadoop metrics2 sinks (ganglia, opentsdb,
|
||||||
|
* etc) as well as JMX output.
|
||||||
|
* </p>
|
||||||
|
* @see MetaTableMetrics
|
||||||
|
*/
|
||||||
|
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class MetaTableMetrics implements RegionCoprocessor {
|
||||||
|
|
||||||
|
private ExampleRegionObserverMeta observer;
|
||||||
|
private Map<String, Optional<Metric>> requestsMap;
|
||||||
|
private RegionCoprocessorEnvironment regionCoprocessorEnv;
|
||||||
|
private LossyCounting clientMetricsLossyCounting;
|
||||||
|
private boolean active = false;
|
||||||
|
|
||||||
|
enum MetaTableOps {
|
||||||
|
GET, PUT, DELETE;
|
||||||
|
}
|
||||||
|
|
||||||
|
private ImmutableMap<Class, MetaTableOps> opsNameMap =
|
||||||
|
ImmutableMap.<Class, MetaTableOps>builder()
|
||||||
|
.put(Put.class, MetaTableOps.PUT)
|
||||||
|
.put(Get.class, MetaTableOps.GET)
|
||||||
|
.put(Delete.class, MetaTableOps.DELETE)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
class ExampleRegionObserverMeta implements RegionCoprocessor, RegionObserver {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Optional<RegionObserver> getRegionObserver() {
|
||||||
|
return Optional.of(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
|
||||||
|
List<Cell> results) throws IOException {
|
||||||
|
if (!active || !isMetaTableOp(e)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
tableMetricRegisterAndMark(e, get);
|
||||||
|
clientMetricRegisterAndMark(e);
|
||||||
|
regionMetricRegisterAndMark(e, get);
|
||||||
|
opMetricRegisterAndMark(e, get);
|
||||||
|
opWithClientMetricRegisterAndMark(e, get);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
|
||||||
|
Durability durability) throws IOException {
|
||||||
|
if (!active || !isMetaTableOp(e)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
tableMetricRegisterAndMark(e, put);
|
||||||
|
clientMetricRegisterAndMark(e);
|
||||||
|
regionMetricRegisterAndMark(e, put);
|
||||||
|
opMetricRegisterAndMark(e, put);
|
||||||
|
opWithClientMetricRegisterAndMark(e, put);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete,
|
||||||
|
WALEdit edit, Durability durability) throws IOException {
|
||||||
|
if (!active || !isMetaTableOp(e)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
tableMetricRegisterAndMark(e, delete);
|
||||||
|
clientMetricRegisterAndMark(e);
|
||||||
|
regionMetricRegisterAndMark(e, delete);
|
||||||
|
opMetricRegisterAndMark(e, delete);
|
||||||
|
opWithClientMetricRegisterAndMark(e, delete);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void markMeterIfPresent(String requestMeter) {
|
||||||
|
if (requestMeter.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Metric metric =
|
||||||
|
requestsMap.get(requestMeter).isPresent() ? requestsMap.get(requestMeter).get() : null;
|
||||||
|
if (metric != null) {
|
||||||
|
((Meter) metric).mark();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void registerMeterIfNotPresent(ObserverContext<RegionCoprocessorEnvironment> e,
|
||||||
|
String requestMeter) {
|
||||||
|
if (requestMeter.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (!requestsMap.containsKey(requestMeter)) {
|
||||||
|
MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
|
||||||
|
registry.meter(requestMeter);
|
||||||
|
requestsMap.put(requestMeter, registry.get(requestMeter));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Registers and counts lossyCount for Meters that kept by lossy counting.
|
||||||
|
* By using lossy count to maintain meters, at most 7 / e meters will be kept (e is error rate)
|
||||||
|
* e.g. when e is 0.02 by default, at most 50 Clients request metrics will be kept
|
||||||
|
* also, all kept elements have frequency higher than e * N. (N is total count)
|
||||||
|
* @param e Region coprocessor environment
|
||||||
|
* @param requestMeter meter to be registered
|
||||||
|
* @param lossyCounting lossyCounting object for one type of meters.
|
||||||
|
*/
|
||||||
|
private void registerLossyCountingMeterIfNotPresent(
|
||||||
|
ObserverContext<RegionCoprocessorEnvironment> e,
|
||||||
|
String requestMeter, LossyCounting lossyCounting) {
|
||||||
|
if (requestMeter.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Set<String> metersToBeRemoved = lossyCounting.addByOne(requestMeter);
|
||||||
|
if(!requestsMap.containsKey(requestMeter) && metersToBeRemoved.contains(requestMeter)){
|
||||||
|
for(String meter: metersToBeRemoved) {
|
||||||
|
//cleanup requestsMap according swept data from lossy count;
|
||||||
|
requestsMap.remove(meter);
|
||||||
|
MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
|
||||||
|
registry.remove(meter);
|
||||||
|
}
|
||||||
|
// newly added meter is swept by lossy counting cleanup. No need to put it into requestsMap.
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!requestsMap.containsKey(requestMeter)) {
|
||||||
|
MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
|
||||||
|
registry.meter(requestMeter);
|
||||||
|
requestsMap.put(requestMeter, registry.get(requestMeter));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get table name from Ops such as: get, put, delete.
|
||||||
|
* @param op such as get, put or delete.
|
||||||
|
*/
|
||||||
|
private String getTableNameFromOp(Row op) {
|
||||||
|
String tableName = null;
|
||||||
|
String tableRowKey = new String(((Row) op).getRow(), StandardCharsets.UTF_8);
|
||||||
|
if (tableRowKey.isEmpty()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
tableName = tableRowKey.split(",").length > 0 ? tableRowKey.split(",")[0] : null;
|
||||||
|
return tableName;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get regionId from Ops such as: get, put, delete.
|
||||||
|
* @param op such as get, put or delete.
|
||||||
|
*/
|
||||||
|
private String getRegionIdFromOp(Row op) {
|
||||||
|
String regionId = null;
|
||||||
|
String tableRowKey = new String(((Row) op).getRow(), StandardCharsets.UTF_8);
|
||||||
|
if (tableRowKey.isEmpty()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
regionId = tableRowKey.split(",").length > 2 ? tableRowKey.split(",")[2] : null;
|
||||||
|
return regionId;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isMetaTableOp(ObserverContext<RegionCoprocessorEnvironment> e) {
|
||||||
|
return TableName.META_TABLE_NAME.toString()
|
||||||
|
.equals(new String(e.getEnvironment().getRegionInfo().getTable().getName(),
|
||||||
|
StandardCharsets.UTF_8));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void clientMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e) {
|
||||||
|
String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : "";
|
||||||
|
|
||||||
|
String clientRequestMeter = clientRequestMeterName(clientIP);
|
||||||
|
registerLossyCountingMeterIfNotPresent(e, clientRequestMeter, clientMetricsLossyCounting);
|
||||||
|
markMeterIfPresent(clientRequestMeter);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void tableMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e,
|
||||||
|
Row op) {
|
||||||
|
// Mark the meta table meter whenever the coprocessor is called
|
||||||
|
String tableName = getTableNameFromOp(op);
|
||||||
|
String tableRequestMeter = tableMeterName(tableName);
|
||||||
|
registerMeterIfNotPresent(e, tableRequestMeter);
|
||||||
|
markMeterIfPresent(tableRequestMeter);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void regionMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e,
|
||||||
|
Row op) {
|
||||||
|
// Mark the meta table meter whenever the coprocessor is called
|
||||||
|
String regionId = getRegionIdFromOp(op);
|
||||||
|
String regionRequestMeter = regionMeterName(regionId);
|
||||||
|
registerMeterIfNotPresent(e, regionRequestMeter);
|
||||||
|
markMeterIfPresent(regionRequestMeter);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void opMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e,
|
||||||
|
Row op) {
|
||||||
|
String opMeterName = opMeterName(op);
|
||||||
|
registerMeterIfNotPresent(e, opMeterName);
|
||||||
|
markMeterIfPresent(opMeterName);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void opWithClientMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e,
|
||||||
|
Object op) {
|
||||||
|
String opWithClientMeterName = opWithClientMeterName(op);
|
||||||
|
registerMeterIfNotPresent(e, opWithClientMeterName);
|
||||||
|
markMeterIfPresent(opWithClientMeterName);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String opWithClientMeterName(Object op) {
|
||||||
|
String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : "";
|
||||||
|
if (clientIP.isEmpty()) {
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
MetaTableOps ops = opsNameMap.get(op.getClass());
|
||||||
|
String opWithClientMeterName = "";
|
||||||
|
switch (ops) {
|
||||||
|
case GET:
|
||||||
|
opWithClientMeterName = String.format("MetaTable_client_%s_get_request", clientIP);
|
||||||
|
break;
|
||||||
|
case PUT:
|
||||||
|
opWithClientMeterName = String.format("MetaTable_client_%s_put_request", clientIP);
|
||||||
|
break;
|
||||||
|
case DELETE:
|
||||||
|
opWithClientMeterName = String.format("MetaTable_client_%s_delete_request", clientIP);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
return opWithClientMeterName;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String opMeterName(Object op) {
|
||||||
|
MetaTableOps ops = opsNameMap.get(op.getClass());
|
||||||
|
String opMeterName = "";
|
||||||
|
switch (ops) {
|
||||||
|
case GET:
|
||||||
|
opMeterName = "MetaTable_get_request";
|
||||||
|
break;
|
||||||
|
case PUT:
|
||||||
|
opMeterName = "MetaTable_put_request";
|
||||||
|
break;
|
||||||
|
case DELETE:
|
||||||
|
opMeterName = "MetaTable_delete_request";
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
return opMeterName;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String tableMeterName(String tableName) {
|
||||||
|
return String.format("MetaTable_table_%s_request", tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String clientRequestMeterName(String clientIP) {
|
||||||
|
if (clientIP.isEmpty()) {
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
return String.format("MetaTable_client_%s_request", clientIP);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String regionMeterName(String regionId) {
|
||||||
|
return String.format("MetaTable_region_%s_request", regionId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Optional<RegionObserver> getRegionObserver() {
|
||||||
|
return Optional.of(observer);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void start(CoprocessorEnvironment env) throws IOException {
|
||||||
|
if (env instanceof RegionCoprocessorEnvironment
|
||||||
|
&& ((RegionCoprocessorEnvironment) env).getRegionInfo().getTable() != null
|
||||||
|
&& ((RegionCoprocessorEnvironment) env).getRegionInfo().getTable().getName() != null
|
||||||
|
&& new String(((RegionCoprocessorEnvironment) env).getRegionInfo().getTable().getName(),
|
||||||
|
StandardCharsets.UTF_8).equals(TableName.META_TABLE_NAME.toString())) {
|
||||||
|
regionCoprocessorEnv = (RegionCoprocessorEnvironment) env;
|
||||||
|
observer = new ExampleRegionObserverMeta();
|
||||||
|
requestsMap = new ConcurrentHashMap<>();
|
||||||
|
clientMetricsLossyCounting = new LossyCounting();
|
||||||
|
// only be active mode when this region holds meta table.
|
||||||
|
active = true;
|
||||||
|
} else {
|
||||||
|
observer = new ExampleRegionObserverMeta();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stop(CoprocessorEnvironment e) throws IOException {
|
||||||
|
// since meta region can move around, clear stale metrics when stop.
|
||||||
|
for (String meterName : requestsMap.keySet()) {
|
||||||
|
MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
|
||||||
|
registry.remove(meterName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,135 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.util;
|
||||||
|
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* LossyCounting utility, bounded data structure that maintains approximate high frequency
|
||||||
|
* elements in data stream.
|
||||||
|
*
|
||||||
|
* Bucket size is 1 / error rate. (Error rate is 0.02 by default)
|
||||||
|
* Lemma If element does not appear in set, then is frequency is less than e * N
|
||||||
|
* (N is total element counts until now.)
|
||||||
|
* Based on paper:
|
||||||
|
* http://www.vldb.org/conf/2002/S10P03.pdf
|
||||||
|
*/
|
||||||
|
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
public class LossyCounting {
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(LossyCounting.class);
|
||||||
|
private long bucketSize;
|
||||||
|
private long currentTerm;
|
||||||
|
private double errorRate;
|
||||||
|
private Map<String, Integer> data;
|
||||||
|
private long totalDataCount;
|
||||||
|
|
||||||
|
public LossyCounting(double errorRate) {
|
||||||
|
this.errorRate = errorRate;
|
||||||
|
if (errorRate < 0.0 || errorRate > 1.0) {
|
||||||
|
throw new IllegalArgumentException(" Lossy Counting error rate should be within range [0,1]");
|
||||||
|
}
|
||||||
|
this.bucketSize = (long) Math.ceil(1 / errorRate);
|
||||||
|
this.currentTerm = 1;
|
||||||
|
this.totalDataCount = 0;
|
||||||
|
this.errorRate = errorRate;
|
||||||
|
this.data = new ConcurrentHashMap<>();
|
||||||
|
calculateCurrentTerm();
|
||||||
|
}
|
||||||
|
|
||||||
|
public LossyCounting() {
|
||||||
|
Configuration conf = HBaseConfiguration.create();
|
||||||
|
this.errorRate = conf.getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02);
|
||||||
|
this.bucketSize = (long) Math.ceil(1.0 / errorRate);
|
||||||
|
this.currentTerm = 1;
|
||||||
|
this.totalDataCount = 0;
|
||||||
|
this.data = new ConcurrentHashMap<>();
|
||||||
|
calculateCurrentTerm();
|
||||||
|
}
|
||||||
|
|
||||||
|
public Set<String> addByOne(String key) {
|
||||||
|
if(data.containsKey(key)) {
|
||||||
|
data.put(key, data.get(key) +1);
|
||||||
|
} else {
|
||||||
|
data.put(key, 1);
|
||||||
|
}
|
||||||
|
totalDataCount++;
|
||||||
|
calculateCurrentTerm();
|
||||||
|
Set<String> dataToBeSwept = new HashSet<>();
|
||||||
|
if(totalDataCount % bucketSize == 0) {
|
||||||
|
dataToBeSwept = sweep();
|
||||||
|
}
|
||||||
|
return dataToBeSwept;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* sweep low frequency data
|
||||||
|
* @return Names of elements got swept
|
||||||
|
*/
|
||||||
|
private Set<String> sweep() {
|
||||||
|
Set<String> dataToBeSwept = new HashSet<>();
|
||||||
|
for(Map.Entry<String, Integer> entry : data.entrySet()) {
|
||||||
|
if(entry.getValue() + errorRate < currentTerm) {
|
||||||
|
dataToBeSwept.add(entry.getKey());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for(String key : dataToBeSwept) {
|
||||||
|
data.remove(key);
|
||||||
|
}
|
||||||
|
LOG.debug(String.format("Swept %d of elements.", dataToBeSwept.size()));
|
||||||
|
return dataToBeSwept;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Calculate and set current term
|
||||||
|
*/
|
||||||
|
private void calculateCurrentTerm() {
|
||||||
|
this.currentTerm = (int) Math.ceil(1.0 * totalDataCount / bucketSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getBuketSize(){
|
||||||
|
return bucketSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getDataSize() {
|
||||||
|
return data.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean contains(String key) {
|
||||||
|
return data.containsKey(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getCurrentTerm() {
|
||||||
|
return currentTerm;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,231 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
|
||||||
|
* agreements. See the NOTICE file distributed with this work for additional information regarding
|
||||||
|
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||||
|
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
|
||||||
|
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
|
||||||
|
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
|
||||||
|
* for the specific language governing permissions and limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.coprocessor;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Hashtable;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import javax.management.MBeanAttributeInfo;
|
||||||
|
import javax.management.MBeanInfo;
|
||||||
|
import javax.management.MBeanServerConnection;
|
||||||
|
import javax.management.ObjectInstance;
|
||||||
|
import javax.management.ObjectName;
|
||||||
|
import javax.management.remote.JMXConnector;
|
||||||
|
import javax.management.remote.JMXConnectorFactory;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.JMXListener;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
|
||||||
|
@Category({ CoprocessorTests.class, MediumTests.class })
|
||||||
|
public class TestMetaTableMetrics {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestMetaTableMetrics.class);
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(TestMetaTableMetrics.class);
|
||||||
|
|
||||||
|
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||||
|
private static final TableName NAME1 = TableName.valueOf("TestExampleMetaTableMetricsOne");
|
||||||
|
private static final byte[] FAMILY = Bytes.toBytes("f");
|
||||||
|
private static final byte[] QUALIFIER = Bytes.toBytes("q");
|
||||||
|
private static final ColumnFamilyDescriptor CFD =
|
||||||
|
ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build();
|
||||||
|
private static final int NUM_ROWS = 5;
|
||||||
|
private static final String value = "foo";
|
||||||
|
private static Configuration conf = null;
|
||||||
|
private static int connectorPort = 61120;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setupBeforeClass() throws Exception {
|
||||||
|
|
||||||
|
conf = UTIL.getConfiguration();
|
||||||
|
// Set system coprocessor so it can be applied to meta regions
|
||||||
|
UTIL.getConfiguration().set("hbase.coprocessor.region.classes",
|
||||||
|
MetaTableMetrics.class.getName());
|
||||||
|
conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, JMXListener.class.getName());
|
||||||
|
Random rand = new Random();
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
do {
|
||||||
|
int sign = i % 2 == 0 ? 1 : -1;
|
||||||
|
connectorPort += sign * rand.nextInt(100);
|
||||||
|
} while (!HBaseTestingUtility.available(connectorPort));
|
||||||
|
try {
|
||||||
|
conf.setInt("regionserver.rmi.registry.port", connectorPort);
|
||||||
|
UTIL.startMiniCluster(1);
|
||||||
|
break;
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.debug("Encountered exception when starting cluster. Trying port " + connectorPort, e);
|
||||||
|
try {
|
||||||
|
// this is to avoid "IllegalStateException: A mini-cluster is already running"
|
||||||
|
UTIL.shutdownMiniCluster();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
LOG.debug("Encountered exception shutting down cluster", ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
UTIL.getAdmin()
|
||||||
|
.createTable(TableDescriptorBuilder.newBuilder(NAME1)
|
||||||
|
.setColumnFamily(CFD)
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDown() throws Exception {
|
||||||
|
UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void writeData(Table t) throws IOException {
|
||||||
|
List<Put> puts = new ArrayList<>(NUM_ROWS);
|
||||||
|
for (int i = 0; i < NUM_ROWS; i++) {
|
||||||
|
Put p = new Put(Bytes.toBytes(i + 1));
|
||||||
|
p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(value));
|
||||||
|
puts.add(p);
|
||||||
|
}
|
||||||
|
t.put(puts);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Set<String> readJmxMetricsWithRetry() throws IOException {
|
||||||
|
final int count = 0;
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
Set<String> metrics = readJmxMetrics();
|
||||||
|
if (metrics != null) {
|
||||||
|
return metrics;
|
||||||
|
}
|
||||||
|
LOG.warn("Failed to get jmxmetrics... sleeping, retrying; " + i + " of " + count + " times");
|
||||||
|
Threads.sleep(1000);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read the attributes from Hadoop->HBase->RegionServer->MetaTableMetrics in JMX
|
||||||
|
* @throws IOException when fails to retrieve jmx metrics.
|
||||||
|
*/
|
||||||
|
// this method comes from this class: TestStochasticBalancerJmxMetrics with minor modifications.
|
||||||
|
private Set<String> readJmxMetrics() throws IOException {
|
||||||
|
JMXConnector connector = null;
|
||||||
|
ObjectName target = null;
|
||||||
|
MBeanServerConnection mb = null;
|
||||||
|
try {
|
||||||
|
connector =
|
||||||
|
JMXConnectorFactory.connect(JMXListener.buildJMXServiceURL(connectorPort, connectorPort));
|
||||||
|
mb = connector.getMBeanServerConnection();
|
||||||
|
|
||||||
|
@SuppressWarnings("JdkObsolete")
|
||||||
|
Hashtable<String, String> pairs = new Hashtable<>();
|
||||||
|
pairs.put("service", "HBase");
|
||||||
|
pairs.put("name", "RegionServer");
|
||||||
|
pairs.put("sub",
|
||||||
|
"Coprocessor.Region.CP_org.apache.hadoop.hbase.coprocessor"
|
||||||
|
+ ".MetaTableMetrics");
|
||||||
|
target = new ObjectName("Hadoop", pairs);
|
||||||
|
MBeanInfo beanInfo = mb.getMBeanInfo(target);
|
||||||
|
|
||||||
|
Set<String> existingAttrs = new HashSet<>();
|
||||||
|
for (MBeanAttributeInfo attrInfo : beanInfo.getAttributes()) {
|
||||||
|
existingAttrs.add(attrInfo.getName());
|
||||||
|
}
|
||||||
|
return existingAttrs;
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.warn("Failed to get bean." + target, e);
|
||||||
|
if (mb != null) {
|
||||||
|
Set<ObjectInstance> instances = mb.queryMBeans(null, null);
|
||||||
|
Iterator<ObjectInstance> iterator = instances.iterator();
|
||||||
|
LOG.warn("MBean Found:");
|
||||||
|
while (iterator.hasNext()) {
|
||||||
|
ObjectInstance instance = iterator.next();
|
||||||
|
LOG.warn("Class Name: " + instance.getClassName());
|
||||||
|
LOG.warn("Object Name: " + instance.getObjectName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (connector != null) {
|
||||||
|
try {
|
||||||
|
connector.close();
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// verifies meta table metrics exist from jmx
|
||||||
|
// for one table, there should be 5 MetaTable_table_<TableName> metrics.
|
||||||
|
// such as:
|
||||||
|
// [Time-limited test] example.TestMetaTableMetrics(204): ==
|
||||||
|
// MetaTable_table_TestExampleMetaTableMetricsOne_request_count
|
||||||
|
// [Time-limited test] example.TestMetaTableMetrics(204): ==
|
||||||
|
// MetaTable_table_TestExampleMetaTableMetricsOne_request_mean_rate
|
||||||
|
// [Time-limited test] example.TestMetaTableMetrics(204): ==
|
||||||
|
// MetaTable_table_TestExampleMetaTableMetricsOne_request_1min_rate
|
||||||
|
// [Time-limited test] example.TestMetaTableMetrics(204): ==
|
||||||
|
// MetaTable_table_TestExampleMetaTableMetricsOne_request_5min_rate
|
||||||
|
// [Time-limited test] example.TestMetaTableMetrics(204): ==
|
||||||
|
// MetaTable_table_TestExampleMetaTableMetricsOne_request_15min_rate
|
||||||
|
@Test
|
||||||
|
public void test() throws IOException, InterruptedException {
|
||||||
|
try (Table t = UTIL.getConnection().getTable(NAME1)) {
|
||||||
|
writeData(t);
|
||||||
|
// Flush the data
|
||||||
|
UTIL.flush(NAME1);
|
||||||
|
// Issue a compaction
|
||||||
|
UTIL.compact(NAME1, true);
|
||||||
|
Thread.sleep(2000);
|
||||||
|
}
|
||||||
|
Set<String> jmxMetrics = readJmxMetricsWithRetry();
|
||||||
|
assertNotNull(jmxMetrics);
|
||||||
|
long name1TableMetricsCount =
|
||||||
|
jmxMetrics.stream().filter(metric -> metric.contains("MetaTable_table_" + NAME1)).count();
|
||||||
|
assertEquals(5L, name1TableMetricsCount);
|
||||||
|
|
||||||
|
String putWithClientMetricNameRegex = "MetaTable_client_.+_put_request.*";
|
||||||
|
long putWithClientMetricsCount =
|
||||||
|
jmxMetrics.stream().filter(metric -> metric.matches(putWithClientMetricNameRegex))
|
||||||
|
.count();
|
||||||
|
assertEquals(5L, putWithClientMetricsCount);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,88 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.util;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@Category({MiscTests.class, SmallTests.class})
|
||||||
|
public class TestLossyCounting {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestLossyCounting.class);
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBucketSize() {
|
||||||
|
LossyCounting lossyCounting = new LossyCounting(0.01);
|
||||||
|
assertEquals(100L, lossyCounting.getBuketSize());
|
||||||
|
LossyCounting lossyCounting2 = new LossyCounting();
|
||||||
|
assertEquals(50L, lossyCounting2.getBuketSize());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddByOne() {
|
||||||
|
LossyCounting lossyCounting = new LossyCounting(0.01);
|
||||||
|
for(int i = 0; i < 100; i++){
|
||||||
|
String key = "" + i;
|
||||||
|
lossyCounting.addByOne(key);
|
||||||
|
}
|
||||||
|
assertEquals(100L, lossyCounting.getDataSize());
|
||||||
|
for(int i = 0; i < 100; i++){
|
||||||
|
String key = "" + i;
|
||||||
|
assertEquals(true, lossyCounting.contains(key));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSweep1() {
|
||||||
|
LossyCounting lossyCounting = new LossyCounting(0.01);
|
||||||
|
for(int i = 0; i < 400; i++){
|
||||||
|
String key = "" + i;
|
||||||
|
lossyCounting.addByOne(key);
|
||||||
|
}
|
||||||
|
assertEquals(4L, lossyCounting.getCurrentTerm());
|
||||||
|
assertEquals(0L, lossyCounting.getDataSize());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSweep2() {
|
||||||
|
LossyCounting lossyCounting = new LossyCounting(0.1);
|
||||||
|
for(int i = 0; i < 10; i++){
|
||||||
|
String key = "" + i;
|
||||||
|
lossyCounting.addByOne(key);
|
||||||
|
}
|
||||||
|
assertEquals(10L, lossyCounting.getDataSize());
|
||||||
|
for(int i = 0; i < 10; i++){
|
||||||
|
String key = "1";
|
||||||
|
lossyCounting.addByOne(key);
|
||||||
|
}
|
||||||
|
assertEquals(1L, lossyCounting.getDataSize());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue