diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java
index d542d2f2bcb..70e8df122c3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java
@@ -1,23 +1,31 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
- * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
- * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
- * for the specific language governing permissions and limitations under the License.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.hadoop.hbase.coprocessor;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.TableName;
@@ -27,12 +35,11 @@ import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.ipc.RpcServer;
-import org.apache.hadoop.hbase.metrics.Meter;
-import org.apache.hadoop.hbase.metrics.Metric;
 import org.apache.hadoop.hbase.metrics.MetricRegistry;
 import org.apache.hadoop.hbase.util.LossyCounting;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.yetus.audience.InterfaceAudience;
+
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
@@ -49,10 +56,10 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
 public class MetaTableMetrics implements RegionCoprocessor {
 
   private ExampleRegionObserverMeta observer;
-  private Map<String, Optional<Metric>> requestsMap;
   private MetricRegistry registry;
   private LossyCounting clientMetricsLossyCounting, regionMetricsLossyCounting;
   private boolean active = false;
+  private Set<String> metrics = new HashSet<>();
 
   enum MetaTableOps {
     GET, PUT, DELETE;
@@ -101,66 +108,6 @@ public class MetaTableMetrics implements RegionCoprocessor {
       opWithClientMetricRegisterAndMark(row);
     }
 
-    private void markMeterIfPresent(String requestMeter) {
-      if (requestMeter.isEmpty()) {
-        return;
-      }
-
-      Optional<Metric> optionalMetric = requestsMap.get(requestMeter);
-      if (optionalMetric != null && optionalMetric.isPresent()) {
-        Meter metric = (Meter) optionalMetric.get();
-        metric.mark();
-      }
-    }
-
-    private void registerMeterIfNotPresent(String requestMeter) {
-      if (requestMeter.isEmpty()) {
-        return;
-      }
-      if (!requestsMap.containsKey(requestMeter)) {
-        registry.meter(requestMeter);
-        requestsMap.put(requestMeter, registry.get(requestMeter));
-      }
-    }
-
-    /**
-     * Registers and counts lossyCount for Meters that kept by lossy counting.
-     * By using lossy count to maintain meters, at most 7 / e meters will be kept (e is error rate)
-     * e.g. when e is 0.02 by default, at most 350 Clients request metrics will be kept
-     * also, all kept elements have frequency higher than e * N. (N is total count)
-     * @param requestMeter meter to be registered
-     * @param lossyCounting lossyCounting object for one type of meters.
-     */
-    private void registerLossyCountingMeterIfNotPresent(String requestMeter,
-        LossyCounting lossyCounting) {
-
-      if (requestMeter.isEmpty()) {
-        return;
-      }
-      synchronized (lossyCounting) {
-        Set<String> metersToBeRemoved = lossyCounting.addByOne(requestMeter);
-
-        boolean isNewMeter = !requestsMap.containsKey(requestMeter);
-        boolean requestMeterRemoved = metersToBeRemoved.contains(requestMeter);
-        if (isNewMeter) {
-          if (requestMeterRemoved) {
-            // if the new metric is swept off by lossyCounting then don't add in the map
-            metersToBeRemoved.remove(requestMeter);
-          } else {
-            // else register the new metric and add in the map
-            registry.meter(requestMeter);
-            requestsMap.put(requestMeter, registry.get(requestMeter));
-          }
-        }
-
-        for (String meter : metersToBeRemoved) {
-          //cleanup requestsMap according to the swept data from lossy count;
-          requestsMap.remove(meter);
-          registry.remove(meter);
-        }
-      }
-    }
-
     /**
      * Get table name from Ops such as: get, put, delete.
      * @param op such as get, put or delete.
@@ -196,13 +143,13 @@ public class MetaTableMetrics implements RegionCoprocessor {
 
     private void clientMetricRegisterAndMark() {
       // Mark client metric
-      String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : "";
+      String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : null;
       if (clientIP == null || clientIP.isEmpty()) {
         return;
       }
       String clientRequestMeter = clientRequestMeterName(clientIP);
-      registerLossyCountingMeterIfNotPresent(clientRequestMeter, clientMetricsLossyCounting);
-      markMeterIfPresent(clientRequestMeter);
+      clientMetricsLossyCounting.add(clientRequestMeter);
+      registerAndMarkMeter(clientRequestMeter);
     }
 
     private void tableMetricRegisterAndMark(Row op) {
@@ -212,7 +159,7 @@
         return;
       }
       String tableRequestMeter = tableMeterName(tableName);
-      registerAndMarkMeterIfNotPresent(tableRequestMeter);
+      registerAndMarkMeter(tableRequestMeter);
     }
 
     private void regionMetricRegisterAndMark(Row op) {
@@ -222,8 +169,8 @@
         return;
       }
       String regionRequestMeter = regionMeterName(regionId);
-      registerLossyCountingMeterIfNotPresent(regionRequestMeter, regionMetricsLossyCounting);
-      markMeterIfPresent(regionRequestMeter);
+      regionMetricsLossyCounting.add(regionRequestMeter);
+      registerAndMarkMeter(regionRequestMeter);
     }
 
     private void opMetricRegisterAndMark(Row op) {
@@ -232,7 +179,7 @@
       if (opMeterName == null || opMeterName.isEmpty()) {
         return;
       }
-      registerAndMarkMeterIfNotPresent(opMeterName);
+      registerAndMarkMeter(opMeterName);
     }
 
     private void opWithClientMetricRegisterAndMark(Object op) {
@@ -241,13 +188,18 @@
       if (opWithClientMeterName == null || opWithClientMeterName.isEmpty()) {
         return;
       }
-      registerAndMarkMeterIfNotPresent(opWithClientMeterName);
+      registerAndMarkMeter(opWithClientMeterName);
     }
 
     // Helper function to register and mark meter if not present
-    private void registerAndMarkMeterIfNotPresent(String name) {
-      registerMeterIfNotPresent(name);
-      markMeterIfPresent(name);
+    private void registerAndMarkMeter(String requestMeter) {
+      if (requestMeter.isEmpty()) {
+        return;
+      }
+      if(!registry.get(requestMeter).isPresent()){
+        metrics.add(requestMeter);
+      }
+      registry.meter(requestMeter).mark();
     }
 
     private String opWithClientMeterName(Object op) {
@@ -327,9 +279,14 @@ public class MetaTableMetrics implements RegionCoprocessor {
         .equals(TableName.META_TABLE_NAME)) {
       RegionCoprocessorEnvironment regionCoprocessorEnv = (RegionCoprocessorEnvironment) env;
       registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
-      requestsMap = new ConcurrentHashMap<>();
-      clientMetricsLossyCounting = new LossyCounting("clientMetaMetrics");
-      regionMetricsLossyCounting = new LossyCounting("regionMetaMetrics");
+      LossyCounting.LossyCountingListener listener = new LossyCounting.LossyCountingListener(){
+        @Override public void sweep(String key) {
+          registry.remove(key);
+          metrics.remove(key);
+        }
+      };
+      clientMetricsLossyCounting = new LossyCounting("clientMetaMetrics",listener);
+      regionMetricsLossyCounting = new LossyCounting("regionMetaMetrics",listener);
       // only be active mode when this region holds meta table.
       active = true;
     }
@@ -338,10 +295,8 @@ public class MetaTableMetrics implements RegionCoprocessor {
   @Override
   public void stop(CoprocessorEnvironment env) throws IOException {
     // since meta region can move around, clear stale metrics when stop.
-    if (requestsMap != null) {
-      for (String meterName : requestsMap.keySet()) {
-        registry.remove(meterName);
-      }
+    for(String metric:metrics){
+      registry.remove(metric);
     }
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java
index be8b59217ca..ca1a014e773 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hbase.util;
 
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -47,13 +46,18 @@ import org.slf4j.LoggerFactory;
 public class LossyCounting {
   private static final Logger LOG = LoggerFactory.getLogger(LossyCounting.class);
   private long bucketSize;
-  private long currentTerm;
+  private int currentTerm;
   private double errorRate;
   private Map<String, Integer> data;
   private long totalDataCount;
   private String name;
+  private LossyCountingListener listener;
 
-  public LossyCounting(double errorRate, String name) {
+  public interface LossyCountingListener {
+    void sweep(String key);
+  }
+
+  public LossyCounting(double errorRate, String name, LossyCountingListener listener) {
     this.errorRate = errorRate;
     this.name = name;
     if (errorRate < 0.0 || errorRate > 1.0) {
@@ -63,48 +67,55 @@ public class LossyCounting {
     this.currentTerm = 1;
     this.totalDataCount = 0;
     this.data = new ConcurrentHashMap<>();
+    this.listener = listener;
     calculateCurrentTerm();
   }
 
-  public LossyCounting(String name) {
+  public LossyCounting(String name, LossyCountingListener listener) {
     this(HBaseConfiguration.create().getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02),
-        name);
+        name, listener);
   }
 
-  public Set<String> addByOne(String key) {
-    data.put(key, data.getOrDefault(key, 0) + 1);
+  private void addByOne(String key) {
+    //If entry exists, we update the entry by incrementing its frequency by one. Otherwise,
+    //we create a new entry starting with currentTerm so that it will not be pruned immediately
+    data.put(key, data.getOrDefault(key, currentTerm != 0 ? currentTerm - 1 : 0) + 1);
+
+    //update totalDataCount and term
     totalDataCount++;
     calculateCurrentTerm();
-    Set<String> dataToBeSwept = new HashSet<>();
-    if(totalDataCount % bucketSize == 0) {
-      dataToBeSwept = sweep();
-    }
-    return dataToBeSwept;
   }
 
+  public void add(String key) {
+    addByOne(key);
+    if(totalDataCount % bucketSize == 0) {
+      //sweep the entries at bucket boundaries
+      sweep();
+    }
+  }
+
+
   /**
    * sweep low frequency data
    * @return Names of elements got swept
    */
-  private Set<String> sweep() {
-    Set<String> dataToBeSwept = new HashSet<>();
+  private void sweep() {
     for(Map.Entry<String, Integer> entry : data.entrySet()) {
-      if(entry.getValue() + errorRate < currentTerm) {
-        dataToBeSwept.add(entry.getKey());
+      if(entry.getValue() < currentTerm) {
+        String metric = entry.getKey();
+        data.remove(metric);
+        if (listener != null) {
+          listener.sweep(metric);
+        }
       }
     }
-    for(String key : dataToBeSwept) {
-      data.remove(key);
-    }
-    LOG.trace(String.format("%s swept %d elements.", name, dataToBeSwept.size()));
-    return dataToBeSwept;
   }
 
   /**
    * Calculate and set current term
   */
   private void calculateCurrentTerm() {
-    this.currentTerm = (int) Math.ceil(1.0 * totalDataCount / bucketSize);
+    this.currentTerm = (int) Math.ceil(1.0 * totalDataCount / (double) bucketSize);
  }
 
  public long getBucketSize(){
@@ -119,6 +130,10 @@
     return data.containsKey(key);
   }
 
+  public Set<String> getElements(){
+    return data.keySet();
+  }
+
   public long getCurrentTerm() {
     return currentTerm;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java
index 97f24ee25ff..9fe8c12a4f3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java
@@ -1,12 +1,20 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
- * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
- * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
- * for the specific language governing permissions and limitations under the License.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.hadoop.hbase.coprocessor;
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java
index e4f19391ec8..5240c4045c4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java
@@ -38,18 +38,18 @@ public class TestLossyCounting {
 
   @Test
   public void testBucketSize() {
-    LossyCounting lossyCounting = new LossyCounting(0.01, "testBucketSize");
+    LossyCounting lossyCounting = new LossyCounting(0.01, "testBucketSize", null);
     assertEquals(100L, lossyCounting.getBucketSize());
-    LossyCounting lossyCounting2 = new LossyCounting("testBucketSize2");
+    LossyCounting lossyCounting2 = new LossyCounting("testBucketSize2", null);
     assertEquals(50L, lossyCounting2.getBucketSize());
   }
 
   @Test
   public void testAddByOne() {
-    LossyCounting lossyCounting = new LossyCounting(0.01, "testAddByOne");
+    LossyCounting lossyCounting = new LossyCounting(0.01, "testAddByOne", null);
     for(int i = 0; i < 100; i++){
       String key = "" + i;
-      lossyCounting.addByOne(key);
+      lossyCounting.add(key);
     }
     assertEquals(100L, lossyCounting.getDataSize());
     for(int i = 0; i < 100; i++){
@@ -60,26 +60,27 @@
 
   @Test
   public void testSweep1() {
-    LossyCounting lossyCounting = new LossyCounting(0.01, "testSweep1");
+    LossyCounting lossyCounting = new LossyCounting(0.01, "testSweep1", null);
     for(int i = 0; i < 400; i++){
       String key = "" + i;
-      lossyCounting.addByOne(key);
+      lossyCounting.add(key);
     }
     assertEquals(4L, lossyCounting.getCurrentTerm());
-    assertEquals(0L, lossyCounting.getDataSize());
+    //if total rows added are proportional to bucket size
+    assertEquals(lossyCounting.getBucketSize() - 1, lossyCounting.getDataSize());
   }
 
   @Test
   public void testSweep2() {
-    LossyCounting lossyCounting = new LossyCounting(0.1, "testSweep2");
+    LossyCounting lossyCounting = new LossyCounting(0.1, "testSweep2", null);
     for(int i = 0; i < 10; i++){
       String key = "" + i;
-      lossyCounting.addByOne(key);
+      lossyCounting.add(key);
     }
     assertEquals(10L, lossyCounting.getDataSize());
     for(int i = 0; i < 10; i++){
       String key = "1";
-      lossyCounting.addByOne(key);
+      lossyCounting.add(key);
     }
     assertEquals(1L, lossyCounting.getDataSize());
   }
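
Usage note (not part of the patch): the sketch below is a minimal, self-contained illustration of how the listener-based LossyCounting API introduced above is intended to be wired up. Only the three-argument constructor, add(), getElements() and the LossyCountingListener callback come from this change; the class name, key names and the lambda form (LossyCountingListener has a single abstract method, so a lambda works on Java 8+) are illustrative assumptions, and the coprocessor itself registers an anonymous class instead.

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.hbase.util.LossyCounting;

public class LossyCountingListenerExample {
  public static void main(String[] args) {
    // Per-key state owned by the caller; the listener keeps it in sync with the counter,
    // the same way MetaTableMetrics removes swept meters from its registry.
    Set<String> tracked = ConcurrentHashMap.newKeySet();

    // 2% error rate => bucket size 50. The listener now fires for every key swept at a
    // bucket boundary, replacing the old contract where addByOne() returned the swept set.
    LossyCounting counting = new LossyCounting(0.02, "exampleMetaMetrics",
        key -> tracked.remove(key));

    for (int i = 0; i < 1000; i++) {
      // one hot key plus a stream of keys that are each seen only once
      String key = (i % 2 == 0) ? "hot-client" : "cold-client-" + i;
      tracked.add(key);
      counting.add(key);   // add() sweeps internally instead of handing back removed keys
    }

    // The hot key survives sweeping; cold keys fall below the current term and are
    // dropped, with each drop reported to the listener so the caller's set shrinks too.
    System.out.println("still counted: " + counting.getElements());
    System.out.println("still tracked by caller: " + tracked);
  }
}

The design point this illustrates is that cleanup responsibility moves from the caller (iterating a returned set) to a callback supplied at construction time, so sweeping and external bookkeeping can no longer drift apart.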