HBASE-23210 Backport HBASE-15519 (Add per-user metrics) to branch-1 (#755)

HBASE-15519 Add per-user metrics with lossy counting

Introducing property hbase.regionserver.user.metrics.enabled(Default:true)
to disable user metrics in case it accounts for any performance issues

Signed-off-by: Josh Elser <elserj@apache.org>
This commit is contained in:
Andrew Purtell 2019-10-28 09:05:36 -07:00 committed by GitHub
parent db2ce23a93
commit d16cbfe91d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 1020 additions and 47 deletions

View File

@ -42,6 +42,18 @@ public interface MetricsRegionServerSourceFactory {
*/
MetricsRegionSource createRegion(MetricsRegionWrapper wrapper);
/**
* Create a MetricsUserSource from a user
* @return A metrics user source
*/
MetricsUserSource createUser(String shortUserName);
/**
* Return the singleton instance for MetricsUserAggregateSource
* @return A metrics user aggregate source
*/
MetricsUserAggregateSource getUserAggregate();
/**
* Create a MetricsTableSource from a MetricsTableWrapper.
*

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.metrics.BaseSource;
/**
* This interface will be implemented by a MetricsSource that will export metrics from
* multiple users into the hadoop metrics system.
*/
public interface MetricsUserAggregateSource extends BaseSource {
/**
* The name of the metrics
*/
static final String METRICS_NAME = "Users";
/**
* The name of the metrics context that metrics will be under.
*/
static final String METRICS_CONTEXT = "regionserver";
/**
* Description
*/
static final String METRICS_DESCRIPTION = "Metrics about users connected to the regionserver";
/**
* The name of the metrics context that metrics will be under in jmx
*/
static final String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME;
static final String NUM_USERS = "numUsers";
static final String NUMBER_OF_USERS_DESC = "Number of users in the metrics system";
/**
* Returns a MetricsUserSource if already exists, or creates and registers one for this user
* @param user the user name
* @return a metrics user source
*/
MetricsUserSource getOrCreateMetricsUser(String user);
void deregister(MetricsUserSource toRemove);
}

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
public interface MetricsUserSource extends Comparable<MetricsUserSource> {
String getUser();
void register();
void deregister();
void updatePut(long t);
void updateDelete(long t);
void updateGet(long t);
void updateIncrement(long t);
void updateAppend(long t);
void updateReplay(long t);
void updateScanTime(long t);
}

View File

@ -31,16 +31,26 @@ public class MetricsRegionServerSourceFactoryImpl implements MetricsRegionServer
public static enum FactoryStorage {
INSTANCE;
private Object aggLock = new Object();
private MetricsRegionAggregateSourceImpl aggImpl;
private MetricsRegionAggregateSourceImpl regionAggImpl;
private MetricsUserAggregateSourceImpl userAggImpl;
private MetricsTableAggregateSourceImpl tblAggImpl;
}
private synchronized MetricsRegionAggregateSourceImpl getAggregate() {
private synchronized MetricsRegionAggregateSourceImpl getRegionAggregate() {
synchronized (FactoryStorage.INSTANCE.aggLock) {
if (FactoryStorage.INSTANCE.aggImpl == null) {
FactoryStorage.INSTANCE.aggImpl = new MetricsRegionAggregateSourceImpl();
if (FactoryStorage.INSTANCE.regionAggImpl == null) {
FactoryStorage.INSTANCE.regionAggImpl = new MetricsRegionAggregateSourceImpl();
}
return FactoryStorage.INSTANCE.aggImpl;
return FactoryStorage.INSTANCE.regionAggImpl;
}
}
public synchronized MetricsUserAggregateSourceImpl getUserAggregate() {
synchronized (FactoryStorage.INSTANCE.aggLock) {
if (FactoryStorage.INSTANCE.userAggImpl == null) {
FactoryStorage.INSTANCE.userAggImpl = new MetricsUserAggregateSourceImpl();
}
return FactoryStorage.INSTANCE.userAggImpl;
}
}
@ -61,7 +71,7 @@ public class MetricsRegionServerSourceFactoryImpl implements MetricsRegionServer
@Override
public MetricsRegionSource createRegion(MetricsRegionWrapper wrapper) {
return new MetricsRegionSourceImpl(wrapper, getAggregate());
return new MetricsRegionSourceImpl(wrapper, getRegionAggregate());
}
@Override
@ -72,4 +82,10 @@ public class MetricsRegionServerSourceFactoryImpl implements MetricsRegionServer
public MetricsIOSource createIO(MetricsIOWrapper wrapper) {
return new MetricsIOSourceImpl(wrapper);
}
@Override
public org.apache.hadoop.hbase.regionserver.MetricsUserSource createUser(String shortUserName) {
return new org.apache.hadoop.hbase.regionserver.MetricsUserSourceImpl(shortUserName,
getUserAggregate());
}
}

View File

@ -0,0 +1,110 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import com.google.common.annotations.VisibleForTesting;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.lib.Interns;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
public class MetricsUserAggregateSourceImpl extends BaseSourceImpl
implements MetricsUserAggregateSource {
private static final Logger LOG = LoggerFactory.getLogger(MetricsUserAggregateSourceImpl.class);
private final ConcurrentHashMap<String, MetricsUserSource> userSources =
new ConcurrentHashMap<String, MetricsUserSource>();
public MetricsUserAggregateSourceImpl() {
this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT);
}
public MetricsUserAggregateSourceImpl(String metricsName,
String metricsDescription,
String metricsContext,
String metricsJmxContext) {
super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
}
@Override
public MetricsUserSource getOrCreateMetricsUser(String user) {
MetricsUserSource source = userSources.get(user);
if (source != null) {
return source;
}
source = new MetricsUserSourceImpl(user, this);
MetricsUserSource prev = userSources.putIfAbsent(user, source);
if (prev != null) {
return prev;
} else {
// register the new metrics now
register(source);
}
return source;
}
public void register(MetricsUserSource source) {
synchronized (this) {
source.register();
}
}
@Override
public void deregister(MetricsUserSource toRemove) {
try {
synchronized (this) {
MetricsUserSource source = userSources.remove(toRemove.getUser());
if (source != null) {
source.deregister();
}
}
} catch (Exception e) {
// Ignored. If this errors out it means that someone is double
// closing the user source and the user metrics is already nulled out.
LOG.info("Error trying to remove " + toRemove + " from " + getClass().getSimpleName(), e);
}
}
@VisibleForTesting
public ConcurrentHashMap<String, MetricsUserSource> getUserSources() {
return userSources;
}
@Override
public void getMetrics(MetricsCollector collector, boolean all) {
MetricsRecordBuilder mrb = collector.addRecord(metricsName);
if (userSources != null) {
for (MetricsUserSource userMetricSource : userSources.values()) {
if (userMetricSource instanceof MetricsUserSourceImpl) {
((MetricsUserSourceImpl) userMetricSource).snapshot(mrb, all);
}
}
mrb.addGauge(Interns.info(NUM_USERS, NUMBER_OF_USERS_DESC), userSources.size());
metricsRegistry.snapshot(mrb, all);
}
}
}

View File

@ -0,0 +1,206 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.metrics2.MetricHistogram;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
public class MetricsUserSourceImpl implements MetricsUserSource {
private static final Logger LOG = LoggerFactory.getLogger(MetricsUserSourceImpl.class);
private final String userNamePrefix;
private final String user;
private final String userGetKey;
private final String userScanTimeKey;
private final String userPutKey;
private final String userDeleteKey;
private final String userIncrementKey;
private final String userAppendKey;
private final String userReplayKey;
private MetricHistogram getHisto;
private MetricHistogram scanTimeHisto;
private MetricHistogram putHisto;
private MetricHistogram deleteHisto;
private MetricHistogram incrementHisto;
private MetricHistogram appendHisto;
private MetricHistogram replayHisto;
private final int hashCode;
private AtomicBoolean closed = new AtomicBoolean(false);
private final MetricsUserAggregateSourceImpl agg;
private final DynamicMetricsRegistry registry;
public MetricsUserSourceImpl(String user, MetricsUserAggregateSourceImpl agg) {
if (LOG.isDebugEnabled()) {
LOG.debug("Creating new MetricsUserSourceImpl for user " + user);
}
this.user = user;
this.agg = agg;
this.registry = agg.getMetricsRegistry();
this.userNamePrefix = "user_" + user + "_metric_";
hashCode = userNamePrefix.hashCode();
userGetKey = userNamePrefix + MetricsRegionServerSource.GET_KEY;
userScanTimeKey = userNamePrefix + MetricsRegionServerSource.SCAN_TIME_KEY;
userPutKey = userNamePrefix + MetricsRegionServerSource.PUT_KEY;
userDeleteKey = userNamePrefix + MetricsRegionServerSource.DELETE_KEY;
userIncrementKey = userNamePrefix + MetricsRegionServerSource.INCREMENT_KEY;
userAppendKey = userNamePrefix + MetricsRegionServerSource.APPEND_KEY;
userReplayKey = userNamePrefix + MetricsRegionServerSource.REPLAY_KEY;
agg.register(this);
}
@Override
public void register() {
synchronized (this) {
getHisto = registry.newTimeHistogram(userGetKey);
scanTimeHisto = registry.newTimeHistogram(userScanTimeKey);
putHisto = registry.newTimeHistogram(userPutKey);
deleteHisto = registry.newTimeHistogram(userDeleteKey);
incrementHisto = registry.newTimeHistogram(userIncrementKey);
appendHisto = registry.newTimeHistogram(userAppendKey);
replayHisto = registry.newTimeHistogram(userReplayKey);
}
}
@Override
public void deregister() {
boolean wasClosed = closed.getAndSet(true);
// Has someone else already closed this for us?
if (wasClosed) {
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Removing user Metrics for user: " + user);
}
synchronized (this) {
registry.removeMetric(userGetKey);
registry.removeMetric(userScanTimeKey);
registry.removeMetric(userPutKey);
registry.removeMetric(userDeleteKey);
registry.removeMetric(userIncrementKey);
registry.removeMetric(userAppendKey);
registry.removeMetric(userReplayKey);
}
}
@Override
public String getUser() {
return user;
}
@Override
public int compareTo(MetricsUserSource source) {
if (source == null) {
return -1;
}
if (!(source instanceof MetricsUserSourceImpl)) {
return -1;
}
MetricsUserSourceImpl impl = (MetricsUserSourceImpl) source;
return Long.compare(hashCode, impl.hashCode);
}
@Override
public int hashCode() {
return hashCode;
}
@Override
public boolean equals(Object obj) {
return obj == this ||
(obj instanceof MetricsUserSourceImpl && compareTo((MetricsUserSourceImpl) obj) == 0);
}
void snapshot(MetricsRecordBuilder mrb, boolean ignored) {
// If there is a close that started be double extra sure
// that we're not getting any locks and not putting data
// into the metrics that should be removed. So early out
// before even getting the lock.
if (closed.get()) {
return;
}
// Grab the read
// This ensures that removes of the metrics
// can't happen while we are putting them back in.
synchronized (this) {
// It's possible that a close happened between checking
// the closed variable and getting the lock.
if (closed.get()) {
return;
}
}
}
@Override
public void updatePut(long t) {
putHisto.add(t);
}
@Override
public void updateDelete(long t) {
deleteHisto.add(t);
}
@Override
public void updateGet(long t) {
getHisto.add(t);
}
@Override
public void updateIncrement(long t) {
incrementHisto.add(t);
}
@Override
public void updateAppend(long t) {
appendHisto.add(t);
}
@Override
public void updateReplay(long t) {
replayHisto.add(t);
}
@Override
public void updateScanTime(long t) {
scanTimeHisto.add(t);
}
}

View File

@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.testclassification.MetricsTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({MetricsTests.class, SmallTests.class})
public class TestMetricsUserSourceImpl {
@Test
public void testCompareToHashCodeEquals() throws Exception {
MetricsRegionServerSourceFactory fact
= CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class);
MetricsUserSource one = fact.createUser("ONE");
MetricsUserSource oneClone = fact.createUser("ONE");
MetricsUserSource two = fact.createUser("TWO");
assertEquals(0, one.compareTo(oneClone));
assertEquals(one.hashCode(), oneClone.hashCode());
assertNotEquals(one, two);
assertTrue(one.compareTo(two) != 0);
assertTrue(two.compareTo(one) != 0);
assertTrue(two.compareTo(one) != one.compareTo(two));
assertTrue(two.compareTo(two) == 0);
}
@Test (expected = RuntimeException.class)
public void testNoGetRegionServerMetricsSourceImpl() throws Exception {
// This should throw an exception because MetricsUserSourceImpl should only
// be created by a factory.
CompatibilitySingletonFactory.getInstance(MetricsUserSource.class);
}
@Test
public void testGetUser() {
MetricsRegionServerSourceFactory fact
= CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class);
MetricsUserSource one = fact.createUser("ONE");
assertEquals("ONE", one.getUser());
}
}

View File

@ -46,7 +46,7 @@ import org.apache.hadoop.hbase.util.LossyCounting;
public class MetaTableMetrics extends BaseRegionObserver {
private MetricRegistry registry;
private LossyCounting clientMetricsLossyCounting, regionMetricsLossyCounting;
private LossyCounting<String> clientMetricsLossyCounting, regionMetricsLossyCounting;
private boolean active = false;
private Set<String> metrics = new HashSet<String>();
@ -251,14 +251,15 @@ public class MetaTableMetrics extends BaseRegionObserver {
.equals(TableName.META_TABLE_NAME)) {
RegionCoprocessorEnvironment regionCoprocessorEnv = (RegionCoprocessorEnvironment) env;
registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
LossyCounting.LossyCountingListener listener = new LossyCounting.LossyCountingListener(){
@Override public void sweep(String key) {
registry.remove(key);
metrics.remove(key);
}
LossyCounting.LossyCountingListener<String> listener =
new LossyCounting.LossyCountingListener<String>() {
@Override public void sweep(String key) {
registry.remove(key);
metrics.remove(key);
}
};
clientMetricsLossyCounting = new LossyCounting(listener);
regionMetricsLossyCounting = new LossyCounting(listener);
clientMetricsLossyCounting = new LossyCounting<String>("clientMetaMetrics",listener);
regionMetricsLossyCounting = new LossyCounting<String>("regionMetaMetrics",listener);
// only be active mode when this region holds meta table.
active = true;
}

View File

@ -44,9 +44,10 @@ public class MetricsRegionServer {
"hbase.regionserver.enable.table.latencies";
public static final boolean RS_ENABLE_TABLE_METRICS_DEFAULT = true;
private MetricsRegionServerSource serverSource;
private MetricsRegionServerWrapper regionServerWrapper;
private RegionServerTableMetrics tableMetrics;
private final MetricsRegionServerSource serverSource;
private final MetricsRegionServerWrapper regionServerWrapper;
private final RegionServerTableMetrics tableMetrics;
private final MetricsUserAggregate userAggregate;
private MetricRegistry metricRegistry;
private Timer bulkLoadTimer;
@ -56,8 +57,8 @@ public class MetricsRegionServer {
public MetricsRegionServer(MetricsRegionServerWrapper regionServerWrapper, Configuration conf) {
this(regionServerWrapper,
CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
.createServer(regionServerWrapper),
createTableMetrics(conf));
.createServer(regionServerWrapper), createTableMetrics(conf),
MetricsUserAggregateFactory.getMetricsUserAggregate(conf));
// Create hbase-metrics module based metrics. The registry should already be registered by the
// MetricsRegionServerSource
@ -70,11 +71,12 @@ public class MetricsRegionServer {
}
MetricsRegionServer(MetricsRegionServerWrapper regionServerWrapper,
MetricsRegionServerSource serverSource,
RegionServerTableMetrics tableMetrics) {
MetricsRegionServerSource serverSource, RegionServerTableMetrics tableMetrics,
MetricsUserAggregate userAggregate) {
this.regionServerWrapper = regionServerWrapper;
this.serverSource = serverSource;
this.tableMetrics = tableMetrics;
this.userAggregate = userAggregate;
}
/**
@ -92,6 +94,11 @@ public class MetricsRegionServer {
return serverSource;
}
@VisibleForTesting
public org.apache.hadoop.hbase.regionserver.MetricsUserAggregate getMetricsUserAggregate() {
return userAggregate;
}
public MetricsRegionServerWrapper getRegionServerWrapper() {
return regionServerWrapper;
}
@ -111,6 +118,7 @@ public class MetricsRegionServer {
tableMetrics.updatePut(tn, t);
}
serverSource.updatePut(t);
userAggregate.updatePut(t);
}
public void updateDelete(TableName tn, long t) {
@ -118,6 +126,7 @@ public class MetricsRegionServer {
tableMetrics.updateDelete(tn, t);
}
serverSource.updateDelete(t);
userAggregate.updateDelete(t);
}
public void updateDeleteBatch(TableName tn, long t) {
@ -146,6 +155,7 @@ public class MetricsRegionServer {
serverSource.incrSlowGet();
}
serverSource.updateGet(t);
userAggregate.updateGet(t);
}
public void updateIncrement(TableName tn, long t) {
@ -156,6 +166,7 @@ public class MetricsRegionServer {
serverSource.incrSlowIncrement();
}
serverSource.updateIncrement(t);
userAggregate.updateIncrement(t);
}
public void updateAppend(TableName tn, long t) {
@ -166,10 +177,12 @@ public class MetricsRegionServer {
serverSource.incrSlowAppend();
}
serverSource.updateAppend(t);
userAggregate.updateAppend(t);
}
public void updateReplay(long t){
serverSource.updateReplay(t);
userAggregate.updateReplay(t);
}
public void updateScanSize(TableName tn, long scanSize){
@ -184,6 +197,7 @@ public class MetricsRegionServer {
tableMetrics.updateScanTime(tn, t);
}
serverSource.updateScanTime(t);
userAggregate.updateScanTime(t);
}
public void updateSplitTime(long t) {

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@InterfaceAudience.Private
public interface MetricsUserAggregate {
void updatePut(long t);
void updateDelete(long t);
void updateGet(long t);
void updateIncrement(long t);
void updateAppend(long t);
void updateReplay(long t);
void updateScanTime(long t);
}

View File

@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@InterfaceAudience.Private
public class MetricsUserAggregateFactory {
private MetricsUserAggregateFactory() {
}
public static final String METRIC_USER_ENABLED_CONF = "hbase.regionserver.user.metrics.enabled";
public static final boolean DEFAULT_METRIC_USER_ENABLED_CONF = true;
public static MetricsUserAggregate getMetricsUserAggregate(Configuration conf) {
if (conf.getBoolean(METRIC_USER_ENABLED_CONF, DEFAULT_METRIC_USER_ENABLED_CONF)) {
return new MetricsUserAggregateImpl(conf);
} else {
//NoOpMetricUserAggregate
return new MetricsUserAggregate() {
@Override public void updatePut(long t) {
}
@Override public void updateDelete(long t) {
}
@Override public void updateGet(long t) {
}
@Override public void updateIncrement(long t) {
}
@Override public void updateAppend(long t) {
}
@Override public void updateReplay(long t) {
}
@Override public void updateScanTime(long t) {
}
};
}
}
}

View File

@ -0,0 +1,135 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.LossyCounting;
@InterfaceAudience.Private
public class MetricsUserAggregateImpl implements MetricsUserAggregate{
/** Provider for mapping principal names to Users */
private final UserProvider userProvider;
private final MetricsUserAggregateSource source;
private final LossyCounting<MetricsUserSource> userMetricLossyCounting;
public MetricsUserAggregateImpl(Configuration conf) {
source = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
.getUserAggregate();
userMetricLossyCounting = new LossyCounting<MetricsUserSource>("userMetrics",
new LossyCounting.LossyCountingListener<MetricsUserSource>() {
@Override
public void sweep(MetricsUserSource key) {
source.deregister(key);
}
});
this.userProvider = UserProvider.instantiate(conf);
}
/**
* Returns the active user to which authorization checks should be applied.
* If we are in the context of an RPC call, the remote user is used,
* otherwise the currently logged in user is used.
*/
private String getActiveUser() {
User user = RpcServer.getRequestUser();
if (user == null) {
try {
user = userProvider.getCurrent();
} catch (IOException e) { }
}
return user != null ? user.getShortName() : null;
}
@VisibleForTesting
MetricsUserAggregateSource getSource() {
return source;
}
@Override
public void updatePut(long t) {
String user = getActiveUser();
if (user != null) {
getOrCreateMetricsUser(user).updatePut(t);
}
}
@Override
public void updateDelete(long t) {
String user = getActiveUser();
if (user != null) {
getOrCreateMetricsUser(user).updateDelete(t);
}
}
@Override
public void updateGet(long t) {
String user = getActiveUser();
if (user != null) {
getOrCreateMetricsUser(user).updateGet(t);
}
}
@Override
public void updateIncrement(long t) {
String user = getActiveUser();
if (user != null) {
getOrCreateMetricsUser(user).updateIncrement(t);
}
}
@Override
public void updateAppend(long t) {
String user = getActiveUser();
if (user != null) {
getOrCreateMetricsUser(user).updateAppend(t);
}
}
@Override
public void updateReplay(long t) {
String user = getActiveUser();
if (user != null) {
getOrCreateMetricsUser(user).updateReplay(t);
}
}
@Override
public void updateScanTime(long t) {
String user = getActiveUser();
if (user != null) {
getOrCreateMetricsUser(user).updateScanTime(t);
}
}
private MetricsUserSource getOrCreateMetricsUser(String user) {
MetricsUserSource userSource = source.getOrCreateMetricsUser(user);
userMetricLossyCounting.add(userSource);
return userSource;
}
}

View File

@ -19,13 +19,20 @@
package org.apache.hadoop.hbase.util;
import com.google.common.annotations.VisibleForTesting;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* LossyCounting utility, bounded data structure that maintains approximate high frequency
@ -39,35 +46,41 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
*/
@InterfaceAudience.Private
public class LossyCounting {
public class LossyCounting<T> {
private static final Logger LOG = LoggerFactory.getLogger(LossyCounting.class);
private final ExecutorService executor;
private long bucketSize;
private int currentTerm;
private Map<String, Integer> data;
private Map<T, Integer> data;
private long totalDataCount;
private final String name;
private LossyCountingListener listener;
private static AtomicReference<Future> fut = new AtomicReference<>(null);
public interface LossyCountingListener {
void sweep(String key);
public interface LossyCountingListener<T> {
void sweep(T key);
}
public LossyCounting(double errorRate, LossyCountingListener listener) {
public LossyCounting(double errorRate, String name, LossyCountingListener listener) {
if (errorRate < 0.0 || errorRate > 1.0) {
throw new IllegalArgumentException(" Lossy Counting error rate should be within range [0,1]");
}
this.name = name;
this.bucketSize = (long) Math.ceil(1 / errorRate);
this.currentTerm = 1;
this.totalDataCount = 0;
this.data = new ConcurrentHashMap<>();
this.listener = listener;
calculateCurrentTerm();
executor = Executors.newSingleThreadExecutor();
}
public LossyCounting(LossyCountingListener listener) {
public LossyCounting(String name, LossyCountingListener listener) {
this(HBaseConfiguration.create().getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02),
listener);
name, listener);
}
private void addByOne(String key) {
private void addByOne(T key) {
//If entry exists, we update the entry by incrementing its frequency by one. Otherwise,
//we create a new entry starting with currentTerm so that it will not be pruned immediately
Integer i = data.get(key);
@ -80,22 +93,28 @@ public class LossyCounting {
calculateCurrentTerm();
}
public void add(String key) {
public void add(T key) {
addByOne(key);
if(totalDataCount % bucketSize == 0) {
//sweep the entries at bucket boundaries
sweep();
//run Sweep
Future future = fut.get();
if (future != null && !future.isDone()){
return;
}
future = executor.submit(new SweepRunnable());
fut.set(future);
}
}
/**
* sweep low frequency data
* @return Names of elements got swept
*/
private void sweep() {
for(Map.Entry<String, Integer> entry : data.entrySet()) {
@VisibleForTesting
public void sweep() {
for(Map.Entry<T, Integer> entry : data.entrySet()) {
if(entry.getValue() < currentTerm) {
String metric = entry.getKey();
T metric = entry.getKey();
data.remove(metric);
if (listener != null) {
listener.sweep(metric);
@ -119,16 +138,33 @@ public class LossyCounting {
return data.size();
}
public boolean contains(String key) {
public boolean contains(T key) {
return data.containsKey(key);
}
public Set<String> getElements(){
public Set<T> getElements(){
return data.keySet();
}
public long getCurrentTerm() {
return currentTerm;
}
class SweepRunnable implements Runnable {
@Override public void run() {
if (LOG.isTraceEnabled()) {
LOG.trace("Starting sweep of lossyCounting-" + name);
}
try {
sweep();
} catch (Exception exception) {
LOG.debug("Error while sweeping lossyCounting-" + name, exception);
}
}
}
@VisibleForTesting public Future getSweepFuture() {
return fut.get();
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.hadoop.hbase.CompatibilityFactory;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.TableName;
@ -32,8 +33,8 @@ import org.junit.experimental.categories.Category;
@Category({RegionServerTests.class, SmallTests.class})
public class TestMetricsTableLatencies {
public static MetricsAssertHelper HELPER =
CompatibilityFactory.getInstance(MetricsAssertHelper.class);
private static MetricsAssertHelper HELPER =
CompatibilityFactory.getInstance(MetricsAssertHelper.class);
@Test
public void testTableWrapperAggregateMetrics() throws IOException {

View File

@ -0,0 +1,147 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.security.PrivilegedAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompatibilityFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({RegionServerTests.class, SmallTests.class})
public class TestMetricsUserAggregate {
public static MetricsAssertHelper HELPER =
CompatibilityFactory.getInstance(MetricsAssertHelper.class);
private MetricsRegionServerWrapperStub wrapper;
private MetricsRegionServer rsm;
private MetricsUserAggregateImpl userAgg;
private TableName tableName = TableName.valueOf("testUserAggregateMetrics");
@BeforeClass
public static void classSetUp() {
HELPER.init();
}
@Before
public void setUp() {
wrapper = new MetricsRegionServerWrapperStub();
Configuration conf = HBaseConfiguration.create();
rsm = new MetricsRegionServer(wrapper, conf);
userAgg = (MetricsUserAggregateImpl)rsm.getMetricsUserAggregate();
}
private void doOperations() {
for (int i=0; i < 10; i ++) {
rsm.updateGet(tableName,10);
}
for (int i=0; i < 11; i ++) {
rsm.updateScanTime(tableName,11);
}
for (int i=0; i < 12; i ++) {
rsm.updatePut(tableName,12);
}
for (int i=0; i < 13; i ++) {
rsm.updateDelete(tableName,13);
}
for (int i=0; i < 14; i ++) {
rsm.updateIncrement(tableName,14);
}
for (int i=0; i < 15; i ++) {
rsm.updateAppend(tableName,15);
}
for (int i=0; i < 16; i ++) {
rsm.updateReplay(16);
}
}
@Test
public void testPerUserOperations() {
Configuration conf = HBaseConfiguration.create();
User userFoo = User.createUserForTesting(conf, "FOO", new String[0]);
User userBar = User.createUserForTesting(conf, "BAR", new String[0]);
userFoo.getUGI().doAs(new PrivilegedAction<Void>() {
@Override
public Void run() {
doOperations();
return null;
}
});
userBar.getUGI().doAs(new PrivilegedAction<Void>() {
@Override
public Void run() {
doOperations();
return null;
}
});
HELPER.assertCounter("userfoometricgetnumops", 10, userAgg.getSource());
HELPER.assertCounter("userfoometricscantimenumops", 11, userAgg.getSource());
HELPER.assertCounter("userfoometricputnumops", 12, userAgg.getSource());
HELPER.assertCounter("userfoometricdeletenumops", 13, userAgg.getSource());
HELPER.assertCounter("userfoometricincrementnumops", 14, userAgg.getSource());
HELPER.assertCounter("userfoometricappendnumops", 15, userAgg.getSource());
HELPER.assertCounter("userfoometricreplaynumops", 16, userAgg.getSource());
HELPER.assertCounter("userbarmetricgetnumops", 10, userAgg.getSource());
HELPER.assertCounter("userbarmetricscantimenumops", 11, userAgg.getSource());
HELPER.assertCounter("userbarmetricputnumops", 12, userAgg.getSource());
HELPER.assertCounter("userbarmetricdeletenumops", 13, userAgg.getSource());
HELPER.assertCounter("userbarmetricincrementnumops", 14, userAgg.getSource());
HELPER.assertCounter("userbarmetricappendnumops", 15, userAgg.getSource());
HELPER.assertCounter("userbarmetricreplaynumops", 16, userAgg.getSource());
}
@Test public void testLossyCountingOfUserMetrics() {
Configuration conf = HBaseConfiguration.create();
int noOfUsers = 10000;
for (int i = 1; i <= noOfUsers; i++) {
User.createUserForTesting(conf, "FOO" + i, new String[0]).getUGI()
.doAs(new PrivilegedAction<Void>() {
@Override public Void run() {
rsm.updateGet(tableName, 10);
return null;
}
});
}
assertTrue(
((MetricsUserAggregateSourceImpl) userAgg.getSource()).getUserSources().size() <= (noOfUsers
/ 10));
for (int i = 1; i <= noOfUsers / 10; i++) {
assertFalse(
HELPER.checkCounterExists("userfoo" + i + "metricgetnumops", userAgg.getSource()));
}
HELPER.assertCounter("userfoo" + noOfUsers + "metricgetnumops", 1, userAgg.getSource());
}
}

View File

@ -23,7 +23,6 @@ import static org.junit.Assert.assertEquals;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -32,15 +31,15 @@ public class TestLossyCounting {
@Test
public void testBucketSize() {
LossyCounting lossyCounting = new LossyCounting(0.01, null);
LossyCounting lossyCounting = new LossyCounting(0.01, "testBucketSize", null);
assertEquals(100L, lossyCounting.getBucketSize());
LossyCounting lossyCounting2 = new LossyCounting(null);
LossyCounting lossyCounting2 = new LossyCounting("testBucketSize2", null);
assertEquals(50L, lossyCounting2.getBucketSize());
}
@Test
public void testAddByOne() {
LossyCounting lossyCounting = new LossyCounting(0.01, null);
LossyCounting lossyCounting = new LossyCounting(0.01, "testAddByOne", null);
for(int i = 0; i < 100; i++){
String key = "" + i;
lossyCounting.add(key);
@ -54,30 +53,46 @@ public class TestLossyCounting {
@Test
public void testSweep1() {
LossyCounting lossyCounting = new LossyCounting(0.01, null);
LossyCounting lossyCounting = new LossyCounting(0.01, "testSweep1", null);
for(int i = 0; i < 400; i++){
String key = "" + i;
lossyCounting.add(key);
}
assertEquals(4L, lossyCounting.getCurrentTerm());
//if total rows added are proportional to bucket size
waitForSweep(lossyCounting);
//Do last one sweep as some sweep will be skipped when first one was running
lossyCounting.sweep();
assertEquals(lossyCounting.getBucketSize() - 1, lossyCounting.getDataSize());
}
private void waitForSweep(LossyCounting<Object> lossyCounting) {
//wait for sweep thread to complete
int retry = 0;
while (!lossyCounting.getSweepFuture().isDone() && retry < 10) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
retry++;
}
}
@Test
public void testSweep2() {
LossyCounting lossyCounting = new LossyCounting(0.1, null);
LossyCounting lossyCounting = new LossyCounting(0.1, "testSweep2", null);
for(int i = 0; i < 10; i++){
String key = "" + i;
lossyCounting.add(key);
}
waitForSweep(lossyCounting);
assertEquals(10L, lossyCounting.getDataSize());
for(int i = 0; i < 10; i++){
String key = "1";
lossyCounting.add(key);
}
waitForSweep(lossyCounting);
assertEquals(1L, lossyCounting.getDataSize());
}
}